Buff

Buff

A simple observer (publisher-subscriber) implementation with a customizable
allocator ("allocer") and customizable message types.
A full producer-consumer design may be needed later; for now only the observer
pattern, i.e. publisher-subscriber, is implemented.
It is intended for use within a single process.

#include <atomic>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <list>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "boost/shared_ptr.hpp"
#include "boost/utility/string_ref.hpp"
#include "boost/bind.hpp"
#include "gtest/gtest.h"

/// Shared, reference-counted byte array used as the payload buffer of a message.
using Data = boost::shared_ptr<uint8_t[]>;

/// Base class of every message: a shared payload buffer plus its size in bytes.
///
/// The accessors are const-qualified so they can be used through the
/// `shared_ptr<const Msg>` handles that subscribers receive. Constness is
/// shallow: getData() returns a handle to the same (mutable) bytes.
class BaseMsg {
public:
    virtual ~BaseMsg() noexcept {}
    /// @return true when a payload buffer is attached to this message.
    inline explicit operator bool() const noexcept
    {
        return this->data != nullptr;
    }
    /// Attach (share ownership of) the payload buffer.
    inline void setData(const Data& data) noexcept
    {
        this->data = data;
    }
    /// Record the payload size in bytes.
    inline void setSize(int32_t size) noexcept
    {
        this->size = size;
    }
    /// @return a shared handle to the payload buffer (null when none is attached).
    inline Data getData() const noexcept
    {
        return this->data;
    }
    /// @return the payload size in bytes as last set by setSize().
    inline int32_t getSize() const noexcept
    {
        return this->size;
    }
protected:
    Data data{ nullptr };
    int32_t size{ 0 };
    uint32_t seq{ 0 };///< Seq-number, TODO: use this field!
    double stamp{ 0.0 };///< TODO: maybe also use this field!
};
/// Shared pointer to an immutable message. Sharing is not strictly necessary for
/// BaseMsg itself, but derived message types may carry many custom fields.
using BaseMsgConstPtr = boost::shared_ptr<const BaseMsg>;

// Forward declarations:
// Producer and ProducerPtr (shared handle to a producer of Msg on some channel)
template<typename Msg, typename Allocer>
class Producer;
template<typename Msg, typename Allocer>
using ProducerPtr = boost::shared_ptr<Producer<Msg, Allocer> >;
// BaseConsumer and BaseConsumerPtr (type-erased subscriber handle)
class BaseConsumer;
using BaseConsumerPtr = boost::shared_ptr<BaseConsumer>;

// Typed subscriber; defined further below.
template<typename Msg>
class Consumer;

/// Allocator interface used to obtain payload buffers.
/// The default implementation never provides memory: alloc() yields a null
/// handle, so concrete allocators are expected to override it.
class BaseAllocer {
public:
    virtual ~BaseAllocer() noexcept = default;

    /// @return a buffer handle for the requested byte count; always null here.
    virtual Data alloc(int32_t /*bytes*/) noexcept { return Data(); }
};
/// Shared handle to an allocator.
using BaseAllocerPtr = boost::shared_ptr<BaseAllocer>;

/// The core handler (dispatcher): owns the channel -> subscriber table, hands
/// out producers, and forwards buffer allocation to the configured allocator.
template<typename Allocer>
class Dispatcher {
public:
    // Compile-time check: Allocer must derive from BaseAllocer.
    static_assert(
        std::is_base_of<BaseAllocer, typename std::decay<Allocer>::type>::value,
        "BaseAllocer should be base of Allocer");

    /// @param allocer allocator used by alloc(); stored as a shared handle.
    Dispatcher(const BaseAllocerPtr& allocer) noexcept : allocer(allocer) {}
    /// Create a producer of Msg bound to `channel` and to this dispatcher.
    template<typename Msg>
    ProducerPtr<Msg, Allocer> advertise(int32_t channel);
    /// Callback signature for subscribers of Msg.
    template<typename Msg>
    using Cb = std::function<void (const boost::shared_ptr<const Msg>&)>;
    /// Register a consumer of Msg on `channel`.
    /// @param cb      callback stored in the consumer (may be empty).
    /// @param queneSz queue size passed to the consumer.
    /// @return the newly created consumer.
    template<typename Msg>
    boost::shared_ptr<Consumer<Msg> > subscribe(
        int32_t channel, const Cb<Msg>& cb, int32_t queneSz) noexcept;

    /// Request a buffer of `bytes` bytes from the configured allocator.
    Data alloc(int32_t bytes) noexcept;
    /// Deliver `msg` to every consumer registered on `channel`, in the calling thread.
    template<typename Msg>
    void publish(int32_t channel, const boost::shared_ptr<const Msg>& msg) noexcept;

    // TODO: dispatch(), e.g. use one or more threads to dispatch buffers to all subscribers!

private:
    BaseAllocerPtr allocer{ nullptr };
    /// channel -> consumers subscribed to that channel (several per channel allowed).
    std::unordered_multimap<int32_t, BaseConsumerPtr> subscibers;
};

// Usage: 1. produce() a message buffer and fill it, 2. publish() it.
/// Publisher of Msg on one fixed channel of a Dispatcher.
template<typename Msg, typename Allocer>
class Producer {
public:
    // Compile-time check: Msg must derive from BaseMsg.
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");
    /// @param channel channel this producer publishes on.
    /// @param handle  dispatcher used for allocation and publishing; held by
    ///                reference, so it must outlive this producer.
    Producer(int32_t channel, Dispatcher<Allocer>& handle) noexcept;
    /// Produce a buffer to fill message.
    /// @return a new message owning a `bytes`-sized buffer, or null when no buffer is available.
    boost::shared_ptr<Msg> produce(int32_t bytes);
    /// Publish `msg` on this producer's channel.
    inline void publish(const boost::shared_ptr<const Msg>& msg) noexcept;
private:
    const int32_t channel;
    Dispatcher<Allocer>& handle;
};

/// Consumer, i.e. subscriber: type-erased interface the dispatcher stores and
/// invokes for every message published on the consumer's channel.
class BaseConsumer {
public:
    /// Virtual so that derived consumers are destroyed correctly when owned
    /// or deleted through a BaseConsumer pointer.
    virtual ~BaseConsumer() noexcept {}
    /// Receive one published message; the default implementation ignores it.
    virtual void callback(const BaseMsgConstPtr&) {}
};

/// Typed subscriber for messages of type Msg.
///
/// When connected and a callback is installed, callback() delivers messages
/// through that callback; otherwise messages are kept in a bounded queue (at
/// most `qsz` entries, oldest dropped first) for later retrieval via deque().
template<typename Msg>
class Consumer : public BaseConsumer {
public:
    // Compile-time check: Msg must derive from BaseMsg.
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");

    Consumer() noexcept {}
    /// @param qsz maximum number of messages retained in the queue.
    Consumer(int32_t qsz) noexcept : qsz(qsz) {}

    // If callback setup then auto callback!
    using Cb = std::function<void (const boost::shared_ptr<const Msg>&)>;
    /// Install (or clear, when `cb` is empty) the delivery callback.
    inline void setCb(const Cb& cb) noexcept
    {
        this->cb = cb;
    }
    /// @return the currently installed callback (possibly empty).
    inline Cb& getCb() noexcept
    {
        return this->cb;
    }
    /// While disconnected, incoming messages are queued instead of delivered.
    inline void setConnected(bool connected) noexcept
    {
        this->connected = connected;
    }
    inline bool isConnected() const noexcept
    {
        return this->connected;
    }

    /// Pop the oldest queued message of type Msg and copy it into `m`.
    ///
    /// Queued entries that are null or of another message type are discarded
    /// and skipped, so `false` is returned only when no further Msg is queued;
    /// callers can therefore drain the queue with `while (deque(m))`.
    /// @param m receives a copy of the message on success.
    /// @return true when a message was copied into `m`.
    bool deque(Msg& m) noexcept
    {
        boost::shared_ptr<const Msg> msg;
        {
            std::lock_guard<std::mutex> lock(qMutex);
            while (!msg && !q.empty()) {
                msg = boost::dynamic_pointer_cast<const Msg, const BaseMsg>(q.front());
                q.pop_front();
            }
        }
        if (!msg) {
            return false;
        }
        // Copy outside the lock: the popped message is immutable and owned here.
        m = *msg;
        return true;
    }

    virtual void callback(const BaseMsgConstPtr& msg) override;
private:
    bool connected{ true };
    Cb cb{ nullptr };
    int32_t qsz{ 0 };
    mutable std::mutex qMutex;///< Access q
    std::list<BaseMsgConstPtr> q;
};

// Implementations
/// Create a producer of Msg that publishes on `channel` through this dispatcher.
/// The producer keeps a reference to this dispatcher.
template<typename Allocer>
template<typename Msg>
ProducerPtr<Msg, Allocer> Dispatcher<Allocer>::advertise(int32_t channel)
{
    // Only message types derived from BaseMsg can be advertised.
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");

    return ProducerPtr<Msg, Allocer>(new Producer<Msg, Allocer>(channel, *this));
}
/// Create a consumer of Msg with queue size `queneSz`, install `cb` on it,
/// register it under `channel`, and return it to the caller.
template<typename Allocer>
template<typename Msg>
boost::shared_ptr<Consumer<Msg> > Dispatcher<Allocer>::subscribe(
    int32_t channel, const Cb<Msg>& cb, int32_t queneSz) noexcept
{
    boost::shared_ptr<Consumer<Msg> > consumer(new Consumer<Msg>(queneSz));
    consumer->setCb(cb);
    // Stored type-erased: the table holds BaseConsumerPtr values.
    subscibers.emplace(channel, BaseConsumerPtr(consumer));
    return consumer;
}
/// Request a buffer of `bytes` bytes from the configured allocator.
/// @return the allocator's result, or a null handle when no allocator was
///         configured (the constructor accepts a null allocator pointer).
template<typename Allocer>
Data Dispatcher<Allocer>::alloc(int32_t bytes) noexcept
{
    if (!allocer) {
        return nullptr;
    }
    return allocer->alloc(bytes);
}
/// Deliver `msg` to every consumer registered on `channel`.
///
/// Delivery is synchronous: each consumer's callback() runs in the calling
/// thread. A null `msg` is ignored rather than fanned out.
template<typename Allocer>
template<typename Msg>
void Dispatcher<Allocer>::publish(
    int32_t channel, const boost::shared_ptr<const Msg>& msg) noexcept
{
    if (!msg) {
        return;
    }
    // Convert to the type-erased handle once instead of once per subscriber.
    const BaseMsgConstPtr baseMsg = msg;
    const auto range = subscibers.equal_range(channel);
    for (auto it = range.first; it != range.second; ++it) {
        // Hold our own reference for the duration of the call.
        const BaseConsumerPtr consumer = it->second;
        if (consumer) {
            // TODO: callback in thread!!
            consumer->callback(baseMsg);
        }
    }
}

/// Bind the producer to its channel and to the dispatcher it publishes through.
template<typename Msg, typename Allocer>
Producer<Msg, Allocer>::Producer(int32_t channel, Dispatcher<Allocer>& handle) noexcept
    : channel(channel)
    , handle(handle)
{
}
/// Create a message backed by a freshly allocated buffer of `bytes` bytes.
/// @param bytes requested payload size; negative values are rejected.
/// @return the new message with data and size set, or null when `bytes` is
///         negative or the dispatcher could not provide a buffer.
template<typename Msg, typename Allocer>
boost::shared_ptr<Msg> Producer<Msg, Allocer>::produce(int32_t bytes)
{
    if (bytes < 0) {
        // A negative size can never be satisfied and must not end up in the message.
        return nullptr;
    }
    const Data payload = handle.alloc(bytes);
    if (!payload) {
        return nullptr;
    }
    boost::shared_ptr<Msg> msg(new Msg());
    msg->setData(payload);
    msg->setSize(bytes);
    return msg;
}
/// Forward `msg` to the dispatcher on this producer's channel.
template<typename Msg, typename Allocer>
inline void Producer<Msg, Allocer>::publish(const boost::shared_ptr<const Msg>& msg) noexcept
{
    this->handle.publish(this->channel, msg);
}

/// Receive one message published on this consumer's channel.
///
/// Messages that are null or not of type Msg are ignored up front, so they
/// neither reach the callback nor occupy space in the bounded queue.
/// - Disconnected, or no callback installed: the message is queued; when the
///   queue grows beyond `qsz`, the oldest entry is dropped.
/// - Otherwise: previously queued messages are delivered first (oldest first),
///   then the new one. The callback is always invoked without holding the
///   queue mutex.
template<typename Msg>
void Consumer<Msg>::callback(const BaseMsgConstPtr& msg)
{
    const boost::shared_ptr<const Msg> typedMsg =
        boost::dynamic_pointer_cast<const Msg, const BaseMsg>(msg);
    if (!typedMsg) {
        // Channels may carry several message types; this one is not for us.
        return;
    }

    if (!connected || !cb) {
        // Enque
        std::lock_guard<std::mutex> lock(qMutex);
        q.push_back(msg);
        if (int32_t(q.size()) > qsz) {
            std::cout << "q full, deque front\n";
            q.pop_front();
        }
        return;
    }

    // Eat all, but queue first!
    for (;;) {
        BaseMsgConstPtr queued;
        {
            std::lock_guard<std::mutex> lock(qMutex);
            if (q.empty()) {
                break;
            }
            queued = q.front();
            q.pop_front();
        }
        const boost::shared_ptr<const Msg> queuedMsg =
            boost::dynamic_pointer_cast<const Msg, const BaseMsg>(queued);
        if (queuedMsg) {
            cb(queuedMsg);
        }
    }

    // Eat the new one
    cb(typedMsg);
}

// Examples
// Example allocer
/// Pool allocator that pre-allocates a fixed set of buffers, grouped in tiers
/// of increasing size, and hands them out as shared `Data` handles. A buffer
/// goes back to the pool when the last handle referring to it is released.
///
/// Thread-safety: alloc() and dealloc() serialize on an internal mutex.
/// Lifetime: every handle stores a raw pointer to the allocator that created
/// it, so the allocator must outlive all outstanding handles and must not be
/// moved while any handle is outstanding.
class SimpleAllocer : public BaseAllocer {
public:
    /// Fake deletor, we not need really delete, just let this area free!
    class Deletor {
    public:
        Deletor(uint64_t memId = UINT32_MAX, SimpleAllocer* allocer = nullptr) noexcept :
            memId(static_cast<uint32_t>(memId)), allocer(allocer) {}
        void operator()(uint8_t*& p) noexcept
        {
            if (p) {
                // NO delete: the memory stays owned by the pool.
                // Just to nullptr!
                p = nullptr;
                // And move to idleBuffers! (a default-constructed Deletor has no pool)
                if (allocer) {
                    allocer->dealloc(memId);
                }
            }
        }
    private:
        uint32_t memId{ UINT32_MAX };
        SimpleAllocer* allocer{ nullptr };
    };

    /// Build the pool.
    ///
    /// Buffers are created tier by tier: each tier holds up to `npieces`
    /// buffers; the first tier's buffers are `pieceBytes` bytes and every
    /// following tier is `increBytes` bytes larger. Tiers are added until
    /// `totalPieces` buffers exist, so the last tier is partial when
    /// `totalPieces` is not a multiple of `npieces`. A non-positive
    /// `totalPieces` yields an empty pool.
    ///
    /// @throw std::invalid_argument when `npieces` is not positive.
    /// @throw std::bad_alloc when no enough memory.
    // TODO: maybe, examle: small need 20, big only need 10...
    SimpleAllocer(int32_t pieceBytes, int32_t increBytes, int32_t npieces, int32_t totalPieces) :
        pieceBytes(pieceBytes), increBytes(increBytes)
    {
        if (npieces <= 0) {
            throw std::invalid_argument("SimpleAllocer: npieces must be positive");
        }
        int32_t remaining = totalPieces;
        int32_t tierBytes = pieceBytes;
        uint32_t memId = 0;
        while (remaining > 0) {
            const int32_t inTier = (remaining < npieces) ? remaining : npieces;
            for (int32_t j = 0; j < inTier; ++j, ++memId) {
                buffers[memId] = std::vector<uint8_t>(tierBytes);
                // Register the buffer id itself (not the tier index) as idle.
                idleBuffers.insert(memId);
                bufferByteIds.insert(std::pair<int32_t, uint32_t>{ tierBytes, memId });
            }
            if (tierBytes > this->maxBytes) {
                this->maxBytes = tierBytes;
            }
            remaining -= inTier;
            tierBytes += increBytes;
        }
    }
    /// Move-construct by taking over the whole pool of `rhs`.
    SimpleAllocer(SimpleAllocer&& rhs) noexcept
    {
        *this = std::move(rhs);
    }
    /// Take over the pool and configuration of `rhs`, leaving `rhs` empty.
    /// Must not be used while buffers of either allocator are still handed out.
    SimpleAllocer& operator=(SimpleAllocer&& rhs) noexcept
    {
        if (this != &rhs) {
            // Acquire both mutexes together so that concurrent `a = move(b)`
            // and `b = move(a)` cannot deadlock on lock order.
            std::lock(buffersMutex, rhs.buffersMutex);
            std::lock_guard<std::mutex> lock(buffersMutex, std::adopt_lock);
            std::lock_guard<std::mutex> rlock(rhs.buffersMutex, std::adopt_lock);
            this->pieceBytes = rhs.pieceBytes;
            this->increBytes = rhs.increBytes;
            this->maxBytes = rhs.maxBytes;
            this->buffers = std::move(rhs.buffers);
            this->bufferByteIds = std::move(rhs.bufferByteIds);
            this->idleBuffers = std::move(rhs.idleBuffers);
            this->inUseBuffers = std::move(rhs.inUseBuffers);
            // Leave the source in a well-defined empty state.
            rhs.maxBytes = 0;
            rhs.buffers.clear();
            rhs.bufferByteIds.clear();
            rhs.idleBuffers.clear();
            rhs.inUseBuffers.clear();
        }
        return *this;
    }
    /// Hand out the smallest idle buffer that can hold `bytes` bytes.
    /// @return a handle whose release returns the buffer to the pool, or null
    ///         when `bytes` is negative, exceeds the largest buffer size, or
    ///         no suitable idle buffer exists.
    virtual Data alloc(int32_t bytes) noexcept override
    {
        uint8_t* raw = nullptr;
        uint32_t memId = UINT32_MAX;
        {
            std::lock_guard<std::mutex> lock(buffersMutex);
            if (bytes < 0 || bytes > maxBytes || this->idleBuffers.empty()) {
                return nullptr;
            }
            // TODO: should use bufferByteIds to find a good and NOT too large buffer!
            // Linear best-fit scan over the idle set.
            auto best = idleBuffers.end();
            int32_t bestBytes = 0;
            for (auto it = idleBuffers.begin(), end = idleBuffers.end(); it != end; ++it) {
                const auto found = buffers.find(*it);
                if (found == buffers.end()) {
                    continue;
                }
                const int32_t capacity = static_cast<int32_t>(found->second.size());
                if (capacity < bytes) {
                    continue;
                }
                if (best == idleBuffers.end() || capacity < bestBytes) {
                    best = it;
                    bestBytes = capacity;
                }
            }
            if (best == idleBuffers.end()) {
                return nullptr;
            }
            memId = *best;
            raw = buffers.find(memId)->second.data();
            idleBuffers.erase(best);
            inUseBuffers.insert(memId);
        }
        // Build the handle outside the lock: if its construction fails, the
        // deleter is invoked immediately and dealloc() needs the mutex.
        try {
            return Data{ raw, Deletor{ memId, this } };
        } catch (...) {
            // The deleter has already returned the buffer to the pool.
            return nullptr;
        }
    }
    /// Return buffer `memId` to the idle set; ids that are not in use are ignored.
    void dealloc(uint32_t memId)
    {
        std::lock_guard<std::mutex> lock(buffersMutex);
        auto it = inUseBuffers.find(memId);
        if (it != inUseBuffers.end()) {
            inUseBuffers.erase(it);
            idleBuffers.insert(memId);
        }
    }
private:
    // TODO maybe use a good mono buffer, and belows should all in monoBuffer!!
    //std::vector<uint8_t> monoBuffer;
    // TEST: use a simple buffer: { memId, buffer(should be part of monoBuffer) }
    mutable std::mutex buffersMutex;///< Access buffers, idleBuffers and inUseBuffers ...
    int32_t pieceBytes{ 0 };///< Buffer size of the first tier.
    int32_t increBytes{ 0 };///< Size increment between consecutive tiers.
    int32_t maxBytes{ 0 };///< Size of the largest buffer in the pool.
    std::unordered_map<uint32_t, std::vector<uint8_t> > buffers;
    /// TODO use bufferByteIds or maybe another fast query table!
    std::unordered_multimap<int32_t, uint32_t> bufferByteIds;
    std::unordered_set<uint32_t> idleBuffers;
    std::unordered_set<uint32_t> inUseBuffers;
};

// Example msgs
class I32Msg : public BaseMsg {
public:
    operator int32_t() const noexcept
    {
       return *(reinterpret_cast<const int32_t*>(this->data.get()));
    }
};
/// Message whose payload is a character string followed by one terminator byte.
class StringMsg : public BaseMsg {
public:
    /// @return a non-owning view of the payload without its last byte; an
    ///         empty view when no payload is attached or the size is not
    ///         positive. The view is valid only while the payload is alive.
    operator boost::string_ref() const noexcept
    {
        if (!this->data || this->size <= 0) {
            // Avoids a length of uint64_t(-1) for an empty payload.
            return boost::string_ref{};
        }
        return boost::string_ref{ reinterpret_cast<const char*>(this->data.get()), uint64_t(this->size - 1) };
    }
};

/// Example subscriber object whose member function is used as a callback.
class My {
public:
    /// Print the received string together with the id of the calling thread.
    void cb(const boost::shared_ptr<const StringMsg>& msg) noexcept
    {
        const boost::string_ref text(*msg);
        const std::thread::id tid = std::this_thread::get_id();
        std::cout << "my StringMsg cb: " << text << ", from thread " << tid << '\n';
    }
};

/// End-to-end demo: several producers publish I32Msg/StringMsg on channels 1
/// and 2 from four threads while subscribers print what they receive. The
/// shared counter K bounds the run (every loop iteration of every thread
/// increments it; threads stop once it reaches 100).
TEST(Dispatcher, 1)
{
    std::atomic<int32_t> K{ 0 };
    //BaseAllocerPtr simpleAllocer(new SimpleAllocer(64, 1024, 4, 20));
    BaseAllocerPtr simpleAllocer(new SimpleAllocer(16, 20, 4, 20));
    Dispatcher<SimpleAllocer> dispatcher(simpleAllocer);
    auto i32ProducerC1 = dispatcher.advertise<I32Msg>(1);
    auto i32ProducerC1B = dispatcher.advertise<I32Msg>(1);
    auto stringProducerC1 = dispatcher.advertise<StringMsg>(1);
    auto stringProducerC1B = dispatcher.advertise<StringMsg>(1);
    auto stringProducerC2 = dispatcher.advertise<StringMsg>(2);

    // The callback mutexes keep each callback's output line in one piece.
    std::mutex i32ConsumerC1CbMutex;
    auto i32ConsumerC1Cb = [&i32ConsumerC1CbMutex](const boost::shared_ptr<const I32Msg>& msg) {
        std::lock_guard<std::mutex> lock(i32ConsumerC1CbMutex);
        std::cout << "i32ConsumerC1Cb: " << int32_t(*msg) << ", from thread "
            << std::this_thread::get_id() << '\n';
    };
    auto i32ConsumerC1 = dispatcher.subscribe<I32Msg>(1, i32ConsumerC1Cb, 2);

    std::mutex stringConsumerC1CbMutex;
    auto stringConsumerAllCb = [&stringConsumerC1CbMutex](const boost::shared_ptr<const StringMsg>& msg) {
        std::lock_guard<std::mutex> lock(stringConsumerC1CbMutex);
        std::cout << "stringConsumerAllCb: " << boost::string_ref(*msg) << ", from thread "
            << std::this_thread::get_id() << '\n';
    };
    auto stringConsumerC1 = dispatcher.subscribe<StringMsg>(1, stringConsumerAllCb, 2);
    auto stringConsumerC1B = dispatcher.subscribe<StringMsg>(1, stringConsumerAllCb, 2);
    auto stringConsumerC2 = dispatcher.subscribe<StringMsg>(2, stringConsumerAllCb, 2);
    My my;
    // Member-function callback via a lambda; `my` outlives all publishing threads.
    auto stringConsumerC2B = dispatcher.subscribe<StringMsg>(
        2, [&my](const boost::shared_ptr<const StringMsg>& msg) { my.cb(msg); }, 2);
    // No callback: messages are queued and pulled with deque() in thread t4.
    auto stringConsumerC2C = dispatcher.subscribe<StringMsg>(2, nullptr, 2);

    // Produce
    std::thread t1([&K, &i32ProducerC1] {
        std::cout << "i32ProducerC1 thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            // Get a memory!
            auto p = i32ProducerC1->produce(4);
            if (p) {
                // Fill data!
                *(reinterpret_cast<int32_t*>(p->getData().get())) = ::rand() % 1000;
                // TODO: Notify consumer, maybe here need callback in thread!!
                i32ProducerC1->publish(p);
            } else {
                std::cout << "WARN: i32ProducerC1 NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    });
    std::thread t2([&K, &i32ProducerC1B] {
        std::cout << "i32ProducerC1B thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            auto p = i32ProducerC1B->produce(4);
            if (p) {
                *(reinterpret_cast<int32_t*>(p->getData().get())) = ::rand() % 1000;
                i32ProducerC1B->publish(p);
            } else {
                std::cout << "WARN: i32ProducerC1B NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    });
    std::thread t3([&K, &stringProducerC1, &stringProducerC1B] {
        std::cout << "stringProducerC1 C1B thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            std::string s = "sss=" + std::to_string(::rand() % 200 + 3000);
            // +1 so the terminating NUL is part of the payload.
            auto p = stringProducerC1->produce(s.length() + 1);
            if (p) {
                std::cout << "OK  : stringProducerC1 C1B [1] will pub " << s << "\n";
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC1->publish(p);
            } else {
                std::cout << "WARN: stringProducerC1 C1B [1] NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            s = "sss=" + std::to_string(::rand() % 200 + 3000);
            p = stringProducerC1B->produce(s.length() + 1);
            if (p) {
                std::cout << "OK  : stringProducerC1 C1B [2] will pub " << s << "\n";
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC1B->publish(p);
            } else {
                std::cout << "WARN: stringProducerC1 C1B [2] NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    });
    std::thread t4([&K, &stringProducerC2, &stringConsumerC2C] {
        std::cout << "stringProducerC2 thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            std::string s = "[[SSS K=" + std::to_string(K) + "]]";
            auto p = stringProducerC2->produce(s.length() + 1);
            if (p) {
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC2->publish(p);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            if (0 == (K % 6)) {
                // Periodically drain the callback-less consumer's queue.
                StringMsg smsg;
                while (stringConsumerC2C->deque(smsg)) {
                    std::cout << "deque smsg OK! " << boost::string_ref(smsg) << "\n";
                }
            }
            ++K;
        }
    });
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}
int main(int argc, char** argv)
{
    testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}