#include "locks.h"
#include "syncobject.h"
#include "atomicqueue.h"
+#include "dcp-consumer.h"
+#include "dcp-producer.h"
// Forward declaration
class ConnNotifier;
class TapConsumer;
class TapProducer;
-class DcpConsumer;
-class DcpProducer;
class Item;
class EventuallyPersistentEngine;
#include "tapconnection.h"
#include "dcp-stream.h"
+typedef RCPtr<PassiveStream> passive_stream_t;
-class PassiveStream;
class DcpResponse;
class DcpConsumer : public Consumer, public Notifiable {
#include "tapconnection.h"
#include "dcp-stream.h"
+typedef SingleThreadedRCPtr<Stream> stream_t;
+
class DcpResponse;
class BufferLog {
add_casted_stat(buffer, stateName(state_), add_stat, c);
}
-ActiveStream::ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p,
+ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
const std::string &n, uint32_t flags,
uint32_t opaque, uint16_t vb, uint64_t st_seqno,
uint64_t en_seqno, uint64_t vb_uuid,
return producer->logHeader();
}
-NotifierStream::NotifierStream(EventuallyPersistentEngine* e, DcpProducer* p,
+NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
const std::string &name, uint32_t flags,
uint32_t opaque, uint16_t vb, uint64_t st_seqno,
uint64_t en_seqno, uint64_t vb_uuid,
state_ = newState;
}
-PassiveStream::PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* c,
+PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
const std::string &name, uint32_t flags,
uint32_t opaque, uint16_t vb, uint64_t st_seqno,
uint64_t en_seqno, uint64_t vb_uuid,
class MutationResponse;
class SetVBucketState;
class SnapshotMarker;
+class DcpResponse;
+
class DcpConsumer;
+typedef SingleThreadedRCPtr<DcpConsumer> dcp_consumer_t;
+
class DcpProducer;
-class DcpResponse;
+typedef SingleThreadedRCPtr<DcpProducer> dcp_producer_t;
typedef enum {
STREAM_PENDING,
class ActiveStream : public Stream {
public:
- ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p,
+ ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
const std::string &name, uint32_t flags, uint32_t opaque,
uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
uint64_t vb_uuid, uint64_t snap_start_seqno,
int waitForSnapshot;
EventuallyPersistentEngine* engine;
- DcpProducer* producer;
+ dcp_producer_t producer;
bool isBackfillTaskRunning;
};
class NotifierStream : public Stream {
public:
- NotifierStream(EventuallyPersistentEngine* e, DcpProducer* producer,
+ NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t producer,
const std::string &name, uint32_t flags, uint32_t opaque,
uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
uint64_t vb_uuid, uint64_t snap_start_seqno,
void transitionState(stream_state_t newState);
- DcpProducer* producer;
+ dcp_producer_t producer;
};
class PassiveStream : public Stream {
public:
- PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* consumer,
+ PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
const std::string &name, uint32_t flags, uint32_t opaque,
uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
uint64_t vb_uuid, uint64_t snap_start_seqno,
const char* getEndStreamStatusStr(end_stream_status_t status);
EventuallyPersistentEngine* engine;
- DcpConsumer* consumer;
+ dcp_consumer_t consumer;
uint64_t last_seqno;
uint64_t cur_snapshot_start;
} buffer;
};
-typedef SingleThreadedRCPtr<Stream> stream_t;
-typedef RCPtr<PassiveStream> passive_stream_t;
-
#endif // SRC_DCP_STREAM_H_
size_t maxReaders, size_t maxWriters,
size_t maxAuxIO, size_t maxNonIO) :
numTaskSets(nTaskSets), totReadyTasks(0),
- isHiPrioQset(false), isLowPrioQset(false), numBuckets(0) {
+ isHiPrioQset(false), isLowPrioQset(false), numBuckets(0),
+ numSleepers(0) {
size_t numCPU = getNumCPU();
size_t numThreads = (size_t)((numCPU * 3)/4);
numThreads = (numThreads < EP_MIN_NUM_THREADS) ?
return SUCCESS;
}
+/*
+ * Test that destroying a DCP producer before it ends
+ * works. MB-16915 reveals itself via valgrind.
+ */
+static enum test_result test_dcp_early_termination(ENGINE_HANDLE* h,
+ ENGINE_HANDLE_V1* h1) {
+
+
+ // create enough streams that some backfill tasks should overlap
+ // with the connection deletion task.
+ const int streams = 100;
+
+ // 1 item so that we will at least allow backfill to be scheduled
+ const int num_items = 1;
+ uint64_t vbuuid[streams];
+ for (int i = 0; i < streams; i++) {
+
+ check(set_vbucket_state(h, h1, i, vbucket_state_active),
+ "Failed to set vbucket state");
+ std::stringstream statkey;
+ statkey << "vb_" << i << ":0:id";
+ vbuuid[i] = get_ull_stat(h, h1, statkey.str().c_str(), "failovers");
+
+ /* Set n items */
+
+ for (int count = 0; count < num_items; count++) {
+ std::stringstream key;
+ key << "KEY" << i << count;
+ check(ENGINE_SUCCESS ==
+ store(h, h1, NULL, OPERATION_SET, key.str().c_str(),
+ "somevalue", NULL, 0, i, 0), "Error storing.");
+ }
+ }
+ wait_for_flusher_to_settle(h, h1);
+
+ const void *cookie = testHarness.create_cookie();
+ uint32_t opaque = 1;
+ check(h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER,
+ (void*)"unittest", strlen("unittest")) == ENGINE_SUCCESS,
+ "Failed dcp producer open connection.");
+
+ check(h1->dcp.control(h, cookie, ++opaque, "connection_buffer_size",
+ strlen("connection_buffer_size"),
+ "1024", 4) == ENGINE_SUCCESS,
+ "Failed to establish connection buffer");
+
+ struct dcp_message_producers* producers = get_dcp_producers();
+ for (int i = 0; i < streams; i++) {
+ uint64_t rollback = 0;
+ check(h1->dcp.stream_req(h, cookie, DCP_ADD_STREAM_FLAG_DISKONLY,
+ ++opaque, i, 0, num_items,
+ vbuuid[i], 0, num_items, &rollback,
+ mock_dcp_add_failover_log)
+ == ENGINE_SUCCESS,
+ "Failed to initiate stream request");
+ h1->dcp.step(h, cookie, producers);
+ }
+
+ // Destroy the connection
+ testHarness.destroy_cookie(cookie);
+
+ // Let all AUXIO (backfills) finish
+ wait_for_stat_to_be(h, h1, "ep_workload:LowPrioQ_AuxIO:InQsize", 0, "workload");
+ wait_for_stat_to_be(h, h1, "ep_workload:LowPrioQ_AuxIO:OutQsize", 0, "workload");
+ return SUCCESS;
+}
+
extern "C" {
static void wait_for_persistence_thread(void *arg) {
struct handle_pair *hp = static_cast<handle_pair *>(arg);
TestCase("test MB-16357", test_mb16357,
test_setup, teardown, "compaction_exp_mem_threshold=85",
prepare, cleanup),
+ TestCase("test dcp early termination", test_dcp_early_termination,
+ test_setup, teardown, NULL, prepare, cleanup),
TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
};