深入K8S Job(二):job controller源码分析

k8s version: v1.11.0

author: lbl167612@alibaba-inc.com

源码流程图

job controller流程图

JobController 结构

路径:pkg/controller/job/job_controller.go

type JobController struct {  
    // 访问 kube-apiserver 的client
    // 需要查询 job、pod 等元数据信息
    kubeClient clientset.Interface
    // pod 控制器,用于创建和删除pod使用
    podControl controller.PodControlInterface
    // 用于更新 job status
    updateHandler func(job *batch.Job) error
    // job controller 核心接口,用于 sync job
    syncHandler   func(jobKey string) (bool, error)
    // job controller 在启动时会对 job & pod 先进行同步
    // 用于判断是否已经对 pod 同步过
    podStoreSynced cache.InformerSynced
    // 用于判断是否已经对 job 同步过
    jobStoreSynced cache.InformerSynced
    // expectations cache,记录该job下pods的adds & dels次数,
    // 并提供接口进行调整,已达到期望值。
    expectations controller.ControllerExpectationsInterface

    // jobLister 用于获取job元数据及根据pod的labels来匹配jobs
    // 该controller 会使用到的接口如下:
    // 1. GetPodJobs(): 用于根据pod反推jobs
    // 2. Get(): 根据namespace & name 获取job 元数据
    jobLister batchv1listers.JobLister

    // podStore 提供了接口用于获取指定job下管理的所有pods
    podStore corelisters.PodLister

    // Jobs queue
    // job controller通过kubeClient watch jobs & pods的数据变更,
    // 比如add、delete、update,来操作该queue。
    // 并启动相应的worker,调用syncJob处理该queue中的jobs。
    queue workqueue.RateLimitingInterface
    // jobs的相关events,通过该recorder进行广播
    recorder record.EventRecorder
}

startJobController()

路径:cmd/kube-controller-manager/app/batch.go

startJobController() 是启动 job controller 的入口函数,该函数会注册到 kube-controller-manager 组件的 NewControllerInitializers() 接口中。
具体的 kube-controller-manager 组件的启动实现可以自己看下相关代码,这里先只关注 job controller 的实现。

func startJobController(ctx ControllerContext) (bool, error) {  
    // 在启动job controller之前,判断下job 是否有配置生效
    // 用户可以在创建k8s clusters时,通过修改kube-apiserver --runtime-config配置想要生效的 resource
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}] {
        return false, nil
    }
    // 初始化 JobController结构,并Run
    // Run的时候指定了gorutinue的数量,每个gorutinue 就是一个worker
    go job.NewJobController(
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.InformerFactory.Batch().V1().Jobs(),
        ctx.ClientBuilder.ClientOrDie("job-controller"),
    ).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop)
    return true, nil
}

NewJobController()

路径:pkg/controller/job/job_controller.go

func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {  
    // 初始化event broadcaster
    // 用于该controller 发送job 相关的events
    eventBroadcaster := record.NewBroadcaster()
    // 注册打印event信息的function
    // eventBroadcaster.StartEventWatcher()会创建gorutinue并开始watch event,
    // 根据注册的eventHandler轮询处理每个event,这里就是通过glog.Infof打印日志
    eventBroadcaster.StartLogging(glog.Infof)
    // EventSinkImpl 包含了一个EventInterface, 实现了Create/Update/Delete/Get/Watch/Patch..等等操作
    // 这一步跟上面一样,也是通过eventBroadcaster.StartEventWatcher() 注册了EventInterface实现,
    // 用来从指定的eventBroadcaster接收event,并发送给指定的接收器。
    // k8s event实现可以单独进行源码分析,值得学习下。
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

    // kubernetes 内部的限流策略
    // 对apiserver来说,每个controller及scheduler都是client,所以内部的限流策略也至关重要。 
    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }

    // 初始化JobController
    jm := &JobController{
        // 连接kube-apiserver的client
        kubeClient: kubeClient,
        // podControl,用于manageJob()中创建和删除pod
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
        },
        // 维护的期望状态下的Pod Cache,并且提供了修正该Cache的接口
        // 比如会存jobs 下pods 的adds & dels 值,并提供了接口修改这两个值。
        expectations: controller.NewControllerExpectations(),
        // jobs queue, 后面会创建对应数量的workers 从该queue 中处理各个jobs。
        queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
        // event recorder,用于发送job 相关的events
        recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
    }
    // 注册jobInformer 的Add、Update、Delete 函数
    // 该controller 获取到job 的Add、Update、Delete事件之后,会调用对应的function
  // 这些function 的核心还是去操作了上面的queue,让syncJob 处理queue 中的jobs
    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
        UpdateFunc: jm.updateJob,
        DeleteFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
    })
    // 上面结构中已经有介绍
    jm.jobLister = jobInformer.Lister()
    jm.jobStoreSynced = jobInformer.Informer().HasSynced

    // 注册 podInformer 的Add、Update、Delete 函数
  // job 最终是依托了pod 去运行,所以相关的pods 事件也需要关心。
  // 该podInformer 会监听所有的pods 变更事件,所以函数中都会去判断该pod 的containerRef是否是“job”,
    // 如果是的话再更新对应的expectations & queue, 触发syncJob进行处理。
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    jm.addPod,
        UpdateFunc: jm.updatePod,
        DeleteFunc: jm.deletePod,
    })
    // 上面结构中已经有介绍
    jm.podStore = podInformer.Lister()
    jm.podStoreSynced = podInformer.Informer().HasSynced

    // 注册更新job status的函数
    jm.updateHandler = jm.updateJobStatus
    // 注册sync job handler
    // 核心实现
    jm.syncHandler = jm.syncJob

    return jm
}

Run()

路径:pkg/controller/job/job_controller.go

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {  
    defer utilruntime.HandleCrash()
    defer jm.queue.ShutDown()

    glog.Infof("Starting job controller")
    defer glog.Infof("Shutting down job controller")

    // 每次启动都会先等待Job & Pod cache 是否有同步过,即指queue是否已经同步过数据,
    // 因为每个worker干的活都是从queue中获取,所以只有queue有数据才应该继续往下创建worker。
    if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
        return
    }

    // 创建指定数量的gorutinue
    // 每个gorutinue 执行worker,每个worker 执行完了之后sleep 1s,然后继续循环执行
    for i := 0; i < workers; i++ {
        go wait.Until(jm.worker, time.Second, stopCh)
    }

    <-stopCh
}

看下具体的worker 实现:

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {  
    for jm.processNextWorkItem() {
    }
}

func (jm *JobController) processNextWorkItem() bool {  
    // 从queque 中获取job key
    // key 构成: namespace + "/" + name 
    key, quit := jm.queue.Get()
    if quit {
        return false
    }
    defer jm.queue.Done(key)

    // 调用初始化时注册的 syncJob()
    // 如果执行成功,且forget = true, 则从queue 中删除该 key。
    forget, err := jm.syncHandler(key.(string))
    if err == nil {
        if forget {
            jm.queue.Forget(key)
        }
        return true
    }
    // 如果syncJob() 出错, 则打印出错信息
    // 该utilruntime.HandleError() 会记录最近一次的错误时间点并进行限速,防止频繁打印错误信息。
    utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
    // 如果syncJob() 出错,则把该job key 继续丢回queue 中, 等待下次sync。
    jm.queue.AddRateLimited(key)

    return true
}

syncJob()

worker的关键就是调用了syncJob,下面继续看下该函数具体做了什么:

func (jm *JobController) syncJob(key string) (bool, error) {  
    // 惯用招数,看下每次sync 花了多久
    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
    }()

    // 把key 拆分成job namespace & name
    ns, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return false, err
    }
    if len(ns) == 0 || len(name) == 0 {
        return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
    }
    // 获取job 信息
    // 如果没有找到该job的话,表示已经被删除,并从ControllerExpectations中删除该key
    sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            glog.V(4).Infof("Job has been deleted: %v", key)
            jm.expectations.DeleteExpectations(key)
            return true, nil
        }
        return false, err
    }
    job := *sharedJob

    // 根据job.Status.Conditions是否处于“JobComplete” or "JobFailed", 来判断该job 是否已经完成。
    // 如果已经完成的话,直接return
    if IsJobFinished(&job) {
        return true, nil
    }

    // 根据该 job key 失败的次数来计算该job 已经重试的次数。
    // job 默认会有6次的重试机会
    previousRetry := jm.queue.NumRequeues(key)

    // 判断该key 是否需要调用manageJob()进行sync,条件如下:
    // 1. 该key 在ControllerExpectations中的adds和dels 都 <= 0
    // 2. 该key 在ControllerExpectations中已经超过5min没有更新了
    // 3. 该key 在ControllerExpectations中没有查到
    // 4. 调用GetExpectations()接口失败
    jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

    // 获取该job管理的所有pods
    pods, err := jm.getPodsForJob(&job)
    if err != nil {
        return false, err
    }

    // 获取处于active 的pods
    activePods := controller.FilterActivePods(pods)
    // 获取active & succeeded & failed pods数量
    active := int32(len(activePods))
    succeeded, failed := getStatus(pods)
    conditions := len(job.Status.Conditions)
    // 看下该job是否是第一次启动,是的话,设置StartTime;
    // 并判断是否设置了job.Spec.ActiveDeadlineSeconds, 如果设置了的话,在ActiveDeadlineSeconds秒后,在将该key 丢入queue
    if job.Status.StartTime == nil {
        now := metav1.Now()
        job.Status.StartTime = &now
        // enqueue a sync to check if job past ActiveDeadlineSeconds
        if job.Spec.ActiveDeadlineSeconds != nil {
            glog.V(4).Infof("Job %s have ActiveDeadlineSeconds will sync after %d seconds",
                key, *job.Spec.ActiveDeadlineSeconds)
            jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
        }
    }

    var manageJobErr error
    jobFailed := false
    var failureReason string
    var failureMessage string

    // 确认该job是否有新的pod failed
    jobHaveNewFailure := failed > job.Status.Failed
    // 确认重试次数是否有超出预期值
    exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
        (int32(previousRetry)+1 > *job.Spec.BackoffLimit)

    // 如果job重试的次数超过了job.Spec.BackoffLimit(默认是6次),则标记该job为failed并指明原因;
    // 计算job重试的次数,还跟job中的pod template设置的重启策略有关,如果设置成“RestartPolicyOnFailure”,
    // job重试的次数 = 所有pods InitContainerStatuses 和 ContainerStatuses 的RestartCount 之和,
    // 也需要判断这个重试次数是否超过 BackoffLimit;
    if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
        jobFailed = true
        failureReason = "BackoffLimitExceeded"
        failureMessage = "Job has reached the specified backoff limit"
    // 如果job 运行的时间超过了ActiveDeadlineSeconds,则标记该job为failed并指明原因
    } else if pastActiveDeadline(&job) {
        jobFailed = true
        failureReason = "DeadlineExceeded"
        failureMessage = "Job was active longer than specified deadline"
    }

    // 如果job failed,则并发等待所有active pods删除结束;
    // 修改job.Status.Conditions, 并且根据之前记录的失败信息发送event
    if jobFailed {
        errCh := make(chan error, active)
        jm.deleteJobPods(&job, activePods, errCh)
        select {
        case manageJobErr = <-errCh:
            if manageJobErr != nil {
                break
            }
        default:
        }

        failed += active
        active = 0
        job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
        jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
    // 如果job 没有标记为failed
    } else {
        // 根据之前判断的job是否需要sync,且该job 还未被删除,则调用mangeJob()。
        // manageJob() 后面单独解析
        if jobNeedsSync && job.DeletionTimestamp == nil {
            active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
        }
        completions := succeeded
        complete := false
        // job.Spec.Completions 表示该job只有成功创建这些数量的pods,才算完成。
        // 如果该值没有设置,表示只要其中有一个pod 成功过,该job 就算完成了,
        // 但是需要注意,如果当前还有正在运行的pods,则需要等待这些pods都退出,才能标记该job完成任务了。
        if job.Spec.Completions == nil {
            if succeeded > 0 && active == 0 {
                complete = true
            }
        // 如果设置了Completions值,只要该job下成功创建的pods数量 >= Completions,该job就成功结束了。
        // 还需要发送一些异常events, 比如已经达到要求的成功创建的数量后,还有处于active的pods;
        // 或者成功的次数 > 指定的次数,这些应该都是预期之外的事件。
        } else {
            if completions >= *job.Spec.Completions {
                complete = true
                if active > 0 {
                    jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
                }
                if completions > *job.Spec.Completions {
                    jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
                }
            }
        }
        // 如果job成功结束,则更新job.Status.Conditions && job.Status.CompletionTime
        if complete {
            job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
            now := metav1.Now()
            job.Status.CompletionTime = &now
        }
    }

    forget := false
    // 如果这次有成功的pod 产生,则forget 该次job key
    if job.Status.Succeeded < succeeded {
        forget = true
    }

    // 更新job.Status
    if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
        job.Status.Active = active
        job.Status.Succeeded = succeeded
        job.Status.Failed = failed
        // 更新job失败的话,将该job key继续丢入queue中。
        if err := jm.updateHandler(&job); err != nil {
            return forget, err
        }
        // 如果这次job 有新的pod failed,且该job还未完成,则继续把该job key丢入queue中
        if jobHaveNewFailure && !IsJobFinished(&job) {
            // returning an error will re-enqueue Job after the backoff period
            return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
        }
        // 否则forget job
        forget = true
    }

    return forget, manageJobErr
}

manageJob()

在syncJob()中有个关键函数 manageJob(),它主要做的事情就是根据 job 配置的并发数来确认当前处于 active 的 pods 数量是否合理,如果不合理的话则进行调整。 具体实现如下:

func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {  
    var activeLock sync.Mutex
    active := int32(len(activePods))
    parallelism := *job.Spec.Parallelism
    // 获取job key, 根据 namespace + "/" + name进行拼接。
    jobKey, err := controller.KeyFunc(job)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
        return 0, nil
    }

    var errCh chan error
    // 如果处于active pods 大于job设置的并发数,则并发删除超出部分的active pods。
    // 需要注意的是,需要删除的active pods是有一定的优先级的:
    // not-ready < ready;unscheduled < scheduled;pending < running。
    // 先基于上面的优先级对activePods 进行排序,然后再从头执行删除操作。
    // 如果删除pods失败,则需要回滚之前设置的ControllerExpectations 和 active 值。
    if active > parallelism {
        diff := active - parallelism
        errCh = make(chan error, diff)
        jm.expectations.ExpectDeletions(jobKey, int(diff))
        glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
        sort.Sort(controller.ActivePods(activePods))

        active -= diff
        wait := sync.WaitGroup{}
        wait.Add(int(diff))
        for i := int32(0); i < diff; i++ {
            go func(ix int32) {
                defer wait.Done()
                if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
                    defer utilruntime.HandleError(err)
                    glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
                    jm.expectations.DeletionObserved(jobKey)
                    activeLock.Lock()
                    active++
                    activeLock.Unlock()
                    errCh <- err
                }
            }(i)
        }
        wait.Wait()
    // 如果active pods少于设置的并发值,则先计算diff值,具体的计算跟Completions和Parallelism的配置有关。
    // 1.job.Spec.Completions == nil && succeeded pods > 0, 则diff = 0;
    // 2.job.Spec.Completions == nil && succeeded pods = 0,则diff = Parallelism;
    // 3.job.Spec.Completions != nil 则diff等于(job.Spec.Completions - succeeded - active)和parallelism中的最小值(非负值);
    // 计算好diff值即知道了还需要创建多少pods,由于等待创建的pods数量可能会非常庞大,所以这里有个分批创建的逻辑:
    // 第一批创建1个,第二批创建2个,后续按2的倍数继续往下分批创建,但是每次创建的数量都不会大于diff值(diff值每次都会减掉对应的分批数量)。
    // 如果创建pod超时,则直接return;
    // 如果创建pod失败,则回滚ControllerExpectations的adds 和 active 值,并不在执行后续未执行的 pods.
    } else if active < parallelism {
        wantActive := int32(0)
        if job.Spec.Completions == nil {
            if succeeded > 0 {
                wantActive = active
            } else {
                wantActive = parallelism
            }
        } else {
            wantActive = *job.Spec.Completions - succeeded
            if wantActive > parallelism {
                wantActive = parallelism
            }
        }
        diff := wantActive - active
        if diff < 0 {
            utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
            diff = 0
        }
        jm.expectations.ExpectCreations(jobKey, int(diff))
        errCh = make(chan error, diff)
        glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

        active += diff
        wait := sync.WaitGroup{}

        // 分批创建 diff 数量的 pods
        for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
            errorCount := len(errCh)
            wait.Add(int(batchSize))
            for i := int32(0); i < batchSize; i++ {
                go func() {
                    defer wait.Done()
                    err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
                    if err != nil && errors.IsTimeout(err) {
                        return
                    }
                    if err != nil {
                        defer utilruntime.HandleError(err)
                        glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
                        jm.expectations.CreationObserved(jobKey)
                        activeLock.Lock()
                        active--
                        activeLock.Unlock()
                        errCh <- err
                    }
                }()
            }
            wait.Wait()

            // 如果这次分批创建pods有失败的情况,则不在处理后续未执行的pods
            // 需要计算剩余未执行的pods数量,并更新 ControllerExpectations 的 adds 和 active 值
            skippedPods := diff - batchSize
            if errorCount < len(errCh) && skippedPods > 0 {
                glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
                active -= skippedPods
                for i := int32(0); i < skippedPods; i++ {
                    jm.expectations.CreationObserved(jobKey)
                }
                break
            }
            diff -= batchSize
        }
    }

    select {
    case err := <-errCh:
        // 只要前面有错误产生,则返回出错并会将该job 继续丢入queue,等待下次sync
        if err != nil {
            return active, err
        }
    default:
    }

    return active, nil
}

整个job controller实现流程到这里就结束了,后面会继续分析cronJob controller的源码实现!

luobingli

Read more posts by this author.

Subscribe to The Terminus Blog

Get the latest posts delivered right to your inbox.

or subscribe via RSS with Feedly!