数据库(MySQL)和缓存(Redis)的数据一致性问题
涉及到数据更新或者删除,就容易出现缓存和数据库间的数据一致性问题。
问题
数据一致性方式:
* 先删除缓存,再写数据库;
* 先写数据库,再删除缓存;
* 先更新缓存,再写数据库;
* 先写数据库,再更新缓存;
数据一致性问题:
1. 如果先删了缓存,还没有来得及写MySQL,另一个线程就来读,发现缓存空,则去数据库读取数据写入缓存,此时缓存中为脏数据.
2. 如果先写库,在删除缓存前,写库线程挂掉,没有删掉缓存.
3. …其他性能等问题
处理策略
短ttl和双删
相同对象、整体,单进程数据库缓存,最终数据一致性最佳实践(无锁、简单、解决决定一致性,可以很好保证最终一致性):
- 临时缓存GET使用的短TTL,例如15秒(GET的时候发现有这个TTL,使用计数缓存,可以多个写,就使用这个,否则使用较长的) — 失败不写了
- 删除缓存 — 失败不写了
- 写数据库
- 删除GET使用的短TTL(临时缓存,务必,进程内可以保证)
- 再删除缓存,无需重试,可以失败(可选,最好有,也可不必)
示例
修改:
/**
* 根据主键批量部分更新 (主键字段为一个字段,自动获取)
*
* @param <E> 实例类类型
* @param objs 实体类对象集合,主键字段和要更新的字段是必须的
* @param updateFields 要更新的字段,非null时不能为empty,如果要更新所有字段,这个字段应该是null
* @param batchSize 每次更新的数量,null时不限制,一次更新,否则应该大于等于1
* @param newCache 是否缓存,默认不缓存
* @return 执行结果,更新数量,可能是字段的数量
* @throws OperationException 失败时抛出异常
* @since 2021-09-27 p2
*/
@Override
@Modifying(flushAutomatically = true, clearAutomatically = true)
@Transactional(rollbackFor = {Exception.class, RuntimeException.class})
public <E> int update(@NotNull Collection<E> objs,
@Nullable Collection<String> updateFields,
@Nullable Integer batchSize,
boolean newCache) throws OperationException {
// 检查对象
OperationAssert.notEmpty(objs, MSG_UPDATE_COLLECTION_CANNOT_EMPTY);
E first = objs.iterator().next();
OperationAssert.notNull(first, MSG_UPDATE_OBJ_CANNOT_NULL);
// 主键字段
Class<E> domainClass = (Class<E>) first.getClass();
String idFieldName = DbUtils.getIdFieldName(domainClass);
DbCache cache = null;
List<String> ids = null;
CacheConfiguration config = getConfiguration();
boolean addedTtl = false;
int result;
try {
if (config.isEnableDbCache() && (cache = getDbCache()) != null) {
ids = ClassUtils.getFieldValuesNoexcept(objs, idFieldName, domainClass, String.class);
// 1. 缓存短ttl
addedTtl = addShortTtl(idFieldName, ids, cache, domainClass);
// 2. 删除缓存 -- 失败不写了
OperationAssert.expectTrue(cache.deleteByKeys(ids, domainClass) >= 0, "删除缓存失败");
}
// 3. 写数据库
result = update(objs, Collections.singletonList(DbUtils.getDbIdFieldName(domainClass)), updateFields,
batchSize, domainClass);
} finally {
// 4. 删除GET使用的短TTL
removeShortTtl(idFieldName, ids, cache, addedTtl, domainClass);
}
if (!newCache) {
// 5. 再移除缓存,忽略错误
removeCacheAgainNoexcept(ids, cache, domainClass);
} else {
// 6. 需要时新增缓存,忽略错误
setCache(idFieldName, objs, domainClass);
}
// 7. 需要时进行更新通知
if (canNotifyModify(config, domainClass)) {
mqNotify(objs, domainClass, DatasourceChangeNotify.CHANGE_TYPE_MODIFY);
}
return result;
}
查询:
/**
* Retrieves an entity by its id.
*
* @param id must not be {@literal null}.
* @return the entity with the given id or {@literal Optional#empty()} if none found.
* @throws IllegalArgumentException if {@literal id} is {@literal null}.
*/
@NotNull
@Override
public Optional<E> findById(@NotNull I id) {
CacheConfiguration config = sqlExecuteAdapter.getConfiguration();
DbCache cache;
if (config.isEnableDbCache() && ((cache = sqlExecuteAdapter.getDbCache()) != null)) {
// 查询,支持缓存
Class<E> domainClass = getDomainClass();
DbCacheData<E> cacheData = cache.findByKey(id, domainClass, 1);
if ((cacheData != null) && !CollectionUtils.isEmpty(cacheData.getData())
&& (cacheData.getData().iterator().next() != null)) {
return Optional.of(cacheData.getData().iterator().next());
} else {
return Optional.empty();
}
}
// 直接查询
return super.findById(id);
}
@Nullable
private <T> DbCacheData<T> findByKey(@NotNull String field,
@Nullable Object key,
@Nullable Integer max,
@NotNull String cacheKey,
@NotNull Class<T> clazz) {
CriteriaBuilder criteriaBuilder = sqlExecutor.getCriteriaBuilder();
CriteriaQuery<T> query = SqlUtils.createCriteriaQuery(criteriaBuilder, clazz);
Root<T> root = query.from(clazz);
Predicate predicate;
if (StringUtils.isNotBlank(field) && (key != null)) {
predicate = SqlUtils.createAndInIfValid(criteriaBuilder.conjunction(), field, Collections.singleton(key),
root, criteriaBuilder);
} else {
predicate = criteriaBuilder.conjunction();
}
query.where(predicate);
List<T> objects = sqlExecutor.query(query, max);
DbCacheData<T> newDbCacheData = new DbCacheData<>();
newDbCacheData.setCacheTime(DateUtils.now());
if (!CollectionUtils.isEmpty(objects)) {
newDbCacheData.setData(objects);
newDbCacheData.setStatus(DbCacheData.STATUS_OK);
} else {
newDbCacheData.setStatus(DbCacheData.STATUS_PLACEHOLDER);
}
try {
CacheConfiguration config = getConfiguration();
long ttl = useShortTtl(cacheKey) ? config.getShortDbCacheTtl() : config.getDbCacheTtl();
RedisAccess<String, Object> access = getRedisAccess();
if (!access.set(cacheKey, newDbCacheData, ttl)) {
log.warn("ignored set error, key {}", cacheKey);
}
} catch (Exception e) {
log.warn("ignored error {} {}", e.getMessage(), e.getStackTrace());
}
if (NumericUtils.equals(DbCacheData.STATUS_OK, newDbCacheData.getStatus())) {
return newDbCacheData;
} else {
return null;
}
}