文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

一篇带给你Sentinel 流控原理

2024-12-03 04:26

关注

我们在项目中添加 Spring Cloud Sentinel 依赖添加后 spring-cloud-starter-alibaba-sentinel 在 Spring-Boot 启动的过程中回去初始化 spring.factories 中的配置信息,如:SentinelWebAutoConfiguration 、SentinelAutoConfiguration 等配置文件来初始化

再讲代码之前我先声明一下我的版本号sentinel 1.8.0 。后续的所有内容均基于该版本进行

@ResoureSetinel 工作原理

配置流控规则我们最简单的方式就是通过 @ResoureSetinel 的方式来管理,该注解可以直接定义流控规则、降级规则。下面是一个简单的使用例子:

  1. @SentinelResource(value = "ResOrderGet"
  2.                   fallback = "fallback"
  3.                   fallbackClass = SentinelResourceExceptionHandler.class, 
  4.                   blockHandler = "blockHandler"
  5.                   blockHandlerClass = SentinelResourceExceptionHandler.class 
  6.                  ) 
  7. @GetMapping("/order/get/{id}"
  8. public CommonResult getStockDetails(@PathVariable Integer id) { 
  9.   StockModel stockModel = new StockModel(); 
  10.   stockModel.setCode("STOCK==>1000"); 
  11.   stockModel.setId(id); 
  12.   return CommonResult.success(stockModel); 

如果大家熟悉 Spring 相关的组件大家都可以想到,这里多半是通过Spring Aop. 的方式来拦截 getStockDetails 方法。我们先看看SentinelAutoConfiguration 配置文件,我们可以找到 SentinelResourceAspect Bean 的定义方法。

  1. @Bean 
  2. @ConditionalOnMissingBean 
  3. public SentinelResourceAspect sentinelResourceAspect() { 
  4.    return new SentinelResourceAspect(); 

让后我们再来看看 SentinelResourceAspect 具体是怎么处理的,源码如下:

  1. // 定义 Pointcut 
  2. @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)"
  3. public void sentinelResourceAnnotationPointcut() { 
  4. // Around 来对被标记 @SentinelResource 注解的方法进行处理 
  5. @Around("sentinelResourceAnnotationPointcut()"
  6. public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { 
  7.   Method originMethod = resolveMethod(pjp); 
  8.   // 获取注解信息 
  9.   SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); 
  10.   // 获取资源名称 
  11.   String resourceName = getResourceName(annotation.value(), originMethod); 
  12.   EntryType entryType = annotation.entryType(); 
  13.   int resourceType = annotation.resourceType(); 
  14.   Entry entry = null
  15.   try { 
  16.     // 执行 entry 
  17.     entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); 
  18.     // 执行业务方法 
  19.     Object result = pjp.proceed(); 
  20.     // 返回 
  21.     return result; 
  22.   } catch (BlockException ex) { 
  23.     // 处理 BlockException 
  24.     return handleBlockException(pjp, annotation, ex); 
  25.   } catch (Throwable ex) { 
  26.     Class[] exceptionsToIgnore = annotation.exceptionsToIgnore(); 
  27.     // The ignore list will be checked first
  28.     if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { 
  29.       throw ex; 
  30.     } 
  31.     if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { 
  32.       traceException(ex); 
  33.       // 处理降级 
  34.       return handleFallback(pjp, annotation, ex); 
  35.     } 
  36.     // No fallback function can handle the exception, so throw it out
  37.     throw ex; 
  38.   } 

我们总结一下, @SentinelResource 的执行过程, 首先是通过 Aop 进行拦截,然后通过 SphU.entry 执行对应的流控规则,最后调用业务方法。如果触发流控规则首先处理流控异常 BlockException 然后在判断是否有服务降级的处理,如果有就调用 fallback 方法。通过 handleBlockException 、handleFallback 进行处理。

责任链模式处理流控

通过上面的梳理,我们知道对于流控的过程,核心处理方法就是 SphU.entry 。在这个方法中其实主要就是初始化流控 Solt 和执行 Solt. 在这个过程中会对:簇点定义、流量控制、熔断降级、系统白名单等页面功能进行处理。

1. 初始化责任链

下面是初始化 Solt 的核心代码在 SphU.entryWithPriority

  1. // 删减部分代码 
  2. private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) 
  3.   throws BlockException { 
  4.   // 初始化责任链 
  5.   ProcessorSlot chain = lookProcessChain(resourceWrapper); 
  6.   Entry e = new CtEntry(resourceWrapper, chain, context); 
  7.   try { 
  8.     // 执行 entry 
  9.     chain.entry(context, resourceWrapper, nullcount, prioritized, args); 
  10.   } catch (BlockException e1) { 
  11.     e.exit(count, args); 
  12.     // 异常抛出,让 SentinelResourceAspect.invokeResourceWithSentinel 统一处理 
  13.     throw e1; 
  14.   } catch (Throwable e1) { 
  15.     // This should not happen, unless there are errors existing in Sentinel internal. 
  16.     RecordLog.info("Sentinel unexpected exception", e1); 
  17.   } 
  18.   return e; 
  19. 通过 lookProcessChain 方法我逐步的查找,我们可以看到最终的责任链初始化类,默认是 DefaultSlotChainBuilder

    1. public class DefaultSlotChainBuilder implements SlotChainBuilder { 
    2.     @Override 
    3.     public ProcessorSlotChain build() { 
    4.         ProcessorSlotChain chain = new DefaultProcessorSlotChain(); 
    5.         // Note: the instances of ProcessorSlot should be different, since they are not stateless. 
    6.         // 通过 SPI 去加载所有的  ProcessorSlot 实现,通过 Order 排序 
    7.         List sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class); 
    8.         for (ProcessorSlot slot : sortedSlotList) { 
    9.             if (!(slot instanceof AbstractLinkedProcessorSlot)) { 
    10.                 RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); 
    11.                 continue
    12.             } 
    13.                       // 添加到 chain 尾部 
    14.             chain.addLast((AbstractLinkedProcessorSlot) slot); 
    15.         } 
    16.         return chain; 
    17.     } 

    2. 责任链的处理过程

    我们可以通过断点的方式来查看在 sortedSlotList 集合中所有的 solt 顺序如下图所示:

    我们可以通过如下的顺序进行逐个的简单的分析一下

    • NodeSelectorSolt
    • CusterBuilderSolt
    • LogSlot
    • StatisicSlot
    • AuthoritySolt
    • SystemSolts
    • ParamFlowSolt
    • FlowSolt
    • DegradeSlot

    对于 Sentinel 的 Slot 流控协作流程可以参考官方给出的文档, 如下图所示:

    FlowSolt 流控

    通过 NodeSelectorSolt、CusterBuilderSolt、StatisicSlot 等一系列的请求数据处理,在 FlowSolt会进入流控规则,所有的 Solt 都会执行 entry 方法, 如下所示

    1. // FlowSolt 的 entry 方法 
    2. @Override 
    3. public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count
    4.                   boolean prioritized, Object... args) throws Throwable { 
    5.   // 检查流量 
    6.   checkFlow(resourceWrapper, context, node, count, prioritized); 
    7.   fireEntry(context, resourceWrapper, node, count, prioritized, args); 

    在后续的流程中,会执进行判断具体的流控策略,默认是快速失败,会执行 DefaultController 方法。

    1. // DefaultController 
    2. @Override 
    3. public boolean canPass(Node node, int acquireCount, boolean prioritized) { 
    4.   // 当前资源的调用次数 
    5.   int curCount = avgUsedTokens(node); 
    6.   // 当前资源的调用次数 + 1 > 当前阈值 
    7.   if (curCount + acquireCount > count) { 
    8.     // 删减比分代码 
    9.     // 不通过 
    10.     return false
    11.   } 
    12.   // 通过 
    13.   return true
    14. private int avgUsedTokens(Node node) { 
    15.   if (node == null) { 
    16.     return DEFAULT_AVG_USED_TOKENS; 
    17.   } 
    18.   return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); 

    如果上面返回不通过会回到,那么会抛出 FlowException

    1. public void checkFlow(Function> ruleProvider, ResourceWrapper resource, 
    2.                       Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { 
    3.   if (ruleProvider == null || resource == null) { 
    4.     return
    5.   } 
    6.   Collection rules = ruleProvider.apply(resource.getName()); 
    7.   if (rules != null) { 
    8.     for (FlowRule rule : rules) { 
    9.       if (!canPassCheck(rule, context, node, count, prioritized)) { 
    10.         // 流控规则不通过,会抛出 FlowException 
    11.         throw new FlowException(rule.getLimitApp(), rule); 
    12.       } 
    13.     } 
    14.   } 

    然后会在 StatisticSlot 中增加统计信息, 最后会抛出给 SentinelResourceAspect 进行处理,完成流控功能。我们再来看看这个异常信息,如果是BlockException 异常,会进入 handleBlockException 方法处理, 如果是其他的业务异常首先会判断是否有配置 fallback 处理如果有,就调用 handleFallback 没有就继续往外抛,至此完成流控功能

    1. try { 
    2.   entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); 
    3.   Object result = pjp.proceed(); 
    4.   return result; 
    5. } catch (BlockException ex) { 
    6.   return handleBlockException(pjp, annotation, ex); 
    7. } catch (Throwable ex) { 
    8.   Class[] exceptionsToIgnore = annotation.exceptionsToIgnore(); 
    9.   // The ignore list will be checked first
    10.   if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { 
    11.     throw ex; 
    12.   } 
    13.   if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { 
    14.     traceException(ex); 
    15.     return handleFallback(pjp, annotation, ex); 
    16.   } 
    17.   // No fallback function can handle the exception, so throw it out
    18.   throw ex; 

    DegradeSlot 降级

    断路器的作用是当某些资源一直出现故障时,将触发断路器。断路器不会继续访问已经发生故障的资源,而是拦截请求并返回故障信号。

    Sentinel 在 DegradeSlot 这个 Slot 中实现了熔断降级的功能,它有三个状态 OPEN 、HALF_OPEN、CLOSED 以ResponseTimeCircuitBreaker RT 响应时间维度来分析, 断路器工作的过程。下面是一个标准断路器的工作流程:

    在 Sentinel 实现的源码过程如下图所示:

    Sentinel 通过 Web 拦截器

    Sentinel 在默认情况下, 不使用 @ResourceSentinel 注解实现流控的时候, Sentinel 通过拦截器进行流控实现的。初始化类在 SentinelWebAutoConfiguration 它实现了 WebMvcConfigurer 接口,在 sentinelWebInterceptor 方法初始化 SentinelWebInterceptor 等 Bean。

    1. @Bean 
    2. @ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled"
    3.                        matchIfMissing = true
    4. public SentinelWebInterceptor sentinelWebInterceptor( 
    5.   SentinelWebMvcConfig sentinelWebMvcConfig) { 
    6.   return new SentinelWebInterceptor(sentinelWebMvcConfig); 

    我们在 SentinelWebInterceptor 的核心方法 preHandle 中处理,这里面我们又可以看到 SphU.entry 熟悉的过程调用流控的责任链。由于逻辑都类似,此处不再多说。代码如下:

    1. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) 
    2.   throws Exception { 
    3.   try { 
    4.     String resourceName = getResourceName(request); 
    5.     if (StringUtil.isEmpty(resourceName)) { 
    6.       return true
    7.     } 
    8.     if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) { 
    9.       return true
    10.     } 
    11.     // Parse the request origin using registered origin parser. 
    12.     String origin = parseOrigin(request); 
    13.     String contextName = getContextName(request); 
    14.     ContextUtil.enter(contextName, origin); 
    15.     Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN); 
    16.     request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry); 
    17.     return true
    18.   } catch (BlockException e) { 
    19.     try { 
    20.       handleBlockException(request, response, e); 
    21.     } finally { 
    22.       ContextUtil.exit(); 
    23.     } 
    24.     return false
    25.   } 

    参考文档

    https://github.com/alibaba/Sentinel/wiki

    https://martinfowler.com/bliki/CircuitBreaker.html

     

    免责声明:

    ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

    ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

    软考中级精品资料免费领

    • 历年真题答案解析
    • 备考技巧名师总结
    • 高频考点精准押题
    • 资料下载
    • 历年真题
    • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

      难度     813人已做
      查看
    • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

      难度     354人已做
      查看
    • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

      难度     318人已做
      查看
    • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

      难度     435人已做
      查看
    • 2024年上半年系统架构设计师考试综合知识真题

      难度     224人已做
      查看

    相关文章

    发现更多好内容

    猜你喜欢

    AI推送时光机
    咦!没有更多了?去看看其它编程学习网 内容吧