使用golang开发并监控hadoop篇(3)监控hadoop的端口并整合到Prometheus体系

前言

我们公司的hadoop集群都是基于原生安装的,没有使用cdh这些第三方整合好的,还提供各种监控。为了保证各个组件的正常,需要自己来开发弥补监控上的缺失。hadoop生态相当庞大,组件也超级多,其中一两个组件挂掉了,不经常巡检可能发现不了,经常巡检也会消耗掉大量精力

设计思路

针对前面提到的痛点,我要动手解决掉下面的几个问题

  • 定时检测端口是否正常
  • 怎么模块化配置,支持各个地方不同的场景
  • 当异常发生时候,怎么通知到我

明确了要解决的问题之后,开始着手开发。

定时检测端口是否正常

在上一篇文章中,已经实现了定时任务功能,所以可以直接参考那个做一个tcp检测的功能,设计一个TCPTask 结构体来结合Crond实现定时检测。

type TCPTask struct {// 检测端口Port    int// 检测地址Addr    string// 角色,这里后面篇幅讲Role    string// 返回结果,默认是0Result  int `default:"0"`// 监控结构体,后面篇幅讲Monitor *Monitor
}func (t *TCPTask) Run() {// 正常状态就是0,等下面产生异常时候会修改成1t.Result = 0// 这里开始监控tcp的状态,设置一个超时时间conn, err := net.DialTimeout("tcp", t.Addr+":"+strconv.Itoa(t.Port), time.Duration(t.Monitor.Config.TCP_TIMEOUT*int(time.Second)))if err != nil {log.Println("err = ", err)t.Result = 1} else {// 连接成功就就可以关掉原来的连接了conn.Close()}// 通过chan传递执行状态t.Monitor.Message <- t
}

怎么模块化配置,支持各个地方不同的场景?

hadoop的组件超级多,每个组件启动的端口也很多,我整理了一份表格,简称就是前面篇幅提到的Role(角色),全称就是hadoop组件的名称。

简称全称
NNNameNode
DNDataNode
JNJournalNode
FCHDFS Failover Controller
HGHive Gateway
HS2Hive Server 2
SGSpark2 Gateway
SHSSpart 2 History Server
NMYarn NodeManager
RMYarn ResourceManager
ZKSZooKeeper Server
MHbase Master
RSHBase RegionServer
FAFlume Agent
HMSHive Metastore Server
JHSJobHistory Server

想了好久才确定下来一个十分灵活的配置文件结构

hadoop:tcp_timeout: 10config:NN:port:- 8020- 50070crond: "01 */2 * * * *"describe: hadoop NameNodename: hadoop_go_NameNodeDN:port:- 50020- 50010- 50075crond: "30 */2 * * * *"describe: hadoop DataNodename: hadoop_go_DataNode

对应的go struct,下面入口程序代码有用到。

type GlobalConfig struct {Hadoop        HadoopGlobalConfigListen        stringMaxTaskNumber int `yaml:"max_task_number" default:"1000"`Log           string
}type HadoopGlobalConfig struct {TCP_TIMEOUT int `yaml:"tcp_timeout" default:"30"`Config      map[string]HadoopRoleConfig
}
type HadoopRoleConfig struct {Port     []intCrond    stringDescribe stringName     string
}
  • hadoop.config.tcp_timeout 超时时间
  • hadoop.config.ROLE_NAME
类型描述
portlist角色启动的端口列表
crondcrond表达式秒 分 时 日 月 周
describestring描述这角色是干嘛的
namestring监控项名称

这样做的好处就是为了灵活,启动时候只加载需要的角色,在程序启动时候去初始化监控指标(metrics),附上最新的入口代码

package mainimport ("fmt""hadoop-go/hadoop""io/ioutil""log""net/http""os""github.com/prometheus/client_golang/prometheus""github.com/prometheus/client_golang/prometheus/promhttp""github.com/robfig/cron""github.com/urfave/cli""gopkg.in/yaml.v2"
)var (hadoopConfig       map[string]hadoop.HadoopConfigconfig_path        stringglobal_config_path stringGlobalConfig       hadoop.GlobalConfigenable_task        boolding               hadoop.Dingmonitor            hadoop.Monitor
)func load_config() {// 读取全局配置文件log.Print("读取全局配置文件:", global_config_path)f, err := os.OpenFile(global_config_path, os.O_RDONLY, 0444)if err != nil {log.Panic("读取全局配置文件失败 ", err)}data, _ := ioutil.ReadAll(f)err = yaml.Unmarshal(data, &GlobalConfig)if err != nil {log.Panic("格式化全局配置文件失败", err)}f.Close()// 设置日志logFile, err := os.OpenFile(GlobalConfig.Log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)if err != nil {panic(err)}log.SetOutput(logFile) // 将文件设置为log输出的文件// log.SetPrefix("[reboot]")log.SetFlags(log.LstdFlags | log.Lshortfile | log.LUTC)// 读取配置文件log.Print("读取配置文件:", config_path)f, err1 := os.OpenFile(config_path, os.O_RDONLY, 0444)if err1 != nil {log.Panic("读取配置文件失败 ", err)}data1, _ := ioutil.ReadAll(f)err = yaml.Unmarshal(data1, &hadoopConfig)if err != nil {log.Panic("格式化配置文件失败", err)}f.Close()// 上面都没问题了,就去初始化监控指标monitor.Metrcis = make(map[string]*prometheus.GaugeVec)// 初始化一个channel,用来接收tcp检测的结果,monitor.Message = make(chan *hadoop.TCPTask, GlobalConfig.MaxTaskNumber)monitor.Config = &GlobalConfig.Hadoopfor n, d := range GlobalConfig.Hadoop.Config {monitor.Metrcis[n] = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: d.Name,Help: d.Describe,},[]string{"addr", "port"},)prometheus.MustRegister(monitor.Metrcis[n])}
}func action(c *cli.Context) error {// 载入配置load_config()// fix: error caused when modifying the configuration pathif len(c.Args()) == 0 {println("请选择一个配置")i := 0for k := range hadoopConfig {i++println(i, k)}return nil}config, err := hadoopConfig[c.Args()[0]]if !err {log.Panic("没有", c.Args()[0], "的配置")}// 初始化dingding监听if config.Dingding != "" {log.Println("启用钉钉,token:", config.Dingding)ding = hadoop.Ding{Token: config.Dingding, Msg: make(chan string, 999)}go ding.Send()}// hadoop功能监控整合Prometheusmonitor.Load(config)// 启用taskcrond := cron.New()if enable_task {log.Println("启用定时任务")// 循环遍历任务加到定时任务里面for n, t := range config.Tasks {t.Ding = &dingswitch t.Module {case "hdfs":if t.Crond != "" {log.Println("添加定时任务:", n, "时间:", t.Crond)crond.AddJob(t.Crond, hadoop.Hdfs{Name: n, Task: t})}default:log.Println("没有对应的模块")}}crond.Start()}http.Handle("/metrics", promhttp.Handler())log.Fatal(http.ListenAndServe(GlobalConfig.Listen, nil))defer monitor.Crond.Stop()defer crond.Stop()return nil
}func main() {app := cli.NewApp()app.Name = "hadoop-go"app.Version = "1.0.0"app.Usage = "hadoop监控"app.Flags = []cli.Flag{cli.StringFlag{Name:        "c",Usage:       "配置文件路径。default: ./config/hadoop.yml",Value:       "./config/hadoop.yml",Destination: &config_path,},cli.StringFlag{Name:        "g",Usage:       "全局配置文件路径。default: ./config/config.yml",Value:       "./config/config.yml",Destination: &global_config_path,},cli.BoolFlag{Name:        "t",Usage:       "启动定时任务",Destination: &enable_task,},}app.Action = actionerr := app.Run(os.Args)if err != nil {fmt.Println(err)}
}

比较重要的就是Monitor,附上完整的代码

package hadoopimport ("log""net""strconv""time""github.com/prometheus/client_golang/prometheus""github.com/robfig/cron"
)type Monitor struct {Metrcis map[string]*prometheus.GaugeVecCrond   *cron.CronConfig  *HadoopGlobalConfigMessage chan *TCPTask
}type TCPTask struct {Port    intAddr    stringRole    stringResult  int `default:"0"`Monitor *Monitor
}func (t *TCPTask) Run() {// 正常状态就是0,等下面产生异常时候会修改成1t.Result = 0// 这里开始监控tcp的状态,设置一个超时时间conn, err := net.DialTimeout("tcp", t.Addr+":"+strconv.Itoa(t.Port), time.Duration(t.Monitor.Config.TCP_TIMEOUT*int(time.Second)))if err != nil {log.Println("err = ", err)t.Result = 1} else {// 连接成功就就可以关掉原来的连接了conn.Close()}// 通过chan传递执行状态t.Monitor.Message <- t
}func (m *Monitor) Load(config HadoopConfig) {// 启动一个采集定时器m.Crond = cron.New()// 启动一个协程去更新metric信息go m.Update()// 将需要的转换成定时任务for server, c := range config.Hadoop {log.Println("服务器:", server)for _, role := range c.Role {for _, port := range m.Config.Config[role].Port {// 在这里初始化一下。默认值都是0,代表正常的m.Metrcis[role].WithLabelValues(server, strconv.Itoa(port)).Set(0)// 添加到定时任务里面,根据配置文件设置的时间规则去定时执行检测m.Crond.AddJob(m.Config.Config[role].Crond, &TCPTask{Port: port, Addr: server, Role: role, Monitor: m})}}}m.Crond.Start()
}func (m *Monitor) Update() {// 用来更新metricfor msg := range m.Message {log.Printf("update metric: server=>%s role=>%s port=>%d result=>%d", msg.Addr, msg.Role, msg.Port, msg.Result)m.Metrcis[msg.Role].WithLabelValues(msg.Addr, strconv.Itoa(msg.Port)).Set(float64(msg.Result))}
}

上面的代码就实现了怎么模块化的注册Prometheus指标

for n, d := range GlobalConfig.Hadoop.Config {monitor.Metrcis[n] = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: d.Name,Help: d.Describe,},[]string{"addr", "port"},)prometheus.MustRegister(monitor.Metrcis[n])}

模块化的设置监控主机,主要就是xxx.hadoop里面的配置,以下面的配置文件作为参考

chengdu:dingding: xtasks:v_report: module: hdfstype: CompareSizeargs: /user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}} gt 40282905455crond: 00 09 11 * * *desc: 成都{{.Yesterday}} v_report状态hadoop:172.16.4.29:role:- DN- NN 

172.16.4.29这台主机既是DataNode,又是namenode,我们需要监控其8020,50070,50020,50010,50075端口。代码部分

// 将需要的转换成定时任务for server, c := range config.Hadoop {log.Println("服务器:", server)for _, role := range c.Role {for _, port := range m.Config.Config[role].Port {// 在这里初始化一下。默认值都是0,代表正常的m.Metrcis[role].WithLabelValues(server, strconv.Itoa(port)).Set(0)// 添加到定时任务里面,根据配置文件设置的时间规则去定时执行检测m.Crond.AddJob(m.Config.Config[role].Crond, &TCPTask{Port: port, Addr: server, Role: role, Monitor: m})}}}

日志打印:

2021/06/21 04:40:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>50070 result=>0
2021/06/21 04:40:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>8020 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50075 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50020 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50010 result=>0
2021/06/21 04:42:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>8020 result=>0
2021/06/21 04:42:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>50070 result=>0

Promethues Web:

# HELP hadoop_go_DataNode hadoop DataNode
# TYPE hadoop_go_DataNode gauge
hadoop_go_DataNode{addr="172.16.4.29",port="50010"} 0
hadoop_go_DataNode{addr="172.16.4.29",port="50020"} 0
hadoop_go_DataNode{addr="172.16.4.29",port="50075"} 0
# HELP hadoop_go_NameNode hadoop NameNode
# TYPE hadoop_go_NameNode gauge
hadoop_go_NameNode{addr="172.16.4.29",port="50070"} 0
hadoop_go_NameNode{addr="172.16.4.29",port="8020"} 0

当异常发生时候,怎么通知到我

我们通过配置prometheus alert规则就能实现。

  - alert: "hadoop_go_datanode"# 表达式,不等于0时候expr: hadoop_go_DataNode!=0# 持续1分钟for: 1m  labels:# 这里是我们自定义的,receiver是一个接受者,我们自己开发的告警平台receiver: yunweiseverity: errorannotations:summary: "成都datanode异常"description: "{{$labels.addr}}的{{$labels.port}}状态异常"value: "{{ $value }}"

本系列到此就告一段落了,大家有什么想法可以留言交流。每做一个项目我都会有很大的收获。后面如果要更新的话,可能是讲一下监控hadoop的其他信息。

往期回顾

1.使用golang开发并监控hadoop篇(1)hdfs文件夹大小监控
2.使用golang开发并监控hadoop篇(2)hdfs使用情况和定时任务功能
3.使用golang开发并监控hadoop篇(3)监控hadoop的端口并整合到Prometheus


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部