Observer pattern

Observer 模式 (观察者模式 / 发布-订阅)

一个发布者(主题), 多个订阅者(观察者), 发布者发布消息通知所有订阅者.

例子
– 消息等发布-订阅
– MVC 是观察者模式的一种实现
– 常见的一个例子: 对同一组数据进行统计分析时候, 希望提供多种形式的表示,
(例如以表格进行统计显示, 柱状图统计显示和百分比统计显示等).
这些表示都依赖于同一组数据, 当数据改变的时候, 所有的统计的显示都能够改变.

认识观察者模式

认识观察者模式:

发布者(主题) + 订阅者(观察者) = 观察者模式:

观察者模式:

定义观察者模式

观察者模式 Observer pattern 定义了对象之间的一 (Subject or Topic) 对
多 (Observers) 的依赖关系.
这样一来, 当主题对象改变状态时, 它的所有依赖者都会收到通知并自动更新.

观察者模式 也称为发布-订阅 (Publish-Subscribe),
“主题 Subject” 就是消息的发布者(Publisher), “观察者 Observers”则是消息的订阅者
(Subscribers)

观察者模式应该可以说是应用最多, 影响最广的模式之一,
也是在大型系统开发过程中要用到的模式之一.

观察者的一个实例 Model / View / Control (MVC) 结构在系统开发架构设计中有着很重要的
地位和意义, MVC 实现了业务逻辑和表示层的解耦.
在 Java Struts 则提供和 MFC 中 Doc/View 结构类似的实现 MVC 的框架,
另外 Java 语言本身就提供了观察者模式的实现接口.

观察者模式类图示例


实现示例 1 分析

https://yuiwong.org/gitlab/cpp/cppprogdesipatt/tree/master/src/observer/example

/**
 * 简单的观察者模式(主题 即发布者, 观察者即订阅者)实现
 * g++ observer.cpp -std=c++11 -Wall -Wextra
 */
#include <iostream>
#include <memory>
#include <functional>
#include <set>
template<typename T> struct Observer;
/**
 * @struct Subject
 * A generic subject(i.e. publisher) implementation
 */
template<typename T>
struct Subject {
    using ObserverConstPtr = std::shared_ptr<Observer<T> const>;
    Subject() noexcept = default;
    virtual ~Subject() noexcept = default;
    virtual bool subscribe(ObserverConstPtr const& observer) noexcept;
    virtual bool unsubscribe(ObserverConstPtr const& observer) noexcept;
    virtual bool publish(std::shared_ptr<T const> const& message) const
    noexcept;
protected:
    virtual void notify() const noexcept;
    std::set<ObserverConstPtr> observers;
    mutable std::shared_ptr<T const> message;
};
/**
 * @struct Observer
 * Basic and generic observer(i.e. subscriber) implementation
 */
template<typename T>
struct Observer {
    Observer(
        std::function<void(std::shared_ptr<T const> const&)> const& callback)
        noexcept;
protected:
    Observer() noexcept = default;
public:
    virtual ~Observer() noexcept = default;
    virtual void update(std::shared_ptr<T const> const& message) const
    noexcept;
protected:
    std::function<void(std::shared_ptr<T const> const&)> callback;
};
template<typename T>
bool Subject<T>::subscribe(ObserverConstPtr const& observer) noexcept
{
    if (observer) {
        auto const ret = this->observers.insert(observer);
        return ret.second;
    }
    return false;
}
template<typename T>
bool Subject<T>::unsubscribe(ObserverConstPtr const& observer) noexcept
{
    auto const it = this->observers.find(observer);
    if (it != this->observers.end()) {
        this->observers.erase(observer);
        return true;
    }
    return false;
}
template<typename T>
bool Subject<T>::publish(std::shared_ptr<T const> const& message) const
noexcept
{
    if (message) {
        this->message = message;
        this->notify();
        return true;
    }
    return false;
}
template<typename T>
void Subject<T>::notify() const noexcept
{
    for (auto it = this->observers.cbegin(), end = this->observers.cend();
        it != end; ++it) {
        (*it)->update(this->message);
    }
}
template<typename T>
Observer<T>::Observer(
    std::function<void(std::shared_ptr<T const> const&)> const& callback)
    noexcept: callback(callback) {}
template<typename T>
void Observer<T>::update(std::shared_ptr<T const> const& message) const
noexcept
{
    if (this->callback) {
        this->callback(message);
    }
}
/// String subscriber
struct SSubsciber {
    SSubsciber();
    void callback(std::shared_ptr<std::string const> const& a);
protected:
    static int nextIdx;
    int idx;
};
int SSubsciber::nextIdx = 0;
SSubsciber::SSubsciber(): idx(SSubsciber::nextIdx++) {}
void SSubsciber::callback(std::shared_ptr<std::string const> const& a)
{
    std::cout << "SSubsciber." << this->idx << ": " << *a << "\n";
}
struct SObserver: public Observer<std::string> {
    SObserver(): Observer() {
        this->Observer::callback = std::bind(
            &SObserver::callback, this, std::placeholders::_1);
    }
    void callback(std::shared_ptr<std::string const> const& a) const {
        std::cout << "SObserver: " << *a << "\n";
    }
};
struct S2Observer: public Observer<std::string> {
    S2Observer(): Observer() {
        this->Observer::callback = std::bind(
            &S2Observer::callback, this, std::placeholders::_1);
    }
    void callback(std::shared_ptr<std::string const> const& a) const {
        std::cout << "S2Observer: " << *a << "\n";
    }
};
int main()
{
    Subject<std::string> sPublihser;
    std::shared_ptr<Observer<std::string> const> const sSubsciber1(
        new Observer<std::string>([](std::shared_ptr<
        std::string const> const& a){
            std::cout << "aSubsciber1: " << *a << "\n";
        }));
    std::shared_ptr<Observer<std::string> const> const sSubsciber2(
        new Observer<std::string>([](std::shared_ptr<
        std::string const> const& a){
            std::cout << "aSubsciber2: " << *a << "\n";
        }));
    SSubsciber ss3;
    SSubsciber ss4;
    std::shared_ptr<Observer<std::string> const> const s3(
        new Observer<std::string>(std::bind(
        &SSubsciber::callback, &ss3, std::placeholders::_1)));
    std::shared_ptr<Observer<std::string> const> const s4(
        new Observer<std::string>(std::bind(
        &SSubsciber::callback, &ss4, std::placeholders::_1)));
    std::shared_ptr<SObserver const> const s5(new SObserver());
    std::shared_ptr<S2Observer const> const s6(new S2Observer());
    sPublihser.subscribe(sSubsciber1);
    sPublihser.subscribe(sSubsciber2);
    sPublihser.subscribe(s3);
    sPublihser.subscribe(s4);
    sPublihser.subscribe(s5);
    sPublihser.subscribe(s6);
    auto const msg1 = std::shared_ptr<std::string>(new std::string("aaa"));
    auto const msg2 = std::shared_ptr<std::string>(new std::string("bbb"));
    sPublihser.publish(msg1);
    sPublihser.publish(msg2);
    std::cout << "\n";
    sPublihser.unsubscribe(sSubsciber1);
    sPublihser.publish(msg1);
    sPublihser.publish(msg2);
    return 0;
}

这里的主题提供依赖于它的观察者的注册和注销操作,
并且提供了使得依赖于它的所有观察者的通知操作. 观察者则提供一个 update 操作.

在观察者模式的示例实现中主题维护一个 unordered_set 作为存储其所有观察者的容器,
每当调用 notify 操作就遍历 set 中的观察者对象, 广播通知观察者 (调用观察者的 update).

运行示例程序, 可以看到当主题发布 aaa 和 bbb 的时候,
所有观察者都收到了 aaa 和 bbb, 然后第一个观察者取消订阅后,
再次发布 aaa 和 bbb 时除了第一个观察者其他都收到了.

TCP checksum

计算 TCP 首部中使用的校验和

  • TCP checksum is a 16-bit field in TCP header used for error detection
  • Same as IP checksum,
    TCP checksum is the 16-bit one’s complement of the one’s
    complement sum of all 16-bit words in the computation data.
  • Checksum 图:
  • TCP checksum computing includes:
    • UDP 数据报和 TCP 段中使用的校验和的计算都包含一个 12 字节长的伪首部.
      12 bytes TCP 伪首部 (12 bytes TCP pseudo header,
      from the IP header and computed), includes:
    • ipHdr srcIP 4 bytes
    • ipHdr dstIP 4 bytes
    • 1 reserved byte: 0x00
    • 1 byte protocol: from ipHdr: 0x06 for TCP
    • 2 byte computed TCP length
    • TCP “Pseudo Header” For Checksum Calculation 图:
    • Original TCP segment(length: is above TCP length), might+ padding,
      includes:
    • TCP header
    • TCP data, includes:
      • TCP data
      • Padded as needed with zero bytes at the end to make a multiple
        of two bytes

TCP “Pseudo Header” For Checksum Calculation

Field Name Bytes Description
Source Address 4 The 32-bit IP address of the originator of the datagram, taken from the IP header
Destination Address 4 The 32-bit IP address of the intended recipient of the datagram, also from the IP header
Reserved 1 8 bits of zeroes
Protocol 1 The Protocol field from the IP header. This indicates what higher-layer protocol is carried in the IP datagram. Of course, we already know what this protocol is, it’s TCP! So, this field will normally have the value 6
TCP Length 2 The length of the TCP segment, including both header and data. Note that this is not a specific field in the TCP header; it is computed
/**
 * 计算 TCP 首部中使用的校验和
 * @param cleanChecksum true to cleanup original checksum before compute
 * @param ipHdr IP首部
 * @return in network byte order
 */
uint16_t computeChecksum(
    bool const cleanChecksum, IpHdr const* const ipHdr) const
{
    // get computed TCP length, also is original TCP segment length
    size_t const protocolSize = ipHdr->computeProtocolSize();
    size_t const n = 12 + protocolSize;
    uint8_t data[n];
    // ipHdr srcIP
    ::memcpy(data, &(ipHdr->sourceIpAddress), 4);
    // ipHdr dstIP
    ::memcpy(data + 4, &(ipHdr->destinationIpAddress), 4);
    // reserved
    data[8] = 0x00;
    // protocol
    data[9] = ipHdr->protocol;
    // computed TCP length
    data[10] = (protocolSize & 0xff00) >> 8;
    data[11] = protocolSize & 0xff;
    // original TCP segment
    tcphdr* const tcpHdr = reinterpret_cast<tcphdr*>(data + 12);
    ::memcpy(tcpHdr, this->tcpHdr, protocolSize);
    // cleanup original checksum when need
    if (cleanChecksum) {
        tcpHdr->check = 0;
    }
    // compute
    return ethpacket::ComputeChecksum(
        reinterpret_cast<uint16_t const*>(data), n);
}

Ethernet packet checksum

计算以太网包首部中使用的校验和

算法: first summing all numbers(every 16-bit, might with padding)
and adding the carry (or carries) to the result,
then compute ones' complement:
– 发送端或接收端计算发送端的校验和时, 需要首先把校验和字段设置为 0
– 然后累积每个 16-bit
– 然后将最终结果"折叠"成 16 bits
– 然后 ones' complement (算法: 所有按位取反)
– 最后只要低 16 bits, 最后结果是网络字节顺序

/**
 * 计算以太网包首部中使用的校验和
 * @param data data to compute, in network byte order
 * @param bytes data size in bytes
 * @return in network byte order
 */
uint16_t ComputeChecksum(uint16_t const* const data, size_t const bytes)
{
    // initialize sum to zero
    unsigned long sum = 0;
    // n to keep current left bytes
    size_t n;
    // data index for each 16-bit
    long dataIndex;
    // accumulate sum
    for (n = bytes, dataIndex = -1; n > 1; n -= 2) {
        sum += data[++dataIndex];
    }
    /*
     * if any bytes left, pad the bytes with 0x00 and add
     * 最后一个 16-bit, 网络字节序:
     * 最后一个字节(位于最后一个 16-bit高位): MSB, then 0x00,
     * 而 MSB 在低地址, 因此计算使用最后一个 16-bit 是 0x00 then 最后一个字节:
     * - 即最后一个字节,
     * - => ((data[++dataIndex] >> 8) & 0xff) ... but data[...] bad
     * - => ((data[++dataIndex]) & htons(0xff00) ... but data[...] bad
     * - => uint8_t const msb = (reinterpret_cast<uint8_t const* const>(data))[
     *          long(bytes) - 1];
     */
    if (n > 0) {
        uint8_t const msb = (reinterpret_cast<uint8_t const* const>(data))[
            long(bytes) - 1];
        sum += msb;
    }
    // fold sum to 16 bits: add carrier to result
    while (sum >> 16) {
        sum = (sum >> 16) + (sum & 0xffff);
    }
    /*
     * ones' complement:
     * The ones' complement of a binary number is defined as the value
     * obtained by inverting all the bits in the binary representation of the
     * number (swapping 0s for 1s and vice versa)
     */
    return uint16_t(~sum);
}