不易理解的指针(C/C++)

不易理解的指针(C/C++)

See also


#include <iostream>

/**
 * 不易理解的指针:
 *
 * - 函数指针
 * - 指针数组
 * - 数组指针
 * - 返回函数指针的函数
 *
 * Compile:
 * g++ pointer.cpp -std=c++11 -Wall -Wextra
 *
 * @note Prefer C++ function objects over raw function pointers where you can, e.g.
 * std::function
 * boost::function
 */

int main() noexcept
{
    // Function pointers and arrays of (function) pointers.
    // NOTE(review): std::is_array / std::is_same are declared in <type_traits>,
    // which is not included directly above - confirm it is pulled in.
    int (* a1[10])(int) = { nullptr };// Array of 10 function pointers; [0] is nullptr, the rest are zero-initialized (null too)
    // ERROR: int (error1[10])(int) = { nullptr };
    int* a2[10] = { nullptr };// Array of 10 int pointers; [0] is nullptr, the rest are zero-initialized (null too)
    std::cout << std::is_array<decltype(a2)>::value << '\n';// 1

    typedef int (function_t)(int);// Function type (not a pointer); the parentheses around the name are redundant
    typedef int function2_t(int);// The same function type, written without the parentheses
    int (foo)(int);// Function declaration
    typedef int (* functionptr_t)(int);// Function pointer type
    // Two more arrays of 10 function pointers; [0] and [1] are nullptr, the rest are zero-initialized
    function_t* a4[10] = { nullptr, nullptr };
    functionptr_t a5[10] = { nullptr, nullptr };

    std::cout << std::is_same<function_t, function2_t>::value << '\n';// 1
    std::cout << std::is_same<decltype(a1), decltype(a4)>::value << '\n';// 1
    std::cout << std::is_same<decltype(a1), decltype(a5)>::value << '\n';// 1

    int (* a3)(int) = nullptr;// Function pointer
    function_t* a6 = nullptr;// Function pointer
    functionptr_t a7 = nullptr;// Function pointer
    std::cout << std::is_same<decltype(a3), decltype(a6)>::value << '\n';// 1
    std::cout << std::is_same<decltype(a3), decltype(a7)>::value << '\n';// 1

    // Pointers to arrays
    function_t* (* b1)[10] = nullptr;// Pointer to an array of 10 function pointers: b1
    // ERROR: function_t (*error2)[10] = nullptr;
    functionptr_t (* b2)[10] = nullptr;// Pointer to an array of 10 function pointers: b2
    typedef function_t* functionptrarr_t[10];// Array type: array of 10 function pointers
    typedef functionptr_t functionptrarr2_t[10];// The same array type, spelled with functionptr_t
    functionptrarr_t* b3 = nullptr;// Pointer to an array
    functionptrarr2_t* b4 = nullptr;// Pointer to an array
    b4 = b3 = b2 = b1;// OK: all four have the same type
    std::cout << std::is_same<decltype(b1), decltype(b2)>::value << '\n';// 1
    std::cout << std::is_same<decltype(b1), decltype(b3)>::value << '\n';// 1
    std::cout << std::is_same<decltype(b1), decltype(b4)>::value << '\n';// 1
    int (* b5)[10] = nullptr;// Pointer to an array of 10 ints (not int pointers): b5
    typedef int intarr_t[10];// Array type: array of 10 ints
    intarr_t* b6 = nullptr;// Pointer to an array
    b6 = b5;// OK
    std::cout << std::is_same<decltype(b5), decltype(b6)>::value << '\n';// 1
    std::cout << std::is_array<decltype(b1)>::value << '\n';// 0: b1 is a pointer, not an array

    // Function returning a function pointer.
    // foo2 is a function (declaration) whose first parameter is an int and whose
    // second parameter is a char*. It returns a function pointer; the pointed-to
    // function takes one int parameter and returns int.
    int (* foo2(int, char*))(int);
    // The declaration above is equivalent to:
    typedef int foo_t(int);
    foo_t* foo3(int, char*);
    std::cout << std::is_same<decltype(foo2), decltype(foo3)>::value << '\n';// 1

    return 0;
}

GNU libc Rounding Functions

GNU libc Rounding Functions

See also http://www.gnu.org/software/libc/manual/html_node/Rounding-Functions.html

The functions listed here perform operations such as rounding and truncation
of floating-point values. Some of these functions convert floating point
numbers to integer values. They are all declared in math.h.

You can also convert floating-point numbers to integers simply by casting
them to int. This discards the fractional part, effectively rounding towards
zero.
However, this only works if the result can actually be represented as an
int — for very large numbers, this is impossible.
The functions listed here return the result as a double instead to get around
this problem.

The fromfp functions use the following macros, from TS 18661-1:2014, to
specify the direction of rounding.
These correspond to the rounding directions defined in IEEE 754-2008.

trunc

The trunc functions round x towards zero to the nearest integer
(returned in floating-point format). Thus,
trunc (1.5) is 1.0 and trunc (-1.5) is -1.0 (丢掉小数部分: x 大于等于 0 的时候结果
等于 floor(x), x 小于等于 0 的时候结果等于 ceil(x)).

ceil

These functions round x upwards to the nearest integer, returning that value
as a double. Thus, ceil (1.5) is 2.0 (ceil: 天花板 — 向上(大)取整).

floor

These functions round x downwards to the nearest integer, returning that value
as a double. Thus, floor (1.5) is 1.0 and floor (-1.5) is -2.0
(floor: 地板 — 向下(小)取整).

round

These functions are similar to rint, but they round halfway cases(一半的情况)
away from zero instead of to the nearest integer
(or other current rounding mode).
I.e. round to nearest integer, away from zero (返回 四舍五入整数).

rint

These functions round x to an integer value according to the current rounding
mode, which is usually round-to-nearest (FLT_ROUNDS == 1). For example, with
libc 2.23-0ubuntu11 and gcc 5.4.0, FLT_ROUNDS is defined as 1:

/usr/lib/gcc/x86_64-linux-gnu/5/include/float.h:128:

/* Addition rounds to 0: zero, 1: nearest, 2: +inf, 3: -inf, -1: unknown.  */
/* ??? This is supposed to change with calls to fesetround in <fenv.h>.  */
#undef FLT_ROUNDS
#define FLT_ROUNDS 1

And:

BUGS
       C99 specifies that the value of FLT_ROUNDS should reflect changes
       to the current rounding mode, as set by fesetround().  Currently,
       this does not occur: FLT_ROUNDS always has the value 1.

See also http://www.gnu.org/software/libc/manual/html_node/Floating-Point-Parameters.html#Floating-Point-Parameters

Round to nearest integer (like round, but often NOT what you want: prefer round
when you need conventional “四舍五入” rounding. rint 返回最接近参数的整数,
如果有两个数同样接近, 则会返回偶数的那个!).

Bad rint examples

EXPECT_DOUBLE_EQ(-2.0, ::rint(-1.5));
EXPECT_DOUBLE_EQ(-2.0, ::rint(-2.5));// Here we want -3.0!
EXPECT_DOUBLE_EQ(0.0, ::rint(0.5));// Here we want 1.0!
EXPECT_DOUBLE_EQ(0.0, ::rint(-0.5));// Here we want -1.0!

Good round examples

EXPECT_DOUBLE_EQ(-2.0, ::round(-1.5));
EXPECT_DOUBLE_EQ(-3.0, ::round(-2.5));// Here we want -3.0!
EXPECT_DOUBLE_EQ(1.0, ::round(0.5));// Here we want 1.0!
EXPECT_DOUBLE_EQ(-1.0, ::round(-0.5));// Here we want -1.0!

对比

value round rint floor ceil trunc
0.1 0.0 0.0 0.0 1.0 0.0
0.5 1.0 0.0 0.0 1.0 0.0
0.6 1.0 1.0 0.0 1.0 0.0
1.1 1.0 1.0 1.0 2.0 1.0
1.5 2.0 2.0 1.0 2.0 1.0
2.3 2.0 2.0 2.0 3.0 2.0
2.5 3.0 2.0 2.0 3.0 2.0
3.8 4.0 4.0 3.0 4.0 3.0
5.5 6.0 6.0 5.0 6.0 5.0
-0.5 -1.0 -0.0 -1.0 -0.0 -0.0
-1.1 -1.0 -1.0 -2.0 -1.0 -1.0
-1.5 -2.0 -2.0 -2.0 -1.0 -1.0
-2.3 -2.0 -2.0 -3.0 -2.0 -2.0
-2.5 -3.0 -2.0 -3.0 -2.0 -2.0
-3.8 -4.0 -4.0 -4.0 -3.0 -3.0
-5.5 -6.0 -6.0 -6.0 -5.0 -5.0

See also

Buff

Buff

A simple observer implementation with a customizable allocator and customizable
messages!
A full producer-consumer design may be needed eventually; for now only the observer (i.e. publisher-subscriber) pattern is implemented,
and it is meant to be used within a single process.

#include <atomic>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <list>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "boost/shared_ptr.hpp"
#include "boost/utility/string_ref.hpp"
#include "boost/bind.hpp"
#include "gtest/gtest.h"

/// Shared, reference-counted byte buffer: returned by allocers and carried by messages.
using Data = boost::shared_ptr<uint8_t[]>;

/**
 * Base class of every message that travels through the dispatcher.
 *
 * A message is a thin header around a shared payload buffer (`data`) plus the
 * number of payload bytes requested by the producer (`size`). Concrete message
 * types derive from this class and interpret the payload.
 *
 * The getters are const so that they can be called through the
 * `shared_ptr<const Msg>` handles that subscribers receive.
 */
class BaseMsg {
public:
    virtual ~BaseMsg() noexcept {}
    /// @return true when a payload buffer is attached.
    inline explicit operator bool() const noexcept
    {
        return this->data != nullptr;
    }
    /// Attach (share ownership of) a payload buffer.
    inline void setData(const Data& data) noexcept
    {
        this->data = data;
    }
    /// Record the payload size in bytes.
    inline void setSize(int32_t size) noexcept
    {
        this->size = size;
    }
    /// @return a shared handle to the payload buffer (may be empty).
    inline Data getData() const noexcept
    {
        return this->data;
    }
    /// @return the payload size in bytes as recorded by setSize().
    inline int32_t getSize() const noexcept
    {
        return this->size;
    }
protected:
    Data data{ nullptr };
    int32_t size{ 0 };
    uint32_t seq{ 0 };///< Seq-number, TODO: use this field!
    double stamp{ 0.0 };///< TODO: maybe also use this field!
};
/// A shared_ptr is not strictly necessary here, unless sub-messages carry many custom fields.
using BaseMsgConstPtr = boost::shared_ptr<const BaseMsg>;

// Forward declarations:
// Producer and ProducerPtr
template<typename Msg, typename Allocer>
class Producer;
template<typename Msg, typename Allocer>
using ProducerPtr = boost::shared_ptr<Producer<Msg, Allocer> >;
// BaseConsumer and BaseConsumerPtr
class BaseConsumer;
using BaseConsumerPtr = boost::shared_ptr<BaseConsumer>;

// Consumer (typed subscriber), defined after BaseConsumer
template<typename Msg>
class Consumer;

/**
 * Interface of a buffer allocator used by the dispatcher.
 *
 * The default implementation never provides memory: alloc() always returns an
 * empty buffer handle. Derived classes override alloc().
 */
class BaseAllocer {
public:
    virtual ~BaseAllocer() noexcept = default;

    /// @return an empty Data handle; the requested byte count is ignored.
    virtual Data alloc(int32_t /*bytes*/) noexcept
    {
        return Data{};
    }
};
using BaseAllocerPtr = boost::shared_ptr<BaseAllocer>;

/**
 * The core handler: creates producers and consumers per channel, hands out
 * payload buffers through the configured allocer and forwards published
 * messages to the consumers registered on a channel.
 *
 * @tparam Allocer allocer type; must derive from BaseAllocer.
 */
template<typename Allocer>
class Dispatcher {
    // Allocer check
    static_assert(
        std::is_base_of<BaseAllocer, typename std::decay<Allocer>::type>::value,
        "BaseAllocer should be base of Allocer");

public:
    /// Callback signature invoked with each message delivered to a consumer.
    template<typename Msg>
    using Cb = std::function<void (const boost::shared_ptr<const Msg>&)>;

    Dispatcher(const BaseAllocerPtr& allocer) noexcept : allocer(allocer) {}

    /// Create a producer that publishes messages of type Msg on `channel`.
    template<typename Msg>
    ProducerPtr<Msg, Allocer> advertise(int32_t channel);

    /// Register a consumer of Msg on `channel` with callback `cb` and queue size `queneSz`.
    template<typename Msg>
    boost::shared_ptr<Consumer<Msg> > subscribe(
        int32_t channel, const Cb<Msg>& cb, int32_t queneSz) noexcept;

    /// Request a payload buffer of `bytes` bytes from the allocer.
    Data alloc(int32_t bytes) noexcept;

    /// Deliver `msg` to every consumer registered on `channel`.
    template<typename Msg>
    void publish(int32_t channel, const boost::shared_ptr<const Msg>& msg) noexcept;

    // TODO: dispatch(), e.g. in one or multiple threads to dispatch buffers to all subscribers!

private:
    BaseAllocerPtr allocer;
    std::unordered_multimap<int32_t, BaseConsumerPtr> subscibers;
};

/**
 * Publishing endpoint bound to one channel of a dispatcher.
 *
 * Usage: 1. produce() a message backed by an allocated buffer, fill it,
 * 2. publish() it. The producer only stores a reference to the dispatcher,
 * so the dispatcher has to outlive it.
 *
 * @tparam Msg     message type; must derive from BaseMsg.
 * @tparam Allocer allocer type of the owning dispatcher.
 */
template<typename Msg, typename Allocer>
class Producer {
    // Msg check
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");

public:
    Producer(int32_t channel, Dispatcher<Allocer>& handle) noexcept;

    /// Produce a message whose buffer can hold `bytes` bytes, ready to be filled.
    boost::shared_ptr<Msg> produce(int32_t bytes);

    /// Publish `msg` on this producer's channel.
    inline void publish(const boost::shared_ptr<const Msg>& msg) noexcept;

private:
    const int32_t channel;
    Dispatcher<Allocer>& handle;
};

/**
 * Type-erased consumer (i.e. subscriber) interface stored by the dispatcher.
 *
 * The class is polymorphic and is held through BaseConsumerPtr, so it gets a
 * virtual destructor (like BaseMsg and BaseAllocer) to make destruction
 * through a base pointer well defined.
 */
class BaseConsumer {
public:
    virtual ~BaseConsumer() noexcept {}
    /// Called for every message published on the consumer's channel; no-op by default.
    virtual void callback(const BaseMsgConstPtr&) {}
};

template<typename Msg>
class Consumer : public BaseConsumer {
public:
    // Msg check
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");

    Consumer() noexcept {}
    Consumer(int32_t qsz) noexcept : qsz(qsz) {}

    // If callback setup then auto callback!
    using Cb = std::function<void (const boost::shared_ptr<const Msg>&)>;
    inline void setCb(const Cb& cb) noexcept
    {
        this->cb = cb;
    }
    inline Cb& getCb() noexcept
    {
        return this->cb;
    }
    inline void setConnected(bool connected) noexcept
    {
        this->connected = connected;
    }
    inline bool isConnected() const noexcept
    {
        return this->connected;
    }

    // If not set callback, you can deque datas
    bool deque(Msg& m) noexcept
    {
        std::lock_guard<std::mutex> lock(qMutex);
        if (q.empty()) {
            return false;
        }
        auto qMsg = q.front();
        q.pop_front();
        boost::shared_ptr<const Msg> msg = boost::dynamic_pointer_cast<const Msg, const BaseMsg>(qMsg);
        if (!msg) {
            return false;
        }
        m = *msg;
        return true;
    }

    virtual void callback(const BaseMsgConstPtr& msg) override;
private:
    bool connected{ true };
    Cb cb{ nullptr };
    int32_t qsz{ 0 };
    mutable std::mutex qMutex;///< Access q
    std::list<BaseMsgConstPtr> q;
};

// Implementations
/// Create a new producer of Msg bound to `channel` and to this dispatcher.
template<typename Allocer>
template<typename Msg>
ProducerPtr<Msg, Allocer> Dispatcher<Allocer>::advertise(int32_t channel)
{
    // Msg check
    static_assert(
        std::is_base_of<BaseMsg, typename std::decay<Msg>::type>::value,
        "BaseMsg should be base of Msg");

    return ProducerPtr<Msg, Allocer>(new Producer<Msg, Allocer>(channel, *this));
}
/// Create a consumer of Msg with queue size `queneSz` and callback `cb`,
/// register it on `channel` and return it.
template<typename Allocer>
template<typename Msg>
boost::shared_ptr<Consumer<Msg> > Dispatcher<Allocer>::subscribe(
    int32_t channel, const Cb<Msg>& cb, int32_t queneSz) noexcept
{
    boost::shared_ptr<Consumer<Msg> > consumer(new Consumer<Msg>(queneSz));
    consumer->setCb(cb);
    // Stored type-erased; several consumers may share one channel.
    subscibers.emplace(channel, BaseConsumerPtr(consumer));
    return consumer;
}
/**
 * Request a payload buffer from the configured allocer.
 *
 * @param bytes number of bytes requested.
 * @return the buffer returned by the allocer, or an empty handle when no
 *         allocer is configured (the constructor accepts a null pointer).
 */
template<typename Allocer>
Data Dispatcher<Allocer>::alloc(int32_t bytes) noexcept
{
    if (!allocer) {
        // Dereferencing a null allocer inside a noexcept function would crash.
        return nullptr;
    }
    return allocer->alloc(bytes);
}
/// Invoke callback(msg) on every consumer registered on `channel`, in the
/// calling thread.
template<typename Allocer>
template<typename Msg>
void Dispatcher<Allocer>::publish(
    int32_t channel, const boost::shared_ptr<const Msg>& msg) noexcept
{
    const auto range = subscibers.equal_range(channel);
    for (auto it = range.first; it != range.second; ++it) {
        // Hold our own reference to the consumer while its callback runs.
        const BaseConsumerPtr consumer = it->second;
        // TODO: callback in thread!!
        consumer->callback(msg);
    }
}

/// Bind the producer to `channel` of the dispatcher `handle` (stored by reference).
template<typename Msg, typename Allocer>
Producer<Msg, Allocer>::Producer(int32_t channel, Dispatcher<Allocer>& handle) noexcept
    : channel(channel)
    , handle(handle)
{
}
/**
 * Allocate a buffer through the dispatcher and wrap it in a new message.
 *
 * @param bytes requested payload size; also stored as the message size.
 * @return the new message, or an empty pointer when no buffer was obtained.
 */
template<typename Msg, typename Allocer>
boost::shared_ptr<Msg> Producer<Msg, Allocer>::produce(int32_t bytes)
{
    const Data buffer = handle.alloc(bytes);
    if (buffer) {
        boost::shared_ptr<Msg> msg(new Msg());
        msg->setData(buffer);
        msg->setSize(bytes);
        return msg;
    }
    return nullptr;
}
/// Forward `msg` to the dispatcher on this producer's channel.
template<typename Msg, typename Allocer>
inline void Producer<Msg, Allocer>::publish(const boost::shared_ptr<const Msg>& msg) noexcept
{
    this->handle.publish(this->channel, msg);
}

/**
 * Receive one message published on this consumer's channel.
 *
 * A channel can carry several message types, so the message is type-checked
 * first: anything that is not a Msg (including a null pointer) is ignored.
 * Without this check foreign messages would be queued, evict valid ones and
 * make deque() report failure while valid messages are still waiting.
 *
 * - Not connected or no callback installed: the message is appended to the
 *   queue; when the queue then holds more than `qsz` entries the oldest one
 *   is dropped.
 * - Otherwise: queued messages are delivered to the callback first (oldest
 *   first), followed by the new message.
 *
 * The queue lock is never held while the user callback runs.
 */
template<typename Msg>
void Consumer<Msg>::callback(const BaseMsgConstPtr& msg)
{
    const boost::shared_ptr<const Msg> typed = boost::dynamic_pointer_cast<const Msg>(msg);
    if (!typed) {
        return;
    }

    if (!connected || !cb) {
        // Enque
        std::lock_guard<std::mutex> lock(qMutex);
        q.push_back(msg);
        if (int32_t(q.size()) > qsz) {
            std::cout << "q full, deque front\n";
            q.pop_front();
        }
        return;
    }

    // Eat all, but queue first!
    for (;;) {
        BaseMsgConstPtr pending;
        {
            std::lock_guard<std::mutex> lock(qMutex);
            if (q.empty()) {
                break;
            }
            pending = q.front();
            q.pop_front();
        }
        const boost::shared_ptr<const Msg> queued = boost::dynamic_pointer_cast<const Msg>(pending);
        if (queued) {
            cb(queued);
        }
    }

    // Eat the new one
    cb(typed);
}

// Examples
// Example allocer
/**
 * Example allocer backed by a fixed pool of pre-allocated buffers.
 *
 * The pool is organised in tiers: every tier holds up to `npieces` buffers and
 * each tier's buffers are `increBytes` larger than those of the previous
 * tier, starting at `pieceBytes`. alloc() hands out the first idle buffer that
 * is large enough; when the last handle to a buffer goes away its Deletor
 * returns the buffer to the idle set instead of freeing it.
 *
 * All pool state is guarded by `buffersMutex`.
 *
 * @warning Handles returned by alloc() store a raw pointer to the allocer, so
 *          the allocer must outlive them. Buffers that are in use while the
 *          allocer is moved from are not returned to the new owner.
 */
class SimpleAllocer : public BaseAllocer {
public:
    /// Fake deleter: the memory is not freed, the buffer is just marked idle again.
    class Deletor {
    public:
        Deletor(uint64_t memId = UINT32_MAX, SimpleAllocer* allocer = nullptr) noexcept :
            memId(static_cast<uint32_t>(memId)), allocer(allocer) {}
        void operator()(uint8_t*& p) noexcept
        {
            if (p) {
                // NO delete
                // Just to nullptr!
                p = nullptr;
                // And move to idleBuffers (a default-constructed Deletor has no allocer).
                if (allocer) {
                    allocer->dealloc(memId);
                }
            }
        }
    private:
        uint32_t memId{ UINT32_MAX };
        SimpleAllocer* allocer{ nullptr };
    };

    /**
     * Build the pool.
     *
     * @param pieceBytes  buffer size of the first (smallest) tier.
     * @param increBytes  size increment from one tier to the next.
     * @param npieces     number of buffers per tier.
     * @param totalPieces total number of buffers; the tier count is
     *                    ceil(totalPieces / npieces) and the last tier may be
     *                    partially filled.
     * @throw std::bad_alloc when there is not enough memory.
     *
     * A non-positive `npieces` or `totalPieces` yields an empty pool.
     */
    // TODO: maybe, example: small need 20, big only need 10...
    SimpleAllocer(int32_t pieceBytes, int32_t increBytes, int32_t npieces, int32_t totalPieces) :
        pieceBytes(pieceBytes), increBytes(increBytes), maxBytes(0)
    {
        if (npieces <= 0 || totalPieces <= 0) {
            return;
        }
        // Integer ceiling; ::ceil() applied to an integer division has no effect.
        const int32_t tiers = (totalPieces / npieces) + ((totalPieces % npieces) != 0 ? 1 : 0);
        this->maxBytes = pieceBytes + (increBytes * (tiers - 1));

        const uint32_t total = static_cast<uint32_t>(totalPieces);
        uint32_t memId = 0;
        int32_t tierBytes = pieceBytes;
        for (int32_t tier = 0; tier < tiers; ++tier) {
            for (int32_t j = 0; j < npieces && memId < total; ++j, ++memId) {
                buffers[memId] = std::vector<uint8_t>(tierBytes);
                // Register the buffer id itself (not the tier index) as idle.
                idleBuffers.insert(memId);
                bufferByteIds.insert(std::pair<int32_t, uint32_t>{ tierBytes, memId });
            }
            tierBytes += increBytes;
        }
    }

    SimpleAllocer(SimpleAllocer&& rhs) noexcept :
        pieceBytes(0), increBytes(0), maxBytes(0)
    {
        *this = std::move(rhs);
    }

    SimpleAllocer& operator=(SimpleAllocer&& rhs) noexcept
    {
        if (this != &rhs) {
            // Lock both mutexes together so two concurrent cross-moves cannot deadlock.
            std::lock(buffersMutex, rhs.buffersMutex);
            std::lock_guard<std::mutex> lock(buffersMutex, std::adopt_lock);
            std::lock_guard<std::mutex> rlock(rhs.buffersMutex, std::adopt_lock);
            this->pieceBytes = rhs.pieceBytes;
            this->increBytes = rhs.increBytes;
            this->maxBytes = rhs.maxBytes;
            this->buffers = std::move(rhs.buffers);
            this->bufferByteIds = std::move(rhs.bufferByteIds);
            this->idleBuffers = std::move(rhs.idleBuffers);
            this->inUseBuffers = std::move(rhs.inUseBuffers);
            // Leave the source as a well-defined empty pool.
            rhs.buffers.clear();
            rhs.bufferByteIds.clear();
            rhs.idleBuffers.clear();
            rhs.inUseBuffers.clear();
            rhs.maxBytes = 0;
        }
        return *this;
    }

    /**
     * Hand out an idle buffer of at least `bytes` bytes.
     *
     * @return a handle whose deleter returns the buffer to the pool, or an
     *         empty handle when `bytes` is negative, larger than the biggest
     *         tier, or no suitable buffer is idle.
     */
    virtual Data alloc(int32_t bytes) noexcept override
    {
        // TODO pick a good memory

        // TEST: use a simple memory
        std::lock_guard<std::mutex> lock(buffersMutex);
        if (bytes < 0 || bytes > maxBytes || this->idleBuffers.empty()) {
            return nullptr;
        }
        // TODO: should use bufferByteIds to find a good and NOT too large buffer!
        for (auto it = idleBuffers.begin(), end = idleBuffers.end(); it != end; ++it) {
            const uint32_t memId = *it;
            // find() instead of operator[]: never insert a buffer by accident.
            const auto found = buffers.find(memId);
            if (found == buffers.end() || int32_t(found->second.size()) < bytes) {
                continue;
            }
            idleBuffers.erase(it);
            inUseBuffers.insert(memId);
            return Data{ found->second.data(), Deletor{ memId, this } };
        }
        return nullptr;
    }

    /// Mark buffer `memId` idle again; ids that are not in use are ignored.
    void dealloc(uint32_t memId)
    {
        std::lock_guard<std::mutex> lock(buffersMutex);
        auto it = inUseBuffers.find(memId);
        if (it != inUseBuffers.end()) {
            inUseBuffers.erase(it);
            idleBuffers.insert(memId);
        }
    }
private:
    // TODO maybe use a good mono buffer, and belows should all in monoBuffer!!
    //std::vector<uint8_t> monoBuffer;
    // TEST: use a simple buffer: { memId, buffer(should be part of monoBuffer) }
    mutable std::mutex buffersMutex;///< Access buffers, idleBuffers and inUseBuffers ...
    int32_t pieceBytes;
    int32_t increBytes;
    int32_t maxBytes;
    std::unordered_map<uint32_t, std::vector<uint8_t> > buffers;
    /// TODO use bufferByteIds or maybe another fast query table!
    std::unordered_multimap<int32_t, uint32_t> bufferByteIds;
    std::unordered_set<uint32_t> idleBuffers;
    std::unordered_set<uint32_t> inUseBuffers;
};

// Example msgs
/**
 * Example message whose payload is a single int32_t.
 */
class I32Msg : public BaseMsg {
public:
    /**
     * Read the payload as an int32_t.
     *
     * The value is copied out with memcpy, so the payload does not have to be
     * suitably aligned for int32_t.
     *
     * @return the stored value, or 0 when no payload buffer is attached.
     * @note The recorded size is not checked; the buffer is assumed to hold
     *       at least sizeof(int32_t) bytes.
     */
    operator int32_t() const noexcept
    {
       int32_t value = 0;
       if (this->data) {
           std::memcpy(&value, this->data.get(), sizeof(value));
       }
       return value;
    }
};
/**
 * Example message whose payload is a character string.
 *
 * The recorded size is treated as including one trailing terminator byte,
 * which is excluded from the returned view.
 */
class StringMsg : public BaseMsg {
public:
    /**
     * View the payload as a string (without copying).
     *
     * @return a view of `size - 1` characters, or an empty view when there is
     *         no payload or the size is not positive. Without the check,
     *         `size - 1` would wrap to a huge unsigned length.
     */
    operator boost::string_ref() const noexcept
    {
       if (!this->data || this->size <= 0) {
           return boost::string_ref{};
       }
       return boost::string_ref{
           reinterpret_cast<const char*>(this->data.get()),
           static_cast<std::size_t>(this->size - 1) };
    }
};

/// Example receiver whose member function is used as a subscription callback.
class My {
public:
    /// Print the received string message together with the calling thread id.
    void cb(const boost::shared_ptr<const StringMsg>& msg) noexcept
    {
        const boost::string_ref text = static_cast<boost::string_ref>(*msg);
        std::cout << "my StringMsg cb: " << text << ", from thread ";
        std::cout << std::this_thread::get_id() << '\n';
    }
};

/**
 * End-to-end demo: four producer threads publish int32 and string messages on
 * channels 1 and 2 until the shared iteration counter K reaches 100.
 *
 * Channel 1 carries both I32Msg and StringMsg. Channel 2 carries StringMsg
 * only and has one consumer without a callback, which is drained manually
 * with deque().
 */
TEST(Dispatcher, 1)
{
    std::atomic<int32_t> K{ 0 };
    //BaseAllocerPtr simpleAllocer(new SimpleAllocer(64, 1024, 4, 20));
    BaseAllocerPtr simpleAllocer(new SimpleAllocer(16, 20, 4, 20));
    Dispatcher<SimpleAllocer> dispatcher(simpleAllocer);
    auto i32ProducerC1 = dispatcher.advertise<I32Msg>(1);
    auto i32ProducerC1B = dispatcher.advertise<I32Msg>(1);
    auto stringProducerC1 = dispatcher.advertise<StringMsg>(1);
    auto stringProducerC1B = dispatcher.advertise<StringMsg>(1);
    auto stringProducerC2 = dispatcher.advertise<StringMsg>(2);

    // Callbacks run in the publishing threads, so serialize their output.
    std::mutex i32ConsumerC1CbMutex;
    auto i32ConsumerC1Cb = [&i32ConsumerC1CbMutex](const boost::shared_ptr<const I32Msg>& msg) {
        std::lock_guard<std::mutex> lock(i32ConsumerC1CbMutex);
        std::cout << "i32ConsumerC1Cb: " << int32_t(*msg) << ", from thread "
            << std::this_thread::get_id() << '\n';
    };
    auto i32ConsumerC1 = dispatcher.subscribe<I32Msg>(1, i32ConsumerC1Cb, 2);

    std::mutex stringConsumerC1CbMutex;
    auto stringConsumerAllCb = [&stringConsumerC1CbMutex](const boost::shared_ptr<const StringMsg>& msg) {
        std::lock_guard<std::mutex> lock(stringConsumerC1CbMutex);
        std::cout << "stringConsumerAllCb: " << boost::string_ref(*msg) << ", from thread "
            << std::this_thread::get_id() << '\n';
    };
    auto stringConsumerC1 = dispatcher.subscribe<StringMsg>(1, stringConsumerAllCb, 2);
    auto stringConsumerC1B = dispatcher.subscribe<StringMsg>(1, stringConsumerAllCb, 2);
    auto stringConsumerC2 = dispatcher.subscribe<StringMsg>(2, stringConsumerAllCb, 2);
    My my;
    auto stringConsumerC2B = dispatcher.subscribe<StringMsg>(2, boost::bind(&My::cb, &my, _1), 2);
    // No callback: messages are queued and fetched with deque() in t4.
    auto stringConsumerC2C = dispatcher.subscribe<StringMsg>(2, nullptr, 2);

    // Produce
    // Shared body of the two int32 producer threads; `name` labels the output
    // so that each warning names the producer that actually failed.
    auto runI32Producer = [&K](const ProducerPtr<I32Msg, SimpleAllocer>& producer, const char* name) {
        std::cout << name << " thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            // Get a memory!
            auto p = producer->produce(4);
            if (p) {
                // Fill data!
                *(reinterpret_cast<int32_t*>(p->getData().get())) = ::rand() % 1000;
                // TODO: Notify consumer, maybe here need callback in thread!!
                producer->publish(p);
            } else {
                std::cout << "WARN: " << name << " NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    };
    std::thread t1([&runI32Producer, &i32ProducerC1] {
        runI32Producer(i32ProducerC1, "i32ProducerC1");
    });
    std::thread t2([&runI32Producer, &i32ProducerC1B] {
        runI32Producer(i32ProducerC1B, "i32ProducerC1B");
    });
    std::thread t3([&K, &stringProducerC1, &stringProducerC1B] {
        std::cout << "stringProducerC1 C1B thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            std::string s = "sss=" + std::to_string(::rand() % 200 + 3000);
            // +1: the terminating NUL is part of the payload.
            auto p = stringProducerC1->produce(s.length() + 1);
            if (p) {
                std::cout << "OK  : stringProducerC1 C1B [1] will pub " << s << "\n";
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC1->publish(p);
            } else {
                std::cout << "WARN: stringProducerC1 C1B [1] NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            s = "sss=" + std::to_string(::rand() % 200 + 3000);
            p = stringProducerC1B->produce(s.length() + 1);
            if (p) {
                std::cout << "OK  : stringProducerC1 C1B [2] will pub " << s << "\n";
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC1B->publish(p);
            } else {
                std::cout << "WARN: stringProducerC1 C1B [2] NOT get memory!\n";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            ++K;
        }
    });
    std::thread t4([&K, &stringProducerC2, &stringConsumerC2C] {
        std::cout << "stringProducerC2 thread " << std::this_thread::get_id() << '\n';
        while (K < 100) {
            std::string s = "[[SSS K=" + std::to_string(K) + "]]";
            auto p = stringProducerC2->produce(s.length() + 1);
            if (p) {
                ::memcpy(p->getData().get(), s.data(), s.size() + 1);
                stringProducerC2->publish(p);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            // Every now and then drain the callback-less consumer by hand.
            if (0 == (K % 6)) {
                StringMsg smsg;
                while (stringConsumerC2C->deque(smsg)) {
                    std::cout << "deque smsg OK! " << boost::string_ref(smsg) << "\n";
                }
            }
            ++K;
        }
    });
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}
int main(int argc, char** argv)
{
    testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}