Kubernetes 源码解析 - HPA 水平自动伸缩如何工作

Kubernetes 源码解析 - HPA 水平自动伸缩如何工作

HPA - Horizontal Pod Autoscaler 的缩写,Pod 水平自动伸缩。通过对 Pod 负载的监控,来自动增加或者减少 Pod 的副本数量。

从字面意思来看,其主要包含了两部分:

  • 监控 Pod 的负载
  • 控制 Pod 的副本数量

那具体是如何实现的呢?以下基于1.17 源码,来分析下 HPA 如何工作。

注意:文章中的代码在源码的基础上进行了精简:删掉了注释、序列化等信息,或保留了部分核心代码,加上新的注释。

资源

HPA 的资源是HorizontalPodAutoscaler,在v1版本中,只支持基于 CPU 指标的计算;在v2beta2版本中加入了基于内存和自定义指标的计算。

v1

//staging/src/k8s.io/api/autoscaling/v1/types.go
type HorizontalPodAutoscaler struct {
	metav1.TypeMeta 
	metav1.ObjectMeta 
	Spec HorizontalPodAutoscalerSpec 
	Status HorizontalPodAutoscalerStatus 
}
type HorizontalPodAutoscalerSpec struct {
	ScaleTargetRef CrossVersionObjectReference //监控的目标资源
	MinReplicas *int32 //最小副本数
	MaxReplicas int32  //最大副本数
	TargetCPUUtilizationPercentage *int32  //触发调整的CPU 使用率
}

v2

//staging/src/k8s.io/api/autoscaling/v2beta2/types.go
type HorizontalPodAutoscaler struct {
	metav1.TypeMeta 
	metav1.ObjectMeta
	Spec HorizontalPodAutoscalerSpec
	Status HorizontalPodAutoscalerStatus 
}
type HorizontalPodAutoscalerSpec struct {
	ScaleTargetRef CrossVersionObjectReference //监控的目标资源
	MinReplicas *int32 
	MaxReplicas int32
	Metrics []MetricSpec //新加入的自定义指标
}
type MetricSpec struct {
	Type MetricSourceType //指标源的类型:Object(基于某个对象)、Pods(基于pod 数)、Resource(基于资源使用计算,比如v1 版本中cpu)、External(基于外部的指标)。对应 MetricsClient 接口的四个方法
	Object *ObjectMetricSource  //对应 Object 类型的指标源
	Pods *PodsMetricSource //对应 Pod 类型的指标源
	Resource *ResourceMetricSource  //对应 Resource 类型的指标源
	External *ExternalMetricSource  //对应 External 类型的指标源
}
type ObjectMetricSource struct { 
	DescribedObject CrossVersionObjectReference  //目标对象
	Target MetricTarget  //指定指标的目标值、平均值或者平均使用率
	Metric MetricIdentifier  //指标标识:名字、label选择器
}
type PodsMetricSource struct { 
	Metric MetricIdentifier 
	Target MetricTarget 
}
type ResourceMetricSource struct {
	Name v1.ResourceName 
	Target MetricTarget 
}
type ExternalMetricSource struct {
	Metric MetricIdentifier
	Target MetricTarget
}
type MetricTarget struct {
	Type MetricTargetType //类型:Utilization、Value、AverageValue
	Value *resource.Quantity
	AverageValue *resource.Quantity 
	AverageUtilization *int32
}

控制器 HorizontalController

HorizontalController被通过 key horizontalpodautoscaling 加入到 controller manager 中。用来控制HorizontalPodAutoscaler实例。

///cmd/kube-controller-manager/app/controllermanager.go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    ...
    controllers["horizontalpodautoscaling"] = startHPAController
    ...
}

获取负载指标

既然 Pod 副本数量的计算是基于 Pod 的负载情况,那边需要途径获取负载数据,这个途径就是MetricsClient

MetricsClient有两种实现:REST 方式和传统(Legacy)方式,分别是restMetricsClientHeapsterMetricsClient。一个是REST 实现以支持自定义的指标;一个是传统的 Heapster 指标(heapster 已经从 1.13 版本开始被废弃了)。

//cmd/kube-controller-manager/app/autoscaling.go
func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
		return nil, false, nil
	}

	if ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerUseRESTClients {
		// use the new-style clients if support for custom metrics is enabled
		return startHPAControllerWithRESTClient(ctx)
	}

	return startHPAControllerWithLegacyClient(ctx)
}

控制器逻辑HorizontalController#Run()

//pkg/controller/podautoscaler/horizontal.go
func (a *HorizontalController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer a.queue.ShutDown()

	klog.Infof("Starting HPA controller")
	defer klog.Infof("Shutting down HPA controller")

      // 等待 informer 完成HorizontalPodAutoscaler相关事件的同步
	if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
		return
	}

	// start a single worker (we may wish to start more in the future)
	//执行 worker 逻辑,直到收到退出指令
	go wait.Until(a.worker, time.Second, stopCh)

	<-stopCh
}

worker的核心是从工作队列中获取一个 key(格式为:namespace/name),然后对 key 进行 reconcile(这个词是Kubernetes 的核心,翻译为“调和”、“和解”。个人更喜欢“调整”,即将实例的状态调整为期望的状态。此处,对于 hpa 的实例的每个事件,都会按照特定的逻辑调整目标实例的 Pod 的副本数量。)。

//pkg/controller/podautoscaler/horizontal.go
func (a *HorizontalController) worker() {
	for a.processNextWorkItem() {
	}
	klog.Infof("horizontal pod autoscaler controller worker shutting down")
}

func (a *HorizontalController) processNextWorkItem() bool {
	key, quit := a.queue.Get()
	if quit {
		return false
	}
	defer a.queue.Done(key)

	deleted, err := a.reconcileKey(key.(string))
	if err != nil {
		utilruntime.HandleError(err)
	}
	
	if !deleted {
		a.queue.AddRateLimited(key)
	}

	return true
}

对 key 进行 reconcile 的调用栈:HorizontalController#reconcileKey -> HorizontalController#reconcileAutoscaler -> HorizontalController#computeReplicasForMetrics -> ScaleInterface#Update

简单来说就是先从Informer中拿到 key 对应的HorizontalPodAutoscaler资源实例;然后通过HorizontalPodAutoscaler实例中的信息,检查目标资源的Pod 负载以及当前的副本数,得到期望的 Pod 副本数;最终通过 Scale API 来调整 Pod 的副本数。最后会将调整的原因、计算的结果等信息写入HorizontalPodAutoscaler实例的 condition 中。

计算期望的副本数

对每个指标进行计算,都会得到建议的副本数,然后最大的那个就是最终的期望副本数。

//pkg/controller/podautoscaler/horizontal.go
func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
	metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
    ......
	for i, metricSpec := range metricSpecs {
		replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])

		if err != nil {
			if invalidMetricsCount <= 0 {
				invalidMetricCondition = condition
				invalidMetricError = err
			}
			invalidMetricsCount++
		}
		if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
			timestamp = timestampProposal
			replicas = replicaCountProposal
			metric = metricNameProposal
		}
	}
    ......
}

#computeStatusForObjectMetric(注意这个方法名少了个 “s”)使用MetricsClient得到指定指标的值。

这个流程的细节还可以继续深挖,但到此已够我们理解 HPA​ 的实现方式了。​

(转载本站文章请注明作者和出处乱世浮生,请勿用于任何商业用途)

comments powered by Disqus