数据库(MySQL)和缓存(Redis)的数据一致性问题

数据库(MySQL)和缓存(Redis)的数据一致性问题

涉及到数据更新或者删除,就容易出现缓存和数据库间的数据一致性问题。

问题

数据一致性方式:
* 先删除缓存,再写数据库;
* 先写数据库,再删除缓存;
* 先更新缓存,再写数据库;
* 先写数据库,再更新缓存;

数据一致性问题:
1. 如果先删了缓存,还没有来得及写MySQL,另一个线程就来读,发现缓存空,则去数据库读取数据写入缓存,此时缓存中为脏数据.
2. 如果先写库,在删除缓存前,写库线程挂掉,没有删掉缓存.
3. …其他性能等问题

处理策略

  • 延时双删策略
  • 延时双删 + 重试
  • 延时双删 + binlog重试(canal
  • redis分布式读写锁(可以保证绝对一致性)
  • 短TTL + 双删
  • 后删除 + binlog重试

短ttl和双删

相同对象、整体,单进程数据库缓存,最终数据一致性最佳实践(无锁、简单、解决决定一致性,可以很好保证最终一致性):
短tll双删一致性.svg

  1. 临时缓存GET使用的短TTL,例如15秒(GET的时候发现有这个TTL,使用计数缓存,可以多个写,就使用这个,否则使用较长的) — 失败不写了
  2. 删除缓存 — 失败不写了
  3. 写数据库
  4. 删除GET使用的短TTL(临时缓存,务必,进程内可以保证)
  5. 再删除缓存,无需重试,可以失败(可选,最好有,也可不必)
示例

修改:

/**
 * 根据主键批量部分更新 (主键字段为一个字段,自动获取)
 *
 * @param <E>          实例类类型
 * @param objs         实体类对象集合,主键字段和要更新的字段是必须的
 * @param updateFields 要更新的字段,非null时不能为empty,如果要更新所有字段,这个字段应该是null
 * @param batchSize    每次更新的数量,null时不限制,一次更新,否则应该大于等于1
 * @param newCache     是否缓存,默认不缓存
 * @return 执行结果,更新数量,可能是字段的数量
 * @throws OperationException 失败时抛出异常
 * @since 2021-09-27 p2
 */
@Override
@Modifying(flushAutomatically = true, clearAutomatically = true)
@Transactional(rollbackFor = {Exception.class, RuntimeException.class})
public <E> int update(@NotNull Collection<E> objs,
                      @Nullable Collection<String> updateFields,
                      @Nullable Integer batchSize,
                      boolean newCache) throws OperationException {
    // 检查对象
    OperationAssert.notEmpty(objs, MSG_UPDATE_COLLECTION_CANNOT_EMPTY);
    E first = objs.iterator().next();
    OperationAssert.notNull(first, MSG_UPDATE_OBJ_CANNOT_NULL);

    // 主键字段
    Class<E> domainClass = (Class<E>) first.getClass();
    String idFieldName = DbUtils.getIdFieldName(domainClass);

    DbCache cache = null;
    List<String> ids = null;
    CacheConfiguration config = getConfiguration();
    boolean addedTtl = false;
    int result;
    try {
        if (config.isEnableDbCache() && (cache = getDbCache()) != null) {
            ids = ClassUtils.getFieldValuesNoexcept(objs, idFieldName, domainClass, String.class);
            // 1. 缓存短ttl
            addedTtl = addShortTtl(idFieldName, ids, cache, domainClass);
            // 2. 删除缓存 -- 失败不写了
            OperationAssert.expectTrue(cache.deleteByKeys(ids, domainClass) >= 0, "删除缓存失败");
        }

        // 3. 写数据库
        result = update(objs, Collections.singletonList(DbUtils.getDbIdFieldName(domainClass)), updateFields,
                batchSize, domainClass);
    } finally {
        // 4. 删除GET使用的短TTL
        removeShortTtl(idFieldName, ids, cache, addedTtl, domainClass);
    }

    if (!newCache) {
        // 5. 再移除缓存,忽略错误
        removeCacheAgainNoexcept(ids, cache, domainClass);
    } else {
        // 6. 需要时新增缓存,忽略错误
        setCache(idFieldName, objs, domainClass);
    }
    // 7. 需要时进行更新通知
    if (canNotifyModify(config, domainClass)) {
        mqNotify(objs, domainClass, DatasourceChangeNotify.CHANGE_TYPE_MODIFY);
    }
    return result;
}

查询:

/**
 * Retrieves an entity by its id.
 *
 * @param id must not be {@literal null}.
 * @return the entity with the given id or {@literal Optional#empty()} if none found.
 * @throws IllegalArgumentException if {@literal id} is {@literal null}.
 */
@NotNull
@Override
public Optional<E> findById(@NotNull I id) {
    CacheConfiguration config = sqlExecuteAdapter.getConfiguration();
    DbCache cache;
    if (config.isEnableDbCache() && ((cache = sqlExecuteAdapter.getDbCache()) != null)) {
        // 查询,支持缓存
        Class<E> domainClass = getDomainClass();
        DbCacheData<E> cacheData = cache.findByKey(id, domainClass, 1);
        if ((cacheData != null) && !CollectionUtils.isEmpty(cacheData.getData())
                && (cacheData.getData().iterator().next() != null)) {
            return Optional.of(cacheData.getData().iterator().next());
        } else {
            return Optional.empty();
        }
    }

    // 直接查询
    return super.findById(id);
}

@Nullable
private <T> DbCacheData<T> findByKey(@NotNull String field,
                                     @Nullable Object key,
                                     @Nullable Integer max,
                                     @NotNull String cacheKey,
                                     @NotNull Class<T> clazz) {
    CriteriaBuilder criteriaBuilder = sqlExecutor.getCriteriaBuilder();
    CriteriaQuery<T> query = SqlUtils.createCriteriaQuery(criteriaBuilder, clazz);
    Root<T> root = query.from(clazz);
    Predicate predicate;
    if (StringUtils.isNotBlank(field) && (key != null)) {
        predicate = SqlUtils.createAndInIfValid(criteriaBuilder.conjunction(), field, Collections.singleton(key),
                root, criteriaBuilder);
    } else {
        predicate = criteriaBuilder.conjunction();
    }
    query.where(predicate);
    List<T> objects = sqlExecutor.query(query, max);
    DbCacheData<T> newDbCacheData = new DbCacheData<>();
    newDbCacheData.setCacheTime(DateUtils.now());
    if (!CollectionUtils.isEmpty(objects)) {
        newDbCacheData.setData(objects);
        newDbCacheData.setStatus(DbCacheData.STATUS_OK);
    } else {
        newDbCacheData.setStatus(DbCacheData.STATUS_PLACEHOLDER);
    }

    try {
        CacheConfiguration config = getConfiguration();
        long ttl = useShortTtl(cacheKey) ? config.getShortDbCacheTtl() : config.getDbCacheTtl();
        RedisAccess<String, Object> access = getRedisAccess();
        if (!access.set(cacheKey, newDbCacheData, ttl)) {
            log.warn("ignored set error, key {}", cacheKey);
        }
    } catch (Exception e) {
        log.warn("ignored error {} {}", e.getMessage(), e.getStackTrace());
    }
    if (NumericUtils.equals(DbCacheData.STATUS_OK, newDbCacheData.getStatus())) {
        return newDbCacheData;
    } else {
        return null;
    }
}

参考

One more circular buffer(normal queue)

One more circular buffer(normal queue)

Sometimes we prefer to use circular buffer: “like CircularBuffer, but NOT use size and next index, but use front and rear”, so an impletametation here.

See also
Write a CircularBuffer
std::queue
Full impletametation: circularqueue.hpp

/**
 * Var capacity circular queue.
 * NOTE:
 * - Like CircularBuffer, but if full, enque(push) will FAIL!
 * - And like CircularBuffer, but NOT use size and next index, but use front
 *   and rear.
 * - When use in multi-threads, should add lock or some other to sync.
 * @tparam T data type.
 * @sa CircularBuffer
 * @todo
 * - Add full CircularBuffer like features.
 * - Add iterators.
 */
template<typename T>
class CircularQueue {
public:
    /**
     * Ctor.
     * @param capacity the buffer capacity, should >= 1.
     * @throw std::logic_error when capacity invalid.
     */
    CircularQueue(int32_t capacity = 1, const T& initValue = T{});
    /**
     * Test whether container is empty
     * @return True if queue is empty, false otherwise.
     */
    inline bool empty() const noexcept;
    /**
     * Test whether container is empty
     * @return True if queue if full, false otherwise.
     */
    inline bool full() const noexcept;
    /// @return data size.
    inline int32_t size() const noexcept;
    /// @return queue capacity.
    inline int32_t capacity() const noexcept;
    /**
     * Inserts a new element at the end of the queue, after its current last
     * element. The content of this new element is initialized to value.
     * @param value Value to which the inserted element is initialized.
     * Member type value_type is the type of the elements in the container
     * (defined as an alias of the first class template parameter, T).
     */
    bool push(const T& value) noexcept;
    bool push(T&& value) noexcept;
    bool pop() noexcept;
    bool pop(T& value) noexcept;
    /// @throw std::logic_error when queue empty.
    T pop_front();
    /**
     * Get queue head(front) when not empty.
     * @return queue head(front) when not empty.
     * @throw std::logic_error when empty.
     */
    const T& front() const;
    T& front();
private:
    /// Pop index(current or next readable), init as empty.
    int32_t currRFront{ -1 };
    int32_t lastWRear{ -1 };///< Push index(last writeable), init as empty
    int32_t capa{ 1 };///< The buffer capacity, > 0.
    std::vector<T> buffer;///< The internal buffer.
};

不易理解的指针(C/C++)

不易理解的指针(C/C++)

See also


#include <iostream>

/**
 * 不易理解的指针:
 *
 * - 函数指针
 * - 指针数组
 * - 数组指针
 * - 返回函数指针的函数
 *
 * Compile:
 * g++ pointer.cpp -std=c++11 -Wall -Wextra
 *
 * @note better NOT function pointer, but use C++ functor if you can, e.g.
 * std::function
 * boost::function
 */

int main() noexcept
{
    // 函数指针和(函数)指针数组
    int (* a1[10])(int) = { nullptr };// The array of 10 function pointers, init [0] to nullptr, others 0
    // ERROR: int (error1[10])(int) = { nullptr };
    int* a2[10] = { nullptr };// The array of 10 int pointers, init [0] to nullptr, others 0
    std::cout << std::is_array<decltype(a2)>::value << '\n';// 1

    typedef int (function_t)(int);// Functor type
    typedef int function2_t(int);// Functor type
    int (foo)(int);// 函数声明
    typedef int (* functionptr_t)(int);// Functor pointer type (函数指针类型)
    // Another array of 10 function pointers, init [0] and [1] to nullptr, others 0
    function_t* a4[10] = { nullptr, nullptr };
    functionptr_t a5[10] = { nullptr, nullptr };

    std::cout << std::is_same<function_t, function2_t>::value << '\n';// 1
    std::cout << std::is_same<decltype(a1), decltype(a4)>::value << '\n';// 1
    std::cout << std::is_same<decltype(a1), decltype(a5)>::value << '\n';// 1

    int (* a3)(int) = nullptr;// 函数指针
    function_t* a6 = nullptr;// 函数指针
    functionptr_t a7 = nullptr;// 函数指针
    std::cout << std::is_same<decltype(a3), decltype(a6)>::value << '\n';// 1
    std::cout << std::is_same<decltype(a3), decltype(a7)>::value << '\n';// 1

    // 数组指针
    function_t* (* b1)[10] = nullptr;// The pointer of an array of 10 function pointers: b1
    // ERROR: function_t (*error2)[10] = nullptr;
    functionptr_t (* b2)[10] = nullptr;// The pointer of an array of 10 function pointers: b2
    typedef function_t* functionptrarr_t[10];// The array type of array of function pointers
    typedef functionptr_t functionptrarr2_t[10];// The array type of array of function pointers
    functionptrarr_t* b3 = nullptr;// 数组指针
    functionptrarr2_t* b4 = nullptr;// 数组指针
    b4 = b3 = b2 = b1;// OK
    std::cout << std::is_same<decltype(b1), decltype(b2)>::value << '\n';// 1
    std::cout << std::is_same<decltype(b1), decltype(b3)>::value << '\n';// 1
    std::cout << std::is_same<decltype(b1), decltype(b4)>::value << '\n';// 1
    int (* b5)[10] = nullptr;// The pointer of an array of 10 int pointers: b5
    typedef int intarr_t[10];// The array type of array of 10 int pointers
    intarr_t* b6 = nullptr;// 数组指针
    b6 = b5;// OK
    std::cout << std::is_same<decltype(b5), decltype(b6)>::value << '\n';// 1
    std::cout << std::is_array<decltype(b1)>::value << '\n';// 0

    // 返回函数指针的函数
    // foo2 是一个函数(声明), 该函数的参数是第一个 int 型, 第二个 char* 型,
    // 该函数返回值的是一个函数指针(对象),
    // 这个函数指针(指向的函数)的类型是 int 型返回值和一个 int 型参数.
    int (* foo2(int, char*))(int);
    // 即上面等价于:
    typedef int foo_t(int);
    foo_t* foo3(int, char*);
    std::cout << std::is_same<decltype(foo2), decltype(foo3)>::value << '\n';// 1

    return 0;
}