Cilium IPAM: source code analysis
As a Kubernetes CNI plugin, Cilium provides an IPAM mechanism for assigning IP addresses to pods; the available modes are documented on the official site. In Kubernetes Host Scope mode, each node's pod CIDR is allocated by Kubernetes itself, and the per-pod IPs on a node are allocated by cilium-agent. Cluster Scope is the default IPAM mode: each node's CIDR is allocated by cilium-operator and written into the CiliumNode CRD, while the per-pod IPs on a node are still allocated by cilium-agent. The remaining modes are specific to cloud providers.
This article walks through the IPAM-related code paths in Cilium.
Cilium runs two processes: cilium-operator, deployed as a Deployment, and cilium-agent, which runs as a DaemonSet on every node. As a Kubernetes CNI, Cilium also ships the cilium-cni binary; when a pod's network is set up, cilium-cni is invoked to allocate the pod's IP, configure routes, and so on.
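To make the cilium-cni side concrete, here is a minimal, hypothetical CNI plugin skeleton built on the upstream github.com/containernetworking/cni skel package. It is a sketch only: the function bodies are stubs, and Cilium's real cmdAdd additionally talks to the local cilium-agent (see the client/server sections at the end of this article).

```go
// A minimal sketch of a CNI plugin entry point, assuming the upstream
// github.com/containernetworking/cni packages. cilium-cni's real cmdAdd
// also requests an IP from the local cilium-agent and wires up the pod's
// veth pair and routes.
package main

import (
	"github.com/containernetworking/cni/pkg/skel"
	"github.com/containernetworking/cni/pkg/version"
)

func cmdAdd(args *skel.CmdArgs) error {
	// Allocate an IP for the pod (in cilium-cni this is the
	// client.IPAMAllocate call discussed below), create the veth pair,
	// install routes, then print the CNI result to stdout.
	return nil
}

func cmdCheck(args *skel.CmdArgs) error { return nil }

func cmdDel(args *skel.CmdArgs) error {
	// Release the pod's IP and tear down its interfaces.
	return nil
}

func main() {
	skel.PluginMain(cmdAdd, cmdCheck, cmdDel, version.All, "illustrative CNI skeleton")
}
```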
cilium-operator IPAM
cilium-operator initialization
When cilium-operator initializes, it registers the CRDs it needs, including CiliumNode:
```go
//operator/main.go
rootCmd = &cobra.Command{
    Run: func(cobraCmd *cobra.Command, args []string) {
        // Parse the configuration file.
        initEnv()
        // Prepopulate option.Config with options from CLI.
        option.Config.Populate()
        operatorOption.Config.Populate()
        runOperator()
    },
}

func runOperator()
    // Configure API server for the operator.
    srv, err := api.NewServer(shutdownSignal, k8sInitDone, getAPIServerAddr()...)
    go func() {
        err = srv.WithStatusCheckFunc(checkStatus).StartServer()
        if err != nil {
            log.WithError(err).Fatalf("Unable to start operator apiserver")
        }
    }()
    // Register the Cilium CRDs; cilium-agent waits for these resources to be
    // created before it proceeds.
    client.RegisterCRDs()
        CreateCustomResourceDefinitions(k8s.APIExtClient())
            resourceToCreateFnMapping := map[string]crdCreationFn{
                synced.CRDResourceName(k8sconstv2.CNPName):           createCNPCRD,
                synced.CRDResourceName(k8sconstv2.CCNPName):          createCCNPCRD,
                synced.CRDResourceName(k8sconstv2.CNName):            createNodeCRD,
                synced.CRDResourceName(k8sconstv2.CIDName):           createIdentityCRD,
                synced.CRDResourceName(k8sconstv2.CEPName):           createCEPCRD,
                synced.CRDResourceName(k8sconstv2.CEWName):           createCEWCRD,
                synced.CRDResourceName(k8sconstv2.CLRPName):          createCLRPCRD,
                synced.CRDResourceName(k8sconstv2alpha1.CENPName):    createCENPCRD,
                synced.CRDResourceName(k8sconstv2alpha1.CESName):     createCESCRD,
                synced.CRDResourceName(k8sconstv2alpha1.CCECName):    createCCECCRD,
                synced.CRDResourceName(k8sconstv2alpha1.CECName):     createCECCRD,
                synced.CRDResourceName(k8sconstv2alpha1.BGPPName):    createBGPPCRD,
                synced.CRDResourceName(k8sconstv2alpha1.BGPPoolName): createBGPPoolCRD,
            }
            for _, r := range synced.AllCRDResourceNames() {
                fn, ok := resourceToCreateFnMapping[r]
                if !ok {
                    log.Fatalf("Unknown resource %s. Please update pkg/k8s/apis/cilium.io/client to understand this type.", r)
                }
                g.Go(func() error {
                    return fn(clientset)
                })
            }
            return g.Wait()
    leaderelection.RunOrDie()
        onOperatorStartLeading
```
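The CRD creation above fans out one goroutine per CRD via an errgroup and then waits for the first error. A reduced sketch of that pattern, assuming golang.org/x/sync/errgroup (the create functions here are illustrative):

```go
// A reduced sketch of the errgroup fan-out used by RegisterCRDs: one
// goroutine per CRD-creation function; g.Wait() returns the first error.
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	createFns := []func() error{
		func() error { fmt.Println("create CiliumNode CRD"); return nil },
		func() error { fmt.Println("create CiliumEndpoint CRD"); return nil },
	}

	var g errgroup.Group
	for _, fn := range createFns {
		fn := fn // rebind the loop variable before capturing it (pre-Go 1.22)
		g.Go(fn)
	}
	// Wait blocks until all goroutines finish and returns the first error.
	if err := g.Wait(); err != nil {
		panic(err)
	}
}
```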
onOperatorStartLeading initializes the IPAM allocator according to the configured IPAM mode, then starts watching CiliumNode resources:
```go
func onOperatorStartLeading(ctx context.Context)
    // Initialize according to the configured IPAM mode. If the mode is
    // "kubernetes", no operator-side allocator is needed.
    switch ipamMode := option.Config.IPAM; ipamMode {
    case ipamOption.IPAMAzure, ipamOption.IPAMENI, ipamOption.IPAMClusterPool, ipamOption.IPAMClusterPoolV2, ipamOption.IPAMAlibabaCloud:
        // 1. Look up the allocator provider for this IPAM mode.
        alloc, providerBuiltin := allocatorProviders[ipamMode]
        if !providerBuiltin {
            log.Fatalf("%s allocator is not supported by this version of %s", ipamMode, binaryName)
        }
        // 2. Initialize it.
        alloc.Init(ctx)
        // 3. Start it.
        nm, err := alloc.Start(ctx, &ciliumNodeUpdateImplementation{})
        nodeManager = nm
    }
    // 4. Start watching CiliumNode resources.
    startSynchronizingCiliumNodes(ctx, nodeManager, withKVStore)
```
- Registering allocatorProviders
```go
// The IPAM allocators register themselves in init(); only cluster-pool is
// shown here.
//operator/provider_operator_register.go
func init() {
    allocatorProviders[ipamOption.IPAMClusterPool] = &clusterpool.AllocatorOperator{}
    allocatorProviders[ipamOption.IPAMClusterPoolV2] = &clusterpool.AllocatorOperator{}
}

//pkg/ipam/allocator/clusterpool/clusterpool.go
// AllocatorOperator is an implementation of IPAM allocator interface for Cilium
// IPAM.
type AllocatorOperator struct {
    v4CIDRSet, v6CIDRSet []podcidr.CIDRAllocator
}
```
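The init()-based registration above is a common Go plugin-registry pattern. Stripped to its essentials, with illustrative names (Cilium's real provider interface additionally passes a CiliumNode updater to Start and returns a node event handler):

```go
// A self-contained sketch of the init()-based provider registry pattern
// behind allocatorProviders; types and names here are illustrative.
package main

import (
	"context"
	"fmt"
)

// Provider mirrors the shape of an IPAM allocator provider: Init prepares
// the allocator, Start begins serving allocations.
type Provider interface {
	Init(ctx context.Context) error
	Start(ctx context.Context) error
}

var providers = map[string]Provider{}

// Register is typically called from each provider package's init().
func Register(mode string, p Provider) { providers[mode] = p }

type clusterPool struct{}

func (c *clusterPool) Init(ctx context.Context) error  { fmt.Println("init cluster-pool"); return nil }
func (c *clusterPool) Start(ctx context.Context) error { fmt.Println("start cluster-pool"); return nil }

func init() { Register("cluster-pool", &clusterPool{}) }

func main() {
	mode := "cluster-pool"
	p, ok := providers[mode]
	if !ok {
		panic(fmt.Sprintf("%s allocator is not supported", mode))
	}
	ctx := context.Background()
	if err := p.Init(ctx); err != nil {
		panic(err)
	}
	if err := p.Start(ctx); err != nil {
		panic(err)
	}
}
```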
- Initializing the IPAM allocator
```go
// Init sets up Cilium allocator based on given options
func (a *AllocatorOperator) Init(ctx context.Context) error {
    if option.Config.EnableIPv4 {
        if len(operatorOption.Config.ClusterPoolIPv4CIDR) == 0 {
            return fmt.Errorf("%s must be provided when using ClusterPool", operatorOption.ClusterPoolIPv4CIDR)
        }
        // Initialized from the configuration, e.g. cluster-pool-ipv4-cidr: 10.0.0.0/8
        // and cluster-pool-ipv4-mask-size: "24". cluster-pool-ipv4-cidr is the base
        // CIDR, and cluster-pool-ipv4-mask-size is the mask length of each per-node
        // CIDR. With the example above, the first node gets 10.0.0.0/24, the second
        // 10.0.1.0/24, and so on, enough for over 60000 nodes. Multiple base CIDRs
        // can also be given, e.g. 10.0.0.0/8,11.0.0.0/8,12.0.0.0/8.
        v4Allocators, err := newCIDRSets(false, operatorOption.Config.ClusterPoolIPv4CIDR, operatorOption.Config.NodeCIDRMaskSizeIPv4)
        if err != nil {
            return fmt.Errorf("unable to initialize IPv4 allocator %w", err)
        }
        a.v4CIDRSet = v4Allocators
    } else if len(operatorOption.Config.ClusterPoolIPv4CIDR) != 0 {
        return fmt.Errorf("%s must not be set if IPv4 is disabled", operatorOption.ClusterPoolIPv4CIDR)
    }
    ...
}

func newCIDRSets(isV6 bool, strCIDRs []string, maskSize int) ([]podcidr.CIDRAllocator, error) {
    cidrAllocators := make([]podcidr.CIDRAllocator, 0, len(strCIDRs))
    for _, strCIDR := range strCIDRs {
        addr, cidr, err := net.ParseCIDR(strCIDR)
        if err != nil {
            return nil, err
        }
        // Check if CIDRs collide with each other.
        for _, cidrAllocator := range cidrAllocators {
            if cidrAllocator.InRange(cidr) {
                return nil, &ErrCIDRColision{
                    cidr:      strCIDR,
                    allocator: cidrAllocator,
                }
            }
        }
        cidrSet, err := newCIDRSet(isV6, addr, cidr, maskSize)
        cidrAllocators = append(cidrAllocators, cidrSet)
    }
    return cidrAllocators, nil
}

func newCIDRSet(isV6 bool, addr net.IP, cidr *net.IPNet, maskSize int) (podcidr.CIDRAllocator, error) {
    ...
    return cidrset.NewCIDRSet(cidr, maskSize)
}

// NewCIDRSet creates a new CidrSet.
func NewCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) (*CidrSet, error) {
    clusterMask := clusterCIDR.Mask
    clusterMaskSize, _ := clusterMask.Size()
    isV6 := clusterCIDR.IP.To4() == nil
    var maxCIDRs int
    if isV6 {
        ...
    } else if subNetMaskSize > maxSubNetMaskSizeIPv4 {
        return nil, ErrSubNetMaskSizeInvalid
    }
    // With the example configuration above, maxCIDRs is 1<<(24-8) = 65536.
    maxCIDRs = 1 << uint32(subNetMaskSize-clusterMaskSize)
    return &CidrSet{
        clusterCIDR:     clusterCIDR,
        clusterIP:       clusterCIDR.IP,
        clusterMaskSize: clusterMaskSize,
        maxCIDRs:        maxCIDRs,
        subNetMaskSize:  subNetMaskSize,
        isV6:            isV6,
    }, nil
}
```
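As a sanity check on the arithmetic, the standalone snippet below computes maxCIDRs for cluster-pool-ipv4-cidr: 10.0.0.0/8 with cluster-pool-ipv4-mask-size: 24, and reproduces the overlap check that newCIDRSets performs via InRange, here approximated with net.IPNet.Contains:

```go
// A standalone sketch of the per-node CIDR math from NewCIDRSet and the
// overlap check from newCIDRSets; it uses only the standard library.
package main

import (
	"fmt"
	"net"
)

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("10.0.0.0/8")
	subNetMaskSize := 24 // cluster-pool-ipv4-mask-size

	clusterMaskSize, _ := clusterCIDR.Mask.Size()
	// 1 << (24 - 8) = 65536 per-node /24 CIDRs fit into 10.0.0.0/8.
	maxCIDRs := 1 << uint32(subNetMaskSize-clusterMaskSize)
	fmt.Println("maxCIDRs:", maxCIDRs) // 65536

	// Overlap check, analogous to CIDRAllocator.InRange: two base CIDRs
	// collide if either contains the other's network address.
	_, a, _ := net.ParseCIDR("10.0.0.0/8")
	_, b, _ := net.ParseCIDR("10.1.0.0/16")
	fmt.Println("collide:", a.Contains(b.IP) || b.Contains(a.IP)) // true
}
```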
- Starting the IPAM allocator
Start mainly sets up a trigger: when cilium-operator later receives CiliumNode resource events, the trigger fires and runs syncToK8s, which syncs the allocated CIDRs back to the CiliumNode objects in Kubernetes.
```go
// Start kicks off Operator allocation.
func (a *AllocatorOperator) Start(ctx context.Context, updater ipam.CiliumNodeGetterUpdater) (allocator.NodeEventHandler, error) {
    log.WithFields(logrus.Fields{
        logfields.IPv4CIDRs: operatorOption.Config.ClusterPoolIPv4CIDR,
        logfields.IPv6CIDRs: operatorOption.Config.ClusterPoolIPv6CIDR,
    }).Info("Starting ClusterPool IP allocator")
    nodeManager := podcidr.NewNodesPodCIDRManager(a.v4CIDRSet, a.v6CIDRSet, updater, iMetrics)
    return nodeManager, nil
}

//pkg/ipam/allocator/podcidr/podcidr.go
func NewNodesPodCIDRManager(v4Allocators, v6Allocators []CIDRAllocator, nodeGetter ipam.CiliumNodeGetterUpdater, triggerMetrics trigger.MetricsObserver) *NodesPodCIDRManager {
    n := &NodesPodCIDRManager{
        nodesToAllocate:     map[string]*v2.CiliumNode{},
        v4CIDRAllocators:    v4Allocators,
        v6CIDRAllocators:    v6Allocators,
        nodes:               map[string]*nodeCIDRs{},
        ciliumNodesToK8s:    map[string]*ciliumNodeK8sOp{},
        k8sReSyncController: controller.NewManager(),
    }
    // Have a trigger so that multiple calls, within a second, to sync with k8s
    // will result as it was a single call being made.
    t, err := trigger.NewTrigger(trigger.Parameters{
        MinInterval: time.Second,
        TriggerFunc: func([]string) {
            // Trigger execute UpdateController multiple times so that we
            // keep retrying the sync against k8s in case of failure.
            n.k8sReSyncController.UpdateController("update-cilium-nodes-pod-cidr",
                controller.ControllerParams{
                    DoFunc: func(context.Context) error {
                        n.Mutex.Lock()
                        defer n.Mutex.Unlock()
                        return syncToK8s(nodeGetter, n.ciliumNodesToK8s)
                    },
                    RunInterval: updateK8sInterval,
                },
            )
        },
        MetricsObserver: triggerMetrics,
        Name:            "update-cilium-nodes-pod-cidr",
    })
    n.k8sReSync = t
    return n
}
```
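The MinInterval of one second means any burst of calls within that window collapses into a single sync. A minimal debounce sketch with the same observable behavior (illustrative only; Cilium's pkg/trigger additionally supports metrics, shutdown, and immediate firing when the interval has already elapsed):

```go
// A minimal sketch of the coalescing behavior behind trigger.NewTrigger:
// any number of Trigger() calls within minInterval collapse into a single
// run of fn.
package main

import (
	"fmt"
	"sync"
	"time"
)

type trigger struct {
	mu          sync.Mutex
	pending     bool
	minInterval time.Duration
	fn          func()
}

func newTrigger(minInterval time.Duration, fn func()) *trigger {
	return &trigger{minInterval: minInterval, fn: fn}
}

func (t *trigger) Trigger() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.pending {
		return // a run is already scheduled; this call is coalesced
	}
	t.pending = true
	time.AfterFunc(t.minInterval, func() {
		t.mu.Lock()
		t.pending = false
		t.mu.Unlock()
		t.fn()
	})
}

func main() {
	t := newTrigger(time.Second, func() { fmt.Println("sync to k8s") })
	for i := 0; i < 10; i++ {
		t.Trigger() // 10 calls within a second -> one "sync to k8s"
	}
	time.Sleep(2 * time.Second)
}
```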
- Starting the CiliumNode resource watch
```go
func startSynchronizingCiliumNodes(ctx context.Context, nodeManager allocator.NodeEventHandler, withKVStore bool) error
    ...
    if nodeManager != nil {
        nodeManagerSyncHandler = syncHandlerConstructor(
            func(name string) {
                nodeManager.Delete(name)
            },
            func(node *cilium_v2.CiliumNode) {
                // node is deep copied before it is stored in pkg/aws/eni
                nodeManager.Update(node)
            })
    }
    ...
    resourceEventHandler = cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err != nil {
                log.WithError(err).Warning("Unable to process CiliumNode Add event")
                return
            }
            if nodeManager != nil {
                nodeManagerQueue.Add(key)
            }
            ...
        },
        ...
    }
    // Create ciliumNodeInformer to list/watch CiliumNode resource events. Each
    // event is handled by resourceEventHandler; an add event pushes the object's
    // key onto nodeManagerQueue.
    var ciliumNodeInformer cache.Controller
    ciliumNodeStore, ciliumNodeInformer = informer.NewInformer(
        cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(),
            cilium_v2.CNPluralName, v1.NamespaceAll, fields.Everything()),
        &cilium_v2.CiliumNode{},
        0,
        resourceEventHandler,
        ciliumNodeConvertFunc,
    )
    // Start a goroutine that pops keys off nodeManagerQueue and runs
    // nodeManagerSyncHandler on each of them.
    go func() {
        cache.WaitForCacheSync(wait.NeverStop, ciliumNodeInformer.HasSynced)
        close(k8sCiliumNodesCacheSynced)
        log.Info("CiliumNodes caches synced with Kubernetes")
        // Only handle events if nodeManagerSyncHandler is not nil. If it is nil
        // then there isn't any event handler set for CiliumNodes events.
        if nodeManagerSyncHandler != nil {
            go func() {
                // infinite loop. run in a go routine to unblock code execution
                for processNextWorkItem(nodeManagerQueue, nodeManagerSyncHandler) {
                    // processNextWorkItem internally runs
                    // syncHandler(key.(string)), i.e. nodeManagerSyncHandler,
                    // which calls nodeManager.Update (NodesPodCIDRManager.Update).
                }
            }()
        }
        ...
    }()
    // Start ciliumNodeInformer.
    go ciliumNodeInformer.Run(wait.NeverStop)
```
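The informer-plus-queue wiring above is the standard client-go pattern: the informer's event handlers only enqueue keys, and a worker goroutine drains the queue. A reduced, runnable sketch assuming k8s.io/client-go/util/workqueue (the handler is illustrative; in cilium-operator it ends up calling NodesPodCIDRManager.Update):

```go
// A reduced sketch of the processNextWorkItem loop, assuming the
// k8s.io/client-go/util/workqueue package. In cilium-operator the keys are
// pushed by the informer's AddFunc.
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func processNextWorkItem(queue workqueue.Interface, syncHandler func(key string) error) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	// Done must be called so the queue knows the item has been processed.
	defer queue.Done(key)
	if err := syncHandler(key.(string)); err != nil {
		fmt.Println("sync failed:", err)
	}
	return true
}

func main() {
	queue := workqueue.New()
	queue.Add("node-1") // in cilium-operator, the informer's AddFunc does this
	queue.Add("node-2")

	done := make(chan struct{})
	go func() {
		defer close(done)
		for processNextWorkItem(queue, func(key string) error {
			fmt.Println("handling CiliumNode", key)
			return nil
		}) {
		}
	}()

	queue.ShutDown() // Get drains the remaining items, then reports shutdown
	<-done
}
```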
Allocating CIDRs to nodes
When a CiliumNode add event arrives, nodeManager.Update is called to allocate a CIDR for the CiliumNode and write it back to the resource:
```go
func (n *NodesPodCIDRManager) Update(node *v2.CiliumNode) bool {
    n.Mutex.Lock()
    defer n.Mutex.Unlock()
    return n.update(node)
}

// Needs n.Mutex to be held.
func (n *NodesPodCIDRManager) update(node *v2.CiliumNode) bool {
    var (
        updateStatus, updateSpec bool
        cn                       *v2.CiliumNode
        err                      error
    )
    if option.Config.IPAMMode() == ipamOption.IPAMClusterPoolV2 {
        cn, updateSpec, updateStatus, err = n.allocateNodeV2(node)
        if err != nil {
            return false
        }
    } else {
        // FIXME: This code block falls back to the old behavior of clusterpool,
        // where we only assign one pod CIDR for IPv4 and IPv6. Once v2 becomes
        // fully backwards compatible with v1, we can remove this else block.
        var allocated bool
        // Allocate a CIDR for the node.
        cn, allocated, updateStatus, err = n.allocateNode(node)
        if err != nil {
            return false
        }
        // if allocated is false it means that we were unable to allocate
        // a CIDR so we need to update the status of the node into k8s.
        updateStatus = !allocated && updateStatus
        // ClusterPool v1 never updates both the spec and the status
        updateSpec = !updateStatus
    }
    if cn == nil {
        // no-op
        return true
    }
    if updateStatus {
        // the n.syncNode will never fail because it's only adding elements to a
        // map.
        // NodesPodCIDRManager will later on sync the node into k8s by the
        // controller defined, which keeps retrying to create the node in k8s
        // until it succeeds.
        // If the resource version is != "" it means the object already exists
        // in kubernetes so we should perform an update status instead of a create.
        if cn.GetResourceVersion() != "" {
            n.syncNode(k8sOpUpdateStatus, cn)
        } else {
            n.syncNode(k8sOpCreate, cn)
        }
    }
    // Write the allocated CIDR back into the CiliumNode resource; it can be
    // inspected with `kubectl describe cn <node>`.
    if updateSpec {
        // If the resource version is != "" it means the object already exists
        // in kubernetes so we should perform an update instead of a create.
        if cn.GetResourceVersion() != "" {
            n.syncNode(k8sOpUpdate, cn)
        } else {
            n.syncNode(k8sOpCreate, cn)
        }
    }
    return true
}

// syncNode adds the given node to the map of nodes that need to be synchronized
// with kubernetes and triggers a new resync.
// Needs n.Mutex to be held.
func (n *NodesPodCIDRManager) syncNode(op k8sOp, ciliumNode *v2.CiliumNode) {
    n.ciliumNodesToK8s[ciliumNode.GetName()] = &ciliumNodeK8sOp{
        ciliumNode: ciliumNode,
        op:         op,
    }
    // Fire the trigger that runs syncToK8s.
    n.k8sReSync.Trigger()
}

func syncToK8s(nodeGetterUpdater ipam.CiliumNodeGetterUpdater, ciliumNodesToK8s map[string]*ciliumNodeK8sOp) (retErr error) {
    for nodeName, nodeToK8s := range ciliumNodesToK8s {
        var (
            err, err2     error
            newCiliumNode *v2.CiliumNode
            log           = log.WithFields(logrus.Fields{
                "node-name": nodeName,
            })
        )
        switch nodeToK8s.op {
        case k8sOpCreate:
            // Try creating the node
            _, err = nodeGetterUpdater.Create(nodeToK8s.ciliumNode)
        case k8sOpUpdate:
            var updatedNode *v2.CiliumNode
            updatedNode, err = nodeGetterUpdater.Update(nil, nodeToK8s.ciliumNode)
            ...
        }
        ...
    }
}

//operator/cilium_node.go
func (c *ciliumNodeUpdateImplementation) Create(node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) {
    return ciliumK8sClient.CiliumV2().CiliumNodes().Create(context.TODO(), node, meta_v1.CreateOptions{})
}

func (c *ciliumNodeUpdateImplementation) Update(origNode, node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) {
    if origNode == nil || !origNode.Spec.DeepEqual(&node.Spec) {
        return ciliumK8sClient.CiliumV2().CiliumNodes().Update(context.TODO(), node, meta_v1.UpdateOptions{})
    }
    return nil, nil
}
```
cilium-agent IPAM
IPAM-related initialization flow
```go
func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointmanager.EndpointManager, dp datapath.Datapath) (*Daemon, *endpointRestoreState, error)
    // Wait for cilium-operator to create all the CRDs.
    d.k8sWatcher.WaitForCRDsToRegister(d.ctx)
    // If you stop the operator, delete the CiliumNode CRD, and restart the
    // cilium-agent pod, the agent blocks here waiting for the CRD to be
    // recreated, which confirms that the CRDs are created by cilium-operator:
    //   level=info msg="Waiting until all Cilium CRDs are available" subsys=k8s

    // In cluster-pool mode, create/update the CiliumNode resource. At this
    // point it carries no CIDR yet; once cilium-operator sees the CiliumNode
    // event, it allocates a CIDR and updates the resource.
    if option.Config.IPAM == ipamOption.IPAMClusterPool || option.Config.IPAM == ipamOption.IPAMClusterPoolV2 {
        // Create the CiliumNode custom resource. This call will block until
        // the custom resource has been created
        d.nodeDiscovery.UpdateCiliumNodeResource()
            log.WithField(logfields.Node, nodeTypes.GetName()).Info("Creating or updating CiliumNode resource")
            ciliumClient := k8s.CiliumClient()
            n.mutateNodeResource(nodeResource)
                switch option.Config.IPAM {
                case ipamOption.IPAMClusterPool, ipamOption.IPAMClusterPoolV2:
                    // We want to keep the podCIDRs untouched in these IPAM modes because
                    // the operator will verify if it can assign such podCIDRs.
                    // If the user was running in non-IPAM Operator mode and then switched
                    // to IPAM Operator, then it is possible that the previous cluster CIDR
                    // from the old IPAM mode differs from the current cluster CIDR set in
                    // the operator.
                    // There is a chance that the operator won't be able to allocate these
                    // podCIDRs, resulting in an error in the CiliumNode status.
                default:
                    nodeResource.Spec.IPAM.PodCIDRs = []string{}
                    if cidr := node.GetIPv4AllocRange(); cidr != nil {
                        nodeResource.Spec.IPAM.PodCIDRs = append(nodeResource.Spec.IPAM.PodCIDRs, cidr.String())
                    }
                    if cidr := node.GetIPv6AllocRange(); cidr != nil {
                        nodeResource.Spec.IPAM.PodCIDRs = append(nodeResource.Spec.IPAM.PodCIDRs, cidr.String())
                    }
                }
    }

    k8s.WaitForNodeInformation(d.ctx, d.k8sWatcher)
        n := waitForNodeInformation(ctx, nodeGetter, nodeName)
            // nodeRetrievalMaxRetries = 15: retry the retrieval up to 15 times.
            for retry := 0; retry < nodeRetrievalMaxRetries; retry++ {
                n, err := retrieveNodeInformation(ctx, nodeGetter, nodeName)
                    // In cluster-pool mode, read the CIDR from the CiliumNode resource.
                    if option.Config.IPAM == ipamOption.IPAMClusterPool || option.Config.IPAM == ipamOption.IPAMClusterPoolV2 {
                        ciliumNode, err := CiliumClient().CiliumV2().CiliumNodes().Get(ctx, nodeName, v1.GetOptions{})
                        no := nodeTypes.ParseCiliumNode(ciliumNode)
                        n = &no
                        log.WithField(logfields.NodeName, n.Name).Info("Retrieved node information from cilium node")
                    } else {
                        // In kubernetes mode, read the CIDR from the k8s Node object.
                        k8sNode, err := nodeGetter.GetK8sNode(ctx, nodeName)
                        nodeInterface := ConvertToNode(k8sNode)
                        typesNode := nodeInterface.(*slim_corev1.Node)
                        n = ParseNode(typesNode, source.Unspec)
                        log.WithField(logfields.NodeName, n.Name).Info("Retrieved node information from kubernetes node")
                    }
                    // An empty n.IPv4AllocCIDR means cilium-operator has not yet
                    // updated the CiliumNode with a CIDR; wait and retry.
                    if requireIPv4CIDR && n.IPv4AllocCIDR == nil {
                        return nil, fmt.Errorf("required IPv4 PodCIDR not available")
                    }
                    return n, nil
                // On failure, wait a while and try again.
                if err != nil {
                    log.WithError(err).Warning("Waiting for k8s node information")
                    backoff.Wait(ctx)
                    continue
                }
                return n
            }
        // Save the retrieved CIDR into the global addrs variable.
        useNodeCIDR(n)
            if n.IPv4AllocCIDR != nil && option.Config.EnableIPv4 {
                node.SetIPv4AllocRange(n.IPv4AllocCIDR)
                    addrsMu.Lock()
                    addrs.ipv4AllocRange = net.DeepCopy()
                    addrsMu.Unlock()
            }

    // Configure IPAM without using the configuration yet.
    d.configureIPAM()
    // Start IPAM
    d.startIPAM()
        // Set up ipam conf after init() because we might be running d.conf.KVStoreIPv4Registration
        d.ipam = ipam.NewIPAM(d.datapath.LocalNodeAddressing(), option.Config, d.nodeDiscovery, d.k8sWatcher, &d.mtuConfig)
            ipam := &IPAM{
                nodeAddressing:   nodeAddressing,
                config:           c,
                owner:            map[string]string{},
                expirationTimers: map[string]string{},
                blacklist: IPBlacklist{
                    ips: map[string]string{},
                },
            }
            switch c.IPAMMode() {
            case ipamOption.IPAMKubernetes, ipamOption.IPAMClusterPool:
                log.WithFields(logrus.Fields{
                    // AllocationCIDR returns node.GetIPv4AllocRange(), i.e.
                    // addrs.ipv4AllocRange.DeepCopy(): the v4 range stored in
                    // the global addrs variable above.
                    logfields.V4Prefix: nodeAddressing.IPv4().AllocationCIDR(),
                    logfields.V6Prefix: nodeAddressing.IPv6().AllocationCIDR(),
                }).Infof("Initializing %s IPAM", c.IPAMMode())
                if c.IPv4Enabled() {
                    ipam.IPv4Allocator = newHostScopeAllocator(nodeAddressing.IPv4().AllocationCIDR().IPNet)
                        cidrRange, err := ipallocator.NewCIDRRange(n)
                        a := &hostScopeAllocator{
                            allocCIDR: n,
                            allocator: cidrRange,
                        }
                        return a
                }
                ...
            }
```
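waitForNodeInformation above is a plain retry loop: up to nodeRetrievalMaxRetries (15) attempts with a backoff between failures. A self-contained sketch of the same pattern, with illustrative names and intervals (Cilium uses its own backoff package):

```go
// A self-contained sketch of the retry-with-backoff pattern used by
// waitForNodeInformation; the stand-in retrieval function and intervals are
// illustrative.
package main

import (
	"errors"
	"fmt"
	"time"
)

const maxRetries = 15 // mirrors nodeRetrievalMaxRetries

func retrieveNodeInformation(attempt int) (string, error) {
	// Stand-in for reading the CiliumNode: the pod CIDR only appears once
	// cilium-operator has allocated it.
	if attempt < 3 {
		return "", errors.New("required IPv4 PodCIDR not available")
	}
	return "10.0.5.0/24", nil
}

func main() {
	wait := 100 * time.Millisecond
	for retry := 0; retry < maxRetries; retry++ {
		cidr, err := retrieveNodeInformation(retry)
		if err != nil {
			fmt.Println("waiting for node information:", err)
			time.Sleep(wait)
			wait *= 2 // exponential backoff
			continue
		}
		fmt.Println("retrieved node CIDR:", cidr)
		return
	}
	fmt.Println("giving up after", maxRetries, "retries")
}
```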
The client requests an IPAM address
For example, cilium-cni's cmdAdd -> allocateIPsWithCiliumAgent -> client.IPAMAllocate("", podName, true):
```go
// IPAMAllocate allocates an IP address out of address family specific pool.
func (c *Client) IPAMAllocate(family, owner string, expiration bool) (*models.IPAMResponse, error) {
    params := ipam.NewPostIpamParams().WithTimeout(api.ClientTimeout)
    resp, err := c.Ipam.PostIpam(params)
    if err != nil {
        return nil, Hint(err)
    }
    return resp.Payload, nil
}
```
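A hypothetical caller, mirroring what allocateIPsWithCiliumAgent does; client.NewDefaultClient and the resp.Address.IPV4 field name are assumptions based on pkg/client and the generated API models, not a verified snippet:

```go
// Hypothetical usage of the client shown above, mirroring
// allocateIPsWithCiliumAgent. NewDefaultClient and the AddressPair field
// names are assumptions based on pkg/client.
package main

import (
	"fmt"

	"github.com/cilium/cilium/pkg/client"
)

func main() {
	c, err := client.NewDefaultClient() // talks to the local agent's API socket
	if err != nil {
		panic(err)
	}
	// family "" means any family; owner is typically the pod name.
	resp, err := c.IPAMAllocate("", "default/nginx-0", true)
	if err != nil {
		panic(err)
	}
	fmt.Println("allocated IPv4:", resp.Address.IPV4)
}
```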
The server handles the IPAM request
```go
//daemon/cmd/ipam.go
// Handle incoming requests address allocation requests for the daemon.
func (h *postIPAM) Handle(params ipamapi.PostIpamParams) middleware.Responder {
    ipv4Result, ipv6Result, err := h.daemon.ipam.AllocateNextWithExpiration(family, owner, expirationTimeout)
        ipv4Result, ipv6Result, err = ipam.AllocateNext(family, owner)
            ipv4Result, err = ipam.AllocateNextFamily(IPv4, owner)
                ipam.allocateNextFamily(family, owner, needSyncUpstream)
                    switch family {
                    case IPv6:
                        allocator = ipam.IPv6Allocator
                    case IPv4:
                        allocator = ipam.IPv4Allocator
                    }
                    // This calls hostScopeAllocator.AllocateNext.
                    result, err = allocator.AllocateNext(owner)
    resp := &models.IPAMResponse{
        HostAddressing: node.GetNodeAddressing(),
        Address:        &models.AddressPair{},
    }

func (h *hostScopeAllocator) AllocateNext(owner string) (*AllocationResult, error) {
    ip, err := h.allocator.AllocateNext()
    return &AllocationResult{IP: ip}, nil
}

// AllocateNext reserves one of the IPs from the pool. ErrFull may
// be returned if there are no addresses left.
func (r *Range) AllocateNext() (net.IP, error) {
    offset, ok, err := r.alloc.AllocateNext()
    if err != nil {
        return nil, err
    }
    if !ok {
        return nil, ErrFull
    }
    return addIPOffset(r.base, offset), nil
}
```
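addIPOffset turns the bitmap offset back into an address by big-integer addition on the range's base IP. A condensed, self-contained sketch of the whole allocate path (a naive bitmap plus the offset arithmetic); note that the real ipallocator Range also skips the network and broadcast addresses, which this sketch does not:

```go
// A condensed, self-contained sketch of what the host-scope allocator does:
// track allocated offsets in a bitmap and turn an offset back into an IP by
// big-integer addition, as addIPOffset does. Illustrative, not Cilium's code.
package main

import (
	"errors"
	"fmt"
	"math/big"
	"net"
)

type rangeAlloc struct {
	base *big.Int // network address of the CIDR as an integer
	max  int      // number of allocatable addresses
	used []bool   // naive bitmap of allocated offsets
}

func newRangeAlloc(cidr string) (*rangeAlloc, error) {
	_, ipnet, err := net.ParseCIDR(cidr)
	if err != nil {
		return nil, err
	}
	ones, bits := ipnet.Mask.Size()
	size := 1 << uint(bits-ones)
	return &rangeAlloc{
		base: big.NewInt(0).SetBytes(ipnet.IP),
		max:  size,
		used: make([]bool, size),
	}, nil
}

// allocateNext finds the first free offset and returns base+offset as an IP.
func (r *rangeAlloc) allocateNext() (net.IP, error) {
	for offset, inUse := range r.used {
		if !inUse {
			r.used[offset] = true
			sum := big.NewInt(0).Add(r.base, big.NewInt(int64(offset)))
			return net.IP(sum.FillBytes(make([]byte, 4))), nil
		}
	}
	return nil, errors.New("range is full")
}

func main() {
	r, _ := newRangeAlloc("10.0.5.0/24")
	for i := 0; i < 3; i++ {
		ip, _ := r.allocateNext()
		fmt.Println(ip) // 10.0.5.0, 10.0.5.1, 10.0.5.2
	}
}
```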
