k8s 代码走读---client-go 编程之 informers

| 2020年8月14日

前言

根据我们社区制定的计划,本周是开始走读 client-go 中的 informers 模块了,但是无奈这周时间是相当的不充裕,公司内的事情也突然多了几当子要紧急赶工的事情,另外就是准备 GIAC 和和社区的同仁们组织云原生社区深圳站的交流,接着 GIAC 的风也顺便把这些天南地北的同仁们聚到了一起。所以这部分代码的走读计划还是有所延误了,但是还是要走起。

informers 机制介绍

Informer (就是 SharedInformer)是 client-go 的重要组成部分,在了解 client-go 之前,了解一下 Informer 的实现是很有必要的,下面引用了官方的图,可以看到 Informer 在 client-go 中的位置。 主要使用到 Informer 和 workqueue 两个核心组件。Controller 可以有一个或多个 informer 来跟踪某一个 resource。Informter 跟 API server 保持通讯获取资源的最新状态并更新到本地的 cache 中,一旦跟踪的资源有变化,informer 就会调用 callback。把关心的变更的 Object 放到 workqueue 里面。然后 woker 执行真正的业务逻辑,计算和比较 workerqueue 里 items 的当前状态和期望状态的差别,然后通过 client-go 向 API server 发送请求,直到驱动这个集群向用户要求的状态演化。

这里写了一个测试代码,主要是监控 deployment。

package main

import (
	"flag"
	"fmt"
	"log"
	"path/filepath"
	"time"

	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	// "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/homedir"
)

// func demo(config *rest.Config, namespace string) {
func main() {
	var namespace = "default"
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		log.Println(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute)
	informer := informerFactory.Apps().V1().Deployments()
	informer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    onAdd,
			UpdateFunc: onUpdate,
			DeleteFunc: onDelete,
		})
	lister := informer.Lister()

	stopCh := make(chan struct{})
	defer close(stopCh)
	informerFactory.Start(stopCh)
	if !cache.WaitForCacheSync(stopCh, informer.Informer().HasSynced) {
		return
	}

	deployments, err := lister.Deployments(namespace).List(labels.Everything())
	if err != nil {
		panic(err)
	}
	for _, deployment := range deployments {
		fmt.Printf("%s\r\n", deployment.Name)
	}
	<-stopCh
}

func onAdd(obj interface{}) {
	deployment := obj.(*v1.Deployment)
	fmt.Printf("onAdd:%s\r\n", deployment.Name)
}

func onUpdate(old, new interface{}) {
	oldDeployment := old.(*v1.Deployment)
	newDeployment := new.(*v1.Deployment)
	fmt.Printf("onUpdate:%s to %s\r\n", oldDeployment.Name, newDeployment.Name)
}

func onDelete(obj interface{}) {
	deployment := obj.(*v1.Deployment)
	fmt.Printf("onDelete:%s\r\n", deployment.Name)
}