Buff

Buff

A simple observer implementation with customizable allocer and customizable
messages!
Maybe we need a producer-consumer .. here just implements in observer, i.e. publisher-subsciber!
And use in a single process!

#include <atomic>
#include <list>
#include <unordered_map>
#include <unordered_set>
#include <thread>
#include <mutex>
#include "boost/shared_ptr.hpp"
#include "boost/utility/string_ref.hpp"
#include "boost/bind.hpp"
#include "gtest/gtest.h"

using Data = boost::shared_ptr<uint8_t[]>;

class BaseMsg {
public:
    virtual ~BaseMsg() noexcept {}
    inline explicit operator bool() const noexcept
    {
        return this->data != nullptr;
    }
    inline void setData(const Data& data) noexcept
    {
        this->data = data;
    }
    inline void setSize(int32_t size) noexcept
    {
        this->size = size;
    }
    inline Data getData() noexcept
    {
        return this->data;
    }
    inline int32_t getSize() noexcept
    {
        return this->size;
    }
protected:
    Data data{ nullptr };
    int32_t size{ 0 };
    uint32_t seq{ 0 };///< Seq-number, TODO: use this field!
    double stamp{ 0.0 };///< TODO: maybe also use this field!
};
/// Here shared_ptr not very necessary... expect sub-msgs has many customs fields!
using BaseMsgConstPtr = boost::shared_ptr<const BaseMsg>;

// Forward declaretions:
// Producer and ProducerPtr
template<typename Msg, typename Allocer>
class Producer;
template<typename Msg, typename Allocer>
using ProducerPtr = boost::shared_ptr<Producer<Msg, Allocer> >;
// BaseConsumer and BaseConsumerPtr
class BaseConsumer;
using BaseConsumerPtr = boost::shared_ptr<BaseConsumer>;

template<typename Msg>
class Consumer;

class BaseAllocer {
public:
    virtual ~BaseAllocer() noexcept {}
    virtual Data alloc(int32_t /*bytes*/) noexcept
    {
        return nullptr;
    }
};
using BaseAllocerPtr = boost::shared_ptr<BaseAllocer>;

/// The core handler, dispatcher:
template<typename Allocer>
class Dispatcher {
public:
    // Allocer check
    static_assert(
        std::is_base_of<BaseAllocer, typename std::decay<Allocer>::type>::value,
        "BaseAllocer should be base of Allocer");

    Dispatcher(const BaseAllocerPtr& allocer) noexcept : allocer(allocer) {}
    template<typename Msg>
    ProducerPtr<Msg, Allocer> advertise(int32_t channel);
    template<typename Msg>
    using Cb = std::function<void (const boost::shared_ptr<const Msg>&)>;
    template<typename Msg>
    boost::shared_ptr<Consumer<Msg> > subscribe(
        int32_t channel, const Cb<Msg>& cb, int32_t queneSz) noexcept;

    Data alloc(int32_t bytes) noexcept;
    template<typename Msg>
    void publish(int32_t channel, const boost::shared_ptr<const Msg>& msg) noexcept;

    // TODO: dispatch(), e.g. in a or multi threads to dispatch buffers to all subscibers!

private:
    BaseAllocerPtr allocer{ nullptr };
    std::unordered_multimap<int32_t, BaseConsumerPtr> subscibers;
};

// 1. produce(), 2. publish()
template<typename Msg, typename Allocer>
class Producer {
public:
    // Msg check
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");
    Producer(int32_t channel, Dispatcher<Allocer>& handle) noexcept;
    /// Produce a buffer to fill message.
    boost::shared_ptr<Msg> produce(int32_t bytes);
    inline void publish(const boost::shared_ptr<const Msg>& msg) noexcept;
private:
    const int32_t channel;
    Dispatcher<Allocer>& handle;
};

/// Consumer, i.e. subsciber.
class BaseConsumer {
public:
    virtual void callback(const BaseMsgConstPtr&) {}
};

template<typename Msg>
class Consumer : public BaseConsumer {
public:
    // Msg check
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");

    Consumer() noexcept {}
    Consumer(int32_t qsz) noexcept : qsz(qsz) {}

    // If callback setup then auto callback!
    using Cb = std::function<void (const boost::shared_ptr<const Msg>&)>;
    inline void setCb(const Cb& cb) noexcept
    {
        this->cb = cb;
    }
    inline Cb& getCb() noexcept
    {
        return this->cb;
    }
    inline void setConnected(bool connected) noexcept
    {
        this->connected = connected;
    }
    inline bool isConnected() const noexcept
    {
        return this->connected;
    }

    // If not set callback, you can deque datas
    bool deque(Msg& m) noexcept
    {
        std::lock_guard<std::mutex> lock(qMutex);
        if (q.empty()) {
            return false;
        }
        auto qMsg = q.front();
        q.pop_front();
        boost::shared_ptr<const Msg> msg = boost::dynamic_pointer_cast<const Msg, const BaseMsg>(qMsg);
        if (!msg) {
            return false;
        }
        m = *msg;
        return true;
    }

    virtual void callback(const BaseMsgConstPtr& msg) override;
private:
    bool connected{ true };
    Cb cb{ nullptr };
    int32_t qsz{ 0 };
    mutable std::mutex qMutex;///< Access q
    std::list<BaseMsgConstPtr> q;
};

// Implementations
template<typename Allocer>
template<typename Msg>
ProducerPtr<Msg, Allocer> Dispatcher<Allocer>::advertise(int32_t channel)
{
    // Msg check
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");

    ProducerPtr<Msg, Allocer> p(new Producer<Msg, Allocer>{ channel, *this });
    return p;
}
template<typename Allocer>
template<typename Msg>
boost::shared_ptr<Consumer<Msg> > Dispatcher<Allocer>::subscribe(
    int32_t channel, const Cb<Msg>& cb, int32_t queneSz) noexcept
{
    boost::shared_ptr<Consumer<Msg> > c(new Consumer<Msg>(queneSz));
    c->setCb(cb);
    BaseConsumerPtr baseC = c;
    subscibers.insert(std::pair<int32_t, BaseConsumerPtr>{ channel, baseC });
    return c;
}
template<typename Allocer>
Data Dispatcher<Allocer>::alloc(int32_t bytes) noexcept
{
    return allocer->alloc(bytes);
}
template<typename Allocer>
template<typename Msg>
void Dispatcher<Allocer>::publish(
    int32_t channel, const boost::shared_ptr<const Msg>& msg) noexcept
{
    auto rit = subscibers.equal_range(channel);
    BaseConsumerPtr c;
    for (auto it = rit.first, end = rit.second; it != end; ++it) {
        c = it->second;
        // TODO: callback in thread!!
        c->callback(msg);
    }
}

template<typename Msg, typename Allocer>
Producer<Msg, Allocer>::Producer(int32_t channel, Dispatcher<Allocer>& handle) noexcept :
    channel(channel), handle(handle) {}
template<typename Msg, typename Allocer>
boost::shared_ptr<Msg> Producer<Msg, Allocer>::produce(int32_t bytes)
{
    auto p = handle.alloc(bytes);
    if (!p) {
        return nullptr;
    }
    boost::shared_ptr<Msg> msg(new Msg());
    msg->setData(p);
    msg->setSize(bytes);
    return msg;
}
template<typename Msg, typename Allocer>
inline void Producer<Msg, Allocer>::publish(const boost::shared_ptr<const Msg>& msg) noexcept
{
    handle.publish(channel, msg);
}

template<typename Msg>
void Consumer<Msg>::callback(const BaseMsgConstPtr& msg)
{
    if (!connected || !cb) {
        // Enque
        std::lock_guard<std::mutex> lock(qMutex);
        q.push_back(msg);
        if (int32_t(q.size()) > qsz) {
            std::cout << "q full, deque front\n";
            q.pop_front();
        }
        return;
    }
    // Eat all, but queue first!
    bool qEmpty;
    {
        std::lock_guard<std::mutex> lock(qMutex);
        qEmpty = q.empty();
    }
    while(!qEmpty) {
        BaseMsgConstPtr qMsg;
        {
            std::lock_guard<std::mutex> lock(qMutex);
            if (!q.empty()) {
                qMsg = q.front();
                q.pop_front();
                qEmpty = q.empty();
            } else {
                break;
            }
        }
        boost::shared_ptr<const Msg> cbMsg = boost::dynamic_pointer_cast<const Msg, const BaseMsg>(qMsg);
        if (cbMsg) {
            cb(cbMsg);
        }
    }
    // Eat the new one
    boost::shared_ptr<const Msg> cbMsg = boost::dynamic_pointer_cast<const Msg, const BaseMsg>(msg);
    if (cbMsg) {
        cb(cbMsg);
    }
    return;
}

// Examples
// Example allocer
class SimpleAllocer : public BaseAllocer {
public:
    /// Fake deletor, we not need really delete, just let this area free!
    class Deletor {
    public:
        Deletor(uint64_t memId = UINT32_MAX, SimpleAllocer* allocer = nullptr) noexcept :
            memId(memId), allocer(allocer) {}
        void operator()(uint8_t*& p) noexcept
        {
            if (p) {
                // NO delete
                // Just to nullptr!
                p = nullptr;
                // And move to idleBuffers!
                allocer->dealloc(memId);
            }
        }
    private:
        uint32_t memId{ UINT32_MAX };
        SimpleAllocer* allocer{ nullptr };
    };
    /// @throw std::bad_alloc when no enough memory.
    // TODO: maybe, examle: small need 20, big only need 10...
    SimpleAllocer(int32_t pieceBytes, int32_t increBytes, int32_t npieces, int32_t totalPieces) :
        pieceBytes(pieceBytes), increBytes(increBytes)
    {
        int32_t n = ::ceil(totalPieces / npieces);
        this->maxBytes = pieceBytes + (increBytes * (n - 1));
        for (int32_t i = 0; i < n; ++i) {
            for (int32_t j = 0; j < npieces; ++j) {
                buffers[i * npieces + j] = std::vector<uint8_t>(pieceBytes);
                idleBuffers.insert(i);
                bufferByteIds.insert(std::pair<int32_t, uint32_t>{ pieceBytes, i * npieces + j });
            }
            pieceBytes += increBytes;
        }
    }
    SimpleAllocer(SimpleAllocer&& rhs) noexcept
    {
        *this = std::move(rhs);
    }
    SimpleAllocer& operator=(SimpleAllocer&& rhs) noexcept
    {
        if (this != &rhs) {
            std::lock_guard<std::mutex> rlock(rhs.buffersMutex);
            std::lock_guard<std::mutex> lock(buffersMutex);
            this->buffers = std::move(rhs.buffers);
            this->bufferByteIds = std::move(rhs.bufferByteIds);
            this->idleBuffers = std::move(rhs.idleBuffers);
            this->inUseBuffers = std::move(rhs.inUseBuffers);
        }
        return *this;
    }
    virtual Data alloc(int32_t bytes) noexcept override
    {
        // TODO pick a good memory

        // TEST: use a simple memory
        std::lock_guard<std::mutex> lock(buffersMutex);
        if (this->idleBuffers.empty()) {
            return nullptr;
        }
        if (bytes > maxBytes) {
            return nullptr;
        }
        // TODO: should use bufferByteIds to find a good and NOT too large buffer!
        for (auto it = idleBuffers.begin(), end = idleBuffers.end(); it != end; ++it) {
            auto memId = *it;
            if (int32_t(buffers[memId].size()) < bytes) {
                continue;
            }
            idleBuffers.erase(it);
            inUseBuffers.insert(memId);
            return Data{ buffers[memId].data(), Deletor{ memId, this } };
        }
        return nullptr;
    }
    void dealloc(uint32_t memId)
    {
        std::lock_guard<std::mutex> lock(buffersMutex);
        auto it = inUseBuffers.find(memId);
        if (it != inUseBuffers.end()) {
            inUseBuffers.erase(it);
            idleBuffers.insert(memId);
        }
    }
private:
    // TODO maybe use a good mono buffer, and belows should all in monoBuffer!!
    //std::vector<uint8_t> monoBuffer;
    // TEST: use a simple buffer: { memId, buffer(should be part of monoBuffer) }
    mutable std::mutex buffersMutex;///< Access buffers, idleBuffers and inUseBuffers ...
    int32_t pieceBytes;
    int32_t increBytes;
    int32_t maxBytes;
    std::unordered_map<uint32_t, std::vector<uint8_t> > buffers;
    /// TODO use bufferByteIds or maybe another fast query table!
    std::unordered_multimap<int32_t, uint32_t> bufferByteIds;
    std::unordered_set<uint32_t> idleBuffers;
    std::unordered_set<uint32_t> inUseBuffers;
};

// Example msgs
class I32Msg : public BaseMsg {
public:
    operator int32_t() const noexcept
    {
       return *(reinterpret_cast<const int32_t*>(this->data.get()));
    }
};
class StringMsg : public BaseMsg {
public:
    operator boost::string_ref() const noexcept
    {
       return boost::string_ref{ reinterpret_cast<const char*>(this->data.get()), uint64_t(this->size - 1) };
    }
};

class My {
public:
    void cb(const boost::shared_ptr<const StringMsg>& msg) noexcept
    {
        std::cout << "my StringMsg cb: " << boost::string_ref(*msg) << ", from thread "
            << std::this_thread::get_id() << '\n';
    }
};

TEST(Dispatcher, 1)
{
    std::atomic<int32_t> K{ 0 };
    //BaseAllocerPtr simpleAllocer(new SimpleAllocer(64, 1024, 4, 20));
    BaseAllocerPtr simpleAllocer(new SimpleAllocer(16, 20, 4, 20));
    Dispatcher<SimpleAllocer> dispatcher(simpleAllocer);
    auto i32ProducerC1 = dispatcher.advertise<I32Msg>(1);
    auto i32ProducerC1B = dispatcher.advertise<I32Msg>(1);
    auto stringProducerC1 = dispatcher.advertise<StringMsg>(1);
    auto stringProducerC1B = dispatcher.advertise<StringMsg>(1);
    auto stringProducerC2 = dispatcher.advertise<StringMsg>(2);

    std::mutex i32ConsumerC1CbMutex;
    auto i32ConsumerC1Cb = [&i32ConsumerC1CbMutex](const boost::shared_ptr<const I32Msg>& msg) {
        std::lock_guard<std::mutex> lock(i32ConsumerC1CbMutex);
        std::cout << "i32ConsumerC1Cb: " << int32_t(*msg) << ", from thread "
            << std::this_thread::get_id() << '\n';
    };
    auto i32ConsumerC1 = dispatcher.subscribe<I32Msg>(1, i32ConsumerC1Cb, 2);

    std::mutex stringConsumerC1CbMutex;
    auto stringConsumerAllCb = [&stringConsumerC1CbMutex](const boost::shared_ptr<const StringMsg>& msg) {
        std::lock_guard<std::mutex> lock(stringConsumerC1CbMutex);
        std::cout << "stringConsumerAllCb: " << boost::string_ref(*msg) << ", from thread "
            << std::this_thread::get_id() << '\n';
    };
    auto stringConsumerC1 = dispatcher.subscribe<StringMsg>(1, stringConsumerAllCb, 2);
    auto stringConsumerC1B = dispatcher.subscribe<StringMsg>(1, stringConsumerAllCb, 2);
    auto stringConsumerC2 = dispatcher.subscribe<StringMsg>(2, stringConsumerAllCb, 2);
    My my;
    auto stringConsumerC2B = dispatcher.subscribe<StringMsg>(2, boost::bind(&My::cb, &my, _1), 2);
    auto stringConsumerC2C = dispatcher.subscribe<StringMsg>(2, nullptr, 2);

    // Produce
    std::thread t1([&K, &i32ProducerC1] {
        std::cout << "i32ProducerC1 thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            // Get a memory!
            auto p = i32ProducerC1->produce(4);
            if (p) {
                // Fill data!
                *(reinterpret_cast<int32_t*>(p->getData().get())) = ::rand() % 1000;
                // TODO: Notify consumer, maybe here need callback in thread!!
                i32ProducerC1->publish(p);
            } else {
                std::cout << "WARN: i32ProducerC1B NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    });
    std::thread t2([&K, &i32ProducerC1B] {
        std::cout << "i32ProducerC1B thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            auto p = i32ProducerC1B->produce(4);
            if (p) {
                *(reinterpret_cast<int32_t*>(p->getData().get())) = ::rand() % 1000;
                i32ProducerC1B->publish(p);
            } else {
                std::cout << "WARN: i32ProducerC1B NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    });
    std::thread t3([&K, &stringProducerC1, &stringProducerC1B] {
        std::cout << "stringProducerC1 C1B thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            std::string s = "sss=" + std::to_string(::rand() % 200 + 3000);
            auto p = stringProducerC1->produce(s.length() + 1);
            if (p) {
                std::cout << "OK  : stringProducerC1 C1B [1] will pub " << s << "\n";
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC1->publish(p);
            } else {
                std::cout << "WARN: stringProducerC1 C1B [1] NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            s = "sss=" + std::to_string(::rand() % 200 + 3000);
            p = stringProducerC1B->produce(s.length() + 1);
            if (p) {
                std::cout << "OK  : stringProducerC1 C1B [2] will pub " << s << "\n";
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC1B->publish(p);
            } else {
                std::cout << "WARN: stringProducerC1 C1B [2] NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    });
    std::thread t4([&K, &stringProducerC2, &stringConsumerC2C] {
        std::cout << "stringProducerC2 thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            std::string s = "[[SSS K=" + std::to_string(K) + "]]";
            auto p = stringProducerC2->produce(s.length() + 1);
            if (p) {
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC2->publish(p);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            if (0 == (K % 6)) {
                StringMsg smsg;
                while (stringConsumerC2C->deque(smsg)) {
                    std::cout << "deque smsg OK! " << boost::string_ref(smsg) << "\n";
                }
            }
            ++K;
        }
    });
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}
int main(int argc, char** argv)
{
    testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}

Write a CircularBuffer

Write a CircularBuffer

为什么要写一个 CircularBuffer :

  • Template-able CircularBuffer.
  • 可变和不可变容量(recapacity)的 buffer.
  • 可以 forward or move (改变有效数据范围) 的 buffer.
  • 可以 resize 有效数据 size 的 buffer.
  • 可以连续访问未使用区域的 buffer (restBegin and restEnd).

例如可以用于:

  • A copy less ROS/navigation/costmap_2d/ cost map implementation!

实现

The CircularBuffer class brief:

/**
 * Var capacity circular buffer.
 * NOTE:
 * - Like CircularQueue, but if full, then overwrite!
 * - And like CircularQueue, but NOT use front and rear, but use next index
 *   and size(front index just for faster computing).
 * - When use in multi-threads, should add lock or some other to sync.
 * @tparam T data type.
 * @sa CircularQueue
 */
template<typename T>
class CircularBuffer {
public:
    /// The iterator type.
    class Iterator;
    /// The const iterator type.
    class ConstIterator;
    /// Box type: { boxMinXIdx, boxMinYIdx, boxWidth, boxHeight }
    using Box = Eigen::Vector4i;
    /**
     * Ctor.
     * @param capacity the buffer capacity, should >= 1.
     * @throw std::logic_error when capacity invalid.
     */
    CircularBuffer(int32_t capacity = 1, const T& initValue = T{});
    /// Move ctor
    CircularBuffer(CircularBuffer<T>&& rhs) noexcept;
    /// Copy ctor
    CircularBuffer(const CircularBuffer<T>& rhs) noexcept;
    /// Dtor
    ~CircularBuffer() noexcept;
    /// Move assign
    CircularBuffer<T>& operator=(CircularBuffer<T>&& rhs) noexcept;
    /// Copy assign
    CircularBuffer<T>& operator=(const CircularBuffer<T>& rhs) noexcept;
    /// @return internal alloced buffer capacity.
    inline int32_t getAllocedCapacity() const noexcept;
    /// @return buffer capacity.
    inline int32_t capacity() const noexcept;
    /**
     * Set buffer capacity.
     * @param capacity the new capacity to set, should > 0.
     * @param initValue the initValue to set when reset capacity.
     * @return true when success, otherwise false.
     */
    bool recapacity(int32_t capacity, const T& initValue = T{}) noexcept;
    /// Fit internal buffer's capacity to this->capacity().
    void shrink_to_fit() noexcept;
    /**
     * Forward the buffer to frontIdx(real is nextIdx) by step.
     * @param step the steps to forward, if < 0, then backward.
     * @return forwarded buffer.
     */
    inline CircularBuffer<T>& forward(int32_t step) noexcept;
    /**
     * Move box to right and down.
     *
     *  0 1 2 3
     * 0. - - - > right (x, width)
     * 1|
     * 2|
     *  V
     * down (y, height)
     *
     * @param box the box to move.
     * @param width the full(capacity) buffer width (i.e. sizeX).
     * @param height the full(capacity) buffer height (i.e. sizeY).
     * @param xStep x increasing direction steps,
     * if < 0 then to descending direction.
     * @param yStep y increasing direction steps,
     * if < 0 then to descending direction.
     * @param initValue NOTE: if some regions will out of bounds then fill
     * these regions with initValue first, then these regions with initValue
     * will be moved circularly!
     * @return the moved buffer.
     * @sa
     * - fillBox
     * - fillBoxOutside
     */
    CircularBuffer<T>& moveBox(
        const Box& box,
        int32_t width,
        int32_t height,
        int32_t xStep,
        int32_t yStep,
        const T& initValue = T{}) noexcept;
    /**
     * Resize in-use datas count.
     * @note NO re-init.
     * @param newCount the new count to resize, should >= 0.
     * @return resized buffer.
     * @throw std::logic_error when @a newCount invalid.
     */
    CircularBuffer<T>& resize(int32_t newCount);
    /**
     * Resize in-use datas count.
     * @param newCount the new count to resize, should >= 0.
     * @param initValue set new or unused buffer to initValue.
     * @return resized buffer.
     * @throw std::logic_error when @a newCount invalid.
     */
    CircularBuffer<T>& resize(int32_t newCount, const T& initValue);
    /**
     * Push a value.
     * @param value to push.
     */
    void push_back(T&& value) noexcept;
    void push_back(const T& value) noexcept;
    bool pop_back(const bool reset = false) noexcept;
    bool pop_back(T& removed, const bool reset = false) noexcept;
    bool pop_front(const bool reset = false) noexcept;
    bool pop_front(T& removed, const bool reset = false) noexcept;
    /**
     * Get first data when buffer not empty.
     * @return first data when buffer not empty.
     * @throw std::logic_error when buffer empty.
     */
    const T& front() const;
    T& front();
    /**
     * Get last data when buffer not empty.
     * @return last data when buffer not empty.
     * @throw std::logic_error when buffer empty.
     */
    const T& back() const;
    T& back();
    /// @return internal buffer.data().
    inline T* data() noexcept;
    /// @return internal buffer.data().
    inline const T* data() const noexcept;
    /// @return whether buffer empty.
    inline bool empty() const noexcept;
    /// @return whether buffer full.
    inline bool full() const noexcept;
    /// @return data size.
    inline int32_t size() const noexcept;
    /// Clear in-use datas count to 0.
    inline CircularBuffer<T>& clear() noexcept;
    /// Clear buffer and reset to initValue.
    inline CircularBuffer<T>& clear(const T& initValue) noexcept;
    /// Set count to capacity, NOTE: NOT set rest buffer region to a value.
    inline CircularBuffer<T>& beFull() noexcept;
    /// Set count to capacity, and set rest buffer region to a value.
    inline CircularBuffer<T>& beFull(const T& value) noexcept;
    /**
     * Fill all (in-use and rest buffer regions) to gave value, but NOT
     * change buffer's size.
     */
    inline CircularBuffer<T>& fillNoResize(const T& value) noexcept;
    /**
     * Fill to gave starting from frontIdx + offset and count is @a count,
     * but NOT change buffer size.
     * @param offset the offset from frontIdx.
     * @param count the data count to fill.
     * @param value the value to fill.
     */
    inline CircularBuffer<T>& fillNoResize(
        const int64_t offset, const int32_t count, const T& value) noexcept;
    /**
     * Set count to capacity and set in-use and rest buffer regions to
     * gave value.
     */
    inline CircularBuffer<T>& fill(const T& value) noexcept;
    /// Fill in-use buffer region to gave value.
    inline CircularBuffer<T>& fillInUse(const T& value) noexcept;
    /// Fill rest buffer region to gave value.
    CircularBuffer<T>& fillRest(const T& value) noexcept;
    /**
     * Fill out of a inner box (sub-window) with gave value.
     * @param box the box to move.
     * @param width the full(capacity) buffer width (i.e. sizeX).
     * @param height the full(capacity) buffer height (i.e. sizeY).
     * @param value the value to fill.
     * @return filled buffer.
     * @note
     * - Size of buffer should be width * height!
     * - Parameters should valid and range!
     */
    CircularBuffer<T>& fillBoxOutside(
        const Box& box,
        int32_t width,
        int32_t height,
        const T& value) noexcept;
    CircularBuffer<T>& fillBox(
        const Box& box, int32_t width, const T& value) noexcept;
    /// Get buffer begin and end to watch all valid datas.
    Iterator begin() noexcept;
    Iterator end() noexcept;
    ConstIterator begin() const noexcept;
    ConstIterator end() const noexcept;
    /// Get buffer's rest region begin and end.
    Iterator restBegin() noexcept;
    Iterator restEnd() noexcept;
    /**
     * Access buffer from front to last,
     * @param idx data index, [0, size).
     * @return the data of index when success.
     * @throw std::logic_error when buffer empty when @a idx out of range.
     */
    const T& operator[](const int64_t idx) const;
    T& operator[](const int64_t idx);
    /**
     * Get ordered vector of the valid datas.
     * @return ordered vector buffer of all valid datas.
     * @note Result's size maybe < capacity.
     */
    std::vector<T> getOrdered() const noexcept;
    /// Check whether buffer equals @a datas
    bool operator==(const std::initializer_list<T>& datas) const noexcept;
private:
    /**
     * Compute front data index, NOTE: buffer NOT empty, otherwise index
     * invalid.
     */
    inline int64_t computeFrontIdx() const noexcept;
    std::vector<T> buffer;///< The buffer.
    int32_t capa{ 1 };///< The buffer capacity, > 0.
    int64_t nextIdx{ 0 };///< Next buffer data index.
    /**
     * Cache to fast related method, NOTE: sometimes invalid, need external
     * empty check!
     */
    int64_t frontIdx{ 0 };
    int32_t count{ 0 };///< Current valid data count.
};

Detailed implementations see: