2022-03-16 k8s的operator-hub中的redis-operator的redis-cluster的组建redis-cluster集群及cluster高可用处理
目录
摘要:
时序图:
核心函数:
RedisClusterReconciler:Reconcile中组建cluster集群及高可用部分:
CheckRedisNodeCount
checkRedisCluster
ExecuteRedisClusterCommand
ExecuteRedisReplicationCommand
ExecuteFailoverOperation
executeFailoverCommand
摘要:
记录k8s的operator-hub中的redis-operator的redis-cluster的组建集群及高可用处理
时序图:

核心函数:
RedisClusterReconciler:Reconcile中组建cluster集群及高可用部分:
if leaderReplicas == 0 {reqLogger.Info("Redis leaders Cannot be 0", "Ready.Replicas", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Expected.Replicas", leaderReplicas)return ctrl.Result{RequeueAfter: time.Second * 120}, nil}if int32(redisLeaderInfo.Status.ReadyReplicas) != leaderReplicas && int32(redisFollowerInfo.Status.ReadyReplicas) != followerReplicas {reqLogger.Info("Redis leader and follower nodes are not ready yet", "Ready.Replicas", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Expected.Replicas", leaderReplicas)return ctrl.Result{RequeueAfter: time.Second * 120}, nil}reqLogger.Info("Creating redis cluster by executing cluster creation commands", "Leaders.Ready", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Followers.Ready", strconv.Itoa(int(redisFollowerInfo.Status.ReadyReplicas)))if k8sutils.CheckRedisNodeCount(instance, "") != totalReplicas {leaderCount := k8sutils.CheckRedisNodeCount(instance, "leader")if leaderCount != leaderReplicas {reqLogger.Info("Not all leader are part of the cluster...", "Leaders.Count", leaderCount, "Instance.Size", leaderReplicas)k8sutils.ExecuteRedisClusterCommand(instance)} else {if followerReplicas > 0 {reqLogger.Info("All leader are part of the cluster, adding follower/replicas", "Leaders.Count", leaderCount, "Instance.Size", leaderReplicas, "Follower.Replicas", followerReplicas)k8sutils.ExecuteRedisReplicationCommand(instance)} else {reqLogger.Info("no follower/replicas configured, skipping replication configuration", "Leaders.Count", leaderCount, "Leader.Size", leaderReplicas, "Follower.Replicas", followerReplicas)}}} else {reqLogger.Info("Redis leader count is desired")if k8sutils.CheckRedisClusterState(instance) >= int(totalReplicas)-1 {reqLogger.Info("Redis leader is not desired, executing failover operation")k8sutils.ExecuteFailoverOperation(instance)}return ctrl.Result{RequeueAfter: time.Second * 120}, nil}
CheckRedisNodeCount
// CheckRedisNodeCount will check the count of redis nodes
func CheckRedisNodeCount(cr *redisv1beta1.RedisCluster, nodeType string) int32 {var redisNodeType stringlogger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)clusterNodes := checkRedisCluster(cr)count := len(clusterNodes)switch nodeType {case "leader":redisNodeType = "master"case "follower":redisNodeType = "slave"default:redisNodeType = nodeType}if nodeType != "" {count = 0for _, node := range clusterNodes {if strings.Contains(node[2], redisNodeType) {count++}}logger.Info("Number of redis nodes are", "Nodes", strconv.Itoa(count), "Type", nodeType)} else {logger.Info("Total number of redis nodes are", "Nodes", strconv.Itoa(count))}return int32(count)
}
checkRedisCluster
// checkRedisCluster will check the redis cluster have sufficient nodes or not
func checkRedisCluster(cr *redisv1beta1.RedisCluster) [][]string {var client *redis.Clientlogger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)client = configureRedisClient(cr, cr.ObjectMeta.Name+"-leader-0")cmd := redis.NewStringCmd("cluster", "nodes")err := client.Process(cmd)if err != nil {logger.Error(err, "Redis command failed with this error")}output, err := cmd.Result()if err != nil {logger.Error(err, "Redis command failed with this error")}logger.Info("Redis cluster nodes are listed", "Output", output)csvOutput := csv.NewReader(strings.NewReader(output))csvOutput.Comma = ' 'csvOutput.FieldsPerRecord = -1csvOutputRecords, err := csvOutput.ReadAll()if err != nil {logger.Error(err, "Error parsing Node Counts", "output", output)}return csvOutputRecords
}
ExecuteRedisClusterCommand
// ExecuteRedisClusterCommand will execute redis cluster creation command
func ExecuteRedisClusterCommand(cr *redisv1beta1.RedisCluster) {logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)replicas := cr.Spec.GetReplicaCounts("leader")cmd := []string{"redis-cli", "--cluster", "create"}for podCount := 0; podCount <= int(replicas)-1; podCount++ {pod := RedisDetails{PodName: cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(podCount),Namespace: cr.Namespace,}cmd = append(cmd, getRedisServerIP(pod)+":6379")}cmd = append(cmd, "--cluster-yes")if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {pass, err := getRedisPassword(cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)if err != nil {logger.Error(err, "Error in getting redis password")}cmd = append(cmd, "-a")cmd = append(cmd, pass)}cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.ObjectMeta.Name+"-leader-0")...)logger.Info("Redis cluster creation command is", "Command", cmd)executeCommand(cr, cmd, cr.ObjectMeta.Name+"-leader-0")
}
ExecuteRedisReplicationCommand
// ExecuteRedisReplicationCommand will execute the replication command
func ExecuteRedisReplicationCommand(cr *redisv1beta1.RedisCluster) {logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)replicas := cr.Spec.GetReplicaCounts("follower")nodes := checkRedisCluster(cr)for podCount := 0; podCount <= int(replicas)-1; podCount++ {followerPod := RedisDetails{PodName: cr.ObjectMeta.Name + "-follower-" + strconv.Itoa(podCount),Namespace: cr.Namespace,}leaderPod := RedisDetails{PodName: cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(podCount),Namespace: cr.Namespace,}podIP := getRedisServerIP(followerPod)if !checkRedisNodePresence(cr, nodes, podIP) {logger.Info("Adding node to cluster.", "Node.IP", podIP, "Follower.Pod", followerPod)cmd := createRedisReplicationCommand(cr, leaderPod, followerPod)executeCommand(cr, cmd, cr.ObjectMeta.Name+"-leader-0")} else {logger.Info("Skipping Adding node to cluster, already present.", "Follower.Pod", followerPod)}}
}
ExecuteFailoverOperation
// ExecuteFailoverOperation will execute redis failover operations
func ExecuteFailoverOperation(cr *redisv1beta1.RedisCluster) {executeFailoverCommand(cr, "leader")executeFailoverCommand(cr, "follower")
}
executeFailoverCommand
// executeFailoverCommand will execute failover command
func executeFailoverCommand(cr *redisv1beta1.RedisCluster, role string) {logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)replicas := cr.Spec.GetReplicaCounts(role)podName := cr.ObjectMeta.Name + "-" + role + "-"for podCount := 0; podCount <= int(replicas)-1; podCount++ {logger.Info("Executing redis failover operations", "Redis Node", podName+strconv.Itoa(podCount))client := configureRedisClient(cr, podName+strconv.Itoa(podCount))cmd := redis.NewStringCmd("cluster", "reset")err := client.Process(cmd)if err != nil {logger.Error(err, "Redis command failed with this error")flushcommand := redis.NewStringCmd("flushall")err := client.Process(flushcommand)if err != nil {logger.Error(err, "Redis flush command failed with this error")}}output, err := cmd.Result()if err != nil {logger.Error(err, "Redis command failed with this error")}logger.Info("Redis cluster failover executed", "Output", output)}
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
