使用Go+UDP实现一个Gossip协议工具包(Gossip集群谣言传播与反熵传播的具体实现)
使用Go+UDP实现一个Gossip协议工具包(Gossip集群谣言传播与反熵传播的具体实现)
详情看源码
工具包源码
-
Github
https://github.com/dpwgc/pekonode -
Gitee
https://gitee.com/dpwgc/pekonode
Gossip协议基本概念
什么是Gossip?
Gossip是一种具有随机性、传染性的网络协议。
Gossip的特点
使用简单,扩展性强,容错率高,去中心化,最终一致性。
Gossip是如何同步信息的?
- 谣言传播方式
当一个新节点A连接到Gossip集群内的某个节点B时,A节点会将自己的信息发送给B节点,然后B节点会在集群中随机选取几个未被传染的节点,向他们广播A节点的信息(首次传染),集群中的其他节点收到A节点的信息后,又会像B节点那样广播A节点的信息给其他未被传染的节点(二次传染)。直至多次传染后,集群所有节点都收到了A节点的信息,同步完成。
- 反熵传播方式
某一节点在集群中随机选取一个节点,与之交换数据,新数据将覆盖旧数据。
信息传递实现
使用UDP进行节点间的通讯
- UDP发送
// write 发送udp数据
func write(addr string, port int, data []byte) {socket, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(addr),Port: port,})//略......_, err = socket.Write(data) // 发送数据//略......err = socket.Close()//略......
}
- UDP监听
// listen udp服务端监听
func listen(addr string, port int, size int, mq chan []byte) {udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, port))//略......conn, err := net.ListenUDP("udp", udpAddr)//略......//循环监听for {//接收数组bs := make([]byte, size)//从UDP监听中接收数据n, _, err := conn.ReadFromUDP(bs)//略......//获取有效数据b := bs[:n]//将数据放入缓冲队列,异步处理数据mq <- b}
}
心跳更新机制实现
- 谣言传播方式
每个集群节点在运行时都会周期性的选取集群中的其中几个节点,向它们广播自己的信息(即心跳包),其他节点在收到心跳包后,也会广播心跳包给某些未接收该心跳包的节点,直至所有节点都收到心跳包,单次心跳广播工作完成。
当有个节点宕机后,其他节点迟迟收不到该宕机节点的心跳包,则从本地节点列表中删除该节点的信息。

- 定时广播心跳
//定时广播心跳
func task(nodeList *NodeList) {for {//将本地节点加入已传染的节点列表infected//注意这个infected集合是随着UDP数据包走的,不在节点上保存//每次广播都会根据infected来跳过已经传染的节点var infected = make(map[string]int)infected[nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port)] = 1//更新本地节点信息nodeList.Set(nodeList.localNode)//设置心跳数据包p := packet{Node: nodeList.localNode, //节点数据Infected: infected, //被感染的节点列表}//广播心跳数据包broadcast(nodeList, p)//略......//间隔时间time.Sleep(time.Duration(nodeList.Cycle) * time.Second)}
}
- 广播推送信息
//广播推送信息
func broadcast(nodeList *NodeList, p packet) {//取出所有未过期的节点nodes := nodeList.Get()var packets []packet//选取部分未被传染的节点i := 0for _, v := range nodes {//如果超过Amount最大推送数量if i >= nodeList.Amount {//结束广播break}//如果该节点已经被传染过了if p.Infected[v.Addr+":"+strconv.Itoa(v.Port)] == 1 {//跳过该节点continue}p.Infected[v.Addr+":"+strconv.Itoa(v.Port)] = 1 //标记该节点为已传染状态p.TargetAddr = v.Addr //设置发送目标地址p.TargetPort = v.Port //设置发送目标端口//将该节点添加进广播列表packets = append(packets, p)i++}//向这些未被传染的节点广播传染数据for _, v := range packets {bs, err := json.Marshal(p)if err != nil {println("[error]:", err)}//通过UDP发送数据包write(v.TargetAddr, v.TargetPort, bs)}
}
信息存储与同步纠错实现
Gossip集群中的每个节点都存储着一份本地节点列表,各个节点通过Gossip协议周期性地向其他节点传播自己的信息,实时更新各个节点的本地节点列表,以此来确保集群中各个节点的本地节点列表都保存了当前所有集群节点信息。
节点列表主要有两个需要同步的数据,分别是nodes(sync.Map)集群所有节点集合、metadata(atomic.Value)元数据信息。nodes使用谣言传播方式进行同步更新。metadata使用谣言传播方式进行更新发布,使用反熵传播方式进行校验纠错。
每次心跳广播,都会更新各个节点所存储的nodes节点集合。metadata元数据更新方式与心跳广播类似,由用户手动调用Publish()函数发布新的元数据信息,新的元数据也会像心跳更新那样通过谣言传播方式覆盖所有节点的旧元数据。

除了使用Publish()函数更新集群元数据以外,节点还会定期在集群中随机选取一个目标节点,向其发起数据交换请求(反熵传播),如果发现两节点的元数据信息不一致,则进行更新操作(纠错,用最新的元数据信息覆盖旧的元数据信息),以此来避免因为UDP数据包丢失或者新节点的加入而导致的集群元数据不一致问题。

- 相关结构体
// NodeList 节点列表
type NodeList struct {nodes sync.Map //节点集合(key为Node结构体,value为节点最近更新的秒级时间戳)Amount int //每次给多少个节点发送同步信息Cycle int64 //同步时间周期(每隔多少秒向其他节点发送一次列表同步信息)Buffer int //UDP接收缓冲区大小(决定UDP监听服务可以异步处理多少个请求)Size int //单个UDP心跳数据包的最大容量(单位:字节)Timeout int64 //单个节点的过期删除界限(多少秒后删除)localNode Node //本地节点信息ListenAddr string //本地UDP监听地址,用这个监听地址接收其他节点发来的心跳包(一般填0.0.0.0即可)status atomic.Value //本地节点列表更新状态(true:正常运行,false:停止发布心跳)IsPrint bool //是否打印列表同步信息到控制台metadata atomic.Value //元数据,集群中各个节点的元数据内容一致,相当于集群的公共数据(可存储一些公共配置信息),可以通过广播更新各个节点的元数据内容
}//元数据信息
type metadata struct {Data []byte //元数据内容Update int64 //元数据版本(更新时间戳)
}
- 发起交换数据请求
//发起两节点数据交换请求
func swapRequest(nodeList *NodeList) {//设置为数据交换数据包p := packet{//将本地节点信息存入数据包,接收方根据这个信息回复请求Node: nodeList.localNode,Infected: make(map[string]int),IsSwap: 1,Metadata: nodeList.metadata.Load().(metadata),}//取出所有未过期的节点nodes := nodeList.Get()//转成JSON格式bs, err := json.Marshal(p)if err != nil {println("[error]:", err)}//在节点列表中随机选取一个节点,发起数据交换请求for i := 0; i < len(nodes); i++ {//如果遍历到自己,则跳过if nodes[i].Addr == nodeList.localNode.Addr && nodes[i].Port == nodeList.localNode.Port {continue}//发送请求write(nodes[i].Addr, nodes[i].Port, bs)break}
}
- 回应交换数据请求
//接收数据交换请求并回应发送方,完成交换工作
func swapResponse(nodeList *NodeList, node Node) {//设置为数据交换数据包p := packet{Node: nodeList.localNode,Infected: make(map[string]int),IsSwap: 2,Metadata: nodeList.metadata.Load().(metadata),}//转成JSON格式bs, err := json.Marshal(p)if err != nil {println("[error]:", err)}//回应发起节点write(node.Addr, node.Port, bs)
}
- 更新元数据逻辑代码
//从监听队列中取出消息
bs := <-mq
var p packet//解析成packet实例
err := json.Unmarshal(bs, &p)//如果该数据包是两节点间的元数据交换数据包
if p.IsSwap != 0 {//如果数据包中的元数据版本要比本地存储的元数据版本新if p.Metadata.Update > nodeList.metadata.Load().(metadata).Update {//更新本地节点中存储的元数据信息nodeList.metadata.Store(p.Metadata)//跳过,不广播,不回应发起方continue}//如果数据包中的元数据版本要比本地存储的元数据版本旧,说明发起方的元数据版本较旧,需要更新if p.Metadata.Update < nodeList.metadata.Load().(metadata).Update {//如果是发起方发出的数据交换请求if p.IsSwap == 1 {//回应发起方,向发起方发送最新的元数据信息,完成交换流程swapResponse(nodeList, p.Node)}}//跳过,不广播continue
}//略......
数据包格式
数据包分为三种:
- 1、普通心跳数据包(IsUpdate=false且IsSwap=0),只负责传递心跳,不进行元数据更新和元数据交换。
- 2、元数据更新数据包(IsUpdate=true),负责更新所有节点的元数据信息,即用数据包内的Metadata元数据覆盖掉各个节点的metadata元数据。
- 3、元数据交换数据包(IsSwap=1或2),负责交换校验两个节点的元数据是否一致(反熵传播校验数据并纠错)。
// 数据包
type packet struct {//节点信息Node Node //心跳数据包中的节点信息TargetAddr string //发送目标的IP地址TargetPort int //发送目标的端口号Infected map[string]int //已被该数据包传染的节点列表,key为Addr:Port拼接的字符串,value为判定该节点是否已被传染的参数(1:是,0:否)//元数据信息Metadata metadata //新的元数据信息,如果该数据包是元数据更新数据包(isUpdate=true),则用newData覆盖掉原先的集群元数据metadataIsUpdate bool //该数据包是否为元数据更新数据包(true:是,false:否)IsSwap int //该数据包是否为元数据交换数据包(0:否,1:发起方将交换请求发送给接收方,2:接收方回应发送方,数据交换完成)
}

Infected是一个map集合,负责保存所有已被传染的节点,用于避免重复发送心跳。
注意这个infected集合是随着UDP数据包走的,不在节点上保存,每次广播都会查询Infected集合,并跳过其中的节点。

本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
