client-go informers 机制中的Indexer索引解析
以下结合 Kubernetes client-go 源码中的数据结构,详细说明 Indexer 如何存储数据。我们将通过一个具体示例展示内存中的数据结构和索引关系,源码基于 k8s.io/client-go@v1.28.0
。
Indexer解析 Indexer 是 Informers 机制中缓存层的核心索引引擎,它通过多级索引实现高效的对象检索。
Indexer 在数据流中的关键作用:
持久化存储:作为本地缓存存储从 API Server 获取的资源对象
索引加速:提供基于自定义索引的快速检索能力
事件分发基础:为事件处理器提供查询上下文
一、核心数据结构定义 路径:client-go/tools/cache/thread_safe_store.go
1. 基础存储结构 Indexer接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 type Indexer interface { Store Index(indexName string , obj interface {}) ([]interface {}, error ) IndexKeys(indexName, indexedValue string ) ([]string , error ) ListIndexFuncValues(indexName string ) []string ByIndex(indexName, indexedValue string ) ([]interface {}, error ) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }
Cache 和 threadSafeMap cache实现了Indexer接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc } type threadSafeMap struct { lock sync.RWMutex items map [string ]interface {} indexers Indexers indices Indices }
2. 索引结构定义 1 2 3 4 5 6 7 8 9 10 11 type Indexers map [string ]IndexFunc type Indices map [string ]Index type Index map [string ]sets.Set[string ] type IndexFunc func (obj interface {}) ([]string , error )
总结:indexer最终存储实现是靠底层的threadSafeMap,那相对于直接查找主存储的map,threadSafeMap是如何通过索引找到实际的存储对象呢?下面通过一个实例说明threadSafeMap利用索引实现的存储结构与流程。
二、示例数据存储解析 假设存储以下对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 metadata: name: web-1 namespace: default labels: env: production tier: frontend metadata: name: db-1 namespace: database labels: env: production tier: backend metadata: name: web-2 namespace: default labels: env: staging tier: frontend
数据存储后的内存结构 1. 基础存储 (items
) 1 2 3 4 5 items = map [string ]interface {}{ "default/web-1" : &Pod{metadata: {name: "web-1" , namespace: "default" }}, "database/db-1" : &Pod{metadata: {name: "db-1" , namespace: "database" }}, "default/web-2" : &Pod{metadata: {name: "web-2" , namespace: "default" }}, }
2. 索引函数 (indexers
) 1 2 3 4 5 6 7 8 9 10 indexers = Indexers{ "namespace" : func (obj interface {}) ([]string , error ) { pod := obj.(*v1.Pod) return []string {pod.Namespace}, nil }, "tier" : func (obj interface {}) ([]string , error ) { pod := obj.(*v1.Pod) return []string {pod.Labels["tier" ]}, nil }, }
3. 索引树结构 (indices
) 1 2 3 4 5 6 7 8 9 10 indices = Indices{ "namespace" : Index{ "default" : sets.Set[string ]{"default/web-1" , "default/web-2" }, "database" : sets.Set[string ]{"database/db-1" }, }, "tier" : Index{ "frontend" : sets.Set[string ]{"default/web-1" , "default/web-2" }, "backend" : sets.Set[string ]{"database/db-1" }, }, }
工作流程示例(以 namespace 索引为例)
三、索引操作源码解析 1. 添加对象时的索引更新 路径:client-go/tools/cache/thread_safe_store.go
说明:下述代码为源码流程简化说明,完整源码请查看路径中的实际源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (c *threadSafeMap) updateIndices(oldObj, newObj interface {}, key string ) { for name, indexFunc := range c.indexers { var oldIndexValues, newIndexValues []string if oldObj != nil { oldIndexValues, _ = indexFunc(oldObj) } if newObj != nil { newIndexValues, _ = indexFunc(newObj) } index := c.indices[name] if index == nil { index = make (Index) c.indices[name] = index } for _, value := range oldIndexValues { c.deleteKeyFromIndex(key, value, index) } for _, value := range newIndexValues { c.addKeyToIndex(key, value, index) } } }
**索引添加细节:addKeyToIndex()
**
1 2 3 4 5 6 7 8 func (c *threadSafeMap) addKeyToIndex(key, indexValue string , index Index) { set := index[indexValue] if set == nil { set = sets.New[string ]() index[indexValue] = set } set.Insert(key) }
设计亮点 :使用 sets.Set
(实现自 k8s.io/apimachinery/pkg/util/sets
) 确保:
去重存储
O(1)时间复杂度的添加/删除
集合运算能力(交集/并集)
2. 查询操作 ByIndex()
路径:client-go/tools/cache/thread_safe_store.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (c *threadSafeMap) ByIndex(indexName, indexedValue string ) ([]interface {}, error ) { c.lock.RLock() defer c.lock.RUnlock() index := c.indices[indexName] set := index[indexedValue] list := make ([]interface {}, 0 , set.Len()) for key := range set { list = append (list, c.items[key]) } return list, nil }
索引查询结果示例
查询所有 default
命名空间的 Pod
1 2 pods, _ := indexer.ByIndex("namespace" , "default" )
查询所有 frontend
Pod
1 2 pods, _ := indexer.ByIndex("tier" , "frontend" )
3.索引删除实现 对象删除时的索引清理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (c *threadSafeMap) Delete(key string ) { c.lock.Lock() defer c.lock.Unlock() obj, exists := c.items[key] if !exists { return } for name, index := range c.indices { indexValues, _ := c.indexers[name](obj) for _, value := range indexValues { set := index[value] if set != nil { set.Delete(key) if set.Len() == 0 { delete (index, value) } } } } delete (c.items, key) }
4.多值索引处理 当索引函数返回多个值时:
1 2 3 4 5 6 7 8 9 indexers["labels" ] = func (obj interface {}) ([]string , error ) { pod := obj.(*v1.Pod) labels := []string {} for k, v := range pod.Labels { labels = append (labels, k+"=" +v) } return labels, nil }
更新后的索引树结构:
1 2 3 4 5 6 indices["labels" ] = Index{ "env=production" : sets.New("default/web-1" , "database/db-1" ), "env=staging" : sets.New("default/web-2" ), "tier=frontend" : sets.New("default/web-1" , "default/web-2" ), "tier=backend" : sets.New("database/db-1" ), }
查询示例:
1 2 3 indexer.ByIndex("labels" , "env=production" )
5.设计特点总结
三级索引结构 索引名 → 索引值 → 对象键集合
存储优化原则
对象实体只存一份在 items
索引仅存储键名(而非对象指针)
空集合自动回收(内存优化)
数据去重存储 核心对象仅存储在 items
中,索引仅存储键
惰性计算 索引值在添加/删除时实时计算
并发安全 通过细粒度读写锁 (sync.RWMutex
) 保证一致性
实际内存布局示例:
1 2 3 4 5 6 7 8 9 10 11 12 Indexer ├── items: {"key1":obj1, "key2":obj2} (主存储) ├── indexers │ ├── "namespace": namespaceIndexFunc (索引函数) │ └── "tier": tierIndexFunc └── indices ├── "namespace": │ ├── "default": [key1, key3] (索引值->键集合) │ └── "database": [key2] └── "tier": ├── "frontend": [key1, key3] └── "backend": [key2]
这种设计使得 Kubernetes 控制器能够快速查询:
1 2 3 pods, err := indexer.ByIndex("namespace" , "default" ) services := indexer.ByIndex("service" , "order-service" )
这种 O(1) 复杂度的查询能力,正是 Kubernetes 支撑万级对象规模的关键基础设施。