#include "config.h"
+#include "atomic.h"
+#include "dcp-stream.h"
+#include "dcp-producer.h"
#include "ep_engine.h"
+#include "ext_meta_parser.h"
+#include "vbucket.h"
#include <queue>
class DcpProducer;
typedef SingleThreadedRCPtr<DcpProducer> dcp_producer_t;
+class Stream;
+typedef SingleThreadedRCPtr<Stream> stream_t;
+
typedef enum {
STREAM_PENDING,
STREAM_BACKFILLING,
cannot_process
} process_items_error_t;
+typedef enum {
+ BACKFILL_FROM_MEMORY,
+ BACKFILL_FROM_DISK
+} backfill_source_t;
+
class Stream : public RCValue {
public:
Stream(const std::string &name, uint32_t flags, uint32_t opaque,
uint64_t vb_uuid, uint64_t snap_start_seqno,
uint64_t snap_end_seqno);
- virtual ~Stream() {}
+ virtual ~Stream();
uint32_t getFlags() { return flags_; }
AtomicValue <uint64_t> readyQueueMemory;
};
-typedef RCPtr<Stream> stream_t;
class ActiveStreamCheckpointProcessorTask;
uint64_t vb_uuid, uint64_t snap_start_seqno,
uint64_t snap_end_seqno);
- ~ActiveStream() {
- transitionState(STREAM_DEAD);
- clear_UNLOCKED();
- }
+ ~ActiveStream();
DcpResponse* next();
void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
- void backfillReceived(Item* itm);
+ bool backfillReceived(Item* itm, backfill_source_t backfill_source);
void completeBackfill();
protected:
// Returns the outstanding items for the stream's checkpoint cursor.
- void getOutstandingItems(RCPtr<VBucket> &vb, std::deque<queued_item> &items);
+ void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
// Given a set of queued items, create mutation responses for each item,
// and pass onto the producer associated with this stream.
- void processItems(std::deque<queued_item>& items);
+ void processItems(std::vector<queued_item>& items);
bool nextCheckpointItem();
const char* getEndStreamStatusStr(end_stream_status_t status);
+ ExtendedMetaData* prepareExtendedMetaData(uint16_t vBucketId,
+ uint8_t conflictResMode);
+
bool isCurrentSnapshotCompleted() const;
//! The last sequence number queued from disk or memory
* must acquire the streamMutex lock.
*/
AtomicValue <size_t> backfillRemaining;
-
- //! The amount of items that have been read from disk
- size_t itemsFromBackfill;
- //! The amount of items that have been read from memory
- AtomicValue<size_t> itemsFromMemory;
+ //! Stats to track items read and sent from the backfill phase
+ struct {
+ AtomicValue<size_t> memory;
+ AtomicValue<size_t> disk;
+ AtomicValue<size_t> sent;
+ } backfillItems;
+ //! The amount of items that have been sent during the memory phase
+ size_t itemsFromMemoryPhase;
//! Whether ot not this is the first snapshot marker sent
bool firstMarkerSent;
dcp_producer_t producer;
AtomicValue<bool> isBackfillTaskRunning;
+ struct {
+ AtomicValue<uint32_t> bytes;
+ AtomicValue<uint32_t> items;
+ } bufferedBackfill;
+
//! Last snapshot end seqno sent to the DCP client
AtomicValue<uint64_t> lastSentSnapEndSeqno;
class ActiveStreamCheckpointProcessorTask : public GlobalTask {
public:
ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
- : GlobalTask(&e, Priority::ActiveStreamCheckpointProcessor, INT_MAX, false),
+ : GlobalTask(&e, TaskId::ActiveStreamCheckpointProcessorTask,
+ INT_MAX, false),
notified(false),
iterationsBeforeYield(e.getConfiguration()
.getDcpProducerSnapshotMarkerYieldLimit()) { }
uint64_t snap_end_seqno);
~NotifierStream() {
- LockHolder lh(streamMutex);
transitionState(STREAM_DEAD);
- clear_UNLOCKED();
}
DcpResponse* next();
~PassiveStream();
- process_items_error_t processBufferedMessages(uint32_t &processed_bytes);
+ process_items_error_t processBufferedMessages(uint32_t &processed_bytes,
+ size_t batchSize);
DcpResponse* next();
void addStats(ADD_STAT add_stat, const void *c);
- static const size_t batchSize;
-
private:
ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
- ENGINE_ERROR_CODE commitMutation(MutationResponse* mutation,
- bool backfillPhase);
-
ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
- ENGINE_ERROR_CODE commitDeletion(MutationResponse* deletion,
- bool backfillPhase);
-
void handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno);
void processMarker(SnapshotMarker* marker);
void processSetVBucketState(SetVBucketState* state);
- void transitionState(stream_state_t newState);
+ bool transitionState(stream_state_t newState);
uint32_t clearBuffer();
+ uint32_t setDead_UNLOCKED(end_stream_status_t status,
+ LockHolder *slh);
+
const char* getEndStreamStatusStr(end_stream_status_t status);
EventuallyPersistentEngine* engine;
AtomicValue<uint64_t> cur_snapshot_end;
AtomicValue<snapshot_type_t> cur_snapshot_type;
bool cur_snapshot_ack;
- bool saveSnapshot;
struct Buffer {
Buffer() : bytes(0), items(0) {}