使用golang开发并监控hadoop篇3监控hadoop的端口并整合到Prometheus体系

前言

我们公司的hadoop集群都是基于原生安装的,没有使用cdh这些第三方整合好的,还提供各种监控。为了保证各个组件的正常,需要自己来开发弥补监控上的缺失。hadoop生态相当庞大,组件也超级多,其中一两个组件挂掉了,不经常巡检可能发现不了,经常巡检也会消耗掉大量精力

设计思路

针对前面提到的痛点,我要动手解决掉下面的几个问题

  • 定时检测端口是否正常
  • 怎么模块化配置,支持各个地方不同的场景
  • 当异常发生时候,怎么通知到我

明确了要解决的问题之后,开始着手开发。

定时检测端口是否正常

在上一篇文章中,已经实现了定时任务功能,所以可以直接参考那个做一个tcp检测的功能,设计一个TCPTask 结构体来结合Crond实现定时检测。

type TCPTask struct {
	// 检测端口
	Port    int
	// 检测地址
	Addr    string
	// 角色,这里后面篇幅讲
	Role    string
	// 返回结果,默认是0
	Result  int `default:"0"`
	// 监控结构体,后面篇幅讲
	Monitor *Monitor
}

func (t *TCPTask) Run() {
	// 正常状态就是0,等下面产生异常时候会修改成1
	t.Result = 0

	// 这里开始监控tcp的状态,设置一个超时时间
	conn, err := net.DialTimeout("tcp", t.Addr+":"+strconv.Itoa(t.Port), time.Duration(t.Monitor.Config.TCP_TIMEOUT*int(time.Second)))
	if err != nil {
		log.Println("err = ", err)
		t.Result = 1

	} else {
		// 连接成功就就可以关掉原来的连接了
		conn.Close()
	}

	// 通过chan传递执行状态
	t.Monitor.Message <- t
}

怎么模块化配置,支持各个地方不同的场景?

hadoop的组件超级多,每个组件启动的端口也很多,我整理了一份表格,简称就是前面篇幅提到的Role(角色),全称就是hadoop组件的名称。

简称全称
NNNameNode
DNDataNode
JNJournalNode
FCHDFS Failover Controller
HGHive Gateway
HS2Hive Server 2
SGSpark2 Gateway
SHSSpart 2 History Server
NMYarn NodeManager
RMYarn ResourceManager
ZKSZooKeeper Server
MHbase Master
RSHBase RegionServer
FAFlume Agent
HMSHive Metastore Server
JHSJobHistory Server

想了好久才确定下来一个十分灵活的配置文件结构

hadoop:
  tcp_timeout: 10
  config:
    NN:
      port:
      - 8020
      - 50070
      crond: "01 */2 * * * *"
      describe: hadoop NameNode
      name: hadoop_go_NameNode
    DN:
      port:
      - 50020
      - 50010
      - 50075
      crond: "30 */2 * * * *"
      describe: hadoop DataNode
      name: hadoop_go_DataNode

对应的go struct,下面入口程序代码有用到。

type GlobalConfig struct {
	Hadoop        HadoopGlobalConfig
	Listen        string
	MaxTaskNumber int `yaml:"max_task_number" default:"1000"`
	Log           string
}

type HadoopGlobalConfig struct {
	TCP_TIMEOUT int `yaml:"tcp_timeout" default:"30"`
	Config      map[string]HadoopRoleConfig
}
type HadoopRoleConfig struct {
	Port     []int
	Crond    string
	Describe string
	Name     string
}
  • hadoop.config.tcp_timeout 超时时间
  • hadoop.config.ROLE_NAME
类型描述
portlist角色启动的端口列表
crondcrond表达式秒 分 时 日 月 周
describestring描述这角色是干嘛的
namestring监控项名称

这样做的好处就是为了灵活,启动时候只加载需要的角色,在程序启动时候去初始化监控指标(metrics),附上最新的入口代码

package main

import (
	"fmt"
	"hadoop-go/hadoop"
	"io/ioutil"
	"log"
	"net/http"
	"os"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/robfig/cron"
	"github.com/urfave/cli"
	"gopkg.in/yaml.v2"
)

var (
	hadoopConfig       map[string]hadoop.HadoopConfig
	config_path        string
	global_config_path string
	GlobalConfig       hadoop.GlobalConfig
	enable_task        bool
	ding               hadoop.Ding
	monitor            hadoop.Monitor
)

func load_config() {
	// 读取全局配置文件
	log.Print("读取全局配置文件:", global_config_path)
	f, err := os.OpenFile(global_config_path, os.O_RDONLY, 0444)
	if err != nil {
		log.Panic("读取全局配置文件失败 ", err)
	}
	data, _ := ioutil.ReadAll(f)
	err = yaml.Unmarshal(data, &GlobalConfig)
	if err != nil {
		log.Panic("格式化全局配置文件失败", err)
	}
	f.Close()
	// 设置日志
	logFile, err := os.OpenFile(GlobalConfig.Log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)
	if err != nil {
		panic(err)
	}
	log.SetOutput(logFile) // 将文件设置为log输出的文件
	// log.SetPrefix("[reboot]")
	log.SetFlags(log.LstdFlags | log.Lshortfile | log.LUTC)

	// 读取配置文件
	log.Print("读取配置文件:", config_path)
	f, err1 := os.OpenFile(config_path, os.O_RDONLY, 0444)
	if err1 != nil {
		log.Panic("读取配置文件失败 ", err)
	}
	data1, _ := ioutil.ReadAll(f)
	err = yaml.Unmarshal(data1, &hadoopConfig)
	if err != nil {
		log.Panic("格式化配置文件失败", err)
	}
	f.Close()

	// 上面都没问题了,就去初始化监控指标
	monitor.Metrcis = make(map[string]*prometheus.GaugeVec)
	// 初始化一个channel,用来接收tcp检测的结果,
	monitor.Message = make(chan *hadoop.TCPTask, GlobalConfig.MaxTaskNumber)
	monitor.Config = &GlobalConfig.Hadoop

	for n, d := range GlobalConfig.Hadoop.Config {
		monitor.Metrcis[n] = prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: d.Name,
				Help: d.Describe,
			},
			[]string{"addr", "port"},
		)
		prometheus.MustRegister(monitor.Metrcis[n])
	}
}

func action(c *cli.Context) error {
	// 载入配置
	load_config()

	// fix: error caused when modifying the configuration path
	if len(c.Args()) == 0 {
		println("请选择一个配置")
		i := 0
		for k := range hadoopConfig {
			i++
			println(i, k)
		}
		return nil
	}

	config, err := hadoopConfig[c.Args()[0]]
	if !err {
		log.Panic("没有", c.Args()[0], "的配置")
	}

	// 初始化dingding监听
	if config.Dingding != "" {
		log.Println("启用钉钉,token:", config.Dingding)
		ding = hadoop.Ding{Token: config.Dingding, Msg: make(chan string, 999)}
		go ding.Send()
	}

	// hadoop功能监控整合Prometheus
	monitor.Load(config)

	// 启用task
	crond := cron.New()
	if enable_task {
		log.Println("启用定时任务")
		// 循环遍历任务加到定时任务里面

		for n, t := range config.Tasks {
			t.Ding = &ding
			switch t.Module {
			case "hdfs":
				if t.Crond != "" {
					log.Println("添加定时任务:", n, "时间:", t.Crond)
					crond.AddJob(t.Crond, hadoop.Hdfs{Name: n, Task: t})
				}

			default:
				log.Println("没有对应的模块")
			}
		}
		crond.Start()
	}

	http.Handle("/metrics", promhttp.Handler())

	log.Fatal(http.ListenAndServe(GlobalConfig.Listen, nil))
	defer monitor.Crond.Stop()
	defer crond.Stop()
	return nil
}

func main() {
	app := cli.NewApp()
	app.Name = "hadoop-go"
	app.Version = "1.0.0"
	app.Usage = "hadoop监控"
	app.Flags = []cli.Flag{
		cli.StringFlag{
			Name:        "c",
			Usage:       "配置文件路径。default: ./config/hadoop.yml",
			Value:       "./config/hadoop.yml",
			Destination: &config_path,
		},
		cli.StringFlag{
			Name:        "g",
			Usage:       "全局配置文件路径。default: ./config/config.yml",
			Value:       "./config/config.yml",
			Destination: &global_config_path,
		},
		cli.BoolFlag{
			Name:        "t",
			Usage:       "启动定时任务",
			Destination: &enable_task,
		},
	}
	app.Action = action
	err := app.Run(os.Args)
	if err != nil {
		fmt.Println(err)
	}
}

比较重要的就是Monitor,附上完整的代码

package hadoop

import (
	"log"
	"net"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/robfig/cron"
)

type Monitor struct {
	Metrcis map[string]*prometheus.GaugeVec
	Crond   *cron.Cron
	Config  *HadoopGlobalConfig
	Message chan *TCPTask
}

type TCPTask struct {
	Port    int
	Addr    string
	Role    string
	Result  int `default:"0"`
	Monitor *Monitor
}

func (t *TCPTask) Run() {
	// 正常状态就是0,等下面产生异常时候会修改成1
	t.Result = 0

	// 这里开始监控tcp的状态,设置一个超时时间
	conn, err := net.DialTimeout("tcp", t.Addr+":"+strconv.Itoa(t.Port), time.Duration(t.Monitor.Config.TCP_TIMEOUT*int(time.Second)))
	if err != nil {
		log.Println("err = ", err)
		t.Result = 1

	} else {
		// 连接成功就就可以关掉原来的连接了
		conn.Close()
	}

	// 通过chan传递执行状态
	t.Monitor.Message <- t
}

func (m *Monitor) Load(config HadoopConfig) {
	// 启动一个采集定时器
	m.Crond = cron.New()
	// 启动一个协程去更新metric信息
	go m.Update()

	// 将需要的转换成定时任务
	for server, c := range config.Hadoop {
		log.Println("服务器:", server)
		for _, role := range c.Role {
			for _, port := range m.Config.Config[role].Port {
				// 在这里初始化一下。默认值都是0,代表正常的
				m.Metrcis[role].WithLabelValues(server, strconv.Itoa(port)).Set(0)
				// 添加到定时任务里面,根据配置文件设置的时间规则去定时执行检测
				m.Crond.AddJob(m.Config.Config[role].Crond, &TCPTask{Port: port, Addr: server, Role: role, Monitor: m})

			}

		}

	}

	m.Crond.Start()
}

func (m *Monitor) Update() {
	// 用来更新metric
	for msg := range m.Message {
		log.Printf("update metric: server=>%s role=>%s port=>%d result=>%d", msg.Addr, msg.Role, msg.Port, msg.Result)
		m.Metrcis[msg.Role].WithLabelValues(msg.Addr, strconv.Itoa(msg.Port)).Set(float64(msg.Result))
	}
}

上面的代码就实现了怎么模块化的注册Prometheus指标

for n, d := range GlobalConfig.Hadoop.Config {
		monitor.Metrcis[n] = prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: d.Name,
				Help: d.Describe,
			},
			[]string{"addr", "port"},
		)
		prometheus.MustRegister(monitor.Metrcis[n])
	}

模块化的设置监控主机,主要就是xxx.hadoop里面的配置,以下面的配置文件作为参考

chengdu:
  dingding: x
  tasks:
    v_report: 
      module: hdfs
      type: CompareSize
      args: /user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}} gt 40282905455
      crond: 00 09 11 * * *
      desc: 成都{{.Yesterday}} v_report状态
  hadoop:
    172.16.4.29:
      role:
      - DN
      - NN 

172.16.4.29这台主机既是DataNode,又是namenode,我们需要监控其8020,50070,50020,50010,50075端口。代码部分

// 将需要的转换成定时任务
	for server, c := range config.Hadoop {
		log.Println("服务器:", server)
		for _, role := range c.Role {
			for _, port := range m.Config.Config[role].Port {
				// 在这里初始化一下。默认值都是0,代表正常的
				m.Metrcis[role].WithLabelValues(server, strconv.Itoa(port)).Set(0)
				// 添加到定时任务里面,根据配置文件设置的时间规则去定时执行检测
				m.Crond.AddJob(m.Config.Config[role].Crond, &TCPTask{Port: port, Addr: server, Role: role, Monitor: m})

			}

		}

	}

日志打印:

2021/06/21 04:40:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>50070 result=>0
2021/06/21 04:40:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>8020 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50075 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50020 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50010 result=>0
2021/06/21 04:42:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>8020 result=>0
2021/06/21 04:42:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>50070 result=>0

Promethues Web:

# HELP hadoop_go_DataNode hadoop DataNode
# TYPE hadoop_go_DataNode gauge
hadoop_go_DataNode{addr="172.16.4.29",port="50010"} 0
hadoop_go_DataNode{addr="172.16.4.29",port="50020"} 0
hadoop_go_DataNode{addr="172.16.4.29",port="50075"} 0
# HELP hadoop_go_NameNode hadoop NameNode
# TYPE hadoop_go_NameNode gauge
hadoop_go_NameNode{addr="172.16.4.29",port="50070"} 0
hadoop_go_NameNode{addr="172.16.4.29",port="8020"} 0

当异常发生时候,怎么通知到我

我们通过配置prometheus alert规则就能实现。

  - alert: "hadoop_go_datanode"
   # 表达式,不等于0时候
   expr: hadoop_go_DataNode!=0
   # 持续1分钟
   for: 1m  
   labels:
     # 这里是我们自定义的,receiver是一个接受者,我们自己开发的告警平台
     receiver: yunwei
     severity: error
   annotations:
     summary: "成都datanode异常"
     description: "{{$labels.addr}}的{{$labels.port}}状态异常"
     value: "{{ $value }}"

本系列到此就告一段落了,大家有什么想法可以留言交流。每做一个项目我都会有很大的收获。后面如果要更新的话,可能是讲一下监控hadoop的其他信息。

往期回顾

1.使用golang开发并监控hadoop篇(1)hdfs文件夹大小监控
2.使用golang开发并监控hadoop篇(2)hdfs使用情况和定时任务功能
3.使用golang开发并监控hadoop篇(3)监控hadoop的端口并整合到Prometheus

相关文章

类型转换 1、int转string 2、string转int 3、string转float ...
package main import s &quot;strings&quot; import...
类使用:实现一个people中有一个sayhi的方法调用功能,代码如...
html代码: beego代码:
1、读取文件信息: 2、读取文件夹下的所有文件: 3、写入文件...
配置环境:Windows7+推荐IDE:LiteIDEGO下载地址:http:...