背景
最近产品下发一个需求:考虑在程序中加缓存,刚开始以为只是 Redis 缓存,后面才直到是本地缓存(Caffeine) + Redis。
在 SpringBoot2.x 后默认的缓存就是 Caffeine,所以本地缓存也选择了 Caffeine。
ps:我们的数据不是从程序中插入或者更新,是每天会有数据专门同步。
问题
基于提出的需求,我认为主要有以下两个问题:
- 因为有本地缓存,如何保证数据一致性。当一个节点数据改变,其他节点的数据如何失效?
- 数据不对,需要重新同步,缓存如何失效?
流程图
接下来就是配合产品和其他开发人员画出流程图,如下:
- 使用一张配置表,记录是否需要缓存,是否开启缓存,来达到通知时候缓存失效的情况。
- 因为项目要求一般,即使消息丢失,也不会存在太大的影响,所以最终选择了 redis 里面的订阅、发布功能,实现通知其他节点失效本地缓存。
开发
上面问题清楚了,流程图也清楚了。那就准备开始写 bug 了。整体思路是自定义注解实现切面,尽量降低对业务代码的耦合度。
CacheConfig
主要是结合业务定义一个 CacheManager,代码里面的解释都有。因为这个是直接占用程序内存的,所有得特别注意最大可缓存条数,别把内存肝爆了。当然也不能太小了,因为还要考虑命中率的问题。所以这就得结合实际得业务来确定最终的大小。
@Bean(name = JKDDCX)
@Primary
public CacheManager cacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(Caffeine.newBuilder()
// 设置最后一次写入或访问后经过固定时间过期
.expireAfterAccess(EXPIRE, TIME_UNIT)
//设置本地缓存写入后过期时间
.expireAfterWrite(EXPIRE, TIME_UNIT)
// 初始的缓存空间大小
.initialCapacity(500)
// 缓存的最大条数
.maximumSize(1000));// 使用人数 * 5 (每个人不同的入参 5 条)\
return cacheManager;
}
@CaffeineCache
自定义注解,把可以用到的参数都能加上。
@Target({ ElementType.METHOD ,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CaffeineCache {
public String moudleId() default "";
//用于在数据库中配置参数
public String methodId() default "";
public String cachaName() default "";
//动态切换实际的 CacheManager
public String cacheManager() default "";
}
CacheMessageListener
缓存监听器,主要是保证多节点数据一致性的问题。当一个节点缓存更新,通知其他的节点相应处理。主要技术是 Redis 的发布、订阅功能,实现 MessageListener 接口。
当然下面还有个细节就是一般生产环境是禁用 Redis#keys 命令的,所以得换个方式扫描对应的 key。
public class CacheMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
CacheMessage cacheMessage = (CacheMessage) redisTemplate.getValueSerializer().deserialize(message.getBody());
logger.info("收到redis清除缓存消息, 开始清除本地缓存, the cacheName is {}, the key is {}", cacheMessage.getCacheName(), cacheMessage.getKey());
// redisCaffeineCacheManager.clearLocal(cacheMessage.getCacheName(), cacheMessage.getKey());
String prefixKey = RedisConstant.WXYMG_DATA_CACHE + cacheMessage.getCacheName();
Set<String> keys = redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
Set<String> keysTmp = new HashSet<>();
Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder().
match(prefixKey + "*").
count(50).build());
while (cursor.hasNext()) {
keysTmp.add(new String(cursor.next()));
}
return keysTmp;
});
Iterator iterator = keys.iterator();
while (iterator.hasNext()) {
if (iterator.next().toString().equals(cacheMessage.getKey())) {
iterator.remove();
}
}
redisTemplate.delete(keys);
cacheConfig.cacheManager().getCache(cacheMessage.getCacheName()).clear(); //cacheName 下的都删除
}
}
CaffeineCacheAspect
然后就是切面的逻辑处理,里面的内容和 流程图 一模一样,只是使用代码实现了需求。
其中:下面的代码是 Redis 发布消息。
redisTemplate.convertAndSend(CacheConfig.TOPIC, new CacheMessage(caffeineCache.cachaName(), redisKey));
CacheMessage
这是在 Redis 发布消息的时候一个消息体,也是自定义的,可以加更多的参数属性
public class CacheMessage implements Serializable {
private static final long serialVersionUID = -1L;
private String cacheName;
private Object key;
public CacheMessage(String cacheName, Object key) {
super();
this.cacheName = cacheName;
this.key = key;
}
}
总结
- Redis 天然适合分布式缓存,但是本地缓存还得考虑数据一致性的问题,这里使用的是 Redis 的发布、订阅功能
- Caffeine 的简单学习了解使用
- 结合自定义注解,使用低耦合的二级缓存
到此这篇关于caffeine_redis自定义二级缓存的文章就介绍到这了,更多相关caffeine_redis二级缓存内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!