Developing a Simple etcd Operator
In the previous sections we looked at how an etcd cluster is assembled and how to deploy one in a Kubernetes cluster. Developing a corresponding Operator really just means implementing that series of etcd operational tasks in code; put plainly, it means translating the startup script in the StatefulSet into our Go code. We will develop this incrementally across several versions. For the first version we build the simplest possible Operator: one that directly generates the StatefulSet template shown earlier.
Project Initialization
As before, we need to design our CRD resource object before developing the Operator. For example, say we want to create the corresponding etcd cluster from the following CR:
apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
name: demo
spec:
size: 3 # replica count
image: cnych/etcd:v3.4.13 # image
Since all other information is derived in the startup script, the size and image fields are basically enough to describe what an etcd cluster deployment looks like. So our first version is very simple: as long as we can write a correct deployment script, the Operator only needs to assemble a StatefulSet and a headless Service (SVC) object from the EtcdCluster CR defined above.
First, initialize the project. Here we use kubebuilder to generate our scaffolding:
➜ kubebuilder init --domain ydzs.io --owner cnych --repo github.com/cnych/etcd-operator
Writing scaffold for you to edit...
Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.5.0
Update go.mod:
$ go mod tidy
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next: define a resource with:
$ kubebuilder create api
With the project scaffolding in place, define the resource API:
➜ kubebuilder create api --group etcd --version v1alpha1 --kind EtcdCluster
Create Resource [y/n]
y
Create Controller [y/n]
y
Writing scaffold for you to edit...
api/v1alpha1/etcdcluster_types.go
controllers/etcdcluster_controller.go
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
With that, our project is initialized; the overall code structure looks like this:
➜ etcd-operator tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── api
│ └── v1alpha1
├── bin
│ └── manager
├── config
│ ├── certmanager
│ ├── crd
│ ├── default
│ ├── manager
│ ├── prometheus
│ ├── rbac
│ ├── samples
│ └── webhook
├── controllers
│ ├── etcdcluster_controller.go
│ └── suite_test.go
├── go.mod
├── go.sum
├── hack
│ └── boilerplate.go.txt
└── main.go
14 directories, 10 files
Next, edit the Operator's types according to the EtcdCluster object we designed above, modifying the EtcdClusterSpec struct in api/v1alpha1/etcdcluster_types.go:
// api/v1alpha1/etcdcluster_types.go

// EtcdClusterSpec defines the desired state of EtcdCluster
type EtcdClusterSpec struct {
  // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
  // Important: Run "make" to regenerate code after modifying this file

  // Size is a pointer so it can be assigned directly to the
  // StatefulSet's Replicas field, which is of type *int32
  Size  *int32 `json:"size"`
  Image string `json:"image"`
}
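As a small optional hardening step, kubebuilder validation markers can be attached to these fields so the API server rejects obviously bad values. A minimal sketch, assuming the standard controller-gen markers (the concrete bounds are our own choice, not part of the course code):

// api/v1alpha1/etcdcluster_types.go — sketch with optional validation markers
// (the bounds below are assumptions, not part of the course code)
type EtcdClusterSpec struct {
  // Size must be at least 1 for the API server to accept the object
  // +kubebuilder:validation:Minimum=1
  Size *int32 `json:"size"`

  // Image must be a non-empty string
  // +kubebuilder:validation:MinLength=1
  Image string `json:"image"`
}

After adding markers, run make manifests so the generated CRD in config/crd/bases picks up the new schema.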
Note that every time you modify this file you need to run the make command to regenerate the code:
➜ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next, we can implement our own business logic in the controller's Reconcile function.
Business Logic
First, create a resource.go file under the controllers directory, which generates the corresponding StatefulSet and headless Service objects from our EtcdCluster object.
// controllers/resource.go
package controllers

import (
  "strconv"

  "github.com/cnych/etcd-operator/api/v1alpha1"
  appsv1 "k8s.io/api/apps/v1"
  corev1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/api/resource"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
  EtcdClusterLabelKey       = "etcd.ydzs.io/cluster"
  EtcdClusterCommonLabelKey = "app"
  EtcdDataDirName           = "datadir"
)

// MutateStatefulSet fills in the desired state of the StatefulSet for the given EtcdCluster
func MutateStatefulSet(cluster *v1alpha1.EtcdCluster, sts *appsv1.StatefulSet) {
  sts.Labels = map[string]string{
    EtcdClusterCommonLabelKey: "etcd",
  }
  sts.Spec = appsv1.StatefulSetSpec{
    Replicas:    cluster.Spec.Size,
    ServiceName: cluster.Name,
    Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
      EtcdClusterLabelKey: cluster.Name,
    }},
    Template: corev1.PodTemplateSpec{
      ObjectMeta: metav1.ObjectMeta{
        Labels: map[string]string{
          EtcdClusterLabelKey:       cluster.Name,
          EtcdClusterCommonLabelKey: "etcd",
        },
      },
      Spec: corev1.PodSpec{
        Containers: newContainers(cluster),
      },
    },
    VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
      {
        ObjectMeta: metav1.ObjectMeta{
          Name: EtcdDataDirName,
        },
        Spec: corev1.PersistentVolumeClaimSpec{
          AccessModes: []corev1.PersistentVolumeAccessMode{
            corev1.ReadWriteOnce,
          },
          Resources: corev1.ResourceRequirements{
            Requests: corev1.ResourceList{
              corev1.ResourceStorage: resource.MustParse("1Gi"),
            },
          },
        },
      },
    },
  }
}
// newContainers builds the etcd container spec (ports, env, volume mounts and startup script)
func newContainers(cluster *v1alpha1.EtcdCluster) []corev1.Container {
  return []corev1.Container{
    {
      Name:  "etcd",
      Image: cluster.Spec.Image,
      Ports: []corev1.ContainerPort{
        {
          Name:          "peer",
          ContainerPort: 2380,
        },
        {
          Name:          "client",
          ContainerPort: 2379,
        },
      },
      Env: []corev1.EnvVar{
        {
          Name:  "INITIAL_CLUSTER_SIZE",
          Value: strconv.Itoa(int(*cluster.Spec.Size)),
        },
        {
          Name:  "SET_NAME",
          Value: cluster.Name,
        },
        {
          Name: "POD_IP",
          ValueFrom: &corev1.EnvVarSource{
            FieldRef: &corev1.ObjectFieldSelector{
              FieldPath: "status.podIP",
            },
          },
        },
        {
          Name: "MY_NAMESPACE",
          ValueFrom: &corev1.EnvVarSource{
            FieldRef: &corev1.ObjectFieldSelector{
              FieldPath: "metadata.namespace",
            },
          },
        },
      },
      VolumeMounts: []corev1.VolumeMount{
        {
          Name:      EtcdDataDirName,
          MountPath: "/var/run/etcd",
        },
      },
      Command: []string{
        "/bin/sh", "-ec",
"HOSTNAME=$(hostname)\n\n ETCDCTL_API=3\n\n eps() {\n EPS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n done\n echo ${EPS}\n }\n\n member_hash() {\n etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{ print $1}'\n }\n\n initial_peers() {\n PEERS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n PEERS=\"${PEERS}${PEERS:+,}${SET_NAME}-${i}=http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380\"\n done\n echo ${PEERS}\n }\n\n # etcd-SET_ID\n SET_ID=${HOSTNAME##*-}\n\n # adding a new member to existing cluster (assuming all initial pods are available)\n if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n # export ETCDCTL_ENDPOINTS=$(eps)\n # member already added?\n\n MEMBER_HASH=$(member_hash)\n if [ -n \"${MEMBER_HASH}\" ]; then\n # the member hash exists but for some reason etcd failed\n # as the datadir has not be created, we can remove the member\n # and retrieve new hash\n echo \"Remove member ${MEMBER_HASH}\"\n etcdctl --endpoints=$(eps) member remove ${MEMBER_HASH}\n fi\n\n echo \"Adding new member\"\n\n etcdctl member --endpoints=$(eps) add ${HOSTNAME} --peer-urls=http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 | grep \"^ETCD_\" > /var/run/etcd/new_member_envs\n\n if [ $? -ne 0 ]; then\n echo \"member add ${HOSTNAME} error.\"\n rm -f /var/run/etcd/new_member_envs\n exit 1\n fi\n\n echo \"==> Loading env vars of existing cluster...\"\n sed -ie \"s/^/export /\" /var/run/etcd/new_member_envs\n cat /var/run/etcd/new_member_envs\n . /var/run/etcd/new_member_envs\n\n exec etcd --listen-peer-urls http://${POD_IP}:2380 \\\n --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n --data-dir /var/run/etcd/default.etcd\n fi\n\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n while true; do\n echo \"Waiting for ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local to come up\"\n ping -W 1 -c 1 ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local > /dev/null && break\n sleep 1s\n done\n done\n\n echo \"join member ${HOSTNAME}\"\n # join member\n exec etcd --name ${HOSTNAME} \\\n --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 \\\n --listen-peer-urls http://${POD_IP}:2380 \\\n --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n --initial-cluster-token etcd-cluster-1 \\\n --data-dir /var/run/etcd/default.etcd \\\n --initial-cluster $(initial_peers) \\\n --initial-cluster-state new",
      },
      Lifecycle: &corev1.Lifecycle{
        PreStop: &corev1.Handler{
          Exec: &corev1.ExecAction{
            Command: []string{
              "/bin/sh", "-ec",
"HOSTNAME=$(hostname)\n\n member_hash() {\n etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{ print $1}'\n }\n\n eps() {\n EPS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n done\n echo ${EPS}\n }\n\n export ETCDCTL_ENDPOINTS=$(eps)\n SET_ID=${HOSTNAME##*-}\n\n # Removing member from cluster\n if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n echo \"Removing ${HOSTNAME} from etcd cluster\"\n etcdctl member remove $(member_hash)\n if [ $? -eq 0 ]; then\n # Remove everything otherwise the cluster will no longer scale-up\n rm -rf /var/run/etcd/*\n fi\n fi",
            },
          },
        },
      },
    },
  }
}
// MutateHeadlessSvc fills in the desired state of the headless Service for the given EtcdCluster
func MutateHeadlessSvc(cluster *v1alpha1.EtcdCluster, svc *corev1.Service) {
  svc.Labels = map[string]string{
    EtcdClusterCommonLabelKey: "etcd",
  }
  svc.Spec = corev1.ServiceSpec{
    ClusterIP: corev1.ClusterIPNone,
    Selector: map[string]string{
      EtcdClusterLabelKey: cluster.Name,
    },
    Ports: []corev1.ServicePort{
      {
        Name: "peer",
        Port: 2380,
      },
      {
        Name: "client",
        Port: 2379,
      },
    },
  }
}
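Because both mutate functions are pure functions of the EtcdCluster, they are easy to unit-test in isolation. A minimal sketch (this test file is our own illustration, not part of the course code):

// controllers/resource_test.go — illustrative sketch, not part of the course code
package controllers

import (
  "testing"

  "github.com/cnych/etcd-operator/api/v1alpha1"
  corev1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestMutateHeadlessSvc(t *testing.T) {
  cluster := &v1alpha1.EtcdCluster{
    ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
  }
  var svc corev1.Service
  MutateHeadlessSvc(cluster, &svc)

  // a headless Service must have ClusterIP None
  if svc.Spec.ClusterIP != corev1.ClusterIPNone {
    t.Fatalf("expected a headless service, got ClusterIP %q", svc.Spec.ClusterIP)
  }
  // the selector must target the cluster's Pods
  if svc.Spec.Selector[EtcdClusterLabelKey] != "demo" {
    t.Fatalf("selector does not target the cluster pods: %v", svc.Spec.Selector)
  }
}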
The code above is lengthy, but the logic is simple: construct the StatefulSet and headless Service resource objects from our EtcdCluster. Once these are in place, whenever an EtcdCluster is created we can handle it in the controller's Reconcile function. Here we can reuse the simple approach from the earlier examples; the code looks like this:
// controllers/etcdcluster_controller.go
func (r *EtcdClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
  ctx := context.Background()
  log := r.Log.WithValues("etcdcluster", req.NamespacedName)

  // First, fetch the EtcdCluster instance
  var etcdCluster etcdv1alpha1.EtcdCluster
  if err := r.Client.Get(ctx, req.NamespacedName, &etcdCluster); err != nil {
    // EtcdCluster was deleted, ignore it
    return ctrl.Result{}, client.IgnoreNotFound(err)
  }

  // Once we have the EtcdCluster, create or update the corresponding
  // StatefulSet and Service. CreateOrUpdate compares the observed state
  // with the desired state and converges them -- this is the reconciliation.

  // CreateOrUpdate the Service
  var svc corev1.Service
  svc.Name = etcdCluster.Name
  svc.Namespace = etcdCluster.Namespace
  or, err := ctrl.CreateOrUpdate(ctx, r, &svc, func() error {
    // all mutation of the object must happen inside this function
    MutateHeadlessSvc(&etcdCluster, &svc)
    return controllerutil.SetControllerReference(&etcdCluster, &svc, r.Scheme)
  })
  if err != nil {
    return ctrl.Result{}, err
  }
  log.Info("CreateOrUpdate", "Service", or)

  // CreateOrUpdate the StatefulSet
  var sts appsv1.StatefulSet
  sts.Name = etcdCluster.Name
  sts.Namespace = etcdCluster.Namespace
  or, err = ctrl.CreateOrUpdate(ctx, r, &sts, func() error {
    // all mutation of the object must happen inside this function
    MutateStatefulSet(&etcdCluster, &sts)
    return controllerutil.SetControllerReference(&etcdCluster, &sts, r.Scheme)
  })
  if err != nil {
    return ctrl.Result{}, err
  }
  log.Info("CreateOrUpdate", "StatefulSet", or)

  return ctrl.Result{}, nil
}
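A small note on the or value logged above: ctrl.CreateOrUpdate returns a controllerutil.OperationResult ("created", "updated", "unchanged"), so these log lines record exactly what each reconciliation pass actually did.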
Here we reconcile our EtcdCluster object by creating or updating the corresponding StatefulSet and headless Service objects. The logic is simple, and with it we have implemented the first version of our etcd-operator.
Debugging
Next, install our CRD object so the Kubernetes cluster recognizes our EtcdCluster object:
➜ make install
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/etcdclusters.etcd.ydzs.io configured
Then run the controller:
➜ make run
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
go run ./main.go
2020-11-20T17:44:48.222+0800 INFO controller-runtime.metrics metrics server is starting to listen {"addr": ":8080"}
2020-11-20T17:44:48.223+0800 INFO setup starting manager
2020-11-20T17:44:48.223+0800 INFO controller-runtime.manager starting metrics server {"path": "/metrics"}
2020-11-20T17:44:48.223+0800 INFO controller-runtime.controller Starting EventSource {"controller": "etcdcluster", "source": "kind source: /, Kind="}
2020-11-20T17:44:48.326+0800 INFO controller-runtime.controller Starting Controller {"controller": "etcdcluster"}
2020-11-20T17:44:48.326+0800 INFO controller-runtime.controller Starting workers {"controller": "etcdcluster", "worker count": 1}
Once the controller starts successfully we can create our etcd cluster. Change the sample CR manifest to the following YAML:
apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
name: etcd-sample
spec:
size: 3
image: cnych/etcd:v3.4.13
In another terminal, create the resource object above:
➜ kubectl apply -f config/samples/etcd_v1alpha1_etcdcluster.yaml
etcdcluster.etcd.ydzs.io/etcd-sample created
After creation, we can inspect the corresponding EtcdCluster object:
➜ kubectl get etcdcluster
NAME AGE
etcd-sample 2m35s
The corresponding StatefulSet and Service manifests are created automatically as well:
➜ kubectl get all -l app=etcd
NAME READY STATUS RESTARTS AGE
pod/etcd-sample-0 1/1 Running 0 85s
pod/etcd-sample-1 1/1 Running 0 71s
pod/etcd-sample-2 1/1 Running 0 66s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/etcd-sample ClusterIP None <none> 2380/TCP,2379/TCP 86s
NAME READY AGE
statefulset.apps/etcd-sample 3/3 87s
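If you want to double-check the cluster itself, you can also exec into one of the Pods and list the members, e.g. kubectl exec etcd-sample-0 -- etcdctl member list, which should show all three members.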
At this point our etcd cluster is up and running; with just a few lines of code we have implemented a working etcd-operator.
Of course, many details remain unhandled. For example, we have not yet added RBAC declarations for the StatefulSet and headless Service, nor Watches for changes to those two resource objects; we covered both earlier, so you can try to complete that part of the implementation yourself (a possible starting point is sketched below). Also, the way we implemented this etcd operator is somewhat of a shortcut: it relies on a startup script written in advance, which is not exactly a conventional approach. Still, now that we know how to bootstrap an etcd cluster, we can later implement those steps in Go code as well; this version is just a stepping stone.
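For reference, here is one possible starting point, assuming the default kubebuilder scaffolding (the RBAC marker lines and the Owns() calls below are our additions, not the course's final code):

// controllers/etcdcluster_controller.go — sketch of the missing pieces

// RBAC markers for the owned resources; run `make manifests` afterwards
// to regenerate the role in config/rbac:
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete

func (r *EtcdClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
  return ctrl.NewControllerManagedBy(mgr).
    For(&etcdv1alpha1.EtcdCluster{}).
    // Owns() re-triggers reconciliation of the EtcdCluster whenever a
    // StatefulSet or Service it owns (via SetControllerReference) changes
    Owns(&appsv1.StatefulSet{}).
    Owns(&corev1.Service{}).
    Complete(r)
}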