文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Prometheus告警规则管理

2024-12-14 01:15

关注

今天主要带来告警规则的分析。Prometheus中的告警规则允许你基于PromQL表达式定义告警触发条件,Prometheus后端对这些触发规则进行周期性计算,当满足触发条件后则会触发告警通知。

什么是告警Rule

告警是prometheus的一个重要功能,接下来从源码的角度来分析下告警的执行流程。

怎么定义告警Rule

一条典型的告警规则如下所示:

  1. groups: 
  2. name: example 
  3.   rules: 
  4.   - alert: HighErrorRate 
  5.     #指标需要在触发告警之前的10分钟内大于0.5。 
  6.     expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5 
  7.     for: 10m 
  8.     labels: 
  9.       severity: page 
  10.     annotations: 
  11.       summary: High request latency 
  12.       description: description info 

在告警规则文件中,我们可以将一组相关的规则设置定义在一个group下。在每一个group中我们可以定义多个告警规则(rule)。一条告警规则主要由以下几部分组成:

Rule管理器

规则管理器会根据配置的规则,基于规则PromQL表达式告警的触发条件,用于计算是否有时间序列满足该条件。在满足该条件时,将告警信息发送给告警服务。

  1. type Manager struct { 
  2.  opts     *ManagerOptions //外部的依赖 
  3.  groups   map[string]*Group //当前的规则组 
  4.  mtx      sync.RWMutex //规则管理器读写锁 
  5.  block    chan struct{}  
  6.  done     chan struct{}  
  7.  restored bool  
  8.  
  9.  logger log.Logger  

读取Rule组配置

在Prometheus Server启动的过程中,首先会调用Manager.Update()方法加载Rule配置文件并进行解析,其大致流程如下。

  1. func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error { 
  2.  m.mtx.Lock() 
  3.  defer m.mtx.Unlock() 
  4.     // 从当前文件中加载规则 
  5.  groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...) 
  6.  if errs != nil { 
  7.   for _, e := range errs { 
  8.    level.Error(m.logger).Log("msg""loading groups failed""err", e) 
  9.   } 
  10.   return errors.New("error loading rules, previous rule set restored"
  11.  } 
  12.  m.restored = true 
  13.  
  14.  var wg sync.WaitGroup 
  15.    //循环遍历规则组 
  16.  for _, newg := range groups { 
  17.   // If there is an old group with the same identifier, 
  18.   // check if new group equals with the old group, if yes then skip it. 
  19.   // If not equals, stop it and wait for it to finish the current iteration. 
  20.   // Then copy it into the new group
  21.   //根据新的rules.Group的信息获取规则组名 
  22.   gn := GroupKey(newg.file, newg.name
  23.    //根据规则组名获取到老的规则组并删除原有的rules.Group实例 
  24.   oldg, ok := m.groups[gn] 
  25.   delete(m.groups, gn) 
  26.  
  27.   if ok && oldg.Equals(newg) { 
  28.    groups[gn] = oldg 
  29.    continue 
  30.   } 
  31.  
  32.   wg.Add(1) 
  33.     //为每一个rules.Group实例启动一个goroutine 
  34.   go func(newg *Group) { 
  35.    if ok { 
  36.     oldg.stop() 
  37.      //将老的规则组中的状态信息复制到新的规则组 
  38.     newg.CopyState(oldg) 
  39.    } 
  40.    wg.Done() 
  41.    // Wait with starting evaluation until the rule manager 
  42.    // is told to run. This is necessary to avoid running 
  43.    // queries against a bootstrapping storage. 
  44.    <-m.block 
  45.      //调用rules.Group.run()方法,开始周期性的执行PromQl语句 
  46.    newg.run(m.opts.Context) 
  47.   }(newg) 
  48.  } 
  49.  
  50.  // Stop remaining old groups. 
  51.  //停止所有老规则组的服务 
  52.  wg.Add(len(m.groups)) 
  53.  for n, oldg := range m.groups { 
  54.   go func(n string, g *Group) { 
  55.    g.markStale = true 
  56.    g.stop() 
  57.    if m := g.metrics; m != nil { 
  58.     m.IterationsMissed.DeleteLabelValues(n) 
  59.     m.IterationsScheduled.DeleteLabelValues(n) 
  60.     m.EvalTotal.DeleteLabelValues(n) 
  61.     m.EvalFailures.DeleteLabelValues(n) 
  62.     m.GroupInterval.DeleteLabelValues(n) 
  63.     m.GroupLastEvalTime.DeleteLabelValues(n) 
  64.     m.GroupLastDuration.DeleteLabelValues(n) 
  65.     m.GroupRules.DeleteLabelValues(n) 
  66.     m.GroupSamples.DeleteLabelValues((n)) 
  67.    } 
  68.    wg.Done() 
  69.   }(n, oldg) 
  70.  } 
  71.  
  72.  wg.Wait() 
  73.     //更新规则管理器中的规则组 
  74.  m.groups = groups  
  75.  
  76.  return nil 

运行Rule组调度方法

规则组启动流程(Group.run):进入Group.run方法后先进行初始化等待,以使规则的运算时间在同一时刻,周期为g.interval;然后定义规则运算调度方法:iter,调度周期为g.interval;在iter方法中调用g.Eval方法执行下一层次的规则运算调度。

规则运算的调度周期g.interval,由prometheus.yml配置文件中global中的 [ evaluation_interval:| default = 1m ]指定。实现如下:

  1. func (g *Group) run(ctx context.Context) { 
  2.  defer close(g.terminated) 
  3.  
  4.  // Wait an initial amount to have consistently slotted intervals. 
  5.  evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval) 
  6.  select { 
  7.  case <-time.After(time.Until(evalTimestamp))://初始化等待 
  8.  case <-g.done: 
  9.   return 
  10.  } 
  11.  
  12.  ctx = promql.NewOriginContext(ctx, map[string]interface{}{ 
  13.   "ruleGroup": map[string]string{ 
  14.    "file": g.File(), 
  15.    "name": g.Name(), 
  16.   }, 
  17.  }) 
  18.     //定义规则组规则运算调度算法 
  19.  iter := func() { 
  20.   g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc() 
  21.  
  22.   start := time.Now() 
  23.     //规则运算的入口 
  24.   g.Eval(ctx, evalTimestamp) 
  25.   timeSinceStart := time.Since(start) 
  26.  
  27.   g.metrics.IterationDuration.Observe(timeSinceStart.Seconds()) 
  28.   g.setEvaluationTime(timeSinceStart) 
  29.   g.setLastEvaluation(start) 
  30.  } 
  31.  
  32.  // The assumption here is that since the ticker was started after having 
  33.  // waited for `evalTimestamp` to pass, the ticks will trigger soon 
  34.  // after each `evalTimestamp + N * g.interval` occurrence. 
  35.  tick := time.NewTicker(g.interval) //设置规则运算定时器 
  36.  defer tick.Stop() 
  37.  
  38.  defer func() { 
  39.   if !g.markStale { 
  40.    return 
  41.   } 
  42.   go func(now time.Time) { 
  43.    for _, rule := range g.seriesInPreviousEval { 
  44.     for _, r := range rule { 
  45.      g.staleSeries = append(g.staleSeries, r) 
  46.     } 
  47.    } 
  48.    // That can be garbage collected at this point. 
  49.    g.seriesInPreviousEval = nil 
  50.    // Wait for 2 intervals to give the opportunity to renamed rules 
  51.    // to insert new series in the tsdb. At this point if there is a 
  52.    // renamed rule, it should already be started. 
  53.    select { 
  54.    case <-g.managerDone: 
  55.    case <-time.After(2 * g.interval): 
  56.     g.cleanupStaleSeries(ctx, now) 
  57.    } 
  58.   }(time.Now()) 
  59.  }() 
  60.     //调用规则组规则运算的调度方法 
  61.  iter() 
  62.  if g.shouldRestore { 
  63.   // If we have to restore, we wait for another Eval to finish. 
  64.   // The reason behind this is, during first eval (or before it) 
  65.   // we might not have enough data scraped, and recording rules would not 
  66.   // have updated the latest valueson which some alerts might depend. 
  67.   select { 
  68.   case <-g.done: 
  69.    return 
  70.   case <-tick.C: 
  71.    missed := (time.Since(evalTimestamp) / g.interval) - 1 
  72.    if missed > 0 { 
  73.     g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) 
  74.     g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) 
  75.    } 
  76.    evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) 
  77.    iter() 
  78.   } 
  79.  
  80.   g.RestoreForState(time.Now()) 
  81.   g.shouldRestore = false 
  82.  } 
  83.  
  84.  for { 
  85.   select { 
  86.   case <-g.done: 
  87.    return 
  88.   default
  89.    select { 
  90.    case <-g.done: 
  91.     return 
  92.    case <-tick.C: 
  93.     missed := (time.Since(evalTimestamp) / g.interval) - 1 
  94.     if missed > 0 { 
  95.      g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) 
  96.      g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) 
  97.     } 
  98.     evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) 
  99.      //调用规则组规则运算的调度方法 
  100.     iter() 
  101.    } 
  102.   } 
  103.  } 

运行Rule调度方法

规则组对具体规则的调度在Group.Eval中实现,在Group.Eval方法中会将规则组下的每条规则通过QueryFunc将(promQL)放到查询引擎(queryEngine)中执行,如果被执行的是AlertingRule类型,那么执行结果指标会被NotifyFunc组件发送给告警服务;如果是RecordingRule类型,最后将改结果指标存储到Prometheus的储存管理器中,并对过期指标进行存储标记处理。

  1. // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. 
  2. func (g *Group) Eval(ctx context.Context, ts time.Time) { 
  3.  var samplesTotal float64 
  4.     遍历当前规则组下的所有规则 
  5.  for i, rule := range g.rules { 
  6.   select { 
  7.   case <-g.done: 
  8.    return 
  9.   default
  10.   } 
  11.  
  12.   func(i intrule Rule) { 
  13.    sp, ctx := opentracing.StartSpanFromContext(ctx, "rule"
  14.    sp.SetTag("name"rule.Name()) 
  15.    defer func(t time.Time) { 
  16.     sp.Finish() 
  17.       //更新服务指标-规则的执行时间 
  18.     since := time.Since(t) 
  19.     g.metrics.EvalDuration.Observe(since.Seconds()) 
  20.     rule.SetEvaluationDuration(since) 
  21.       //记录本次规则执行的耗时 
  22.     rule.SetEvaluationTimestamp(t) 
  23.    }(time.Now()) 
  24.      //记录规则运算的次数 
  25.    g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() 
  26.      //运算规则 
  27.    vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL) 
  28.    if err != nil { 
  29.       //规则出现错误后,终止查询 
  30.     rule.SetHealth(HealthBad) 
  31.     rule.SetLastError(err) 
  32.      //记录查询失败的次数 
  33.     g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() 
  34.  
  35.     // Canceled queries are intentional termination of queries. This normally 
  36.     // happens on shutdown and thus we skip logging of any errors here. 
  37.     if _, ok := err.(promql.ErrQueryCanceled); !ok { 
  38.      level.Warn(g.logger).Log("msg""Evaluating rule failed""rule"rule"err", err) 
  39.     } 
  40.     return 
  41.    } 
  42.    samplesTotal += float64(len(vector)) 
  43.             //判断是否是告警类型规则 
  44.    if ar, ok := rule.(*AlertingRule); ok { 
  45.                 发送告警 
  46.     ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) 
  47.    } 
  48.    var ( 
  49.     numOutOfOrder = 0 
  50.     numDuplicates = 0 
  51.    ) 
  52.     //此处为Recording获取存储器指标 
  53.    app := g.opts.Appendable.Appender(ctx) 
  54.    seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) 
  55.    defer func() { 
  56.     if err := app.Commit(); err != nil { 
  57.      rule.SetHealth(HealthBad) 
  58.      rule.SetLastError(err) 
  59.      g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() 
  60.  
  61.      level.Warn(g.logger).Log("msg""Rule sample appending failed""err", err) 
  62.      return 
  63.     } 
  64.     g.seriesInPreviousEval[i] = seriesReturned 
  65.    }() 
  66.  
  67.    for _, s := range vector { 
  68.     if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil { 
  69.      rule.SetHealth(HealthBad) 
  70.      rule.SetLastError(err) 
  71.  
  72.      switch errors.Cause(err) { 
  73.                         储存指标返回的各种错误码处理 
  74.      case storage.ErrOutOfOrderSample: 
  75.       numOutOfOrder++ 
  76.       level.Debug(g.logger).Log("msg""Rule evaluation result discarded""err", err, "sample", s) 
  77.      case storage.ErrDuplicateSampleForTimestamp: 
  78.       numDuplicates++ 
  79.       level.Debug(g.logger).Log("msg""Rule evaluation result discarded""err", err, "sample", s) 
  80.      default
  81.       level.Warn(g.logger).Log("msg""Rule evaluation result discarded""err", err, "sample", s) 
  82.      } 
  83.     } else { 
  84.       //缓存规则运算后的结果指标 
  85.      seriesReturned[s.Metric.String()] = s.Metric 
  86.     } 
  87.    } 
  88.    if numOutOfOrder > 0 { 
  89.     level.Warn(g.logger).Log("msg""Error on ingesting out-of-order result from rule evaluation""numDropped", numOutOfOrder) 
  90.    } 
  91.    if numDuplicates > 0 { 
  92.     level.Warn(g.logger).Log("msg""Error on ingesting results from rule evaluation with different value but same timestamp""numDropped", numDuplicates) 
  93.    } 
  94.  
  95.    for metric, lset := range g.seriesInPreviousEval[i] { 
  96.     if _, ok := seriesReturned[metric]; !ok { 
  97.       //设置过期指标的指标值 
  98.      // Series no longer exposed, mark it stale. 
  99.      _, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) 
  100.      switch errors.Cause(err) { 
  101.      case nil: 
  102.      case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: 
  103.       // Do not count these in logging, as this is expected if series 
  104.       // is exposed from a different rule
  105.      default
  106.       level.Warn(g.logger).Log("msg""Adding stale sample failed""sample", metric, "err", err) 
  107.      } 
  108.     } 
  109.    } 
  110.   }(i, rule
  111.  } 
  112.  if g.metrics != nil { 
  113.   g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal) 
  114.  } 
  115.  g.cleanupStaleSeries(ctx, ts) 

然后就是规则的具体执行了,我们这里先只看AlertingRule的流程。首先看下AlertingRule的结构:

  1. // An AlertingRule generates alerts from its vector expression. 
  2. type AlertingRule struct { 
  3.     // The name of the alert. 
  4.     name string 
  5.     // The vector expression from which to generate alerts. 
  6.     vector parser.Expr 
  7.     // The duration for which a labelset needs to persist in the expression 
  8.     // output vector before an alert transitions from Pending to Firing state. 
  9.     holdDuration time.Duration 
  10.     // Extra labels to attach to the resulting alert sample vectors. 
  11.     labels labels.Labels 
  12.     // Non-identifying key/value pairs. 
  13.     annotations labels.Labels 
  14.     // External labels from the global config. 
  15.     externalLabels map[string]string 
  16.     // true if old state has been restored. We start persisting samples for ALERT_FOR_STATE 
  17.     // only after the restoration. 
  18.     restored bool 
  19.     // Protects the below. 
  20.     mtx sync.Mutex 
  21.     // Time in seconds taken to evaluate rule
  22.     evaluationDuration time.Duration 
  23.     // Timestamp of last evaluation of rule
  24.     evaluationTimestamp time.Time 
  25.     // The health of the alerting rule
  26.     health RuleHealth 
  27.     // The last error seen by the alerting rule
  28.     lastError error 
  29.     // A map of alerts which are currently active (Pending or Firing), keyed by 
  30.     // the fingerprint of the labelset they correspond to
  31.     active map[uint64]*Alert 
  32.     logger log.Logger 

这里比较重要的就是active字段了,它保存了执行规则后需要进行告警的资源,具体是否告警还要执行一系列的逻辑来判断是否满足告警条件。具体执行的逻辑如下:

  1. func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) { 
  2.     res, err := query(ctx, r.vector.String(), ts) 
  3.     if err != nil { 
  4.         r.SetHealth(HealthBad) 
  5.         r.SetLastError(err) 
  6.         return nil, err 
  7.     } 
  8.     // ...... 

这一步通过创建Manager时传入的QueryFunc函数执行规则配置中的expr表达式,然后得到返回的结果,这里的结果是满足表达式的指标的集合。比如配置的规则为:

  1. cpu_usage > 90 

那么查出来的结果可能是

  1. cpu_usage{instance="192.168.0.11"} 91 
  2. cpu_usage{instance="192.168.0.12"} 92 

然后遍历查询到的结果,根据指标的标签生成一个hash值,然后判断这个hash值是否之前已经存在(即之前是否已经有相同的指标数据返回),如果是,则更新上次的value及annotations,如果不是,则创建一个新的alert并保存至该规则下的active alert列表中。然后遍历规则的active alert列表,根据规则的持续时长配置、alert的上次触发时间、alert的当前状态、本次查询alert是否依然存在等信息来修改alert的状态。具体规则如下:

如果alert之前存在,但本次执行时不存在

如果alert之前存在并且本次执行仍然存在

其余情况修改alert的状态为StatePending

上面那一步只是修改了alert的状态,但是并没有真正执行发送告警操作。下面才是真正要执行告警操作:

  1. // 判断规则是否是alert规则,如果是则发送告警信息(具体是否真正发送由ar.sendAlerts中的逻辑判断) 
  2. if ar, ok := rule.(*AlertingRule); ok { 
  3.     ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) 
  4. // ....... 
  5. func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { 
  6.     alerts := []*Alert{} 
  7.     r.ForEachActiveAlert(func(alert *Alert) { 
  8.         if alert.needsSending(ts, resendDelay) { 
  9.             alert.LastSentAt = ts 
  10.             // Allow for two Eval or Alertmanager send failures. 
  11.             delta := resendDelay 
  12.             if interval > resendDelay { 
  13.                 delta = interval 
  14.             } 
  15.             alert.ValidUntil = ts.Add(4 * delta) 
  16.             anew := *alert 
  17.             alerts = append(alerts, &anew) 
  18.         } 
  19.     }) 
  20.     notifyFunc(ctx, r.vector.String(), alerts...) 
  21. func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool { 
  22.     if a.State == StatePending { 
  23.         return false 
  24.     } 
  25.     // if an alert has been resolved since the last send, resend it 
  26.     if a.ResolvedAt.After(a.LastSentAt) { 
  27.         return true 
  28.     } 
  29.     return a.LastSentAt.Add(resendDelay).Before(ts) 

概括一下以上逻辑就是:

  1. 如果alert的状态是StatePending,则不发送告警
  2. 如果alert的已经被解决,那么再次发送告警标记该条信息已经被解决
  3. 如果当前时间距离上次发送告警的时间大于配置的重新发送延时时间(ResendDelay),则发送告警,否则不发送

以上就是prometheus的告警流程。学习这个流程主要是问了能够对prometheus的rules相关的做二次开发。我们可以修改LoadGroups()方法,让其可以动态侧加载定义在mysql中定义的规则,动态实现告警规则更新。

参考: 

《深入浅出prometheus原理、应用、源码与拓展详解》

 

来源:运维开发故事 内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯