Merge tag 'v3.1.5' into sherlock 55/64955/1
author    Dave Rigby <daver@couchbase.com>
Wed, 15 Jun 2016 11:14:34 +0000 (12:14 +0100)
committer Dave Rigby <daver@couchbase.com>
Wed, 15 Jun 2016 13:30:37 +0000 (14:30 +0100)
3.1.5 release (ep-engine)

* tag 'v3.1.5':
  MB-16656: Send snapshotEnd as highSeqno for replica vb in GET_ALL_VB_SEQNOS call
  MB-19153: Break circular dependency while deleting bucket
  MB-19113: Address false positive lock inversion seen with test_mb16357

Change-Id: I2e7cd72f09c8b2b3780568ed7f7ca81fde064cb9

1  2 
tests/ep_test_apis.cc
tests/ep_test_apis.h
tests/ep_testsuite.cc

@@@ -637,54 -518,6 +637,56 @@@ bool set_vbucket_state(ENGINE_HANDLE *h
      return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
  }
  
-                           int vb_start, int vb_end) {
 +bool get_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
 +                       vbucket_state_t state, const void *cookie) {
 +    protocol_binary_request_header *pkt;
 +    if (state) {
 +        char ext[sizeof(vbucket_state_t)];
 +        encodeExt(ext, static_cast<uint32_t>(state));
 +        pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS, 0, 0, ext,
 +                           sizeof(vbucket_state_t));
 +    } else {
 +        pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS);
 +    }
 +
 +    check(h1->unknown_command(h, cookie, pkt, add_response) ==
 +          ENGINE_SUCCESS, "Error in getting all vb info");
 +
 +    free(pkt);
 +    return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
 +}
 +
 +void verify_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
-                 ntohs(*(reinterpret_cast<uint16_t*>(last_body +
-                                                   per_vb_resp_size * i))),
++                          uint16_t vb_start, uint16_t vb_end) {
 +    const int per_vb_resp_size = sizeof(uint16_t) + sizeof(uint64_t);
 +    const int high_seqno_offset = sizeof(uint16_t);
 +
++    std::string seqno_body(last_body, last_bodylen);
++
 +    /* Check if the total response length is as expected. We expect 10 bytes
 +       (2 for vb_id + 8 for seqno) */
 +    checkeq((uint32_t)((vb_end - vb_start + 1) * per_vb_resp_size),
 +            last_bodylen, "Failed to get all vb info.");
 +    /* Check if the contents are correct */
 +    for (int i = 0; i < (vb_end - vb_start + 1); i++) {
 +        /* Check for correct vb_id */
 +        checkeq(static_cast<const uint16_t>(vb_start + i),
-                 ntohll(*(reinterpret_cast<uint64_t*>(last_body +
-                                                      per_vb_resp_size * i +
-                                                      high_seqno_offset))),
++                ntohs(*(reinterpret_cast<const uint16_t*>(seqno_body.data() +
++                                                          per_vb_resp_size * i))),
 +                "vb_id mismatch");
 +        /* Check for correct high_seqno */
 +        std::string vb_stat_seqno("vb_" + std::to_string(vb_start + i) +
 +                                  ":high_seqno");
 +        uint64_t high_seqno_vb = get_ull_stat(h, h1, vb_stat_seqno.c_str(),
 +                                              "vbucket-seqno");
 +        checkeq(high_seqno_vb,
++                ntohll(*(reinterpret_cast<const uint64_t*>(seqno_body.data() +
++                                                           per_vb_resp_size * i +
++                                                           high_seqno_offset))),
 +              "high_seqno mismatch");
 +    }
 +}
 +
  void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                     const size_t keylen, const char *val, const size_t vallen,
                     const uint32_t vb, ItemMetaData *itemMeta,
@@@ -149,10 -138,6 +149,10 @@@ bool set_param(ENGINE_HANDLE *h, ENGINE
                 const char *param, const char *val);
  bool set_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
                         uint16_t vb, vbucket_state_t state);
-                           int vb_start, int vb_end);
 +bool get_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
 +                       vbucket_state_t state, const void *cookie);
 +void verify_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
++                          uint16_t vb_start, uint16_t vb_end);
  void start_persistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1);
  void stop_persistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1);
  ENGINE_ERROR_CODE store(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
@@@ -14115,348 -11778,71 +14106,413 @@@ static enum test_result test_mb16357(EN
      return SUCCESS;
  }
  
 +// Regression test for MB-17517 - ensure that if an item is locked when TAP
 +// attempts to stream it, it doesn't get a CAS of -1.
 +static enum test_result test_mb17517_tap_with_locked_key(ENGINE_HANDLE *h,
 +                                                         ENGINE_HANDLE_V1 *h1) {
 +    const uint16_t vbid = 0;
 +    // Store an item and immediately lock it.
 +    item *it = NULL;
 +    std::string key("key");
 +    checkeq(store(h, h1, NULL, OPERATION_SET, key.c_str(), "value",
 +                  &it, 0, vbid, 3600, PROTOCOL_BINARY_RAW_BYTES),
 +            ENGINE_SUCCESS,
 +            "Failed to store an item.");
 +    h1->release(h, NULL, it);
 +
 +    uint32_t lock_timeout = 10;
 +    getl(h, h1, key.c_str(), vbid, lock_timeout);
 +    checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status,
 +            "Expected to be able to getl on first try");
 +
 +    wait_for_flusher_to_settle(h, h1);
 +
 +    // Create the TAP connection and try to get the items.
 +    const void *cookie = testHarness.create_cookie();
 +    std::string name("test_mb17517_tap_with_locked_key");
 +    TAP_ITERATOR iter = h1->get_tap_iterator(h, cookie, name.c_str(),
 +                                             name.length(),
 +                                             TAP_CONNECT_FLAG_DUMP, NULL, 0);
 +    check(iter != NULL, "Failed to create a tap iterator");
 +
 +    void *engine_specific;
 +    uint16_t nengine_specific;
 +    uint8_t ttl;
 +    uint16_t flags;
 +    uint32_t seqno;
 +    uint16_t vbucket;
 +    tap_event_t event;
 +
 +    uint16_t unlikely_vbucket_identifier = 17293;
 +
 +    do {
 +        vbucket = unlikely_vbucket_identifier;
 +        event = iter(h, cookie, &it, &engine_specific,
 +                     &nengine_specific, &ttl, &flags,
 +                     &seqno, &vbucket);
 +
 +        switch (event) {
 +        case TAP_PAUSE:
 +            testHarness.waitfor_cookie(cookie);
 +            break;
 +        case TAP_OPAQUE:
 +        case TAP_NOOP:
 +            break;
 +        case TAP_MUTATION: {
 +            testHarness.unlock_cookie(cookie);
 +
 +            item_info info;
 +            info.nvalue = 1;
 +            if (!h1->get_item_info(h, NULL, it, &info)) {
 +                fprintf(stderr, "test_mb17517_tap_with_locked_key: "
 +                        "get_item_info failed\n");
 +                return FAIL;
 +            }
 +
 +            // Check the CAS.
 +            if (info.cas == ~0ull) {
 +                fprintf(stderr, "test_mb17517_tap_with_locked_key: "
 +                        "Got CAS of -1 in TAP_MUTATION\n");
 +                return FAIL;
 +            }
 +
 +            testHarness.lock_cookie(cookie);
 +            break;
 +        }
 +        case TAP_DISCONNECT:
 +            break;
 +        default:
 +            std::cerr << "Unexpected event:  " << event << std::endl;
 +            return FAIL;
 +        }
 +
 +    } while (event != TAP_DISCONNECT);
 +
 +    testHarness.unlock_cookie(cookie);
 +    testHarness.destroy_cookie(cookie);
 +
 +    return SUCCESS;
 +}
 +
 +static void force_vbstate_to_25x(std::string dbname, int vbucket) {
 +    std::string filename = dbname +
 +                           DIRECTORY_SEPARATOR_CHARACTER +
 +                           std::to_string(vbucket) +
 +                           ".couch.1";
 +    Db* handle;
 +    couchstore_error_t err = couchstore_open_db(filename.c_str(),
 +                                                COUCHSTORE_OPEN_FLAG_CREATE,
 +                                                &handle);
 +
 +    checkeq(COUCHSTORE_SUCCESS, err, "Failed to open new database");
 +
 +    // Create 2.5 _local/vbstate
 +    std::string vbstate2_5_x ="{\"state\": \"active\","
 +                              " \"checkpoint_id\": \"1\","
 +                              " \"max_deleted_seqno\": \"0\"}";
 +    LocalDoc vbstate;
 +    vbstate.id.buf = (char *)"_local/vbstate";
 +    vbstate.id.size = sizeof("_local/vbstate") - 1;
 +    vbstate.json.buf = (char *)vbstate2_5_x.c_str();
 +    vbstate.json.size = vbstate2_5_x.size();
 +    vbstate.deleted = 0;
 +
 +    err = couchstore_save_local_document(handle, &vbstate);
 +    checkeq(COUCHSTORE_SUCCESS, err, "Failed to write local document");
 +    couchstore_commit(handle);
 +    couchstore_close_db(handle);
 +}
 +
 +// Regression test for MB-19635
 +// Check that warming up from a 2.x couchfile doesn't end up with a UUID of 0
 +// we warmup 2 vbuckets and ensure they get unique IDs.
 +static enum test_result test_mb19635_upgrade_from_25x(ENGINE_HANDLE *h,
 +                                                      ENGINE_HANDLE_V1 *h1) {
 +    std::string dbname = dbname_env;
 +
 +    force_vbstate_to_25x(dbname, 0);
 +    force_vbstate_to_25x(dbname, 1);
 +
 +    // Now shutdown engine force and restart to warmup from the 2.5.x data.
 +    testHarness.reload_engine(&h, &h1,
 +                              testHarness.engine_path,
 +                              testHarness.get_current_testcase()->cfg,
 +                              true, false);
 +    wait_for_warmup_complete(h, h1);
 +    uint64_t vb_uuid0 = get_ull_stat(h, h1, "vb_0:uuid", "vbucket-details");
 +    uint64_t vb_uuid1 = get_ull_stat(h, h1, "vb_1:uuid", "vbucket-details");
 +    checkne(vb_uuid0, vb_uuid1, "UUID is not unique");
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_set_dcp_param(ENGINE_HANDLE *h,
 +                                           ENGINE_HANDLE_V1 *h1)
 +{
 +    auto func = [h, h1](std::string key, size_t newValue, bool expectedSetParam){
 +        std::string statKey = "ep_" + key;
 +        size_t param = get_int_stat(h,
 +                                    h1,
 +                                    statKey.c_str());
 +        std::string value = std::to_string(newValue);
 +        check(expectedSetParam == set_param(h, h1,
 +                                            protocol_binary_engine_param_dcp,
 +                                            key.c_str(),
 +                                            value.c_str()),
 +                "Set param not expected");
 +        check(newValue != param,
 +              "Forcing failure as nothing will change");
 +
 +        if (expectedSetParam) {
 +            checkeq(newValue,
 +                    size_t(get_int_stat(h,
 +                                        h1,
 +                                        statKey.c_str())),
 +                "Incorrect dcp param value after calling set_param");
 +        }
 +    };
 +
 +    func("dcp_consumer_process_buffered_messages_yield_limit", 1000, true);
 +    func("dcp_consumer_process_buffered_messages_batch_size", 1000, true);
 +    func("dcp_consumer_process_buffered_messages_yield_limit", 0, false);
 +    func("dcp_consumer_process_buffered_messages_batch_size", 0, false);
 +    return SUCCESS;
 +}
 +
 +
 +/*
 + * Test MB-18452
 + * Drive DCP consumer by halting all NONIO tasks
 + * Writing numItems mutations (they get buffered)
 + * Then trigger the NONIO tasks, which will trigger the DCP consumer
 + *  to consume the buffered items.
 + * If the DCP consumer is friendly and not hogging the NONIO threads
 + * we should see it being scheduled many times.
 + * This test function returns the number of times the processor task was
 + * dispatched.
 + */
 +static int test_mb18452(ENGINE_HANDLE *h,
 +                        ENGINE_HANDLE_V1 *h1,
 +                        size_t numItems,
 +                        size_t yieldValue,
 +                        size_t batchSize) {
 +
 +    // 1. Setup the consumer params.
 +    std::string value = std::to_string(yieldValue);
 +    set_param(h, h1, protocol_binary_engine_param_dcp,
 +              "dcp_consumer_process_buffered_messages_yield_limit",
 +              value.c_str());
 +    value = std::to_string(batchSize);
 +    set_param(h, h1, protocol_binary_engine_param_dcp,
 +              "dcp_consumer_process_buffered_messages_batch_size",
 +              value.c_str());
 +
 +    const uint16_t vbid = 0;
 +    const uint32_t opaque = 0xFFFF0000;
 +    const uint32_t flags = 0;
 +    const void* cookie = testHarness.create_cookie();
 +
 +    // 2. We need to use a replica
 +    check(set_vbucket_state(h, h1, vbid, vbucket_state_replica),
 +          "Failed to set vbucket state.");
 +
 +    // 3. Force the engine to not run any NONIO tasks whilst we 'load up'
 +    set_param(h, h1, protocol_binary_engine_param_flush,
 +              "max_num_nonio",
 +              "0");
 +
 +    // 4. Create a consumer and one stream for the vbucket
 +    std::string consumer("unittest");
 +    checkeq(h1->dcp.open(h,
 +                         cookie,
 +                         opaque,
 +                         0/*seqno*/,
 +                         flags,
 +                         (void*)consumer.c_str(),
 +                         consumer.length()),
 +            ENGINE_SUCCESS,
 +            "Failed dcp Consumer open connection.");
 +    add_stream_for_consumer(h, h1, cookie, opaque + 1, vbid, flags,
 +                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
 +
 +    uint32_t stream_opaque = get_int_stat(h, h1,
 +                                          "eq_dcpq:unittest:stream_0_opaque",
 +                                          "dcp");
 +    checkeq(ENGINE_SUCCESS,
 +            h1->dcp.snapshot_marker(h,
 +                                    cookie,
 +                                    stream_opaque,
 +                                    vbid,
 +                                    1,//snap start
 +                                    numItems,//snap end
 +                                    2), //flags
 +            "Failed to send snapshot marker");
 +
 +    for (uint64_t seqno = 1; seqno <= numItems; seqno++) {
 +        std::string key = "key" + std::to_string(seqno);
 +        checkeq(ENGINE_SUCCESS,
 +                h1->dcp.mutation(h,
 +                                 cookie,
 +                                 stream_opaque,
 +                                 key.c_str(),
 +                                 key.length(),
 +                                 "value", // item value
 +                                 sizeof("value"), // item value length
 +                                 seqno, // cas
 +                                 vbid, // vbucket
 +                                 0, // flags
 +                                 PROTOCOL_BINARY_RAW_BYTES,
 +                                 seqno, // bySeqno
 +                                 1, // revSeqno
 +                                 0, // expiration
 +                                 0, // locktime
 +                                 "", //meta
 +                                 0, // metalen
 +                                 INITIAL_NRU_VALUE),
 +                "Failed to send dcp mutation");
 +
 +        // At n - 1, enable NONIO tasks, the nth mutation will wake up the task.
 +        if (seqno == (numItems - 1)) {
 +               set_param(h, h1, protocol_binary_engine_param_flush,
 +              "max_num_nonio",
 +              "1");
 +        }
 +    }
 +
 +    wait_for_stat_to_be(h, h1, "vb_replica_curr_items", numItems);
 +
 +    // 3. Force the engine to not run any NONIO tasks whilst we 'count up'
 +    set_param(h, h1, protocol_binary_engine_param_flush,
 +              "max_num_nonio",
 +              "0");
 +
 +    // Now we should count how many times the NONIO task ran
 +    // This is slightly racy, but if the task is yielding we expect many
 +    // runs from it, not a small number
 +    check(h1->get_stats(h, NULL, "dispatcher",
 +                        strlen("dispatcher"), add_stats) == ENGINE_SUCCESS,
 +                        "Failed to get worker stats");
 +
 +    // Count up how many times the Processing task was logged
 +    int count = 0;
 +    const std::string key1 = "nonio_worker_";
 +    const std::string key2 = "Processing buffered items for eq_dcpq:unittest";
 +    for (auto kv : vals) {
 +        if (kv.first.find(key1) != std::string::npos &&
 +            kv.second.find(key2) != std::string::npos) {
 +            count++;
 +        }
 +    }
 +
 +    // 4. Re-enable NONIO so we can shutdown
 +    set_param(h, h1, protocol_binary_engine_param_flush,
 +              "max_num_nonio",
 +              "1");
 +    return count;
 +}
 +
 +/**
 + * Test the behaviour of DCP consumer under load.
 + * The consumer uses a NONIO task to process data from an input buffer.
 + * This task when given lots of data should voluntarily yield if it finds
 + * itself running for n iterations...
 + */
 +static enum test_result test_mb18452_smallYield(ENGINE_HANDLE* h,
 +                                                 ENGINE_HANDLE_V1* h1) {
 +    const int batchSize = 10;
 +    const int numItems = 1000;
 +    const int yield = 10;
 +
 +    int processorRuns = test_mb18452(h, h1, numItems, yield, batchSize);
 +
 +    // Before the ep-engine updates, the processor run count was usually 1 or 2
 +    // with the fix it's up around 80 (appears to saturate the log).
 +
 +    // So we check that it ran at least numItems / (yield * batchSize) times.
 +    check(processorRuns >= (numItems / (yield * batchSize)),
 +          "DCP Processor ran less times than expected.");
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_mb18452_largeYield(ENGINE_HANDLE* h,
 +                                                ENGINE_HANDLE_V1* h1) {
 +    const int batchSize = 10;
 +    const int numItems = 10000;
 +    const int yield = 10000;
 +    int processorRuns =  test_mb18452(h, h1, numItems, yield, batchSize);
 +    // Here we expect very few yields, so very few runs (definitely not
 +    // enough to fill the task log, TASK_LOG_SIZE).
 +    check(processorRuns < 80,
 +          "DCP Processor ran more times than expected.");
 +
 +
 +    return SUCCESS;
 +}
 +
+ /**
+  * This test demonstrates bucket shutdown when there is a rogue
+  * backfill (whose producer and stream are already closed).
+  */
+ static enum test_result test_mb19153(ENGINE_HANDLE *h,
+                                      ENGINE_HANDLE_V1 *h1) {
+     putenv(strdup("ALLOW_NO_STATS_UPDATE=yeah"));
+     // Set max num AUX IO to 0, so no backfill would start
+     // immediately
+     set_param(h, h1, protocol_binary_engine_param_flush,
+               "max_num_auxio", "0");
+     int num_items = 10000;
+     for (int j = 0; j < num_items; ++j) {
+         item *i = NULL;
+         std::stringstream ss;
+         ss << "key-" << j;
+         check(store(h, h1, NULL, OPERATION_SET,
+                     ss.str().c_str(), "data", &i, 0, 0, 0, 0)
+                     == ENGINE_SUCCESS, "Failed to store a value");
+         h1->release(h, NULL, i);
+     }
+     const void *cookie = testHarness.create_cookie();
+     uint32_t flags = DCP_OPEN_PRODUCER;
+     const char *name = "unittest";
+     uint32_t opaque = 1;
+     uint64_t start = 0;
+     uint64_t end = num_items;
+     // Setup a producer connection
+     checkeq(ENGINE_SUCCESS,
+             h1->dcp.open(h, cookie, ++opaque, 0, flags,
+                          (void*)name, strlen(name)),
+             "Failed dcp Consumer open connection.");
+     // Initiate a stream request
+     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+     uint64_t rollback = 0;
+     checkeq(ENGINE_SUCCESS,
+             h1->dcp.stream_req(h, cookie, 0, opaque, 0, start, end,
+                                vb_uuid, 0, 0,
+                                &rollback, mock_dcp_add_failover_log),
+             "Expected success");
+     // Disconnect the producer
+     testHarness.destroy_cookie(cookie);
+     // Wait for ConnManager to clear out dead connections from dcpConnMap
+     wait_for_stat_to_be(h, h1, "ep_dcp_dead_conn_count", 0, "dcp");
+     // Set auxIO threads to 1, so the backfill for the closed producer
+     // is picked up, and begins to run.
+     set_param(h, h1, protocol_binary_engine_param_flush,
+               "max_num_auxio", "1");
+     // Terminate engine
+     return SUCCESS;
+ }
  static enum test_result prepare(engine_test_t *test) {
  #ifdef __sun
          // Some of the tests doesn't work on Solaris.. Don't know why yet..
@@@ -15496,35 -12782,11 +15552,38 @@@ engine_test_t* get_tests(void) 
          TestCase("test MB-16357", test_mb16357,
                   test_setup, teardown, "compaction_exp_mem_threshold=85",
                   prepare, cleanup),
+         TestCase("test MB-19153", test_mb19153,
+                  test_setup, teardown, NULL, prepare, cleanup),
          TestCase("test dcp early termination", test_dcp_early_termination,
                   test_setup, teardown, NULL, prepare, cleanup),
 +
 +        TestCase("test MB-17517 CAS -1 DCP", test_mb17517_cas_minus_1_dcp,
 +                 test_setup, teardown, NULL, prepare, cleanup),
 +
 +        TestCase("test MB-17517 CAS -1 TAP", test_mb17517_cas_minus_1_tap,
 +                 test_setup, teardown, NULL, prepare, cleanup),
 +
 +        TestCase("test_mb17517_tap_with_locked_key",
 +                 test_mb17517_tap_with_locked_key, test_setup, teardown, NULL,
 +                 prepare, cleanup),
 +
 +        TestCase("test_mb19635_upgrade_from_25x",
 +                 test_mb19635_upgrade_from_25x, test_setup, teardown, NULL,
 +                 prepare, cleanup),
 +
 +        TestCase("test_set_dcp_param",
 +                 test_set_dcp_param, test_setup, teardown, NULL,
 +                 prepare, cleanup),
 +
 +        TestCase("test_mb18452_largeYield",
 +                 test_mb18452_largeYield, test_setup, teardown, "max_num_nonio=1",
 +                 prepare, cleanup),
 +
 +        TestCase("test_mb18452_smallYield",
 +                 test_mb18452_smallYield, test_setup, teardown, "max_num_nonio=1",
 +                 prepare, cleanup),
 +
          TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
      };