2022年8月

image.png
image.png
监控脚本会定时去获取机器内存信息,并发送给kafka,kafka发送给influxdb存储,并通过grafana展示出来

package main

import (
    "fmt"
    "sync"

    "example.go/addrget"
    "example.go/etcd"
    "example.go/ifdb"
    "example.go/kafka"
    "example.go/tailfile"
    moni "example.go/test_moni"
    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

// Kafka is the mapping target for the ini data tagged "kafka".
// main passes Address (wrapped in a one-element slice) and Size to
// kafka.Kafkainit; Topic is mapped but not read by main.
type Kafka struct {
    Address string `ini:"addr"`
    Topic   string `ini:"topic"`
    Size    int64  `ini:"chan_size"`
}

// Tsdb is the mapping target for the ini data tagged "tsdb".
// main only reads Configpath, as a log field.
type Tsdb struct {
    Address    string `ini:"address"`
    Configpath string `ini:"confpath"`
}

// Etcd is the mapping target for the ini data tagged "etcd".
// Address is handed to etcd.Etcd_ini; Collectkey is used by main as a
// fmt.Sprintf format string whose single argument is the local IP.
type Etcd struct {
    Address    []string `ini:"address"`
    Collectkey string   `ini:"collectkey"`
}

// Configure embeds the three section structs so cfg.MapTo can fill them in
// one call.
type Configure struct { // struct used as the mapping target
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
    Etcd  `ini:"etcd"`
}

// main loads ./configure.ini, initialises the Kafka producer and the etcd
// client, fetches this host's collection config from etcd, starts the file
// tailers and the memory-monitoring pipeline, and then blocks on the Kafka
// sender and the etcd watcher.
func main() {
    ip, err := addrget.GetIP() // local IP, used below to build the etcd key
    if err != nil {
        logrus.Error("FAILED TO GET IP!!!!", err)
        return
    }

    logrus.SetLevel(logrus.DebugLevel)
    var configOBJ = new(Configure) // mapping target for the ini file
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    err = cfg.MapTo(configOBJ)
    if err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)

    // Kafkainit takes a broker list, so wrap the single configured address.
    err = kafka.Kafkainit([]string{configOBJ.Kafka.Address}, configOBJ.Kafka.Size)
    if err != nil {
        logrus.Error("kafka init failed:", err)
        return
    }
    logrus.Debug("kafka init success")

    err = etcd.Etcd_ini(configOBJ.Etcd.Address)
    if err != nil {
        // Include the cause; the original message dropped it.
        logrus.Error("failed to init etcd: ", err)
        return
    }
    logrus.Info("etcd init success")

    // Collectkey is a format string from the config file with one %s verb
    // that receives the local IP, giving each host its own etcd key.
    conf_key := fmt.Sprintf(configOBJ.Etcd.Collectkey, ip)

    conf_list, err := etcd.Etcd_getvalue(conf_key)
    if err != nil {
        logrus.Error("faild to get value from etcd  ", err)
        return
    }
    fmt.Println(conf_list)

    // Start one tailer per entry of the JSON list returned by etcd.
    err = tailfile.Tailinit(conf_list)
    if err != nil {
        logrus.WithFields(logrus.Fields{"filename": configOBJ.Configpath, "err": err}).Error("file loaded failed")
        return
    }

    // Create the InfluxDB client before starting any goroutine that uses it.
    // Previously the client was only created inside moni.Getcpuinfo, so
    // ifdb.Writeapi (started concurrently) could run first and call a method
    // on a nil client.
    ifdb.Conninflux()
    go moni.Getcpuinfo()
    go ifdb.Writeapi()

    var wg sync.WaitGroup
    wg.Add(2)
    go send_to_kafka(&wg)
    go etcd.Etcd_watchdog(conf_key, &wg)
    wg.Wait()
}

// send_to_kafka forwards every message taken from kafka.Kafkaoutput to the
// shared sync producer, logging the partition and offset on success and the
// error on failure. It loops forever; wg.Done and the producer Close only run
// if the goroutine unwinds.
func send_to_kafka(wg *sync.WaitGroup) {
    logrus.Warn("kafka's send job start!")
    defer wg.Done()
    defer kafka.Kafka_client.Close()
    for {
        partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
        if err != nil {
            // Log the cause; the original message gave no hint why the send failed.
            logrus.WithFields(logrus.Fields{"err": err}).Error("kafka send failed")
            continue
        }
        logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
    }
}
package ifdb

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "example.go/kafka"
    "github.com/Shopify/sarama"
    influxdb2 "github.com/influxdata/influxdb-client-go"
    "github.com/shirou/gopsutil/mem"
    "github.com/sirupsen/logrus"
)

// Do not do this in production: the access token is hard-coded in source.
var (
    // cli is the package-level InfluxDB client; Conninflux assigns it.
    cli   influxdb2.Client
    // token is the credential passed to influxdb2.NewClient.
    token = "RTqC84IwjDwG4Lb6G4DCdxT9w3padfGzsQM03gnq6-nt-_O6l-d7UHvGdG96r-sD9fySvNlYAPM0OARVbXzyTA=="
)

// Conninflux creates the package-level InfluxDB client for
// http://127.0.0.1:8086 using the package-level token, then logs a success
// message.
// NOTE(review): the log is emitted as soon as NewClient returns; whether a
// connection has actually been made at that point is not visible here.
func Conninflux() {
    cli = influxdb2.NewClient("http://127.0.0.1:8086", token)
    logrus.Info("success connect to influxdb")
}

// Queryinf runs a fixed Flux query (bucket "pro1", last hour, measurement
// "pro1") against org "project1" and prints every table header and row to
// stdout. Query and iteration errors are reported instead of being dropped.
// Conninflux must have been called first so that cli is set.
func Queryinf() {
    queryAPI := cli.QueryAPI("project1") // argument is the org name
    result, err := queryAPI.Query(context.Background(), `from(bucket:"pro1")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "pro1")`)
    if err != nil {
        // The original silently returned here, hiding failed queries.
        logrus.WithFields(logrus.Fields{"err": err}).Error("influx query failed")
        return
    }
    // Next advances through the result rows.
    for result.Next() {
        // A changed table means a new grouping key started.
        if result.TableChanged() {
            fmt.Printf("table: %s\n", result.TableMetadata().String())
        }
        fmt.Printf("row: %s\n", result.Record().String())
    }
    if result.Err() != nil {
        fmt.Printf("Query error: %s\n", result.Err().Error())
    }
}

// The org, bucket and topic below are hard-coded for demonstration only;
// change them to match your environment.

// Writeapi starts a Kafka consumer on topic "meminfo" and, for each message,
// decodes the JSON payload into a mem.SwapMemoryStat and writes it as one
// point of measurement "meminfo" to bucket "pro1" of org "project1".
// It returns only when the consumer channel is closed. Conninflux must have
// been called first so that cli is set.
func Writeapi() {
    // Blocking write client: WritePoint returns after the write completes.
    writeAPI := cli.WriteAPIBlocking("project1", "pro1")
    topic := "meminfo"
    getchan := make(chan *sarama.ConsumerMessage, 10)
    go kafka.Consumer(topic, getchan)

    for msg := range getchan {
        logrus.Info("influxdb get new message")
        // Decode into a fresh value per message so that fields missing from
        // this payload cannot inherit values from the previous one.
        var list mem.SwapMemoryStat
        if err := json.Unmarshal(msg.Value, &list); err != nil {
            logrus.WithFields(logrus.Fields{"err": err}).Error("influx failed to unmarshal info")
            // Skip the write: the original went on to store a bogus point.
            continue
        }
        tag := map[string]string{"men": "meminfo"}
        // NOTE(review): the keys "Total " and "Sin " carry a trailing space.
        // They are kept byte-for-byte because existing queries or dashboards
        // may rely on them; confirm before renaming.
        fields := map[string]interface{}{
            "Total ":      int64(list.Total),
            "Used":        int64(list.Used),
            "Free":        int64(list.Free),
            "UsedPercent": float64(list.UsedPercent),
            "Sin ":        int64(list.Sin),
            "Sout":        int64(list.Sout),
            "PgIn":        int64(list.PgIn),
            "PgOut":       int64(list.PgOut),
            "PgFault":     int64(list.PgFault),
        }
        // Arguments: measurement, tags, fields, timestamp.
        p := influxdb2.NewPoint("meminfo", tag, fields, time.Now())
        if err := writeAPI.WritePoint(context.Background(), p); err != nil {
            logrus.WithFields(logrus.Fields{"err": err}).Warn("Failed to inject")
            continue
        }
        logrus.Info("Success")
    }
}

监控脚本,这里以meminfo为例

package moni

import (
    "time"

    "example.go/ifdb"
    "example.go/kafka"
    "github.com/Shopify/sarama"
    "github.com/shirou/gopsutil/mem"
    "github.com/sirupsen/logrus"
)

// This package collects memory information; it is only a demonstration example.
var (
// Infochan chan *mem.SwapMemoryStat (disabled; the declaration block is intentionally empty)
)

func Getcpuinfo() {
    // var list *mem.SwapMemoryStat
    msg := &sarama.ProducerMessage{}
    msg.Topic = "meminfo"
    ifdb.Conninflux()
    for {
        meminfo, _ := mem.SwapMemory()
        //logrus.Info(meminfo.String())
        msg.Value = sarama.StringEncoder(meminfo.String())
        logrus.Info(msg.Value)
        kafka.Kafkareceive(msg)
        time.Sleep(3 * time.Second)
        // data, err := json.MarshalIndent(meminfo, "", "")
        // if err != nil {
        //     logrus.Warn("Failed to marshal meminfo")
        //     continue
        // }
        // err = json.Unmarshal(data, &list)
        // if err != nil {
        //     logrus.Warn("Failed to unmarshal meminfo!")
        //     continue
        // }
        // msg.Value = sarama.StringEncoder(meminfo)
        // kafka.Kafkainsend(meminfo)
        //        ifdb.Writeapi(*meminfo)
    }
}
package global

// Configure_list is one collection entry decoded from the JSON array stored
// in etcd: Path is the log file to tail and Topic is the Kafka topic its
// lines are sent to.
type Configure_list struct {
    Path  string `json:"path"`
    Topic string `json:"topic"`
}

// Sysinfo bundles an IP, a host name and an arbitrary payload.
// NOTE(review): it is not referenced anywhere in the visible source; its
// intended producer and consumer are unknown.
type Sysinfo struct {
    Ip   string
    Host string
    Data interface{}
}
package addrget

import (
    "net"
    "strings"

    "github.com/sirupsen/logrus"
)

// GetIP reports the local address chosen for a UDP "connection" to
// 8.8.8.8:8888. The result is the text of the local address up to its first
// ':'. On dial failure it returns the placeholder "CAN NOT GET IP" together
// with the error.
func GetIP() (string, error) {
    conn, dialErr := net.Dial("udp", "8.8.8.8:8888")
    if dialErr != nil {
        logrus.Error("Fail to get IP")
        return "CAN NOT GET IP", dialErr
    }
    defer conn.Close()

    // The local address has the form "host:port"; keep what precedes the
    // first colon.
    local := conn.LocalAddr().String()
    if colon := strings.Index(local, ":"); colon >= 0 {
        local = local[:colon]
    }
    return local, nil
}
package kafka

import (
    "fmt"
    "sync"

    "github.com/Shopify/sarama"
    "github.com/shirou/gopsutil/mem"
    "github.com/sirupsen/logrus"
)

var (
    // Kafka_client is the shared synchronous producer created by Kafkainit.
    Kafka_client  sarama.SyncProducer
    // sarasend_chan queues producer messages between Kafkareceive and Kafkaoutput.
    sarasend_chan chan *sarama.ProducerMessage
    // insend_chan is created by Kafkainit; no reader or writer is visible in this file.
    insend_chan   chan *mem.SwapMemoryStat
)

// Kafkainit allocates the package-level channels (sarasend_chan with the
// given buffer size, insend_chan with a buffer of 10) and creates the shared
// synchronous producer for the given broker addresses. The producer waits for
// all acks, picks partitions at random and reports successes. Any error from
// creating the producer is logged and returned.
func Kafkainit(address []string, size int64) (err error) {
    producerCfg := sarama.NewConfig()
    producerCfg.Producer.RequiredAcks = sarama.WaitForAll          // wait for all acks
    producerCfg.Producer.Partitioner = sarama.NewRandomPartitioner // random partition choice
    producerCfg.Producer.Return.Successes = true                   // report successful deliveries

    sarasend_chan = make(chan *sarama.ProducerMessage, size)
    insend_chan = make(chan *mem.SwapMemoryStat, 10)

    if Kafka_client, err = sarama.NewSyncProducer(address, producerCfg); err != nil {
        logrus.Error("broker connection has something wrong happened:", err)
    }
    return err
}

// Consumer is a demo consumer: it connects to the broker at 127.0.0.1:9092,
// starts one partition consumer per partition of topic (from the newest
// offset) and forwards every received message to getchan. It blocks until all
// partition consumers have finished, and returns early (after logging) if the
// consumer, the partition list or any partition consumer cannot be created.
//
// NOTE(review): the broker address is hard-coded; the signature offers no way
// to pass it in.
func Consumer(topic string, getchan chan *sarama.ConsumerMessage) {
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        logrus.Error("Kafka failed to create consumer: ", err)
        return
    }
    defer consumer.Close()

    partitionlist, err := consumer.Partitions(topic) // partition IDs of the topic
    if err != nil {
        logrus.Error("Kafka failed to get partitionlist: ", err)
        return
    }
    fmt.Println(partitionlist)

    var wg sync.WaitGroup
    // Range over the partition IDs themselves. The original ranged over the
    // slice index, which is only correct while IDs happen to equal 0..n-1.
    for _, partition := range partitionlist {
        part_consumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
        if err != nil {
            logrus.WithFields(logrus.Fields{"partition": partition}).Error("partition consume error happened!", err)
            return
        }

        // Count a worker only once it really starts, and pair it with Done so
        // that Wait reflects the workers' actual lifetime.
        wg.Add(1)
        go func(pc sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range pc.Messages() {
                fmt.Printf("partition:%d\noffset:%d\ntopic:%s\nvalue:%s", msg.Partition, msg.Offset, msg.Topic, msg.Value)
                getchan <- msg
            }
        }(part_consumer)
    }
    wg.Wait()
}

// Kafkareceive queues msg on sarasend_chan and then logs it. The send blocks
// while the channel's buffer is full. Kafkainit must have been called first,
// since it creates the channel.
func Kafkareceive(msg *sarama.ProducerMessage) {
    sarasend_chan <- msg
    logrus.Info("kafka get msg", msg)
}

// Kafkaoutput logs that it is about to output and then blocks until a message
// is available on sarasend_chan, returning that message.
func Kafkaoutput() *sarama.ProducerMessage {
    logrus.Info("kafka start to outout msg")
    return <-sarasend_chan
}
package tailfile

import (
    "context"
    "fmt"

    "example.go/kafka"
    "github.com/Shopify/sarama"
    "github.com/nxadm/tail"
    "github.com/sirupsen/logrus"
)

var (
    // Tail_context is an exported tail handle.
    // NOTE(review): nothing in the visible source assigns or reads it.
    Tail_context *tail.Tail
    // change_chan  chan []global.Configure_list
    // Context_listen chan *tail.Line
)

// File_reader is one tailed file: Path is the file being followed, Topic the
// Kafka topic its lines are sent to, File_handle the tail handle set by init,
// and ctx/cancel the context pair created by type_init that run watches to
// know when to stop.
type File_reader struct {
    Path        string
    Topic       string
    File_handle *tail.Tail
    ctx         context.Context
    cancel      context.CancelFunc
}

// type_init builds a File_reader for path and topic and gives it its own
// cancellable context. The tail handle is left unset; init fills it in.
func type_init(path string, topic string) *File_reader {
    reader := &File_reader{
        Path:  path,
        Topic: topic,
    }
    reader.ctx, reader.cancel = context.WithCancel(context.Background())
    return reader
}

// run forwards every non-empty line read from the tail handle to the Kafka
// send queue under tobj.Topic. It returns when the reader's context is
// cancelled or when the tail's Lines channel is closed.
//
// NOTE(review): the tail handle itself is not stopped here when the context
// is cancelled; confirm whether it should be.
func (tobj *File_reader) run() {
    logrus.WithFields(logrus.Fields{"FIle": tobj.Path}).Info("Start to monitor file-----")
    for {
        select {
        case <-tobj.ctx.Done():
            logrus.WithFields(logrus.Fields{"File": tobj.Path}).Info("STOP READ FILE:")
            return
        case line, ok := <-tobj.File_handle.Lines:
            // Check ok before touching line: on a closed channel line is nil,
            // and the original dereferenced it first. Return rather than
            // continue, because a closed channel is always ready and would
            // make this loop spin.
            if !ok {
                logrus.Error("failed to get message")
                return
            }
            if len(line.Text) == 0 {
                continue
            }
            logrus.Info("get message")
            // Build a fresh message per line. The original reused a single
            // message, so entries still waiting in the send queue were
            // overwritten by later lines.
            msg := &sarama.ProducerMessage{
                Topic: tobj.Topic,
                Value: sarama.StringEncoder(line.Text),
            }
            fmt.Println(msg.Value)
            kafka.Kafkareceive(msg)
            logrus.Info("send message")
        }
    }
}

// init opens the tail handle for tobj.Path and stores it in File_handle,
// returning whatever error tail.TailFile reports.
func (tobj *File_reader) init() (err error) {
    // Whence 2 with offset 0 is the value of io.SeekEnd: start at the end of the file.
    startAt := &tail.SeekInfo{Offset: 0, Whence: 2}
    tobj.File_handle, err = tail.TailFile(tobj.Path, tail.Config{
        Location:  startAt,
        ReOpen:    true,  // together with Follow, keeps tracking the file across rotation
        Follow:    true,
        MustExist: false, // the file is allowed not to exist yet
        Poll:      true,  // use polling
    })
    return err
}

package tailfile

import (
    "example.go/global"
    "github.com/sirupsen/logrus"
)

// tailobjmgr manages the running file tailers: tailobj registers each active
// File_reader by path, tailobjconf keeps the list passed to Tailinit, and
// chg_chan carries new configuration lists to the watch goroutine.
type tailobjmgr struct {
    tailobj     map[string]*File_reader
    tailobjconf []global.Configure_list
    chg_chan    chan []global.Configure_list
}

var (
    // objmgr is the package-level manager; Tailinit creates it.
    objmgr *tailobjmgr
)

// Tailinit creates the package-level manager, starts one tailer goroutine for
// every entry of flist and then starts the watch goroutine that applies later
// configuration changes.
//
// A file whose tail handle cannot be opened is logged and skipped; it does
// not abort the others and is not reported through the return value.
// Previously the return value was whatever the last loop iteration produced,
// so a skipped file aborted the caller only when it was last in the list.
func Tailinit(flist []global.Configure_list) (err error) {
    objmgr = &tailobjmgr{
        tailobj:     make(map[string]*File_reader),
        tailobjconf: flist,
        chg_chan:    make(chan []global.Configure_list),
    }

    for _, item := range flist {
        t_file := type_init(item.Path, item.Topic)
        if initErr := t_file.init(); initErr != nil {
            logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", initErr)
            continue
        }
        objmgr.tailobj[t_file.Path] = t_file // register the reader under its path
        logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
        go t_file.run()
    }

    logrus.Info("All tailfile inited")

    go objmgr.watch()
    return nil
}

// watch waits for new configuration lists on chg_chan and reconciles the
// running tailers with each one:
//   - a path that is not yet tailed gets a new tailer;
//   - a path that is already tailed but now has a different topic is
//     restarted with the new topic (previously the change was ignored);
//   - a tailed path missing from the new list is cancelled and unregistered.
// It never returns.
func (mobj *tailobjmgr) watch() {
    logrus.Info("tail's watchdog is waiting")
    for {
        listen := <-mobj.chg_chan // blocks until etcd delivers a new configuration
        logrus.WithFields(logrus.Fields{"newconf": listen}).Info("get new configure:")

        // Paths present in the new configuration, for the removal pass below.
        wanted := make(map[string]struct{}, len(listen))
        for _, item := range listen {
            wanted[item.Path] = struct{}{}
            if running, ok := mobj.tailobj[item.Path]; ok {
                if running.Topic == item.Topic {
                    continue // unchanged entry
                }
                // run reads the topic when it builds messages for a reader
                // created with that topic, so a topic change needs a new reader.
                logrus.WithFields(logrus.Fields{"file": item.Path}).Info("File watch will stop")
                running.cancel()
                delete(mobj.tailobj, item.Path)
            }
            t_file := type_init(item.Path, item.Topic)
            err := t_file.init()
            if err != nil {
                logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", err)
                continue
            }
            mobj.tailobj[t_file.Path] = t_file // register the reader under its path
            logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
            go t_file.run()
        }

        // Stop every registered tailer that the new configuration no longer lists.
        logrus.Info("start to check")
        for key, item := range mobj.tailobj {
            if _, ok := wanted[key]; ok {
                logrus.WithFields(logrus.Fields{"file": key}).Info("get file")
                continue
            }
            logrus.WithFields(logrus.Fields{"file": key}).Info("File watch will stop")
            item.cancel()
            // Unregister it as well; otherwise a later configuration that
            // lists this path again would see it as existing and skip it.
            delete(mobj.tailobj, key)
        }
        logrus.Info("check over")
    }
}

// Sendnewconf hands a new configuration list to the manager's watch goroutine
// through chg_chan. The channel is unbuffered, so the call blocks until watch
// receives it. Tailinit must have run first, since it creates objmgr.
func Sendnewconf(newconf []global.Configure_list) { // called by the etcd watcher to deliver a new configuration
    objmgr.chg_chan <- newconf
}

// isExist reports whether conf.Path is already registered in tailobj.
func (mobj *tailobjmgr) isExist(conf global.Configure_list) bool {
    if _, registered := mobj.tailobj[conf.Path]; registered {
        return true
    }
    return false
}
package etcd

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "time"

    "example.go/global"
    "example.go/tailfile"
    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

var (
    // client is the package-level etcd client; Etcd_ini assigns it.
    client *clientv3.Client
)

// type Configure_list struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

// type LogEntry struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

// Etcd_ini creates the package-level etcd client for the given endpoints with
// a three-second dial timeout. A creation error is logged and returned.
func Etcd_ini(address []string) (err error) {
    settings := clientv3.Config{
        Endpoints:   address,
        DialTimeout: 3 * time.Second,
    }
    if client, err = clientv3.New(settings); err != nil {
        logrus.Error("Failed to connect etcd", err)
    }
    return err
}

// Etcd_getvalue reads key from etcd (three-second timeout) and decodes its
// value, a JSON array, into a list of collection entries.
//
// It returns (nil, err) when the read or the decoding fails, and (nil, nil)
// when the key has no value, so callers can start with an empty configuration.
func Etcd_getvalue(key string) (list []global.Configure_list, err error) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    resp, err := client.Get(ctx, key)
    if err != nil {
        // The field name is the literal "key"; the original used the key's
        // own value as the field name.
        logrus.WithFields(logrus.Fields{"key": key}).Error("failed to get key value", err)
        return nil, err
    }
    if len(resp.Kvs) == 0 {
        logrus.WithFields(logrus.Fields{"key": key}).Error("empty value!")
        return nil, nil
    }
    if err = json.Unmarshal(resp.Kvs[0].Value, &list); err != nil {
        logrus.Error("failed to unmarshal  ", err)
        return nil, err
    }
    return list, nil
}

// Etcd_watchdog watches key in etcd and, for every put event, decodes the new
// JSON value and hands it to the tail manager through tailfile.Sendnewconf.
// It calls wg.Done when the watch channel is closed.
//
// Delete events carry no new configuration and are skipped with a warning.
// To clear the configuration, put a list such as
// "[{\"path\": \"\",\"topic\":\"\"}]" or any path that does not exist.
func Etcd_watchdog(key string, wg *sync.WaitGroup) {
    defer wg.Done()
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    watchdog := client.Watch(ctx, key) // WatchChan --> WatchResponse --> []Events --> Type/Kv --> key/value
    logrus.Info("watchdog stand by ")
    for re := range watchdog {
        if err := re.Err(); err != nil {
            logrus.WithFields(logrus.Fields{"err": err}).Error("etcd watch error")
            continue
        }
        logrus.Info("etcd watchdog catchs new message!")
        for _, ev := range re.Events {
            fmt.Printf("Operation type is:%s\nkey:%s\nvalue:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
            if ev.Type == clientv3.EventTypeDelete {
                // Skip explicitly instead of letting the decode below fail
                // with a misleading unmarshal error.
                logrus.Warn("etcd delete event ignored, configuration unchanged")
                continue
            }
            // Decode into a fresh slice for every event. The previous list
            // has been handed to the tail manager, and json.Unmarshal would
            // reuse its backing array while the manager may still read it.
            var newlist []global.Configure_list
            if err := json.Unmarshal(ev.Kv.Value, &newlist); err != nil {
                logrus.WithFields(logrus.Fields{"err": err}).Error("failed to unmarshal now conf!")
                continue
            }
            logrus.Info("trying to send new configure")
            tailfile.Sendnewconf(newlist)
            logrus.Info("send new configure successfully")
        }
    }
}

// td@tddeMacBook-Air ~ % etcdctl put testk1 "[{\"path\": \"/Users/td/Documents/blog/prod.log\",\"topic\":\"prod\"},{\"path\": \"/Users/td/Documents/blog/readme.log\",\"topic\":\"readme\"}]"
// OK
// td@tddeMacBook-Air ~ % etcdctl get testk1
// testk1
// [{"path": "/Users/td/Documents/blog/prod.log","topic":"prod"},{"path": "/Users/td/Documents/blog/readme.log","topic":"readme"}]
[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000

[tsdb]
address = 127.0.0.1
confpath = "./readme.log"

[etcd]
address = "127.0.0.1:2379"
collectkey = "etcd_key_%s_testk1"

当etcd发送变更的时候,会根据新配置来新增、删除文件监控,并实现通过部署机的ip来区分加载不同的etcd键值对
example.go

package main

import (
    "fmt"
    "sync"

    "example.go/addrget"
    "example.go/etcd"
    "example.go/kafka"
    "example.go/tailfile"
    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

// Kafka is the mapping target for the ini data tagged "kafka".
// main passes Address (wrapped in a one-element slice) and Size to
// kafka.Kafkainit; Topic is mapped but not read by main.
type Kafka struct {
    Address string `ini:"addr"`
    Topic   string `ini:"topic"`
    Size    int64  `ini:"chan_size"`
}

// Tsdb is the mapping target for the ini data tagged "tsdb".
// main only reads Configpath, as a log field.
type Tsdb struct {
    Address    string `ini:"address"`
    Configpath string `ini:"confpath"`
}

// Etcd is the mapping target for the ini data tagged "etcd".
// Address is handed to etcd.Etcd_ini; Collectkey is used by main as a
// fmt.Sprintf format string whose single argument is the local IP.
type Etcd struct {
    Address    []string `ini:"address"`
    Collectkey string   `ini:"collectkey"`
}

// Configure embeds the three section structs so cfg.MapTo can fill them in
// one call.
type Configure struct { // struct used as the mapping target
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
    Etcd  `ini:"etcd"`
}

// main loads ./configure.ini, initialises the Kafka producer and the etcd
// client, fetches this host's collection config from etcd, starts the file
// tailers and the Kafka sender, and then blocks on the etcd watcher.
func main() {
    ip, err := addrget.GetIP() // local IP, used below to build the etcd key
    if err != nil {
        logrus.Error("FAILED TO GET IP!!!!", err)
        return
    }

    logrus.SetLevel(logrus.DebugLevel)
    var configOBJ = new(Configure) // mapping target for the ini file
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    err = cfg.MapTo(configOBJ)
    if err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)

    // Kafkainit takes a broker list, so wrap the single configured address.
    err = kafka.Kafkainit([]string{configOBJ.Kafka.Address}, configOBJ.Kafka.Size)
    if err != nil {
        logrus.Error("kafka init failed:", err)
        return
    }
    logrus.Debug("kafka init success")

    err = etcd.Etcd_ini(configOBJ.Etcd.Address)
    if err != nil {
        // Include the cause; the original message dropped it.
        logrus.Error("failed to init etcd: ", err)
        return
    }
    logrus.Info("etcd init success")

    // Collectkey is a format string from the config file with one %s verb
    // that receives the local IP, giving each host its own etcd key.
    conf_key := fmt.Sprintf(configOBJ.Etcd.Collectkey, ip)

    conf_list, err := etcd.Etcd_getvalue(conf_key)
    if err != nil {
        logrus.Error("faild to get value from etcd  ", err)
        return
    }
    fmt.Println(conf_list)

    // Start one tailer per entry of the JSON list returned by etcd.
    err = tailfile.Tailinit(conf_list)
    if err != nil {
        logrus.WithFields(logrus.Fields{"filename": configOBJ.Configpath, "err": err}).Error("file loaded failed")
        return
    }

    // The sender is not tracked by the WaitGroup; the process lives as long
    // as the etcd watcher does.
    go send_to_kafka()
    var wg sync.WaitGroup
    wg.Add(1)
    go etcd.Etcd_watchdog(conf_key, &wg)
    wg.Wait()
}

// send_to_kafka forwards every message taken from kafka.Kafkaoutput to the
// shared sync producer, logging the partition and offset on success and the
// error on failure. It loops forever; the producer Close only runs if the
// goroutine unwinds.
func send_to_kafka() {
    defer kafka.Kafka_client.Close()
    for {
        partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
        if err != nil {
            // Log the cause; the original message gave no hint why the send failed.
            logrus.WithFields(logrus.Fields{"err": err}).Error("kafka send failed")
            continue
        }
        logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
    }
}

getaddr.go

package addrget

import (
    "net"
    "strings"

    "github.com/sirupsen/logrus"
)

// GetIP reports the local address chosen for a UDP "connection" to
// 8.8.8.8:8888. The result is the text of the local address up to its first
// ':'. On dial failure it returns the placeholder "CAN NOT GET IP" together
// with the error.
func GetIP() (string, error) {
    conn, dialErr := net.Dial("udp", "8.8.8.8:8888")
    if dialErr != nil {
        logrus.Error("Fail to get IP")
        return "CAN NOT GET IP", dialErr
    }
    defer conn.Close()

    // The local address has the form "host:port"; keep what precedes the
    // first colon.
    local := conn.LocalAddr().String()
    if colon := strings.Index(local, ":"); colon >= 0 {
        local = local[:colon]
    }
    return local, nil
}

kafka.go

package kafka

import (
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
)

var (
    // Kafka_client is the shared synchronous producer created by Kafkainit.
    Kafka_client sarama.SyncProducer
    // send_chan queues producer messages between Kafkareceive and Kafkaoutput.
    send_chan    chan *sarama.ProducerMessage
)

// Kafkainit allocates send_chan with the given buffer size and creates the
// shared synchronous producer for the given broker addresses. The producer
// waits for all acks, picks partitions at random and reports successes. Any
// error from creating the producer is logged and returned.
func Kafkainit(address []string, size int64) (err error) {
    producerCfg := sarama.NewConfig()
    producerCfg.Producer.RequiredAcks = sarama.WaitForAll          // wait for all acks
    producerCfg.Producer.Partitioner = sarama.NewRandomPartitioner // random partition choice
    producerCfg.Producer.Return.Successes = true                   // report successful deliveries

    send_chan = make(chan *sarama.ProducerMessage, size)

    if Kafka_client, err = sarama.NewSyncProducer(address, producerCfg); err != nil {
        logrus.Error("broker connection has something wrong happened:", err)
    }
    return err
}

// Kafkareceive queues msg on send_chan. The send blocks while the channel's
// buffer is full. Kafkainit must have been called first, since it creates the
// channel.
func Kafkareceive(msg *sarama.ProducerMessage) {
    send_chan <- msg
}

// Kafkaoutput blocks until a message is available on send_chan and returns it.
func Kafkaoutput() *sarama.ProducerMessage {
    return <-send_chan
}

// func Send_kafka() {

// }

tailread.go

package tailfile

import (
    "context"
    "fmt"

    "example.go/kafka"
    "github.com/Shopify/sarama"
    "github.com/nxadm/tail"
    "github.com/sirupsen/logrus"
)

var (
    // Tail_context is an exported tail handle.
    // NOTE(review): nothing in the visible source assigns or reads it.
    Tail_context *tail.Tail
    // change_chan  chan []global.Configure_list
    // Context_listen chan *tail.Line
)

// File_reader is one tailed file: Path is the file being followed, Topic the
// Kafka topic its lines are sent to, File_handle the tail handle set by init,
// and ctx/cancel the context pair created by type_init that run watches to
// know when to stop.
type File_reader struct {
    Path        string
    Topic       string
    File_handle *tail.Tail
    ctx         context.Context
    cancel      context.CancelFunc
}

// type_init builds a File_reader for path and topic and gives it its own
// cancellable context. The tail handle is left unset; init fills it in.
func type_init(path string, topic string) *File_reader {
    reader := &File_reader{
        Path:  path,
        Topic: topic,
    }
    reader.ctx, reader.cancel = context.WithCancel(context.Background())
    return reader
}

// run forwards every non-empty line read from the tail handle to the Kafka
// send queue under tobj.Topic. It returns when the reader's context is
// cancelled or when the tail's Lines channel is closed.
//
// NOTE(review): the tail handle itself is not stopped here when the context
// is cancelled; confirm whether it should be.
func (tobj *File_reader) run() {
    logrus.WithFields(logrus.Fields{"FIle": tobj.Path}).Info("Start to monitor file-----")
    for {
        select {
        case <-tobj.ctx.Done():
            logrus.WithFields(logrus.Fields{"File": tobj.Path}).Info("STOP READ FILE:")
            return
        case line, ok := <-tobj.File_handle.Lines:
            // Check ok before touching line: on a closed channel line is nil,
            // and the original dereferenced it first. Return rather than
            // continue, because a closed channel is always ready and would
            // make this loop spin.
            if !ok {
                logrus.Error("failed to get message")
                return
            }
            if len(line.Text) == 0 {
                continue
            }
            logrus.Info("get message")
            // Build a fresh message per line. The original reused a single
            // message, so entries still waiting in the send queue were
            // overwritten by later lines.
            msg := &sarama.ProducerMessage{
                Topic: tobj.Topic,
                Value: sarama.StringEncoder(line.Text),
            }
            fmt.Println(msg.Value)
            kafka.Kafkareceive(msg)
            logrus.Info("send message")
        }
    }
}

// init opens the tail handle for tobj.Path and stores it in File_handle,
// returning whatever error tail.TailFile reports.
func (tobj *File_reader) init() (err error) {
    // Whence 2 with offset 0 is the value of io.SeekEnd: start at the end of the file.
    startAt := &tail.SeekInfo{Offset: 0, Whence: 2}
    tobj.File_handle, err = tail.TailFile(tobj.Path, tail.Config{
        Location:  startAt,
        ReOpen:    true,  // together with Follow, keeps tracking the file across rotation
        Follow:    true,
        MustExist: false, // the file is allowed not to exist yet
        Poll:      true,  // use polling
    })
    return err
}

tailmanager.go

package tailfile

import (
    "example.go/global"
    "github.com/sirupsen/logrus"
)

// tailobjmgr manages the running file tailers: tailobj registers each active
// File_reader by path, tailobjconf keeps the list passed to Tailinit, and
// chg_chan carries new configuration lists to the watch goroutine.
type tailobjmgr struct {
    tailobj     map[string]*File_reader
    tailobjconf []global.Configure_list
    chg_chan    chan []global.Configure_list
}

var (
    // objmgr is the package-level manager; Tailinit creates it.
    objmgr *tailobjmgr
)

// Tailinit creates the package-level manager, starts one tailer goroutine for
// every entry of flist and then starts the watch goroutine that applies later
// configuration changes.
//
// A file whose tail handle cannot be opened is logged and skipped; it does
// not abort the others and is not reported through the return value.
// Previously the return value was whatever the last loop iteration produced,
// so a skipped file aborted the caller only when it was last in the list.
func Tailinit(flist []global.Configure_list) (err error) {
    objmgr = &tailobjmgr{
        tailobj:     make(map[string]*File_reader),
        tailobjconf: flist,
        chg_chan:    make(chan []global.Configure_list),
    }

    for _, item := range flist {
        t_file := type_init(item.Path, item.Topic)
        if initErr := t_file.init(); initErr != nil {
            logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", initErr)
            continue
        }
        objmgr.tailobj[t_file.Path] = t_file // register the reader under its path
        logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
        go t_file.run()
    }

    logrus.Info("All tailfile inited")

    go objmgr.watch()
    return nil
}

// watch waits for new configuration lists on chg_chan and reconciles the
// running tailers with each one:
//   - a path that is not yet tailed gets a new tailer;
//   - a path that is already tailed but now has a different topic is
//     restarted with the new topic (previously the change was ignored);
//   - a tailed path missing from the new list is cancelled and unregistered.
// It never returns.
func (mobj *tailobjmgr) watch() {
    logrus.Info("tail's watchdog is waiting")
    for {
        listen := <-mobj.chg_chan // blocks until etcd delivers a new configuration
        logrus.WithFields(logrus.Fields{"newconf": listen}).Info("get new configure:")

        // Paths present in the new configuration, for the removal pass below.
        wanted := make(map[string]struct{}, len(listen))
        for _, item := range listen {
            wanted[item.Path] = struct{}{}
            if running, ok := mobj.tailobj[item.Path]; ok {
                if running.Topic == item.Topic {
                    continue // unchanged entry
                }
                // run reads the topic when it builds messages for a reader
                // created with that topic, so a topic change needs a new reader.
                logrus.WithFields(logrus.Fields{"file": item.Path}).Info("File watch will stop")
                running.cancel()
                delete(mobj.tailobj, item.Path)
            }
            t_file := type_init(item.Path, item.Topic)
            err := t_file.init()
            if err != nil {
                logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", err)
                continue
            }
            mobj.tailobj[t_file.Path] = t_file // register the reader under its path
            logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
            go t_file.run()
        }

        // Stop every registered tailer that the new configuration no longer lists.
        logrus.Info("start to check")
        for key, item := range mobj.tailobj {
            if _, ok := wanted[key]; ok {
                logrus.WithFields(logrus.Fields{"file": key}).Info("get file")
                continue
            }
            logrus.WithFields(logrus.Fields{"file": key}).Info("File watch will stop")
            item.cancel()
            // Unregister it as well; otherwise a later configuration that
            // lists this path again would see it as existing and skip it.
            delete(mobj.tailobj, key)
        }
        logrus.Info("check over")
    }
}

// Sendnewconf hands a new configuration list to the manager's watch goroutine
// through chg_chan. The channel is unbuffered, so the call blocks until watch
// receives it. Tailinit must have run first, since it creates objmgr.
func Sendnewconf(newconf []global.Configure_list) { // called by the etcd watcher to deliver a new configuration
    objmgr.chg_chan <- newconf
}

// isExist reports whether conf.Path is already registered in tailobj.
func (mobj *tailobjmgr) isExist(conf global.Configure_list) bool {
    if _, registered := mobj.tailobj[conf.Path]; registered {
        return true
    }
    return false
}

etcd.go:注,暂不支持etcd del操作,会收不到信号

package etcd

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "time"

    "example.go/global"
    "example.go/tailfile"
    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

var (
    // client is the package-level etcd client; Etcd_ini assigns it.
    client *clientv3.Client
)

// type Configure_list struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

// type LogEntry struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

// Etcd_ini creates the package-level etcd client for the given endpoints with
// a three-second dial timeout. A creation error is logged and returned.
func Etcd_ini(address []string) (err error) {
    settings := clientv3.Config{
        Endpoints:   address,
        DialTimeout: 3 * time.Second,
    }
    if client, err = clientv3.New(settings); err != nil {
        logrus.Error("Failed to connect etcd", err)
    }
    return err
}

// Etcd_getvalue reads key from etcd (three-second timeout) and decodes its
// value, a JSON array, into a list of collection entries.
//
// It returns (nil, err) when the read or the decoding fails, and (nil, nil)
// when the key has no value, so callers can start with an empty configuration.
func Etcd_getvalue(key string) (list []global.Configure_list, err error) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    resp, err := client.Get(ctx, key)
    if err != nil {
        // The field name is the literal "key"; the original used the key's
        // own value as the field name.
        logrus.WithFields(logrus.Fields{"key": key}).Error("failed to get key value", err)
        return nil, err
    }
    if len(resp.Kvs) == 0 {
        logrus.WithFields(logrus.Fields{"key": key}).Error("empty value!")
        return nil, nil
    }
    if err = json.Unmarshal(resp.Kvs[0].Value, &list); err != nil {
        logrus.Error("failed to unmarshal  ", err)
        return nil, err
    }
    return list, nil
}

// Etcd_watchdog watches key in etcd and, for every put event, decodes the new
// JSON value and hands it to the tail manager through tailfile.Sendnewconf.
// It calls wg.Done when the watch channel is closed.
//
// Delete events carry no new configuration and are skipped with a warning.
// To clear the configuration, put a list such as
// "[{\"path\": \"\",\"topic\":\"\"}]" or any path that does not exist.
func Etcd_watchdog(key string, wg *sync.WaitGroup) {
    defer wg.Done()
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    watchdog := client.Watch(ctx, key) // WatchChan --> WatchResponse --> []Events --> Type/Kv --> key/value
    logrus.Info("watchdog stand by ")
    for re := range watchdog {
        if err := re.Err(); err != nil {
            logrus.WithFields(logrus.Fields{"err": err}).Error("etcd watch error")
            continue
        }
        logrus.Info("etcd watchdog catchs new message!")
        for _, ev := range re.Events {
            fmt.Printf("Operation type is:%s\nkey:%s\nvalue:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
            if ev.Type == clientv3.EventTypeDelete {
                // Skip explicitly instead of letting the decode below fail
                // with a misleading unmarshal error.
                logrus.Warn("etcd delete event ignored, configuration unchanged")
                continue
            }
            // Decode into a fresh slice for every event. The previous list
            // has been handed to the tail manager, and json.Unmarshal would
            // reuse its backing array while the manager may still read it.
            var newlist []global.Configure_list
            if err := json.Unmarshal(ev.Kv.Value, &newlist); err != nil {
                logrus.WithFields(logrus.Fields{"err": err}).Error("failed to unmarshal now conf!")
                continue
            }
            logrus.Info("trying to send new configure")
            tailfile.Sendnewconf(newlist)
            logrus.Info("send new configure successfully")
        }
    }
}

// td@tddeMacBook-Air ~ % etcdctl put testk1 "[{\"path\": \"/Users/td/Documents/blog/prod.log\",\"topic\":\"prod\"},{\"path\": \"/Users/td/Documents/blog/readme.log\",\"topic\":\"readme\"}]"
// OK
// td@tddeMacBook-Air ~ % etcdctl get testk1
// testk1
// [{"path": "/Users/td/Documents/blog/prod.log","topic":"prod"},{"path": "/Users/td/Documents/blog/readme.log","topic":"readme"}]

configure.ini

[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000

[tsdb]
address = 127.0.0.1
confpath = "./readme.log"

[etcd]
address = "127.0.0.1:2379"
collectkey = "etcd_key_%s_testk1"

ini读取基本的初始化配置,etcd通过key去获取对应的json字符串,tail解析其中的路径和对应的Kafka topic,监控对应的文件并把最新的内容发送到对应的topic
example.go

package main

import (
    "fmt"
    "sync"

    "example.go/etcd"
    "example.go/kafka"
    "example.go/tailfile"
    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

// Kafka is the mapping target for the ini data tagged "kafka".
// main passes Address (wrapped in a one-element slice) and Size to
// kafka.Kafkainit; Topic is mapped but not read by main.
type Kafka struct {
    Address string `ini:"addr"`
    Topic   string `ini:"topic"`
    Size    int64  `ini:"chan_size"`
}

// Tsdb is the mapping target for the ini data tagged "tsdb".
// main only reads Configpath, as a log field.
type Tsdb struct {
    Address    string `ini:"address"`
    Configpath string `ini:"confpath"`
}

// Etcd is the mapping target for the ini data tagged "etcd".
// Address is handed to etcd.Etcd_ini; in this revision Collectkey is passed
// to etcd.Etcd_getvalue unchanged, as the etcd key.
type Etcd struct {
    Address    []string `ini:"address"`
    Collectkey string   `ini:"collectkey"`
}

// Configure embeds the three section structs so cfg.MapTo can fill them in
// one call.
type Configure struct { // struct used as the mapping target
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
    Etcd  `ini:"etcd"`
}

// main wires the collector together: it loads ./configure.ini, initialises
// the Kafka producer and the etcd client, fetches the list of files to tail
// from etcd, starts one tail reader per file and then blocks while
// send_to_kafka drains the send queue.
//
// Any initialisation failure is logged and terminates the program.
func main() {
    logrus.SetLevel(logrus.DebugLevel)
    var configOBJ = new(Configure) // allocate the struct the ini file is mapped into
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    err = cfg.MapTo(configOBJ) // map the ini sections onto the struct fields
    if err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)

    // Kafkainit expects a slice of brokers, so wrap the single address.
    err = kafka.Kafkainit([]string{configOBJ.Kafka.Address}, configOBJ.Kafka.Size)
    if err != nil {
        logrus.Error("kafka init failed:", err)
        return
    }
    logrus.Debug("kafka init success")

    err = etcd.Etcd_ini(configOBJ.Etcd.Address)
    if err != nil {
        // Attach the error; previously it was dropped from this log line.
        logrus.WithError(err).Error("failed to init etcd")
        return
    }
    logrus.Info("etcd init success")

    // The value stored under Collectkey is a JSON list of {path, topic} items.
    conf_list, err := etcd.Etcd_getvalue(configOBJ.Etcd.Collectkey)
    if err != nil {
        logrus.Error("faild to get value from etcd  ", err)
        return
    }
    fmt.Println(conf_list)

    // Start one tail reader per entry returned by etcd.
    err = tailfile.Tailinit(conf_list)
    if err != nil {
        // NOTE(review): the "filename" field is the [tsdb] confpath, not one of
        // the files listed in conf_list — confirm this is the intended value.
        logrus.WithFields(logrus.Fields{"filename": configOBJ.Configpath, "err": err}).Error("file loaded failed")
        return
    }
    logrus.WithFields(logrus.Fields{"filename": configOBJ.Tsdb.Configpath}).Debug("file loaded successfully")

    // send_to_kafka loops forever, so Wait keeps the process alive.
    var wg sync.WaitGroup
    wg.Add(1)
    go send_to_kafka(&wg)
    wg.Wait()

}

// send_to_kafka takes messages off the kafka package's send queue one at a
// time and publishes them with the shared sync producer. The loop has no exit
// condition, so the deferred Done/Close calls only run if the goroutine
// unwinds (for example on a panic).
//
// wg is signalled via Done when the function returns.
func send_to_kafka(wg *sync.WaitGroup) {
    defer wg.Done()
    defer kafka.Kafka_client.Close()
    for {
        partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
        if err != nil {
            // Include the cause; the bare message alone made failures undiagnosable.
            logrus.WithError(err).Error("kafka send failed")
            continue
        }
        logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
    }
}

tailread.go

package tailfile

import (
    "fmt"

    "example.go/etcd"
    "example.go/kafka"
    "github.com/Shopify/sarama"
    "github.com/nxadm/tail"
    "github.com/sirupsen/logrus"
)

var (
    // Tail_context is an exported tail handle. None of the functions visible
    // in this version of the package assign it; each File_reader keeps its own
    // handle in File_handle instead.
    Tail_context *tail.Tail
    //Context_listen chan *tail.Line
)

// File_reader couples one tailed log file with the Kafka topic its lines are
// published to.
type File_reader struct {
    Path        string     // path of the file to tail
    Topic       string     // Kafka topic set on every message built by run
    File_handle *tail.Tail // tail handle; set by init, read by run
}

// type_init builds a File_reader for the given file path and Kafka topic.
// File_handle is left nil; call init on the result to open the tail.
func type_init(path string, topic string) *File_reader {
    return &File_reader{Path: path, Topic: topic}
}

// run forwards every non-empty line produced by the tail handle to the Kafka
// send queue, tagged with tobj.Topic. It returns once the Lines channel is
// closed.
//
// Differences from the previous implementation:
//   - A closed Lines channel yields a nil *tail.Line; the old code read
//     line.Text before checking ok and would panic. Ranging over the channel
//     ends the loop cleanly instead of spinning on a closed channel.
//   - A fresh ProducerMessage is allocated per line. kafka.Kafkareceive only
//     enqueues the pointer, so reusing one message let a later line overwrite
//     the Value of messages still waiting in the queue.
func (tobj File_reader) run() {
    for line := range tobj.File_handle.Lines {
        if line == nil || len(line.Text) == 0 {
            continue
        }
        logrus.Info("get message")
        msg := &sarama.ProducerMessage{
            Topic: tobj.Topic,
            Value: sarama.StringEncoder(line.Text),
        }
        fmt.Println(msg.Value)
        kafka.Kafkareceive(msg)
        logrus.Info("send message")
    }
    // Reached only when the tail's Lines channel has been closed.
    logrus.WithFields(logrus.Fields{"File": tobj.Path}).Error("failed to get message")
}

// init opens a tail on tobj.Path and stores the handle in tobj.File_handle.
// The error from tail.TailFile is returned unchanged.
func (tobj *File_reader) init() (err error) {
    // Whence 2 with offset 0 means "start at the end of the file", so only
    // lines appended after start-up are delivered.
    startAt := &tail.SeekInfo{Offset: 0, Whence: 2}

    tobj.File_handle, err = tail.TailFile(tobj.Path, tail.Config{
        Location:  startAt,
        ReOpen:    true,  // together with Follow: keep tracking the file across rotation
        Follow:    true,  // keep waiting for new lines
        MustExist: false, // the file is allowed to be missing at start-up
        Poll:      true,  // poll for changes
    })
    return err
}
// Tailinit starts one tail reader goroutine for every entry in flist.
// An entry whose tail cannot be opened is logged and skipped.
//
// NOTE(review): the returned error is whatever the last processed entry
// produced, so an earlier failure is hidden when a later entry succeeds —
// confirm whether callers rely on this.
func Tailinit(flist []etcd.Configure_list) (err error) {
    for idx := range flist {
        entry := flist[idx]
        reader := &File_reader{Path: entry.Path, Topic: entry.Topic}

        if err = reader.init(); err != nil {
            logrus.WithFields(logrus.Fields{"File": entry.Path, "topic": entry.Topic}).Error("failed to init tailfile:", err)
            continue
        }

        logrus.WithFields(logrus.Fields{"File": reader.Path}).Info("Init tailfile successfully")
        go reader.run()
    }

    return err
}

kafka.go

package kafka

import (
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
)

var (
    // Kafka_client is the shared synchronous producer created by Kafkainit.
    Kafka_client sarama.SyncProducer
    // send_chan queues messages between Kafkareceive (writer) and
    // Kafkaoutput (reader); it is allocated by Kafkainit.
    send_chan    chan *sarama.ProducerMessage
)

// Kafkainit allocates the send queue with capacity size and creates the
// package-level sync producer for the given broker addresses.
//
// The producer waits for acknowledgement from all replicas, picks partitions
// at random and reports successes. It is not closed here; that is left to the
// code that uses Kafka_client. A connection error is logged and returned.
func Kafkainit(address []string, size int64) (err error) {
    producerCfg := sarama.NewConfig()
    producerCfg.Producer.RequiredAcks = sarama.WaitForAll
    producerCfg.Producer.Partitioner = sarama.NewRandomPartitioner
    producerCfg.Producer.Return.Successes = true

    send_chan = make(chan *sarama.ProducerMessage, size)

    if Kafka_client, err = sarama.NewSyncProducer(address, producerCfg); err != nil {
        logrus.Error("broker connection has something wrong happened:", err)
    }
    return err
}

// Kafkareceive puts msg on the send queue. Only the pointer is queued, and
// the call blocks while the queue is full.
func Kafkareceive(msg *sarama.ProducerMessage) {
    send_chan <- msg
}

// Kafkaoutput takes the next message off the send queue, blocking until one
// is available.
func Kafkaoutput() *sarama.ProducerMessage {
    next := <-send_chan
    return next
}

etcd.go

package etcd

import (
    "context"
    "encoding/json"
    "time"

    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

var (
    // client is the package-wide etcd client; Etcd_ini assigns it and
    // Etcd_getvalue reads through it.
    client *clientv3.Client
)

// Configure_list is one element of the JSON array stored in etcd: a log file
// path and the Kafka topic associated with it.
type Configure_list struct {
    Path  string `json:"path"`
    Topic string `json:"topic"`
}

// type LogEntry struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

// Etcd_ini creates the package-level etcd client for the given endpoints
// with a 3 second dial timeout. A failure is logged and returned.
func Etcd_ini(address []string) (err error) {
    cfg := clientv3.Config{
        Endpoints:   address,
        DialTimeout: 3 * time.Second,
    }
    if client, err = clientv3.New(cfg); err != nil {
        logrus.Error("Failed to connect etcd", err)
    }
    return err
}

// Etcd_getvalue reads key from etcd (3 second timeout) and decodes the value
// of the first returned KV pair as a JSON array of Configure_list.
//
// Returns:
//   - (list, nil) on success;
//   - (nil, nil) when the key has no value — this case is logged but is not
//     reported as an error, so callers must be prepared for an empty list;
//   - (nil, err) when the etcd request or the JSON decoding fails.
//
// Fixes over the previous version: the log field is now literally named
// "key" (it used the key's value as the field name), the nil error is no
// longer appended to the "empty value!" log, and a partially decoded list is
// not returned together with an error.
func Etcd_getvalue(key string) (list []Configure_list, err error) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    resp, err := client.Get(ctx, key)
    if err != nil {
        logrus.WithFields(logrus.Fields{"key": key}).Error("failed to get key value", err)
        return nil, err
    }
    if len(resp.Kvs) == 0 {
        logrus.WithFields(logrus.Fields{"key": key}).Error("empty value!")
        return nil, nil
    }

    // Only the first KV pair is decoded.
    if err = json.Unmarshal(resp.Kvs[0].Value, &list); err != nil {
        logrus.Error("failed to unmarshal  ", err)
        return nil, err
    }
    return list, nil
}

configure.ini

[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000

[tsdb]
address = 127.0.0.1
confpath = "./readme.log"

[etcd]
address = "127.0.0.1:2379"
collectkey = "testk1"

运行卡夫卡消费者,对对应文件进行修改,查看效果,测试结果正常

go run ./example.go                                          
&main.Configure{Kafka:main.Kafka{Address:"127.0.0.1:9092", Topic:"test1", Size:10000}, Tsdb:main.Tsdb{Address:"127.0.0.1", Configpath:"./readme.log"}, Etcd:main.Etcd{Address:[]string{"127.0.0.1:2379"}, Collectkey:"testk1"}}
DEBU[0000] kafka init success                           
INFO[0000] etcd init success                            
[{/Users/td/Documents/blog/prod.log prod} {/Users/td/Documents/blog/readme.log readme}]
INFO[0000] Init tailfile successfully                    File=/Users/td/Documents/blog/prod.log
INFO[0000] Init tailfile successfully                    File=/Users/td/Documents/blog/readme.log
DEBU[0000] file loaded successfully                      filename=./readme.log
INFO[0010] get message                                  
444
INFO[0010] send message                                 
INFO[0010] send message success                          offset=1 partition=0
INFO[0017] get message                                  
555
INFO[0017] send message                                 
INFO[0017] send message success                          offset=0 partition=0
INFO[0101] get message                                  
666
INFO[0101] send message                                 
INFO[0101] send message success                          offset=2 partition=0
INFO[0123] get message                                  
777
INFO[0123] send message                                 
INFO[0123] send message success                          offset=1 partition=0


kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic readme
666

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic prod
777

example.go(做main)

package main

import (
    "fmt"
    "sync"

    "example.go/kafka"
    "example.go/tailfile"
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

// Kafka holds the values read from the [kafka] section of configure.ini.
type Kafka struct {
    Address string `ini:"addr"`      // broker address, passed to kafka.Kafkainit by main
    Topic   string `ini:"topic"`     // ini key "topic"; not read by main in this version
    Size    int64  `ini:"chan_size"` // passed to kafka.Kafkainit as the send-queue size
}

// Tsdb holds the values read from the [tsdb] section of configure.ini.
type Tsdb struct {
    Address    string `ini:"address"`
    Configpath string `ini:"confpath"` // path of the file main hands to tailfile.Tailinit
}

// Configure is the target struct for cfg.MapTo: each embedded struct is
// tagged with the name of the ini section it is populated from.
type Configure struct {
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
}

// main loads ./configure.ini, initialises the Kafka producer, opens a tail on
// the configured file and then runs the reader (run) and the publisher
// (send_to_kafka) until both finish. Any initialisation failure is logged
// and ends the program.
func main() {
    logrus.SetLevel(logrus.DebugLevel)

    // Target of the ini mapping.
    configOBJ := &Configure{}

    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    if err = cfg.MapTo(configOBJ); err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)

    // Kafkainit takes a broker list; the config carries a single address.
    brokers := []string{configOBJ.Kafka.Address}
    if err = kafka.Kafkainit(brokers, configOBJ.Kafka.Size); err != nil {
        logrus.Error("kafka init failed:", err)
        return
    }
    logrus.Debug("kafka init success")

    logPath := configOBJ.Tsdb.Configpath
    if err = tailfile.Tailinit(logPath); err != nil {
        logrus.WithFields(logrus.Fields{"filename": logPath, "err": err}).Error("file loaded failed")
        return
    }
    logrus.WithFields(logrus.Fields{"filename": logPath}).Debug("file loaded successfully")

    // One goroutine reads lines, the other publishes them.
    workers := []func(*sync.WaitGroup){run, send_to_kafka}
    var wg sync.WaitGroup
    wg.Add(len(workers))
    for _, worker := range workers {
        go worker(&wg)
    }
    wg.Wait()
}

func run(wg *sync.WaitGroup) { //从tail中接收读取到的信息并发送给kafka的channel
    defer wg.Done()
    msg := &sarama.ProducerMessage{}
    msg.Topic = "test2"
    for {
        line, ok := <-tailfile.Tail_context.Lines
        if len(line.Text) == 0 {
            continue
        }
        if ok {
            logrus.Info("get message")
            msg.Value = sarama.StringEncoder(line.Text)
            fmt.Println(msg.Value)
            kafka.Kafkareceive(msg)
            logrus.Info("send message")
        } else {
            logrus.Error("failed to get message")
            continue
        }
    }
}

// send_to_kafka takes messages off the kafka package's send queue one at a
// time and publishes them with the shared sync producer. The loop has no exit
// condition, so the deferred Done/Close calls only run if the goroutine
// unwinds (for example on a panic).
//
// wg is signalled via Done when the function returns.
func send_to_kafka(wg *sync.WaitGroup) {
    defer wg.Done()
    defer kafka.Kafka_client.Close()
    for {
        partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
        if err != nil {
            // Include the cause; the bare message alone made failures undiagnosable.
            logrus.WithError(err).Error("kafka send failed")
            continue
        }
        logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
    }
}

//go run ./example.go
//&main.Configure{Kafka:main.Kafka{Address:"127.0.0.1:9092", Topic:"test1", Size:10000}, Tsdb:main.Tsdb{Address:"127.0.0.1", Configpath:"./readme.log"}}
//DEBU[0000] kafka init success
//DEBU[0000] file loaded successfully                      filename=./readme.log
//INFO[0004] get message
//666
//INFO[0004] send message
//INFO[0004] send message success                          offset=1 partition=0
//INFO[0012] get message
//777
//INFO[0012] send message
//INFO[0012] send message success                          offset=2 partition=0
//INFO[0035] get message
//888
//INFO[0035] send message
//INFO[0035] send message success                          offset=3 partition=0
//INFO[0039] get message
//999
//INFO[0039] send message
//INFO[0039] send message success                          offset=4 partition=0

tailread.go

package tailfile

import (
    "fmt"

    "github.com/nxadm/tail"
)

var (
    // Tail_context is the package-wide tail handle. Tailinit assigns it and
    // callers read new lines from its Lines channel.
    Tail_context *tail.Tail
    //Context_listen chan *tail.Line
)

// Tailinit opens a tail on filename and stores the handle in Tail_context.
// An error from tail.TailFile is printed to stdout and returned.
func Tailinit(filename string) (err error) {
    Tail_context, err = tail.TailFile(filename, tail.Config{
        // Whence 2 with offset 0 starts at the end of the file, so only
        // lines appended after start-up are delivered.
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
        ReOpen:    true,  // together with Follow: keep tracking the file across rotation
        Follow:    true,  // keep waiting for new lines
        MustExist: false, // the file is allowed to be missing at start-up
        Poll:      true,  // poll for changes
    })
    if err == nil {
        return nil
    }
    fmt.Println("error happens:", err)
    return err
}

kafka.go
package kafka

import (
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
)

var (
    // Kafka_client is the shared synchronous producer created by Kafkainit.
    Kafka_client sarama.SyncProducer
    // send_chan queues messages between Kafkareceive (writer) and
    // Kafkaoutput (reader); it is allocated by Kafkainit.
    send_chan    chan *sarama.ProducerMessage
)

// Kafkainit allocates the send queue with capacity size and creates the
// package-level sync producer for the given broker addresses.
//
// The producer waits for acknowledgement from all replicas, picks partitions
// at random and reports successes. It is not closed here; that is left to the
// code that uses Kafka_client. A connection error is logged and returned.
func Kafkainit(address []string, size int64) (err error) {
    producerCfg := sarama.NewConfig()
    producerCfg.Producer.RequiredAcks = sarama.WaitForAll
    producerCfg.Producer.Partitioner = sarama.NewRandomPartitioner
    producerCfg.Producer.Return.Successes = true

    send_chan = make(chan *sarama.ProducerMessage, size)

    if Kafka_client, err = sarama.NewSyncProducer(address, producerCfg); err != nil {
        logrus.Error("broker connection has something wrong happened:", err)
    }
    return err
}

// Kafkareceive puts msg on the send queue. Only the pointer is queued, and
// the call blocks while the queue is full.
func Kafkareceive(msg *sarama.ProducerMessage) {
    send_chan <- msg
}

// Kafkaoutput takes the next message off the send queue, blocking until one
// is available.
func Kafkaoutput() *sarama.ProducerMessage {
    next := <-send_chan
    return next
}
configure.ini
[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000

[tsdb]
address = 127.0.0.1
confpath = "./readme.log"

bin/kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test2运行消费者测试成功

ES 是一款开源的分布式搜索引擎

下载,解压,把config/elasticsearch.yml里面跟security相关的两个选项改为false,运行bin/elasticsearch,测试9200端口
https://www.ruanyifeng.com/blog/2017/08/elasticsearch.html

curl -XGET 'http://localhost:9200/' -H 'Content-Type: application/json'
{
  "name" : "tddeMacBook-Air.local",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "2uSPQGtXRu-xIIzQVLVcAw",
  "version" : {
    "number" : "8.4.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "f56126089ca4db89b631901ad7cce0a8e10e2fe5",
    "build_date" : "2022-08-19T19:23:42.954591481Z",
    "build_snapshot" : false,
    "lucene_version" : "9.3.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

这样就说明成功了

概念

es本质上是一个分布式数据库,一台机器上一个实例为一个node

Index:
ES会把数据分词后建立索引,使用反向索引来搜索数据

查看所有索引

curl -X GET 'http://localhost:9200/_cat/indices?v'

Document:
index里面的单条记录被称为文档,多个文档组成一个index,同一个index下的document最好拥有相同构造以保证搜索效率

Type:
Type是对index里面document的虚拟分组

基础操作

https://blog.csdn.net/UbuntuTouch/article/details/98871531

增:索引会自动创建
image.png

curl -XPUT 'http://localhost:9200/twitter/_doc/1?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T13:12:00",
    "message": "Trying out Elasticsearch, so far so good?"
}'
 
curl -XPUT 'http://localhost:9200/twitter/_doc/2?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T14:12:12",
    "message": "Another tweet, will it be indexed?"
}'
 
curl -XPUT 'http://localhost:9200/twitter/_doc/3?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "elastic",
    "post_date": "2010-01-15T01:46:38",
    "message": "Building the site, should be kewl"
}'


精准:

curl -XGET 'http://localhost:9200/twitter/_doc/1?pretty=true'
{
  "_index" : "twitter",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "user" : "kimchy",
    "post_date" : "2009-11-15T13:12:00",
    "message" : "Trying out Elasticsearch, so far so good?"
  }
}

搜索

curl -XGET 'http://localhost:9200/twitter/_search?q=user:kimchy&pretty=true'
{
  "took" : 69,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 0.4700036,
    "hits" : [
      {
        "_index" : "twitter",
        "_id" : "1",
        "_score" : 0.4700036,
        "_source" : {
          "user" : "kimchy",
          "post_date" : "2009-11-15T13:12:00",
          "message" : "Trying out Elasticsearch, so far so good?"
        }
      },
      {
        "_index" : "twitter",
        "_id" : "2",
        "_score" : 0.4700036,
        "_source" : {
          "user" : "kimchy",
          "post_date" : "2009-11-15T14:12:12",
          "message" : "Another tweet, will it be indexed?"
        }
      }
    ]
  }
}

JSON格式的查询

curl -XGET 'http://localhost:9200/twitter/_search?pretty=true' -H 'Content-Type: application/json' -d '
{
    "query" : {
        "match" : { "user": "kimchy" }
    }
}'
{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 0.4700036,
    "hits" : [
      {
        "_index" : "twitter",
        "_id" : "1",
        "_score" : 0.4700036,
        "_source" : {
          "user" : "kimchy",
          "post_date" : "2009-11-15T13:12:00",
          "message" : "Trying out Elasticsearch, so far so good?"
        }
      },
      {
        "_index" : "twitter",
        "_id" : "2",
        "_score" : 0.4700036,
        "_source" : {
          "user" : "kimchy",
          "post_date" : "2009-11-15T14:12:12",
          "message" : "Another tweet, will it be indexed?"
        }
      }
    ]
  }
}

删除

curl -X DELETE 'localhost:9200/index_name'

kibana

kibana是es配套的一个可视化工具,需要下载和es相同版本号的kibana才能连上

下载解压,修改config/kibana.yml,改下连的es的地址,另外有个语言选项可改可不改

bin/kibana启动,访问5601端口,配置索引
image.png

/elastic/v7

go get -u github.com/olivere/elastic/v7

这是个第三方库,据说官方库不咋好用。应该下载对应版本的,但是我es下高了....不过好在经过测试,基础的一些功能还是兼容的

package EScli

import (
    "context"
    "fmt"

    "github.com/olivere/elastic/v7"
    "github.com/sirupsen/logrus"
)

// Twinfo is the JSON document body that Connes indexes into the "twitter"
// index.
type Twinfo struct {
    User      string `json:"user"`
    Post_date string `json:"post_date"` // timestamp kept as a string, e.g. "2022-08-22T17:12:00"
    Message   string `json:"message"`
}

// Connes connects to the Elasticsearch node at http://127.0.0.1:9200
// (sniffing disabled), indexes one sample Twinfo document into the "twitter"
// index and prints the index, type and id reported in the response.
//
// Either failure is logged and the function returns early. The previous
// version carried on after an error and then dereferenced cli / put, which
// presumably are nil in that case and would panic.
func Connes() {
    // NewClient accepts many more options; only the ones needed are set here.
    cli, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"), elastic.SetSniff(false))
    if err != nil {
        logrus.Error("Failed to connect ES", err)
        return
    }

    msg := Twinfo{User: "zhangsan", Post_date: "2022-08-22T17:12:00", Message: "Vice versa"}
    put, err := cli.Index().Index("twitter").BodyJson(msg).Do(context.Background())
    if err != nil {
        // Attach the cause; it was previously dropped from the log.
        logrus.WithError(err).Error("Failed to inject ES")
        return
    }
    fmt.Printf("index:%s\ntype:%s\nid:%s\n", put.Index, put.Type, put.Id)
}