使用golang开发并监控hadoop篇(3)监控hadoop的端口并整合到Prometheus体系
前言
我们公司的hadoop集群都是基于原生安装的,没有使用cdh这些第三方整合好的,还提供各种监控。为了保证各个组件的正常,需要自己来开发弥补监控上的缺失。hadoop生态相当庞大,组件也超级多,其中一两个组件挂掉了,不经常巡检可能发现不了,经常巡检也会消耗掉大量精力
设计思路
针对前面提到的痛点,我要动手解决掉下面的几个问题
- 定时检测端口是否正常
- 怎么模块化配置,支持各个地方不同的场景
- 当异常发生时候,怎么通知到我
明确了要解决的问题之后,开始着手开发。
定时检测端口是否正常
在上一篇文章中,已经实现了定时任务功能,所以可以直接参考那个做一个tcp检测的功能,设计一个TCPTask 结构体来结合Crond实现定时检测。
type TCPTask struct {// 检测端口Port int// 检测地址Addr string// 角色,这里后面篇幅讲Role string// 返回结果,默认是0Result int `default:"0"`// 监控结构体,后面篇幅讲Monitor *Monitor
}func (t *TCPTask) Run() {// 正常状态就是0,等下面产生异常时候会修改成1t.Result = 0// 这里开始监控tcp的状态,设置一个超时时间conn, err := net.DialTimeout("tcp", t.Addr+":"+strconv.Itoa(t.Port), time.Duration(t.Monitor.Config.TCP_TIMEOUT*int(time.Second)))if err != nil {log.Println("err = ", err)t.Result = 1} else {// 连接成功就就可以关掉原来的连接了conn.Close()}// 通过chan传递执行状态t.Monitor.Message <- t
}
怎么模块化配置,支持各个地方不同的场景?
hadoop的组件超级多,每个组件启动的端口也很多,我整理了一份表格,简称就是前面篇幅提到的Role(角色),全称就是hadoop组件的名称。
| 简称 | 全称 |
|---|---|
| NN | NameNode |
| DN | DataNode |
| JN | JournalNode |
| FC | HDFS Failover Controller |
| HG | Hive Gateway |
| HS2 | Hive Server 2 |
| SG | Spark2 Gateway |
| SHS | Spart 2 History Server |
| NM | Yarn NodeManager |
| RM | Yarn ResourceManager |
| ZKS | ZooKeeper Server |
| M | Hbase Master |
| RS | HBase RegionServer |
| FA | Flume Agent |
| HMS | Hive Metastore Server |
| JHS | JobHistory Server |
想了好久才确定下来一个十分灵活的配置文件结构
hadoop:tcp_timeout: 10config:NN:port:- 8020- 50070crond: "01 */2 * * * *"describe: hadoop NameNodename: hadoop_go_NameNodeDN:port:- 50020- 50010- 50075crond: "30 */2 * * * *"describe: hadoop DataNodename: hadoop_go_DataNode
对应的go struct,下面入口程序代码有用到。
type GlobalConfig struct {Hadoop HadoopGlobalConfigListen stringMaxTaskNumber int `yaml:"max_task_number" default:"1000"`Log string
}type HadoopGlobalConfig struct {TCP_TIMEOUT int `yaml:"tcp_timeout" default:"30"`Config map[string]HadoopRoleConfig
}
type HadoopRoleConfig struct {Port []intCrond stringDescribe stringName string
}
- hadoop.config.tcp_timeout 超时时间
- hadoop.config.ROLE_NAME
| 键 | 类型 | 描述 |
|---|---|---|
| port | list | 角色启动的端口列表 |
| crond | crond表达式 | 秒 分 时 日 月 周 |
| describe | string | 描述这角色是干嘛的 |
| name | string | 监控项名称 |
这样做的好处就是为了灵活,启动时候只加载需要的角色,在程序启动时候去初始化监控指标(metrics),附上最新的入口代码
package mainimport ("fmt""hadoop-go/hadoop""io/ioutil""log""net/http""os""github.com/prometheus/client_golang/prometheus""github.com/prometheus/client_golang/prometheus/promhttp""github.com/robfig/cron""github.com/urfave/cli""gopkg.in/yaml.v2"
)var (hadoopConfig map[string]hadoop.HadoopConfigconfig_path stringglobal_config_path stringGlobalConfig hadoop.GlobalConfigenable_task boolding hadoop.Dingmonitor hadoop.Monitor
)func load_config() {// 读取全局配置文件log.Print("读取全局配置文件:", global_config_path)f, err := os.OpenFile(global_config_path, os.O_RDONLY, 0444)if err != nil {log.Panic("读取全局配置文件失败 ", err)}data, _ := ioutil.ReadAll(f)err = yaml.Unmarshal(data, &GlobalConfig)if err != nil {log.Panic("格式化全局配置文件失败", err)}f.Close()// 设置日志logFile, err := os.OpenFile(GlobalConfig.Log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)if err != nil {panic(err)}log.SetOutput(logFile) // 将文件设置为log输出的文件// log.SetPrefix("[reboot]")log.SetFlags(log.LstdFlags | log.Lshortfile | log.LUTC)// 读取配置文件log.Print("读取配置文件:", config_path)f, err1 := os.OpenFile(config_path, os.O_RDONLY, 0444)if err1 != nil {log.Panic("读取配置文件失败 ", err)}data1, _ := ioutil.ReadAll(f)err = yaml.Unmarshal(data1, &hadoopConfig)if err != nil {log.Panic("格式化配置文件失败", err)}f.Close()// 上面都没问题了,就去初始化监控指标monitor.Metrcis = make(map[string]*prometheus.GaugeVec)// 初始化一个channel,用来接收tcp检测的结果,monitor.Message = make(chan *hadoop.TCPTask, GlobalConfig.MaxTaskNumber)monitor.Config = &GlobalConfig.Hadoopfor n, d := range GlobalConfig.Hadoop.Config {monitor.Metrcis[n] = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: d.Name,Help: d.Describe,},[]string{"addr", "port"},)prometheus.MustRegister(monitor.Metrcis[n])}
}func action(c *cli.Context) error {// 载入配置load_config()// fix: error caused when modifying the configuration pathif len(c.Args()) == 0 {println("请选择一个配置")i := 0for k := range hadoopConfig {i++println(i, k)}return nil}config, err := hadoopConfig[c.Args()[0]]if !err {log.Panic("没有", c.Args()[0], "的配置")}// 初始化dingding监听if config.Dingding != "" {log.Println("启用钉钉,token:", config.Dingding)ding = hadoop.Ding{Token: config.Dingding, Msg: make(chan string, 999)}go ding.Send()}// hadoop功能监控整合Prometheusmonitor.Load(config)// 启用taskcrond := cron.New()if enable_task {log.Println("启用定时任务")// 循环遍历任务加到定时任务里面for n, t := range config.Tasks {t.Ding = &dingswitch t.Module {case "hdfs":if t.Crond != "" {log.Println("添加定时任务:", n, "时间:", t.Crond)crond.AddJob(t.Crond, hadoop.Hdfs{Name: n, Task: t})}default:log.Println("没有对应的模块")}}crond.Start()}http.Handle("/metrics", promhttp.Handler())log.Fatal(http.ListenAndServe(GlobalConfig.Listen, nil))defer monitor.Crond.Stop()defer crond.Stop()return nil
}func main() {app := cli.NewApp()app.Name = "hadoop-go"app.Version = "1.0.0"app.Usage = "hadoop监控"app.Flags = []cli.Flag{cli.StringFlag{Name: "c",Usage: "配置文件路径。default: ./config/hadoop.yml",Value: "./config/hadoop.yml",Destination: &config_path,},cli.StringFlag{Name: "g",Usage: "全局配置文件路径。default: ./config/config.yml",Value: "./config/config.yml",Destination: &global_config_path,},cli.BoolFlag{Name: "t",Usage: "启动定时任务",Destination: &enable_task,},}app.Action = actionerr := app.Run(os.Args)if err != nil {fmt.Println(err)}
}
比较重要的就是Monitor,附上完整的代码
package hadoopimport ("log""net""strconv""time""github.com/prometheus/client_golang/prometheus""github.com/robfig/cron"
)type Monitor struct {Metrcis map[string]*prometheus.GaugeVecCrond *cron.CronConfig *HadoopGlobalConfigMessage chan *TCPTask
}type TCPTask struct {Port intAddr stringRole stringResult int `default:"0"`Monitor *Monitor
}func (t *TCPTask) Run() {// 正常状态就是0,等下面产生异常时候会修改成1t.Result = 0// 这里开始监控tcp的状态,设置一个超时时间conn, err := net.DialTimeout("tcp", t.Addr+":"+strconv.Itoa(t.Port), time.Duration(t.Monitor.Config.TCP_TIMEOUT*int(time.Second)))if err != nil {log.Println("err = ", err)t.Result = 1} else {// 连接成功就就可以关掉原来的连接了conn.Close()}// 通过chan传递执行状态t.Monitor.Message <- t
}func (m *Monitor) Load(config HadoopConfig) {// 启动一个采集定时器m.Crond = cron.New()// 启动一个协程去更新metric信息go m.Update()// 将需要的转换成定时任务for server, c := range config.Hadoop {log.Println("服务器:", server)for _, role := range c.Role {for _, port := range m.Config.Config[role].Port {// 在这里初始化一下。默认值都是0,代表正常的m.Metrcis[role].WithLabelValues(server, strconv.Itoa(port)).Set(0)// 添加到定时任务里面,根据配置文件设置的时间规则去定时执行检测m.Crond.AddJob(m.Config.Config[role].Crond, &TCPTask{Port: port, Addr: server, Role: role, Monitor: m})}}}m.Crond.Start()
}func (m *Monitor) Update() {// 用来更新metricfor msg := range m.Message {log.Printf("update metric: server=>%s role=>%s port=>%d result=>%d", msg.Addr, msg.Role, msg.Port, msg.Result)m.Metrcis[msg.Role].WithLabelValues(msg.Addr, strconv.Itoa(msg.Port)).Set(float64(msg.Result))}
}
上面的代码就实现了怎么模块化的注册Prometheus指标
for n, d := range GlobalConfig.Hadoop.Config {monitor.Metrcis[n] = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: d.Name,Help: d.Describe,},[]string{"addr", "port"},)prometheus.MustRegister(monitor.Metrcis[n])}
模块化的设置监控主机,主要就是xxx.hadoop里面的配置,以下面的配置文件作为参考
chengdu:dingding: xtasks:v_report: module: hdfstype: CompareSizeargs: /user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}} gt 40282905455crond: 00 09 11 * * *desc: 成都{{.Yesterday}} v_report状态hadoop:172.16.4.29:role:- DN- NN
172.16.4.29这台主机既是DataNode,又是namenode,我们需要监控其8020,50070,50020,50010,50075端口。代码部分
// 将需要的转换成定时任务for server, c := range config.Hadoop {log.Println("服务器:", server)for _, role := range c.Role {for _, port := range m.Config.Config[role].Port {// 在这里初始化一下。默认值都是0,代表正常的m.Metrcis[role].WithLabelValues(server, strconv.Itoa(port)).Set(0)// 添加到定时任务里面,根据配置文件设置的时间规则去定时执行检测m.Crond.AddJob(m.Config.Config[role].Crond, &TCPTask{Port: port, Addr: server, Role: role, Monitor: m})}}}
日志打印:
2021/06/21 04:40:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>50070 result=>0
2021/06/21 04:40:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>8020 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50075 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50020 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50010 result=>0
2021/06/21 04:42:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>8020 result=>0
2021/06/21 04:42:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>50070 result=>0
Promethues Web:
# HELP hadoop_go_DataNode hadoop DataNode
# TYPE hadoop_go_DataNode gauge
hadoop_go_DataNode{addr="172.16.4.29",port="50010"} 0
hadoop_go_DataNode{addr="172.16.4.29",port="50020"} 0
hadoop_go_DataNode{addr="172.16.4.29",port="50075"} 0
# HELP hadoop_go_NameNode hadoop NameNode
# TYPE hadoop_go_NameNode gauge
hadoop_go_NameNode{addr="172.16.4.29",port="50070"} 0
hadoop_go_NameNode{addr="172.16.4.29",port="8020"} 0
当异常发生时候,怎么通知到我
我们通过配置prometheus alert规则就能实现。
- alert: "hadoop_go_datanode"# 表达式,不等于0时候expr: hadoop_go_DataNode!=0# 持续1分钟for: 1m labels:# 这里是我们自定义的,receiver是一个接受者,我们自己开发的告警平台receiver: yunweiseverity: errorannotations:summary: "成都datanode异常"description: "{{$labels.addr}}的{{$labels.port}}状态异常"value: "{{ $value }}"
本系列到此就告一段落了,大家有什么想法可以留言交流。每做一个项目我都会有很大的收获。后面如果要更新的话,可能是讲一下监控hadoop的其他信息。
往期回顾
1.使用golang开发并监控hadoop篇(1)hdfs文件夹大小监控
2.使用golang开发并监控hadoop篇(2)hdfs使用情况和定时任务功能
3.使用golang开发并监控hadoop篇(3)监控hadoop的端口并整合到Prometheus
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
