MB-19678: Merge backfill and in-memory snapshots correctly on replica vb 07/64207/3
authorManu Dhundi <manu@couchbase.com>
Thu, 19 May 2016 00:47:45 +0000 (17:47 -0700)
committerManu Dhundi <manu@couchbase.com>
Fri, 20 May 2016 00:34:30 +0000 (00:34 +0000)
When a DCP client, on replica vb, opens a stream which it intends to
keep open forever, merge the backfill and in-memory snapshots by using the
the checkpoint snapshot_end as snapshot_end_seqno.

Change-Id: Ic05a59ccafa54bbee19882707404a78c47002be7
Reviewed-on: http://review.couchbase.org/64207
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Manu Dhundi <manu@couchbase.com>
docs/stats.org
src/dcp-stream.cc
src/dcp-stream.h

index 2aa7f09..e99e19e 100644 (file)
@@ -651,26 +651,27 @@ another colon.  For example, if your client is named, =slave1=, the
 
 ****Per Stream Stats
 
-| backfilled         | The amount of items sent from disk                     |
-| end_seqno          | The seqno send mutations up to                         |
-| flags              | The flags supplied in the stream request               |
-| items_ready        | Whether the stream has items ready to send             |
-| last_sent_seqno    | The last seqno sent by this stream                     |
-| last_read_seqno    | The last seqno read by this stream from disk or memory |
-| ready_queue_memory | Memory occupied by elements in the DCP readyQ          |
-| memory             | The amount of items sent from memory                   |
-| opaque             | The unique stream identifier                           |
-| snap_end_seqno     | The last snapshot end seqno (Used if a consumer is     |
-|                    | resuming a stream)                                     |
-| snap_start_seqno   | The last snapshot start seqno (Used if a consumer is   |
-|                    | resuming a stream)                                     |
-| start_seqno        | The seqno to start sending mutations from              |
-| state              | The stream state (pending, backfilling, in-memory,     |
-|                    | takeover-send, takeover-wait, or dead)                 |
-| vb_uuid            | The vb uuid used in the stream request                 |
-| cur_snapshot_type  | The type of the current snapshot being received        |
-| cur_snapshot_start | The start seqno of the current snapshot being received |
-| cur_snapshot_end   | The end seqno of the current snapshot being received   |
+| backfilled               | The amount of items sent from disk                     |
+| end_seqno                | The seqno send mutations up to                         |
+| flags                    | The flags supplied in the stream request               |
+| items_ready              | Whether the stream has items ready to send             |
+| last_sent_seqno          | The last seqno sent by this stream                     |
+| last_sent_snap_end_seqno | The last snapshot end seqno sent by active stream      |
+| last_read_seqno          | The last seqno read by this stream from disk or memory |
+| ready_queue_memory       | Memory occupied by elements in the DCP readyQ          |
+| memory                   | The amount of items sent from memory                   |
+| opaque                   | The unique stream identifier                           |
+| snap_end_seqno           | The last snapshot end seqno (Used if a consumer is     |
+|                          | resuming a stream)                                     |
+| snap_start_seqno         | The last snapshot start seqno (Used if a consumer is   |
+|                          | resuming a stream)                                     |
+| start_seqno              | The seqno to start sending mutations from              |
+| state                    | The stream state (pending, backfilling, in-memory,     |
+|                          | takeover-send, takeover-wait, or dead)                 |
+| vb_uuid                  | The vb uuid used in the stream request                 |
+| cur_snapshot_type        | The type of the current snapshot being received        |
+| cur_snapshot_start       | The start seqno of the current snapshot being received |
+| cur_snapshot_end         | The end seqno of the current snapshot being received   |
 
 ** Dcp Aggregated Stats
 
index 7f9b5e5..ca1f7ba 100644 (file)
@@ -370,11 +370,15 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
         if (end_seqno_ > endSeqno) {
             /* We possibly have items in the open checkpoint
                (incomplete snapshot) */
-            LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Merging backfill"
-                " and memory snapshot for a replica vbucket, start seqno "
-                "%" PRIu64 " and end seqno %" PRIu64 "",
-                producer->logHeader(), vb_, startSeqno, endSeqno);
-            endSeqno = end_seqno_;
+            uint64_t snapshot_start, snapshot_end;
+            vb->getCurrentSnapshot(snapshot_start, snapshot_end);
+            LOG(EXTENSION_LOG_WARNING,
+                "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
+                "replica vbucket, backfill start seqno %" PRIu64 ", "
+                "backfill end seqno %" PRIu64 ", "
+                "snapshot end seqno after merge %" PRIu64,
+                vb_, startSeqno, endSeqno, snapshot_end);
+            endSeqno = snapshot_end;
         }
     }
 
@@ -383,14 +387,11 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
         endSeqno);
     pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
                                     MARKER_FLAG_DISK));
-    lastSentSnapEndSeqno = endSeqno;
+    lastSentSnapEndSeqno.store(endSeqno);
 
     if (!vb) {
         endStream(END_STREAM_STATE);
     } else {
-        if (endSeqno > end_seqno_) {
-            chkCursorSeqno = end_seqno_;
-        }
         // Only re-register the cursor if we still need to get memory snapshots
         CursorRegResult result =
             vb->checkpointManager.registerTAPCursorBySeqno(name_,
@@ -573,6 +574,9 @@ void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
     add_casted_stat(buffer, itemsFromMemory, add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
     add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
+    snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
+             name_.c_str(), vb_);
+    add_casted_stat(buffer, lastSentSnapEndSeqno.load(), add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
     add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
@@ -805,7 +809,7 @@ void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
         }
         pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
                                         flags));
-        lastSentSnapEndSeqno = snapEnd;
+        lastSentSnapEndSeqno.store(snapEnd);
     }
 
     std::deque<MutationResponse*>::iterator itemItr;
@@ -1030,7 +1034,7 @@ bool ActiveStream::isCurrentSnapshotCompleted() const
     // An atomic read of vbucket state without acquiring the
     // reader lock for state should suffice here.
     if (vbucket && vbucket->getState() == vbucket_state_replica) {
-        if (lastSentSnapEndSeqno >= lastReadSeqno) {
+        if (lastSentSnapEndSeqno.load() >= lastReadSeqno) {
             return false;
         }
     }
index f9c7288..81b5a62 100644 (file)
@@ -281,7 +281,7 @@ private:
     AtomicValue<bool> isBackfillTaskRunning;
 
     //! Last snapshot end seqno sent to the DCP client
-    uint64_t lastSentSnapEndSeqno;
+    AtomicValue<uint64_t> lastSentSnapEndSeqno;
 
     /* Flag used by checkpointCreatorTask that is set before all items are
        extracted for given checkpoint cursor, and is unset after all retrieved