matsudaira 发布的文章

go get gopkg.in/ini.v1
ini一个非常方便的能够配置配置文件的包
官方手册

go get github.com/sirupsen/logrus
logrus是一个日志库

https://zhuanlan.zhihu.com/p/105759117

简单示例

读取配置文件

package main

import (
    "fmt"

    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

func main() {
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    kafka_addr := cfg.Section("kafka").Key("addr").String()
    kafka_port := cfg.Section("kafka").Key("port").String()
    fmt.Println("kafka addr:", kafka_addr)
    fmt.Println("kafka_port:", kafka_port)
}

//configure.ini
//[kafka]
//addr = 127.0.0.1
//port = 9092

结构体映射

package main

import (
    "fmt"

    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

type Kafka struct {
    Address string `ini:"addr"`
    Port    string `ini:"port"`
}

type Tsdb struct {
    Address string `ini:"address"`
}

type Configure struct {
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
}

func main() {
    var configOBJ = new(Configure) //创建一个configure类型的指针并分配空间
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    // err = ini.MapTo(configOBJ,cfg)
    // err := ini.MapTo(configOBJ, "./configure.ini")
    err = cfg.MapTo(configOBJ)
    if err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)
}

//&main.Configure{Kafka:main.Kafka{Address:"127.0.0.1", Port:"9092"}, Tsdb:main.Tsdb{Address:"127.0.0.1"}}

格式化输出

//kafka.go
package kafka

import (
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
)

var (
    Kafka_client sarama.SyncProducer
)

func Kafkainit(address []string) (err error) {
    config := sarama.NewConfig()                              //配置
    config.Producer.RequiredAcks = sarama.WaitForAll          //ack应答模式为all
    config.Producer.Partitioner = sarama.NewRandomPartitioner //partition的选择方式为随机
    config.Producer.Return.Successes = true                   //Successes channel 返回值

    Kafka_client, err = sarama.NewSyncProducer(address, config) //使用参数创建一个SyncProducer
    //    defer kafka_client.Close()
    if err != nil {
        logrus.Error("broker connection has something wrong happened:", err)
        return err
    }
    return err
}

//tailread.go
package tailfile

import (
    "fmt"

    "github.com/nxadm/tail"
)

var (
    tail_context *tail.Tail
)

func Tailinit(filename string) (err error) {
    config := tail.Config{
        ReOpen:    true, //当出现轮转切割的时候,这两个参数会去追踪新的文件
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从哪开始
        MustExist: false,                                //允许文件不存在
        Poll:      true,                                 //轮询
    }

    tail_context, err = tail.TailFile(filename, config)
    if err != nil {
        fmt.Println("error happens:", err)
    }
    return err
}


//example.go
package main

import (
    "fmt"

    "example.go/kafka"
    "example.go/tailfile"
    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

type Kafka struct {
    Address string `ini:"addr"`
    Topic   string `ini:"topic"`
}

type Tsdb struct {
    Address    string `ini:"address"`
    Configpath string `ini:"confpath"`
}

type Configure struct {
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
}

func main() {
    logrus.SetLevel(logrus.DebugLevel)
    var configOBJ = new(Configure) //创建一个configure类型的指针并分配空间
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    // err = ini.MapTo(configOBJ,cfg)
    // err := ini.MapTo(configOBJ, "./configure.ini")
    err = cfg.MapTo(configOBJ)
    if err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)

    err = kafka.Kafkainit([]string{configOBJ.Kafka.Address}) //类型转换

    if err != nil {
        logrus.Error("kafka init failed:", err)
        return
    }

    logrus.Debug("kafka init success")

    err = tailfile.Tailinit(configOBJ.Configpath)

    if err != nil {
        logrus.WithFields(logrus.Fields{"filename": configOBJ.Configpath, "err": err}).Error("file loaded failed")
        return
    }

    logrus.WithFields(logrus.Fields{"filename": configOBJ.Tsdb.Configpath}).Debug("file loaded successfully")
}


//go run ./example.go
//&main.Configure{Kafka:main.Kafka{Address:"127.0.0.1:9092", Topic:"test1"}, Tsdb:main.Tsdb{Address:"127.0.0.1", Configpath:"./configure.ini"}}
//DEBU[0000] kafka init success                           
//DEBU[0000] file loaded successfully                      filename=./configure.ini

tail读取文件

go get github.com/nxadm/tail/...

go get github.com/hpcloud/tail/...因为依赖包改了地址而这里没改,所以直接get会出现问题,使用上面这个作为替代
package main

import (
    "fmt"
    "time"

    "github.com/nxadm/tail"
)

func main() {

    filename := "./text.txt"
    config := tail.Config{
        ReOpen:    true, //当出现轮转切割的时候,这两个参数会去追踪新的文件
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从哪开始
        MustExist: false,//允许文件不存在
        Poll:      true, //轮询
    }

    tails, err := tail.TailFile(filename, config)
    if err != nil {
        fmt.Println("error happens:", err)
    }
    var msg *tail.Line
    var flag bool

    for {
        msg, flag = <-tails.Lines
        if !flag {
            fmt.Println("tail can read anything from ", tails.Filename)
            time.After(1 * time.Second)
        } else {
            fmt.Println(msg.Text)
        }
    }
}

此处使用另外安装的zookeeper,不使用kafka自带的

启动zookeeper,修改datadir,修改server.properties里的listen

listeners=PLAINTEXT://127.0.0.1:9092

bin/kafka-server-start /usr/local/etc/kafka/server.properties

go get github.com/Shopify/sarama //mac、linux直接装就行,windows涉及到gcc会很麻烦,不推荐使用win编程

简单实例

连接kafka并发送消息

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()                              //配置
    config.Producer.RequiredAcks = sarama.WaitForAll          //ack应答模式为all
    config.Producer.Partitioner = sarama.NewRandomPartitioner //partition的选择方式为随机
    config.Producer.Return.Successes = true                   //Successes channel 返回值

    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) //使用参数创建一个SyncProducer
    defer client.Close()
    if err != nil {
        fmt.Println("broker connection has something wrong happened")
        return
    }

    message := &sarama.ProducerMessage{} //producermassage是一个结构体,用于给生产者发送信息
    message.Topic = "test1"
    message.Value = sarama.StringEncoder("2022-08-17") //StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

    partition, offset, err := client.SendMessage(message) //发送信息
    if err != nil {
        fmt.Printf("message send wrong :%v\n", err)
        return
    }
    fmt.Printf("partition:%v \noffset:%v\n", partition, offset)
}

简单跑几下

go run ./example.go 
partition:0 
offset:0
go run ./example.go
partition:0 
offset:1
go run ./example.go
partition:0 
offset:2

kafka

[2022-08-17 14:17:30,498] INFO Created log for partition test1-0 in /usr/local/var/lib/kafka-logs/test1-0 with properties {} (kafka.log.LogManager)
[2022-08-17 14:17:30,503] INFO [Partition test1-0 broker=0] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2022-08-17 14:17:30,505] INFO [Partition test1-0 broker=0] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)

在日志存放目录确实看到了新生成的目录/usr/local/var/lib/kafka-logs/test1-0,其下生成了索引和数据文件

ls /usr/local/var/lib/kafka-logs/test1-0
00000000000000000000.index    leader-epoch-checkpoint
00000000000000000000.log    partition.metadata
00000000000000000000.timeindex

我们运行起一个消费者,然后再用脚本发送消息,就可以看到发过来的信息

bin/kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test1
2022-08-17
2022-08-17
2022-08-17

基础架构

kafka是一个开源的分布式消息系统,具有高吞吐、低延迟、高容错、高并发等特点

avatar


Producer:消息的生产者

Cluster:集群,由多台broker组成

Broker:kafka的实例

Topic:消息的主题,信息在kafka上以Topic为分类进行存储,一个Broker上可以有多个Topic

Partition:Topic的分区,每个Partition的内容不同,起负载作用,提高吞吐量。具体表现为一个文件夹,下面又包含多个segment,segment下面又有index,log,timeindex等文件,log是存放文件的地方,另外两个是索引

Replication:同一个Partition在不同的Broker上存有副本,这些副本当中由一个leader和多个follower组成,当一个leader挂了,会从剩下的follower当中选出新的leader,副本不能存在于同一台机器

Consumer:消费者

Consumer group:由多个消费者组成的组,同一个partition只能被同一组中的一个消费者消费

生产模式

avatar

生产者产生数据后,只会发给leader,follower上的备份信息需要从leader机上pull。

1.生产者从集群获取leader信息
2.生产者把消息发送给leader
3.leader将消息写入磁盘
4.followers从leader获取数据
5.follower消息落盘并给leader发个ack
6.leader给producer发送ack

如果topic存在多个partition,那么按照以下情形选择写入的分区:
1.如有指定,则写入指定的分区
2.如无指定,但是设置了数据的key,则根据key的hash来选取
3.如果以上都没有,则轮询

ack应答机制:生产者在给kafka发送数据的时候,可以选择0,1,all三种参数
0:生产者发送数据后不需要等到集群返回
1:leader应答了就可以继续
all:所有都ack了才可以继续

如往不存在的topic发送数据,则kafka会自动创建topic,partition和replication默认都是1

avatar
partition是一个有序不可变的消息记录集合,当有新的消息会被写到partition的末尾,每个消息都有一个唯一的标识符offset。但是要注意在不同的partition之间卡夫卡不能保证消息的顺序
kafka可以设置一个保留期限,超过期限的数据将会被清除,或者也可以基于大小来限制

avatar
利用segment+offset来寻找数据:
先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。
打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。
根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。

https://blog.csdn.net/wanghailan1818/article/details/125166287

context包是官方定义的一个上下文包,用于在goroutine之间传递上下文用,其采用树状结构,根节点退出后子goroutine也会跟着退出

type Context interface {
    // 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
    Done() <-chan struct{}

    // 在 channel Done 关闭后,返回 context 取消原因
    Err() error

    // 返回 context 是否会被取消以及自动取消时间(即 deadline)
    Deadline() (deadline time.Time, ok bool)

    // 获取 key 对应的 value
    Value(key interface{}) interface{}
}

创建

context.Background 是上下文的默认值,一般作为根节点在main中使用

context.TODO 应该只在不确定应该使用哪种上下文时使用

以上两种类型想要发挥作用还得搭配各种with方法来说使用

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

withcancel

返回父context和cancelfunc

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background()) //根

    son_ctx, _ := context.WithCancel(ctx) //子

    var wg sync.WaitGroup

    wg.Add(2)

    go func() {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("ALL DONE")
                fmt.Println(ctx.Err())
                wg.Done()
                return
            default:
                fmt.Println("continue")
            }
        }
    }()

    go func() {
        for {
            select {
            case <-son_ctx.Done(): //父收到退出信号,子也会跟着退出
                fmt.Println("son die too")
                wg.Done()
                return
            }
        }
    }()

    time.Sleep(time.Duration(2) * time.Second)
    cancel()
    wg.Wait()
}


//continue
//son die too
//ALL DONE
//context canceled

withTimeout/withDeadline

传入一个父context和超时时间

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)

withValue

func WithValue(parent Context, key, val any) Context

传入的键必须是可比较类型,且不能是string等内置类型

package main

import (
    "context"
    "fmt"
)

func main() {
    type favContextKey string

    f := func(ctx context.Context, k favContextKey) {
        if v := ctx.Value(k); v != nil {
            fmt.Println("found value:", v)
            return
        }
        fmt.Println("key not found:", k)
    }

    k := favContextKey("language")
    ctx := context.WithValue(context.Background(), k, "Go")

    f(ctx, k)
    f(ctx, favContextKey("color"))

}

//Output:

//found value: Go
//key not found: color