MB-17766: Regression test that checks for race during takeover 66/59666/8
authorDave Rigby <daver@couchbase.com>
Tue, 9 Feb 2016 18:21:25 +0000 (18:21 +0000)
committerabhinav dangeti <abhinav@couchbase.com>
Wed, 10 Feb 2016 19:05:43 +0000 (19:05 +0000)
Module test: ep-engine_stream_test

Change-Id: I8e11722b1ed1029c8b969dcb88000c5903fbb0ca
Reviewed-on: http://review.couchbase.org/59666
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
CMakeLists.txt
src/dcp-stream.h
src/executorpool.cc
src/statwriter.h
src/tapconnection.h
tests/module_tests/stream_test.cc [new file with mode: 0644]

index 1090115..82c4504 100644 (file)
@@ -137,6 +137,51 @@ ADD_LIBRARY(ep SHARED
 SET_TARGET_PROPERTIES(ep PROPERTIES PREFIX "")
 TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore dirutils platform ${LIBEVENT_LIBRARIES})
 
+ADD_EXECUTABLE(ep-engine_stream_test
+  tests/module_tests/stream_test.cc
+  src/access_scanner.cc
+  src/atomic.cc
+  src/backfill.cc
+  src/bgfetcher.cc
+  src/checkpoint.cc
+  src/checkpoint_remover.cc
+  src/conflict_resolution.cc
+  src/connmap.cc
+  src/dcp-consumer.cc
+  src/dcp-producer.cc
+  src/dcp-response.cc
+  src/dcp-stream.cc
+  src/ep.cc
+  src/ep_engine.cc
+  src/ep_time.c
+  src/executorpool.cc
+  src/executorthread.cc
+  src/failover-table.cc
+  src/flusher.cc
+  src/htresizer.cc
+  src/item.cc
+  src/item_pager.cc
+  src/kvshard.cc
+  src/memory_tracker.cc
+  src/mutation_log.cc
+  src/mutex.cc
+  src/objectregistry.cc
+  src/priority.cc
+  src/tapconnection.cc
+  src/stored-value.cc
+  src/tapthrottle.cc
+  src/tasks.cc
+  src/taskqueue.cc
+  src/vbucket.cc
+  src/vbucketmap.cc
+  src/warmup.cc
+  ${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
+  ${CONFIG_SOURCE}
+  ${KVSTORE_SOURCE}
+  ${COUCH_KVSTORE_SOURCE}
+  ${Memcached_SOURCE_DIR}/programs/engine_testapp/mock_server.c)
+TARGET_LINK_LIBRARIES(ep-engine_stream_test couchstore cJSON dirutils JSON_checker mcd_util platform)
+
 ADD_EXECUTABLE(ep-engine_atomic_ptr_test
   tests/module_tests/atomic_ptr_test.cc
   src/atomic.cc
@@ -202,6 +247,7 @@ ADD_TEST(ep-engine_misc_test ep-engine_misc_test)
 ADD_TEST(ep-engine_mutex_test ep-engine_mutex_test)
 ADD_TEST(ep-engine_priority_test ep-engine_priority_test)
 ADD_TEST(ep-engine_ringbuffer_test ep-engine_ringbuffer_test)
+ADD_TEST(ep-engine_stream_test ep-engine_stream_test)
 
 ADD_LIBRARY(timing_tests SHARED tests/module_tests/timing_tests.cc)
 SET_TARGET_PROPERTIES(timing_tests PROPERTIES PREFIX "")
index 543fe0a..e0c6888 100644 (file)
@@ -20,6 +20,8 @@
 
 #include "config.h"
 
+#include "ep_engine.h"
+
 #include <queue>
 
 class EventuallyPersistentEngine;
index 56f2444..42d9a91 100644 (file)
@@ -21,6 +21,8 @@
 #include <queue>
 #include <sstream>
 
+#include "configuration.h"
+#include "ep_engine.h"
 #include "statwriter.h"
 #include "taskqueue.h"
 #include "executorpool.h"
index 09393b0..51ff436 100644 (file)
@@ -15,6 +15,9 @@
  *   limitations under the License.
  */
 
+#ifndef SRC_STATWRITER_H_
+#define SRC_STATWRITER_H_ 1
+
 #include "config.h"
 
 #include <memcached/engine.h>
 
 #include <iostream>
 
-#include "ep_engine.h"
 #include "histo.h"
+#include "objectregistry.h"
 
-#ifndef SRC_STATWRITER_H_
-#define SRC_STATWRITER_H_ 1
+class EventuallyPersistentEngine;
 
 namespace STATWRITER_NAMESPACE {
 
index 3841998..2884520 100644 (file)
 
 #include "atomic.h"
 #include "common.h"
+#include "item.h"
 #include "locks.h"
 #include "mutex.h"
 #include "statwriter.h"
+#include "tasks.h"
+#include "vbucket.h"
 
 // forward decl
 class ConnHandler;
diff --git a/tests/module_tests/stream_test.cc b/tests/module_tests/stream_test.cc
new file mode 100644 (file)
index 0000000..c8abdc9
--- /dev/null
@@ -0,0 +1,157 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "dcp-producer.h"
+#include "dcp-stream.h"
+#include "../programs/engine_testapp/mock_server.h"
+
+#include <platform/dirutils.h>
+
+// Simple backport of the GTest-style EXPECT_EQ macro.
+#define EXPECT_EQ(a, b, msg) \
+    do { \
+        if ((a) != (b)) { \
+            throw std::runtime_error(msg); \
+        } \
+    } while(0)
+
+// Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
+// normally protected methods publically for test purposes.
+class MockActiveStream : public ActiveStream {
+public:
+    MockActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
+                     const std::string &name, uint32_t flags, uint32_t opaque,
+                     uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
+                     uint64_t vb_uuid, uint64_t snap_start_seqno,
+                     uint64_t snap_end_seqno, ExTask task)
+    : ActiveStream(e, p, name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
+                   snap_start_seqno, snap_end_seqno, task) {}
+
+    // Expose underlying protected ActiveStream methods as public
+    void public_getOutstandingItems(RCPtr<VBucket> &vb,
+                                    std::deque<queued_item> &items) {
+        getOutstandingItems(vb, items);
+    }
+
+    void public_processItems(std::deque<queued_item>& items) {
+        processItems(items);
+    }
+
+    bool public_nextCheckpointItem() {
+        return nextCheckpointItem();
+    }
+};
+
+/* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
+ * queued items to be sent out via a DCP consumer, that nextCheckpointItem()
+ * doesn't incorrectly return false (meaning that there are no more checkpoint
+ * items to send).
+ */
+static void test_mb17766(const std::string& test_dbname) {
+    // Setup an engine with a single active vBucket.
+    ENGINE_HANDLE* handle;
+    EXPECT_EQ(ENGINE_SUCCESS,
+              create_instance(1, get_mock_server_api, &handle),
+              "Failed to created ep engine instance");
+
+    EventuallyPersistentEngine* engine =
+            reinterpret_cast<EventuallyPersistentEngine*>(handle);
+    ObjectRegistry::onSwitchThread(engine);
+    std::string config = "dbname=" + test_dbname;
+    EXPECT_EQ(ENGINE_SUCCESS,
+              engine->initialize(config.c_str()),
+              "Failed to initialize engine.");
+
+    const uint16_t vbid = 0;
+    engine->setVBucketState(vbid, vbucket_state_active, false);
+
+    // Wait for warmup to complete.
+    while (engine->getEpStore()->isWarmingUp()) {
+        usleep(10);
+    }
+
+    // Add an item.
+    std::string value("value");
+    Item item("key", /*flags*/0, /*exp*/0, value.c_str(), value.size());
+
+    uint64_t cas;
+    EXPECT_EQ(ENGINE_SUCCESS,
+              engine->store(NULL, &item, &cas, OPERATION_SET, vbid),
+              "Store failed");
+
+    // Create a DCP producer and register with checkpoint Manager.
+    dcp_producer_t producer = new DcpProducer(*engine, /*cookie*/NULL,
+                                              "test_mb_17766_producer",
+                                              /*notifyOnly*/false);
+    ExTask task = new ActiveStreamCheckpointProcessorTask(*engine);
+    stream_t stream = new MockActiveStream(engine, producer,
+                                           "test_mb17766", /*flags*/0,
+                                            /*opaque*/0, vbid,
+                                            /*st_seqno*/0,
+                                            /*en_seqno*/0,
+                                            /*vb_uuid*/0xabcd,
+                                            /*snap_start_seqno*/0,
+                                            /*snap_end_seqno*/0, task);
+
+    RCPtr<VBucket> vb0 = engine->getVBucket(0);
+    EXPECT_EQ(true, vb0, "Failed to get valid VBucket object for id 0");
+    EXPECT_EQ(false,
+              vb0->checkpointManager.registerTAPCursor("test_mb17766"),
+              "Found an existing TAP cursor when attemping to register ours");
+
+    // Should start with nextCheckpointItem() returning true.
+    MockActiveStream* mock_stream = static_cast<MockActiveStream*>(stream.get());
+    EXPECT_EQ(true,
+              mock_stream->public_nextCheckpointItem(),
+              "nextCheckpointItem() should initially be true.");
+
+    std::deque<queued_item> items;
+
+    // Get the set of outstanding items
+    mock_stream->public_getOutstandingItems(vb0, items);
+
+    // REGRESSION CHECK: nextCheckpointItem() should still return true
+    EXPECT_EQ(true,
+              mock_stream->public_nextCheckpointItem(),
+              "nextCheckpointItem() after getting outstanding items should be true.");
+
+    // Process the set of items
+    mock_stream->public_processItems(items);
+
+    // Should finish with nextCheckpointItem() returning false.
+    EXPECT_EQ(false,
+              mock_stream->public_nextCheckpointItem(),
+              "nextCheckpointItem() after processing items should be false.");
+}
+
+int main(int argc, char **argv) {
+    (void)argc; (void)argv;
+    putenv(strdup("ALLOW_NO_STATS_UPDATE=yeah"));
+
+    bool success = true;
+    try {
+        test_mb17766("stream_test_db");
+    } catch (std::exception& e) {
+        std::cerr << "FAILED: " << e.what() << std::endl;
+        success = false;
+    }
+
+    // Cleanup any files we created.
+    CouchbaseDirectoryUtilities::rmrf("stream_test_db");
+
+    return success ? 0 : 1;
+}