数据库与缓存读写模式策略
写完数据库后是否需要马上更新缓存还是直接删除缓存?
(1)、如果写数据库的值与更新到缓存值是一样的,不需要经过任何的计算,可以马上更新缓存,但是如果对于那种写数据频繁而读数据少的场景并不合适这种解决方案,因为也许还没有查询就被删除或修改了,这样会浪费时间和资源
(2)、如果写数据库的值与更新缓存的值不一致,写入缓存中的数据需要经过几个表的关联计算后得到的结果插入缓存中,那就没有必要马上更新缓存,只有删除缓存即可,等到查询的时候在去把计算后得到的结果插入到缓存中即可。
所以一般的策略是当更新数据时,先删除缓存数据,然后更新数据库,而不是更新缓存,等要查询的时候才把最新的数据更新到缓存
数据库与缓存双写情况下导致数据不一致问题
场景一
当更新数据时,如更新某商品的库存,当前商品的库存是100,现在要更新为99,先更新数据库更改成99,然后删除缓存,发现删除缓存失败了,这意味着数据库存的是99,而缓存是100,这导致数据库和缓存不一致。
场景一解决方案
这种情况应该是先删除缓存,然后在更新数据库,如果删除缓存失败,那就不要更新数据库,如果说删除缓存成功,而更新数据库失败,那查询的时候只是从数据库里查了旧的数据而已,这样就能保持数据库与缓存的一致性。
场景二
在高并发的情况下,如果当删除完缓存的时候,这时去更新数据库,但还没有更新完,另外一个请求来查询数据,发现缓存里没有,就去数据库里查,还是以上面商品库存为例,如果数据库中产品的库存是100,那么查询到的库存是100,然后插入缓存,插入完缓存后,原来那个更新数据库的线程把数据库更新为了99,导致数据库与缓存不一致的情况
场景二解决方案
遇到这种情况,可以用队列的去解决这个问,创建几个队列,如20个,根据商品的ID去做hash值,然后对队列个数取摸,当有数据更新请求时,先把它丢到队列里去,当更新完后在从队列里去除,如果在更新的过程中,遇到以上场景,先去缓存里看下有没有数据,如果没有,可以先去队列里看是否有相同商品ID在做更新,如果有也把查询的请求发送到队列里去,然后同步等待缓存更新完成。
这里有一个优化点,如果发现队列里有一个查询请求了,那么就不要放新的查询操作进去了,用一个while(true)循环去查询缓存,循环个200MS左右,如果缓存里还没有则直接取数据库的旧数据,一般情况下是可以取到的。
在高并发下解决场景二要注意的问题
(1)读请求时长阻塞
由于读请求进行了非常轻度的异步化,所以一定要注意读超时的问题,每个读请求必须在超时间内返回,该解决方案最大的风险在于可能数据更新很频繁,导致队列中挤压了大量的更新操作在里面,然后读请求会发生大量的超时,最后导致大量的请求直接走数据库,像遇到这种情况,一般要做好足够的压力测试,如果压力过大,需要根据实际情况添加机器。
(2)请求并发量过高
这里还是要做好压力测试,多模拟真实场景,并发量在最高的时候QPS多少,扛不住就要多加机器,还有就是做好读写比例是多少
(3)多服务实例部署的请求路由
可能这个服务部署了多个实例,那么必须保证说,执行数据更新操作,以及执行缓存更新操作的请求,都通过nginx服务器路由到相同的服务实例上
(4)热点商品的路由问题,导致请求的倾斜
某些商品的读请求特别高,全部打到了相同的机器的相同丢列里了,可能造成某台服务器压力过大,因为只有在商品数据更新的时候才会清空缓存,然后才会导致读写并发,所以更新频率不是太高的话,这个问题的影响并不是很大,但是确实有可能某些服务器的负载会高一些。
数据库与缓存数据一致性解决方案流程图
数据库与缓存数据一致性解决方案对应代码
商品库存实体
package com.shux.inventory.entity;
public class InventoryProduct {
private Integer productId;
private Long InventoryCnt;
public Integer getProductId() {
return productId;
}
public void setProductId(Integer productId) {
this.productId = productId;
}
public Long getInventoryCnt() {
return InventoryCnt;
}
public void setInventoryCnt(Long inventoryCnt) {
InventoryCnt = inventoryCnt;
}
}
请求接口
public interface Request {
public void process();
public Integer getProductId();
public boolean isForceFefresh();
}
数据更新请求
package com.shux.inventory.request;
import org.springframework.transaction.annotation.Transactional;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
public class InventoryUpdateDBRequest implements Request{
private InventoryProductBiz inventoryProductBiz;
private InventoryProduct inventoryProduct;
public InventoryUpdateDBRequest(InventoryProduct inventoryProduct,InventoryProductBiz inventoryProductBiz){
this.inventoryProduct = inventoryProduct;
this.inventoryProductBiz = inventoryProductBiz;
}
@Override
@Transactional
public void process() {
inventoryProductBiz.removeInventoryProductCache(inventoryProduct.getProductId());
inventoryProductBiz.updateInventoryProduct(inventoryProduct);
}
@Override
public Integer getProductId() {
// TODO Auto-generated method stub
return inventoryProduct.getProductId();
}
@Override
public boolean isForceFefresh() {
// TODO Auto-generated method stub
return false;
}
}
查询请求
package com.shux.inventory.request;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
public class InventoryQueryCacheRequest implements Request {
private InventoryProductBiz inventoryProductBiz;
private Integer productId;
private boolean isForceFefresh;
public InventoryQueryCacheRequest(Integer productId,InventoryProductBiz inventoryProductBiz,boolean isForceFefresh) {
this.productId = productId;
this.inventoryProductBiz = inventoryProductBiz;
this.isForceFefresh = isForceFefresh;
}
@Override
public void process() {
InventoryProduct inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId);
inventoryProductBiz.setInventoryProductCache(inventoryProduct);
}
@Override
public Integer getProductId() {
// TODO Auto-generated method stub
return productId;
}
public boolean isForceFefresh() {
return isForceFefresh;
}
public void setForceFefresh(boolean isForceFefresh) {
this.isForceFefresh = isForceFefresh;
}
}
spring启动时初始化队列线程池
package com.shux.inventory.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.shux.inventory.request.Request;
import com.shux.inventory.request.RequestQueue;
import com.shux.utils.other.SysConfigUtil;
public class RequestProcessorThreadPool {
private static final int blockingQueueNum = SysConfigUtil.get("request.blockingqueue.number")==null?10:Integer.valueOf(SysConfigUtil.get("request.blockingqueue.number").toString());
private static final int queueDataNum = SysConfigUtil.get("request.everyqueue.data.length")==null?100:Integer.valueOf(SysConfigUtil.get("request.everyqueue.data.length").toString());
private ExecutorService threadPool = Executors.newFixedThreadPool(blockingQueueNum);
private RequestProcessorThreadPool(){
for(int i=0;i<blockingQueueNum;i++){//初始化队列
ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(queueDataNum);//每个队列中放100条数据
RequestQueue.getInstance().addQueue(queue);
threadPool.submit(new RequestProcessorThread(queue));//把每个queue交个线程去处理,线程会处理每个queue中的数据
}
}
public static class Singleton{
private static RequestProcessorThreadPool instance;
static{
instance = new RequestProcessorThreadPool();
}
public static RequestProcessorThreadPool getInstance(){
return instance;
}
}
public static RequestProcessorThreadPool getInstance(){
return Singleton.getInstance();
}
public static void init(){
getInstance();
}
}
请求处理线程
package com.shux.inventory.thread;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import com.shux.inventory.request.InventoryUpdateDBRequest;
import com.shux.inventory.request.Request;
import com.shux.inventory.request.RequestQueue;
public class RequestProcessorThread implements Callable<Boolean>{
private ArrayBlockingQueue<Request> queue;
public RequestProcessorThread(ArrayBlockingQueue<Request> queue){
this.queue = queue;
}
@Override
public Boolean call() throws Exception {
Request request = queue.take();
Map<Integer,Boolean> flagMap = RequestQueue.getInstance().getFlagMap();
//不需要强制刷新的时候,查询请求去重处理
if (!request.isForceFefresh()){
if (request instanceof InventoryUpdateDBRequest) {//如果是更新请求,那就置为false
flagMap.put(request.getProductId(), true);
} else {
Boolean flag = flagMap.get(request.getProductId());
if ( flag == null) {
flagMap.put(request.getProductId(), false);
}
if ( flag != null && flag) {
flagMap.put(request.getProductId(), false);
}
if (flag != null && !flag) {
flagMap.put(request.getProductId(), false);
return true;
}
}
}
request.process();
return true;
}
}
请求队列
package com.shux.inventory.request;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
public class RequestQueue {
private List<ArrayBlockingQueue<Request>> queues = new ArrayList<>();
private Map<Integer,Boolean> flagMap = new ConcurrentHashMap<>();
private RequestQueue(){
}
private static class Singleton{
private static RequestQueue queue;
static{
queue = new RequestQueue();
}
public static RequestQueue getInstance() {
return queue;
}
}
public static RequestQueue getInstance(){
return Singleton.getInstance();
}
public void addQueue(ArrayBlockingQueue<Request> queue) {
queues.add(queue);
}
public int getQueueSize(){
return queues.size();
}
public ArrayBlockingQueue<Request> getQueueByIndex(int index) {
return queues.get(index);
}
public Map<Integer,Boolean> getFlagMap() {
return this.flagMap;
}
}
spring 启动初始化线程池类
package com.shux.inventory.listener;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import com.shux.inventory.thread.RequestProcessorThreadPool;
public class InitListener implements ApplicationListener<ContextRefreshedEvent>{
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// TODO Auto-generated method stub
if(event.getApplicationContext().getParent() != null){
return;
}
RequestProcessorThreadPool.init();
}
}
异步处理请求接口
package com.shux.inventory.biz;
import com.shux.inventory.request.Request;
public interface IRequestAsyncProcessBiz {
void process(Request request);
}
异步处理请求接口实现
package com.shux.inventory.biz.impl;
import java.util.concurrent.ArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.shux.inventory.biz.IRequestAsyncProcessBiz;
import com.shux.inventory.request.Request;
import com.shux.inventory.request.RequestQueue;
@Service("requestAsyncProcessService")
public class RequestAsyncProcessBizImpl implements IRequestAsyncProcessBiz {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void process(Request request) {
// 做请求的路由,根据productId路由到对应的队列
ArrayBlockingQueue<Request> queue = getQueueByProductId(request.getProductId());
try {
queue.put(request);
} catch (InterruptedException e) {
logger.error("产品ID{}加入队列失败",request.getProductId(),e);
}
}
private ArrayBlockingQueue<Request> getQueueByProductId(Integer productId) {
RequestQueue requestQueue = RequestQueue.getInstance();
String key = String.valueOf(productId);
int hashcode;
int hash = (key == null) ? 0 : (hashcode = key.hashCode())^(hashcode >>> 16);
//对hashcode取摸
int index = (requestQueue.getQueueSize()-1) & hash;
return requestQueue.getQueueByIndex(index);
}
}
package com.shux.inventory.biz.impl;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
import com.shux.inventory.mapper.InventoryProductMapper;
import com.shux.redis.biz.IRedisBiz;
@Service("inventoryProductBiz")
public class InventoryProductBizImpl implements InventoryProductBiz {
private @Autowired IRedisBiz<InventoryProduct> redisBiz;
private @Resource InventoryProductMapper mapper;
@Override
public void updateInventoryProduct(InventoryProduct inventoryProduct) {
// TODO Auto-generated method stub
mapper.updateInventoryProduct(inventoryProduct);
}
@Override
public InventoryProduct loadInventoryProductByProductId(Integer productId) {
// TODO Auto-generated method stub
return mapper.loadInventoryProductByProductId(productId);
}
@Override
public void setInventoryProductCache(InventoryProduct inventoryProduct) {
redisBiz.set("inventoryProduct:"+inventoryProduct.getProductId(), inventoryProduct);
}
@Override
public void removeInventoryProductCache(Integer productId) {
redisBiz.delete("inventoryProduct:"+productId);
}
@Override
public InventoryProduct loadInventoryProductCache(Integer productId) {
// TODO Auto-generated method stub
return redisBiz.get("inventoryProduct:"+productId);
}
}
数据更新请求controller
package com.shux.inventory.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.shux.inventory.biz.IRequestAsyncProcessBiz;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
import com.shux.inventory.request.InventoryUpdateDBRequest;
import com.shux.inventory.request.Request;
import com.shux.utils.other.Response;
@Controller("/inventory")
public class InventoryUpdateDBController {
private @Autowired InventoryProductBiz inventoryProductBiz;
private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz;
@RequestMapping("/updateDBInventoryProduct")
@ResponseBody
public Response updateDBInventoryProduct(InventoryProduct inventoryProduct){
Request request = new InventoryUpdateDBRequest(inventoryProduct,inventoryProductBiz);
requestAsyncProcessBiz.process(request);
return new Response(Response.SUCCESS,"更新成功");
}
}
数据查询请求controller
package com.shux.inventory.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import com.shux.inventory.biz.IRequestAsyncProcessBiz;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
import com.shux.inventory.request.InventoryQueryCacheRequest;
import com.shux.inventory.request.Request;
@Controller("/inventory")
public class InventoryQueryCacheController {
private @Autowired InventoryProductBiz inventoryProductBiz;
private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz;
@RequestMapping("/queryInventoryProduct")
public InventoryProduct queryInventoryProduct(Integer productId) {
Request request = new InventoryQueryCacheRequest(productId,inventoryProductBiz,false);
requestAsyncProcessBiz.process(request);//加入到队列中
long startTime = System.currentTimeMillis();
long allTime = 0L;
long endTime = 0L;
InventoryProduct inventoryProduct = null;
while (true) {
if (allTime > 200){//如果超过了200ms,那就直接退出,然后从数据库中查询
break;
}
try {
inventoryProduct = inventoryProductBiz.loadInventoryProductCache(productId);
if (inventoryProduct != null) {
return inventoryProduct;
} else {
Thread.sleep(20);//如果查询不到就等20毫秒
}
endTime = System.currentTimeMillis();
allTime = endTime - startTime;
} catch (Exception e) {
}
}
inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId);
if (inventoryProduct != null) {
Request forcRrequest = new InventoryQueryCacheRequest(productId,inventoryProductBiz,true);
requestAsyncProcessBiz.process(forcRrequest);//这个时候需要强制刷新数据库,使缓存中有数据
return inventoryProduct;
}
return null;
}
}
到此这篇关于详解redis缓存与数据库一致性问题解决的文章就介绍到这了,更多相关redis缓存与数据库一致性内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!