Example 4: Feeding monitoring data into InfluxDB via Kafka
The monitoring script periodically collects the machine's memory statistics and produces them to Kafka; a consumer then reads the messages from Kafka and writes them into InfluxDB, where Grafana visualizes the data.
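To make the data flow concrete: the payload on the Kafka topic is just gopsutil's SwapMemoryStat rendered as JSON (its String() method marshals the struct, which is also why the consumer below can json.Unmarshal the message body). A minimal sketch of what the monitoring script produces:
package main

import (
	"fmt"

	"github.com/shirou/gopsutil/mem"
)

func main() {
	// SwapMemory returns swap usage; String() renders the struct as JSON,
	// which is exactly the message body produced to Kafka in this example.
	swap, err := mem.SwapMemory()
	if err != nil {
		panic(err)
	}
	fmt.Println(swap.String())
	// e.g. {"total":1073741824,"used":524288,"free":1073217536,"usedPercent":0.05,...}
}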
package main
import (
"fmt"
"sync"
"example.go/addrget"
"example.go/etcd"
"example.go/ifdb"
"example.go/kafka"
"example.go/tailfile"
moni "example.go/test_moni"
"github.com/sirupsen/logrus"
"gopkg.in/ini.v1"
)
type Kafka struct {
Address string `ini:"addr"`
Topic string `ini:"topic"`
Size int64 `ini:"chan_size"`
}
type Tsdb struct {
Address string `ini:"address"`
Configpath string `ini:"confpath"`
}
type Etcd struct {
Address []string `ini:"address"`
Collectkey string `ini:"collectkey"`
}
type Configure struct { // target of the ini mapping; each embedded struct maps to the section named by its tag
Kafka `ini:"kafka"`
Tsdb `ini:"tsdb"`
Etcd `ini:"etcd"`
}
func main() {
ip, err := addrget.GetIP() // get the local IP
if err != nil {
logrus.Error("FAILED TO GET IP!!!!", err)
return
}
logrus.SetLevel(logrus.DebugLevel)
var configOBJ = new(Configure) // allocate a Configure to map the ini file onto
cfg, err := ini.Load("./configure.ini")
if err != nil {
logrus.Error("failed to load configure:", err)
return
}
err = cfg.MapTo(configOBJ) // map the loaded ini onto the struct
if err != nil {
logrus.Error("failed to reflect:", err)
return
}
fmt.Printf("%#v\n", configOBJ)
err = kafka.Kafkainit([]string{configOBJ.Kafka.Address}, configOBJ.Kafka.Size) // wrap the single address in a slice; init kafka
if err != nil {
logrus.Error("kafka init failed:", err)
return
}
logrus.Debug("kafka init success")
err = etcd.Etcd_init(configOBJ.Etcd.Address) // init the etcd client
if err != nil {
logrus.Error("failed to init etcd:", err)
return
}
logrus.Info("etcd init success")
conf_key := fmt.Sprintf(configOBJ.Etcd.Collectkey, ip) // build the per-host key from the template in the config
conf_list, err := etcd.Etcd_getvalue(conf_key) // fetch the collect config from etcd
if err != nil {
logrus.Error("faild to get value from etcd ", err)
return
}
fmt.Println(conf_list)
err = tailfile.Tailinit(conf_list) // init tail: load the log files described by the JSON from etcd
if err != nil {
logrus.WithFields(logrus.Fields{"filename": configOBJ.Configpath, "err": err}).Error("file loaded failed")
return
}
go moni.Getmeminfo()
go ifdb.Writeapi()
var wg sync.WaitGroup
wg.Add(2)
go send_to_kafka(&wg)
go etcd.Etcd_watchdog(conf_key, &wg)
wg.Wait()
}
func send_to_kafka(wg *sync.WaitGroup) { // drain queued messages and send them to kafka
logrus.Warn("kafka's send job started!")
defer wg.Done()
defer kafka.Kafka_client.Close()
for {
partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
if err != nil {
logrus.Error("kafka send failed")
} else {
logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
}
}
}
package ifdb
import (
"context"
"encoding/json"
"fmt"
"time"
"example.go/kafka"
"github.com/Shopify/sarama"
influxdb2 "github.com/influxdata/influxdb-client-go"
"github.com/shirou/gopsutil/mem"
"github.com/sirupsen/logrus"
)
// Don't hard-code credentials like this in production!
var (
cli influxdb2.Client
token = "RTqC84IwjDwG4Lb6G4DCdxT9w3padfGzsQM03gnq6-nt-_O6l-d7UHvGdG96r-sD9fySvNlYAPM0OARVbXzyTA=="
)
// Create the InfluxDB client; NewClient only builds the client, nothing is dialed until the first request.
func Conninflux() {
cli = influxdb2.NewClient("http://127.0.0.1:8086", token)
logrus.Info("influxdb client created")
}
func Queryinf() {
queryAPI := cli.QueryAPI("project1") // org name
// Flux query: bucket, time range, and measurement filter (matching the "meminfo" measurement written below)
result, err := queryAPI.Query(context.Background(), `from(bucket:"pro1")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "meminfo")`)
if err == nil {
// Use Next() to iterate over query result lines
for result.Next() {
// Observe when there is new grouping key producing new table
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
// read result
fmt.Printf("row: %s\n", result.Record().String())
}
if result.Err() != nil {
fmt.Printf("Query error: %s\n", result.Err().Error())
}
}
}
// Hard-coded for demo purposes; adjust org, bucket, and topic for your own setup.
func Writeapi() {
writeAPI := cli.WriteAPIBlocking("project1", "pro1")
topic := "meminfo"
getchan := make(chan *sarama.ConsumerMessage, 10)
var list mem.SwapMemoryStat
go kafka.Consumer(topic, getchan)
// Use blocking write client for writes to desired bucket
for msg := range getchan {
logrus.Info("influxdb got a new message")
err := json.Unmarshal(msg.Value, &list)
if err != nil {
logrus.Error("influx failed to unmarshal info:", err)
continue // don't write a stale or zero-valued struct
}
tag := map[string]string{"mem": "meminfo"}
fields := map[string]interface{}{
"Total": int64(list.Total),
"Used": int64(list.Used),
"Free": int64(list.Free),
"UsedPercent": float64(list.UsedPercent),
"Sin": int64(list.Sin),
"Sout": int64(list.Sout),
"PgIn": int64(list.PgIn),
"PgOut": int64(list.PgOut),
"PgFault": int64(list.PgFault),
}
//measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time
p := influxdb2.NewPoint("meminfo", tag, fields, time.Now())
// synchronous (blocking) write
err = writeAPI.WritePoint(context.Background(), p)
if err != nil {
logrus.Warn("failed to write point:", err)
} else {
logrus.Info("point written")
}
// Or write directly line protocol
// line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
// writeAPI.WriteRecord(context.Background(), line)
}
}
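As the commented-out lines above hint, the blocking write API also accepts raw line protocol instead of a Point. A minimal sketch under the same org/bucket as Writeapi, with made-up field values and a hypothetical token:
package main

import (
	"context"

	influxdb2 "github.com/influxdata/influxdb-client-go"
)

func main() {
	cli := influxdb2.NewClient("http://127.0.0.1:8086", "my-token") // hypothetical token
	defer cli.Close()
	writeAPI := cli.WriteAPIBlocking("project1", "pro1")
	// Line protocol: measurement,tag_key=tag_value field=value (the i suffix marks integers).
	line := "meminfo,mem=meminfo Used=524288i,Free=1073217536i"
	if err := writeAPI.WriteRecord(context.Background(), line); err != nil {
		panic(err)
	}
}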
The monitoring script itself; memory info (meminfo) serves as the example here.
package moni
import (
"time"
"example.go/ifdb"
"example.go/kafka"
"github.com/Shopify/sarama"
"github.com/shirou/gopsutil/mem"
"github.com/sirupsen/logrus"
)
// Collect memory info and produce it to Kafka; this is only a demo.
func Getmeminfo() {
ifdb.Conninflux()
for {
meminfo, err := mem.SwapMemory()
if err != nil {
logrus.Warn("failed to read swap memory stats:", err)
time.Sleep(3 * time.Second)
continue
}
// Build a fresh message each iteration: the pointer is handed to a
// channel, so reusing a single message would race with the sender.
msg := &sarama.ProducerMessage{
Topic: "meminfo",
Value: sarama.StringEncoder(meminfo.String()), // JSON payload
}
logrus.Info(msg.Value)
kafka.Kafkareceive(msg)
time.Sleep(3 * time.Second)
}
}
package global
type Configure_list struct {
Path string `json:"path"`
Topic string `json:"topic"`
}
type Sysinfo struct {
Ip string
Host string
Data interface{}
}
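For reference, the etcd value that drives the whole pipeline is a JSON array of these Configure_list entries (a real etcdctl session appears near the end of this example). A minimal sketch with a made-up path and topic:
package main

import (
	"encoding/json"
	"fmt"

	"example.go/global"
)

func main() {
	raw := `[{"path": "/var/log/app.log", "topic": "app"}]` // hypothetical file and topic
	var list []global.Configure_list
	if err := json.Unmarshal([]byte(raw), &list); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", list) // [{Path:/var/log/app.log Topic:app}]
}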
package addrget
import (
"net"
"strings"
"github.com/sirupsen/logrus"
)
// GetIP returns the local outbound IP. Dialing UDP sends no packets; it just
// lets the kernel pick the outbound interface for that destination.
func GetIP() (string, error) {
conn, err := net.Dial("udp", "8.8.8.8:8888")
if err != nil {
logrus.Error("failed to get IP:", err)
return "", err
}
defer conn.Close()
ipaddr := conn.LocalAddr()
ip := strings.Split(ipaddr.String(), ":")[0]
return ip, err
}
package kafka
import (
"fmt"
"sync"
"github.com/Shopify/sarama"
"github.com/shirou/gopsutil/mem"
"github.com/sirupsen/logrus"
)
var (
Kafka_client sarama.SyncProducer
sarasend_chan chan *sarama.ProducerMessage
insend_chan chan *mem.SwapMemoryStat
)
func Kafkainit(address []string, size int64) (err error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // ack mode: wait for all in-sync replicas
config.Producer.Partitioner = sarama.NewRandomPartitioner // choose partitions at random
config.Producer.Return.Successes = true // report successes on the Successes channel
sarasend_chan = make(chan *sarama.ProducerMessage, size)
insend_chan = make(chan *mem.SwapMemoryStat, 10)
Kafka_client, err = sarama.NewSyncProducer(address, config) // create a SyncProducer from the config
if err != nil {
logrus.Error("broker connection has something wrong happened:", err)
return err
}
return err
}
func Consumer(topic string, getchan chan *sarama.ConsumerMessage) { // consumer example
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
logrus.Error("Kafka failed to create consumer")
return
}
defer consumer.Close()
partitionlist, err := consumer.Partitions(topic) // partition IDs for the topic
if err != nil {
logrus.Error("Kafka failed to get partitionlist")
return
}
fmt.Println(partitionlist)
var wg sync.WaitGroup
wg.Add(len(partitionlist))
for _, partition := range partitionlist { // range over partition IDs, not slice indices
part_consumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) // consume every partition
if err != nil {
logrus.WithFields(logrus.Fields{"partition": partition}).Error("partition consume error happened!", err)
return
}
go func(pc sarama.PartitionConsumer) { // pass the consumer in instead of capturing the loop variable
// wg.Done() is deliberately omitted: Wait below blocks forever, so the
// deferred consumer.Close never fires while messages are flowing.
for msg := range pc.Messages() {
fmt.Printf("partition:%d\noffset:%d\ntopic:%s\nvalue:%s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value)
getchan <- msg
}
}(part_consumer)
}
wg.Wait()
}
func Kafkareceive(msg *sarama.ProducerMessage) { // queue a message for sending
sarasend_chan <- msg
logrus.Info("kafka queued msg ", msg)
}
func Kafkaoutput() *sarama.ProducerMessage { // pop the next queued message
logrus.Info("kafka start to output msg")
return <-sarasend_chan
}
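Putting the producer side of this package together: a minimal sketch of the init, queue, and drain flow, assuming a local test broker at 127.0.0.1:9092 and a dummy payload:
package main

import (
	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"

	"example.go/kafka"
)

func main() {
	if err := kafka.Kafkainit([]string{"127.0.0.1:9092"}, 100); err != nil {
		logrus.Fatal(err)
	}
	defer kafka.Kafka_client.Close()
	// Queue one message, then drain it and send synchronously.
	kafka.Kafkareceive(&sarama.ProducerMessage{
		Topic: "meminfo",
		Value: sarama.StringEncoder(`{"total":0}`), // dummy payload
	})
	partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
	if err != nil {
		logrus.Fatal(err)
	}
	logrus.Infof("sent to partition %d at offset %d", partition, offset)
}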
package tailfile
import (
"context"
"fmt"
"example.go/kafka"
"github.com/Shopify/sarama"
"github.com/nxadm/tail"
"github.com/sirupsen/logrus"
)
var (
Tail_context *tail.Tail
)
type File_reader struct {
Path string
Topic string
File_handle *tail.Tail
ctx context.Context
cancel context.CancelFunc
}
// Build a File_reader for one file, with its own cancelable context.
func type_init(path string, topic string) *File_reader {
ctx, cancel := context.WithCancel(context.Background())
tobj := File_reader{
Path: path,
Topic: topic,
ctx: ctx,
cancel: cancel,
}
return &tobj
}
func (tobj *File_reader) run() { // forward lines read by tail to the kafka send channel
logrus.WithFields(logrus.Fields{"File": tobj.Path}).Info("Start to monitor file-----")
for {
select {
case <-tobj.ctx.Done():
logrus.WithFields(logrus.Fields{"File": tobj.Path}).Info("STOP READ FILE:")
return
case line, ok := <-tobj.File_handle.Lines:
if !ok { // check ok before touching line: on a closed channel line is nil
logrus.WithFields(logrus.Fields{"File": tobj.Path}).Error("tail channel closed")
return
}
if len(line.Text) == 0 {
continue
}
logrus.Info("get message")
// Build a fresh message per line: the pointer goes onto a channel,
// so reusing one message would race with the kafka sender.
msg := &sarama.ProducerMessage{
Topic: tobj.Topic,
Value: sarama.StringEncoder(line.Text),
}
fmt.Println(msg.Value)
kafka.Kafkareceive(msg)
logrus.Info("send message")
}
}
}
// Initialize the tail handle for the watched file.
func (tobj *File_reader) init() (err error) {
config := tail.Config{
ReOpen: true, // together with Follow, re-attach to the new file after log rotation
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start reading at the end of the file (Whence 2 = io.SeekEnd)
MustExist: false, // the file is allowed not to exist yet
Poll: true, // poll for changes instead of relying on inotify
}
tobj.File_handle, err = tail.TailFile(tobj.Path, config)
return err
}
package tailfile
import (
"example.go/global"
"github.com/sirupsen/logrus"
)
// Manager for the tail handles: registered readers, their config, and the channel for config updates.
type tailobjmgr struct {
tailobj map[string]*File_reader
tailobjconf []global.Configure_list
chg_chan chan []global.Configure_list
}
var (
objmgr *tailobjmgr
)
// Tailinit builds the manager and starts one reader goroutine per configured file.
func Tailinit(flist []global.Configure_list) (err error) {
objmgr = &tailobjmgr{
tailobj: make(map[string]*File_reader),
tailobjconf: flist,
chg_chan: make(chan []global.Configure_list),
}
for _, item := range flist {
t_file := type_init(item.Path, item.Topic) // build the reader
err = t_file.init() // open the tail handle
if err != nil {
logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", err)
continue
}
objmgr.tailobj[t_file.Path] = t_file // register the reader in the map
logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
go t_file.run()
}
logrus.Info("All tailfile inited")
go objmgr.watch()
return err
}
// watch blocks waiting for config updates pushed from etcd.
func (mobj *tailobjmgr) watch() {
logrus.Info("tail's watchdog is waiting")
for {
listen := <-mobj.chg_chan // wait for a new config pushed by etcd
logrus.WithFields(logrus.Fields{"newconf": listen}).Info("get new configure:")
for _, item := range listen {
if mobj.isExist(item) { // already registered: skip
continue
}
t_file := type_init(item.Path, item.Topic) // not registered yet: build a reader
err := t_file.init() // open the tail handle
if err != nil {
logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", err)
continue
}
mobj.tailobj[t_file.Path] = t_file // register the reader
logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
go t_file.run()
}
// Stop readers that are registered but absent from the new config.
var flag bool
logrus.Info("start to check")
for key, item := range mobj.tailobj {
flag = false
for _, itx := range listen {
if key == itx.Path {
logrus.WithFields(logrus.Fields{"file": key}).Info("get file")
flag = true
}
}
if !flag {
logrus.WithFields(logrus.Fields{"file": key}).Info("File watch will stop")
item.cancel()
delete(mobj.tailobj, key) // must delete the entry, or a stopped file would look "already registered" next time and never be re-initialized
}
}
logrus.Info("check over")
}
}
func Sendnewconf(newconf []global.Configure_list) { // called by the etcd watchdog to push a new config over
objmgr.chg_chan <- newconf
}
func (mobj *tailobjmgr) isExist(conf global.Configure_list) bool { // reports whether the file is already registered
_, ok := mobj.tailobj[conf.Path]
return ok
}
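Normally Etcd_watchdog pushes config updates, but the manager can also be exercised by hand, since Sendnewconf just writes to its channel. A minimal sketch with made-up files (Tailinit must run first so the manager exists):
package main

import (
	"example.go/global"
	"example.go/tailfile"
)

func main() {
	// Hypothetical files and topics.
	if err := tailfile.Tailinit([]global.Configure_list{
		{Path: "/tmp/app.log", Topic: "app"},
	}); err != nil {
		panic(err)
	}
	// Push a replacement config: new files get readers, files missing
	// from the list have their readers cancelled.
	tailfile.Sendnewconf([]global.Configure_list{
		{Path: "/tmp/other.log", Topic: "other"},
	})
	select {} // keep the readers running
}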
package etcd
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"example.go/global"
"example.go/tailfile"
"github.com/coreos/etcd/clientv3"
"github.com/sirupsen/logrus"
)
var (
client *clientv3.Client
)
func Etcd_init(address []string) (err error) {
client, err = clientv3.New(clientv3.Config{Endpoints: address, DialTimeout: 3 * time.Second})
if err != nil {
logrus.Error("Failed to connect etcd", err)
return err
}
return err
}
func Etcd_getvalue(key string) (list []global.Configure_list, err error) { // fetch a key's value and unmarshal its JSON
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
get, err := client.Get(ctx, key)
if err != nil {
logrus.WithFields(logrus.Fields{key: key}).Error("failed to get key value", err)
return
}
if len(get.Kvs) == 0 {
logrus.WithFields(logrus.Fields{key: key}).Error("empty value!", err)
return
}
stu := get.Kvs[0]
err = json.Unmarshal(stu.Value, &list)
if err != nil {
logrus.Error("failed to unmarshal ", err)
return
}
return list, err
}
// Etcd_watchdog watches the config key and forwards changes to tailfile.
func Etcd_watchdog(key string, wg *sync.WaitGroup) {
defer wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watchdog := client.Watch(ctx, key) // WatchChan --> WatchResponse --> []Events --> Type/Kv --> Key/Value
var newlist []global.Configure_list
logrus.Info("watchdog stand by ")
for re := range watchdog {
logrus.Info("etcd watchdog catchs new message!")
for _, ev := range re.Events {
fmt.Printf("Operation type is:%s\nkey:%s\nvalue:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
// The original design cleared the config on a delete event, but in testing the
// watch channel never delivered the delete signal, so that path was shelved.
// To clear the config, put "[{\"path\": \"\",\"topic\":\"\"}]" or any nonexistent path instead.
// if ev.Type == clientv3.EventTypeDelete { //storagepb.Event_EventType DELETE = 1
// logrus.Warn("RECEIVED DELETE OPERATION")
// tailfile.Sendnewconf(newlist)
// logrus.Warn("YOU DELETE ALL CONFIGURES")
// return
// }
err := json.Unmarshal(ev.Kv.Value, &newlist)
if err != nil {
logrus.Error("failed to unmarshal now conf!")
continue
}
logrus.Info("trying to send new configure")
tailfile.Sendnewconf(newlist)
logrus.Info("send new configure successfully")
}
}
}
// td@tddeMacBook-Air ~ % etcdctl put testk1 "[{\"path\": \"/Users/td/Documents/blog/prod.log\",\"topic\":\"prod\"},{\"path\": \"/Users/td/Documents/blog/readme.log\",\"topic\":\"readme\"}]"
// OK
// td@tddeMacBook-Air ~ % etcdctl get testk1
// testk1
// [{"path": "/Users/td/Documents/blog/prod.log","topic":"prod"},{"path": "/Users/td/Documents/blog/readme.log","topic":"readme"}]
configure.ini:
[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000
[tsdb]
address = 127.0.0.1
confpath = "./readme.log"
[etcd]
address = "127.0.0.1:2379"
collectkey = "etcd_key_%s_testk1"