实例:实时读取日志并发送kafka
example.go(main 包,程序入口)
package main
import (
"fmt"
"sync"
"example.go/kafka"
"example.go/tailfile"
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
"gopkg.in/ini.v1"
)
// Kafka holds the [kafka] section of configure.ini: broker address,
// target topic, and the buffered channel capacity for pending messages.
type Kafka struct {
Address string `ini:"addr"`
Topic string `ini:"topic"`
Size int64 `ini:"chan_size"` // capacity of the producer message channel
}
// Tsdb holds the [tsdb] section of configure.ini: the tsdb address and
// the path of the log file to tail.
type Tsdb struct {
Address string `ini:"address"`
Configpath string `ini:"confpath"` // path of the log file to follow
}
type Configure struct { // mapping target for ini.MapTo; embeds one struct per ini section
Kafka `ini:"kafka"`
Tsdb `ini:"tsdb"`
}
// main loads configure.ini, initializes the kafka producer and the log
// tailer, then runs the two forwarding goroutines until they finish.
func main() {
	logrus.SetLevel(logrus.DebugLevel)
	conf := new(Configure) // allocate the mapping target for the ini sections
	iniFile, loadErr := ini.Load("./configure.ini")
	if loadErr != nil {
		logrus.Error("failed to load configure:", loadErr)
		return
	}
	// Map the loaded sections onto the Configure struct via the ini tags.
	if mapErr := iniFile.MapTo(conf); mapErr != nil {
		logrus.Error("failed to reflect:", mapErr)
		return
	}
	fmt.Printf("%#v\n", conf)
	// Initialize the kafka producer and its buffered message channel.
	if initErr := kafka.Kafkainit([]string{conf.Kafka.Address}, conf.Kafka.Size); initErr != nil {
		logrus.Error("kafka init failed:", initErr)
		return
	}
	logrus.Debug("kafka init success")
	// Start tailing the configured log file.
	if tailErr := tailfile.Tailinit(conf.Configpath); tailErr != nil {
		logrus.WithFields(logrus.Fields{"filename": conf.Configpath, "err": tailErr}).Error("file loaded failed")
		return
	}
	logrus.WithFields(logrus.Fields{"filename": conf.Tsdb.Configpath}).Debug("file loaded successfully")
	var wg sync.WaitGroup
	wg.Add(2)
	go run(&wg)           // tail -> channel
	go send_to_kafka(&wg) // channel -> kafka
	wg.Wait()
}
// run receives lines read by the tailer and forwards each non-empty line
// to the kafka package's buffered channel. It returns when the tail line
// channel is closed.
//
// NOTE(review): the topic is hard-coded to "test2" while configure.ini
// declares "test1" — confirm which topic is intended.
func run(wg *sync.WaitGroup) {
	defer wg.Done()
	msg := &sarama.ProducerMessage{}
	msg.Topic = "test2"
	for {
		line, ok := <-tailfile.Tail_context.Lines
		// Check ok BEFORE touching line: on a closed channel line is a
		// nil *tail.Line and line.Text would panic. Returning also avoids
		// the previous busy loop of continuing on a closed channel.
		if !ok {
			logrus.Error("failed to get message")
			return
		}
		if len(line.Text) == 0 { // skip blank lines
			continue
		}
		logrus.Info("get message")
		msg.Value = sarama.StringEncoder(line.Text)
		fmt.Println(msg.Value)
		kafka.Kafkareceive(msg) // enqueue for the sender goroutine
		logrus.Info("send message")
	}
}
// send_to_kafka drains the kafka package's message channel and sends each
// message to the broker with the synchronous producer, logging the
// partition/offset on success. Runs until the process exits.
func send_to_kafka(wg *sync.WaitGroup) {
	defer wg.Done()
	defer kafka.Kafka_client.Close()
	for {
		// Kafkaoutput blocks until a message is available.
		partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
		if err != nil {
			// Include the cause; the original dropped err, making failures undiagnosable.
			logrus.Error("kafka send failed:", err)
			continue
		}
		logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
	}
}
//go run ./example.go
//&main.Configure{Kafka:main.Kafka{Address:"127.0.0.1:9092", Topic:"test1", Size:10000}, Tsdb:main.Tsdb{Address:"127.0.0.1", Configpath:"./readme.log"}}
//DEBU[0000] kafka init success
//DEBU[0000] file loaded successfully filename=./readme.log
//INFO[0004] get message
//666
//INFO[0004] send message
//INFO[0004] send message success offset=1 partition=0
//INFO[0012] get message
//777
//INFO[0012] send message
//INFO[0012] send message success offset=2 partition=0
//INFO[0035] get message
//888
//INFO[0035] send message
//INFO[0035] send message success offset=3 partition=0
//INFO[0039] get message
//999
//INFO[0039] send message
//INFO[0039] send message success offset=4 partition=0
tailread.go
package tailfile
import (
"fmt"
"github.com/nxadm/tail"
)
var (
// Tail_context is the package-level tail handle; its Lines channel is
// consumed by the caller after Tailinit succeeds.
Tail_context *tail.Tail
//Context_listen chan *tail.Line
)
// Tailinit opens filename for following with the nxadm/tail package and
// stores the handle in the package-level Tail_context. It seeks to the
// end of the file so only new lines are delivered. Returns a wrapped
// error if the file cannot be tailed.
func Tailinit(filename string) (err error) {
	config := tail.Config{
		ReOpen:    true, // together with Follow, re-attach after log rotation
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // Whence 2 = seek relative to end of file
		MustExist: false,                                // tolerate a missing file
		Poll:      true,                                 // poll instead of inotify
	}
	Tail_context, err = tail.TailFile(filename, config)
	if err != nil {
		// Wrap with context instead of the original fmt.Println side effect;
		// the caller is responsible for logging.
		return fmt.Errorf("tail file %q: %w", filename, err)
	}
	return nil
}
kafka.go
package kafka
import (
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
)
var (
// Kafka_client is the shared synchronous producer created by Kafkainit.
Kafka_client sarama.SyncProducer
// send_chan buffers messages between the tail reader and the sender.
send_chan chan *sarama.ProducerMessage
)
// Kafkainit configures and creates the package-level synchronous producer
// and allocates the buffered message channel of the given size. It returns
// a non-nil error if the broker connection cannot be established.
func Kafkainit(address []string, size int64) (err error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait for all in-sync replicas to ack
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick partitions at random
	config.Producer.Return.Successes = true                   // required for SyncProducer
	send_chan = make(chan *sarama.ProducerMessage, size)
	Kafka_client, err = sarama.NewSyncProducer(address, config)
	if err != nil {
		logrus.Error("broker connection has something wrong happened:", err)
		return err
	}
	// Success path returns literal nil (the original had a redundant second
	// `return err` and a dead commented-out defer).
	return nil
}
// Kafkareceive enqueues msg on the buffered channel; blocks when the
// channel is full. Kafkainit must have been called first.
func Kafkareceive(msg *sarama.ProducerMessage) {
send_chan <- msg
}
// Kafkaoutput dequeues the next pending message; blocks until one is
// available.
func Kafkaoutput() *sarama.ProducerMessage {
return <-send_chan
}
configure.ini
[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000
[tsdb]
address = 127.0.0.1
confpath = "./readme.log"
bin/kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test2运行消费者测试成功