go kafka:samara
此处使用另外安装的zookeeper,不使用kafka自带的
启动zookeeper,修改datadir,修改server.properties里的listen
listeners=PLAINTEXT://127.0.0.1:9092
bin/kafka-server-start /usr/local/etc/kafka/server.properties
go get github.com/Shopify/sarama //mac、linux直接装就行,windows涉及到gcc会很麻烦,不推荐使用win编程
简单实例
连接kafka并发送消息
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig() //配置
config.Producer.RequiredAcks = sarama.WaitForAll //ack应答模式为all
config.Producer.Partitioner = sarama.NewRandomPartitioner //partition的选择方式为随机
config.Producer.Return.Successes = true //Successes channel 返回值
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) //使用参数创建一个SyncProducer
defer client.Close()
if err != nil {
fmt.Println("broker connection has something wrong happened")
return
}
message := &sarama.ProducerMessage{} //producermassage是一个结构体,用于给生产者发送信息
message.Topic = "test1"
message.Value = sarama.StringEncoder("2022-08-17") //StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.
partition, offset, err := client.SendMessage(message) //发送信息
if err != nil {
fmt.Printf("message send wrong :%v\n", err)
return
}
fmt.Printf("partition:%v \noffset:%v\n", partition, offset)
}
简单跑几下
go run ./example.go
partition:0
offset:0
go run ./example.go
partition:0
offset:1
go run ./example.go
partition:0
offset:2
kafka
[2022-08-17 14:17:30,498] INFO Created log for partition test1-0 in /usr/local/var/lib/kafka-logs/test1-0 with properties {} (kafka.log.LogManager)
[2022-08-17 14:17:30,503] INFO [Partition test1-0 broker=0] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2022-08-17 14:17:30,505] INFO [Partition test1-0 broker=0] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)
在日志存放目录确实看到了新生成的目录/usr/local/var/lib/kafka-logs/test1-0,其下生成了索引和数据文件
ls /usr/local/var/lib/kafka-logs/test1-0
00000000000000000000.index leader-epoch-checkpoint
00000000000000000000.log partition.metadata
00000000000000000000.timeindex
我们运行起一个消费者,然后再用脚本发送消息,就可以看到发过来的信息
bin/kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test1
2022-08-17
2022-08-17
2022-08-17
评论已关闭