
example.go (the main program)

package main

import (
    "fmt"
    "sync"

    "example.go/kafka"
    "example.go/tailfile"
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

type Kafka struct {
    Address string `ini:"addr"`
    Topic   string `ini:"topic"`
    Size    int64  `ini:"chan_size"`
}

type Tsdb struct {
    Address    string `ini:"address"`
    Configpath string `ini:"confpath"`
}

type Configure struct { // struct that the ini file is mapped onto
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
}

func main() {
    logrus.SetLevel(logrus.DebugLevel)
    var configOBJ = new(Configure) // allocate a Configure and get a pointer to it
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    // err = ini.MapTo(configOBJ,cfg)
    // err := ini.MapTo(configOBJ, "./configure.ini")
    err = cfg.MapTo(configOBJ) // map the ini sections onto the struct
    if err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)

    err = kafka.Kafkainit([]string{configOBJ.Kafka.Address}, configOBJ.Kafka.Size) // wrap the single address in a slice; initialize kafka
    if err != nil {
        logrus.Error("kafka init failed:", err)
        return
    }
    logrus.Debug("kafka init success")

    err = tailfile.Tailinit(configOBJ.Configpath) // initialize tail
    if err != nil {
        logrus.WithFields(logrus.Fields{"filename": configOBJ.Configpath, "err": err}).Error("failed to load file")
        return
    }
    logrus.WithFields(logrus.Fields{"filename": configOBJ.Tsdb.Configpath}).Debug("file loaded successfully")

    var wg sync.WaitGroup
    wg.Add(2)
    go run(&wg)
    go send_to_kafka(&wg)
    wg.Wait()

}

func run(wg *sync.WaitGroup) { // receive lines read by tail and push them into kafka's channel
    defer wg.Done()
    msg := &sarama.ProducerMessage{}
    msg.Topic = "test2" // note: the topic is hard-coded here rather than taken from the config
    for {
        line, ok := <-tailfile.Tail_context.Lines
        if !ok { // check ok before touching line, which is nil when the channel is closed
            logrus.Error("failed to get message")
            continue
        }
        if len(line.Text) == 0 {
            continue
        }
        logrus.Info("get message")
        msg.Value = sarama.StringEncoder(line.Text)
        fmt.Println(msg.Value)
        kafka.Kafkareceive(msg)
        logrus.Info("send message")
    }
}

func send_to_kafka(wg *sync.WaitGroup) { // pull messages off the channel and send them to kafka
    defer wg.Done()
    defer kafka.Kafka_client.Close()
    for {
        partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
        if err != nil {
            logrus.Error("kafka send failed")
        } else {
            logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
        }
    }
}

//go run ./example.go
//&main.Configure{Kafka:main.Kafka{Address:"127.0.0.1:9092", Topic:"test1", Size:10000}, Tsdb:main.Tsdb{Address:"127.0.0.1", Configpath:"./readme.log"}}
//DEBU[0000] kafka init success
//DEBU[0000] file loaded successfully                      filename=./readme.log
//INFO[0004] get message
//666
//INFO[0004] send message
//INFO[0004] send message success                          offset=1 partition=0
//INFO[0012] get message
//777
//INFO[0012] send message
//INFO[0012] send message success                          offset=2 partition=0
//INFO[0035] get message
//888
//INFO[0035] send message
//INFO[0035] send message success                          offset=3 partition=0
//INFO[0039] get message
//999
//INFO[0039] send message
//INFO[0039] send message success                          offset=4 partition=0

tailread.go

package tailfile

import (
    "fmt"

    "github.com/nxadm/tail"
)

var (
    Tail_context *tail.Tail
    //Context_listen chan *tail.Line
)

func Tailinit(filename string) (err error) {
    config := tail.Config{
        ReOpen:    true, // together with Follow, re-attaches to the new file after log rotation
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // where to start; Whence 2 = relative to end of file
        MustExist: false,                                // allow the file to be missing
        Poll:      true,                                 // poll for changes
    }

    Tail_context, err = tail.TailFile(filename, config)
    if err != nil {
        fmt.Println("error happens:", err)
    }
    return err
}

kafka.go

package kafka

import (
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
)

var (
    Kafka_client sarama.SyncProducer
    send_chan    chan *sarama.ProducerMessage
)

func Kafkainit(address []string, size int64) (err error) {
    config := sarama.NewConfig()                              // producer config
    config.Producer.RequiredAcks = sarama.WaitForAll          // require acks from all in-sync replicas
    config.Producer.Partitioner = sarama.NewRandomPartitioner // choose partitions at random
    config.Producer.Return.Successes = true                   // report successes on the Successes channel

    send_chan = make(chan *sarama.ProducerMessage, size)

    Kafka_client, err = sarama.NewSyncProducer(address, config) // create a SyncProducer with these settings
    //    defer kafka_client.Close()
    if err != nil {
        logrus.Error("broker connection has something wrong happened:", err)
        return err
    }
    return err
}

func Kafkareceive(msg *sarama.ProducerMessage) {
    send_chan <- msg
}

func Kafkaoutput() *sarama.ProducerMessage {
    return <-send_chan
}

configure.ini

[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000

[tsdb]
address = 127.0.0.1
confpath = "./readme.log"

Running a console consumer against topic test2 confirms the messages arrive:

bin/kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test2

ES (Elasticsearch) is an open-source distributed search engine.

Download and unpack it, set the two security-related options in config/elasticsearch.yml to false, run bin/elasticsearch, then test port 9200:
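In ES 8 the two security settings in question are probably these (in config/elasticsearch.yml; treat the exact keys as a best guess at this setup):

xpack.security.enabled: false
xpack.security.enrollment.enabled: false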
https://www.ruanyifeng.com/blog/2017/08/elasticsearch.html

curl -XGET 'http://localhost:9200/' -H 'Content-Type: application/json'
{
  "name" : "tddeMacBook-Air.local",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "2uSPQGtXRu-xIIzQVLVcAw",
  "version" : {
    "number" : "8.4.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "f56126089ca4db89b631901ad7cce0a8e10e2fe5",
    "build_date" : "2022-08-19T19:23:42.954591481Z",
    "build_snapshot" : false,
    "lucene_version" : "9.3.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

A response like this means the server is up.

Concepts

At its core, ES is a distributed database; one instance on one machine is a node.

Index:
ES tokenizes data as it is stored and builds an inverted index, which is what searches run against.
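You can watch the tokenization step with the _analyze API; with the default standard analyzer the text below should come back as the terms trying, out, and elasticsearch (split and lowercased):

curl -XGET 'http://localhost:9200/_analyze?pretty' -H 'Content-Type: application/json' -d '
{
    "analyzer": "standard",
    "text": "Trying out Elasticsearch"
}'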

List all indices:

curl -X GET 'http://localhost:9200/_cat/indices?v'

Document:
A single record in an index is called a document; many documents make up an index. Documents in the same index should ideally share the same structure to keep searches efficient.

Type:
A type is a virtual grouping of documents within an index (mapping types have been removed in recent ES versions).

Basic operations

https://blog.csdn.net/UbuntuTouch/article/details/98871531

Create: the index is created automatically on first insert

curl -XPUT 'http://localhost:9200/twitter/_doc/1?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T13:12:00",
    "message": "Trying out Elasticsearch, so far so good?"
}'
 
curl -XPUT 'http://localhost:9200/twitter/_doc/2?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T14:12:12",
    "message": "Another tweet, will it be indexed?"
}'
 
curl -XPUT 'http://localhost:9200/twitter/_doc/3?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "elastic",
    "post_date": "2010-01-15T01:46:38",
    "message": "Building the site, should be kewl"
}'


Exact lookup by ID:

curl -XGET 'http://localhost:9200/twitter/_doc/1?pretty=true'
{
  "_index" : "twitter",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "user" : "kimchy",
    "post_date" : "2009-11-15T13:12:00",
    "message" : "Trying out Elasticsearch, so far so good?"
  }
}

Search

curl -XGET 'http://localhost:9200/twitter/_search?q=user:kimchy&pretty=true'
{
  "took" : 69,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 0.4700036,
    "hits" : [
      {
        "_index" : "twitter",
        "_id" : "1",
        "_score" : 0.4700036,
        "_source" : {
          "user" : "kimchy",
          "post_date" : "2009-11-15T13:12:00",
          "message" : "Trying out Elasticsearch, so far so good?"
        }
      },
      {
        "_index" : "twitter",
        "_id" : "2",
        "_score" : 0.4700036,
        "_source" : {
          "user" : "kimchy",
          "post_date" : "2009-11-15T14:12:12",
          "message" : "Another tweet, will it be indexed?"
        }
      }
    ]
  }
}

Query with a JSON body

curl -XGET 'http://localhost:9200/twitter/_search?pretty=true' -H 'Content-Type: application/json' -d '
{
    "query" : {
        "match" : { "user": "kimchy" }
    }
}'
{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 0.4700036,
    "hits" : [
      {
        "_index" : "twitter",
        "_id" : "1",
        "_score" : 0.4700036,
        "_source" : {
          "user" : "kimchy",
          "post_date" : "2009-11-15T13:12:00",
          "message" : "Trying out Elasticsearch, so far so good?"
        }
      },
      {
        "_index" : "twitter",
        "_id" : "2",
        "_score" : 0.4700036,
        "_source" : {
          "user" : "kimchy",
          "post_date" : "2009-11-15T14:12:12",
          "message" : "Another tweet, will it be indexed?"
        }
      }
    ]
  }
}

Delete an index

curl -X DELETE 'localhost:9200/index_name'

kibana

Kibana is the visualization tool paired with ES; you must download the Kibana release with the same version number as your ES or it will not connect.

Download and unpack it, then edit config/kibana.yml to point at your ES address; there is also a locale option you may change if you like.

Start it with bin/kibana, open port 5601, and configure an index pattern.
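For reference, the relevant kibana.yml lines might look like the sketch below; the values are illustrative, and i18n.locale is the optional language setting mentioned above:

server.port: 5601
elasticsearch.hosts: ["http://localhost:9200"]
i18n.locale: "zh-CN"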

olivere/elastic/v7

go get -u github.com/olivere/elastic/v7

This is a third-party client; the official library is said to be clunky. Ideally you install the version matching your ES, but my ES is a version newer.... Fortunately, testing shows the basic features are still compatible.

package EScli

import (
    "context"
    "fmt"

    "github.com/olivere/elastic/v7"
    "github.com/sirupsen/logrus"
)

type Twinfo struct {
    User      string `json:"user"`
    Post_date string `json:"post_date"`
    Message   string `json:"message"`
}

func Connes() {
    cli, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"), elastic.SetSniff(false)) // many options are available; pick what you need

    if err != nil {
        logrus.Error("failed to connect to ES:", err)
        return // bail out; cli is nil here and would panic below
    }
    var msg = Twinfo{User: "zhangsan", Post_date: "2022-08-22T17:12:00", Message: "Vice versa"}
    put, err := cli.Index().Index("twitter").BodyJson(msg).Do(context.Background())
    if err != nil {
        logrus.Error("failed to index document:", err)
        return // put is nil on error, so don't fall through to the Printf
    }
    fmt.Printf("index:%s\ntype:%s\nid:%s\n", put.Index, put.Type, put.Id)
}
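
Reading documents back works much the same way. Here is a minimal search sketch of my own (not from the original code): searchDemo is a hypothetical helper that assumes the Twinfo struct above, takes an *elastic.Client like the one built in Connes, and additionally needs the encoding/json import; in the v7 client, hit.Source is a json.RawMessage.

func searchDemo(cli *elastic.Client) { // hypothetical helper; name is mine
    res, err := cli.Search().Index("twitter").
        Query(elastic.NewMatchQuery("user", "kimchy")). // full-text match on the user field
        Do(context.Background())
    if err != nil {
        logrus.Error("failed to search ES:", err)
        return
    }
    for _, hit := range res.Hits.Hits {
        var info Twinfo
        if err := json.Unmarshal(hit.Source, &info); err != nil {
            continue // skip documents that don't fit the struct
        }
        fmt.Printf("user:%s message:%s\n", info.User, info.Message)
    }
}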

https://docs.influxdata.com/influxdb/v2.4/organizations/

mac:
brew install influxdb
influxd run
Config file: /usr/local/etc/influxdb2/config.yml

Concepts

InfluxDB is a time-series database.

org: organization, the user workspace
database/bucket: the database
measurement: the table
point: a data row, which consists of:
measurement: the metric name
time: the timestamp
field: the metric value
tags: attributes

series: a collection of related data; within one database, points sharing the same retention policy, measurement, and tag set belong to one series and are stored contiguously on disk
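
These pieces map one-to-one onto InfluxDB's line protocol. A hand-written illustration mirroring the point the Go code below writes (the timestamp is an arbitrary nanosecond value; integer fields take an i suffix):

pro1,unit=temperature avg=24.5,max=45i 1661317483119763000

Here pro1 is the measurement, unit=temperature is the tag set, avg and max are fields, and the trailing number is the time.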

Go client for InfluxDB 2.x

go get github.com/influxdata/influxdb-client-go

https://github.com/influxdata/influxdb-client-go

package main

import (
    "context"
    "fmt"
    "time"

    influxdb2 "github.com/influxdata/influxdb-client-go"
    "github.com/sirupsen/logrus"
)

// never hard-code a token like this in production
var (
    token = "RTqC84IwjDwG4Lb6G4DCdxT9w3padfGzsQM03gnq6-nt-_O6l-d7UHvGdG96r-sD9fySvNlYAPM0OARVbXzyTA=="
)

func conninflux() influxdb2.Client {
    cli := influxdb2.NewClient("http://127.0.0.1:8086", token)
    return cli
}

func queryinf(cli influxdb2.Client) {
    // org name
    queryAPI := cli.QueryAPI("project1")
    // Flux query: bucket, time range, measurement filter
    result, err := queryAPI.Query(context.Background(), `from(bucket:"pro1")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "pro1")`) // signature: (ctx context.Context, query string)
    if err == nil {
        // Use Next() to iterate over query result lines
        for result.Next() {
            // Observe when there is new grouping key producing new table
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // read result
            fmt.Printf("row: %s\n", result.Record().String())
        }
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }
    }
}

func writeapi(cli influxdb2.Client) {
    // Use blocking write client for writes to desired bucket
    writeAPI := cli.WriteAPIBlocking("project1", "pro1")
    //measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time
    // metric name, tags, field values, timestamp
    p := influxdb2.NewPoint("pro1",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45},
        time.Now())
    // blocking write (WriteAPIBlocking is synchronous)
    err := writeAPI.WritePoint(context.Background(), p)
    if err != nil {
        logrus.Warn("failed to write point:", err)
    } else {
        logrus.Info("Success")
    }
    // Or write directly line protocol
    // line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
    // writeAPI.WriteRecord(context.Background(), line)
}

func main() {
    cli := conninflux()
    writeapi(cli)
    queryinf(cli)
}

//go run ./influxdb.go
//INFO[0000] Success                                      
//table: col{0: name: result, datatype: string, defaultValue: _result, group: false},col{1: name: table, datatype: long, defaultValue: , group: false},col{2: name: _start, datatype: dateTime:RFC3339, defaultValue: , group: true},col{3: name: _stop, datatype: dateTime:RFC3339, defaultValue: , group: true},col{4: name: _time, datatype: dateTime:RFC3339, defaultValue: , group: false},col{5: name: _value, datatype: double, defaultValue: , group: false},col{6: name: _field, datatype: string, defaultValue: , group: true},col{7: name: _measurement, datatype: string, defaultValue: , group: true},col{8: name: unit, datatype: string, defaultValue: , group: true}
//row: _start:2022-08-24 04:55:23.749402 +0000 UTC,_stop:2022-08-24 05:55:23.749402 +0000 UTC,_time:2022-08-24 05:44:43.119763 +0000 UTC,_value:24.5,_field:avg,_measurement:pro1,result:_result,table:0,unit:temperature
//row: result:_result,table:0,_start:2022-08-24 04:55:23.749402 +0000 UTC,_field:avg,_measurement:pro1,unit:temperature,_stop:2022-08-24 05:55:23.749402 +0000 UTC,_time:2022-08-24 05:55:23.728327 +0000 UTC,_value:24.5
//table: col{0: name: result, datatype: string, defaultValue: _result, group: false},col{1: name: table, datatype: long, defaultValue: , group: false},col{2: name: _start, datatype: dateTime:RFC3339, defaultValue: , group: true},col{3: name: _stop, datatype: dateTime:RFC3339, defaultValue: , group: true},col{4: name: _time, datatype: dateTime:RFC3339, defaultValue: , group: false},col{5: name: _value, datatype: long, defaultValue: , group: false},col{6: name: _field, datatype: string, defaultValue: , group: true},col{7: name: _measurement, datatype: string, defaultValue: , group: true},col{8: name: unit, datatype: string, defaultValue: , group: true}
//row: _stop:2022-08-24 05:55:23.749402 +0000 UTC,_time:2022-08-24 05:44:43.119763 +0000 UTC,_value:45,_measurement:pro1,result:_result,_start:2022-08-24 04:55:23.749402 +0000 UTC,_field:max,unit:temperature,table:1
//row: _measurement:pro1,_start:2022-08-24 04:55:23.749402 +0000 UTC,_time:2022-08-24 05:55:23.728327 +0000 UTC,_stop:2022-08-24 05:55:23.749402 +0000 UTC,_value:45,_field:max,unit:temperature,result:_result,table:1


grafana

https://www.jianshu.com/p/072b9a8a3d1a
https://grafana.com/docs/

/usr/local/opt/grafana/bin/grafana-server --config /usr/local/etc/grafana/grafana.ini --homepath /usr/local/opt/grafana/share/grafana --packaging=brew cfg:default.paths.logs=/usr/local/var/log/grafana cfg:default.paths.data=/usr/local/var/lib/grafana cfg:default.paths.plugins=/usr/local/var/lib/grafana/plugins

Note for InfluxDB 2.x: you cannot connect with a username/password; choose Flux as the query language, then connect with a token and an org.


Project: https://github.com/etcd-io/etcd

Overview

etcd is a reliable distributed key-value store written in Go, commonly used for shared configuration and for service registration and discovery; Kubernetes is its best-known user.

Features:

  • Simple and easy to use
  • Communication secured with SSL
  • Consistency guaranteed by the Raft algorithm
  • Fully replicated: every node in the cluster holds the complete data set
  • Fast: a single instance supports 10,000 writes per second

How Raft keeps the cluster consistent

An etcd cluster elects a leader; when a node fails, a new election keeps the service available.

Service registration and change notification:

  • On startup, a consumer first fetches the configuration once
  • The consumer then registers a watcher on etcd
  • Whenever new content appears, etcd actively notifies the subscriber

etcd's Watcher mechanism uses registration plus asynchronous notification to let different systems in a distributed environment notify and coordinate with each other, reacting to data changes in real time (a quick shell demo follows the list). It works like this:

  • The different systems all register on the same etcd directory and set a Watcher to observe it (recursive mode is available if changes to subdirectories also matter)
  • When any system updates that directory, every system that set a Watcher is notified and reacts accordingly.
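
You can watch this mechanism from the shell: etcdctl watch blocks and prints every change to a key (same endpoint flag as the commands in the Basics section below). Run the put/del commands from another terminal and PUT/DELETE events appear here:

etcdctl --endpoints=http://127.0.0.1:2379 watch testk1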

Data update flow

When a client sends a request to etcd and the node that receives it is a follower rather than the leader, the follower forwards the request to the leader. The leader then runs pre-checks: quota, rate limiting, authentication (is the request legitimate), and packet size (must be under 1.5 MB; anything larger is rejected).
If the request itself is valid, it is handed to KVServer for processing.
KVServer passes the data to the consistency module, which is built on the Raft protocol; at this point the data is in the unstable state.
Once the leader holds the data in the unstable state, it notifies the other followers over RPC to replicate it, and also writes the data to its own log module (the WAL, which is fsynced to disk). While replicating, each follower performs the same steps and writes the data to its own log module; once the data in its consistency module reaches the committed state, it acknowledges the leader's RPC.
After the leader has received acknowledgements from more than half of the cluster, it updates MatchIndex, and the data in the consistency module moves from unstable to committed. The MVCC module then writes it into the state machine, updating treeIndex (the modified revision, i.e. the current revision number, plus the generations info: the creation revision, the current revision count, and all past revision numbers), and persists it to disk through BoltDB. At this point the data moves from committed to applied. (If only weak rather than strong consistency is required, the data counts as synchronized once it is committed.)
Finally, heartbeats carry the data into the followers' MVCC modules, completing the consistency cycle. (If a follower has fallen several revisions behind the leader, the leader brings it up to date via heartbeats.)

Basics

Install and use on macOS:

etcdctl endpoint health

etcdctl --endpoints=http://127.0.0.1:2379 get xxxx

etcdctl --endpoints=http://127.0.0.1:2379 put testk1 "value1"

etcdctl --endpoints=http://127.0.0.1:2379 del testk1

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

func main() {
    client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 3 * time.Second})
    if err != nil {
        logrus.Error("failed to connect etcd:", err)
        return
    }
    defer client.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)

    _, err = client.Put(ctx, "testk1", "testv1") // put a key-value pair
    if err != nil {
        logrus.Error("failed to put key", err)
    }
    cancel()

    ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)

    get, err := client.Get(ctx, "testk1") // get the key-value pair
    if err != nil {
        logrus.Error("failed to get key:", err)
        cancel()
        return // get is nil on error, so don't iterate over it
    }

    for _, kv := range get.Kvs {
        fmt.Printf("key:%s \nvalue:%s", kv.Key, kv.Value)
    }
    cancel()

}

//go run ./example.go
//key:testk1 
//value:testv1

Watch

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

func main() {
    client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 3 * time.Second})
    if err != nil {
        logrus.Error("failed to connect etcd:", err)
        return
    }
    defer client.Close()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    watchdog := client.Watch(ctx, "testk1") // WatchChan --> WatchResponse --> []Events --> Type/Kv --> Key/Value

    for re := range watchdog {
        for _, ev := range re.Events {
            fmt.Printf("Operation type is:%s\nkey:%s\nvalue:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}
//go run ./example.go 
    
//etcdctl --endpoints=http://127.0.0.1:2379 put testk1 "value1"
//OK

//Operation type is:PUT
//key:testk1
//value:value1

//etcdctl --endpoints=http://127.0.0.1:2379 del testk1

//Operation type is:DELETE
//key:testk1
//value:

//etcdctl --endpoints=http://127.0.0.1:2379 put testk1 "val1"

//Operation type is:PUT
//key:testk1
//value:val1

Here I use a separately installed ZooKeeper instead of the one bundled with Kafka.

Start ZooKeeper, set dataDir, and change the listener in server.properties:

listeners=PLAINTEXT://127.0.0.1:9092

bin/kafka-server-start /usr/local/etc/kafka/server.properties

go get github.com/Shopify/sarama // installs cleanly on macOS and Linux; on Windows it involves gcc and gets messy, so Windows is not recommended for this

A minimal example

Connect to Kafka and send a message

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()                              // producer config
    config.Producer.RequiredAcks = sarama.WaitForAll          // require acks from all in-sync replicas
    config.Producer.Partitioner = sarama.NewRandomPartitioner // choose partitions at random
    config.Producer.Return.Successes = true                   // report successes on the Successes channel

    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) // create a SyncProducer with these settings
    if err != nil {
        fmt.Println("failed to connect to broker:", err)
        return
    }
    defer client.Close() // deferred only after the error check, so client is non-nil

    message := &sarama.ProducerMessage{} //producermassage是一个结构体,用于给生产者发送信息
    message.Topic = "test1"
    message.Value = sarama.StringEncoder("2022-08-17") //StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

    partition, offset, err := client.SendMessage(message) // send the message
    if err != nil {
        fmt.Printf("message send wrong :%v\n", err)
        return
    }
    fmt.Printf("partition:%v \noffset:%v\n", partition, offset)
}

Run it a few times:

go run ./example.go 
partition:0 
offset:0
go run ./example.go
partition:0 
offset:1
go run ./example.go
partition:0 
offset:2

Kafka's broker log shows the topic being created:

[2022-08-17 14:17:30,498] INFO Created log for partition test1-0 in /usr/local/var/lib/kafka-logs/test1-0 with properties {} (kafka.log.LogManager)
[2022-08-17 14:17:30,503] INFO [Partition test1-0 broker=0] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2022-08-17 14:17:30,505] INFO [Partition test1-0 broker=0] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)

In the log directory you can indeed see the newly created folder /usr/local/var/lib/kafka-logs/test1-0, with index and data files inside:

ls /usr/local/var/lib/kafka-logs/test1-0
00000000000000000000.index    leader-epoch-checkpoint
00000000000000000000.log    partition.metadata
00000000000000000000.timeindex

Start a consumer, then send messages with the script again, and the incoming messages show up:

bin/kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test1
2022-08-17
2022-08-17
2022-08-17