Prometheus Discovery 之 K8S 代码分析

k8s技术圈

共 35123字,需浏览 71分钟

 ·

2021-07-05 14:27

Service Discovery interface

Service Discovery 必须实现 Discovery 接口,定义如下:

type Discoverer interface {
   Run(ctx context.Context, ch chan<- []*targetgroup.Group)
}

Prometheus 支持了众多的 SD 发现机制,代码位于 discovery 目录下。

Group 的结构定义如下:

// Group is a set of targets with a common label set(production , test, staging etc.).
type Group struct {
 // Targets is a list of targets identified by a label set. Each target is
 // uniquely identifiable in the group by its address label.
 Targets []model.LabelSet
 // Labels is a set of labels that is common across all targets in the group.
 Labels model.LabelSet

 // Source is an identifier that describes a group of targets.
 Source string
}

K8S 的 Discoverer 定义在文件 gprometheus/discovery/kubernetes/kubernetes.go 中。

在 init 函数中注册了metrics prometheus_sd_kubernetes_events_total,用于分析发现过程中的事件接受数量:

func init() {
 prometheus.MustRegister(eventCount)

 // Initialize metric vectors.
 for _, role := range []string{"endpoints""node""pod""service"} {
  for _, evt := range []string{"add""delete""update"} {
   eventCount.WithLabelValues(role, evt)
  }
 }
}

第一次可以全量的将全部事件发送到接口中定义的 ch 中,后续的更新事件,只需要发送更新的事件信息内容即可,如果信息被删除了则可以发送一个为空的事件内容(包含 Source),所有事件通过 Source 字段作为唯一 key。

// Discovery implements the discoverer interface for discovering
// targets from Kubernetes.
// 每个 role 会启动一个单独的 Discovery 进行跟踪
type Discovery struct {
 sync.RWMutex
 client             kubernetes.Interface  // 连接到 k8s 的 client
 role               Role
 logger             log.Logger
 namespaceDiscovery *NamespaceDiscovery // 保存需要监控的 namespace 
 discoverers        []discoverer        // 每个 role 按照 namespace 进行划分,单独跟踪的 sd
}

其中 discoverers 分不同的 namespace,每个 namespace 会单独起一个内部的 discoverer 来进行单独的跟踪。

// This is only for internal use.
type discoverer interface {
 Run(ctx context.Context, up chan<- []*targetgroup.Group)
}

初始化和运行

main 函数入口

func main() {
    // ...
    // 初始化 
 discoveryManagerScrape  = discovery.NewManager(ctxScrape, log.With(logger, "component""discovery manager scrape"))
    
    // Notify 的作用待定?
    discoveryManagerNotify  = discovery.NewManager(ctxNotify, log.With(logger, "component""discovery manager notify"))
 
 // ...
 
 reloaders := []func(cfg *config.Config) error{
   // JobName 为 key ,对应到相关配置
         // v.JobName] = v.ServiceDiscoveryConfig
    func(cfg *config.Config) error {
   c := make(map[string]sd_config.ServiceDiscoveryConfig)
   for _, v := range cfg.ScrapeConfigs {
    c[v.JobName] = v.ServiceDiscoveryConfig
   }
   return discoveryManagerScrape.ApplyConfig(c)
  },
  // ...
        
     {
  // Scrape discovery manager.
  g.Add(
   func() error {
               // 调用 Run 启动
    err := discoveryManagerScrape.Run()
    level.Info(logger).Log("msg""Scrape discovery manager stopped")
    return err
   },
   func(err error) {
    level.Info(logger).Log("msg""Stopping scrape discovery manager...")
    cancelScrape()
   },
  )
 }
        
   if err := g.Run(); err != nil {
  level.Error(logger).Log("err", err)
  os.Exit(1)
 }
}

整个 DS 的入口在 prometheus/discovery/manager.go

NewManager 函数用于生成 DS Mgr 对象:

// NewManager is the Discovery Manager constructor
func NewManager(ctx context.Context, logger log.Logger) *Manager {
   if logger == nil {
      logger = log.NewNopLogger()
   }
   return &Manager{
      logger:         logger,
      syncCh:         make(chan map[string][]*targetgroup.Group),
      targets:        make(map[poolKey]map[string]*targetgroup.Group),
      discoverCancel: []context.CancelFunc{},
      ctx:            ctx,
   }
}
// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
 m.mtx.Lock()
 defer m.mtx.Unlock()

 m.cancelDiscoverers()
    // 对于配置文件组织管理
 for name, scfg := range cfg {
  m.registerProviders(scfg, name)
 }
    
    // 全部启动
 for _, prov := range m.providers {
  m.startProvider(m.ctx, prov)
 }

 return nil
}

将每个 JobName 为 key 的结构进行注册管理

// provider 用于管理可能相同配置的不同 job 任务
// provider holds a Discoverer instance, its configuration and its subscribers.
type provider struct {
 name   string       // "kubernetes_sd_configs/[0-n]"
 d      Discoverer   // 对应的 Discoverer 
 subs   []string     // 配置相关情况下的,可能是不同的 JobName
 config interface{}  // 对应的相关配置
}

Manager 结构如下:

// Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name.
type Manager struct {
   logger         log.Logger
   mtx            sync.RWMutex
   ctx            context.Context
   discoverCancel []context.CancelFunc

   // Some Discoverers(eg. k8s) send only the updates for a given target group
   // so we use map[tg.Source]*targetgroup.Group to know which group to update.
    // poolkey: job_name + provider_n, 将事件放到各个 job 任务的队列中
   targets map[poolKey]map[string]*targetgroup.Group 
   // providers keeps track of SD providers.
   providers []*provider
   // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
    // 经过聚合后,将需要更新的对象信息发送到 jobName 的队列中
   syncCh chan map[string][]*targetgroup.Group
}

registerProviders:

func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string){
    // setName 为 JobName
 add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {
  t := reflect.TypeOf(cfg).String() // kubernetes_sd_configs
  for _, p := range m.providers {
   if reflect.DeepEqual(cfg, p.config) {
    p.subs = append(p.subs, setName)
    return
   }
  }
  
        // call kubernetes.New(log.With(m.logger, "discovery""k8s"), cfg)
  d, err := newDiscoverer()
  provider := provider{
            // t = "kubernetes_sd_configs"
   name:   fmt.Sprintf("%s/%d", t, len(m.providers)),
   d:      d,
   config: cfg,
   subs:   []string{setName},
  }
  m.providers = append(m.providers, &provider)
 }
 
 // ...
    // 循环处理 k8s 相关的配置
 for _, c := range cfg.KubernetesSDConfigs {
  add(c, func() (Discoverer, error) {
   return kubernetes.New(log.With(m.logger, "discovery""k8s"), c)
  })
 }
 // ...

整体结构如下:

Manager --> []provider -> provider[k8s/0] -->  Discovery ->  [roleA]discoverys -> ns1, ns2 
                          provider[k8s/n]                ->  [roleB]discoverys -> ns1, ns2
                          provider[xxx/n]         discovery: Service, Endpoints, Service..

启动:

func (m *Manager) startProvider(ctx context.Context, p *provider) {
   level.Debug(m.logger).Log("msg""Starting provider""provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
   ctx, cancel := context.WithCancel(ctx)
    
   // updates 为 SD 对外输出目标的通道,需要重点关注
   updates := make(chan []*targetgroup.Group)

   m.discoverCancel = append(m.discoverCancel, cancel)

   go p.d.Run(ctx, updates) // 循环启动
   go m.updater(ctx, p, updates)
}

updater(ctx, p, updates) 负责从 SD 发送的通道中读取数据:

func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
   ticker := time.NewTicker(5 * time.Second)
   defer ticker.Stop()

   triggerUpdate := make(chan struct{}, 1)

   for {
      select {
      case <-ctx.Done():
         return
      case tgs, ok := <-updates:
         if !ok {
            level.Debug(m.logger).Log("msg""discoverer channel closed, sending the last update""provider", p.name)
            select {
            case m.syncCh <- m.allGroups(): // Waiting until the receiver can accept the last update.
               level.Debug(m.logger).Log("msg""discoverer exited""provider", p.name)
               return
            case <-ctx.Done():
               return
            }

         }
          
          // s: job_name, provider: k8s/n
          // 针对可能的订阅者(包括相同配置的订阅者) 发送事件
         for _, s := range p.subs {
            m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
         }

         select {
         case triggerUpdate <- struct{}{}:
         default:
         }
      case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
         select {
         case <-triggerUpdate:
            select {
                // m.allGroups() 按照 pkey.SetName (job_name 进行聚合)
            case m.syncCh <- m.allGroups(): 
            default:
               level.Debug(m.logger).Log("msg""discovery receiver's channel was full so will retry the next cycle""provider", p.name)
               select {
               case triggerUpdate <- struct{}{}:
               default:
               }
            }
         default:
         }
      }
   }
}

读取 channel 数据后,放入到相对应的 poolkey 中进行更新

func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
 m.mtx.Lock()
 defer m.mtx.Unlock()

 for _, tg := range tgs {
  if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
   if _, ok := m.targets[poolKey]; !ok {
    m.targets[poolKey] = make(map[string]*targetgroup.Group)
   }
            // poolkey: job_name + provider_n, 
   m.targets[poolKey][tg.Source] = tg
  }
 }
}

最终 Manager 将数据汇总到了 syncCh chan map[string][]*targetgroup.Group 中的定义的 JobName 对应的队列中,通过其 SyncCh 函数将该通道返回出去

处理更新后的事件

main 函数中,scrapeManager 负责从 DS Manager 的输出队列中读取数据:

int main(){
    
 // ...
 {
  // Scrape manager.
  g.Add(
   func() error {
    // When the scrape manager receives a new targets list
    // it needs to read a valid config for each job.
    // It depends on the config being in sync with the discovery manager so
    // we wait until the config is fully loaded.
    <-reloadReady.C

    err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
    level.Info(logger).Log("msg""Scrape manager stopped")
    return err
   },
   func(err error) {
    // Scrape manager needs to be stopped before closing the local TSDB
    // so that it doesn't try to write samples to a closed storage.
    level.Info(logger).Log("msg", "Stopping scrape manager...")
    scrapeManager.Stop()
   },
  )
 }
 
    // ...
}

scraple/manager.go 中定义:

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups form the discovery manager.
type Manager struct {
 logger    log.Logger
 append    Appendable
 graceShut chan struct{}

 mtxTargets     sync.Mutex // Guards the fields below.
 targetsActive  []*Target
 targetsDropped []*Target
 targetsAll     map[string][]*Target

 mtxScrape     sync.Mutex // Guards the fields below.
 scrapeConfigs map[string]*config.ScrapeConfig
 scrapePools   map[string]*scrapePool
}

// Run starts background processing to handle target updates and reload the scraping loops.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
 for {
  select {
  case ts := <-tsets:
   m.reload(ts)
  case <-m.graceShut:
   return nil
  }
 }
}

reload 函数定义如下:

func (m *Manager) reload(t map[string][]*targetgroup.Group) {
 m.mtxScrape.Lock()
 defer m.mtxScrape.Unlock()

 tDropped := make(map[string][]*Target)
 tActive := make(map[string][]*Target)

 for tsetName, tgroup := range t {
  var sp *scrapePool
  if existing, ok := m.scrapePools[tsetName]; !ok {
   scrapeConfig, ok := m.scrapeConfigs[tsetName]
   if !ok {
    level.Error(m.logger).Log("msg""error reloading target set""err", fmt.Sprintf("invalid config id:%v", tsetName))
    continue
   }
   sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
   m.scrapePools[tsetName] = sp
  } else {
   sp = existing
  }
        
        // Sync 函数中用于过滤相关符合条件的 Target
  tActive[tsetName], tDropped[tsetName] = sp.Sync(tgroup)
 }
    
    // 更新获取和丢弃的目标,可以通过界面查询到对应的结果 
 m.targetsUpdate(tActive, tDropped)
}

sync 定义如下:

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
// 同步将目标组转换为实际的抓取目标,并将当前运行的抓取与结果集同步,并返回所有刮取和删除的目标。
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) (tActive []*Target, tDropped []*Target) {
 start := time.Now()

 var all []*Target
 sp.mtx.Lock()
 sp.droppedTargets = []*Target{}
 for _, tg := range tgs {
        // targetsFromGroup 函数通过相关配置完成转换
  targets, err := targetsFromGroup(tg, sp.config)

  for _, t := range targets {
            // 返回目标的标签,不是处理后的,不以 “__” 为前缀
   if t.Labels().Len() > 0 { // 不存在以 “__" 为前缀匹配的标签
    all = append(all, t)
   } else if t.DiscoveredLabels().Len() > 0 {
    sp.droppedTargets = append(sp.droppedTargets, t)
   }
  }
 }
 sp.mtx.Unlock()
 sp.sync(all)

 targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
  time.Since(start).Seconds(),
 )
 targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()

 sp.mtx.RLock()
 for _, t := range sp.targets {
  tActive = append(tActive, t)
 }
 tDropped = sp.droppedTargets
 sp.mtx.RUnlock()

 return tActive, tDropped
}

targetsFromGroup函数根据输入的对象信息和相关配置完成过滤的整体工作:

// targetsFromGroup builds targets based on the given TargetGroup and config.
// targetsFromGroup 根据给定的 TargetGroup 和 config 构建目标
func targetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig) ([]*Target, error) {
 targets := make([]*Target, 0, len(tg.Targets))

 for i, tlset := range tg.Targets {
        // 将每个数组中的标签和通用标签合并进行过滤
  lbls := make([]labels.Label, 0, len(tlset)+len(tg.Labels))
  
        // 合并发现的标签
  for ln, lv := range tlset {
   lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
  }
        
        // 合并通用标签
  for ln, lv := range tg.Labels {
   if _, ok := tlset[ln]; !ok {
    lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
   }
  }
  
        // 复制一份合并后的标签列表
  lset := labels.New(lbls...)

        // 根据 cfg 配置来进行过滤, lset 为本次的全量标签; lbls 为根据配置处理后的标签集合,origLabels 为处理之前的原始标签集
  lbls, origLabels, err := populateLabels(lset, cfg)

        // 如果 lbls 或者 origLabels 有一个不为空,则加入
  if lbls != nil || origLabels != nil {    // cfg.Params 配置中添加到 url 后的参数
   targets = append(targets, NewTarget(lbls, origLabels, cfg.Params))
  }
 }
 return targets, nil
}

过滤后的 Target 结构定义如下:

// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
 // Labels before any processing.
 discoveredLabels labels.Labels
 
    // Any labels that are added to this target and its metrics.
 labels labels.Labels
    
 // Additional URL parmeters that are part of the target URL.
 params url.Values

 mtx        sync.RWMutex
 lastError  error
 lastScrape time.Time
 health     TargetHealth
 metadata   metricMetadataStore
}

populateLabels 根据给定的标签集和 scrape 配置构建标签集。会在重新标记应用之前返回标签集作为第二个返回值。如果在重新标记期间丢弃目标,则返回在应用重新标记之前找到的原始发现标签集。

// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
// 函数根据给定的 labels 和 相关配置的选项,来进行 relabel 的处理,返回的第一个参数为匹配后的结果集,第二个参数返回应用之前的 labels, 如果第一个参数为空,则表示该目标被丢失,比如 action:drop 
func populateLabels(lset labels.Labels, cfg *config.ScrapeConfig) (res, orig labels.Labels, err error) {
   // Copy labels into the labelset for the target if they are not set already.
   scrapeLabels := []labels.Label{
      {Name: model.JobLabel, Value: cfg.JobName},
      {Name: model.MetricsPathLabel, Value: cfg.MetricsPath},
      {Name: model.SchemeLabel, Value: cfg.Scheme},
   }
   lb := labels.NewBuilder(lset)

   for _, l := range scrapeLabels {
      if lv := lset.Get(l.Name); lv == "" {
         lb.Set(l.Name, l.Value)
      }
   }
   // Encode scrape query parameters as labels.
   for k, v := range cfg.Params {
      if len(v) > 0 {
         lb.Set(model.ParamLabelPrefix+k, v[0])
      }
   }

   preRelabelLabels := lb.Labels()
   // relabel.Process 进程返回给定标签集的重新标记的副本。relabel 按输入顺序应用。
   // 如果删除标签集,则返回nill
   // 可以返回修改的输入 labelSet。
   // Process 会自动添加 job 和 instance 两个lable, 如果 lset 为空这说明不是监控的目标
   lset = relabel.Process(preRelabelLabels, cfg.RelabelConfigs...)

   // Check if the target was dropped.
   if lset == nil {
      return nil, preRelabelLabels, nil
   }
   if v := lset.Get(model.AddressLabel); v == "" {
      return nil, nil, fmt.Errorf("no address")
   }

   lb = labels.NewBuilder(lset)

   // addPort checks whether we should add a default port to the address.
   // If the address is not valid, we don't append a port either.
   addPort := func(s string) bool {
      // If we can split, a port exists and we don'
t have to add one.
      if _, _, err := net.SplitHostPort(s); err == nil {
         return false
      }
      // If adding a port makes it valid, the previous error
      // was not due to an invalid address and we can append a port.
      _, _, err := net.SplitHostPort(s + ":1234")
      return err == nil
   }
   addr := lset.Get(model.AddressLabel)
   // If it's an address with no trailing port, infer it based on the used scheme.
   if addPort(addr) {
      // Addresses reaching this point are already wrapped in [] if necessary.
      switch lset.Get(model.SchemeLabel) {
      case "http", "":
         addr = addr + ":80"
      case "https":
         addr = addr + ":443"
      default:
         return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
      }
      lb.Set(model.AddressLabel, addr)
   }

   if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
      return nil, nil, err
   }

   // Meta labels are deleted after relabelling. Other internal labels propagate to
   // the target which decides whether they will be part of their label set.
   for _, l := range lset {
      if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
         lb.Del(l.Name)
      }
   }

   // Default the instance label to the target address.
   if v := lset.Get(model.InstanceLabel); v == "" {
      lb.Set(model.InstanceLabel, addr)
   }

   res = lb.Labels()
   for _, l := range res {
      // Check label values are valid, drop the target if not.
      if !model.LabelValue(l.Value).IsValid() {
         return nil, nil, fmt.Errorf("invalid label value for %q: %q", l.Name, l.Value)
      }
   }
   return res, preRelabelLabels, nil
}

关于 prometheus_client 相关的测试样例参见:https://github.com/DavadDi/Kubernetes_study/tree/master/prometheus_client

原文链接:https://www.cn18k.com/2018/09/13/Prometheus-Discovery-K8S/

K8S 进阶训练营


 点击屏末  | 即刻学习


扫描二维码获取

更多云原生知识





k8s 技术圈






浏览 69
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报