Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / dcp-stream.h
index ad8b29b..3326d52 100644 (file)
 
 #include "config.h"
 
+#include "atomic.h"
+#include "dcp-stream.h"
+#include "dcp-producer.h"
 #include "ep_engine.h"
+#include "ext_meta_parser.h"
+#include "vbucket.h"
 
 #include <queue>
 
@@ -36,6 +41,9 @@ typedef SingleThreadedRCPtr<DcpConsumer> dcp_consumer_t;
 class DcpProducer;
 typedef SingleThreadedRCPtr<DcpProducer> dcp_producer_t;
 
+class Stream;
+typedef SingleThreadedRCPtr<Stream> stream_t;
+
 typedef enum {
     STREAM_PENDING,
     STREAM_BACKFILLING,
@@ -75,6 +83,11 @@ typedef enum {
     cannot_process
 } process_items_error_t;
 
+typedef enum {
+    BACKFILL_FROM_MEMORY,
+    BACKFILL_FROM_DISK
+} backfill_source_t;
+
 class Stream : public RCValue {
 public:
     Stream(const std::string &name, uint32_t flags, uint32_t opaque,
@@ -82,7 +95,7 @@ public:
            uint64_t vb_uuid, uint64_t snap_start_seqno,
            uint64_t snap_end_seqno);
 
-    virtual ~Stream() {}
+    virtual ~Stream();
 
     uint32_t getFlags() { return flags_; }
 
@@ -161,7 +174,6 @@ private:
     AtomicValue <uint64_t> readyQueueMemory;
 };
 
-typedef RCPtr<Stream> stream_t;
 
 class ActiveStreamCheckpointProcessorTask;
 
@@ -173,10 +185,7 @@ public:
                  uint64_t vb_uuid, uint64_t snap_start_seqno,
                  uint64_t snap_end_seqno);
 
-    ~ActiveStream() {
-        transitionState(STREAM_DEAD);
-        clear_UNLOCKED();
-    }
+    ~ActiveStream();
 
     DcpResponse* next();
 
@@ -201,7 +210,7 @@ public:
 
     void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
 
-    void backfillReceived(Item* itm);
+    bool backfillReceived(Item* itm, backfill_source_t backfill_source);
 
     void completeBackfill();
 
@@ -218,11 +227,11 @@ public:
 
 protected:
     // Returns the outstanding items for the stream's checkpoint cursor.
-    void getOutstandingItems(RCPtr<VBucket> &vb, std::deque<queued_item> &items);
+    void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
 
     // Given a set of queued items, create mutation responses for each item,
     // and pass onto the producer associated with this stream.
-    void processItems(std::deque<queued_item>& items);
+    void processItems(std::vector<queued_item>& items);
 
     bool nextCheckpointItem();
 
@@ -250,6 +259,9 @@ private:
 
     const char* getEndStreamStatusStr(end_stream_status_t status);
 
+    ExtendedMetaData* prepareExtendedMetaData(uint16_t vBucketId,
+                                              uint8_t conflictResMode);
+
     bool isCurrentSnapshotCompleted() const;
 
     //! The last sequence number queued from disk or memory
@@ -266,11 +278,14 @@ private:
      * must acquire the streamMutex lock.
      */
     AtomicValue <size_t> backfillRemaining;
-
-    //! The amount of items that have been read from disk
-    size_t itemsFromBackfill;
-    //! The amount of items that have been read from memory
-    AtomicValue<size_t> itemsFromMemory;
+    //! Stats to track items read and sent from the backfill phase
+    struct {
+        AtomicValue<size_t> memory;
+        AtomicValue<size_t> disk;
+        AtomicValue<size_t> sent;
+    } backfillItems;
+    //! The amount of items that have been sent during the memory phase
+    size_t itemsFromMemoryPhase;
     //! Whether ot not this is the first snapshot marker sent
     bool firstMarkerSent;
 
@@ -280,6 +295,11 @@ private:
     dcp_producer_t producer;
     AtomicValue<bool> isBackfillTaskRunning;
 
+    struct {
+        AtomicValue<uint32_t> bytes;
+        AtomicValue<uint32_t> items;
+    } bufferedBackfill;
+
     //! Last snapshot end seqno sent to the DCP client
     AtomicValue<uint64_t> lastSentSnapEndSeqno;
 
@@ -294,7 +314,8 @@ private:
 class ActiveStreamCheckpointProcessorTask : public GlobalTask {
 public:
     ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
-      : GlobalTask(&e, Priority::ActiveStreamCheckpointProcessor, INT_MAX, false),
+        : GlobalTask(&e, TaskId::ActiveStreamCheckpointProcessorTask,
+                     INT_MAX, false),
       notified(false),
       iterationsBeforeYield(e.getConfiguration()
                             .getDcpProducerSnapshotMarkerYieldLimit()) { }
@@ -357,9 +378,7 @@ public:
                    uint64_t snap_end_seqno);
 
     ~NotifierStream() {
-        LockHolder lh(streamMutex);
         transitionState(STREAM_DEAD);
-        clear_UNLOCKED();
     }
 
     DcpResponse* next();
@@ -385,7 +404,8 @@ public:
 
     ~PassiveStream();
 
-    process_items_error_t processBufferedMessages(uint32_t &processed_bytes);
+    process_items_error_t processBufferedMessages(uint32_t &processed_bytes,
+                                                  size_t batchSize);
 
     DcpResponse* next();
 
@@ -400,30 +420,25 @@ public:
 
     void addStats(ADD_STAT add_stat, const void *c);
 
-    static const size_t batchSize;
-
 private:
 
     ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
 
-    ENGINE_ERROR_CODE commitMutation(MutationResponse* mutation,
-                                     bool backfillPhase);
-
     ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
 
-    ENGINE_ERROR_CODE commitDeletion(MutationResponse* deletion,
-                                     bool backfillPhase);
-
     void handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno);
 
     void processMarker(SnapshotMarker* marker);
 
     void processSetVBucketState(SetVBucketState* state);
 
-    void transitionState(stream_state_t newState);
+    bool transitionState(stream_state_t newState);
 
     uint32_t clearBuffer();
 
+    uint32_t setDead_UNLOCKED(end_stream_status_t status,
+                              LockHolder *slh);
+
     const char* getEndStreamStatusStr(end_stream_status_t status);
 
     EventuallyPersistentEngine* engine;
@@ -434,7 +449,6 @@ private:
     AtomicValue<uint64_t> cur_snapshot_end;
     AtomicValue<snapshot_type_t> cur_snapshot_type;
     bool cur_snapshot_ack;
-    bool saveSnapshot;
 
     struct Buffer {
         Buffer() : bytes(0), items(0) {}