分类 云原生 下的文章

https://feisky.gitbooks.io/kubernetes/content/introduction/

k8s是一个能够自动部署容器的集群管理系统,相较于传统的容器技术,k8s能够实现pod的自动调度,回滚,负载均衡等技术
k8s由master节点和node节点组成,master节点负责控制集群,node就是容器所真实存在的平台,他可能是物理机,也可能是一台虚拟机,一个node上面可能有一个或者多个pod,在新pod生成的时候,master会根据算法把他调度到合适的某台node上面去完成生成。
k8s架构.jpg

基础概念

k8s的核心组件

  • etcd 保存了整个集群的状态;
  • apiserver 提供了资源操作的唯一入口,并提供认证、授权、访问控制、API 注册和发现等机制;
  • controller manager 负责维护集群的状态,比如故障检测、自动扩展、滚动更新等;
  • scheduler 负责资源的调度,按照预定的调度策略将 Pod 调度到相应的机器上;
  • kubelet 负责维护容器的生命周期,同时也负责 Volume(CVI)和网络(CNI)的管理;
  • Container runtime 负责镜像管理以及 Pod 和容器的真正运行(CRI);
  • kube-proxy 负责为 Service 提供 cluster 内部的服务发现和负载均衡

安装

单机测试-minikube

感觉云相关的组建配置要求都比较高......至少单核的学生服务器是跑不起来了,这里我在阿里云上白嫖了一台2核4g的服务器,或者也可以使用自己本地的虚拟机来安装,配置给高点就行

一般来讲k8s的master是独立部署的,而minikube则把master和node合并在一起了
image.png

https://minikube.sigs.k8s.io/docs/start/

curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube
minikube start

这里可能会报Exiting due to DRV_AS_ROOT: The "docker" driver should not be used with root privileges.,要么另外创建一个用户跑dockerhttps://blog.csdn.net/fly_leopard/article/details/108790217

要么minikube start --force --driver=docker --image-mirror-country='cn' --image-repository='registry.cn-hangzhou.aliyuncs.com/google_containers' --base-image='registry.cn-hangzhou.aliyuncs.com/google_containers/kicbase:v0.0.28''__

alias kubectl="minikube kubectl --"

不过这里因为墙的原因,即使加了代理设置试了几次也没能把镜像拉下来,心累......有兴趣玩这个模拟器可以自查这本官方手册
https://minikube.sigs.k8s.io/docs/start/

集群

https://k8s.easydoc.net/docs/dRiQjyTY/28366845/6GiNOzyZ/nd7yOvdY#nav_3

主节点需要:
docker
kubectl
kubeadm

worker节点需要:
docker
kubelet
kube-proxy

我这里以三台2c4g的云服务器为例来进行部署,服务器可以按流量计费去租,云服务厂商的镜像一般都是已经调整过的,如果是自己的镜像可能会有很多坑,先列举几个
1.关闭swap

swapoff -a
vi /etc/fstab 
#/dev/mapper/centos_hhdcloudrd6-swap swap                    swap    defaults        0 0
reboot

2.ip转发

 echo 1 > /proc/sys/net/ipv4/ip_forward

一台master两台node,全部修改/etc/hosts互相认识一下,改完记得ping一下名字看看通不通

关防火墙和SELinux,不过默认一般都是都关了的

配置k8s和docker的安装源

cat <<EOF > kubernetes.repo
[kubernetes]
name=Kubernetes
baseurl=https://mirrors.aliyun.com/kubernetes/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=0
repo_gpgcheck=0
gpgkey=https://mirrors.aliyun.com/kubernetes/yum/doc/yum-key.gpg https://mirrors.aliyun.com/kubernetes/yum/doc/rpm-package-key.gpg
EOF
mv kubernetes.repo /etc/yum.repos.d/


yum -y install yum-utils
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

安装组件

yum install -y kubelet-1.22.4 kubectl-1.22.4 kubeadm-1.22.4 docker-ce

设置开机自启

systemctl enable kubelet
systemctl start kubelet
systemctl enable docker
systemctl start docker

修改docker配置

# kubernetes 官方推荐 docker 等使用 systemd 作为 cgroupdriver,否则 kubelet 启动不了
#kubelet会默认--cgroup-driver=systemd,以免systemd和cgroupfs发生冲突导致kubelet无法使用
cat <<EOF > daemon.json
{
  "exec-opts": ["native.cgroupdriver=systemd"],
  "registry-mirrors": ["https://ud6340vz.mirror.aliyuncs.com"]
}
EOF
mkdir /etc/docker
mv daemon.json /etc/docker/

systemctl daemon-reload
systemctl restart docker

在master使用kubeadm初始化集群,搭建一个控制平面
https://kubernetes.io/zh-cn/docs/reference/setup-tools/kubeadm/kubeadm-init/

echo 1 > /proc/sys/net/bridge/bridge-nf-call-iptables #这个每个节点都要
kubeadm init --image-repository=registry.aliyuncs.com/google_containers --pod-network-cidr=10.244.0.0/16 
#此处使用flannel,需要与文件中的net-conf.json.Network保持一致,不然会报错
#查看cidr的值的快速办法cat net.yaml |grep -E "^\s*\"Network"
# 这里可以用pod-network-cidr来指定cidr,不过后面的cni插件得要做出相应的修改
#如果你想要使用coredns作为你的dns而非kube-dns,那么你可以使用--feature-gates=CoreDNS=true


记得保存kubeamd join这条指令,后面让工作节点加入集群用的
忘记了重新获取:kubeadm token create --print-join-command
kubeadm join 172.20.245.132:6443 --token 32tne8.90kj2le30sgrxy1u \
        --discovery-token-ca-cert-hash sha256:aa08f6d92df2507ad0c9739df7ffd25861576fde4453dc47aec9a08984b51a43 

请注意这里是采用了单节点来部署master节点,如果你需要多节点部署高可用的k8s,那你需要在初始化的时候加入--control-plane-endpoint + 负载均衡,如果没有加后续无法调整!
https://www.cnblogs.com/cmt/p/12061089.html
https://www.cnblogs.com/dudu/p/12168433.html
https://www.jianshu.com/p/35b2c6deb573

复制授权文件,以便 kubectl 可以有权限访问集群
如果你其他节点需要访问集群,或者你需要从你外部的工作站访问集群,则需要从主节点复制admin.conf过去其他节点(linux和mac可以使用scp指令去复制)

mkdir -p $HOME/.kube
cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
chown $(id -u):$(id -g) $HOME/.kube/config

如果不行就
vi /etc/profile.d/kube.sh
export KUBECONFIG=$HOME/.kube/config
source /etc/profile.d/kube.sh

当你从外部访问
kubectl --kubeconfig ./admin.conf get nodes

当你希望从外部访问kuber api
scp root@<control-plane-host>:/etc/kubernetes/admin.conf .
kubectl --kubeconfig ./admin.conf proxy

在工作节点运行保存的kubeadm join
如果你后续想要让机器加入集群,可以使用上面保存的指令

kubeadm join --token <token> <control-plane-host>:<control-plane-port> --discovery-token-ca-cert-hash sha256:<hash>
如果你丢失了令牌,可以使用这个来列出
#kubeadm token list
#token一般有效期为24h,如果过期了, 你可以重新生成一个
kubeadm token create

跑完后就可以使用kubectl 命令了,但是我这里不知道为什么环境变量上不去,于是额外执行

vi /etc/profile.d/kube.sh
export KUBECONFIG=/etc/kubernetes/kubelet.conf

创建kube-flannel.yml,写入这个文件的内容(因为直接拉拉不下来),这是一个cni插件,处理网络相关的,https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml,然后kubectl apply -f kube-flannel.yml,不做这一步会发现node一直都是not ready
要注意的是,这里默认的文件有个坑,他默认的type为vxlan,而我们是云服务器,如果只是像这样做个小练习还好,实际环境一定要改或者按需选择!
(直接部署不做其他配套改动其实大概率是行不通的,就当是仪式感了)

#如果你对你的网络有信心,你可以直接运行
kubectl apply -f https://raw.githubusercontent.com/flannel-io/flannel/v0.20.2/Documentation/kube-flannel.yml

#如果你发现你部署完flannel就寄了,可以查看pod的日志以观察是什么原因,这里提供一种情形,具体看我另外的帖子
#导致这个问题的原因是kubeadm的时候没加cidr的参数
#编辑 master 机器上的 /etc/kubernetes/manifests/kube-controller-manager.yaml
#启动文件加上下面两句话,下面这个cluster-cidr要和kube-flannel.yml里面的地址一致,要和kube-proxy.config.yaml里面的clusterCIDR一致

- --allocate-node-cidrs=true
- --cluster-cidr=10.244.0.0/16#

你为你集群安装的pod网络插件不能与你集群网络本身产生任何重叠,否则会导致一些问题,如果你想要修改网络,你可以在init的时候使用--pod-network-cidr 参数默认情况下,k8s使用RBAC来作为权限校正机制,你需要确保你选择的cni支持这种机制
cni本身并不由kubeadm提供,所以在选择那些第三方的时候最好足够小心谨慎,每个集群只能安装一个cni

https://jimmysong.io/kubernetes-handbook/practice/flannel-installation.html

[root@master ~]# kubectl get nodes
NAME     STATUS   ROLES                  AGE   VERSION
master   Ready    control-plane,master   10h   v1.22.4
node1    Ready    <none>                 10h   v1.22.4
node2    Ready    <none>                 10h   v1.22.4

在每个节点上写入这个文件/run/flannel/subnet.env

FLANNEL_NETWORK=10.244.0.0/16
FLANNEL_SUBNET=10.244.0.1/24
FLANNEL_MTU=1450
FLANNEL_IPMASQ=true

使用命令行运行和删除一个pod

[root@master ~]# kubectl get pods
No resources found in default namespace.
[root@master ~]# kubectl run test-server --image=go-service:v3
pod/test-server created
[root@master ~]# kubectl get pods
NAME          READY   STATUS              RESTARTS   AGE
test-server   0/1     ContainerCreating   0          8s
[root@master ~]# kubectl get pods
No resources found in default namespace.

基础操作

kubectl

kubectl常用指令
https://blog.csdn.net/weixin_36755535/article/details/127615449

kubectl apply/create -f yaml #根据yaml文件来创建pod,create只能创建一次,apply如果pod不存在则创建,如果存在则更新
kubectl get pod -n nodename  -o wide/yaml --show-labels -l label #-n指定node,获取其中的pod列表,-o wide显示详细信息,yaml是以yaml的格式显示,show-labels显示标签,-l显示含有某标签的pod
kubectl get namespaces/ns #获取namespace
kubectl create namespace new-namespace  #创建namespace
kubectl create cm xxx --from-file xxxx  #从文件创建cm
kubectl describe #查看某对象的详细信息
kubectl get service servername #查看server
kubectl -n node logs pod/podname -c 具体的容器名 #查看日志,如果pod里只有一个容器,那-c可以不用
kubectl delete pod  xxx #删除pod
kubectl exec -it <pod-name> -c <container-name> -- bash #登陆容器
#如果想直接执行命令,可以把-it去掉,然后把--后面的东西换成你的命令,--意味着kubectl语句的结束,在这之后的-不会被解析成kubectl的参数
kubectl delete 类型 名称 #    删除
kubectl logs pod/podname #    查看日志
kubectl -n xxx get pods xxxx -o jsonpath={.spec.containers[*].name} #查看pod里面的容器
kubectl rolling-update test-pod -f test-pod2.json #滚动更新
kubectl rolling-update test-pod test-pod2 --image=image:v2 #更新资源名和镜像
kubectl edit #更新资源,立即生效,可以通过配置环境变量KUBE_EDITOR来指定编辑器
kubectl scale --replicas=   #手动放缩
kubectl label --overwrite #打标签,如果原来有标签,需要overwrite,不然只会警告不会实际更改
kubectl explain ingress.spec #查询某种资源类型的定义规范格式等

yaml编写

k8s一般采用yaml来生成想要的pod

yaml大小写敏感,使用缩进来表示层级,相同层级要作对齐,用空格不要用tab,末尾要打个空格,如果把多个yaml放在一个文件,需要使用------来分割,#表示注释

apiVersion: group/apiversion  # 如果没有给定group名称,那么默认为croe,可以使用kubectl api-versions 获取当前k8s版本上所有的apiVersion版本信息(每个版本可能不同)
kind:       #资源类别
metadata:  #资源元数据
  name   
  namespace  #k8s自身的namespace   
  lables   
  annotations   #主要目的是方便用户阅读查找
spec: #期望的状态(disired state)
    containers:  
      - image:    
        name:   
        ports:    
          - name:      
            containerPort:    
            protocol: 

status: #当前状态,本字段有kubernetes自身维护,用户不能去定义

变量格式

k8s支持三种类型的变量:常量、对象(键值对),数组(一组按顺序排列的值),记得要打空格
常量:

数字、布尔、时间、字符串等

s1: hello
s2: 123
s3: True

对象

adb:
  a1: 1
  b1: 2

数组

addr:
  - Beijing
  - tokyo

常见kind

kind类型
资源.jpg

ReplicationController、ReplicaSet、deployment

rc/rs作用相似,后者是前者的升级版,rs的作用是使的pod的副本数与用户设置的数量保持一致,当有容器异常退出,填会去调度新的pod来进行替代,deployment比rc的功能更多,除了能够管理rc以外,deploy还能够提供回滚等操作,所以我们一般会去创建deployment而非直接创建rc/rs

示例

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
spec: #描述详情
  replicas: 3 #副本数量
  template: #模版,如果pod的数量不足,会按照下列模版去生成
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.7.9
        ports:
        - containerPort: 80

daemonset

daemonset作用是使的这个容器在每个node上都有一个运行副本,多用于部署一些日志、监控等功能

statefulset

StatefulSet 是为了解决有状态服务的问题(对应 Deployments 和 ReplicaSets 是为无状态服务而设计),其应用场景包括

  • 稳定的持久化存储,即 Pod 重新调度后还是能访问到相同的持久化数据,基于 PVC 来实现
  • 稳定的网络标志,即 Pod 重新调度后其 PodName 和 HostName 不变,基于 Headless Service(即没有 Cluster IP 的 Service)来实现
  • 有序部署,有序扩展,即 Pod 是有顺序的,在部署或者扩展的时候要依据定义的顺序依次依序进行(即从 0 到 N-1,在下一个 Pod 运行之前所有之前的 Pod 必须都是 Running 和 Ready 状态),基于 init containers 来实现
  • 有序收缩,有序删除(即从 N-1 到 0)

用人话来说,就是那些会产生数据的,比如普罗米修斯,就得用这种

要注意的是,pod的数据并不能持久化保存,比如重启下可能数据就没了,如果想要让数据长期存储,得另外挂载卷volume

示例

apiVersion: v1
kind: Service
metadata:
  name: nginx
  labels:
    app: nginx
spec:
  ports:
  - port: 80
    name: web
  clusterIP: None
  selector:
    app: nginx
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: web
spec:
  serviceName: "nginx"
  replicas: 2
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: k8s.gcr.io/nginx-slim:0.8
        ports:
        - containerPort: 80
          name: web
        volumeMounts:   #挂载
        - name: www
          mountPath: /usr/share/nginx/html
  volumeClaimTemplates:
  - metadata:
      name: www
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 1Gi

service

service.jpg

service的功能是把一些pod抽象成一组,为其提供一个统一的网络入口,通过service可以方便的实现负载均衡等功能,service是通过标签来选择服务后端等,这些匹配标签的 Pod IP 和端口列表组成 endpoints,由 kube-proxy 负责将服务 IP 负载均衡到这些 endpoints 上。不然pod的ip会在每次重启发生变化,并且只能在内部访问

Service 有四种类型:

  • ClusterIP:默认类型,自动分配一个仅 cluster (集群)内部可以访问的虚拟 IP
  • NodePort:在 ClusterIP 基础上为 Service 在每台机器上绑定一个端口,这样就可以通过 :NodePort 来访问该服务。如果 kube-proxy 设置了 --nodeport-addresses=10.240.0.0/16(v1.10 支持),那么仅该 NodePort 仅对设置在范围内的 IP 有效。
  • LoadBalancer:在 NodePort 的基础上,借助 cloud provider 创建一个外部的负载均衡器,并将请求转发到 :NodePort
  • ExternalName:将服务通过 DNS CNAME 记录方式转发到指定的域名(通过 spec.externlName 设定)。需要 kube-dns 版本在 1.7 以上。

另外,也可以将已有的服务以 Service 的形式加入到 Kubernetes 集群中来,只需要在创建 Service 的时候不指定 Label selector,而是在 Service 创建好后手动为其添加 endpoint。

示例一:定义了一个叫做nginx的service,他会把这个service策略下的pod的80端口的数据转发到default namespace连标签为name: nginx的pod的80端口

apiVersion: v1
kind: Service
metadata:
  labels:
    run: nginx
  name: nginx
  namespace: default
spec:
  ports:
  - port: 80
    protocol: TCP
    targetPort: 80
  selector:  # 标签选择器
    run: nginx
  sessionAffinity: None
  type: ClusterIP
 

示例二:当涉及到多个端口,需要每个端口都起一个名

kind: Service
apiVersion: v1
metadata:
  name: my-service
spec:
  selector:
    app: MyApp
  ports:
  - name: http
    protocol: TCP
    port: 80
    targetPort: 9376
  - name: https
    protocol: TCP
    port: 443
    targetPort: 9377 

示例三:转发到集群外的service,有两种写法

3.1:自定义 endpoint,即创建同名的 service 和 endpoint,在 endpoint 中设置外部服务的 IP 和端口

kind: Service
apiVersion: v1
metadata:
  name: my-service
spec:
  ports:
    - protocol: TCP
      port: 80
      targetPort: 9376
---
kind: Endpoints
apiVersion: v1
metadata:
  name: my-service
subsets:
  - addresses:
      - ip: 1.2.3.4
    ports:
      - port: 9376

3.2:通过 DNS 转发,在 service 定义中指定 externalName。此时 DNS 服务会给 ..svc.cluster.local 创建一个 CNAME 记录,其值为 my.database.example.com。并且,该服务不会自动分配 Cluster IP,需要通过 service 的 DNS 来访问。

kind: Service
apiVersion: v1
metadata:
  name: my-service
  namespace: default
spec:
  type: ExternalName
  externalName: my.database.example.com

如果一个service没有指定ip,那就被叫做headless service,一共有两种,一种是上那种通过dns转发的,还一种是指定selectors,通过dns a 去找后端endpoint列表。

apiVersion: v1
kind: Service
metadata:
  labels:
    app: nginx
  name: nginx
spec:
  clusterIP: None
  ports:
  - name: tcp-80-80-3b6tl
    port: 80
    protocol: TCP
    targetPort: 80
  selector:
    app: nginx
  sessionAffinity: None #一种分发策略,基于客户端IP地址进行会话保持/关联的模式,即第1次将某个客户端发起的请求转发到后端的某个Pod上,之后从相同的客户端发起的请求都将被转发到后端相同的Pod上
  type: ClusterIP
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    app: nginx
  name: nginx
  namespace: default
spec:
  replicas: 2
  revisionHistoryLimit: 5  # 历史记录版本
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - image: nginx:latest
        imagePullPolicy: Always
        name: nginx
        resources:
          limits:
            memory: 128Mi
          requests:
            cpu: 200m
            memory: 128Mi
      dnsPolicy: ClusterFirst
      restartPolicy: Always
headless

https://cloud.tencent.com/developer/article/1638722
用上面的方法布的pod都会有个cluster ip,由kube-proxy负责,如果不想要集群ip,而是想给pod弄个单独的serviceip,那么可以使用headless,clusterIP: None
headless有lable secector和service(iptables代理)两种模式
对于有selector的headless,endpiont会记录并修改dns返回的地址,使得通过地址能够直接到达pod,pod的backend里面包含了跟他满足同样条件的其他pod的ip。如果没有selector,endpoints不会做记录
image.png

apiVersion: v1
kind: Service
metadata:
  labels:
    app: nginx
  name: nginx
spec:
  clusterIP: None
  ports:
  - name: tcp-80-80-3b6tl
    port: 80
    protocol: TCP
    targetPort: 80
  selector:
    app: nginx
  sessionAffinity: None #一种分发策略,基于客户端IP地址进行会话保持/关联的模式,即第1次将某个客户端发起的请求转发到后端的某个Pod上,之后从相同的客户端发起的请求都将被转发到后端相同的Pod上
  type: ClusterIP
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    app: nginx
  name: nginx
  namespace: default
spec:
  replicas: 2
  revisionHistoryLimit: 5  # 历史记录版本
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - image: nginx:latest
        imagePullPolicy: Always
        name: nginx
        resources:
          limits:
            memory: 128Mi
          requests:
            cpu: 200m
            memory: 128Mi
      dnsPolicy: ClusterFirst
      restartPolicy: Always

命名端口,使用别名可以在pod修改端口后service无需修改

kind: pod
spec: 
    containers:
      - name: test
        ports: 
          - name: http #给这个端口起名为http
            containerPort: 8080
          -    name: https
            containerPort: 8443

-------
apiVersion: v1
kind: Service
spec:
    ports:
    - name: http        #将80端口映射到叫http的端口上
      port: 80
      targetPort: http
    

service并不是与服务直连的,而是通过endpoint,使用describe来查看service可以看见相关的信息
endpoint就是暴露服务的ip和端口列表
选择器⽤于构建IP和端⼜列表,然后存储在Endpoint资源中。当客户端连接到服务时,服务代理选择这些IP和端⼜对中的⼀个,并将传⼊连接重定向到在该位置监听的服务器。
这也就意味着,service其实是和endpoint解耦的,你完全可以独立维护一个endpoint的yaml,将其对接在一个不含pod选择器的service上,两者需要相同的名称

[root@master ~]# kubectl describe svc jenkins-service -n devops-tools
Name:                     jenkins-service
Namespace:                devops-tools
Labels:                   <none>
Annotations:              prometheus.io/path: /
                          prometheus.io/port: 8080
                          prometheus.io/scrape: true
Selector:                 app=jenkins-server
Type:                     NodePort
IP Family Policy:         SingleStack
IP Families:              IPv4
IP:                       10.111.233.39
IPs:                      10.111.233.39
Port:                     <unset>  8080/TCP
TargetPort:               8080/TCP
NodePort:                 <unset>  32000/TCP
Endpoints:                10.244.1.3:8080   #服务endpoint的pod的列表
Session Affinity:         None
External Traffic Policy:  Cluster
Events:                   <none>

[root@master ~]# kubectl get endpoints -n devops-tools
NAME              ENDPOINTS         AGE
jenkins-service   10.244.1.3:8080   16d
loadbalance

负载均衡器loadbalance拥有自己的公开可访问的ip地址,并且将所有的连接重定向到服务,可以通过负载均衡器的ip地址访问服务
lb是nodeport的扩展,如果k8s所在的环境不支持lb,则lb会表现得如同nodeport
lb是基于连接的,类似四层转发
image.png
image.png
image.png


需要注意的是,流量进入到节点之后,可能不止只转到该节点上的容器,还有可能会被转到别的节点上,以造成额外的开销,如果你不希望这种情况的发生,可以配置externalTrafficPolicy为Local,但是这会对负载均衡的性能造成影响
https://www.cnblogs.com/zisefeizhu/p/13262239.html
默认可能会发生的情况,这种情况下因为被节点转过一次(SNAT),最终后端将无法记录原始的客户端ip
image.png
local
image.png

ingress

lb的缺点在上述中已经体现的很明显了,每个lb都需要自己的负载均衡器和ip地址,为了解决这个问题,我们选择使用ingress来代替
ingress只需要一个ip就可以为多个服务提供访问,当一个请求到达时,他可以根据主机名和路径来决定转发
ingress是基于应用层的,类似七层转发
ingress需要ingress控制器才能实现,不同的k8s环境在这点上所需不同,请注意这点

ingress控制器就相当于一个监听器,通过apiserver不断去监听后端service\pod的变化,根据ingress的配置去更新代理
https://help.aliyun.com/document_detail/204886.html
image.png
image.png

backend除了支持service之外,还支持连接资源,他和service是互斥的,resource常用用法是将静态资源导向对象存储后端

[root@master ~]# kubectl explain ingress.spec.rules.http.paths.backend
KIND:     Ingress
VERSION:  networking.k8s.io/v1

RESOURCE: backend <Object>

DESCRIPTION:
     Backend defines the referenced service endpoint to which the traffic will
     be forwarded to.

     IngressBackend describes all endpoints for a given service and port.

FIELDS:
   resource     <Object>
     Resource is an ObjectRef to another Kubernetes resource in the namespace of
     the Ingress object. If resource is specified, a service.Name and
     service.Port must not be specified. This is a mutually exclusive setting
     with "Service".

   service      <Object>
     Service references a Service as a Backend. This is a mutually exclusive
     setting with "Resource".

namespace

Namespace 是对一组资源和对象的抽象集合,比如可以用来将系统内部的对象划分为不同的项目组或用户组。常见的 pod, service, replication controller 和 deployment 等都是属于某一个 namespace 的(默认是 default),而 node, persistent volume,namespace 等资源则不属于任何 namespace。
Namespace 常用来隔离不同的用户,比如 Kubernetes 自带的服务一般运行在 kube-system namespace 中。

用人话说,默认情况下k8s里的pod是可以互相访问的,如果想要实现分组隔离,可以使用namespace,另外可以把namespace交给不同的用户进行管理

kubectl 可以通过 --namespace 或者 -n 选项指定 namespace。如果不指定,默认为 default。查看操作下, 也可以通过设置 --all-namespace=true 来查看所有 namespace 下的资源。可以通过kubectl get namespaces/ns来查看

创建

(1) 命令行直接创建
$ kubectl create namespace new-namespace

(2) 通过文件创建
$ cat my-namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: new-namespace

$ kubectl create -f ./my-namespace.yaml


删除

kubectl delete namespaces new-namespace
删除一个 namespace 会自动删除所有属于该 namespace 的资源。
default 和 kube-system 命名空间不可删除。
PersistentVolume 是不属于任何 namespace 的,但 PersistentVolumeClaim 是属于某个特定 namespace 的。
Event 是否属于 namespace 取决于产生 event 的对象。
v1.7 版本增加了 kube-public 命名空间,该命名空间用来存放公共的信息,一般以 ConfigMap 的形式存放

configmap

configap可以帮我实现配置和应用分离,其用于保存配置数据的键值对,可以用来保存单个属性,也可以用来保存配置文件。

要注意:

  • ConfigMap 必须在 Pod 引用它之前创建
  • 使用 envFrom 时,将会自动忽略无效的键
  • Pod 只能使用同一个命名空间内的 ConfigMap

通过yaml来编写

apiVersion: v1
kind: ConfigMap
metadata:
  name: special-config
  namespace: default
data:
  special.how: very
  special.type: charm
  
  
  
  
$ kubectl create  -f  config.yaml
configmap "special-config" created

使用

apiVersion: v1
kind: Pod
metadata:
  name: test-pod
spec:
  containers:
    - name: test-container
      image: gcr.io/google_containers/busybox
      command: ["/bin/sh", "-c", "env"]
      env:
        - name: SPECIAL_LEVEL_KEY
          valueFrom:
            configMapKeyRef:
              name: special-config  #所使用的configmap
              key: special.how        #所使用的变量
        - name: SPECIAL_TYPE_KEY
          valueFrom:
            configMapKeyRef:
              name: special-config
              key: special.type
      envFrom:
        - configMapRef:
            name: env-config
  restartPolicy: Never
  
  
  
  #输出
SPECIAL_LEVEL_KEY=very
SPECIAL_TYPE_KEY=charm
log_level=INFO

作为命令行参数

apiVersion: v1
kind: Pod
metadata:
  name: dapi-test-pod
spec:
  containers:
    - name: test-container
      image: gcr.io/google_containers/busybox
      command: ["/bin/sh", "-c", "echo $(SPECIAL_LEVEL_KEY) $(SPECIAL_TYPE_KEY)" ]
      env:
        - name: SPECIAL_LEVEL_KEY
          valueFrom:
            configMapKeyRef:
              name: special-config
              key: special.how
        - name: SPECIAL_TYPE_KEY
          valueFrom:
            configMapKeyRef:
              name: special-config
              key: special.type
  restartPolicy: Never

挂载

apiVersion: v1
kind: ConfigMap
metadata:  
  name: configmap  
  namespace: dev
  data:  
    info: |    
      username:admin    
      password:123456
      
kubectl create -f configmap.yaml
configmap/configmap created


apiVersion: v1
kind: Pod
metadata:  
  name: pod-configmap  
  namespace: dev
  spec:  
    containers:  
      - name: nginx    
        image: nginx:1.17.1    
    volumeMounts: # 将configmap挂载到目录    
      - name: config      
        mountPath: /configmap/config  
    volumes: # 引用configmap  
      - name: config    
        configMap:      
        name: configmap

volume

容器会在挂掉之后丢失所有数据,为了使数据持久化,可以采用volume,volume的生命周期与pod绑定,其中部分(emptyDir、secret、gitRepo等)不会因为容器挂掉而失去数据,但是如果pod被删除数据也会被清空

emptyDir

如果 Pod 设置了 emptyDir 类型 Volume, Pod 被分配到 Node 上时候,会创建 emptyDir,只要 Pod 运行在 Node 上,emptyDir 都会存在(容器挂掉不会导致 emptyDir 丢失数据),但是如果 Pod 从 Node 上被删除(Pod 被删除,或者 Pod 发生迁移),emptyDir 也会被删除,并且永久丢失。

apiVersion: v1
kind: Pod
metadata:
  name: test-pd
spec:
  containers:
  - image: gcr.io/google_containers/test-webserver
    name: test-container
    volumeMounts: #将cache-volume挂载进容器里/cache
    - mountPath: /cache  # 容器内的挂载点
      name: cache-volume  #名字
  volumes:  #声明一个卷叫做cache-volume,类型为emptyDir
  - name: cache-volume  #与上面名字对应
    emptyDir: {}

hostpath

hostPath 允许挂载 Node 上的文件系统到 Pod 里面去。如果 Pod 需要使用 Node 上的文件,可以使用 hostPath。

apiVersion: v1
kind: Pod
metadata:
  name: test-pd
spec:
  containers:
  - image: gcr.io/google_containers/test-webserver
    name: test-container
    volumeMounts:
    - mountPath: /test-pd
      name: test-volume
  volumes:
  - name: test-volume
    hostPath:
      path: /data
      type: DirectoryOrCreate  # 目录存在就使用,不存在就先创建后使用,可选
      
#关于type的值的一点说明:    
DirectoryOrCreate 目录存在就使用,不存在就先创建后使用    
Directory   目录必须存在    
FileOrCreate  文件存在就使用,不存在就先创建后使用    
File 文件必须存在     
Socket  unix套接字必须存在    
CharDevice  字符设备必须存在    
BlockDevice 块设备必须存在

NFS
可以将文件挂载到一台NFS服务器上,那台服务器必须已经开启了NFS,并将共享目录暴露给pod所在网段

persistent volume

pv是持久化卷的意思,是对底层存储的一种抽象,一般通过管理员创建,通过插件实现共享化存储。

PV 提供网络存储资源,而 PVC (PersistentVolumeClaim)请求存储资源,这样,设置持久化的工作流包括配置底层文件系统或者云数据卷、创建持久性数据卷、最后创建 PVC 来将 Pod 跟数据卷关联起来。PV 和 PVC 可以将 pod 和数据卷解耦,pod 不需要知道确切的文件系统或者支持它的持久化引擎。

说白点就是pv负责把实际的存储抽象打包起来,而PVC负责去调用这些pv,上面用pvc的人不需要知道底层存储到底是什么样子的

Volume 的生命周期包括 5 个阶段

  1. Provisioning,即 PV 的创建,可以直接创建 PV(静态方式),也可以使用 StorageClass 动态创建
  2. Binding,用户创建pvc,如果有满足条件的会绑定,如果没有会一直等着
  3. Using,Pod 通过 PVC 使用该 Volume,并可以通过准入控制 StorageObjectInUseProtection(1.9 及以前版本为 PVCProtection)阻止删除正在使用的 PVC
  4. Releasing,Pod 释放 Volume 并删除 PVC
  5. Reclaiming,回收 PV,可以保留 PV 以便下次使用,也可以直接从云存储中删除
  6. Deleting,删除 PV 并从云存储中删除后段存储

根据这 5 个阶段,Volume 的状态有以下 4 种

  • Available:可用
  • Bound:已经分配给 PVC
  • Released:PVC 解绑但还未执行回收策略
  • Failed:发生错误
apiVersion: v1  
kind: PersistentVolume
metadata:  
  name: pv2
spec:  
  nfs: # 存储类型 
  capacity:  # 存储能力   
    storage: 2Gi  
  accessModes:  # 访问模式  
  storageClassName: # 存储类别  
  persistentVolumeReclaimPolicy: # 回收策略

访问模式分为三种,不同的底层存储类型可能支持的访问方式不同

  1. ReadWriteOnce:读写,只能被单个节点挂载
  2. ReadOnlyMany:只读,多个节点
  3. ReadWriteMany:读写,多个节点

回收策略分为三种:

  1. retain:保留数据,需要手动清理
  2. recycle:清除数据
  3. delete:删除卷

存储类别:pv可以设置一个存储类别,该pv只能被申请对应类别的pvc绑定,没有设置存储类别的pv只能与不请求任何类别的pvc绑定,一经绑定该pv无法与其他pvc绑定

pvc

apiVersion: v1
kind: PersistentVolumeClaim
metadata: 
  name: pvc  
  namespace: dev
  spec:  
    accessModes: # 访问模式  
    selector: # 采用标签对PV选择  
    storageClassName: # 存储类别,只有设置了类别的pv会被匹配  
    resources: # 请求空间    
      requests:      
        storage: 5Gi

ClusterRole/Role

apiserver支持RBAC的方式来进行授权管理,即根据角色的不同而授予不同的权限类型,角色即为针对某些资源对象的一组权限的集合

  • thanos

未命名文件.png
项目地址

实现功能

日志收集

监控日志文件,将其新数据通过kafka中转存储在ES中

目前实现的特性:

  • 实时从etcd获取最新配置,动态管控当前监听的对象
  • 根据ip自动获取不同的配置,方便进行区分
  • 动态创建ES index,无需提前手动创建

image.png
image.png

image.png

监控数据收集

运行你自定义的监控脚本,收集其数据并存储在influxdb中,方便后续的引用或者展示

代码文件中已包含一个示例test_moni.go,以其为例

image.png
image.png

image.png
image.png
监控脚本会定时去获取机器内存信息,并发送给kafka,kafka发送给influxdb存储,并通过grafana展示出来

package main

import (
    "fmt"
    "sync"

    "example.go/addrget"
    "example.go/etcd"
    "example.go/ifdb"
    "example.go/kafka"
    "example.go/tailfile"
    moni "example.go/test_moni"
    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

type Kafka struct {
    Address string `ini:"addr"`
    Topic   string `ini:"topic"`
    Size    int64  `ini:"chan_size"`
}

type Tsdb struct {
    Address    string `ini:"address"`
    Configpath string `ini:"confpath"`
}

type Etcd struct {
    Address    []string `ini:"address"`
    Collectkey string   `ini:"collectkey"`
}

type Configure struct { //映射用结构
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
    Etcd  `ini:"etcd"`
}

func main() {
    ip, err := addrget.GetIP() //获取本机ip
    if err != nil {
        logrus.Error("FAILED TO GET IP!!!!", err)
        return
    }

    logrus.SetLevel(logrus.DebugLevel)
    var configOBJ = new(Configure) //创建一个configure类型的指针并分配空间
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    // err = ini.MapTo(configOBJ,cfg)
    // err := ini.MapTo(configOBJ, "./configure.ini")
    err = cfg.MapTo(configOBJ) //映射
    if err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)

    err = kafka.Kafkainit([]string{configOBJ.Kafka.Address}, configOBJ.Kafka.Size) //类型转换,kafka初始化
    if err != nil {
        logrus.Error("kafka init failed:", err)
        return
    }
    logrus.Debug("kafka init success")

    err = etcd.Etcd_ini(configOBJ.Etcd.Address) //etcd初始化
    if err != nil {
        logrus.Error("failed to init etcd")
        return
    }
    logrus.Info("etcd init success")

    conf_key := fmt.Sprintf(configOBJ.Etcd.Collectkey, ip)

    conf_list, err := etcd.Etcd_getvalue(conf_key) //通过etcd获取value
    if err != nil {
        logrus.Error("faild to get value from etcd  ", err)
        return
    }
    // fmt.Printf("list:%v", list)
    fmt.Println(conf_list)

    err = tailfile.Tailinit(conf_list) //tail初始化,根据etcd返回的json去加载对应要读取的日志文件
    if err != nil {
        logrus.WithFields(logrus.Fields{"filename": configOBJ.Configpath, "err": err}).Error("file loaded failed")
        return
    }
    /**
    // go tailfile.Tailgetnewconf()
    **/
    go moni.Getcpuinfo()
    go ifdb.Writeapi()
    //go send_to_kafka()
    var wg sync.WaitGroup
    wg.Add(2)
    go send_to_kafka(&wg)
    go etcd.Etcd_watchdog(conf_key, &wg)
    wg.Wait()

}

func send_to_kafka(wg *sync.WaitGroup) { //给kafka发送数据
    logrus.Warn("kafka's send job start!")
    defer wg.Done()
    defer kafka.Kafka_client.Close()
    for {
        partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
        if err != nil {
            logrus.Error("kafka send failed")
        } else {
            logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
        }
    }
}
package ifdb

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "example.go/kafka"
    "github.com/Shopify/sarama"
    influxdb2 "github.com/influxdata/influxdb-client-go"
    "github.com/shirou/gopsutil/mem"
    "github.com/sirupsen/logrus"
)

//生产环境可别这么干哦
var (
    cli   influxdb2.Client
    token = "RTqC84IwjDwG4Lb6G4DCdxT9w3padfGzsQM03gnq6-nt-_O6l-d7UHvGdG96r-sD9fySvNlYAPM0OARVbXzyTA=="
)

//连接数据库
func Conninflux() {
    cli = influxdb2.NewClient("http://127.0.0.1:8086", token)
    logrus.Info("success connect to influxdb")
}

func Queryinf() {
    //org
    queryAPI := cli.QueryAPI("project1")
    // 数据库,范围,指标
    result, err := queryAPI.Query(context.Background(), `from(bucket:"pro1")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "pro1")`) //ctx context.Context, query string
    if err == nil {
        // Use Next() to iterate over query result lines
        for result.Next() {
            // Observe when there is new grouping key producing new table
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // read result
            fmt.Printf("row: %s\n", result.Record().String())
        }
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }
    }
}

//此处只做演示用所以写死,请根据自己实际情况更改

func Writeapi() {
    writeAPI := cli.WriteAPIBlocking("project1", "pro1")
    topic := "meminfo"
    var getchan chan *sarama.ConsumerMessage
    getchan = make(chan *sarama.ConsumerMessage, 10)
    var list mem.SwapMemoryStat
    go kafka.Consumer(topic, getchan)

    // Use blocking write client for writes to desired bucket
    for msg := range getchan {
        logrus.Info("influxdb get new message")
        //    list := kafka.Kafkainoutput()
        // var list *mem.SwapMemoryStat
        err := json.Unmarshal(msg.Value, &list)
        if err != nil {
            logrus.Error("influx failed to unmarshal info")
        }
        tag := map[string]string{"men": "meminfo"}
        fields := map[string]interface{}{
            "Total ":      int64(list.Total),
            "Used":        int64(list.Used),
            "Free":        int64(list.Free),
            "UsedPercent": float64(list.UsedPercent),
            "Sin ":        int64(list.Sin),
            "Sout":        int64(list.Sout),
            "PgIn":        int64(list.PgIn),
            "PgOut":       int64(list.PgOut),
            "PgFault":     int64(list.PgFault),
        }
        //measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time
        p := influxdb2.NewPoint("meminfo", tag, fields, time.Now())
        //非阻塞异步写入
        err = writeAPI.WritePoint(context.Background(), p)
        if err != nil {
            logrus.Warn("Failed to inject")
        } else {
            logrus.Info("Success")
        }
        // Or write directly line protocol
        // line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
        // writeAPI.WriteRecord(context.Background(), line)
    }
}

监控脚本,这里以meminfo为例

package moni

import (
    "time"

    "example.go/ifdb"
    "example.go/kafka"
    "github.com/Shopify/sarama"
    "github.com/shirou/gopsutil/mem"
    "github.com/sirupsen/logrus"
)

//获取内存信息,这仅仅只是一个演示用的示例
var (
//Infochan chan *mem.SwapMemoryStat
)

func Getcpuinfo() {
    // var list *mem.SwapMemoryStat
    msg := &sarama.ProducerMessage{}
    msg.Topic = "meminfo"
    ifdb.Conninflux()
    for {
        meminfo, _ := mem.SwapMemory()
        //logrus.Info(meminfo.String())
        msg.Value = sarama.StringEncoder(meminfo.String())
        logrus.Info(msg.Value)
        kafka.Kafkareceive(msg)
        time.Sleep(3 * time.Second)
        // data, err := json.MarshalIndent(meminfo, "", "")
        // if err != nil {
        //     logrus.Warn("Failed to marshal meminfo")
        //     continue
        // }
        // err = json.Unmarshal(data, &list)
        // if err != nil {
        //     logrus.Warn("Failed to unmarshal meminfo!")
        //     continue
        // }
        // msg.Value = sarama.StringEncoder(meminfo)
        // kafka.Kafkainsend(meminfo)
        //        ifdb.Writeapi(*meminfo)
    }
}
package global

type Configure_list struct {
    Path  string `json:"path"`
    Topic string `json:"topic"`
}

type Sysinfo struct {
    Ip   string
    Host string
    Data interface{}
}
package addrget

import (
    "net"
    "strings"

    "github.com/sirupsen/logrus"
)

//获取本地ip
func GetIP() (string, error) {
    conn, err := net.Dial("udp", "8.8.8.8:8888")
    if err != nil {
        logrus.Error("Fail to get IP")
        return "CAN NOT GET IP", err
    }
    defer conn.Close()

    ipaddr := conn.LocalAddr()
    ip := strings.Split(ipaddr.String(), ":")[0]
    return ip, err
}
package kafka

import (
    "fmt"
    "sync"

    "github.com/Shopify/sarama"
    "github.com/shirou/gopsutil/mem"
    "github.com/sirupsen/logrus"
)

var (
    Kafka_client  sarama.SyncProducer
    sarasend_chan chan *sarama.ProducerMessage
    insend_chan   chan *mem.SwapMemoryStat
)

func Kafkainit(address []string, size int64) (err error) {
    config := sarama.NewConfig()                              //配置
    config.Producer.RequiredAcks = sarama.WaitForAll          //ack应答模式为all
    config.Producer.Partitioner = sarama.NewRandomPartitioner //partition的选择方式为随机
    config.Producer.Return.Successes = true                   //Successes channel 返回值

    sarasend_chan = make(chan *sarama.ProducerMessage, size)
    insend_chan = make(chan *mem.SwapMemoryStat, 10)

    Kafka_client, err = sarama.NewSyncProducer(address, config) //使用参数创建一个SyncProducer
    //    defer kafka_client.Close()
    if err != nil {
        logrus.Error("broker connection has something wrong happened:", err)
        return err
    }
    return err
}

func Consumer(topic string, getchan chan *sarama.ConsumerMessage) { //消费者示例
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        logrus.Error("Kafka failed to create consumer")
        return
    }
    defer consumer.Close()

    partitionlist, err := consumer.Partitions(topic) //根据topic获取partition列表
    if err != nil {
        logrus.Error("Kafka failed to get partitionlist")
        return
    }
    fmt.Println(partitionlist)
    var wg sync.WaitGroup

    wg.Add(len(partitionlist))

    for partition := range partitionlist {
        part_consumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) //针对每一个分区都进行消费
        if err != nil {
            logrus.WithFields(logrus.Fields{"partition": partition}).Error("partition consume error happened!", err)
            return
        }

        go func(sarama.PartitionConsumer) {
            //defer wg.Done() //也许不需要done?
            for msg := range part_consumer.Messages() {
                fmt.Printf("partition:%d\noffset:%d\ntopic:%s\nvalue:%s", msg.Partition, msg.Offset, msg.Topic, msg.Value)
                getchan <- msg
            }

        }(part_consumer)
    }
    wg.Wait()
}

func Kafkareceive(msg *sarama.ProducerMessage) {
    sarasend_chan <- msg
    logrus.Info("kafka get msg", msg)
}

func Kafkaoutput() *sarama.ProducerMessage {
    logrus.Info("kafka start to outout msg")
    return <-sarasend_chan
}
package tailfile

import (
    "context"
    "fmt"

    "example.go/kafka"
    "github.com/Shopify/sarama"
    "github.com/nxadm/tail"
    "github.com/sirupsen/logrus"
)

var (
    Tail_context *tail.Tail
    // change_chan  chan []global.Configure_list
    //Context_listen chan *tail.Line
)

type File_reader struct {
    Path        string
    Topic       string
    File_handle *tail.Tail
    ctx         context.Context
    cancel      context.CancelFunc
}

//监控对象配置初始化
func type_init(path string, topic string) *File_reader {
    ctx, cancel := context.WithCancel(context.Background())
    tobj := File_reader{
        Path:   path,
        Topic:  topic,
        ctx:    ctx,
        cancel: cancel,
    }
    return &tobj
}

func (tobj *File_reader) run() { //从tail中接收读取到的信息并发送给kafka的channel
    //    defer wg.Done()
    logrus.WithFields(logrus.Fields{"FIle": tobj.Path}).Info("Start to monitor file-----")
    msg := &sarama.ProducerMessage{}
    msg.Topic = tobj.Topic
    for {
        select {
        case <-tobj.ctx.Done():
            logrus.WithFields(logrus.Fields{"File": tobj.Path}).Info("STOP READ FILE:")
            return
        case line, ok := <-tobj.File_handle.Lines:
            if len(line.Text) == 0 {
                continue
            }
            if ok {
                logrus.Info("get message")
                msg.Value = sarama.StringEncoder(line.Text)
                fmt.Println(msg.Value)
                kafka.Kafkareceive(msg)
                logrus.Info("send message")
            } else {
                logrus.Error("failed to get message")
                continue
            }
        }

    }
}

//监控对象初始化
func (tobj *File_reader) init() (err error) {
    config := tail.Config{
        ReOpen:    true, //当出现轮转切割的时候,这两个参数会去追踪新的文件
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从哪开始
        MustExist: false,                                //允许文件不存在
        Poll:      true,                                 //轮询
    }
    tobj.File_handle, err = tail.TailFile(tobj.Path, config)
    return err
}

package tailfile

import (
    "example.go/global"
    "github.com/sirupsen/logrus"
)

//文件监控句柄管理对象
type tailobjmgr struct {
    tailobj     map[string]*File_reader
    tailobjconf []global.Configure_list
    chg_chan    chan []global.Configure_list
}

var (
    objmgr *tailobjmgr
)

//初始化函数
func Tailinit(flist []global.Configure_list) (err error) {
    objmgr = &tailobjmgr{
        tailobj:     make(map[string]*File_reader),
        tailobjconf: flist,
        chg_chan:    make(chan []global.Configure_list),
    }

    for _, item := range flist {
        t_file := type_init(item.Path, item.Topic) //导入基本信息
        err = t_file.init()                        //初始化
        if err != nil {
            logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", err)
            continue
        }
        objmgr.tailobj[t_file.Path] = t_file //把创建的对象放进map里面登记
        logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
        go t_file.run()
    }

    logrus.Info("All tailfile inited")

    go objmgr.watch()
    return err
}

//等待更新
func (mobj *tailobjmgr) watch() {
    logrus.Info("tail's watchdog is waiting")
    for {
        listen := <-mobj.chg_chan //等待etcd给他发送新配置
        logrus.WithFields(logrus.Fields{"newconf": listen}).Info("get new configure:")
        for _, item := range listen {
            if mobj.isExist(item) { //如果是存在的,则跳过
                continue
            }
            t_file := type_init(item.Path, item.Topic) //不存在则初始化
            err := t_file.init()                       //初始化
            if err != nil {
                logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", err)
                continue
            }
            mobj.tailobj[t_file.Path] = t_file //把创建的对象放进map里面登记
            logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
            go t_file.run()
        }
        //把存在于登记列表中,但是在新配置中不存在的任务给停掉
        var flag bool
        logrus.Info("start to check")
        for key, item := range mobj.tailobj {
            flag = false
            for _, itx := range listen {
                if key == itx.Path {
                    logrus.WithFields(logrus.Fields{"file": key}).Info("get file")
                    flag = true
                }
            }
            if !flag {
                logrus.WithFields(logrus.Fields{"file": key}).Info("File watch will stop")
                item.cancel()
                delete(mobj.tailobj, key) //一定要记得删除,不然被停止的文件下次加载会被当成已存在的而被跳过无法初始化
            }
        }
        logrus.Info("check over")
    }

}

func Sendnewconf(newconf []global.Configure_list) { //让etcd发送新配置过来
    objmgr.chg_chan <- newconf
}

func (mobj *tailobjmgr) isExist(conf global.Configure_list) bool { //判断该项文件是否已在登记的目录里
    _, ok := mobj.tailobj[conf.Path]
    return ok
}
package etcd

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "time"

    "example.go/global"
    "example.go/tailfile"
    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

var (
    client *clientv3.Client
)

// type Configure_list struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

// type LogEntry struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

func Etcd_ini(address []string) (err error) {
    client, err = clientv3.New(clientv3.Config{Endpoints: address, DialTimeout: 3 * time.Second})
    if err != nil {
        logrus.Error("Failed to connect etcd", err)
        return err
    }
    return err
}

func Etcd_getvalue(key string) (list []global.Configure_list, err error) { //    通过键获取值,返回json格式
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    get, err := client.Get(ctx, key)
    if err != nil {
        logrus.WithFields(logrus.Fields{key: key}).Error("failed to get key value", err)
        return
    }
    if len(get.Kvs) == 0 {
        logrus.WithFields(logrus.Fields{key: key}).Error("empty value!", err)
        return
    }
    stu := get.Kvs[0]
    err = json.Unmarshal(stu.Value, &list)
    if err != nil {
        logrus.Error("failed to unmarshal  ", err)
        return
    }
    return list, err
}

//监控日志配置项是否发生变化
func Etcd_watchdog(key string, wg *sync.WaitGroup) {
    defer wg.Done()
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    watchdog := client.Watch(ctx, key) //WatchChan-->WatchResponse-->[]Events-->type/Kv-->key/value
    var newlist []global.Configure_list
    logrus.Info("watchdog stand by ")
    for re := range watchdog {
        logrus.Info("etcd watchdog catchs new message!")
        for _, ev := range re.Events {
            fmt.Printf("Operation type is:%s\nkey:%s\nvalue:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
            //原本此处设计动作为del时会清空,但是实测channel接收不到del这个信号,暂弃,如需清空请传入"[{\"path\": \"\",\"topic\":\"\"}]"或随便传入一个不存在的地址
            // if ev.Type == clientv3.EventTypeDelete { //storagepb.Event_EventType DELETE = 1
            //     logrus.Warn("RECEIVED DELETE OPERATION")
            //     tailfile.Sendnewconf(newlist)
            //     logrus.Warn("YOU DELETE ALL CONFIGURES")
            //     return
            // }
            err := json.Unmarshal(ev.Kv.Value, &newlist)
            if err != nil {
                logrus.Error("failed to unmarshal now conf!")
                continue
            }
            logrus.Info("trying to send new configure")
            tailfile.Sendnewconf(newlist)
            logrus.Info("send new configure successfully")
        }
    }
}

// td@tddeMacBook-Air ~ % etcdctl put testk1 "[{\"path\": \"/Users/td/Documents/blog/prod.log\",\"topic\":\"prod\"},{\"path\": \"/Users/td/Documents/blog/readme.log\",\"topic\":\"readme\"}]"
// OK
// td@tddeMacBook-Air ~ % etcdctl get testk1
// testk1
// [{"path": "/Users/td/Documents/blog/prod.log","topic":"prod"},{"path": "/Users/td/Documents/blog/readme.log","topic":"readme"}]
[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000

[tsdb]
address = 127.0.0.1
confpath = "./readme.log"

[etcd]
address = "127.0.0.1:2379"
collectkey = "etcd_key_%s_testk1"

当etcd发送变更的时候,会根据新配置来新增、删除文件监控,并实现通过部署机的ip来区分加载不同的etcd简直对
example.go

package main

import (
    "fmt"
    "sync"

    "example.go/addrget"
    "example.go/etcd"
    "example.go/kafka"
    "example.go/tailfile"
    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

type Kafka struct {
    Address string `ini:"addr"`
    Topic   string `ini:"topic"`
    Size    int64  `ini:"chan_size"`
}

type Tsdb struct {
    Address    string `ini:"address"`
    Configpath string `ini:"confpath"`
}

type Etcd struct {
    Address    []string `ini:"address"`
    Collectkey string   `ini:"collectkey"`
}

type Configure struct { //映射用结构
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
    Etcd  `ini:"etcd"`
}

func main() {
    ip, err := addrget.GetIP() //获取本机ip
    if err != nil {
        logrus.Error("FAILED TO GET IP!!!!", err)
        return
    }

    logrus.SetLevel(logrus.DebugLevel)
    var configOBJ = new(Configure) //创建一个configure类型的指针并分配空间
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    // err = ini.MapTo(configOBJ,cfg)
    // err := ini.MapTo(configOBJ, "./configure.ini")
    err = cfg.MapTo(configOBJ) //映射
    if err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)

    err = kafka.Kafkainit([]string{configOBJ.Kafka.Address}, configOBJ.Kafka.Size) //类型转换,kafka初始化
    if err != nil {
        logrus.Error("kafka init failed:", err)
        return
    }
    logrus.Debug("kafka init success")

    err = etcd.Etcd_ini(configOBJ.Etcd.Address) //etcd初始化
    if err != nil {
        logrus.Error("failed to init etcd")
        return
    }
    logrus.Info("etcd init success")

    conf_key := fmt.Sprintf(configOBJ.Etcd.Collectkey, ip)

    conf_list, err := etcd.Etcd_getvalue(conf_key) //通过etcd获取value
    if err != nil {
        logrus.Error("faild to get value from etcd  ", err)
        return
    }
    // fmt.Printf("list:%v", list)
    fmt.Println(conf_list)

    err = tailfile.Tailinit(conf_list) //tail初始化,根据etcd返回的json去加载对应要读取的日志文件
    if err != nil {
        logrus.WithFields(logrus.Fields{"filename": configOBJ.Configpath, "err": err}).Error("file loaded failed")
        return
    }
    /**
    // go tailfile.Tailgetnewconf()
    **/

    go send_to_kafka()
    var wg sync.WaitGroup
    wg.Add(1)
    //    go send_to_kafka()
    go etcd.Etcd_watchdog(conf_key, &wg)
    wg.Wait()

}

func send_to_kafka() { //给kafka发送数据
    //    defer wg.Done()
    defer kafka.Kafka_client.Close()
    for {
        partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
        if err != nil {
            logrus.Error("kafka send failed")
        } else {
            logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
        }
    }
}

getaddr.go

package addrget

import (
    "net"
    "strings"

    "github.com/sirupsen/logrus"
)

//获取本地ip
func GetIP() (string, error) {
    conn, err := net.Dial("udp", "8.8.8.8:8888")
    if err != nil {
        logrus.Error("Fail to get IP")
        return "CAN NOT GET IP", err
    }
    defer conn.Close()

    ipaddr := conn.LocalAddr()
    ip := strings.Split(ipaddr.String(), ":")[0]
    return ip, err
}

kafka.go

package kafka

import (
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
)

var (
    Kafka_client sarama.SyncProducer
    send_chan    chan *sarama.ProducerMessage
)

func Kafkainit(address []string, size int64) (err error) {
    config := sarama.NewConfig()                              //配置
    config.Producer.RequiredAcks = sarama.WaitForAll          //ack应答模式为all
    config.Producer.Partitioner = sarama.NewRandomPartitioner //partition的选择方式为随机
    config.Producer.Return.Successes = true                   //Successes channel 返回值

    send_chan = make(chan *sarama.ProducerMessage, size)

    Kafka_client, err = sarama.NewSyncProducer(address, config) //使用参数创建一个SyncProducer
    //    defer kafka_client.Close()
    if err != nil {
        logrus.Error("broker connection has something wrong happened:", err)
        return err
    }
    return err
}

func Kafkareceive(msg *sarama.ProducerMessage) {
    send_chan <- msg
}

func Kafkaoutput() *sarama.ProducerMessage {
    return <-send_chan
}

// func Send_kafka() {

// }

tailread.go

package tailfile

import (
    "context"
    "fmt"

    "example.go/kafka"
    "github.com/Shopify/sarama"
    "github.com/nxadm/tail"
    "github.com/sirupsen/logrus"
)

var (
    Tail_context *tail.Tail
    // change_chan  chan []global.Configure_list
    //Context_listen chan *tail.Line
)

type File_reader struct {
    Path        string
    Topic       string
    File_handle *tail.Tail
    ctx         context.Context
    cancel      context.CancelFunc
}

//监控对象配置初始化
func type_init(path string, topic string) *File_reader {
    ctx, cancel := context.WithCancel(context.Background())
    tobj := File_reader{
        Path:   path,
        Topic:  topic,
        ctx:    ctx,
        cancel: cancel,
    }
    return &tobj
}

func (tobj *File_reader) run() { //从tail中接收读取到的信息并发送给kafka的channel
    //    defer wg.Done()
    logrus.WithFields(logrus.Fields{"FIle": tobj.Path}).Info("Start to monitor file-----")
    msg := &sarama.ProducerMessage{}
    msg.Topic = tobj.Topic
    for {
        select {
        case <-tobj.ctx.Done():
            logrus.WithFields(logrus.Fields{"File": tobj.Path}).Info("STOP READ FILE:")
            return
        case line, ok := <-tobj.File_handle.Lines:
            if len(line.Text) == 0 {
                continue
            }
            if ok {
                logrus.Info("get message")
                msg.Value = sarama.StringEncoder(line.Text)
                fmt.Println(msg.Value)
                kafka.Kafkareceive(msg)
                logrus.Info("send message")
            } else {
                logrus.Error("failed to get message")
                continue
            }
        }

    }
}

//监控对象初始化
func (tobj *File_reader) init() (err error) {
    config := tail.Config{
        ReOpen:    true, //当出现轮转切割的时候,这两个参数会去追踪新的文件
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从哪开始
        MustExist: false,                                //允许文件不存在
        Poll:      true,                                 //轮询
    }
    tobj.File_handle, err = tail.TailFile(tobj.Path, config)
    return err
}

tailmanager.go

package tailfile

import (
    "example.go/global"
    "github.com/sirupsen/logrus"
)

//文件监控句柄管理对象
type tailobjmgr struct {
    tailobj     map[string]*File_reader
    tailobjconf []global.Configure_list
    chg_chan    chan []global.Configure_list
}

var (
    objmgr *tailobjmgr
)

//初始化函数
func Tailinit(flist []global.Configure_list) (err error) {
    objmgr = &tailobjmgr{
        tailobj:     make(map[string]*File_reader),
        tailobjconf: flist,
        chg_chan:    make(chan []global.Configure_list),
    }

    for _, item := range flist {
        t_file := type_init(item.Path, item.Topic) //导入基本信息
        err = t_file.init()                        //初始化
        if err != nil {
            logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", err)
            continue
        }
        objmgr.tailobj[t_file.Path] = t_file //把创建的对象放进map里面登记
        logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
        go t_file.run()
    }

    logrus.Info("All tailfile inited")

    go objmgr.watch()
    return err
}

//等待更新
func (mobj *tailobjmgr) watch() {
    logrus.Info("tail's watchdog is waiting")
    for {
        listen := <-mobj.chg_chan //等待etcd给他发送新配置
        logrus.WithFields(logrus.Fields{"newconf": listen}).Info("get new configure:")
        for _, item := range listen {
            if mobj.isExist(item) { //如果是存在的,则跳过
                continue
            }
            t_file := type_init(item.Path, item.Topic) //不存在则初始化
            err := t_file.init()                       //初始化
            if err != nil {
                logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", err)
                continue
            }
            mobj.tailobj[t_file.Path] = t_file //把创建的对象放进map里面登记
            logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
            go t_file.run()
        }
        //把存在于登记列表中,但是在新配置中不存在的任务给停掉
        var flag bool
        logrus.Info("start to check")
        for key, item := range mobj.tailobj {
            flag = false
            for _, itx := range listen {
                if key == itx.Path {
                    logrus.WithFields(logrus.Fields{"file": key}).Info("get file")
                    flag = true
                }
            }
            if !flag {
                logrus.WithFields(logrus.Fields{"file": key}).Info("File watch will stop")
                item.cancel()
                delete(mobj.tailobj, key) //一定要记得删除,不然被停止的文件下次加载会被当成已存在的而被跳过无法初始化
            }
        }
        logrus.Info("check over")
    }

}

func Sendnewconf(newconf []global.Configure_list) { //让etcd发送新配置过来
    objmgr.chg_chan <- newconf
}

func (mobj *tailobjmgr) isExist(conf global.Configure_list) bool { //判断该项文件是否已在登记的目录里
    _, ok := mobj.tailobj[conf.Path]
    return ok
}

etcd.go:注,暂不支持etcd del操作,会收不到信号

package etcd

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "time"

    "example.go/global"
    "example.go/tailfile"
    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

var (
    client *clientv3.Client
)

// type Configure_list struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

// type LogEntry struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

func Etcd_ini(address []string) (err error) {
    client, err = clientv3.New(clientv3.Config{Endpoints: address, DialTimeout: 3 * time.Second})
    if err != nil {
        logrus.Error("Failed to connect etcd", err)
        return err
    }
    return err
}

func Etcd_getvalue(key string) (list []global.Configure_list, err error) { //    通过键获取值,返回json格式
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    get, err := client.Get(ctx, key)
    if err != nil {
        logrus.WithFields(logrus.Fields{key: key}).Error("failed to get key value", err)
        return
    }
    if len(get.Kvs) == 0 {
        logrus.WithFields(logrus.Fields{key: key}).Error("empty value!", err)
        return
    }
    stu := get.Kvs[0]
    err = json.Unmarshal(stu.Value, &list)
    if err != nil {
        logrus.Error("failed to unmarshal  ", err)
        return
    }
    return list, err
}

//监控日志配置项是否发生变化
func Etcd_watchdog(key string, wg *sync.WaitGroup) {
    defer wg.Done()
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    watchdog := client.Watch(ctx, key) //WatchChan-->WatchResponse-->[]Events-->type/Kv-->key/value
    var newlist []global.Configure_list
    logrus.Info("watchdog stand by ")
    for re := range watchdog {
        logrus.Info("etcd watchdog catchs new message!")
        for _, ev := range re.Events {
            fmt.Printf("Operation type is:%s\nkey:%s\nvalue:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
            //原本此处设计动作为del时会清空,但是实测channel接收不到del这个信号,暂弃,如需清空请传入"[{\"path\": \"\",\"topic\":\"\"}]"或随便传入一个不存在的地址
            // if ev.Type == clientv3.EventTypeDelete { //storagepb.Event_EventType DELETE = 1
            //     logrus.Warn("RECEIVED DELETE OPERATION")
            //     tailfile.Sendnewconf(newlist)
            //     logrus.Warn("YOU DELETE ALL CONFIGURES")
            //     return
            // }
            err := json.Unmarshal(ev.Kv.Value, &newlist)
            if err != nil {
                logrus.Error("failed to unmarshal now conf!")
                continue
            }
            logrus.Info("trying to send new configure")
            tailfile.Sendnewconf(newlist)
            logrus.Info("send new configure successfully")
        }
    }
}

// td@tddeMacBook-Air ~ % etcdctl put testk1 "[{\"path\": \"/Users/td/Documents/blog/prod.log\",\"topic\":\"prod\"},{\"path\": \"/Users/td/Documents/blog/readme.log\",\"topic\":\"readme\"}]"
// OK
// td@tddeMacBook-Air ~ % etcdctl get testk1
// testk1
// [{"path": "/Users/td/Documents/blog/prod.log","topic":"prod"},{"path": "/Users/td/Documents/blog/readme.log","topic":"readme"}]

configure.ini

[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000

[tsdb]
address = 127.0.0.1
confpath = "./readme.log"

[etcd]
address = "127.0.0.1:2379"
collectkey = "etcd_key_%s_testk1"

ini读取基本的初始化配置,etcd通过key去获取对应的json字符串,tail解析其中的路径和对应的Kafka topic,监控对应的文件并把最新的内容发送到对应的topic
example.go

package main

import (
    "fmt"
    "sync"

    "example.go/etcd"
    "example.go/kafka"
    "example.go/tailfile"
    "github.com/sirupsen/logrus"
    "gopkg.in/ini.v1"
)

type Kafka struct {
    Address string `ini:"addr"`
    Topic   string `ini:"topic"`
    Size    int64  `ini:"chan_size"`
}

type Tsdb struct {
    Address    string `ini:"address"`
    Configpath string `ini:"confpath"`
}

type Etcd struct {
    Address    []string `ini:"address"`
    Collectkey string   `ini:"collectkey"`
}

type Configure struct { //映射用结构
    Kafka `ini:"kafka"`
    Tsdb  `ini:"tsdb"`
    Etcd  `ini:"etcd"`
}

func main() {
    logrus.SetLevel(logrus.DebugLevel)
    var configOBJ = new(Configure) //创建一个configure类型的指针并分配空间
    cfg, err := ini.Load("./configure.ini")
    if err != nil {
        logrus.Error("failed to load configure:", err)
        return
    }
    // err = ini.MapTo(configOBJ,cfg)
    // err := ini.MapTo(configOBJ, "./configure.ini")
    err = cfg.MapTo(configOBJ) //映射
    if err != nil {
        logrus.Error("failed to reflect:", err)
        return
    }
    fmt.Printf("%#v\n", configOBJ)

    err = kafka.Kafkainit([]string{configOBJ.Kafka.Address}, configOBJ.Kafka.Size) //类型转换,kafka初始化
    if err != nil {
        logrus.Error("kafka init failed:", err)
        return
    }
    logrus.Debug("kafka init success")

    err = etcd.Etcd_ini(configOBJ.Etcd.Address) //etcd初始化
    if err != nil {
        logrus.Error("failed to init etcd")
        return
    }
    logrus.Info("etcd init success")

    conf_list, err := etcd.Etcd_getvalue(configOBJ.Etcd.Collectkey) //通过etcd获取value
    if err != nil {
        logrus.Error("faild to get value from etcd  ", err)
        return
    }
    // fmt.Printf("list:%v", list)
    fmt.Println(conf_list)

    err = tailfile.Tailinit(conf_list) //tail初始化,根据etcd返回的json去加载对应要读取的日志文件
    if err != nil {
        logrus.WithFields(logrus.Fields{"filename": configOBJ.Configpath, "err": err}).Error("file loaded failed")
        return
    }
    logrus.WithFields(logrus.Fields{"filename": configOBJ.Tsdb.Configpath}).Debug("file loaded successfully")

    var wg sync.WaitGroup
    wg.Add(1)
    go send_to_kafka(&wg)
    wg.Wait()

}

func send_to_kafka(wg *sync.WaitGroup) { //给kafka发送数据
    defer wg.Done()
    defer kafka.Kafka_client.Close()
    for {
        partition, offset, err := kafka.Kafka_client.SendMessage(kafka.Kafkaoutput())
        if err != nil {
            logrus.Error("kafka send failed")
        } else {
            logrus.WithFields(logrus.Fields{"partition": partition, "offset": offset}).Info("send message success")
        }
    }
}

tailread.go

package tailfile

import (
    "fmt"

    "example.go/etcd"
    "example.go/kafka"
    "github.com/Shopify/sarama"
    "github.com/nxadm/tail"
    "github.com/sirupsen/logrus"
)

var (
    Tail_context *tail.Tail
    //Context_listen chan *tail.Line
)

type File_reader struct {
    Path        string
    Topic       string
    File_handle *tail.Tail
}

func type_init(path string, topic string) *File_reader {
    tobj := File_reader{
        Path:  path,
        Topic: topic,
    }
    return &tobj
}

func (tobj File_reader) run() { //从tail中接收读取到的信息并发送给kafka的channel
    //    defer wg.Done()
    msg := &sarama.ProducerMessage{}
    msg.Topic = tobj.Topic
    for {
        line, ok := <-tobj.File_handle.Lines
        if len(line.Text) == 0 {
            continue
        }
        if ok {
            logrus.Info("get message")
            msg.Value = sarama.StringEncoder(line.Text)
            fmt.Println(msg.Value)
            kafka.Kafkareceive(msg)
            logrus.Info("send message")
        } else {
            logrus.Error("failed to get message")
            continue
        }
    }
}

func (tobj *File_reader) init() (err error) {
    config := tail.Config{
        ReOpen:    true, //当出现轮转切割的时候,这两个参数会去追踪新的文件
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从哪开始
        MustExist: false,                                //允许文件不存在
        Poll:      true,                                 //轮询
    }
    tobj.File_handle, err = tail.TailFile(tobj.Path, config)
    return err
}
func Tailinit(flist []etcd.Configure_list) (err error) {
    for _, item := range flist {
        t_file := type_init(item.Path, item.Topic) //导入基本信息
        err = t_file.init()                        //初始化
        if err != nil {
            logrus.WithFields(logrus.Fields{"File": item.Path, "topic": item.Topic}).Error("failed to init tailfile:", err)
            continue
        }
        logrus.WithFields(logrus.Fields{"File": t_file.Path}).Info("Init tailfile successfully")
        go t_file.run()
    }

    return err
}

kafka.go

package kafka

import (
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
)

var (
    Kafka_client sarama.SyncProducer
    send_chan    chan *sarama.ProducerMessage
)

func Kafkainit(address []string, size int64) (err error) {
    config := sarama.NewConfig()                              //配置
    config.Producer.RequiredAcks = sarama.WaitForAll          //ack应答模式为all
    config.Producer.Partitioner = sarama.NewRandomPartitioner //partition的选择方式为随机
    config.Producer.Return.Successes = true                   //Successes channel 返回值

    send_chan = make(chan *sarama.ProducerMessage, size)

    Kafka_client, err = sarama.NewSyncProducer(address, config) //使用参数创建一个SyncProducer
    //    defer kafka_client.Close()
    if err != nil {
        logrus.Error("broker connection has something wrong happened:", err)
        return err
    }
    return err
}

func Kafkareceive(msg *sarama.ProducerMessage) {
    send_chan <- msg
}

func Kafkaoutput() *sarama.ProducerMessage {
    return <-send_chan
}

etcd.go

package etcd

import (
    "context"
    "encoding/json"
    "time"

    "github.com/coreos/etcd/clientv3"
    "github.com/sirupsen/logrus"
)

var (
    client *clientv3.Client
)

type Configure_list struct {
    Path  string `json:"path"`
    Topic string `json:"topic"`
}

// type LogEntry struct {
//     Path  string `json:"path"`
//     Topic string `json:"topic"`
// }

func Etcd_ini(address []string) (err error) {
    client, err = clientv3.New(clientv3.Config{Endpoints: address, DialTimeout: 3 * time.Second})
    if err != nil {
        logrus.Error("Failed to connect etcd", err)
        return err
    }
    return err
}

func Etcd_getvalue(key string) (list []Configure_list, err error) { //    通过键获取值,返回json格式
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    get, err := client.Get(ctx, key)
    if err != nil {
        logrus.WithFields(logrus.Fields{key: key}).Error("failed to get key value", err)
        return
    }
    if len(get.Kvs) == 0 {
        logrus.WithFields(logrus.Fields{key: key}).Error("empty value!", err)
        return
    }
    // for _, item := range get.Kvs {
    //     fmt.Printf("key:%s  value:%s\n", item.Key, item.Value)
    //     err = json.Unmarshal(item.Value, &list)
    //     if err != nil {
    //         logrus.Error("failed to unmarshal  ", err)
    //         return []configure_list{}, err
    //     }
    // }
    stu := get.Kvs[0]
    // fmt.Println(string(stu.Value))
    // fmt.Printf("%#v\n", stu.Value)
    // stu.Value = bytes.TrimPrefix(stu.Value, []byte("\xef\xbb\xbf"))
    err = json.Unmarshal(stu.Value, &list)
    if err != nil {
        logrus.Error("failed to unmarshal  ", err)
        // if e, ok := err.(*json.SyntaxError); ok {
        //     logrus.Printf("syntax error at byte offset %d", e.Offset)
        // }
        // logrus.Printf("sakura response: %q", stu.Value)
        return
    }
    return list, err
}

configure.ini

[kafka]
addr = "127.0.0.1:9092"
topic = "test1"
chan_size = 10000

[tsdb]
address = 127.0.0.1
confpath = "./readme.log"

[etcd]
address = "127.0.0.1:2379"
collectkey = "testk1"

运行卡夫卡消费者,对对应文件进行修改,查看效果,测试结果正常

go run ./example.go                                          
&main.Configure{Kafka:main.Kafka{Address:"127.0.0.1:9092", Topic:"test1", Size:10000}, Tsdb:main.Tsdb{Address:"127.0.0.1", Configpath:"./readme.log"}, Etcd:main.Etcd{Address:[]string{"127.0.0.1:2379"}, Collectkey:"testk1"}}
DEBU[0000] kafka init success                           
INFO[0000] etcd init success                            
[{/Users/td/Documents/blog/prod.log prod} {/Users/td/Documents/blog/readme.log readme}]
INFO[0000] Init tailfile successfully                    File=/Users/td/Documents/blog/prod.log
INFO[0000] Init tailfile successfully                    File=/Users/td/Documents/blog/readme.log
DEBU[0000] file loaded successfully                      filename=./readme.log
INFO[0010] get message                                  
444
INFO[0010] send message                                 
INFO[0010] send message success                          offset=1 partition=0
INFO[0017] get message                                  
555
INFO[0017] send message                                 
INFO[0017] send message success                          offset=0 partition=0
INFO[0101] get message                                  
666
INFO[0101] send message                                 
INFO[0101] send message success                          offset=2 partition=0
INFO[0123] get message                                  
777
INFO[0123] send message                                 
INFO[0123] send message success                          offset=1 partition=0


kafka-console-consumer --bootstrap-server 127.0.0.1:909
2 --topic readme
666

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic prod
777