MB-19678: Merge backfill and in-memory snapshots correctly on replica vb 92/64192/4
authorManu Dhundi <manu@couchbase.com>
Tue, 17 May 2016 18:55:28 +0000 (11:55 -0700)
committerDave Rigby <daver@couchbase.com>
Fri, 20 May 2016 11:15:19 +0000 (11:15 +0000)
When a DCP client, on replica vb, opens a stream which it intends to
keep open forever, merge the backfill and in-memory snapshots by using the
the checkpoint snapshot_end as snapshot_end_seqno.

Change-Id: Ic05a59ccafa54bbee19882707404a78c47002be7
Reviewed-on: http://review.couchbase.org/64192
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Reviewed-by: Manu Dhundi <manu@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
docs/stats.org
src/dcp-stream.cc
src/dcp-stream.h

index b517318..56ef5c6 100644 (file)
@@ -671,29 +671,30 @@ another colon.  For example, if your client is named, =slave1=, the
 
 ****Per Stream Stats
 
-| backfill_disk_items | The amount of items read during backfill from disk    |
-| backfill_mem_items  | The amount of items read during backfill from memory  |
-| backfill_sent       | The amount of items sent to the consumer during the   |
-| end_seqno           | The seqno send mutations up to                        |
-| flags               | The flags supplied in the stream request              |
-| items_ready         | Whether the stream has items ready to send            |
-| last_sent_seqno     | The last seqno sent by this stream                    |
-| last_read_seqno     | The last seqno read by this stream from disk or memory|
-| ready_queue_memory  | Memory occupied by elements in the DCP readyQ         |
-| memory_phase        | The amount of items sent during the memory phase      |
-| opaque              | The unique stream identifier                          |
-| snap_end_seqno      | The last snapshot end seqno (Used if a consumer is    |
-|                     | resuming a stream)                                    |
-| snap_start_seqno    | The last snapshot start seqno (Used if a consumer is  |
-|                     | resuming a stream)                                    |
-| start_seqno         | The seqno to start sending mutations from             |
-| state               | The stream state (pending, backfilling, in-memory,    |
-|                     | takeover-send, takeover-wait, or dead)                |
-| vb_uuid             | The vb uuid used in the stream request                |
-| cur_snapshot_type   | The type of the current snapshot being received       |
-| cur_snapshot_start  | The start seqno of the current snapshot being         |
-|                     | received                                              |
-| cur_snapshot_end    | The end seqno of the current snapshot being received  |
+| backfill_disk_items      | The amount of items read during backfill from disk    |
+| backfill_mem_items       | The amount of items read during backfill from memory  |
+| backfill_sent            | The amount of items sent to the consumer during the   |
+| end_seqno                | The seqno send mutations up to                        |
+| flags                    | The flags supplied in the stream request              |
+| items_ready              | Whether the stream has items ready to send            |
+| last_sent_seqno          | The last seqno sent by this stream                    |
+| last_sent_snap_end_seqno | The last snapshot end seqno sent by active stream     |
+| last_read_seqno          | The last seqno read by this stream from disk or memory|
+| ready_queue_memory       | Memory occupied by elements in the DCP readyQ         |
+| memory_phase             | The amount of items sent during the memory phase      |
+| opaque                   | The unique stream identifier                          |
+| snap_end_seqno           | The last snapshot end seqno (Used if a consumer is    |
+|                          | resuming a stream)                                    |
+| snap_start_seqno         | The last snapshot start seqno (Used if a consumer is  |
+|                          | resuming a stream)                                    |
+| start_seqno              | The seqno to start sending mutations from             |
+| state                    | The stream state (pending, backfilling, in-memory,    |
+|                          | takeover-send, takeover-wait, or dead)                |
+| vb_uuid                  | The vb uuid used in the stream request                |
+| cur_snapshot_type        | The type of the current snapshot being received       |
+| cur_snapshot_start       | The start seqno of the current snapshot being         |
+|                          | received                                              |
+| cur_snapshot_end         | The end seqno of the current snapshot being received  |
 
 ** Dcp Aggregated Stats
 
index 8e44eb2..4962bf1 100644 (file)
@@ -236,11 +236,14 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
         if (end_seqno_ > endSeqno) {
             /* We possibly have items in the open checkpoint
                (incomplete snapshot) */
-            LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Merging backfill"
-                " and memory snapshot for a replica vbucket, start seqno "
-                "%" PRIu64 " and end seqno %" PRIu64 "",
-                producer->logHeader(), vb_, startSeqno, endSeqno);
-            endSeqno = end_seqno_;
+            snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
+            LOG(EXTENSION_LOG_WARNING,
+                "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
+                "replica vbucket, backfill start seqno %" PRIu64 ", "
+                "backfill end seqno %" PRIu64 ", "
+                "snapshot end seqno after merge %" PRIu64,
+                vb_, startSeqno, endSeqno, info.range.end);
+            endSeqno = info.range.end;
         }
     }
 
@@ -249,14 +252,11 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
         endSeqno);
     pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
                                     MARKER_FLAG_DISK));
-    lastSentSnapEndSeqno = endSeqno;
+    lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
 
     if (!vb) {
         endStream(END_STREAM_STATE);
     } else {
-        if (endSeqno > end_seqno_) {
-            chkCursorSeqno = end_seqno_;
-        }
         // Only re-register the cursor if we still need to get memory snapshots
         CursorRegResult result =
             vb->checkpointManager.registerCursorBySeqno(name_, chkCursorSeqno);
@@ -456,6 +456,11 @@ void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
     add_casted_stat(buffer, itemsFromMemoryPhase, add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
     add_casted_stat(buffer, lastSentSeqno, add_stat, c);
+    snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
+             name_.c_str(), vb_);
+    add_casted_stat(buffer,
+                    lastSentSnapEndSeqno.load(std::memory_order_relaxed),
+                    add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
     add_casted_stat(buffer, lastReadSeqno, add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
@@ -694,7 +699,7 @@ void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
         }
         pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
                                         flags));
-        lastSentSnapEndSeqno = snapEnd;
+        lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
     }
 
     std::deque<MutationResponse*>::iterator itemItr;
@@ -942,7 +947,8 @@ bool ActiveStream::isCurrentSnapshotCompleted() const
     // An atomic read of vbucket state without acquiring the
     // reader lock for state should suffice here.
     if (vbucket && vbucket->getState() == vbucket_state_replica) {
-        if (lastSentSnapEndSeqno >= lastReadSeqno) {
+        if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
+            lastReadSeqno) {
             return false;
         }
     }
index 85a57f1..b6c9e0d 100644 (file)
@@ -294,7 +294,7 @@ private:
     } bufferedBackfill;
 
     //! Last snapshot end seqno sent to the DCP client
-    uint64_t lastSentSnapEndSeqno;
+    AtomicValue<uint64_t> lastSentSnapEndSeqno;
 
     /* Flag used by checkpointCreatorTask that is set before all items are
        extracted for given checkpoint cursor, and is unset after all retrieved