数据库(MySQL)和缓存(Redis)的数据一致性问题

数据库(MySQL)和缓存(Redis)的数据一致性问题

涉及到数据更新或者删除,就容易出现缓存和数据库间的数据一致性问题。

问题

数据一致性方式:
* 先删除缓存,再写数据库;
* 先写数据库,再删除缓存;
* 先更新缓存,再写数据库;
* 先写数据库,再更新缓存;

数据一致性问题:
1. 如果先删了缓存,还没有来得及写MySQL,另一个线程就来读,发现缓存空,则去数据库读取数据写入缓存,此时缓存中为脏数据.
2. 如果先写库,在删除缓存前,写库线程挂掉,没有删掉缓存.
3. …其他性能等问题

处理策略

  • 延时双删策略
  • 延时双删 + 重试
  • 延时双删 + binlog重试(canal)
  • redis分布式读写锁(可以保证绝对一致性)
  • 短TTL + 双删
  • 后删除 + binlog重试

短ttl和双删

相同对象、整体,单进程数据库缓存,最终数据一致性最佳实践(无锁、简单,不解决绝对一致性,但可以很好保证最终一致性):
短tll双删一致性.svg

  1. 临时缓存GET使用的短TTL,例如15秒(GET的时候发现有这个TTL,使用计数缓存,可以多个写,就使用这个,否则使用较长的) — 失败不写了
  2. 删除缓存 — 失败不写了
  3. 写数据库
  4. 删除GET使用的短TTL(临时缓存,务必,进程内可以保证)
  5. 再删除缓存,无需重试,可以失败(可选,最好有,也可不必)
示例

修改:

/**
 * Batch partial update by primary key (the primary key is a single field, resolved automatically).
 *
 * <p>The cache handling follows the "short TTL + double delete" sequence:
 * mark short TTL, delete cache, write database, unmark short TTL, then either
 * delete the cache again or re-populate it, and finally send a change notification.
 *
 * @param <E>          entity class type
 * @param objs         entities to update; the primary key field and the fields to update are required
 * @param updateFields fields to update; must not be empty when non-null; pass null to update all fields
 * @param batchSize    number of rows per batch; null means unlimited (one single update),
 *                     otherwise it should be greater than or equal to 1
 * @param newCache     whether to cache the updated entities afterwards; by default they are not cached
 * @return execution result: the update count (may be the number of fields)
 * @throws OperationException when the operation fails
 * @since 2021-09-27 p2
 */
@Override
@Modifying(flushAutomatically = true, clearAutomatically = true)
@Transactional(rollbackFor = {Exception.class, RuntimeException.class})
public <E> int update(@NotNull Collection<E> objs,
                      @Nullable Collection<String> updateFields,
                      @Nullable Integer batchSize,
                      boolean newCache) throws OperationException {
    // Validate input: the collection must be non-empty and its first element non-null.
    OperationAssert.notEmpty(objs, MSG_UPDATE_COLLECTION_CANNOT_EMPTY);
    E sample = objs.iterator().next();
    OperationAssert.notNull(sample, MSG_UPDATE_OBJ_CANNOT_NULL);

    // The entity type (and from it the primary key field) is derived from the first element.
    Class<E> entityType = (Class<E>) sample.getClass();
    String pkField = DbUtils.getIdFieldName(entityType);

    CacheConfiguration configuration = getConfiguration();
    DbCache dbCache = null;
    List<String> keys = null;
    boolean shortTtlAdded = false;
    int affected;
    try {
        if (configuration.isEnableDbCache()) {
            dbCache = getDbCache();
            if (dbCache != null) {
                keys = ClassUtils.getFieldValuesNoexcept(objs, pkField, entityType, String.class);
                // 1. Mark the keys so that concurrent GETs cache with the short TTL.
                shortTtlAdded = addShortTtl(pkField, keys, dbCache, entityType);
                // 2. Delete the cache -- if this fails, the database is not written.
                OperationAssert.expectTrue(dbCache.deleteByKeys(keys, entityType) >= 0, "删除缓存失败");
            }
        }

        // 3. Write the database.
        List<String> dbIdFields = Collections.singletonList(DbUtils.getDbIdFieldName(entityType));
        affected = update(objs, dbIdFields, updateFields, batchSize, entityType);
    } finally {
        // 4. Remove the short-TTL marker used by GET (runs whether or not the write succeeded).
        removeShortTtl(pkField, keys, dbCache, shortTtlAdded, entityType);
    }

    if (newCache) {
        // 6. Re-populate the cache when requested; errors are ignored.
        setCache(pkField, objs, entityType);
    } else {
        // 5. Delete the cache a second time; errors are ignored.
        removeCacheAgainNoexcept(keys, dbCache, entityType);
    }
    // 7. Send a modification notification when enabled.
    if (canNotifyModify(configuration, entityType)) {
        mqNotify(objs, entityType, DatasourceChangeNotify.CHANGE_TYPE_MODIFY);
    }
    return affected;
}

查询:

/**
 * Looks up a single entity by its id, going through the database cache when it is
 * enabled and available, and delegating to the superclass lookup otherwise.
 *
 * @param id must not be {@literal null}.
 * @return the entity with the given id, or {@literal Optional#empty()} if none was found.
 * @throws IllegalArgumentException if {@literal id} is {@literal null} (contract of the
 *                                  overridden method; this override performs no check of its own).
 */
@NotNull
@Override
public Optional<E> findById(@NotNull I id) {
    CacheConfiguration configuration = sqlExecuteAdapter.getConfiguration();
    if (!configuration.isEnableDbCache()) {
        // Cache disabled: query directly.
        return super.findById(id);
    }
    DbCache dbCache = sqlExecuteAdapter.getDbCache();
    if (dbCache == null) {
        // No cache instance available: query directly.
        return super.findById(id);
    }

    // Cache-backed lookup, limited to one result.
    DbCacheData<E> cached = dbCache.findByKey(id, getDomainClass(), 1);
    if ((cached == null) || CollectionUtils.isEmpty(cached.getData())) {
        return Optional.empty();
    }
    E head = cached.getData().iterator().next();
    return (head == null) ? Optional.empty() : Optional.of(head);
}

/**
 * Queries entities of type {@code clazz} from the database, stores the outcome in Redis
 * under {@code cacheKey}, and returns it.
 *
 * <p>When both {@code field} is non-blank and {@code key} is non-null the query is filtered
 * by {@code field IN (key)}; otherwise no filter is applied and the query is bounded only
 * by {@code max}. An empty result is cached as a placeholder entry
 * ({@link DbCacheData#STATUS_PLACEHOLDER}). The TTL is the short one when
 * {@code useShortTtl(cacheKey)} reports true, the regular one otherwise.
 *
 * @param field    name of the field to filter on
 * @param key      value to match; null disables filtering
 * @param max      maximum number of rows, passed through to the executor
 * @param cacheKey Redis key under which the result is stored
 * @param clazz    entity class to query
 * @param <T>      entity type
 * @return the cached data when rows were found, or {@code null} when the result was empty
 */
@Nullable
private <T> DbCacheData<T> findByKey(@NotNull String field,
                                     @Nullable Object key,
                                     @Nullable Integer max,
                                     @NotNull String cacheKey,
                                     @NotNull Class<T> clazz) {
    CriteriaBuilder criteriaBuilder = sqlExecutor.getCriteriaBuilder();
    CriteriaQuery<T> query = SqlUtils.createCriteriaQuery(criteriaBuilder, clazz);
    Root<T> root = query.from(clazz);
    Predicate predicate;
    if (StringUtils.isNotBlank(field) && (key != null)) {
        predicate = SqlUtils.createAndInIfValid(criteriaBuilder.conjunction(), field, Collections.singleton(key),
                root, criteriaBuilder);
    } else {
        // No usable filter: an always-true predicate, so only `max` bounds the result.
        predicate = criteriaBuilder.conjunction();
    }
    query.where(predicate);
    List<T> objects = sqlExecutor.query(query, max);

    DbCacheData<T> newDbCacheData = new DbCacheData<>();
    newDbCacheData.setCacheTime(DateUtils.now());
    if (!CollectionUtils.isEmpty(objects)) {
        newDbCacheData.setData(objects);
        newDbCacheData.setStatus(DbCacheData.STATUS_OK);
    } else {
        // Cache the miss as well, as a placeholder entry without data.
        newDbCacheData.setStatus(DbCacheData.STATUS_PLACEHOLDER);
    }

    try {
        CacheConfiguration config = getConfiguration();
        long ttl = useShortTtl(cacheKey) ? config.getShortDbCacheTtl() : config.getDbCacheTtl();
        RedisAccess<String, Object> access = getRedisAccess();
        if (!access.set(cacheKey, newDbCacheData, ttl)) {
            log.warn("ignored set error, key {}", cacheKey);
        }
    } catch (Exception e) {
        // Writing the cache is best-effort; the query result is still returned.
        // The throwable is passed as the trailing argument (not through a placeholder) so the
        // logger prints the full stack trace instead of the array's toString().
        // NOTE(review): assumes `log` is an SLF4J-style logger -- confirm.
        log.warn("ignored error {}", e.getMessage(), e);
    }
    if (NumericUtils.equals(DbCacheData.STATUS_OK, newDbCacheData.getStatus())) {
        return newDbCacheData;
    } else {
        return null;
    }
}

参考

compiling ace (linux)

compiling ace (linux)

  • fast download (2020-04-05)

e.g.

# ACE-x.y.z.tar.bz2
axel -an 32 http://download.dre.vanderbilt.edu/previous_versions/ACE-6.2.1.tar.bz2

# or
axel -an 32 http://download.dre.vanderbilt.edu/previous_versions/ACE-6.4.0.tar.bz2
  • unpack and prepare
# unpack
cd /path/to/wrapper/repo/root
git checkout v6.2.1

# or:
tar -xf ACE-6.2.1.tar.bz2
cd ACE_wrappers/

# prepare
# set the environment variable ACE_ROOT to point to the directory where ace is
# unpacked, update your .bashrc accordingly e.g.:
export ACE_ROOT=$(pwd)

# Create config.h as a symlink to config-linux.h (both are in ACE_wrappers/ace):
cd $ACE_ROOT/ace
ln -s config-linux.h config.h

# and platform_macros.GNU as a symlink to platform_linux.GNU (which can be found in ACE_wrappers/include/makeinclude):
cd $ACE_ROOT/include/makeinclude
ln -s platform_linux.GNU platform_macros.GNU
  • compile
cd $ACE_ROOT/ace
make clean
make -j
  • install
cd $ACE_ROOT
sudo -s bash
export ACE_ROOT=$(pwd)

# If you wish to install ACE (using "make install"), set the installation
# prefix in platform_macros.GNU.
cd $ACE_ROOT/include/makeinclude

# INSTALL_PREFIX = /opt/lib/ace-6.2.1

cd $ACE_ROOT/ace
make install

# cleanup
make clean

sudo rm -rf *

exit

git checkout -- .
  • pack
cd /opt/lib/ace6.2.1

tar -cjf ../ace6.2.1-xenial-amd64.tar.bz2 .

refs

install pre-built debian package (16.04)

curl -s https://yuiwong.org/debrepo/setup/setup-ubuntu-deb.sh | sudo bash

sudo apt update
sudo apt install libace6.2.1-nonofficial-dev

Find missing number (in C++)

Find missing number

Similar to the classic missing-number problem, but here we are given a sorted list of N integers, all of which lie in the range [X, Y].
There are no duplicates in the list, and one or more integers of the range may be missing from it.
Write efficient code to find the first missing integer.

— use binary search method:

/**
 * Find the first (smallest) missing number in a sorted array (min to max)
 * using binary search.
 * @tparam T integral value type.
 * @param nums the sorted numbers to search for a missing number.
 * @param nNums number count of @a nums.
 * @param rangeMin lower bound (inclusive) of the range the numbers come from.
 * @param rangeMax upper bound (inclusive) of the range the numbers come from.
 * @param[out] result the first missing number, written only on success.
 * @return true when a missing number was found; false when @a nums is nil,
 * when rangeMin >= rangeMax, or when @a nNums is at least the size of the
 * range (so nothing can be missing).
 * @throw std::logic_error when the search is exhausted without a result,
 * which only happens if @a nums violates the preconditions below.
 *
 * @note
 * - @a nums should NOT be nil, and should be sorted, free of duplicates and
 *   contain only values within [rangeMin, rangeMax].
 * - @a nNums should be valid.
 * - rangeMin should be less than rangeMax!
 */
template<typename T>
typename CPPBASE2_ENABLE_IF<CPPBASE2_IS_INTEGRAL<T>::value, bool>::type
searchFirstMissingNumber(
  const T* const nums,
  const size_t nNums,
  const T rangeMin,
  const T rangeMax,
  T& result) {
  if ((!nums) || (rangeMin >= rangeMax)) {
    return false;
  }
  // Size of the range minus one, computed in unsigned 64-bit arithmetic.
  // Doing this in T would truncate for narrow types (e.g. int8_t over
  // [-128, 127]) and overflow for wide signed ranges; the "+ 1" is folded
  // into the comparison so that a full 64-bit range cannot wrap either.
  const uint64_t span = uint64_t(rangeMax) - uint64_t(rangeMin);
  if (span < uint64_t(nNums)) {
    // At least as many numbers as the range holds: nothing can be missing.
    return false;
  }
  if (nNums < 1) {
    result = rangeMin;
    return true;
  }

  size_t leftIdx = 0;
  size_t rightIdx = nNums - 1;
  T expectMin = rangeMin;

  // e.g.1: [0, 9], {0, 1, 2, 3, 5, 6, 7, 8, 9}
  //     2: [0, 9], {0, 1, 2, 4, 5, 6, 7, 8, 9}
  //     3: [0, 9], {0, 1, 2, 3, 4, 6, 7, 8, 9}
  //     4: [0, 9], {7, 8}
  //     5: [0, 9], {9}
  //     6: [0, 9], {0}
  //     7: [0, 9], {0, 1, 2, 3, 4, 5, 7, 8, 9}
  //     8: [-3, 5], {-3, -2, -1, 2, 3, 4, 5}
  while (leftIdx <= rightIdx) {
    if (nums[leftIdx] > expectMin) {
      // The window starts above the expected value: that value is missing.
      result = expectMin;
      return true;
    }
    // Overflow-safe midpoint.
    const size_t midIdx = leftIdx + ((rightIdx - leftIdx) / 2);
    // Value that sits at midIdx when nothing is missing before it.
    const T expectMid = static_cast<T>(uint64_t(rangeMin) + uint64_t(midIdx));
    const T midNum = nums[midIdx];
    if (midNum == expectMid) {
      // Nothing missing up to midIdx; the gap starts right after it if this
      // is the end of the window or the next element is not consecutive.
      if ((midIdx >= rightIdx) || ((expectMid + 1) != nums[midIdx + 1])) {
        result = static_cast<T>(expectMid + 1);
        return true;
      }
    } else if (midNum > expectMid) {
      // A number is missing at or before midIdx: search left. midIdx is
      // greater than leftIdx here because nums[leftIdx] <= expectMin was
      // established above, so the subtraction cannot wrap.
      rightIdx = midIdx - 1;
      continue;
    }
    // search right
    leftIdx = midIdx + 1;
    expectMin = static_cast<T>(midNum + 1);
  }

  // Only reachable when nums violates the documented preconditions.
  throw std::logic_error("bug!");
}

some test code:

// Large-input check: builds N consecutive integers (shifted by a random
// offset) with up to two of them removed, then verifies that both
// searchFirstMissingNumberSlow and searchFirstMissingNumber report the
// smaller removed value, and bounds their per-call and average run times.
TEST(m, large) {
  const int N = 1024 * 1024 * 100;
  std::vector<int> a;
  a.reserve(N);
  SteadyTime t1, t2;

  const Duration maxET = Duration::milliseconds(1);
  Duration sum1, sum2;

  for (int k = 0; k < 100; ++k) {
    // Draw indices and offset from [0, N). N is used rather than
    // a.capacity() because reserve(N) only guarantees capacity() >= N; a
    // larger capacity could yield an index outside the generated sequence,
    // leaving nothing removed and breaking the expectations below.
    int missingNumber = kRg->get32() % static_cast<size_t>(N);
    int missingNumber2 = kRg->get32() % static_cast<size_t>(N);
    if (missingNumber > missingNumber2) {
      std::swap(missingNumber, missingNumber2);
    }
    int offset = kRg->get32() % static_cast<size_t>(N);
    if (k % 2) {
      // Alternate between positive and negative ranges.
      offset *= -1;
    }
    a.clear();
    for (int i = 0; i < N; ++i) {
      if (i != missingNumber && i != missingNumber2) {
        a.push_back(offset + i);
      }
    }
    int result = -1;
    t1 = SteadyTime::now();
    EXPECT_TRUE(algorithm::searchFirstMissingNumberSlow(
      a.data(), a.size(), offset, offset + (N - 1), result));
    t2 = SteadyTime::now();
    sum1 += (t2 - t1);

    EXPECT_EQ(offset + missingNumber, result);
    EXPECT_LE((t2 - t1), Duration::milliseconds(100));

    CPPBASE2_INFO("searchFirstMissingNumberSlow"
           " size " << a.size()
      << ", elapsed time " << (t2 - t1).toNanosec() << " ns, "
      << ", result = " << (offset + missingNumber));

    result = -1;
    t1 = SteadyTime::now();
    EXPECT_TRUE(algorithm::searchFirstMissingNumber(
      a.data(), a.size(), offset, offset + (N - 1), result));
    t2 = SteadyTime::now();
    sum2 += (t2 - t1);

    EXPECT_EQ(offset + missingNumber, result);
    EXPECT_LE((t2 - t1), maxET);

    CPPBASE2_INFO("searchFirstMissingNumber"
           " size " << a.size()
      << ", elapsed time " << (t2 - t1).toNanosec() << " ns, "
      << ", result = " << (offset + missingNumber));
  }

  // Average over the 100 rounds: the slow variant is expected to land
  // between 5 ms and 20 ms, the binary search within 10 us.
  EXPECT_LE((sum1.toNanosec() / 100), Duration::milliseconds(20).toNanosec());
  EXPECT_GE((sum1.toNanosec() / 100), Duration::milliseconds(5).toNanosec());
  EXPECT_LE((sum2.toNanosec() / 100), Duration::microseconds(10).toNanosec());
}