开发一个简单的 etcd operator

共 15526字,需浏览 32分钟

 ·

2020-11-22 15:25

前面我们了解了 etcd 的集群搭建模式,也了解了如何在 Kubernetes 集群中来部署 etcd 集群,要开发一个对应的 Operator 其实也就是让我们用代码去实现 etcd 的这一系列的运维工作而已,说白了就是把 StatefulSet 中的启动脚本翻译成我们的 golang 代码。这里我们分成不同的版本来渐进式开发,首先第一个版本我们开发一个最简单的 Operator,直接用我们的 Operator 去生成前面的 StatefulSet 模板即可。

项目初始化

同样在开发 Operator 之前我们需要先提前想好我们的 CRD 资源对象,比如我们想要通过下面的 CR 资源来创建对应的 etcd 集群:

apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
  name: demo
spec:
 size: 3  # 副本数量
 image: cnych/etcd:v3.4.13  # 镜像

因为其他信息都是通过脚本获取的,所以基本上我们通过 size 和 image 两个字段就可以确定一个 Etcd 集群部署的样子了,所以我们的第一个版本非常简单,只要能够写出正确的部署脚本即可,然后我们在 Operator 当中根据上面我们定义的 EtcdCluster 这个 CR 资源来组装一个 StatefulSet 和 Headless SVC 对象就可以了。

首先初始化项目,这里我们使用 kubebuilder 来构建我们的脚手架:

➜  kubebuilder init --domain ydzs.io --owner cnych --repo github.com/cnych/etcd-operator
Writing scaffold for you to edit...
Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.5.0
Update go.mod:
$ go mod tidy
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next: define a resource with:
$ kubebuilder create api

项目脚手架创建完成后,然后定义资源 API:

➜  kubebuilder create api --group etcd --version v1alpha1 --kind EtcdCluster
Create Resource [y/n]
y
Create Controller [y/n]
y
Writing scaffold for you to edit...
api/v1alpha1/etcdcluster_types.go
controllers/etcdcluster_controller.go
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go

这样我们的项目就初始化完成了,整体的代码结构如下所示:

➜  etcd-operator tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── api
│   └── v1alpha1
├── bin
│   └── manager
├── config
│   ├── certmanager
│   ├── crd
│   ├── default
│   ├── manager
│   ├── prometheus
│   ├── rbac
│   ├── samples
│   └── webhook
├── controllers
│   ├── etcdcluster_controller.go
│   └── suite_test.go
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go

14 directories, 10 files

然后根据我们上面设计的 EtcdCluster 这个对象来编辑 Operator 的结构体即可,修改文件 api/v1alpha1/etcdcluster_types.go 中的 EtcdClusterSpec 结构体:

// api/v1alpha1/etcdcluster_types.go

// EtcdClusterSpec defines the desired state of EtcdCluster
type EtcdClusterSpec struct {
 // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
 // Important: Run "make" to regenerate code after modifying this file

 Size  uint   `json:"size"`
 Image string `json:"image"`
}

要注意每次修改完成后需要执行 make 命令重新生成代码:

➜  make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go

接下来我们就可以去控制器的 Reconcile 函数中来实现我们自己的业务逻辑了。

业务逻辑

首先在目录 controllers 下面创建一个 resource.go 文件,用来根据我们定义的 EtcdCluster 对象生成对应的 StatefulSet 和 Headless SVC 对象。

// controllers/resource.go

package controllers

import (
 "strconv"

 "github.com/cnych/etcd-operator/api/v1alpha1"
 appsv1 "k8s.io/api/apps/v1"
 corev1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/api/resource"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
 EtcdClusterLabelKey = "etcd.ydzs.io/cluster"
 EtcdClusterCommonLabelKey = "app"
 EtcdDataDirName     = "datadir"
)

func MutateStatefulSet(cluster *v1alpha1.EtcdCluster, sts *appsv1.StatefulSet) {
 sts.Labels = map[string]string{
  EtcdClusterCommonLabelKey: "etcd",
 }
 sts.Spec = appsv1.StatefulSetSpec{
  Replicas:    cluster.Spec.Size,
  ServiceName: cluster.Name,
  Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
   EtcdClusterLabelKey: cluster.Name,
  }},
  Template: corev1.PodTemplateSpec{
   ObjectMeta: metav1.ObjectMeta{
    Labels: map[string]string{
     EtcdClusterLabelKey: cluster.Name,
     EtcdClusterCommonLabelKey: "etcd",
    },
   },
   Spec: corev1.PodSpec{
    Containers: newContainers(cluster),
   },
  },
  VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
   corev1.PersistentVolumeClaim{
    ObjectMeta: metav1.ObjectMeta{
     Name: EtcdDataDirName,
    },
    Spec: corev1.PersistentVolumeClaimSpec{
     AccessModes: []corev1.PersistentVolumeAccessMode{
      corev1.ReadWriteOnce,
     },
     Resources: corev1.ResourceRequirements{
      Requests: corev1.ResourceList{
       corev1.ResourceStorage: resource.MustParse("1Gi"),
      },
     },
    },
   },
  },
 }
}

func newContainers(cluster *v1alpha1.EtcdCluster) []corev1.Container {
 return []corev1.Container{
  corev1.Container{
   Name:  "etcd",
   Image: cluster.Spec.Image,
   Ports: []corev1.ContainerPort{
    corev1.ContainerPort{
     Name:          "peer",
     ContainerPort: 2380,
    },
    corev1.ContainerPort{
     Name:          "client",
     ContainerPort: 2379,
    },
   },
   Env: []corev1.EnvVar{
    corev1.EnvVar{
     Name:  "INITIAL_CLUSTER_SIZE",
     Value: strconv.Itoa(int(*cluster.Spec.Size)),
    },
    corev1.EnvVar{
     Name:  "SET_NAME",
     Value: cluster.Name,
    },
    corev1.EnvVar{
     Name: "POD_IP",
     ValueFrom: &corev1.EnvVarSource{
      FieldRef: &corev1.ObjectFieldSelector{
       FieldPath: "status.podIP",
      },
     },
    },
    corev1.EnvVar{
     Name: "MY_NAMESPACE",
     ValueFrom: &corev1.EnvVarSource{
      FieldRef: &corev1.ObjectFieldSelector{
       FieldPath: "metadata.namespace",
      },
     },
    },
   },
   VolumeMounts: []corev1.VolumeMount{
    corev1.VolumeMount{
     Name:      EtcdDataDirName,
     MountPath: "/var/run/etcd",
    },
   },
   Command: []string{
    "/bin/sh""-ec",
    "HOSTNAME=$(hostname)\n\n              ETCDCTL_API=3\n\n              eps() {\n                  EPS=\"\"\n                  for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n                      EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n                  done\n                  echo ${EPS}\n              }\n\n              member_hash() {\n                  etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{ print $1}'\n              }\n\n              initial_peers() {\n                  PEERS=\"\"\n                  for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n                    PEERS=\"${PEERS}${PEERS:+,}${SET_NAME}-${i}=http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380\"\n                  done\n                  echo ${PEERS}\n              }\n\n              # etcd-SET_ID\n              SET_ID=${HOSTNAME##*-}\n\n              # adding a new member to existing cluster (assuming all initial pods are available)\n              if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n                  # export ETCDCTL_ENDPOINTS=$(eps)\n                  # member already added?\n\n                  MEMBER_HASH=$(member_hash)\n                  if [ -n \"${MEMBER_HASH}\" ]; then\n                      # the member hash exists but for some reason etcd failed\n                      # as the datadir has not be created, we can remove the member\n                      # and retrieve new hash\n                      echo \"Remove member ${MEMBER_HASH}\"\n                      etcdctl --endpoints=$(eps) member remove ${MEMBER_HASH}\n                  fi\n\n                  echo \"Adding new member\"\n\n                  etcdctl member --endpoints=$(eps) add ${HOSTNAME} --peer-urls=http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 | grep \"^ETCD_\" > /var/run/etcd/new_member_envs\n\n                  if [ $? -ne 0 ]; then\n                      echo \"member add ${HOSTNAME} error.\"\n                      rm -f /var/run/etcd/new_member_envs\n                      exit 1\n                  fi\n\n                  echo \"==> Loading env vars of existing cluster...\"\n                  sed -ie \"s/^/export /\" /var/run/etcd/new_member_envs\n                  cat /var/run/etcd/new_member_envs\n                  . /var/run/etcd/new_member_envs\n\n                  exec etcd --listen-peer-urls http://${POD_IP}:2380 \\\n                      --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n                      --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n                      --data-dir /var/run/etcd/default.etcd\n              fi\n\n              for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n                  while true; do\n                      echo \"Waiting for ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local to come up\"\n                      ping -W 1 -c 1 ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local > /dev/null && break\n                      sleep 1s\n                  done\n              done\n\n              echo \"join member ${HOSTNAME}\"\n              # join member\n              exec etcd --name ${HOSTNAME} \\\n                  --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 \\\n                  --listen-peer-urls http://${POD_IP}:2380 \\\n                  --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n                  --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n                  --initial-cluster-token etcd-cluster-1 \\\n                  --data-dir /var/run/etcd/default.etcd \\\n                  --initial-cluster $(initial_peers) \\\n                  --initial-cluster-state new",
   },
   Lifecycle: &corev1.Lifecycle{
    PreStop: &corev1.Handler{
     Exec: &corev1.ExecAction{
      Command: []string{
       "/bin/sh""-ec",
       "HOSTNAME=$(hostname)\n\n                    member_hash() {\n                        etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{ print $1}'\n                    }\n\n                    eps() {\n                        EPS=\"\"\n                        for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n                            EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n                        done\n                        echo ${EPS}\n                    }\n\n                    export ETCDCTL_ENDPOINTS=$(eps)\n                    SET_ID=${HOSTNAME##*-}\n\n                    # Removing member from cluster\n                    if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n                        echo \"Removing ${HOSTNAME} from etcd cluster\"\n                        etcdctl member remove $(member_hash)\n                        if [ $? -eq 0 ]; then\n                            # Remove everything otherwise the cluster will no longer scale-up\n                            rm -rf /var/run/etcd/*\n                        fi\n                    fi",
      },
     },
    },
   },
  },
 }
}

func MutateHeadlessSvc(cluster *v1alpha1.EtcdCluster, svc *corev1.Service) {
 svc.Labels = map[string]string{
  EtcdClusterCommonLabelKey: "etcd",
 }
 svc.Spec = corev1.ServiceSpec{
  ClusterIP: corev1.ClusterIPNone,
  Selector: map[string]string{
   EtcdClusterLabelKey: cluster.Name,
  },
  Ports: []corev1.ServicePort{
   corev1.ServicePort{
    Name: "peer",
    Port: 2380,
   },
   corev1.ServicePort{
    Name: "client",
    Port: 2379,
   },
  },
 }
}

上面的代码虽然很多,但逻辑很简单,就是根据我们的 EtcdCluter 去构造 StatefulSet 和 Headless SVC 资源对象,构造完成后,当我们创建 EtcdCluster 的时候就可以在控制器的 Reconcile 函数中去进行逻辑处理了,这里我们也可以使用前面示例中的代码来简单处理即可,代码如下所示:

// controllers/etcdcluster_controller.go

func (r *EtcdClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
 ctx := context.Background()
 log := r.Log.WithValues("etcdcluster", req.NamespacedName)

 // 首先我们获取 EtcdCluster 实例
 var etcdCluster etcdv1alpha1.EtcdCluster
 if err := r.Client.Get(ctx, req.NamespacedName, &etcdCluster); err != nil {
  // EtcdCluster was deleted,Ignore
  return ctrl.Result{}, client.IgnoreNotFound(err)
 }

 // 得到 EtcdCluster 过后去创建对应的StatefulSet和Service
 // CreateOrUpdate

 // (就是观察的当前状态和期望的状态进行对比)

 // 调谐,获取到当前的一个状态,然后和我们期望的状态进行对比是不是就可以

 // CreateOrUpdate Service
 var svc corev1.Service
 svc.Name = etcdCluster.Name
 svc.Namespace = etcdCluster.Namespace
 or, err := ctrl.CreateOrUpdate(ctx, r, &svc, func() error {
  // 调谐必须在这个函数中去实现
  MutateHeadlessSvc(&etcdCluster, &svc)
  return controllerutil.SetControllerReference(&etcdCluster, &svc, r.Scheme)
 })
 if err != nil {
  return ctrl.Result{}, err
 }
 log.Info("CreateOrUpdate""Service", or)

 // CreateOrUpdate StatefulSet
 var sts appsv1.StatefulSet
 sts.Name = etcdCluster.Name
 sts.Namespace = etcdCluster.Namespace
 or, err = ctrl.CreateOrUpdate(ctx, r, &sts, func() error {
  // 调谐必须在这个函数中去实现
  MutateStatefulSet(&etcdCluster, &sts)
  return controllerutil.SetControllerReference(&etcdCluster, &sts, r.Scheme)
 })
 if err != nil {
  return ctrl.Result{}, err
 }
 log.Info("CreateOrUpdate""StatefulSet", or)

 return ctrl.Result{}, nil
}

这里我们就是去对我们的 EtcdCluster 对象进行调谐,然后去创建或者更新对应的 StatefulSet 或者 Headless SVC 对象,逻辑很简单,这样我们就实现我们的第一个版本的 etcd-operator。

调试

接下来我们首先安装我们的 CRD 对象,让我们的 Kubernetes 系统识别我们的 EtcdCluster 对象:

➜  make install
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/etcdclusters.etcd.ydzs.io configured

然后运行控制器:

➜  make run    
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
go run ./main.go
2020-11-20T17:44:48.222+0800    INFO    controller-runtime.metrics      metrics server is starting to listen    {"addr"":8080"}
2020-11-20T17:44:48.223+0800    INFO    setup   starting manager
2020-11-20T17:44:48.223+0800    INFO    controller-runtime.manager      starting metrics server {"path""/metrics"}
2020-11-20T17:44:48.223+0800    INFO    controller-runtime.controller   Starting EventSource    {"controller""etcdcluster""source""kind source: /, Kind="}
2020-11-20T17:44:48.326+0800    INFO    controller-runtime.controller   Starting Controller     {"controller""etcdcluster"}
2020-11-20T17:44:48.326+0800    INFO    controller-runtime.controller   Starting workers        {"controller""etcdcluster""worker count": 1}

控制器启动成功后我们就可以去创建我们的 Etcd 集群了,将示例 CR 资源清单修改成下面的 YAML:

apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
  name: etcd-sample
spec:
  size: 3
  image: cnych/etcd:v3.4.13

另外开启一个终端创建上面的资源对象:

➜  kubectl apply -f config/samples/etcd_v1alpha1_etcdcluster.yaml
etcdcluster.etcd.ydzs.io/etcd-sample created

创建完成后我们可以查看对应的 EtcdCluster 对象:

➜  kubectl get etcdcluster
NAME          AGE
etcd-sample   2m35s

对应也会自动创建我们的 StatefulSet 和 Service 资源清单:

➜  kubectl get all -l app=etcd
NAME                READY   STATUS    RESTARTS   AGE
pod/etcd-sample-0   1/1     Running   0          85s
pod/etcd-sample-1   1/1     Running   0          71s
pod/etcd-sample-2   1/1     Running   0          66s

NAME                  TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
service/etcd-sample   ClusterIP   None                 2380/TCP,2379/TCP   86s

NAME                           READY   AGE
statefulset.apps/etcd-sample   3/3     87s

到这里我们的 Etcd 集群就启动起来了,我们是不是只通过简单的几行代码就实现了一个 etcd-operator。

当然还有很多细节没有处理,比如还没有添加对 StatefulSet 和 Headless SVC 的 RBAC 权限声明以及这两个资源对象变更的 Watch,这个前面我们已经讲解过了,大家可以试着完善这块实现。不过这里我们实现 etcd operator 的方式比较讨巧,我们需要提前去编写启动脚本,这个当然不算一个常规的方式,但是我们知道了如果去启动 etcd 集群了,后续也就可以用 golang 代码去实现了,所以这只是一个一个过程的实现而已~



本文节选自《Kubernetes 开发课》课程文档,该课程正在持续更新中,对于 Kubernetes 二次开发感兴趣的朋友可以扫描下方二维码了解课程详情。

浏览 44
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报