Merge branch 'couchbase/3.0.x' into 'couchbase/sherlock' 66/59866/1
authorabhinavdangeti <abhinav@couchbase.com>
Fri, 12 Feb 2016 17:13:58 +0000 (09:13 -0800)
committerabhinavdangeti <abhinav@couchbase.com>
Fri, 12 Feb 2016 17:14:23 +0000 (09:14 -0800)
couchbase/3.0.x:
|\
| * b84d09d MB-17766: Regression test that checks for race during takeover
| * ba305c4 MB-17766: Incorrect ordering of messages during ActiveStream's takeover-send phase
| * 4f39683 MB-17766: Avoid copy overhead of std::deque in getOutstandingItems
| * e3f4855 MB-17766: Refactor nextCheckpointItemTask to allow testing

Change-Id: I57a2aa37abc4ab60f09648bd7b02740b4bf933e6

1  2 
CMakeLists.txt
src/dcp-backfill.h
src/dcp-stream.cc
src/dcp-stream.h
src/executorpool.cc
src/tapconnection.h
tests/module_tests/stream_test.cc

diff --cc CMakeLists.txt
@@@ -19,6 -15,6 +19,7 @@@ INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INST
                             ${CMAKE_CURRENT_BINARY_DIR}/src
                             ${SNAPPY_INCLUDE_DIR}
                             ${Platform_SOURCE_DIR}/include
++                           ${Memcached_SOURCE_DIR}
                             ${Memcached_SOURCE_DIR}/include
                             ${Couchstore_SOURCE_DIR}/include
                             ${CMAKE_CURRENT_BINARY_DIR})
@@@ -144,6 -137,51 +145,72 @@@ ADD_LIBRARY(ep SHARE
  SET_TARGET_PROPERTIES(ep PROPERTIES PREFIX "")
  TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore dirutils platform ${LIBEVENT_LIBRARIES})
  
 -TARGET_LINK_LIBRARIES(ep-engine_stream_test couchstore cJSON dirutils JSON_checker mcd_util platform)
++IF (APPLE)
++    SET(MEMORY_TRACKING_SRCS ${Memcached_SOURCE_DIR}/daemon/alloc_hooks.c
++                             ${Memcached_SOURCE_DIR}/daemon/darwin_zone.c)
++ELSE (APPLE)
++    SET(MEMORY_TRACKING_SRCS ${Memcached_SOURCE_DIR}/daemon/alloc_hooks.c)
++ENDIF (APPLE)
++
++IF (MEMORY_ALLOCATOR)
++    INCLUDE_DIRECTORIES(AFTER ${MALLOC_INCLUDE_DIR})
++ELSE (MEMORY_ALLOCATOR)
++    SET(MALLOC_LIBRARIES "")
++ENDIF (MEMORY_ALLOCATOR)
++
+ ADD_EXECUTABLE(ep-engine_stream_test
+   tests/module_tests/stream_test.cc
+   src/access_scanner.cc
+   src/atomic.cc
+   src/backfill.cc
+   src/bgfetcher.cc
++  src/bloomfilter.cc
+   src/checkpoint.cc
+   src/checkpoint_remover.cc
+   src/conflict_resolution.cc
+   src/connmap.cc
++  src/dcp-backfill.cc
++  src/dcp-backfill-manager.cc
+   src/dcp-consumer.cc
+   src/dcp-producer.cc
+   src/dcp-response.cc
+   src/dcp-stream.cc
++  src/defragmenter.cc
++  src/defragmenter_visitor.cc
+   src/ep.cc
+   src/ep_engine.cc
+   src/ep_time.c
+   src/executorpool.cc
+   src/executorthread.cc
++  src/ext_meta_parser.cc
+   src/failover-table.cc
+   src/flusher.cc
+   src/htresizer.cc
+   src/item.cc
+   src/item_pager.cc
+   src/kvshard.cc
+   src/memory_tracker.cc
++  src/murmurhash3.cc
+   src/mutation_log.cc
+   src/mutex.cc
+   src/objectregistry.cc
+   src/priority.cc
+   src/tapconnection.cc
+   src/stored-value.cc
+   src/tapthrottle.cc
+   src/tasks.cc
+   src/taskqueue.cc
+   src/vbucket.cc
+   src/vbucketmap.cc
+   src/warmup.cc
+   ${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
+   ${CONFIG_SOURCE}
+   ${KVSTORE_SOURCE}
+   ${COUCH_KVSTORE_SOURCE}
++  ${MEMORY_TRACKING_SRCS}
+   ${Memcached_SOURCE_DIR}/programs/engine_testapp/mock_server.c)
++TARGET_LINK_LIBRARIES(ep-engine_stream_test couchstore cJSON dirutils JSON_checker mcd_util platform ${MALLOC_LIBRARIES})
  ADD_EXECUTABLE(ep-engine_atomic_ptr_test
    tests/module_tests/atomic_ptr_test.cc
    src/atomic.cc
index fafe0ad,0000000..be26689
mode 100644,000000..100644
--- /dev/null
@@@ -1,102 -1,0 +1,99 @@@
- class Stream;
- typedef SingleThreadedRCPtr<Stream> stream_t;
 +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
 +/*
 + *     Copyright 2013 Couchbase, Inc
 + *
 + *   Licensed under the Apache License, Version 2.0 (the "License");
 + *   you may not use this file except in compliance with the License.
 + *   You may obtain a copy of the License at
 + *
 + *       http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *   Unless required by applicable law or agreed to in writing, software
 + *   distributed under the License is distributed on an "AS IS" BASIS,
 + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *   See the License for the specific language governing permissions and
 + *   limitations under the License.
 + */
 +
 +#ifndef SRC_DCP_BACKFILL_H_
 +#define SRC_DCP_BACKFILL_H_ 1
 +
 +#include "config.h"
 +
 +#include "callbacks.h"
 +#include "dcp-stream.h"
 +
 +class EventuallyPersistentEngine;
 +class ScanContext;
 +
 +typedef enum {
 +    backfill_state_init,
 +    backfill_state_scanning,
 +    backfill_state_completing,
 +    backfill_state_done
 +} backfill_state_t;
 +
 +typedef enum {
 +    backfill_success,
 +    backfill_finished,
 +    backfill_snooze
 +} backfill_status_t;
 +
 +class CacheCallback : public Callback<CacheLookup> {
 +public:
 +    CacheCallback(EventuallyPersistentEngine* e, stream_t &s);
 +
 +    void callback(CacheLookup &lookup);
 +
 +private:
 +    EventuallyPersistentEngine* engine_;
 +    stream_t stream_;
 +};
 +
 +class DiskCallback : public Callback<GetValue> {
 +public:
 +    DiskCallback(stream_t &s);
 +
 +    void callback(GetValue &val);
 +
 +private:
 +    stream_t stream_;
 +};
 +
 +class DCPBackfill {
 +public:
 +    DCPBackfill(EventuallyPersistentEngine* e, stream_t s,
 +                uint64_t start_seqno, uint64_t end_seqno);
 +
 +    backfill_status_t run();
 +
 +    uint16_t getVBucketId();
 +
 +    uint64_t getEndSeqno();
 +
 +    bool isDead() {
 +        return !stream->isActive();
 +    }
 +
 +    void cancel();
 +
 +private:
 +
 +    backfill_status_t create();
 +
 +    backfill_status_t scan();
 +
 +    backfill_status_t complete(bool cancelled);
 +
 +    void transitionState(backfill_state_t newState);
 +
 +    EventuallyPersistentEngine *engine;
 +    stream_t                    stream;
 +    uint64_t                    startSeqno;
 +    uint64_t                    endSeqno;
 +    ScanContext*                scanCtx;
 +    backfill_state_t            state;
 +    Mutex                       lock;
 +};
 +
 +#endif  // SRC_DCP_BACKFILL_H_
@@@ -132,9 -274,10 +132,10 @@@ ActiveStream::ActiveStream(EventuallyPe
                snap_start_seqno, snap_end_seqno),
         lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
         takeoverState(vbucket_state_pending), backfillRemaining(0),
 -       itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
 -       waitForSnapshot(0), engine(e), producer(p),
 -       isBackfillTaskRunning(false), lastSentSnapEndSeqno(0),
 -       checkpointCreatorTask(task), chkptItemsExtractionInProgress(false) {
 +       itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
 +       engine(e), producer(p), isBackfillTaskRunning(false),
-        lastSentSnapEndSeqno(0), checkpointCreatorTask(task) {
++       lastSentSnapEndSeqno(0), checkpointCreatorTask(task),
++       chkptItemsExtractionInProgress(false) {
  
      const char* type = "";
      if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
@@@ -592,56 -691,66 +595,68 @@@ void ActiveStreamCheckpointProcessorTas
  
  void ActiveStream::nextCheckpointItemTask() {
      RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
-     if (!vbucket) {
+     if (vbucket) {
 -        std::deque<queued_item> items;
++        std::vector<queued_item> items;
+         getOutstandingItems(vbucket, items);
+         processItems(items);
+     } else {
          /* The entity deleting the vbucket must set stream to dead,
             calling setDead(END_STREAM_STATE) will cause deadlock because
             it will try to grab streamMutex which is already acquired at this
             point here */
          return;
      }
-     bool mark = false;
-     std::vector<queued_item> items;
-     std::deque<MutationResponse*> mutations;
-     vbucket->checkpointManager.getAllItemsForCursor(name_, items);
-     if (vbucket->checkpointManager.getNumCheckpoints() > 1) {
-         engine->getEpStore()->wakeUpCheckpointRemover();
-     }
+ }
  
-     if (items.empty()) {
-         producer->notifyStreamReady(vb_, true);
-         return;
-     }
+ void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
 -                                       std::deque<queued_item> &items) {
++                                       std::vector<queued_item> &items) {
+     // Commencing item processing - set guard flag.
+     chkptItemsExtractionInProgress.store(true);
  
-     if (items.front()->getOperation() == queue_op_checkpoint_start) {
-         mark = true;
+     vb->checkpointManager.getAllItemsForCursor(name_, items);
+     if (vb->checkpointManager.getNumCheckpoints() > 1) {
+         engine->getEpStore()->wakeUpCheckpointRemover();
      }
+ }
  
-     std::vector<queued_item>::iterator itr = items.begin();
-     for (; itr != items.end(); ++itr) {
-         queued_item& qi = *itr;
-         if (qi->getOperation() == queue_op_set ||
-             qi->getOperation() == queue_op_del) {
-             curChkSeqno = qi->getBySeqno();
-             lastReadSeqno = qi->getBySeqno();
  
-             mutations.push_back(new MutationResponse(qi, opaque_,
-                            prepareExtendedMetaData(qi->getVBucketId(),
-                                                    qi->getConflictResMode())));
-         } else if (qi->getOperation() == queue_op_checkpoint_start) {
-             cb_assert(mutations.empty());
 -void ActiveStream::processItems(std::deque<queued_item>& items) {
++void ActiveStream::processItems(std::vector<queued_item>& items) {
+     if (!items.empty()) {
+         bool mark = false;
+         if (items.front()->getOperation() == queue_op_checkpoint_start) {
              mark = true;
          }
-     }
  
-     if (mutations.empty()) {
-         // If we only got checkpoint start or ends check to see if there are
-         // any more snapshots before pausing the stream.
-         nextCheckpointItemTask();
-     } else {
-         snapshot(mutations, mark);
+         std::deque<MutationResponse*> mutations;
 -        std::deque<queued_item>::iterator itemItr;
 -        for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
 -            queued_item& qi = *itemItr;
++        std::vector<queued_item>::iterator itr = items.begin();
++        for (; itr != items.end(); ++itr) {
++            queued_item& qi = *itr;
+             if (qi->getOperation() == queue_op_set ||
+                 qi->getOperation() == queue_op_del) {
+                 curChkSeqno = qi->getBySeqno();
+                 lastReadSeqno = qi->getBySeqno();
 -                mutations.push_back(new MutationResponse(qi, opaque_));
++                mutations.push_back(new MutationResponse(qi, opaque_,
++                            prepareExtendedMetaData(qi->getVBucketId(),
++                                                    qi->getConflictResMode())));
+             } else if (qi->getOperation() == queue_op_checkpoint_start) {
 -                snapshot(mutations, mark);
++                cb_assert(mutations.empty());
+                 mark = true;
+             }
+         }
+         if (mutations.empty()) {
+             // If we only got checkpoint start or ends check to see if there are
+             // any more snapshots before pausing the stream.
+             nextCheckpointItemTask();
+         } else {
+             snapshot(mutations, mark);
+         }
      }
-     // ...notify...
+     // Completed item processing - clear guard flag and notify producer.
+     chkptItemsExtractionInProgress.store(false);
      producer->notifyStreamReady(vb_, true);
  }
  
  
  #include "config.h"
  
- #include "vbucket.h"
 +#include "atomic.h"
 +#include "dcp-stream.h"
 +#include "dcp-producer.h"
+ #include "ep_engine.h"
 +#include "ext_meta_parser.h"
++#include "vbucket.h"
  
  #include <queue>
  
@@@ -37,8 -33,9 +38,12 @@@ class DcpResponse
  class DcpConsumer;
  typedef SingleThreadedRCPtr<DcpConsumer> dcp_consumer_t;
  
+ class DcpProducer;
  typedef SingleThreadedRCPtr<DcpProducer> dcp_producer_t;
  
++class Stream;
++typedef SingleThreadedRCPtr<Stream> stream_t;
++
  typedef enum {
      STREAM_PENDING,
      STREAM_BACKFILLING,
@@@ -217,6 -214,16 +222,16 @@@ public
      // Runs on ActiveStreamCheckpointProcessorTask
      void nextCheckpointItemTask();
  
 -    void getOutstandingItems(RCPtr<VBucket> &vb, std::deque<queued_item> &items);
+ protected:
+     // Returns the outstanding items for the stream's checkpoint cursor.
 -    void processItems(std::deque<queued_item>& items);
++    void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
+     // Given a set of queued items, create mutation responses for each item,
+     // and pass onto the producer associated with this stream.
++    void processItems(std::vector<queued_item>& items);
+     bool nextCheckpointItem();
  private:
  
      void transitionState(stream_state_t newState);
      dcp_producer_t producer;
      AtomicValue<bool> isBackfillTaskRunning;
  
 +    struct {
 +        AtomicValue<uint32_t> bytes;
 +        AtomicValue<uint32_t> items;
 +    } bufferedBackfill;
 +
      //! Last snapshot end seqno sent to the DCP client
      uint64_t lastSentSnapEndSeqno;
      ExTask checkpointCreatorTask;
+     /* Flag used by checkpointCreatorTask that is set before all items are
+        extracted for given checkpoint cursor, and is unset after all retrieved
+        items are added to the readyQ */
+     AtomicValue<bool> chkptItemsExtractionInProgress;
  };
  
  
Simple merge
Simple merge
index 0000000,c8abdc9..8baa599
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,157 +1,163 @@@
 -#include "../programs/engine_testapp/mock_server.h"
+ /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+ /*
+  *     Copyright 2016 Couchbase, Inc
+  *
+  *   Licensed under the Apache License, Version 2.0 (the "License");
+  *   you may not use this file except in compliance with the License.
+  *   You may obtain a copy of the License at
+  *
+  *       http://www.apache.org/licenses/LICENSE-2.0
+  *
+  *   Unless required by applicable law or agreed to in writing, software
+  *   distributed under the License is distributed on an "AS IS" BASIS,
+  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  *   See the License for the specific language governing permissions and
+  *   limitations under the License.
+  */
+ #include "dcp-producer.h"
+ #include "dcp-stream.h"
 -                                    std::deque<queued_item> &items) {
++#include "programs/engine_testapp/mock_server.h"
+ #include <platform/dirutils.h>
+ // Simple backport of the GTest-style EXPECT_EQ macro.
+ #define EXPECT_EQ(a, b, msg) \
+     do { \
+         if ((a) != (b)) { \
+             throw std::runtime_error(msg); \
+         } \
+     } while(0)
+ // Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
+ // normally protected methods publically for test purposes.
+ class MockActiveStream : public ActiveStream {
+ public:
+     MockActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
+                      const std::string &name, uint32_t flags, uint32_t opaque,
+                      uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
+                      uint64_t vb_uuid, uint64_t snap_start_seqno,
+                      uint64_t snap_end_seqno, ExTask task)
+     : ActiveStream(e, p, name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
+                    snap_start_seqno, snap_end_seqno, task) {}
+     // Expose underlying protected ActiveStream methods as public
+     void public_getOutstandingItems(RCPtr<VBucket> &vb,
 -    void public_processItems(std::deque<queued_item>& items) {
++                                    std::vector<queued_item> &items) {
+         getOutstandingItems(vb, items);
+     }
 -    Item item("key", /*flags*/0, /*exp*/0, value.c_str(), value.size());
++    void public_processItems(std::vector<queued_item>& items) {
+         processItems(items);
+     }
+     bool public_nextCheckpointItem() {
+         return nextCheckpointItem();
+     }
+ };
+ /* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
+  * queued items to be sent out via a DCP consumer, that nextCheckpointItem()
+  * doesn't incorrectly return false (meaning that there are no more checkpoint
+  * items to send).
+  */
+ static void test_mb17766(const std::string& test_dbname) {
++    // Init alloc hooks
++    mock_init_alloc_hooks();
++
+     // Setup an engine with a single active vBucket.
+     ENGINE_HANDLE* handle;
+     EXPECT_EQ(ENGINE_SUCCESS,
+               create_instance(1, get_mock_server_api, &handle),
+               "Failed to created ep engine instance");
++    // Init mock server
++    init_mock_server(handle);
++
+     EventuallyPersistentEngine* engine =
+             reinterpret_cast<EventuallyPersistentEngine*>(handle);
+     ObjectRegistry::onSwitchThread(engine);
+     std::string config = "dbname=" + test_dbname;
+     EXPECT_EQ(ENGINE_SUCCESS,
+               engine->initialize(config.c_str()),
+               "Failed to initialize engine.");
+     const uint16_t vbid = 0;
+     engine->setVBucketState(vbid, vbucket_state_active, false);
+     // Wait for warmup to complete.
+     while (engine->getEpStore()->isWarmingUp()) {
+         usleep(10);
+     }
+     // Add an item.
+     std::string value("value");
 -              vb0->checkpointManager.registerTAPCursor("test_mb17766"),
++    Item item("key", 3, /*flags*/0, /*exp*/0, value.c_str(), value.size());
+     uint64_t cas;
+     EXPECT_EQ(ENGINE_SUCCESS,
+               engine->store(NULL, &item, &cas, OPERATION_SET, vbid),
+               "Store failed");
+     // Create a DCP producer and register with checkpoint Manager.
+     dcp_producer_t producer = new DcpProducer(*engine, /*cookie*/NULL,
+                                               "test_mb_17766_producer",
+                                               /*notifyOnly*/false);
+     ExTask task = new ActiveStreamCheckpointProcessorTask(*engine);
+     stream_t stream = new MockActiveStream(engine, producer,
+                                            "test_mb17766", /*flags*/0,
+                                             /*opaque*/0, vbid,
+                                             /*st_seqno*/0,
+                                             /*en_seqno*/0,
+                                             /*vb_uuid*/0xabcd,
+                                             /*snap_start_seqno*/0,
+                                             /*snap_end_seqno*/0, task);
+     RCPtr<VBucket> vb0 = engine->getVBucket(0);
+     EXPECT_EQ(true, vb0, "Failed to get valid VBucket object for id 0");
+     EXPECT_EQ(false,
 -    std::deque<queued_item> items;
++              vb0->checkpointManager.registerCursor("test_mb17766"),
+               "Found an existing TAP cursor when attemping to register ours");
+     // Should start with nextCheckpointItem() returning true.
+     MockActiveStream* mock_stream = static_cast<MockActiveStream*>(stream.get());
+     EXPECT_EQ(true,
+               mock_stream->public_nextCheckpointItem(),
+               "nextCheckpointItem() should initially be true.");
++    std::vector<queued_item> items;
+     // Get the set of outstanding items
+     mock_stream->public_getOutstandingItems(vb0, items);
+     // REGRESSION CHECK: nextCheckpointItem() should still return true
+     EXPECT_EQ(true,
+               mock_stream->public_nextCheckpointItem(),
+               "nextCheckpointItem() after getting outstanding items should be true.");
+     // Process the set of items
+     mock_stream->public_processItems(items);
+     // Should finish with nextCheckpointItem() returning false.
+     EXPECT_EQ(false,
+               mock_stream->public_nextCheckpointItem(),
+               "nextCheckpointItem() after processing items should be false.");
+ }
+ int main(int argc, char **argv) {
+     (void)argc; (void)argv;
+     putenv(strdup("ALLOW_NO_STATS_UPDATE=yeah"));
+     bool success = true;
+     try {
+         test_mb17766("stream_test_db");
+     } catch (std::exception& e) {
+         std::cerr << "FAILED: " << e.what() << std::endl;
+         success = false;
+     }
+     // Cleanup any files we created.
+     CouchbaseDirectoryUtilities::rmrf("stream_test_db");
+     return success ? 0 : 1;
+ }