package controllers
import (
	"context"
	"fmt"
	"time"

	redisv1alpha1 "github.com/rainlib/redis-operator/api/v1alpha1"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"
)
// finalizerName is the finalizer Reconcile attaches to every RedisCluster so
// that its cleanup step runs before the object is removed.
const finalizerName = "rainlib.io/rediscluster-finalizer"
// RedisClusterReconciler reconciles RedisCluster objects by managing the
// headless Service and StatefulSet that back them.
type RedisClusterReconciler struct {
	// Client is embedded so the reconciler can call Get/List/Create/Update
	// and Status() directly.
	client.Client
	// Scheme is passed to controllerutil.SetControllerReference when owned
	// objects are built.
	Scheme *runtime.Scheme
}
// Reconcile drives one RedisCluster towards its desired state.
//
// A pass performs these steps in order:
//  1. Load the RedisCluster; a missing object ends the pass without error.
//  2. If the object is being deleted, run the cleanup step (currently only a
//     log line), drop the finalizer and stop.
//  3. Make sure the finalizer is present.
//  4. Reconcile the headless Service and the StatefulSet.
//  5. Requeue every 5s until MasterCount * (1 + ReplicasPerMaster) pods are
//     ready.
//  6. Initialize the cluster once, then keep the status current and requeue
//     every 30s.
//
// Status writes are best effort: updateStatus logs failures instead of
// returning them, so a failed status write never fails the pass.
func (r *RedisClusterReconciler) Reconcile(
	ctx context.Context,
	req ctrl.Request,
) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	var rc redisv1alpha1.RedisCluster
	if err := r.Get(ctx, req.NamespacedName, &rc); err != nil {
		if errors.IsNotFound(err) {
			// The object is gone; there is nothing left to reconcile.
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, err
	}

	// Deletion path: release the finalizer so the API server can remove the object.
	if !rc.DeletionTimestamp.IsZero() {
		if controllerutil.ContainsFinalizer(&rc, finalizerName) {
			logger.Info("执行清理逻辑")
			controllerutil.RemoveFinalizer(&rc, finalizerName)
			if err := r.Update(ctx, &rc); err != nil {
				return ctrl.Result{}, err
			}
		}
		return ctrl.Result{}, nil
	}

	if !controllerutil.ContainsFinalizer(&rc, finalizerName) {
		controllerutil.AddFinalizer(&rc, finalizerName)
		if err := r.Update(ctx, &rc); err != nil {
			return ctrl.Result{}, err
		}
	}

	// On the failure and waiting paths below the stored ClusterInitialized
	// flag is carried over unchanged. Resetting it to false would make the
	// next pass run initializeCluster again on a cluster that has already
	// been bootstrapped.
	if err := r.reconcileService(ctx, &rc); err != nil {
		logger.Error(err, "调和 Service 失败")
		r.updateStatus(ctx, &rc, redisv1alpha1.ClusterPhaseFailed, 0,
			rc.Status.ClusterInitialized, err.Error())
		return ctrl.Result{}, err
	}
	if err := r.reconcileStatefulSet(ctx, &rc); err != nil {
		logger.Error(err, "调和 StatefulSet 失败")
		r.updateStatus(ctx, &rc, redisv1alpha1.ClusterPhaseFailed, 0,
			rc.Status.ClusterInitialized, err.Error())
		return ctrl.Result{}, err
	}

	readyNodes, err := r.getReadyNodeCount(ctx, &rc)
	if err != nil {
		return ctrl.Result{}, err
	}

	totalNodes := rc.Spec.MasterCount * (1 + rc.Spec.ReplicasPerMaster)
	if readyNodes < totalNodes {
		logger.Info("等待所有节点就绪",
			"ready", readyNodes, "total", totalNodes)
		r.updateStatus(ctx, &rc, redisv1alpha1.ClusterPhaseCreating, readyNodes,
			rc.Status.ClusterInitialized,
			fmt.Sprintf("等待节点就绪 %d/%d", readyNodes, totalNodes))
		return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
	}

	if !rc.Status.ClusterInitialized {
		logger.Info("开始初始化 Redis Cluster")
		r.updateStatus(ctx, &rc, redisv1alpha1.ClusterPhaseInitializing, readyNodes, false,
			"正在初始化集群")
		if err := r.initializeCluster(ctx, &rc); err != nil {
			logger.Error(err, "初始化集群失败")
			r.updateStatus(ctx, &rc, redisv1alpha1.ClusterPhaseFailed, readyNodes, false, err.Error())
			// Retry on a fixed delay rather than returning the error.
			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
		}
		r.updateStatus(ctx, &rc, redisv1alpha1.ClusterPhaseRunning, readyNodes, true,
			"集群已就绪")
	} else if rc.Status.Phase != redisv1alpha1.ClusterPhaseRunning || rc.Status.ReadyNodes != readyNodes {
		// Already initialized and every node is ready again: bring the
		// reported phase and ready count back in line with what was observed.
		r.updateStatus(ctx, &rc, redisv1alpha1.ClusterPhaseRunning, readyNodes, true,
			"集群已就绪")
	}

	return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
// reconcileService makes sure the headless Service named after rc exists in
// rc's namespace with the expected selector, ports and controller reference.
//
// controllerutil.CreateOrUpdate fetches the current object, applies the
// mutation below and only writes when the result differs from what it read,
// so a steady-state pass issues no Update call.
//
// It returns any error from the get/create/update round trip or from setting
// the controller reference (including the case where the Service is already
// controlled by a different owner).
func (r *RedisClusterReconciler) reconcileService(
	ctx context.Context,
	rc *redisv1alpha1.RedisCluster,
) error {
	logger := log.FromContext(ctx)

	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      rc.Name,
			Namespace: rc.Namespace,
		},
	}

	op, err := controllerutil.CreateOrUpdate(ctx, r.Client, svc, func() error {
		// ClusterIP is only chosen at creation time; an existing Service
		// keeps whatever value it already has.
		if svc.CreationTimestamp.IsZero() {
			svc.Spec.ClusterIP = corev1.ClusterIPNone
		}
		svc.Spec.Selector = map[string]string{
			"app":           rc.Name,
			"managed-by":    "redis-operator",
			"redis-cluster": rc.Name,
		}
		// Protocol is spelled out so the desired ports compare equal to the
		// stored, server-defaulted ones and no spurious update is sent.
		svc.Spec.Ports = []corev1.ServicePort{
			{Name: "redis", Protocol: corev1.ProtocolTCP, Port: 6379, TargetPort: intstr.FromInt(6379)},
			{Name: "bus", Protocol: corev1.ProtocolTCP, Port: 16379, TargetPort: intstr.FromInt(16379)},
		}
		return controllerutil.SetControllerReference(rc, svc, r.Scheme)
	})
	if err != nil {
		return err
	}
	if op == controllerutil.OperationResultCreated {
		logger.Info("创建 Headless Service")
	}
	return nil
}
func (r *RedisClusterReconciler) reconcileStatefulSet(
ctx context.Context,
rc *redisv1alpha1.RedisCluster,
) error {
logger := log.FromContext(ctx)
totalReplicas := rc.Spec.MasterCount * (1 + rc.Spec.ReplicasPerMaster)
cpuReq := rc.Spec.Resources.CPURequest
memReq := rc.Spec.Resources.MemoryRequest
cpuLim := rc.Spec.Resources.CPULimit
memLim := rc.Spec.Resources.MemoryLimit
if cpuReq == "" {
cpuReq = "100m"
}
if memReq == "" {
memReq = "256Mi"
}
if cpuLim == "" {
cpuLim = "500m"
}
if memLim == "" {
memLim = "512Mi"
}
desired := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: rc.Name,
Namespace: rc.Namespace,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: rc.Name,
Replicas: &totalReplicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": rc.Name,
"managed-by": "redis-operator",
"redis-cluster": rc.Name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": rc.Name,
"managed-by": "redis-operator",
"redis-cluster": rc.Name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "redis",
Image: rc.Spec.Image,
Command: []string{
"/bin/sh", "-c",
fmt.Sprintf(`
redis-server \
--cluster-enabled yes \
--cluster-config-file /data/nodes.conf \
--cluster-node-timeout 5000 \
--appendonly yes \
--bind 0.0.0.0 \
--port 6379 \
--cluster-announce-ip $(hostname).%s.%s.svc.cluster.local \
--cluster-announce-port 6379 \
--cluster-announce-bus-port 16379
`, rc.Name, rc.Namespace),
},
Ports: []corev1.ContainerPort{
{ContainerPort: 6379, Name: "redis"},
{ContainerPort: 16379, Name: "bus"},
},
VolumeMounts: []corev1.VolumeMount{
{Name: "data", MountPath: "/data"},
},
LivenessProbe: &corev1.Probe{
Exec: &corev1.ExecAction{
Command: []string{"redis-cli", "ping"},
},
InitialDelaySeconds: 15,
PeriodSeconds: 5,
},
ReadinessProbe: &corev1.Probe{
Exec: &corev1.ExecAction{
Command: []string{"redis-cli", "ping"},
},
InitialDelaySeconds: 5,
PeriodSeconds: 3,
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(cpuReq),
corev1.ResourceMemory: resource.MustParse(memReq),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(cpuLim),
corev1.ResourceMemory: resource.MustParse(memLim),
},
},
},
},
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "data",
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
},
},
},
}
if err := controllerutil.SetControllerReference(rc, desired, r.Scheme); err != nil {
return err
}
var existing appsv1.StatefulSet
err := r.Get(ctx, types.NamespacedName{Name: rc.Name, Namespace: rc.Namespace}, &existing)
if errors.IsNotFound(err) {
logger.Info("创建 StatefulSet", "replicas", totalReplicas)
return r.Create(ctx, desired)
} else if err != nil {
return err
}
if *existing.Spec.Replicas != totalReplicas || existing.Spec.Template.Spec.Containers[0].Image != rc.Spec.Image {
logger.Info("更新 StatefulSet")
existing.Spec.Replicas = &totalReplicas
existing.Spec.Template.Spec.Containers[0].Image = rc.Spec.Image
return r.Update(ctx, &existing)
}
return nil
}
// getReadyNodeCount lists the pods in rc's namespace labelled
// app=<rc.Name> and redis-cluster=<rc.Name> and returns how many of them
// carry a PodReady condition with status True. A List failure is returned
// together with a zero count.
func (r *RedisClusterReconciler) getReadyNodeCount(
	ctx context.Context,
	rc *redisv1alpha1.RedisCluster,
) (int32, error) {
	selector := client.MatchingLabels{
		"app":           rc.Name,
		"redis-cluster": rc.Name,
	}

	pods := corev1.PodList{}
	err := r.List(ctx, &pods, client.InNamespace(rc.Namespace), selector)
	if err != nil {
		return 0, err
	}

	count := int32(0)
	for i := range pods.Items {
		conditions := pods.Items[i].Status.Conditions
		for j := range conditions {
			if conditions[j].Type != corev1.PodReady || conditions[j].Status != corev1.ConditionTrue {
				continue
			}
			// Count each pod at most once.
			count++
			break
		}
	}
	return count, nil
}
// initializeCluster is the hook Reconcile calls once all nodes are ready.
// As written it only logs the configured master and replica counts and
// always returns nil; it performs no Redis commands.
func (r *RedisClusterReconciler) initializeCluster(
	ctx context.Context,
	rc *redisv1alpha1.RedisCluster,
) error {
	log.FromContext(ctx).Info(
		"Redis Cluster 初始化完成",
		"masters", rc.Spec.MasterCount,
		"replicas", rc.Spec.ReplicasPerMaster,
	)
	return nil
}
// updateStatus overwrites the four status fields on rc (phase, ready node
// count, initialized flag, message) and writes them through the status
// subresource. It is best effort: a failed write is logged and not returned.
func (r *RedisClusterReconciler) updateStatus(
	ctx context.Context,
	rc *redisv1alpha1.RedisCluster,
	phase redisv1alpha1.ClusterPhase,
	readyNodes int32,
	initialized bool,
	message string,
) {
	status := &rc.Status
	status.Phase = phase
	status.ReadyNodes = readyNodes
	status.ClusterInitialized = initialized
	status.Message = message

	err := r.Status().Update(ctx, rc)
	if err == nil {
		return
	}
	log.FromContext(ctx).Error(err, "更新状态失败")
}
// SetupWithManager registers the reconciler with mgr as the controller for
// RedisCluster objects, also watching the StatefulSets and Services they own.
func (r *RedisClusterReconciler) SetupWithManager(
	mgr ctrl.Manager,
) error {
	b := ctrl.NewControllerManagedBy(mgr)
	b = b.For(&redisv1alpha1.RedisCluster{})
	b = b.Owns(&appsv1.StatefulSet{})
	b = b.Owns(&corev1.Service{})
	return b.Complete(r)
}