go-etcd
项目地址:https://github.com/etcd-io/etcd
简介
etcd是go语言开发的,基于键值对存储的分布式可靠存储系统,常被用于共享配置、服务的注册、发现,知名例子有k8s
特点:
- 简单易使用
- 基于SSL通信
- 通过Raft算法保证一致性
- 完全复制,集群中每个节点都可以使用完整存档
- 快速,单实例每秒10000次写入
etcd集群拥有选举功能,当节点发生故障的时候可以通过选举来保障服务的可用性
服务注册与消息通知:
- 消费者在启动的时候先主动去获取一次配置
- 消费者在etcd上注册一个watcher
- 当有新的内容,etcd都会主动去通知订阅者
etcd中使用了Watcher机制,通过注册与异步通知机制,实现分布式环境下不同系统之间的通知与协调,从而对数据变更做到实时处理。实现方式:
- 不同系统都在etcd上对同一个目录进行注册,同时设置Watcher观测该目录的变化(如果对子目录的变化也有需要,可以设置递归模式)
- 当某个系统更新了etcd的目录,那么设置了Watcher的系统就会收到通知,并作出相应处理。
数据更新流程
当客户端对etcd发起请求的时候,如果etcd不是leader的状态而是follower,follower则会将请求转发leader; 如果是leader后, 会对其进行预检查,检查(配额、限速、鉴权【判断请求是否合法】、包大小【需要小于1.5M,过大则会拒绝】)。
如果请求本身是合法的,会将请求转发给KVServer处理。
KVserver一致性模块进行数据处理,一致性模块是基于raft协议实现的,这时候的数据本身是处于unstable状态。
当leader该数据处理unstable状态后,会通过rpc通知其他follower也来同步该数据,并且leader本身会将数据同步到日志模块【wal日志, wal日志通过fsync落盘到磁盘中】。而其他follow在同步该数据的时候,本身完成的是步骤3和数据同步到日志模块,follower一致性模块数据变成commited状态,当完成了这些后通过上次rpc返回响应体给leader。
leader在收到了超过半数集群本身确认后,更新MatchIndex, 一致性模块中数据本身由unstable变化成commited状态。这时候通过MVCC模块进行状态机的写入,将数据同步到treeIndex【会更新modified版本[当前版本号], generations信息[创建的版本,当前版本数,过往的所有版本号]】。再通过BoltDB落盘到磁盘中。这时候一致性模块数据由commited变化为applied状态。【在这里如果没有要求数据强一致性,弱一致性的话,那么数据在commited状态就认为数据已经同步完成了】。
再通过heatbeat将数据同步到follower中MVCC模块中。最终完成数据的一致性。如下图所示。 【如果follower比leader落后好几个版本,leader会通过headbeat带到follower进行同步】。
基础
etcdctl endpoint health
etcdctl --endpoints=http://127.0.0.1:2379 get xxxx
etcdctl --endpoints=http://127.0.0.1:2379 put testk1 "value1"
etcdctl --endpoints=http://127.0.0.1:2379 del testk1
package main
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/sirupsen/logrus"
)
func main() {
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 3 * time.Second})
if err != nil {
logrus.Error("failed to connect etcd:", err)
return
}
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
_, err = client.Put(ctx, "testk1", "testv1") //上传键值对
if err != nil {
logrus.Error("failed to put key", err)
}
cancel()
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
get, err := client.Get(ctx, "testk1") //获取键值对
for _, kv := range get.Kvs {
fmt.Printf("key:%s \nvalue:%s", kv.Key, kv.Value)
}
cancel()
}
//go run ./example.go
//key:testk1
//value:testv1
Watch
package main
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/sirupsen/logrus"
)
func main() {
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 3 * time.Second})
if err != nil {
logrus.Error("failed to connect etcd:", err)
return
}
defer client.Close()
ctx, cancel := context.WithCancel(context.Background())
watchdog := client.Watch(ctx, "testk1") //WatchChan-->WatchResponse-->[]Events-->type/Kv-->key/value
for re := range watchdog {
for _, ev := range re.Events {
fmt.Printf("Operation type is:%s\nkey:%s\nvalue:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
defer cancel()
//go run ./example.go
//etcdctl --endpoints=http://127.0.0.1:2379 put testk1 "value1"
//OK
//Operation type is:PUT
//key:testk1
//value:value1
//etcdctl --endpoints=http://127.0.0.1:2379 del testk1
//Operation type is:DELETE
//key:testk1
//value:
//etcdctl --endpoints=http://127.0.0.1:2379 put testk1 "val1"
//Operation type is:PUT
//key:testk1
//value:val1
评论已关闭