以下是基于 Kubernetes client-go 的 Informer 创建与启动流程的深度源码解析,分析版本为 v0.26.0。文中代码均为便于说明的精简摘录(`...` 表示省略的代码),并非与上游源码逐行一致:

一、完整创建启动时序图

```mermaid
sequenceDiagram
participant C as ClientSet
participant L as ListerWatcher
participant I as SharedIndexInformer
participant D as DeltaFIFO
participant R as Reflector
participant S as Indexer
participant H as EventHandler

C->>L: 创建ListWatch对象
L->>I: NewSharedIndexInformer()
I->>D: 初始化DeltaFIFO(KnownObjects=Indexer)
I->>S: 创建Indexer(底层为ThreadSafeStore)
I->>I: 注册调用方传入的indexers(如namespace索引)
I->>I: 创建controller(Config含DeltaFIFO、ListerWatcher、HandleDeltas)
note over I: 此时未真正运行(v0.26.0中DeltaFIFO与controller实际在Run()中创建)

activate I
I->>I: Run()启动processor监听
I->>R: controller.Run()创建并启动Reflector
R->>D: ListAndWatch(List结果通过Replace写入队列)
D->>I: processLoop Pop出Deltas交给HandleDeltas
I->>S: 首次全量同步写入Indexer
deactivate I

loop 事件处理循环
R->>D: 持续推送watch事件
D->>I: 消费Delta队列
I->>S: 更新索引存储
I->>H: 分发事件到Handler
end
```

二、分阶段源码剖析

1. 创建阶段 (informer.go)

// NewSharedIndexInformer (simplified excerpt) wires a sharedIndexInformer
// around lw. It sets up three pieces: an Indexer used as the local cache, a
// DeltaFIFO that buffers changes, and a controller Config whose Process
// callback is HandleDeltas. Nothing is running yet when this returns.
//
// NOTE(review): in upstream client-go v0.26.0 the DeltaFIFO, Config and
// controller are created inside sharedIndexInformer.Run, and the constructor
// only initializes the struct; they are merged here for illustration —
// confirm against the tagged source.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
// Core state: the event processor, and the Indexer (local cache keyed by
// DeletionHandlingMetaNamespaceKeyFunc, carrying the caller's indexers).
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: resyncPeriod,
}

// DeltaFIFO: accumulates Deltas per object key. The Indexer is handed in
// as KnownObjects so the FIFO can consult what the cache already holds.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
KnownObjects: sharedIndexInformer.indexer,
})

// Controller configuration: the queue to pop from, the ListerWatcher used
// to feed it, and the callback applied to every popped item.
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
Process: sharedIndexInformer.HandleDeltas, // core callback: applies Deltas to the Indexer and dispatches to handlers
...
}

// Build the controller. This does not start a Reflector; the Reflector is
// started later from the controller's Run method.
sharedIndexInformer.controller = New(cfg)
return sharedIndexInformer
}

关键点

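  • MetaNamespaceKeyFunc:生成 namespace/name 格式的键(集群级资源只有 name)
  • KnownObjects:将 Indexer 注入 DeltaFIFO。Replace(全量 List)时据此找出"Indexer 中有、List 结果中没有"的对象,并为其生成 Deleted 事件;Resync 也以 Indexer 中的对象为准
  • 索引:NewSharedIndexInformer 本身不注册任何默认索引,只使用调用方通过 indexers 参数传入的索引;SharedInformerFactory 生成的类型化 Informer 通常会传入 namespace 索引(cache.NamespaceIndex)
  • 注:v0.26.0 中 DeltaFIFO、Config 与 controller 实际在 sharedIndexInformer.Run() 中创建,构造函数只初始化结构体;上面代码为便于说明而合并展示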

2. 启动阶段 (controller.go)

// Run builds the Reflector that feeds c.config.Queue and starts it in its
// own goroutine. It then drives processLoop until stopCh is closed, and
// returns only after the Reflector goroutine has exited as well.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// processLoop presumably blocks inside Queue.Pop while the queue is
	// empty; closing the queue once stopCh fires unblocks that Pop so the
	// wait.Until loop below can observe the stop and return.
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()

	// The Reflector lists/watches through the ListerWatcher and writes the
	// resulting deltas into the queue.
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)

	var wg wait.Group
	// Reflector.Run loops over ListAndWatch until stopCh is closed.
	wg.StartWithChannel(stopCh, r.Run)

	// Main consumer loop: pop from the queue and apply Config.Process.
	wait.Until(c.processLoop, time.Second, stopCh)

	// Do not return while the Reflector goroutine is still running.
	wg.Wait()
}

// Run calls ListAndWatch repeatedly until stopCh is closed, reporting any
// error it returns. The delay between attempts comes from r.backoffManager
// and is computed after each attempt finishes. A List/Watch that keeps
// failing therefore backs off instead of retrying at a fixed interval and
// hammering the API server.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.backoffManager, true, stopCh)
}

3. 首次同步 (reflector.go)

// list performs the initial full LIST, replaces the contents of r.store
// with the result, and records the list's resourceVersion so that the
// subsequent watch can resume from it.
//
// Construction of options (ResourceVersion, Limit, ...) is elided in this
// excerpt.
func (r *Reflector) list() error {
	list, err := r.listerWatcher.List(options)
	if err != nil {
		return err
	}

	// The resourceVersion of the snapshot lives in the list's own metadata,
	// not in any individual item.
	listMeta, err := meta.ListAccessor(list)
	if err != nil {
		return err
	}
	resourceVersion := listMeta.GetResourceVersion()

	items, err := meta.ExtractList(list)
	if err != nil {
		return err
	}

	// Replace the whole DeltaFIFO content with the listed items.
	if err := r.syncWith(items, resourceVersion); err != nil {
		return err
	}

	r.setLastSyncResourceVersion(resourceVersion)
	return nil
}

// syncWith hands items to r.store.Replace as a []interface{}, together with
// the resourceVersion they were listed at, and returns Replace's error.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	objs := make([]interface{}, len(items))
	for i := range items {
		objs[i] = items[i]
	}
	// Replace swaps the DeltaFIFO contents for this snapshot.
	return r.store.Replace(objs, resourceVersion)
}

数据流

```mermaid
graph LR
APIServer[API Server] -->|List响应| Reflector
Reflector -->|Replace| DeltaFIFO
DeltaFIFO -->|Pop Deltas| Informer
Informer -->|Add| Indexer
```

4. 持续监听 (reflector.go)

// watchHandler applies events from w to r.store. It stops when stopCh is
// closed (returns nil), when the result channel closes (returns an error),
// or when the server sends an Error event (returns that error). This is an
// excerpt: the trailing parameters, including stopCh, are elided.
//
// A failed store operation or an event whose object has no readable
// metadata is reported via utilruntime.HandleError. The watch then keeps
// going, so one bad event does not tear down the whole watch.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, ...) error {
	for {
		select {
		case <-stopCh:
			return nil
		case event, ok := <-w.ResultChan():
			if !ok {
				return errors.New("unexpected watch close")
			}

			// An Error event carries a Status object rather than a resource,
			// so it must be handled before reading object metadata.
			if event.Type == watch.Error {
				return apierrs.FromObject(event.Object)
			}

			accessor, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(err)
				continue
			}
			newResourceVersion := accessor.GetResourceVersion()

			switch event.Type {
			case watch.Added:
				err = r.store.Add(event.Object)
			case watch.Modified:
				err = r.store.Update(event.Object)
			case watch.Deleted:
				err = r.store.Delete(event.Object)
			case watch.Bookmark:
				// A bookmark only advances the resourceVersion; nothing to store.
			default:
				err = errors.New("unable to understand watch event " + string(event.Type))
			}
			if err != nil {
				utilruntime.HandleError(err)
			}

			r.setLastSyncResourceVersion(newResourceVersion)
		}
	}
}

三、关键路径性能优化

1. 批次处理 (delta_fifo.go)

// Pop waits until the queue is non-empty, then removes the oldest key
// together with all Deltas accumulated for it. It passes those Deltas to
// process while holding the queue lock. Once the queue is closed and
// empty, it returns ErrFIFOClosed.
//
// If process returns an ErrRequeue, the item is put back (unless a newer
// entry for the same key is already queued) and the wrapped error is
// returned. Any other error is returned as-is and the item is dropped,
// leaving the retry decision to the caller.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			if f.closed {
				return nil, ErrFIFOClosed
			}
			// NOTE(review): assumes f.cond.L is &f.lock and that producers
			// signal f.cond on insert and on Close (not shown here).
			f.cond.Wait()
		}

		id := f.queue[0]
		f.queue = f.queue[1:]
		item, ok := f.items[id]
		if !ok {
			// Key queued without deltas: nothing to deliver, try the next one.
			continue
		}
		delete(f.items, id)

		// process is the informer's HandleDeltas; it receives every delta
		// accumulated for this key since the last Pop.
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}
}

2. 事件合并策略

// newest returns the Type of the last (most recent) delta in *d, or the
// empty DeltaType when there are no deltas.
func (d *Deltas) newest() DeltaType {
	list := *d
	if len(list) == 0 {
		return ""
	}
	last := list[len(list)-1]
	return last.Type
}

// dedupDeltas collapses a trailing pair of Deleted deltas into one, since a
// second deletion of the same key adds no information. Only the last two
// entries are examined.
//
// Every other sequence is kept intact. In particular, Deleted followed by
// Added is a delete-and-recreate and must reach handlers as two separate
// events; merging it into a single Added would hide the deletion.
func dedupDeltas(deltas Deltas) Deltas {
	n := len(deltas)
	if n < 2 {
		return deltas
	}
	older, newer := deltas[n-2], deltas[n-1]
	if older.Type != Deleted || newer.Type != Deleted {
		return deltas
	}
	// Keep the older deletion unless it is only a DeletedFinalStateUnknown
	// tombstone, in which case the newer deletion carries the better state.
	keep := older
	if _, isTombstone := older.Object.(DeletedFinalStateUnknown); isTombstone {
		keep = newer
	}
	deltas[n-2] = keep
	return deltas[:n-1]
}

3. 内存复用池(通用 Go 优化示例,并非 client-go 源码)

// objectPool recycles scratch slices of runtime.Object between calls.
// It holds *[]runtime.Object rather than []runtime.Object. Putting a plain
// slice header into the pool boxes it into a new interface value (a heap
// allocation) on every Put, which defeats the purpose of pooling.
var objectPool = sync.Pool{
	New: func() interface{} {
		buf := make([]runtime.Object, 0, 16) // preallocate room for 16 objects
		return &buf
	},
}

// processItems copies items into a pooled scratch buffer, processes it,
// and returns the buffer to objectPool afterwards.
func processItems(items []runtime.Object) {
	bufPtr := objectPool.Get().(*[]runtime.Object)
	buffer := (*bufPtr)[:0]
	defer func() {
		// Nil out the references so the pooled backing array does not keep
		// the processed objects alive. Then hand back the possibly grown
		// buffer; the closure reads buffer at return time, not defer time.
		for i := range buffer {
			buffer[i] = nil
		}
		*bufPtr = buffer[:0]
		objectPool.Put(bufPtr)
	}()

	buffer = append(buffer, items...)
	// Process the buffer contents...
}

四、工业级应用注意事项

1. 资源泄漏防护

// RunInformer runs informer with ctx.Done() as its stop channel and then
// calls Stop on it if it implements Stoppable. informer.Run is expected to
// block until that channel is closed, so the caller must cancel ctx for
// this function to return and release the informer.
func RunInformer(ctx context.Context, informer cache.SharedInformer) {
	defer runtime.HandleCrash()

	done := ctx.Done()
	informer.Run(done)

	// Clean up resources for informers that expose an explicit Stop.
	stopper, isStoppable := informer.(Stoppable)
	if !isStoppable {
		return
	}
	stopper.Stop()
}

2. 控制器最佳实践

// NewController connects informer events to a rate-limited work queue of
// "namespace/name" keys and returns a Controller holding both.
//
// Add, Update and Delete events all enqueue the object's key; keys whose
// key function fails are skipped.
func NewController(informer cache.SharedInformer) *Controller {
	queue := workqueue.NewRateLimitingQueue(
		// Per-item exponential backoff between 5ms and 30s.
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
	)

	enqueue := func(obj interface{}) {
		// DeletionHandlingMetaNamespaceKeyFunc also unwraps the
		// DeletedFinalStateUnknown tombstones that DeleteFunc may receive;
		// for regular objects it yields the same key as MetaNamespaceKeyFunc.
		key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
		if err == nil {
			queue.Add(key)
		}
	}

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: enqueue,
		UpdateFunc: func(oldObj, newObj interface{}) {
			// Skip events where nothing changed (e.g. periodic resync). The
			// check goes through GetResourceVersion so it works for any API
			// object instead of panicking on anything that is not a *v1.Pod.
			type versioned interface{ GetResourceVersion() string }
			o, okOld := oldObj.(versioned)
			n, okNew := newObj.(versioned)
			if okOld && okNew && o.GetResourceVersion() == n.GetResourceVersion() {
				return
			}
			enqueue(newObj)
		},
		// Without a DeleteFunc, deleted objects would never be reconciled.
		DeleteFunc: enqueue,
	})

	return &Controller{informer: informer, queue: queue}
}

3. 监控指标埋点

// Prometheus collectors for informer monitoring. Both carry an
// informer_type label; informerErrors additionally carries error_type.
var (
	// informerLag is a histogram of event-processing delay in seconds,
	// with buckets from 1ms up to 10s.
	informerLag = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "informer_event_lag_seconds",
		Help:    "Delay of event processing",
		Buckets: []float64{1e-3, 1e-2, 1e-1, 1e0, 1e1},
	}, []string{"informer_type"})

	// informerErrors counts informer errors.
	informerErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "informer_sync_errors_total",
		Help: "Total count of informer errors",
	}, []string{"informer_type", "error_type"})
)

五、典型问题解决方案

1. 处理Watch断开

// ListAndWatch handles watch events until stopCh is closed or the watch
// fails. This is an excerpt: the initial LIST and the per-iteration setup
// of w, start and resourceVersion are elided.
//
// It returns nil on shutdown, and also when the watched resourceVersion has
// expired (after clearing it so that the next run starts from a full LIST).
// Any other watch error is returned to the caller.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	for {
		// watchHandler returns nil once stopCh is closed, so without this
		// check the loop would spin forever after shutdown.
		select {
		case <-stopCh:
			return nil
		default:
		}

		if err := r.watchHandler(start, w, &resourceVersion, ...); err != nil {
			if isExpiredError(err) {
				// The resourceVersion is too old to watch from: clear it so
				// the next ListAndWatch has to do a full LIST.
				r.setLastSyncResourceVersion("")
				return nil
			}
			return err
		}
	}
}

2. Indexer数据不一致

// VerifyIndexerConsistency checks every registered index of indexer. For
// each stored object, and each value that the index's IndexFunc produces
// for it, the object's key must be listed under that value. It returns the
// first mismatch found, or nil if all indexes agree with the stored
// objects. "First" is arbitrary because index iteration order is random.
//
// Keys are computed with cache.MetaNamespaceKeyFunc. This assumes the
// indexer uses a namespace/name key function such as
// DeletionHandlingMetaNamespaceKeyFunc — TODO confirm for custom indexers.
// List and IndexKeys are separate calls, so concurrent updates to the
// indexer between them can produce a spurious mismatch.
func VerifyIndexerConsistency(indexer Indexer) error {
	objs := indexer.List()

	// An object's key is the same for every index, so compute it once.
	keys := make([]string, len(objs))
	for i, obj := range objs {
		key, err := cache.MetaNamespaceKeyFunc(obj)
		if err != nil {
			return fmt.Errorf("computing key for %T: %w", obj, err)
		}
		keys[i] = key
	}

	for indexName, indexFunc := range indexer.GetIndexers() {
		for i, obj := range objs {
			values, err := indexFunc(obj)
			if err != nil {
				return fmt.Errorf("index %s inconsistent: %w", indexName, err)
			}
			for _, value := range values {
				indexed, err := indexer.IndexKeys(indexName, value)
				if err != nil {
					return fmt.Errorf("index %s inconsistent: %w", indexName, err)
				}
				found := false
				for _, k := range indexed {
					if k == keys[i] {
						found = true
						break
					}
				}
				if !found {
					return fmt.Errorf("index %s inconsistent: %q missing under value %q", indexName, keys[i], value)
				}
			}
		}
	}
	return nil
}

这种设计保证了以下特性:

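  • 冷启动:首次全量 List 会把全部对象一次性放入 DeltaFIFO 并同步到 Indexer,内存会出现一次峰值(分页 List 只能降低单次响应的大小);此后只处理增量 Watch 事件
  • 事件有序性:同一资源的事件按 Watch 流的顺序投递,并用 resourceVersion 记录断点续传位置;resourceVersion 应视为不透明字符串,不能假设其数值单调递增
  • 慢 Handler 隔离:每个 Handler 拥有独立的事件缓冲,单个 Handler 处理缓慢不会阻塞其他 Handler;但 Handler 中的 panic 默认仍会导致整个进程崩溃
  • 资源共享:通过 SharedInformerFactory,同一资源类型在多个控制器间共享一个 Informer 及其缓存,减少对 API Server 的 List/Watch 压力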

生产环境建议通过以下监控指标确保稳定性:

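| 指标名称 | 健康阈值 | 告警动作 |
| --- | --- | --- |
| informer_event_lag_seconds(上文自定义的事件处理延迟) | < 5s | 检查 API Server 负载 |
| watch_disconnects(需自行埋点,client-go 未内置) | < 10/min | 检查网络连接 |
| workqueue_depth(client-go workqueue 指标,需注册 metrics provider) | < 1000 | 扩容 Worker |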