k8s源码分析- Informer机制

微信扫一扫,分享到朋友圈

k8s源码分析- Informer机制

Informer在k8s的重要性就不再赘述,直接切入正题。

先放一张调用关系图

高清地址

由于Informer这部分的源码比较复杂,调用链路也很长,后面的源码分析,都会围绕这一张图展开。

概述

k8s中,组件之间通过http通讯,在不依赖任何中间件的情况下,需要保证消息的可靠性、实时性、顺序性等?k8s是如何做到的呢?— 答案就是Informer。k8s的其他组件都是通过informer与api-server通讯的。

Informer运行原理

各个组件包括:

  • Reflector:用于监控(watch)指定的资源,当监控的资源发生变化时,触发相应的变更事件。并将资源对象存放到本地缓存DeltaFIFO中
  • DeltaFIFO:对资源对象的的操作类型进行队列的基本操作
    • FIFO:先进先出队列,提供资源对象的增删改查等操作
    • Dealta:资源对象存储,可以保存资源对象的操作类型。如:添加操作类型、更新操作类型、删除操作类型、同步操作类型
  • Indexer:存储资源对象,并自带索引功能的本地存储。
    • Reflect从DeltaFIFO中将消费出来的资源对象存储至Indexer
    • Indexer中的数据与Etcd完全一致,client-go可以从本地读取,减轻etcd和api-server的压力

Informer使用示例

  • 通过kubernetes.NewForConfig创建clientset对象。informer需要通过clientset与apiserver进行交互
  • 创建一个用于停止的channel,用于进程退出前通知informer提前退出。因为informer是一个持久运行的Groutine
  • informers.NewSharedInformerFactory实例化sharedInformer对象
    • 第一个参数是ClientSet
    • 第二个参数是多久同步一次
  • Informer方法可以获得特定资源的informer对象
  • AddEventHandler函数可以为对象添加回调方法,支持三种对象的回调方法
    • AddFunc:创建资源对象时触发的回调方法
    • UpdateFunc:更新资源对象时触发的回调方法
    • DeleteFunc:删除资源对象时触发的回调方法
  • Run方法运行当前的informer
// 通过informer机制,实现k8s资源的监控
func informer() {
// 因为informer是一个持久运行的groutine,channel作用:进程退出前通知informer退出
stopChan := make(chan struct{})
defer close(stopChan)
// 创建连接k8s的client对象
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Printf("init clientset error.")
return
}
// 第一步:创建sharedInformer对象,第二个参数为重新同步数据的间隔时间
sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
// 第二步:每个资源都有informer对象,这里获取pod资源的informer对象
podInformer := sharedInformers.Core().V1().Pods().Informer()
// 第三步:添加自定义回调函数
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// 添加资源的回调函数,返回的是接口类型,需要强制转换为真正的类型
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("New pod added: %s", mObj.GetName())
},
// 更新资源的回调函数
UpdateFunc: func(oldObj, newObj interface{}) {
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
},
// 删除资源的回调函数
DeleteFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("pod deleted from store: %s", mObj.GetName())
},
})
// 第四步:开始运行informer对象
podInformer.Run(stopChan)
}
复制代码

资源Informer和SharedInformer

前面demo中,第一步便是创建SharedInformer对象,下面先介绍一下Informer和SharedInformer

资源Informer

  • 每一种资源都实现了Informer机制,允许监控不同的资源事件
  • 每一个Informer都会实现Informer和Lister方法
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
复制代码

SharedInformer

若同一个资源的Informer被实例化了多次,每个Informer使用一个Reflector,那么会运行过多相同的ListAndWatch,太多重复的序列化和反序列化操作会导致api-server负载过重

SharedInformer可以使同一类资源Informer共享一个Reflector。内部定义了一个map字段,用于存放所有Infromer的字段。

前面demo中第一步创建SharedInformer, sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute) ,内部初始化了一个sharedInformerFactory对象,先看下sharedInformerFactory

源码位置:vendor/k8s.io/client-go/informer/factory.go

type sharedInformerFactory struct {
client           kubernetes.Interface
namespace        string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock             sync.Mutex
defaultResync    time.Duration
customResync     map[reflect.Type]time.Duration
// 按照类型存放共享的informer
informers map[reflect.Type]cache.SharedIndexInformer
// 这个字段用来追踪informers是否被启动了
// 可以保证Start()方法安全的重复调用多次(幂等性)
startedInformers map[reflect.Type]bool
}
复制代码

Start方法

k8s中的Controller-Manager组件,源码中的Run方法调用了SharedInformerFactory的Start方法

源码位置:cmd/kube-controller-manager/app/controllermanager.go

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
...
controllerContext.InformerFactory.Start(controllerContext.Stop)
...
}
复制代码

源码位置:k8s.io/client-go/informers/factory.go

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
...
// 遍历所有的informers
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
// 每一种informer启动一个协程,运行Run方法
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
复制代码

获取Informer

前面demo中,创建好sharedInformer对象后,第二步是调用 podInformer := sharedInformers.Core().V1().Pods().Informer() ,获取具体的informer实例,下面开始分析Informer方法

关键逻辑包括:

  • sharedProcessor的初始化
  • List和Watch方法的注册:注册具体某个资源类型的list和watch方法
  • Indexer的初始化:实现类是cache类

以pod为例,源码位置:client-go/informers/core/v1/pod.go

// 获取pod的informer,内部调用InformerFor,参数需要传入f.defaultInformer
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
// 最后一个参数,初始化indexers
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
// 注册List、Watch方法
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
// List方法是该种资源对象的List方法(这里是pod)
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
// Watch方法是该种资源对象的Watch方法(这里是pod)
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
// 这里是processor的初始化
processor:                       &sharedProcessor{clock: realClock},
// 这里是Indexer的初始化,接口为Indexer,实现类为cache
indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher:                   lw,
objectType:                      exampleObject,
resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock:                           realClock,
}
return sharedIndexInformer
}
// Index接口,实现类是cache类
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc:      keyFunc,
}
}
复制代码

注册自定义回调函数

得到Informer对象之后,第三步是给该infomer注册自定义回调函数,当k8s的资源发送变更时,可以实现自己的业务逻辑。 下面分析一下 podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{...}) 的逻辑

// 开始注册事件处理函数
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
...
// 每一个监听者,都会注册为一个listner实例
// 每个listener中持有一个handler对象,后面事件发生时,框架会调用handler方法,也就走到用户注册的代码逻辑了
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
...
// 将listner添加到sharedProcessor中
s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
// 将listner添加到sharedProcessor中
func (p *sharedProcessor) addListener(listener *processorListener) {
...
if p.listenersStarted {
// listener后台启动了两个协程,这两个协程很关键,后面会介绍
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
复制代码

Run方法

前面demo中,得到informer实例之后,最后一步就是调用Run方法,下面开始解析Run方法的逻辑, 核心逻辑包括:

  • DeltaFIFO的初始化
  • Controller的初始化
  • 运行process.Run方法
  • 运行controller.Run方法

源码位置:k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
// 初始化DeltaFIFO
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects:          s.indexer,
EmitDeltaTypeReplaced: true,
})
// config初始化
// 这里重点关注ListerWatcher对象和Process对象,Process关联的是HandleDeltas函数
// HandleDeltas是消费增量信息(Delta对象)的核心方法
cfg := &Config{
Queue:            fifo,
// ListAndWatch对象
ListerWatcher:    s.listerWatcher,
ObjectType:       s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError:     false,
ShouldResync:     s.processor.shouldResync,
// 注册回调函数 HandleDeltas,资源变更时,存到到本地Indexer
Process:           s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
// 这里主要是controller的初始化
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 初始化Controller对象
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// s.cacheMutationDetector.Run检查缓存对象是否存在
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 执行sharedProcessor.run方法
// 这个方法非常重要
wg.StartWithChannel(processorStopCh, s.processor.run)
...
// 调用Controller的Run方法
s.controller.Run(stopCh)
}
复制代码

process的Run方法

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
...
// sharedProcessor的所有Listner,每个后台启动两个协程
// 分别指向run和pop方法
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
...
}
复制代码

processorListener的run方法

processorListener代表一个消费者对象,该函数周期性执行,主要是从自己的nextCh通道中获取从api-server得到的增量信息,然后调用handler的相关方法,handler方法就是前面用户自定义传进来的方法。

这里我们只需要知道,run方法是消费者方法,负责消费事件。后面会介绍到谁是生产者,谁往processorLister的nextCh通道中放入增量信息(其实就是下面的pop方法)

func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
// 消费者方法,不断从通道中获取事件
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
// 调用handler的方法,
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
// OnUpdate方法,内部就是调用demo中注册的UpdateFunc方法
// 其他方法类似
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}
复制代码

processorListener的pop方法

这个方法的实现非常复杂,但是总体的目的很简单:就是往nextCh中生产消息,然后前面介绍的run方法就可以消费到

这里主要用到了类似缓冲区的方法,并不是一次就获取一个事件

func (p *processorListener) pop() {
...
var nextCh chan<- interface{}
var notification interface{}
for {
select {
// 函数第一次进来,这个notification一定是空的,这个case会被阻塞住
// 当第二个case调用完成后,notification被赋值为notificationToAdd,才会进入到这里
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
// 第一次调用时,会进入这里,首先从addCh中获取数据(后面会介绍谁往addCh中放数据)
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
notification = notificationToAdd
// channel是引用类型,将p.nextCh指向nextCh,对nextCh的操作就是操作p.nextCh
// 这里也解答了前面的run方法里面提到的疑问,谁是nextCh的生产者,往nextCh放入数据
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
复制代码

pop函数中我们看到,主要是消费p.addCh,后面详细介绍p.addCh的生产者是谁。这里简单提一下,watch函数监测到api-server的事件变化时,触发HandlerDelta函数,这个函数会更新Indexer,同时调用distribute方法,将该事件通知给所有的listener,内部实现就是:往每个listener的addCh这个通道中放入数据。

Controller的Run方法

Run方法内部关键逻辑包括:

  • 初始化Reflector对象
  • 调用Reflector的Run方法
    • 调用List获取全部资源数据
    • 调用Watch实时监控资源变更情况,并放入队列
  • 调用controller的processLoop方法
    • 消费队列中的数据

源码位置:k8s.io/client-go/tools/cache/controller.go

func (c *controller) Run(stopCh <-chan struct{}) {
// 调用NewReflector初始化一个Reflector
// 必须传入ListWatcher数据接口对象
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
...
// 调用Reflector的Run方法,启动监控,并处理监控事件
wg.StartWithChannel(stopCh, r.Run)
// processLoop负责从DeltaFIFO取出数据并消费
wait.Until(c.processLoop, time.Second, stopCh)
}
复制代码

Reflector

这里先暂停源码分析,简单介绍一下Reflector,Reflector用于监控指定的k8s资源,并触发相应的变更事件。

  • NewReflector:创建Reflector对象,需传入ListerWatcher数据接口对象
  • Run:启动监控,并处理事件

Run中的核心函数是ListAndWatch,流程包括:

  • 获取资源列表数据
  • 监控资源对象:使用http协议的分块传输编码

Reflector类

type Reflector struct {
// 名称
name string
expectedTypeName string
// 期望放入缓存store的资源类型
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// 存放同步监听到的资源,这里是DeltaFIFO类
store Store
// 用来执行List和Watch的对象
listerWatcher ListerWatcher
backoffManager wait.BackoffManager
// resync周期
resyncPeriod time.Duration
ShouldResync func() bool
clock clock.Clock
paginatedResult bool
// 最新一次看到的资源版本号
lastSyncResourceVersion string
isLastSyncResourceVersionUnavailable bool
lastSyncResourceVersionMutex sync.RWMutex
WatchListPageSize int64
watchErrorHandler WatchErrorHandler
}
复制代码

核心方法:Run

核心逻辑包括:

  • 调用List方法获取资源对象下所有的数据
  • 将资源数据转换为资源对象列表
  • 将资源信息存储到DeltaFIFO中,全量替换本地缓存
  • 调用Watch方法监听资源
  • 调用watchHandler函数,处理watch到的各种事件
// Run函数
func (r *Reflector) Run(stopCh <-chan struct{}) {
...
wait.BackoffUntil(func() {
// 核心函数:ListAndWatch
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
...
}
复制代码

ListAndWatch方法

// ListAndWatch函数
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
...
if err := func() error {
...
go func() {
...
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
// 调用List方法获取资源对象下所有的数据
return r.listerWatcher.List(opts)
}))
...
}()
...
// 获取资源版本号
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
// 将资源数据转换为资源对象列表
items, err := meta.ExtractList(list)
// 将资源信息存储到DeltaFIFO中,全量替换本地缓存
// 内部调用了replace方法
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
// 设置最新的资源版本号
r.setLastSyncResourceVersion(resourceVersion)
return nil
}(); err != nil {
return err
}
go func() {
...
for {
...
// 同步资源
if r.ShouldResync == nil || r.ShouldResync() {
// 调用DeltaFIFO的Resync方法
if err := r.store.Resync(); err != nil {
...
}
}
resyncCh, cleanup = r.resyncChan()
}
}()
for {
...
// 监听资源
w, err := r.listerWatcher.Watch(options)
// 处理handler事件,用户注册的Add,Delete,Update函数
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh);
...
}
}
// syncWith函数
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
// 调用cache.Replace
return r.store.Replace(found, resourceVersion)
}
// cache.Replace
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
items := make(map[string]interface{}, len(list))
for _, item := range list {
key, err := c.keyFunc(item)
if err != nil {
return KeyError{item, err}
}
items[key] = item
}
c.cacheStorage.Replace(items, resourceVersion)
return nil
}
复制代码

watchHandler方法

watchHandler函数,处理watch到的各种事件,所有的事件都存放在ResultChan中,包括:事件类型,资源对象

func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
...
for {
select {
...
// 获取watch接口中的事件的channel
case event, ok := <-w.ResultChan():
...
switch event.Type {
// 处理Add函数
case watch.Added:
// store是DeltaFIFO类
err := r.store.Add(event.Object)
// 处理Modified函数
case watch.Modified:
err := r.store.Update(event.Object)
// 处理Deleted函数
case watch.Deleted:
err := r.store.Delete(event.Object)
}
*resourceVersion = newResourceVersion
// 设置资源版本
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
...
}
复制代码

DeltaFIFO

后面的源码分析会用到DeltaFIFO,这里先介绍一下。

DeltaFIFO用来存储Watch API返回的各种事件,队列中会存在拥有不同操作类型的同一个资源对象 DeltaFIFO实现了Queue接口,Queue继承了Store接口

源码路径:vendor/k8s.io/client-go/tools/cache/delta_fifo.go

type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
// map结构存储:key是资源对象的key,value是对象的Deltas数组
items map[string]Deltas
// 存储资源对象的key
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// Index本地存储对象
knownObjects KeyListerGetter
closed     bool
closedLock sync.Mutex
}
复制代码

核心功能包括:

  • 生产者方法
  • 消费者方法
  • Resync机制

生产者方法

Reflector监听到资源变化后,将Add、Delete、Update等资源变更信息加入到DeltaFIFO。也就是队列的生产者,方法如下,内部都调用了

入口函数是 r.store.Add(event.Object),在前面的watchHandler介绍过

func (f *DeltaFIFO) Add(obj interface{}) error {
...
return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) Update(obj interface{}) error {
...
return f.queueActionLocked(Updated, obj)
}
func (f *DeltaFIFO) Delete(obj interface{}) error {
...
return f.queueActionLocked(Deleted, obj)
}
// 举例说明其中一个处理函数,其他的类似,内部都调用queueActionLocked
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
// 内部调用queueActionLocked
return f.queueActionLocked(Updated, obj)
}
复制代码

queueActionLocked方法

// 内部主要是封装Delta事件,并加入队列,供消费者消费(HandleDeltas函数)
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 根据资源对象,得到key,一般是 namespace/name 格式
id, err := f.KeyOf(obj)
// 将watch到的事件类型和资源对象,封装成Delta对象
newDeltas := append(f.items[id], Delta{actionType, obj})
// 去重操作
newDeltas = dedupDeltas(newDeltas)
// 将Delta对象加入到队列中
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
delete(f.items, id)
}
return nil
}
复制代码

消费者方法-processLoop

前面的源码分析,分析完了Reflector的Run方法,下一步便是controller的processLoop方法

func (c *controller) Run(stopCh <-chan struct{}) {
// 调用NewReflector初始化一个Reflector
// 必须传入ListWatcher数据接口对象
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
...
// 调用Reflector的Run方法,启动监控,并处理监控事件
wg.StartWithChannel(stopCh, r.Run)
// processLoop负责从DeltaFIFO取出数据并消费
wait.Until(c.processLoop, time.Second, stopCh)
}
复制代码
func (c *controller) processLoop() {
for {
// 从DeltaFIFO队列中取出数据,并交给process处理
// process函数保存在config.Process中,也就是前面传入的 HandleDeltas
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
...
}
}
复制代码

Pop方法

DeltaFIFO的消费方法为Pop,该函数需要传入process函数,用于接收并处理对象的回调方法

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
for {
for len(f.queue) == 0 {
// 当队列为空时,Pop函数阻塞住,知道新的数据入队列才唤醒
// 如果Close函数被调用,closed状态被设置,并且广播
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
// 走到这里,说明队列中有数据,取出数据
id := f.queue[0]
...
// 将数据交给上层回调函数处理
err := process(item)
// 出错则将数据重新放入队列
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
复制代码

process函数:HandleDeltas

  • Run方法中传入一个回调函数 HandleDeltas
  • processLoop内部执行的pop对象就是上面传入的HandleDeltas

核心逻辑包括:

  • 更新本地缓存cacheStorage,其实就是更新 threadSafeMap 这个数据结构
  • 将事件通知到所有的listener,其实就是往listener的addCh中放入数据,供消费者消费
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// 获取所有的Delta资源
for _, d := range obj.(Deltas) {
// 判断资源类型
switch d.Type {
// 如果是下列类型,将资源存储到Indexer
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// indexer的实现类是前面介绍过的cache
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// 将资源对象分发至 SharedInformer 的事件处理函数中
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
// 更新本地缓存cacheStorage
// 其实就是更新 threadSafeMap 这个数据结构,threadSafeMap的初始化在前面介绍过
func (c *cache) Update(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Update(key, obj)
return nil
}
// distribute函数
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
...
if sync {
for _, listener := range p.syncingListeners {
// 核心方法
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
// listener.add方法,这里将事件添加到listener的addCh通道中
// 至此,也回答的前面的问题-- 谁往p.addCh中生产数据
func (p *processorListener) add(notification interface{}) {
// 不同更新类型的对象加入到channel中
// 供给processorListener的Run方法使用
p.addCh <- notification
}
复制代码

Resync机制

ListAndWatch方法中的三步:

  • List
  • Rsync
  • Watch

RSync负责将Indexer本地存储的资源对象同步到DeltaFIFO中,并设置资源类型为Sync类型。在Reflector中定时执行

func (f *DeltaFIFO) Resync() error {
// 获取indexer本地存储对象
keys := f.knownObjects.ListKeys()
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
复制代码

Indexer

前面的分析中说到,资源的变更会保存到本地的Indexer,这里介绍一下。

  • Indexer是client-go用来存储资源对象,并自带索引功能的本地存储
  • Reflector从DeltaFIFO中消费出来的资源对象存储至Indexer
  • Indexer数据与Etcd一致,client-go可以方便的从本地读取,减轻api-server的压力

四个重要的数据结构

源码位置:k8s.io/client-go/tools/cache/index.go

// 存储缓存数据
// type Empty struct{}
// type String map[string]Empty
// 这里的sets.String是用map模拟set,map中的value都是空结构体
type Index map[string]sets.String
// 索引器函数,接收资源对象,返回检索结果列表
type IndexFunc func(obj interface{}) ([]string, error)
// 存储索引器,key为索引器名称,value为索引器实现函数
type Indexers map[string]IndexFunc
// 存储缓存器,key为缓存器名称,value为缓存数据
type Indices map[string]Index
复制代码

ThreadSafeMap

Indexer基于ThreadSafeMap做了封装,先看一下ThreadSafeMap

  • ThreadSafeMap是一个内存存储,数据不会存入磁盘
  • 增删改查都会加锁,保证数据一致性
  • 内部用到了索引器、缓存器

源码位置:k8s.io/tools/cache/thread_safe_store.go

type threadSafeMap struct {
lock  sync.RWMutex
// map结构存储资源数据
// map中的key是通过keyFunc函数计算得出,默认使用MetaNamespaceFunc函数
// 该函数根据资源对象计算出<namespace>/<name>格式的key
// value是Delta对象,包括Type和Object资源对象
items map[string]interface{}
// 存储索引器,key为索引器名称,value为索引器实现函数
indexers Indexers
// 存储缓存器,key为缓存器名称,value为缓存的资源对象数据
indices Indices
}
// 通过执行索引器函数得到索引结果
// 需要两个参数:索引器名称、需要检索的key
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
// 查找指定的索引器函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 查找指定的缓存器函数
index := c.indices[indexName]
// 从缓存数据中查找并返回数据
set := index[indexedValue]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
复制代码

总结

Informer机制在k8s中占据重要的角色,它的源码也是非常的复杂。学习的过程中一定要配合文章开始的那个图,否则很容易就绕进去了。里面使用Queue和Channel来解耦各个组件。个人的一个心得是:围绕着一个核心思路,可能分析起来逻辑会清晰一点,那就是:谁往channel放了数据,谁又从channel取了数据。

2020年8月台积电营收超过40亿美元 再创新高

上一篇

游戏本电脑性价比排行2020年哪款好?即将发布的TA实力超强

下一篇

你也可能喜欢

k8s源码分析- Informer机制

长按储存图像,分享给朋友