MB-16686: Remove sanity check while adding TAP over DCP 64/56564/3 v3.1.2
authorabhinavdangeti <abhinav@couchbase.com>
Fri, 30 Oct 2015 17:11:46 +0000 (10:11 -0700)
committerChiyoung Seo <chiyoung@couchbase.com>
Fri, 30 Oct 2015 18:55:18 +0000 (18:55 +0000)
This check isn't accurate as certain TAP messages from
the producer carry no vbucket information - initialized to
zero (expected), as they aren't vbucket specific operations.
In such a scenario, if the TAP consumer needs to be created,
it wouldn't be allowed to if a DCP passive stream exists
for vbucket 0. This would break an online upgrade.

Change-Id: I310b9cf4dbaf652c233cba02de7ca72469efa89d
Reviewed-on: http://review.couchbase.org/56564
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
src/connmap.cc
src/connmap.h
src/ep_engine.cc
tests/ep_testsuite.cc

index 10262d7..23a2467 100644 (file)
@@ -957,8 +957,7 @@ DcpConsumer *DcpConnMap::newConsumer(const void* cookie,
 
 }
 
-bool DcpConnMap::isPassiveStreamConnected(uint16_t vbucket) {
-    LockHolder lh(connsLock);
+bool DcpConnMap::isPassiveStreamConnected_UNLOCKED(uint16_t vbucket) {
     std::list<connection_t>::iterator it;
     for(it = all.begin(); it != all.end(); it++) {
         DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it->get());
@@ -979,15 +978,15 @@ ENGINE_ERROR_CODE DcpConnMap::addPassiveStream(ConnHandler* conn,
 {
     cb_assert(conn);
 
+    LockHolder lh(connsLock);
     /* Check if a stream (passive) for the vbucket is already present */
-    if (isPassiveStreamConnected(vbucket)) {
+    if (isPassiveStreamConnected_UNLOCKED(vbucket)) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Failing to add passive stream, "
             "as one already exists for the vbucket!",
             conn->logHeader(), vbucket);
         return ENGINE_KEY_EEXISTS;
     }
 
-    LockHolder lh(connsLock);
     return conn->addStream(opaque, vbucket, flags);
 }
 
index a868074..4a28ac4 100644 (file)
@@ -464,12 +464,12 @@ public:
 
     void manageConnections();
 
-    bool isPassiveStreamConnected(uint16_t vbucket);
-
     ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
                                        uint16_t vbucket, uint32_t flags);
 private:
 
+    bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
+
     void disconnect_UNLOCKED(const void *cookie);
 
     void closeAllStreams_UNLOCKED();
index 044a548..255461b 100644 (file)
@@ -2547,14 +2547,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
             // tap producer is no longer connected..
             return ENGINE_DISCONNECT;
         } else {
-            // Create a new tap consumer only if a dcp stream is
-            // not active for the vbucket
-            if (dcpConnMap_->isPassiveStreamConnected(vbucket)) {
-                LOG(EXTENSION_LOG_WARNING, "(vb %d) Failing to add a TAP "
-                    "consumer, as a DCP passive stream is still live for the "
-                    "vbucket!", vbucket);
-                return ENGINE_KEY_EEXISTS;
-            }
             connection = tapConnMap->newConsumer(cookie);
             if (connection == NULL) {
                 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
index dcb9385..007f3da 100644 (file)
@@ -4844,48 +4844,6 @@ static enum test_result test_dcp_add_stream_exists(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
-static enum test_result test_add_tap_rcvr_on_dcp_stream(ENGINE_HANDLE *h,
-                                                        ENGINE_HANDLE_V1 *h1) {
-
-    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
-          "Failed to set vbucket state.");
-
-    const void *cookie1 = testHarness.create_cookie();
-    uint32_t opaque = 0xFFFF0000;
-    uint32_t flags = 0;
-    const char *name = "unittest";
-    uint16_t nname = strlen(name);
-
-    /* Open DCP consumer connection */
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie1, opaque, 0, flags, (void*)name, nname),
-            "Failed dcp consumer open connection.");
-
-    /* Send add stream to consumer */
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.add_stream(h, cookie1, ++opaque, 0, 0),
-            "Add stream request failed");
-
-    const void *cookie2 = testHarness.create_cookie();
-    char eng_specific[9];
-    memset(eng_specific, 0, sizeof(eng_specific));
-    std::string key("key");
-    std::string val("value");
-    /* Creating a tap consumer */
-    checkeq(h1->tap_notify(h, cookie2, eng_specific, sizeof(eng_specific),
-                           1, 0, TAP_MUTATION, 1, key.c_str(), key.length(),
-                           828, 0, 0, PROTOCOL_BINARY_RAW_BYTES, val.c_str(),
-                           val.length(), 0),
-            ENGINE_KEY_EEXISTS,
-            "Tap_notify should've failed!");
-
-
-    testHarness.destroy_cookie(cookie1);
-    testHarness.destroy_cookie(cookie2);
-
-    return SUCCESS;
-}
-
 static enum test_result test_dcp_add_stream_nmvb(ENGINE_HANDLE *h,
                                                  ENGINE_HANDLE_V1 *h1) {
     const void *cookie = testHarness.create_cookie();
@@ -12258,9 +12216,6 @@ engine_test_t* get_tests(void) {
                  test_setup, teardown,
                  "dcp_enable_flow_control=true;dcp_enable_noop=false", prepare,
                  cleanup),
-        TestCase("test add tap consumer with existing dcp stream",
-                 test_add_tap_rcvr_on_dcp_stream,
-                 test_setup, teardown, NULL, prepare, cleanup),
         TestCase("test add stream nmvb", test_dcp_add_stream_nmvb, test_setup,
                  teardown, NULL, prepare, cleanup),
         TestCase("test add stream prod exists", test_dcp_add_stream_prod_exists,