Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / connmap.h
index a4e0dc2..6979037 100644 (file)
@@ -254,7 +254,8 @@ protected:
 class ConnNotifier {
 public:
     ConnNotifier(conn_notifier_type ntype, ConnMap &cm)
-        : notifier_type(ntype), connMap(cm), pendingNotification(false)  { }
+        : notifier_type(ntype), connMap(cm), task(0),
+          pendingNotification(false)  { }
 
     void start();
 
@@ -444,9 +445,7 @@ private:
 
 };
 
-
 class DcpConnMap : public ConnMap {
-
 public:
 
     DcpConnMap(EventuallyPersistentEngine &engine);
@@ -469,6 +468,8 @@ public:
 
     void notifyVBConnections(uint16_t vbid, uint64_t bySeqno);
 
+    void notifyBackfillManagerTasks();
+
     void removeVBConnections(connection_t &conn);
 
     void vbucketStateChanged(uint16_t vbucket, vbucket_state_t state);
@@ -479,10 +480,44 @@ public:
 
     void manageConnections();
 
+    bool canAddBackfillToActiveQ();
+
+    void decrNumActiveSnoozingBackfills();
+
+    void updateMaxActiveSnoozingBackfills(size_t maxDataSize);
+
+    uint16_t getNumActiveSnoozingBackfills () const {
+        return numActiveSnoozingBackfills;
+    }
+
+    uint16_t getMaxActiveSnoozingBackfills () const {
+        return maxActiveSnoozingBackfills;
+    }
+
+    size_t getAggrDcpConsumerBufferSize () const {
+        return aggrDcpConsumerBufferSize.load();
+    }
+
+    void incAggrDcpConsumerBufferSize (size_t bufSize) {
+        aggrDcpConsumerBufferSize.fetch_add(bufSize);
+    }
+
+    void decAggrDcpConsumerBufferSize (size_t bufSize) {
+        aggrDcpConsumerBufferSize.fetch_sub(bufSize);
+    }
+
     ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
                                        uint16_t vbucket, uint32_t flags);
 
-    void addStats(ADD_STAT add_stat, const void *c);
+    /*
+     * Change the value at which a DcpConsumer::Processor task will yield
+     */
+    void consumerYieldConfigChanged(size_t newValue);
+
+    /*
+     * Change the batchsize that the DcpConsumer::Processor operates with
+     */
+    void consumerBatchSizeConfigChanged(size_t newValue);
 
 private:
 
@@ -492,6 +527,27 @@ private:
 
     // Guarded by the parent's classes `connsLock`
     std::list<connection_t> deadConnections;
+
+    SpinLock numBackfillsLock;
+    /* Db file memory */
+    static const uint32_t dbFileMem;
+    uint16_t numActiveSnoozingBackfills;
+    uint16_t maxActiveSnoozingBackfills;
+    /* Max num of backfills we want to have irrespective of memory */
+    static const uint16_t numBackfillsThreshold;
+    /* Max percentage of memory we want backfills to occupy */
+    static const uint8_t numBackfillsMemThreshold;
+    /* Total memory used by all DCP consumer buffers */
+    AtomicValue<size_t> aggrDcpConsumerBufferSize;
+
+    class DcpConfigChangeListener : public ValueChangedListener {
+    public:
+        DcpConfigChangeListener(DcpConnMap& connMap);
+        virtual ~DcpConfigChangeListener() { }
+        virtual void sizeValueChanged(const std::string &key, size_t value);
+    private:
+        DcpConnMap& myConnMap;
+    };
 };