数据库(MySQL)和缓存(Redis)的数据一致性问题

数据库(MySQL)和缓存(Redis)的数据一致性问题

涉及到数据更新或者删除,就容易出现缓存和数据库间的数据一致性问题。

问题

数据一致性方式:
* 先删除缓存,再写数据库;
* 先写数据库,再删除缓存;
* 先更新缓存,再写数据库;
* 先写数据库,再更新缓存;

数据一致性问题:
1. 如果先删了缓存,还没有来得及写MySQL,另一个线程就来读,发现缓存空,则去数据库读取数据写入缓存,此时缓存中为脏数据.
2. 如果先写库,在删除缓存前,写库线程挂掉,没有删掉缓存.
3. …其他性能等问题

处理策略

  • 延时双删策略
  • 延时双删 + 重试
  • 延时双删 + binlog重试(canal
  • redis分布式读写锁(可以保证绝对一致性)
  • 短TTL + 双删
  • 后删除 + binlog重试

短ttl和双删

相同对象、整体,单进程数据库缓存,最终数据一致性最佳实践(无锁、简单、解决决定一致性,可以很好保证最终一致性):
短tll双删一致性.svg

  1. 临时缓存GET使用的短TTL,例如15秒(GET的时候发现有这个TTL,使用计数缓存,可以多个写,就使用这个,否则使用较长的) — 失败不写了
  2. 删除缓存 — 失败不写了
  3. 写数据库
  4. 删除GET使用的短TTL(临时缓存,务必,进程内可以保证)
  5. 再删除缓存,无需重试,可以失败(可选,最好有,也可不必)
示例

修改:

/**
 * 根据主键批量部分更新 (主键字段为一个字段,自动获取)
 *
 * @param <E>          实例类类型
 * @param objs         实体类对象集合,主键字段和要更新的字段是必须的
 * @param updateFields 要更新的字段,非null时不能为empty,如果要更新所有字段,这个字段应该是null
 * @param batchSize    每次更新的数量,null时不限制,一次更新,否则应该大于等于1
 * @param newCache     是否缓存,默认不缓存
 * @return 执行结果,更新数量,可能是字段的数量
 * @throws OperationException 失败时抛出异常
 * @since 2021-09-27 p2
 */
@Override
@Modifying(flushAutomatically = true, clearAutomatically = true)
@Transactional(rollbackFor = {Exception.class, RuntimeException.class})
public <E> int update(@NotNull Collection<E> objs,
                      @Nullable Collection<String> updateFields,
                      @Nullable Integer batchSize,
                      boolean newCache) throws OperationException {
    // 检查对象
    OperationAssert.notEmpty(objs, MSG_UPDATE_COLLECTION_CANNOT_EMPTY);
    E first = objs.iterator().next();
    OperationAssert.notNull(first, MSG_UPDATE_OBJ_CANNOT_NULL);

    // 主键字段
    Class<E> domainClass = (Class<E>) first.getClass();
    String idFieldName = DbUtils.getIdFieldName(domainClass);

    DbCache cache = null;
    List<String> ids = null;
    CacheConfiguration config = getConfiguration();
    boolean addedTtl = false;
    int result;
    try {
        if (config.isEnableDbCache() && (cache = getDbCache()) != null) {
            ids = ClassUtils.getFieldValuesNoexcept(objs, idFieldName, domainClass, String.class);
            // 1. 缓存短ttl
            addedTtl = addShortTtl(idFieldName, ids, cache, domainClass);
            // 2. 删除缓存 -- 失败不写了
            OperationAssert.expectTrue(cache.deleteByKeys(ids, domainClass) >= 0, "删除缓存失败");
        }

        // 3. 写数据库
        result = update(objs, Collections.singletonList(DbUtils.getDbIdFieldName(domainClass)), updateFields,
                batchSize, domainClass);
    } finally {
        // 4. 删除GET使用的短TTL
        removeShortTtl(idFieldName, ids, cache, addedTtl, domainClass);
    }

    if (!newCache) {
        // 5. 再移除缓存,忽略错误
        removeCacheAgainNoexcept(ids, cache, domainClass);
    } else {
        // 6. 需要时新增缓存,忽略错误
        setCache(idFieldName, objs, domainClass);
    }
    // 7. 需要时进行更新通知
    if (canNotifyModify(config, domainClass)) {
        mqNotify(objs, domainClass, DatasourceChangeNotify.CHANGE_TYPE_MODIFY);
    }
    return result;
}

查询:

/**
 * Retrieves an entity by its id.
 *
 * @param id must not be {@literal null}.
 * @return the entity with the given id or {@literal Optional#empty()} if none found.
 * @throws IllegalArgumentException if {@literal id} is {@literal null}.
 */
@NotNull
@Override
public Optional<E> findById(@NotNull I id) {
    CacheConfiguration config = sqlExecuteAdapter.getConfiguration();
    DbCache cache;
    if (config.isEnableDbCache() && ((cache = sqlExecuteAdapter.getDbCache()) != null)) {
        // 查询,支持缓存
        Class<E> domainClass = getDomainClass();
        DbCacheData<E> cacheData = cache.findByKey(id, domainClass, 1);
        if ((cacheData != null) && !CollectionUtils.isEmpty(cacheData.getData())
                && (cacheData.getData().iterator().next() != null)) {
            return Optional.of(cacheData.getData().iterator().next());
        } else {
            return Optional.empty();
        }
    }

    // 直接查询
    return super.findById(id);
}

@Nullable
private <T> DbCacheData<T> findByKey(@NotNull String field,
                                     @Nullable Object key,
                                     @Nullable Integer max,
                                     @NotNull String cacheKey,
                                     @NotNull Class<T> clazz) {
    CriteriaBuilder criteriaBuilder = sqlExecutor.getCriteriaBuilder();
    CriteriaQuery<T> query = SqlUtils.createCriteriaQuery(criteriaBuilder, clazz);
    Root<T> root = query.from(clazz);
    Predicate predicate;
    if (StringUtils.isNotBlank(field) && (key != null)) {
        predicate = SqlUtils.createAndInIfValid(criteriaBuilder.conjunction(), field, Collections.singleton(key),
                root, criteriaBuilder);
    } else {
        predicate = criteriaBuilder.conjunction();
    }
    query.where(predicate);
    List<T> objects = sqlExecutor.query(query, max);
    DbCacheData<T> newDbCacheData = new DbCacheData<>();
    newDbCacheData.setCacheTime(DateUtils.now());
    if (!CollectionUtils.isEmpty(objects)) {
        newDbCacheData.setData(objects);
        newDbCacheData.setStatus(DbCacheData.STATUS_OK);
    } else {
        newDbCacheData.setStatus(DbCacheData.STATUS_PLACEHOLDER);
    }

    try {
        CacheConfiguration config = getConfiguration();
        long ttl = useShortTtl(cacheKey) ? config.getShortDbCacheTtl() : config.getDbCacheTtl();
        RedisAccess<String, Object> access = getRedisAccess();
        if (!access.set(cacheKey, newDbCacheData, ttl)) {
            log.warn("ignored set error, key {}", cacheKey);
        }
    } catch (Exception e) {
        log.warn("ignored error {} {}", e.getMessage(), e.getStackTrace());
    }
    if (NumericUtils.equals(DbCacheData.STATUS_OK, newDbCacheData.getStatus())) {
        return newDbCacheData;
    } else {
        return null;
    }
}

参考