ini读取基本的初始化配置,etcd通过key去获取对应的json字符串,tail解析其中的路径和对应的Kafka topic,监控对应的文件并把最新的内容发送到对应的topic
example.go

package main

import (
    "fmt"
    "sync"

    "example.go/etcd"
    "example.go/kafka"
    "example.go/tailfile"
    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

type Kafka struct {
    Address string `ini:"addr"`
    Topic   string `ini:"topic"`
    Size    int64  `ini:"chan_size"`
}

type Tsdb struct {
    Address    string `ini:"address"`
    Configpath string `ini:"confpath"`
}

type Etcd struct {
    Address    []string `ini:"address"`
    Collectkey string   `ini:"collectkey"`
}

type Configure struct { //映射用结构
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
    Etcd  `ini:"etcd"`
}

func main() {
    logrus.SetLevel(logrus.DebugLevel)
    var configOBJ = new(Configure) //创建一个configure类型的指针并分配空间
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    // err = ini.MapTo(configOBJ,cfg)
    // err := ini.MapTo(configOBJ, "./configure.ini")
    err = cfg.MapTo(configOBJ) //映射
    if err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)

    err = kafka.Kafkainit([]string{configOBJ.Kafka.Address}, configOBJ.Kafka.Size) //类型转换,kafka初始化
    if err != nil {
        logrus.Error("kafka init failed:", err)
        return
    }
    logrus.Debug("kafka init success")

    err = etcd.Etcd_ini(configOBJ.Etcd.Address) //etcd初始化
    if err != nil {
        logrus.Error("failed to init etcd")
        return
    }
    logrus.Info("etcd init success")

    conf_list, err := etcd.Etcd_getvalue(configOBJ.Etcd.Collectkey) //通过etcd获取value
    if err != nil {
        logrus.Error("faild to get value from etcd  ", err)
        return
    }
    // fmt.Printf("list:%v", list)
    fmt.Println(conf_list)

    err = tailfile.Tailinit(conf_list) //tail初始化,根据etcd返回的json去加载对应要读取的日志文件
    if err != nil {
        logrus.WithFields(logrus.Fields{"filename": configOBJ.Configpath, "err": err}).Error("file loaded failed")
        return
    }
    logrus.WithFields(logrus.Fields{"filename": configOBJ.Tsdb.Configpath}).Debug("file loaded successfully")

    var wg sync.WaitGroup
    wg.Add(1)
    go send_to_kafka(&wg)
    wg.Wait()

}

func send_to_kafka(wg *sync.WaitGroup) { //给kafka发送数据
    defer wg.Done()
    defer kafka.Kafka_client.Close()
    for {
        partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
        if err != nil {
            logrus.Error("kafka send failed")
        } else {
            logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
        }
    }
}

tailread.go

package tailfile

import (
    "fmt"

    "example.go/etcd"
    "example.go/kafka"
    "github.com/Shopify/sarama"
    "github.com/nxadm/tail"
    "github.com/sirupsen/logrus"
)

var (
    Tail_context *tail.Tail
    //Context_listen chan *tail.Line
)

type File_reader struct {
    Path        string
    Topic       string
    File_handle *tail.Tail
}

func type_init(path string, topic string) *File_reader {
    tobj := File_reader{
        Path:  path,
        Topic: topic,
    }
    return &tobj
}

func (tobj File_reader) run() { //从tail中接收读取到的信息并发送给kafka的channel
    //    defer wg.Done()
    msg := &sarama.ProducerMessage{}
    msg.Topic = tobj.Topic
    for {
        line, ok := <-tobj.File_handle.Lines
        if len(line.Text) == 0 {
            continue
        }
        if ok {
            logrus.Info("get message")
            msg.Value = sarama.StringEncoder(line.Text)
            fmt.Println(msg.Value)
            kafka.Kafkareceive(msg)
            logrus.Info("send message")
        } else {
            logrus.Error("failed to get message")
            continue
        }
    }
}

func (tobj *File_reader) init() (err error) {
    config := tail.Config{
        ReOpen:    true, //当出现轮转切割的时候,这两个参数会去追踪新的文件
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从哪开始
        MustExist: false,                                //允许文件不存在
        Poll:      true,                                 //轮询
    }
    tobj.File_handle, err = tail.TailFile(tobj.Path, config)
    return err
}
func Tailinit(flist []etcd.Configure_list) (err error) {
    for _, item := range flist {
        t_file := type_init(item.Path, item.Topic) //导入基本信息
        err = t_file.init()                        //初始化
        if err != nil {
            logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", err)
            continue
        }
        logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
        go t_file.run()
    }

    return err
}

kafka.go

package kafka

import (
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
)

var (
    Kafka_client sarama.SyncProducer
    send_chan    chan *sarama.ProducerMessage
)

func Kafkainit(address []string, size int64) (err error) {
    config := sarama.NewConfig()                              //配置
    config.Producer.RequiredAcks = sarama.WaitForAll          //ack应答模式为all
    config.Producer.Partitioner = sarama.NewRandomPartitioner //partition的选择方式为随机
    config.Producer.Return.Successes = true                   //Successes channel 返回值

    send_chan = make(chan *sarama.ProducerMessage, size)

    Kafka_client, err = sarama.NewSyncProducer(address, config) //使用参数创建一个SyncProducer
    //    defer kafka_client.Close()
    if err != nil {
        logrus.Error("broker connection has something wrong happened:", err)
        return err
    }
    return err
}

func Kafkareceive(msg *sarama.ProducerMessage) {
    send_chan <- msg
}

func Kafkaoutput() *sarama.ProducerMessage {
    return <-send_chan
}

etcd.go

package etcd

import (
    "context"
    "encoding/json"
    "time"

    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

var (
    client *clientv3.Client
)

type Configure_list struct {
    Path  string `json:"path"`
    Topic string `json:"topic"`
}

// type LogEntry struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

func Etcd_ini(address []string) (err error) {
    client, err = clientv3.New(clientv3.Config{Endpoints: address, DialTimeout: 3 * time.Second})
    if err != nil {
        logrus.Error("Failed to connect etcd", err)
        return err
    }
    return err
}

func Etcd_getvalue(key string) (list []Configure_list, err error) { //    通过键获取值,返回json格式
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    get, err := client.Get(ctx, key)
    if err != nil {
        logrus.WithFields(logrus.Fields{key: key}).Error("failed to get key value", err)
        return
    }
    if len(get.Kvs) == 0 {
        logrus.WithFields(logrus.Fields{key: key}).Error("empty value!", err)
        return
    }
    // for _, item := range get.Kvs {
    //     fmt.Printf("key:%s  value:%s\n", item.Key, item.Value)
    //     err = json.Unmarshal(item.Value, &list)
    //     if err != nil {
    //         logrus.Error("failed to unmarshal  ", err)
    //         return []configure_list{}, err
    //     }
    // }
    stu := get.Kvs[0]
    // fmt.Println(string(stu.Value))
    // fmt.Printf("%#v\n", stu.Value)
    // stu.Value = bytes.TrimPrefix(stu.Value, []byte("\xef\xbb\xbf"))
    err = json.Unmarshal(stu.Value, &list)
    if err != nil {
        logrus.Error("failed to unmarshal  ", err)
        // if e, ok := err.(*json.SyntaxError); ok {
        //     logrus.Printf("syntax error at byte offset %d", e.Offset)
        // }
        // logrus.Printf("sakura response: %q", stu.Value)
        return
    }
    return list, err
}

configure.ini

[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000

[tsdb]
address = 127.0.0.1
confpath = "./readme.log"

[etcd]
address = "127.0.0.1:2379"
collectkey = "testk1"

运行卡夫卡消费者,对对应文件进行修改,查看效果,测试结果正常

go run ./example.go                                          
&main.Configure{Kafka:main.Kafka{Address:"127.0.0.1:9092", Topic:"test1", Size:10000}, Tsdb:main.Tsdb{Address:"127.0.0.1", Configpath:"./readme.log"}, Etcd:main.Etcd{Address:[]string{"127.0.0.1:2379"}, Collectkey:"testk1"}}
DEBU[0000] kafka init success                           
INFO[0000] etcd init success                            
[{/Users/td/Documents/blog/prod.log prod} {/Users/td/Documents/blog/readme.log readme}]
INFO[0000] Init tailfile successfully                    File=/Users/td/Documents/blog/prod.log
INFO[0000] Init tailfile successfully                    File=/Users/td/Documents/blog/readme.log
DEBU[0000] file loaded successfully                      filename=./readme.log
INFO[0010] get message                                  
444
INFO[0010] send message                                 
INFO[0010] send message success                          offset=1 partition=0
INFO[0017] get message                                  
555
INFO[0017] send message                                 
INFO[0017] send message success                          offset=0 partition=0
INFO[0101] get message                                  
666
INFO[0101] send message                                 
INFO[0101] send message success                          offset=2 partition=0
INFO[0123] get message                                  
777
INFO[0123] send message                                 
INFO[0123] send message success                          offset=1 partition=0


kafka-console-consumer --bootstrap-server 127.0.0.1:909
2 --topic readme
666

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic prod
777

标签: none

评论已关闭