项目地址:https://github.com/etcd-io/etcd

简介

etcd是go语言开发的,基于键值对存储的分布式可靠存储系统,常被用于共享配置、服务的注册、发现,知名例子有k8s

特点:

  • 简单易使用
  • 基于SSL通信
  • 通过Raft算法保证一致性
  • 完全复制,集群中每个节点都可以使用完整存档
  • 快速,单实例每秒10000次写入

Raft算法保证一致性的原理

etcd集群拥有选举功能,当节点发生故障的时候可以通过选举来保障服务的可用性
image.png
服务注册与消息通知:

  • 消费者在启动的时候先主动去获取一次配置
  • 消费者在etcd上注册一个watcher
  • 当有新的内容,etcd都会主动去通知订阅者

etcd中使用了Watcher机制,通过注册与异步通知机制,实现分布式环境下不同系统之间的通知与协调,从而对数据变更做到实时处理。实现方式:

  • 不同系统都在etcd上对同一个目录进行注册,同时设置Watcher观测该目录的变化(如果对子目录的变化也有需要,可以设置递归模式)
  • 当某个系统更新了etcd的目录,那么设置了Watcher的系统就会收到通知,并作出相应处理。

数据更新流程
image.png

当客户端对etcd发起请求的时候,如果etcd不是leader的状态而是follower,follower则会将请求转发leader; 如果是leader后, 会对其进行预检查,检查(配额、限速、鉴权【判断请求是否合法】、包大小【需要小于1.5M,过大则会拒绝】)。
如果请求本身是合法的,会将请求转发给KVServer处理。
KVserver一致性模块进行数据处理,一致性模块是基于raft协议实现的,这时候的数据本身是处于unstable状态。
当leader该数据处理unstable状态后,会通过rpc通知其他follower也来同步该数据,并且leader本身会将数据同步到日志模块【wal日志, wal日志通过fsync落盘到磁盘中】。而其他follow在同步该数据的时候,本身完成的是步骤3和数据同步到日志模块,follower一致性模块数据变成commited状态,当完成了这些后通过上次rpc返回响应体给leader。
leader在收到了超过半数集群本身确认后,更新MatchIndex, 一致性模块中数据本身由unstable变化成commited状态。这时候通过MVCC模块进行状态机的写入,将数据同步到treeIndex【会更新modified版本[当前版本号], generations信息[创建的版本,当前版本数,过往的所有版本号]】。再通过BoltDB落盘到磁盘中。这时候一致性模块数据由commited变化为applied状态。【在这里如果没有要求数据强一致性,弱一致性的话,那么数据在commited状态就认为数据已经同步完成了】。
再通过heatbeat将数据同步到follower中MVCC模块中。最终完成数据的一致性。如下图所示。 【如果follower比leader落后好几个版本,leader会通过headbeat带到follower进行同步】。

基础

mac install \use

etcdctl endpoint health

etcdctl --endpoints=http://127.0.0.1:2379 get xxxx

etcdctl --endpoints=http://127.0.0.1:2379 put testk1 "value1"

etcdctl --endpoints=http://127.0.0.1:2379 del testk1

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

func main() {
    client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 3 * time.Second})
    if err != nil {
        logrus.Error("failed to connect etcd:", err)
        return
    }
    defer client.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)

    _, err = client.Put(ctx, "testk1", "testv1") //上传键值对
    if err != nil {
        logrus.Error("failed to put key", err)
    }
    cancel()

    ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)

    get, err := client.Get(ctx, "testk1") //获取键值对

    for _, kv := range get.Kvs {
        fmt.Printf("key:%s \nvalue:%s", kv.Key, kv.Value)
    }
    cancel()

}

//go run ./example.go
//key:testk1 
//value:testv1

Watch

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

func main() {
    client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 3 * time.Second})
    if err != nil {
        logrus.Error("failed to connect etcd:", err)
        return
    }
    defer client.Close()

    ctx, cancel := context.WithCancel(context.Background())

    watchdog := client.Watch(ctx, "testk1") //WatchChan-->WatchResponse-->[]Events-->type/Kv-->key/value

    for re := range watchdog {
        for _, ev := range re.Events {
            fmt.Printf("Operation type is:%s\nkey:%s\nvalue:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
    defer cancel()
//go run ./example.go 
    
//etcdctl --endpoints=http://127.0.0.1:2379 put testk1 "value1"
//OK

//Operation type is:PUT
//key:testk1
//value:value1

//etcdctl --endpoints=http://127.0.0.1:2379 del testk1

//Operation type is:DELETE
//key:testk1
//value:

//etcdctl --endpoints=http://127.0.0.1:2379 put testk1 "val1"

//Operation type is:PUT
//key:testk1
//value:val1

标签: none

评论已关闭