client-go informers 机制中的Indexer索引解析

informers机制整理流程

image-20250614103412669

以下结合 Kubernetes client-go 源码中的数据结构,详细说明 Indexer 如何存储数据。我们将通过一个具体示例展示内存中的数据结构和索引关系,源码基于 k8s.io/client-go@v1.28.0


Indexer解析

Indexer 是 Informers 机制中缓存层的核心索引引擎,它通过多级索引实现高效的对象检索。

Indexer 在数据流中的关键作用:

  1. 持久化存储:作为本地缓存存储从 API Server 获取的资源对象
  2. 索引加速:提供基于自定义索引的快速检索能力
  3. 事件分发基础:为事件处理器提供查询上下文

一、核心数据结构定义

路径:client-go/tools/cache/thread_safe_store.go

1. 基础存储结构

Indexer接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Indexer 扩展了 Store,支持多重索引(index)功能,
// 并限制每个累加器(accumulator)仅保存当前对象(在 Delete 后为空)。
//
// 这里涉及三种字符串:
// 1. 存储键(storage key),由 Store 接口定义;
// 2. 索引名称(index name);
// 3. “索引值”(indexed value),由 IndexFunc 生成,
// 可以是字段值或从对象中计算出的任意字符串。
type Indexer interface {
Store
// Index:返回与给定对象的索引值集合有交集的存储对象(通过 indexName 指定索引)。
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys:返回对象的 storage key(键),这些对象的 indexName 索引包含给定的 indexedValue。
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues:返回指定 index 的所有索引值。
ListIndexFuncValues(indexName string) []string
// ByIndex:与 IndexKeys 类似,但直接返回对象。
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexers:返回所有已注册的 IndexFunc。
GetIndexers() Indexers
// AddIndexers:为当前 Store 添加新的索引器(支持动态添加索引)。
AddIndexers(newIndexers Indexers) error
}

Cache 和 threadSafeMap

cache实现了Indexer接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// `*cache` 是 Indexer 接口的实现,底层基于 ThreadSafeStore 和指定的 KeyFunc。
type cache struct {
// cacheStorage 是线程安全的底层存储结构,负责并发访问控制。
cacheStorage ThreadSafeStore

// keyFunc 用于为存入或取出的对象生成 key,要求必须是确定性的(同一个对象应始终生成相同的 key)。
keyFunc KeyFunc
}


type threadSafeMap struct {
lock sync.RWMutex // 读写锁
items map[string]interface{} // 主存储: key=对象键, value=对象本体

indexers Indexers // 索引函数集
indices Indices // 索引树结构
}

2. 索引结构定义

1
2
3
4
5
6
7
8
9
10
11
// Indexers:将索引名称映射为对应的索引函数(IndexFunc)
type Indexers map[string]IndexFunc // 索引名 → 索引函数

// Indices:将索引名称映射为对应的实际索引结构(一个值到对象 key 集合的映射)
type Indices map[string]Index // 索引名 → 索引数据

// Index 映射某个索引值(如命名空间名)到与该值匹配的所有对象键(key)集合。
type Index map[string]sets.Set[string] // 索引值 → 对象键集合

// 索引函数原型 (如示例中的命名空间索引)
type IndexFunc func(obj interface{}) ([]string, error)

总结:indexer最终存储实现是靠底层的threadSafeMap,那相对于直接查找主存储的map,threadSafeMap是如何通过索引找到实际的存储对象呢?下面通过一个实例说明threadSafeMap利用索引实现的存储结构与流程。

二、示例数据存储解析

假设存储以下对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Pod 1
metadata:
name: web-1
namespace: default
labels:
env: production
tier: frontend

# Pod 2
metadata:
name: db-1
namespace: database
labels:
env: production
tier: backend

# Pod 3
metadata:
name: web-2
namespace: default
labels:
env: staging
tier: frontend

数据存储后的内存结构

1. 基础存储 (items)

1
2
3
4
5
items = map[string]interface{}{
"default/web-1": &Pod{metadata: {name: "web-1", namespace: "default"}},
"database/db-1": &Pod{metadata: {name: "db-1", namespace: "database"}},
"default/web-2": &Pod{metadata: {name: "web-2", namespace: "default"}},
}

2. 索引函数 (indexers)

1
2
3
4
5
6
7
8
9
10
indexers = Indexers{
"namespace": func(obj interface{}) ([]string, error) {
pod := obj.(*v1.Pod)
return []string{pod.Namespace}, nil
},
"tier": func(obj interface{}) ([]string, error) {
pod := obj.(*v1.Pod)
return []string{pod.Labels["tier"]}, nil
},
}

3. 索引树结构 (indices)

1
2
3
4
5
6
7
8
9
10
indices = Indices{
"namespace": Index{ // Namespace 索引树
"default": sets.Set[string]{"default/web-1", "default/web-2"},
"database": sets.Set[string]{"database/db-1"},
},
"tier": Index{ // Tier 标签索引树
"frontend": sets.Set[string]{"default/web-1", "default/web-2"},
"backend": sets.Set[string]{"database/db-1"},
},
}

工作流程示例(以 namespace 索引为例)

image-20250614202507005

三、索引操作源码解析

1. 添加对象时的索引更新

路径:client-go/tools/cache/thread_safe_store.go

说明:下述代码为源码流程简化说明,完整源码请查看路径中的实际源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (c *threadSafeMap) updateIndices(oldObj, newObj interface{}, key string) {
// 遍历所有注册的索引函数
for name, indexFunc := range c.indexers {
var oldIndexValues, newIndexValues []string

// 1. 计算新旧对象的索引值
if oldObj != nil {
oldIndexValues, _ = indexFunc(oldObj) // 旧索引值(用于清理)
}
if newObj != nil {
newIndexValues, _ = indexFunc(newObj) // 新索引值
}

// 2. 获取或创建索引树
index := c.indices[name]
if index == nil {
index = make(Index) // 创建索引树
c.indices[name] = index
}

// 3. 清理旧索引关联
for _, value := range oldIndexValues {
c.deleteKeyFromIndex(key, value, index)
}

// 4. 添加新索引关联
for _, value := range newIndexValues {
c.addKeyToIndex(key, value, index)
}
}
}

**索引添加细节:addKeyToIndex() **

1
2
3
4
5
6
7
8
func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) {
set := index[indexValue]
if set == nil {
set = sets.New[string]() // 创建空集合
index[indexValue] = set
}
set.Insert(key) // 将对象键加入集合
}

设计亮点:使用 sets.Set (实现自 k8s.io/apimachinery/pkg/util/sets) 确保:

  • 去重存储
  • O(1)时间复杂度的添加/删除
  • 集合运算能力(交集/并集)

2. 查询操作 ByIndex()

路径:client-go/tools/cache/thread_safe_store.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()

// 获取索引树
index := c.indices[indexName]

// 获取该索引值对应的键集合
set := index[indexedValue]

list := make([]interface{}, 0, set.Len())
for key := range set {
// 从主存储获取真实对象
list = append(list, c.items[key])
}
return list, nil
}

索引查询结果示例

  1. 查询所有 default 命名空间的 Pod
1
2
pods, _ := indexer.ByIndex("namespace", "default")
// 返回 [web-1, web-2] 两个对象
  1. 查询所有 frontend Pod
1
2
pods, _ := indexer.ByIndex("tier", "frontend")
// 返回 [web-1, web-2]

3.索引删除实现

对象删除时的索引清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()

obj, exists := c.items[key]
if !exists {
return
}

// 从所有索引中删除
for name, index := range c.indices {
indexValues, _ := c.indexers[name](obj) // 计算索引值
for _, value := range indexValues {
set := index[value]
if set != nil {
set.Delete(key) // 删除键

// 如果集合为空则删除整个索引项
if set.Len() == 0 {
delete(index, value)
}
}
}
}

// 从主存储删除
delete(c.items, key)
}

4.多值索引处理

当索引函数返回多个值时:

1
2
3
4
5
6
7
8
9
// 扩展索引函数支持多标签值
indexers["labels"] = func(obj interface{}) ([]string, error) {
pod := obj.(*v1.Pod)
labels := []string{}
for k, v := range pod.Labels {
labels = append(labels, k+"="+v)
}
return labels, nil
}

更新后的索引树结构:

1
2
3
4
5
6
indices["labels"] = Index{
"env=production": sets.New("default/web-1", "database/db-1"),
"env=staging": sets.New("default/web-2"),
"tier=frontend": sets.New("default/web-1", "default/web-2"),
"tier=backend": sets.New("database/db-1"),
}

查询示例:

1
2
3
// 查询所有生产环境pod
indexer.ByIndex("labels", "env=production")
// 返回 [web-1, db-1]

5.设计特点总结

  1. 三级索引结构
    索引名 → 索引值 → 对象键集合

    image-20250614205305656

  2. 存储优化原则

    • 对象实体只存一份在 items
    • 索引仅存储键名(而非对象指针)
    • 空集合自动回收(内存优化)
  3. 数据去重存储
    核心对象仅存储在 items 中,索引仅存储键

  4. 惰性计算
    索引值在添加/删除时实时计算

  5. 并发安全
    通过细粒度读写锁 (sync.RWMutex) 保证一致性

实际内存布局示例:

1
2
3
4
5
6
7
8
9
10
11
12
Indexer
├── items: {"key1":obj1, "key2":obj2} (主存储)
├── indexers
│ ├── "namespace": namespaceIndexFunc (索引函数)
│ └── "tier": tierIndexFunc
└── indices
├── "namespace":
│ ├── "default": [key1, key3] (索引值->键集合)
│ └── "database": [key2]
└── "tier":
├── "frontend": [key1, key3]
└── "backend": [key2]

这种设计使得 Kubernetes 控制器能够快速查询:

1
2
3
// 典型控制器实现中的查询
pods, err := indexer.ByIndex("namespace", "default")
services := indexer.ByIndex("service", "order-service")

这种 O(1) 复杂度的查询能力,正是 Kubernetes 支撑万级对象规模的关键基础设施。