此处使用另外安装的zookeeper,不使用kafka自带的

启动zookeeper,修改datadir,修改server.properties里的listen

listeners=PLAINTEXT://127.0.0.1:9092

bin/kafka-server-start /usr/local/etc/kafka/server.properties

go get github.com/Shopify/sarama //mac、linux直接装就行,windows涉及到gcc会很麻烦,不推荐使用win编程

简单实例

连接kafka并发送消息

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()                              //配置
    config.Producer.RequiredAcks = sarama.WaitForAll          //ack应答模式为all
    config.Producer.Partitioner = sarama.NewRandomPartitioner //partition的选择方式为随机
    config.Producer.Return.Successes = true                   //Successes channel 返回值

    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) //使用参数创建一个SyncProducer
    defer client.Close()
    if err != nil {
        fmt.Println("broker connection has something wrong happened")
        return
    }

    message := &sarama.ProducerMessage{} //producermassage是一个结构体,用于给生产者发送信息
    message.Topic = "test1"
    message.Value = sarama.StringEncoder("2022-08-17") //StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

    partition, offset, err := client.SendMessage(message) //发送信息
    if err != nil {
        fmt.Printf("message send wrong :%v\n", err)
        return
    }
    fmt.Printf("partition:%v \noffset:%v\n", partition, offset)
}

简单跑几下

go run ./example.go 
partition:0 
offset:0
go run ./example.go
partition:0 
offset:1
go run ./example.go
partition:0 
offset:2

kafka

[2022-08-17 14:17:30,498] INFO Created log for partition test1-0 in /usr/local/var/lib/kafka-logs/test1-0 with properties {} (kafka.log.LogManager)
[2022-08-17 14:17:30,503] INFO [Partition test1-0 broker=0] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2022-08-17 14:17:30,505] INFO [Partition test1-0 broker=0] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)

在日志存放目录确实看到了新生成的目录/usr/local/var/lib/kafka-logs/test1-0,其下生成了索引和数据文件

ls /usr/local/var/lib/kafka-logs/test1-0
00000000000000000000.index    leader-epoch-checkpoint
00000000000000000000.log    partition.metadata
00000000000000000000.timeindex

我们运行起一个消费者,然后再用脚本发送消息,就可以看到发过来的信息

bin/kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test1
2022-08-17
2022-08-17
2022-08-17

标签: none

评论已关闭