Merge remote-tracking branch 'couchbase/3.0.x' into sherlock 82/65582/1
author Dave Rigby <daver@couchbase.com>
Thu, 7 Jul 2016 13:59:04 +0000 (14:59 +0100)
committer Dave Rigby <daver@couchbase.com>
Thu, 7 Jul 2016 13:59:04 +0000 (14:59 +0100)
* couchbase/3.0.x:
  MB-19982: Don't hold connsLock for duration of dcp stats
  MB-19982: Fix potential deadlock between DcpConsumer::bufMutex & connsLock
  MB-14859: Handle quick successive BG Fetch of a key interleaved with exp pager

Change-Id: Ie192ce93370c3218948434794b335732a6a7ff18

1  2 
src/connmap.cc
src/connmap.h
src/ep.cc
src/ep_engine.cc
tests/ep_testsuite.cc

diff --cc src/connmap.cc
Simple merge
diff --cc src/connmap.h
@@@ -508,32 -488,10 +523,31 @@@ private
  
      bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
  
-     void disconnect_UNLOCKED(const void *cookie);
      void closeAllStreams_UNLOCKED();
  
+     // Guarded by the parent class's `connsLock`
      std::list<connection_t> deadConnections;
 +
 +    SpinLock numBackfillsLock;
 +    /* Db file memory */
 +    static const uint32_t dbFileMem;
 +    uint16_t numActiveSnoozingBackfills;
 +    uint16_t maxActiveSnoozingBackfills;
 +    /* Max num of backfills we want to have irrespective of memory */
 +    static const uint16_t numBackfillsThreshold;
 +    /* Max percentage of memory we want backfills to occupy */
 +    static const uint8_t numBackfillsMemThreshold;
 +    /* Total memory used by all DCP consumer buffers */
 +    AtomicValue<size_t> aggrDcpConsumerBufferSize;
 +
 +    class DcpConfigChangeListener : public ValueChangedListener {  // nested listener type derived from ValueChangedListener
 +    public:
 +        DcpConfigChangeListener(DcpConnMap& connMap);
 +        virtual ~DcpConfigChangeListener() { }
 +        virtual void sizeValueChanged(const std::string &key, size_t value);  // takes a key and a size_t value; defined outside this view
 +    private:
 +        DcpConnMap& myConnMap;  // reference member, so it has to be bound when the listener is constructed
 +    };
  };
  
  
diff --cc src/ep.cc
Simple merge
Simple merge
@@@ -14523,6 -11876,151 +14523,101 @@@ static enum test_result test_mb19153(EN
      return SUCCESS;
  }
  
 -static void force_vbstate_to_25x(std::string dbname, int vbucket) {
 -    std::stringstream ss;
 -    ss << dbname << DIRECTORY_SEPARATOR_CHARACTER << vbucket << ".couch.1";
 -    std::string filename = ss.str();
 -    Db* handle;
 -    couchstore_error_t err = couchstore_open_db(filename.c_str(),
 -                                                COUCHSTORE_OPEN_FLAG_CREATE,
 -                                                &handle);
 -
 -    checkeq(COUCHSTORE_SUCCESS, err, "Failed to open new database");
 -
 -    // Create 2.5 _local/vbstate
 -    std::string vbstate2_5_x ="{\"state\": \"active\","
 -                              " \"checkpoint_id\": \"1\","
 -                              " \"max_deleted_seqno\": \"0\"}";
 -    LocalDoc vbstate;
 -    vbstate.id.buf = (char *)"_local/vbstate";
 -    vbstate.id.size = sizeof("_local/vbstate") - 1;
 -    vbstate.json.buf = (char *)vbstate2_5_x.c_str();
 -    vbstate.json.size = vbstate2_5_x.size();
 -    vbstate.deleted = 0;
 -
 -    err = couchstore_save_local_document(handle, &vbstate);
 -    checkeq(COUCHSTORE_SUCCESS, err, "Failed to write local document");
 -    couchstore_commit(handle);
 -    couchstore_close_db(handle);
 -}
 -
 -// Regression test for MB-19635
 -// Check that warming up from a 2.x couchfile doesn't end up with a UUID of 0
 -// we warmup 2 vbuckets and ensure they get unique IDs.
 -static enum test_result test_mb19635_upgrade_from_25x(ENGINE_HANDLE *h,
 -                                                      ENGINE_HANDLE_V1 *h1) {
 -    std::string dbname = dbname_env;
 -
 -    force_vbstate_to_25x(dbname, 0);
 -    force_vbstate_to_25x(dbname, 1);
 -
 -    // Now shutdown engine force and restart to warmup from the 2.5.x data.
 -    testHarness.reload_engine(&h, &h1,
 -                              testHarness.engine_path,
 -                              testHarness.get_current_testcase()->cfg,
 -                              true, false);
 -    wait_for_warmup_complete(h, h1);
 -    uint64_t vb_uuid0 = get_ull_stat(h, h1, "vb_0:uuid", "vbucket-details");
 -    uint64_t vb_uuid1 = get_ull_stat(h, h1, "vb_1:uuid", "vbucket-details");
 -    checkne(vb_uuid0, vb_uuid1, "UUID is not unique");
 -    return SUCCESS;
 -}
 -
+ struct mb19982_ctx {  // argument bundle handed to mb19982_thread_func through its void* parameter
+     mb19982_ctx(ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1, int iterations) {
+         this->h = h;
+         this->h1 = h1;
+         this->iterations = iterations;
+     }
+     ENGINE_HANDLE* h;  // engine handle passed as the first argument of get_stats
+     ENGINE_HANDLE_V1* h1;  // engine interface whose get_stats is invoked
+     int iterations;  // number of get_stats("dcp") calls the thread performs
+ };
+ void mb19982_add_stat(const char *key, const uint16_t klen, const char *val,
+                       const uint32_t vlen, const void *cookie) {
+     // Intentionally empty: every stat handed to this callback is discarded.
+ }
+ extern "C" {
+     static void mb19982_thread_func(void *args) {  // args must point to an mb19982_ctx
+         struct mb19982_ctx *ctx = static_cast<mb19982_ctx *>(args);
+         for (int ii = 0; ii < ctx->iterations; ii++) {  // request the "dcp" stats ctx->iterations times
+             checkeq(ctx->h1->get_stats(ctx->h, NULL, "dcp", 3, &mb19982_add_stat),
+                     ENGINE_SUCCESS, "failed get_stats(dcp)");
+         }
+     }
+ }
+ /*
+  * This test creates a DCP consumer on a replica VB and then from a second thread
+  * fires get_stats("dcp") whilst the main thread changes VB state from
+  * replica->active->replica (and so on).
+  * MB-19982 identified a lock inversion between these two functional paths and this
+  * test reproduces the issue and protects against its regression.
+  */
+ static enum test_result test_mb19982(ENGINE_HANDLE *h,
+                                      ENGINE_HANDLE_V1 *h1) {
+     // Number of snapshot-marker / mutation / state-flip rounds sent for vb 0
+     int num_items = 1000;
+     int iterations = 1000; // get_stats("dcp") calls made by the second thread
+     struct mb19982_ctx ctx(h, h1, iterations);
+     cb_thread_t cp_thread;
+     const void *cookie = testHarness.create_cookie();
+     uint32_t opaque = 0xFFFF0000;
+     uint32_t flags = 0;
+     std::string name = "unittest";
+     // Put vbucket 0 into replica state before the consumer is opened
+     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
+           "Failed to set vbucket state.");
+     // Open the DCP consumer connection named "unittest"
+     checkeq(h1->dcp.open(h, cookie, opaque, 0, flags,
+                          (void*)name.c_str(), name.length()),
+             ENGINE_SUCCESS,
+             "Failed dcp Consumer open connection.");
+     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
+                             PROTOCOL_BINARY_RESPONSE_SUCCESS);
+     cb_assert(cb_create_thread(&cp_thread,  // stats thread starts here, before the loop below
+                                mb19982_thread_func,
+                                &ctx, 0) == 0);
+     uint32_t stream_opaque = get_int_stat(h, h1,  // stream opaque is read back from the "dcp" stats
+                                           "eq_dcpq:unittest:stream_0_opaque",
+                                           "dcp");
+     for (int i = 1; i <= num_items; i++) {
+         std::stringstream ss;
+         ss << "key-" << i;
+         checkeq(h1->dcp.snapshot_marker(h, cookie,
+                                         stream_opaque, 0/*vbid*/,
+                                         num_items, num_items + i, 2),
+                 ENGINE_SUCCESS,
+                 "Failed to send snapshot marker");
+         checkeq(h1->dcp.mutation(h, cookie, stream_opaque,
+                                  ss.str().c_str(), ss.str().length(),
+                                  "value", 5, i * 3, 0, 0, 0,
+                                  i + num_items, i + num_items,
+                                  0, 0, "", 0, INITIAL_NRU_VALUE),
+                 ENGINE_SUCCESS,
+                 "Failed to send dcp mutation");
+         // Flip vb 0 to active and back to replica each round (this can have a lock inversion with stats)
+         checkeq(h1->dcp.set_vbucket_state(h, cookie, stream_opaque, 0, vbucket_state_active),
+                 ENGINE_SUCCESS, "failed to change to active");
+         checkeq(h1->dcp.set_vbucket_state(h, cookie, stream_opaque, 0, vbucket_state_replica),
+                 ENGINE_SUCCESS, "failed to change to replica");
+     }
+     cb_assert(cb_join_thread(cp_thread) == 0);  // wait for the stats thread before releasing the cookie
+     testHarness.destroy_cookie(cookie);
+     return SUCCESS;
+ }
  static enum test_result prepare(engine_test_t *test) {
  #ifdef __sun
          // Some of the tests doesn't work on Solaris.. Don't know why yet..
@@@ -15583,18 -12972,9 +15678,21 @@@ engine_test_t* get_tests(void) 
                   test_mb19635_upgrade_from_25x, test_setup, teardown, NULL,
                   prepare, cleanup),
  
 +        TestCase("test_set_dcp_param",
 +                 test_set_dcp_param, test_setup, teardown, NULL,
 +                 prepare, cleanup),
 +
 +        TestCase("test_mb18452_largeYield",
 +                 test_mb18452_largeYield, test_setup, teardown, "max_num_nonio=1",
 +                 prepare, cleanup),
 +
 +        TestCase("test_mb18452_smallYield",
 +                 test_mb18452_smallYield, test_setup, teardown, "max_num_nonio=1",
 +                 prepare, cleanup),
 +
+         TestCase("test MB-19982", test_mb19982,
+                  test_setup, teardown, NULL, prepare, cleanup),
          TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
      };