// A simple observer implementation with a customizable allocator ("Allocer") and
// customizable messages.
// A full producer-consumer design may be needed later; for now this only implements the
// observer pattern, i.e. publisher-subscriber.
// It is intended for use within a single process.
#include <atomic>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <list>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "boost/shared_ptr.hpp"
#include "boost/utility/string_ref.hpp"
#include "boost/bind.hpp"
#include "gtest/gtest.h"
/// Shared handle to a message payload: a reference-counted array of raw bytes.
using Data = boost::shared_ptr<uint8_t[]>;
/// Base class of every message that travels through the dispatcher.
///
/// A message is a shared payload buffer (`data`) plus the number of bytes the
/// producer asked for (`size`). Copying a message copies the handle, not the bytes.
class BaseMsg {
public:
    virtual ~BaseMsg() noexcept {}

    /// @return true when a payload buffer is attached.
    inline explicit operator bool() const noexcept
    {
        return this->data != nullptr;
    }

    /// Attach the payload buffer (the handle is shared, the bytes are not copied).
    inline void setData(const Data& data) noexcept
    {
        this->data = data;
    }

    /// Record the payload size in bytes.
    inline void setSize(int32_t size) noexcept
    {
        this->size = size;
    }

    /// @return the shared payload handle.
    /// Const-qualified so it can be called through `shared_ptr<const Msg>`, which is
    /// what subscribers receive. The returned handle still gives write access to the
    /// bytes, i.e. constness is shallow.
    inline Data getData() const noexcept
    {
        return this->data;
    }

    /// @return the payload size in bytes as recorded by setSize().
    inline int32_t getSize() const noexcept
    {
        return this->size;
    }

protected:
    Data data{ nullptr };
    int32_t size{ 0 };
    uint32_t seq{ 0 };///< Seq-number, TODO: use this field!
    double stamp{ 0.0 };///< TODO: maybe also use this field!
};
/// Read-only shared handle to a message as seen by subscribers.
/// A shared_ptr is not strictly necessary here, except that derived messages may
/// carry many custom fields.
using BaseMsgConstPtr = boost::shared_ptr<const BaseMsg>;
// Forward declarations:
// Producer and ProducerPtr
template<typename Msg, typename Allocer>
class Producer;
template<typename Msg, typename Allocer>
using ProducerPtr = boost::shared_ptr<Producer<Msg, Allocer> >;
// BaseConsumer and BaseConsumerPtr
class BaseConsumer;
using BaseConsumerPtr = boost::shared_ptr<BaseConsumer>;
// Typed consumer, defined further down.
template<typename Msg>
class Consumer;
/// Interface of a payload allocator.
///
/// The default implementation never hands out memory: alloc() always yields an
/// empty handle. Concrete allocators override alloc().
class BaseAllocer {
public:
    virtual ~BaseAllocer() noexcept = default;

    /// Request a buffer of the given size in bytes.
    /// @return an empty handle; the base class owns no memory.
    virtual Data alloc(int32_t /*bytes*/) noexcept
    {
        return Data{};
    }
};
/// Shared handle to an allocator, held through its base interface.
using BaseAllocerPtr = boost::shared_ptr<BaseAllocer>;
/// The core handler: hands out producers, registers consumers per channel and
/// forwards every published message to the consumers of that channel.
///
/// NOTE(review): `subscibers` is not guarded by a mutex in this class; whether
/// subscribe() may run concurrently with publish() should be confirmed by callers.
template<typename Allocer>
class Dispatcher {
    // Allocer check
    static_assert(
        std::is_base_of<BaseAllocer, typename std::decay<Allocer>::type>::value,
        "BaseAllocer should be base of Allocer");

public:
    /// Callback signature of a consumer of `Msg`.
    template<typename Msg>
    using Cb = std::function<void (const boost::shared_ptr<const Msg>&)>;

    /// @param allocer allocator used by alloc(); stored as a shared handle.
    Dispatcher(const BaseAllocerPtr& allocer) noexcept : allocer{ allocer } {}

    /// Create a producer of `Msg` bound to `channel`.
    template<typename Msg>
    ProducerPtr<Msg, Allocer> advertise(int32_t channel);

    /// Register a consumer of `Msg` on `channel`.
    /// @param cb      callback to invoke per message (may be empty).
    /// @param queneSz capacity of the consumer's pending-message queue.
    template<typename Msg>
    boost::shared_ptr<Consumer<Msg> > subscribe(
        int32_t channel, const Cb<Msg>& cb, int32_t queneSz) noexcept;

    /// Forward `msg` to every consumer registered on `channel`.
    template<typename Msg>
    void publish(int32_t channel, const boost::shared_ptr<const Msg>& msg) noexcept;

    /// Request `bytes` bytes of payload memory from the allocator.
    Data alloc(int32_t bytes) noexcept;

    // TODO: dispatch(), e.g. in one or multiple threads to dispatch buffers to all subscribers!

private:
    BaseAllocerPtr allocer{ nullptr };
    std::unordered_multimap<int32_t, BaseConsumerPtr> subscibers;
};
/// Publisher side of one channel. Typical use: 1. produce() a message backed by an
/// allocated buffer, fill it, 2. publish() it.
///
/// The producer keeps a reference to the dispatcher that created it, so the
/// dispatcher has to outlive the producer.
template<typename Msg, typename Allocer>
class Producer {
    // Msg check
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");

public:
    /// @param channel channel every message of this producer is published on.
    /// @param handle  dispatcher used for allocation and delivery.
    Producer(int32_t channel, Dispatcher<Allocer>& handle) noexcept;

    /// Produce a message whose buffer can hold `bytes` bytes, ready to be filled.
    boost::shared_ptr<Msg> produce(int32_t bytes);

    /// Hand `msg` to the dispatcher for delivery on this producer's channel.
    inline void publish(const boost::shared_ptr<const Msg>& msg) noexcept;

private:
    const int32_t channel;
    Dispatcher<Allocer>& handle;
};
/// Consumer, i.e. subscriber: type-erased interface the dispatcher stores and calls.
class BaseConsumer {
public:
    /// Virtual so that a derived consumer is destroyed correctly even when it is
    /// deleted through a pointer to this base.
    virtual ~BaseConsumer() noexcept {}

    /// Receive one published message. The base implementation ignores it.
    virtual void callback(const BaseMsgConstPtr&) {}
};
/// Typed subscriber for messages of type `Msg`.
///
/// Messages either go straight to the user callback or, when the consumer is
/// disconnected or has no callback, into a bounded queue that can be polled with
/// deque(). The queue is protected by `qMutex`; `connected` and `cb` are plain
/// members and are not protected by it.
template<typename Msg>
class Consumer : public BaseConsumer {
public:
    // Msg check
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");

    /// Queue capacity defaults to 0.
    Consumer() noexcept {}

    /// @param qsz maximum number of messages kept in the pending queue.
    Consumer(int32_t qsz) noexcept : qsz(qsz) {}

    // If callback setup then auto callback!
    using Cb = std::function<void (const boost::shared_ptr<const Msg>&)>;

    inline void setCb(const Cb& cb) noexcept
    {
        this->cb = cb;
    }

    inline Cb& getCb() noexcept
    {
        return this->cb;
    }

    inline void setConnected(bool connected) noexcept
    {
        this->connected = connected;
    }

    inline bool isConnected() const noexcept
    {
        return this->connected;
    }

    /// Pop the oldest queued message of type `Msg` into `m`.
    ///
    /// Queued entries of any other type are discarded and skipped, so a foreign
    /// message at the front no longer makes the call report "empty" while valid
    /// messages are still waiting behind it.
    /// @return true when `m` was assigned, false when no `Msg` was queued.
    bool deque(Msg& m) noexcept
    {
        boost::shared_ptr<const Msg> msg;
        {
            std::lock_guard<std::mutex> lock(qMutex);
            while (!msg && !q.empty()) {
                msg = boost::dynamic_pointer_cast<const Msg, const BaseMsg>(q.front());
                q.pop_front();
            }
        }
        if (!msg) {
            return false;
        }
        // Copy outside the critical section; the popped handle keeps the message alive.
        m = *msg;
        return true;
    }

    virtual void callback(const BaseMsgConstPtr& msg) override;

private:
    bool connected{ true };
    Cb cb{ nullptr };
    int32_t qsz{ 0 };
    mutable std::mutex qMutex;///< Access q
    std::list<BaseMsgConstPtr> q;
};
// Implementations
/// Create a producer of `Msg` that publishes on `channel` through this dispatcher.
/// The producer stores a reference to `*this`.
template<typename Allocer>
template<typename Msg>
ProducerPtr<Msg, Allocer> Dispatcher<Allocer>::advertise(int32_t channel)
{
    // Msg check
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");
    using ProducerType = Producer<Msg, Allocer>;
    return ProducerPtr<Msg, Allocer>(new ProducerType(channel, *this));
}
/// Create a consumer of `Msg` with queue capacity `queneSz` and callback `cb`,
/// register it on `channel` and return it to the caller.
template<typename Allocer>
template<typename Msg>
boost::shared_ptr<Consumer<Msg> > Dispatcher<Allocer>::subscribe(
    int32_t channel, const Cb<Msg>& cb, int32_t queneSz) noexcept
{
    boost::shared_ptr<Consumer<Msg> > consumer(new Consumer<Msg>(queneSz));
    consumer->setCb(cb);
    // Stored through the base interface so consumers of any message type share one table.
    subscibers.emplace(channel, BaseConsumerPtr(consumer));
    return consumer;
}
/// Request `bytes` bytes of payload memory from the configured allocator.
/// @return the allocator's result, or an empty handle when the dispatcher was
///         constructed with a null allocator.
template<typename Allocer>
Data Dispatcher<Allocer>::alloc(int32_t bytes) noexcept
{
    // The constructor accepts any BaseAllocerPtr, including a null one.
    if (!allocer) {
        return nullptr;
    }
    return allocer->alloc(bytes);
}
/// Deliver `msg` to every consumer registered on `channel`, synchronously on the
/// calling thread.
template<typename Allocer>
template<typename Msg>
void Dispatcher<Allocer>::publish(
    int32_t channel, const boost::shared_ptr<const Msg>& msg) noexcept
{
    // Consumers are type-erased, so they receive the message through its base type.
    const BaseMsgConstPtr baseMsg = msg;
    const auto range = subscibers.equal_range(channel);
    auto it = range.first;
    while (it != range.second) {
        const BaseConsumerPtr consumer = it->second;
        // TODO: callback in thread!!
        consumer->callback(baseMsg);
        ++it;
    }
}
/// Bind the producer to a channel and to the dispatcher it publishes through.
/// Only a reference to `handle` is kept.
template<typename Msg, typename Allocer>
Producer<Msg, Allocer>::Producer(int32_t channel, Dispatcher<Allocer>& handle) noexcept
    : channel(channel)
    , handle(handle)
{
}
/// Allocate a `bytes`-byte buffer through the dispatcher and wrap it in a new `Msg`.
/// @return the message with data and size set, or a null pointer when the
///         allocation failed.
template<typename Msg, typename Allocer>
boost::shared_ptr<Msg> Producer<Msg, Allocer>::produce(int32_t bytes)
{
    const auto buffer = handle.alloc(bytes);
    if (buffer) {
        boost::shared_ptr<Msg> msg(new Msg());
        msg->setData(buffer);
        msg->setSize(bytes);
        return msg;
    }
    return nullptr;
}
/// Publish `msg` on this producer's channel via the dispatcher.
template<typename Msg, typename Allocer>
inline void Producer<Msg, Allocer>::publish(const boost::shared_ptr<const Msg>& msg) noexcept
{
    this->handle.template publish<Msg>(this->channel, msg);
}
/// Handle one message published on this consumer's channel.
///
/// A channel may carry several message types, so `msg` is first checked against
/// `Msg`.
///  - Disconnected or no callback: a matching message is queued (the oldest entry
///    is dropped when the queue exceeds `qsz`); a message of another type is
///    ignored so it cannot occupy a queue slot or evict a valid entry.
///  - Otherwise: everything still queued is delivered first, oldest first, then
///    the new message is delivered if it is a `Msg`.
///
/// The user callback is invoked without holding `qMutex`.
template<typename Msg>
void Consumer<Msg>::callback(const BaseMsgConstPtr& msg)
{
    const boost::shared_ptr<const Msg> typed =
        boost::dynamic_pointer_cast<const Msg, const BaseMsg>(msg);

    if (!connected || !cb) {
        if (!typed) {
            return;
        }
        // Enque
        std::lock_guard<std::mutex> lock(qMutex);
        q.push_back(msg);
        if (static_cast<int32_t>(q.size()) > qsz) {
            std::cout << "q full, deque front\n";
            q.pop_front();
        }
        return;
    }

    // Eat all, but queue first!
    for (;;) {
        BaseMsgConstPtr pending;
        {
            std::lock_guard<std::mutex> lock(qMutex);
            if (q.empty()) {
                break;
            }
            pending = q.front();
            q.pop_front();
        }
        const boost::shared_ptr<const Msg> queued =
            boost::dynamic_pointer_cast<const Msg, const BaseMsg>(pending);
        if (queued) {
            cb(queued);
        }
    }

    // Eat the new one
    if (typed) {
        cb(typed);
    }
}
// Examples
// Example allocer
/// Example allocer backed by a fixed pool of pre-allocated buffers.
///
/// The pool is built in tiers of `npieces` buffers each. The first tier holds
/// buffers of `pieceBytes` bytes and every following tier is `increBytes` bytes
/// larger. alloc() hands out an idle buffer that is large enough; when the last
/// handle to it is released, the buffer goes back to the idle set (nothing is freed).
///
/// Lifetime: every handle returned by alloc() stores a raw pointer to this
/// allocator, so the allocator must outlive all handles, and it must not be moved
/// from while handles are outstanding.
class SimpleAllocer : public BaseAllocer {
public:
    /// Fake deleter: the memory is not deleted, the buffer is just marked idle again.
    class Deletor {
    public:
        /// @param memId   id of the pooled buffer; only the low 32 bits are used,
        ///                matching the id type of the pool.
        /// @param allocer pool the buffer belongs to.
        Deletor(uint64_t memId = UINT32_MAX, SimpleAllocer* allocer = nullptr) noexcept :
            memId(static_cast<uint32_t>(memId)), allocer(allocer) {}

        void operator()(uint8_t* p) noexcept
        {
            // NO delete: the bytes stay owned by the pool, only move back to idleBuffers.
            // A default-constructed Deletor has no pool and does nothing.
            if (p && allocer) {
                allocer->dealloc(memId);
            }
        }

    private:
        uint32_t memId{ UINT32_MAX };
        SimpleAllocer* allocer{ nullptr };
    };

    /// Build the pool.
    /// @param pieceBytes  buffer size of the first tier, in bytes.
    /// @param increBytes  size increment between consecutive tiers, in bytes.
    /// @param npieces     number of buffers per tier.
    /// @param totalPieces requested number of buffers; rounded up to whole tiers.
    /// If `pieceBytes`, `npieces` or `totalPieces` is not positive, or `increBytes`
    /// is negative, the pool stays empty and alloc() always fails.
    /// @throw std::bad_alloc when no enough memory.
    // TODO: maybe, example: small need 20, big only need 10...
    SimpleAllocer(int32_t pieceBytes, int32_t increBytes, int32_t npieces, int32_t totalPieces) :
        pieceBytes(pieceBytes), increBytes(increBytes), maxBytes(0)
    {
        if (pieceBytes <= 0 || increBytes < 0 || npieces <= 0 || totalPieces <= 0) {
            return;
        }
        // Integer ceiling of totalPieces / npieces.
        const int32_t tiers = totalPieces / npieces + ((totalPieces % npieces) != 0 ? 1 : 0);
        this->maxBytes = pieceBytes + (increBytes * (tiers - 1));
        int32_t tierBytes = pieceBytes;
        for (int32_t i = 0; i < tiers; ++i) {
            for (int32_t j = 0; j < npieces; ++j) {
                const uint32_t memId = static_cast<uint32_t>(i * npieces + j);
                buffers[memId] = std::vector<uint8_t>(static_cast<std::size_t>(tierBytes));
                // Every buffer starts idle, keyed by its own id (not by its tier).
                idleBuffers.insert(memId);
                bufferByteIds.insert(std::pair<int32_t, uint32_t>{ tierBytes, memId });
            }
            tierBytes += increBytes;
        }
    }

    /// Take over the pool of `rhs`; `rhs` is left as an empty pool.
    SimpleAllocer(SimpleAllocer&& rhs) noexcept :
        pieceBytes(0), increBytes(0), maxBytes(0)
    {
        *this = std::move(rhs);
    }

    /// Take over the pool of `rhs`; `rhs` is left as an empty pool.
    SimpleAllocer& operator=(SimpleAllocer&& rhs) noexcept
    {
        if (this != &rhs) {
            // Acquire both mutexes together so two opposite moves cannot deadlock.
            std::lock(buffersMutex, rhs.buffersMutex);
            std::lock_guard<std::mutex> lock(buffersMutex, std::adopt_lock);
            std::lock_guard<std::mutex> rlock(rhs.buffersMutex, std::adopt_lock);
            this->pieceBytes = rhs.pieceBytes;
            this->increBytes = rhs.increBytes;
            this->maxBytes = rhs.maxBytes;
            this->buffers = std::move(rhs.buffers);
            this->bufferByteIds = std::move(rhs.bufferByteIds);
            this->idleBuffers = std::move(rhs.idleBuffers);
            this->inUseBuffers = std::move(rhs.inUseBuffers);
            // Put the source into a well-defined empty state.
            rhs.maxBytes = 0;
            rhs.buffers.clear();
            rhs.bufferByteIds.clear();
            rhs.idleBuffers.clear();
            rhs.inUseBuffers.clear();
        }
        return *this;
    }

    /// Hand out an idle buffer of at least `bytes` bytes.
    /// @return a handle whose release returns the buffer to the pool, or an empty
    ///         handle when `bytes` is negative, exceeds the largest tier, or no
    ///         idle buffer is large enough.
    virtual Data alloc(int32_t bytes) noexcept override
    {
        // TODO pick a good memory
        // TEST: use a simple memory
        bool found = false;
        uint32_t pickedId = UINT32_MAX;
        uint8_t* pickedMem = nullptr;
        {
            std::lock_guard<std::mutex> lock(buffersMutex);
            if (bytes < 0 || bytes > maxBytes || idleBuffers.empty()) {
                return nullptr;
            }
            // TODO: should use bufferByteIds to find a good and NOT too large buffer!
            for (auto it = idleBuffers.begin(), end = idleBuffers.end(); it != end; ++it) {
                const uint32_t memId = *it;
                const auto entry = buffers.find(memId);
                if (entry == buffers.end() || static_cast<int32_t>(entry->second.size()) < bytes) {
                    continue;
                }
                pickedId = memId;
                pickedMem = entry->second.data();
                idleBuffers.erase(it);
                inUseBuffers.insert(memId);
                found = true;
                break;
            }
        }
        if (!found) {
            return nullptr;
        }
        // The handle is built after the lock is released: its deleter calls dealloc(),
        // which takes the same mutex.
        return Data{ pickedMem, Deletor{ pickedId, this } };
    }

    /// Mark buffer `memId` idle again. Ids that are not currently in use are ignored.
    void dealloc(uint32_t memId)
    {
        std::lock_guard<std::mutex> lock(buffersMutex);
        auto it = inUseBuffers.find(memId);
        if (it != inUseBuffers.end()) {
            inUseBuffers.erase(it);
            idleBuffers.insert(memId);
        }
    }

private:
    // TODO maybe use a good mono buffer, and belows should all in monoBuffer!!
    //std::vector<uint8_t> monoBuffer;
    // TEST: use a simple buffer: { memId, buffer(should be part of monoBuffer) }
    mutable std::mutex buffersMutex;///< Access buffers, idleBuffers and inUseBuffers ...
    int32_t pieceBytes;
    int32_t increBytes;
    int32_t maxBytes;
    std::unordered_map<uint32_t, std::vector<uint8_t> > buffers;
    /// TODO use bufferByteIds or maybe another fast query table!
    std::unordered_multimap<int32_t, uint32_t> bufferByteIds;
    std::unordered_set<uint32_t> idleBuffers;
    std::unordered_set<uint32_t> inUseBuffers;
};
// Example msgs
class I32Msg : public BaseMsg {
public:
operator int32_t() const noexcept
{
return *(reinterpret_cast<const int32_t*>(this->data.get()));
}
};
/// Example message whose payload is a character string.
/// The recorded size is taken to include one trailing byte (the terminator the
/// producers in this file copy along), which is left out of the view.
class StringMsg : public BaseMsg {
public:
    /// View the payload as a string without copying it.
    /// @return a view of `size - 1` characters, or an empty view when no payload is
    ///         attached or the recorded size is not positive.
    /// The view is only valid while the payload buffer is alive.
    operator boost::string_ref() const noexcept
    {
        if (!this->data || this->size <= 0) {
            return boost::string_ref{};
        }
        return boost::string_ref{
            reinterpret_cast<const char*>(this->data.get()),
            static_cast<std::size_t>(this->size - 1) };
    }
};
/// Example receiver whose member function is used as a subscriber callback.
class My {
public:
    /// Print the received string together with the id of the calling thread.
    void cb(const boost::shared_ptr<const StringMsg>& msg) noexcept
    {
        const boost::string_ref text = *msg;
        const std::thread::id caller = std::this_thread::get_id();
        std::cout << "my StringMsg cb: " << text << ", from thread " << caller << '\n';
    }
};
/// Demo run of the dispatcher with four producer threads.
///
/// Channel 1 carries both I32Msg and StringMsg; channel 2 carries StringMsg only.
/// The test makes no assertions: it prints what is produced and consumed.
TEST(Dispatcher, 1)
{
    // Shared by all four threads; each loop iteration of any thread increments it.
    std::atomic<int32_t> K{ 0 };
    //BaseAllocerPtr simpleAllocer(new SimpleAllocer(64, 1024, 4, 20));
    BaseAllocerPtr simpleAllocer(new SimpleAllocer(16, 20, 4, 20));
    Dispatcher<SimpleAllocer> dispatcher(simpleAllocer);

    auto i32ProducerC1 = dispatcher.advertise<I32Msg>(1);
    auto i32ProducerC1B = dispatcher.advertise<I32Msg>(1);
    auto stringProducerC1 = dispatcher.advertise<StringMsg>(1);
    auto stringProducerC1B = dispatcher.advertise<StringMsg>(1);
    auto stringProducerC2 = dispatcher.advertise<StringMsg>(2);

    // The callbacks run on the publishing threads, so their output is serialised.
    std::mutex i32ConsumerC1CbMutex;
    auto i32ConsumerC1Cb = [&i32ConsumerC1CbMutex](const boost::shared_ptr<const I32Msg>& msg) {
        std::lock_guard<std::mutex> lock(i32ConsumerC1CbMutex);
        std::cout << "i32ConsumerC1Cb: " << int32_t(*msg) << ", from thread "
            << std::this_thread::get_id() << '\n';
    };
    auto i32ConsumerC1 = dispatcher.subscribe<I32Msg>(1, i32ConsumerC1Cb, 2);

    std::mutex stringConsumerC1CbMutex;
    auto stringConsumerAllCb = [&stringConsumerC1CbMutex](const boost::shared_ptr<const StringMsg>& msg) {
        std::lock_guard<std::mutex> lock(stringConsumerC1CbMutex);
        std::cout << "stringConsumerAllCb: " << boost::string_ref(*msg) << ", from thread "
            << std::this_thread::get_id() << '\n';
    };
    auto stringConsumerC1 = dispatcher.subscribe<StringMsg>(1, stringConsumerAllCb, 2);
    auto stringConsumerC1B = dispatcher.subscribe<StringMsg>(1, stringConsumerAllCb, 2);
    auto stringConsumerC2 = dispatcher.subscribe<StringMsg>(2, stringConsumerAllCb, 2);
    My my;
    auto stringConsumerC2B = dispatcher.subscribe<StringMsg>(2, boost::bind(&My::cb, &my, _1), 2);
    // No callback: messages are queued and polled with deque() in t4.
    auto stringConsumerC2C = dispatcher.subscribe<StringMsg>(2, nullptr, 2);

    // Produce
    std::thread t1([&K, &i32ProducerC1] {
        std::cout << "i32ProducerC1 thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            // Get a memory!
            auto p = i32ProducerC1->produce(4);
            if (p) {
                // Fill data!
                *(reinterpret_cast<int32_t*>(p->getData().get())) = ::rand() % 1000;
                // TODO: Notify consumer, maybe here need callback in thread!!
                i32ProducerC1->publish(p);
            } else {
                // Names this thread's producer (the original text named i32ProducerC1B).
                std::cout << "WARN: i32ProducerC1 NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    });

    std::thread t2([&K, &i32ProducerC1B] {
        std::cout << "i32ProducerC1B thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            auto p = i32ProducerC1B->produce(4);
            if (p) {
                *(reinterpret_cast<int32_t*>(p->getData().get())) = ::rand() % 1000;
                i32ProducerC1B->publish(p);
            } else {
                std::cout << "WARN: i32ProducerC1B NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    });

    // One thread alternating between two string producers on channel 1.
    std::thread t3([&K, &stringProducerC1, &stringProducerC1B] {
        std::cout << "stringProducerC1 C1B thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            std::string s = "sss=" + std::to_string(::rand() % 200 + 3000);
            // +1 so the terminating NUL is part of the payload.
            auto p = stringProducerC1->produce(s.length() + 1);
            if (p) {
                std::cout << "OK : stringProducerC1 C1B [1] will pub " << s << "\n";
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC1->publish(p);
            } else {
                std::cout << "WARN: stringProducerC1 C1B [1] NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));

            s = "sss=" + std::to_string(::rand() % 200 + 3000);
            p = stringProducerC1B->produce(s.length() + 1);
            if (p) {
                std::cout << "OK : stringProducerC1 C1B [2] will pub " << s << "\n";
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC1B->publish(p);
            } else {
                std::cout << "WARN: stringProducerC1 C1B [2] NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    });

    // Publishes on channel 2 and periodically drains the callback-less consumer.
    std::thread t4([&K, &stringProducerC2, &stringConsumerC2C] {
        std::cout << "stringProducerC2 thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            std::string s = "[[SSS K=" + std::to_string(K) + "]]";
            auto p = stringProducerC2->produce(s.length() + 1);
            if (p) {
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC2->publish(p);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            if (0 == (K % 6)) {
                StringMsg smsg;
                while (stringConsumerC2C->deque(smsg)) {
                    std::cout << "deque smsg OK! " << boost::string_ref(smsg) << "\n";
                }
            }
            ++K;
        }
    });

    t1.join();
    t2.join();
    t3.join();
    t4.join();
}
int main(int argc, char** argv)
{
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}