分类 go 下的文章

tail读取文件

go get github.com/nxadm/tail/...

go get github.com/hpcloud/tail/...因为依赖包改了地址而这里没改,所以直接get会出现问题,使用上面这个作为替代
package main

import (
    "fmt"
    "time"

    "github.com/nxadm/tail"
)

func main() {

    filename := "./text.txt"
    config := tail.Config{
        ReOpen:    true, //当出现轮转切割的时候,这两个参数会去追踪新的文件
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从哪开始
        MustExist: false,//允许文件不存在
        Poll:      true, //轮询
    }

    tails, err := tail.TailFile(filename, config)
    if err != nil {
        fmt.Println("error happens:", err)
    }
    var msg *tail.Line
    var flag bool

    for {
        msg, flag = <-tails.Lines
        if !flag {
            fmt.Println("tail can read anything from ", tails.Filename)
            time.After(1 * time.Second)
        } else {
            fmt.Println(msg.Text)
        }
    }
}

此处使用另外安装的zookeeper,不使用kafka自带的

启动zookeeper,修改datadir,修改server.properties里的listen

listeners=PLAINTEXT://127.0.0.1:9092

bin/kafka-server-start /usr/local/etc/kafka/server.properties

go get github.com/Shopify/sarama //mac、linux直接装就行,windows涉及到gcc会很麻烦,不推荐使用win编程

简单实例

连接kafka并发送消息

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()                              //配置
    config.Producer.RequiredAcks = sarama.WaitForAll          //ack应答模式为all
    config.Producer.Partitioner = sarama.NewRandomPartitioner //partition的选择方式为随机
    config.Producer.Return.Successes = true                   //Successes channel 返回值

    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) //使用参数创建一个SyncProducer
    defer client.Close()
    if err != nil {
        fmt.Println("broker connection has something wrong happened")
        return
    }

    message := &sarama.ProducerMessage{} //producermassage是一个结构体,用于给生产者发送信息
    message.Topic = "test1"
    message.Value = sarama.StringEncoder("2022-08-17") //StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

    partition, offset, err := client.SendMessage(message) //发送信息
    if err != nil {
        fmt.Printf("message send wrong :%v\n", err)
        return
    }
    fmt.Printf("partition:%v \noffset:%v\n", partition, offset)
}

简单跑几下

go run ./example.go 
partition:0 
offset:0
go run ./example.go
partition:0 
offset:1
go run ./example.go
partition:0 
offset:2

kafka

[2022-08-17 14:17:30,498] INFO Created log for partition test1-0 in /usr/local/var/lib/kafka-logs/test1-0 with properties {} (kafka.log.LogManager)
[2022-08-17 14:17:30,503] INFO [Partition test1-0 broker=0] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2022-08-17 14:17:30,505] INFO [Partition test1-0 broker=0] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)

在日志存放目录确实看到了新生成的目录/usr/local/var/lib/kafka-logs/test1-0,其下生成了索引和数据文件

ls /usr/local/var/lib/kafka-logs/test1-0
00000000000000000000.index    leader-epoch-checkpoint
00000000000000000000.log    partition.metadata
00000000000000000000.timeindex

我们运行起一个消费者,然后再用脚本发送消息,就可以看到发过来的信息

bin/kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test1
2022-08-17
2022-08-17
2022-08-17

context包是官方定义的一个上下文包,用于在goroutine之间传递上下文用,其采用树状结构,根节点退出后子goroutine也会跟着退出

type Context interface {
    // 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
    Done() <-chan struct{}

    // 在 channel Done 关闭后,返回 context 取消原因
    Err() error

    // 返回 context 是否会被取消以及自动取消时间(即 deadline)
    Deadline() (deadline time.Time, ok bool)

    // 获取 key 对应的 value
    Value(key interface{}) interface{}
}

创建

context.Background 是上下文的默认值,一般作为根节点在main中使用

context.TODO 应该只在不确定应该使用哪种上下文时使用

以上两种类型想要发挥作用还得搭配各种with方法来说使用

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

withcancel

返回父context和cancelfunc

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background()) //根

    son_ctx, _ := context.WithCancel(ctx) //子

    var wg sync.WaitGroup

    wg.Add(2)

    go func() {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("ALL DONE")
                fmt.Println(ctx.Err())
                wg.Done()
                return
            default:
                fmt.Println("continue")
            }
        }
    }()

    go func() {
        for {
            select {
            case <-son_ctx.Done(): //父收到退出信号,子也会跟着退出
                fmt.Println("son die too")
                wg.Done()
                return
            }
        }
    }()

    time.Sleep(time.Duration(2) * time.Second)
    cancel()
    wg.Wait()
}


//continue
//son die too
//ALL DONE
//context canceled

withTimeout/withDeadline

传入一个父context和超时时间

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)

withValue

func WithValue(parent Context, key, val any) Context

传入的键必须是可比较类型,且不能是string等内置类型

package main

import (
    "context"
    "fmt"
)

func main() {
    type favContextKey string

    f := func(ctx context.Context, k favContextKey) {
        if v := ctx.Value(k); v != nil {
            fmt.Println("found value:", v)
            return
        }
        fmt.Println("key not found:", k)
    }

    k := favContextKey("language")
    ctx := context.WithValue(context.Background(), k, "Go")

    f(ctx, k)
    f(ctx, favContextKey("color"))

}

//Output:

//found value: Go
//key not found: color

go 的testing包提供了一个单元测试框架,当你想进行一场单元测试,你只需要在目录下创建一个以_test为结尾的go文件,导入这个包,编写一个以Test为开头的测试函数,最后再用go test来运行这个文件就行
例子:测试我在sync这一章中写的print_something函数

package main

import (
    "io"
    "os"
    "strings"
    "sync"
    "testing"
)

func Test_printSomething(t *testing.T) { //testing测试框架,函数必须以Test开头,调用go test调用
    stdOut := os.Stdout //标准输出文件描述符(type file struct)

    r, w, _ := os.Pipe() //Pipe returns a connected pair of Files; reads from r return bytes written to w. It returns the files and an error, if any.
    os.Stdout = w

    var wg sync.WaitGroup
    wg.Add(1)

    go print_something("epsilon", &wg)

    wg.Wait()

    _ = w.Close() //Close closes the File, rendering it unusable for I/O. On files that support SetDeadline, any pending I/O operations will be canceled and return immediately with an ErrClosed error. Close will return an error if it has already been called.

    result, _ := io.ReadAll(r) //func ReadAll(r Reader) ([]byte, error).ReadAll reads from r until an error or EOF and returns the data it read. A successful call returns err == nil, not err == EOF
    output := string(result)

    os.Stdout = stdOut

    if !strings.Contains(output, "epsilon") {
        t.Errorf("Expected to find epsilon, but it is not there")
    }
}


//go test .  
//ok      modulename      0.014s

http://www.jquerycn.cn/a_41217

sync.WaitGroup

我们在前面的例子里面,为了保证协程的运行,使用了time.Sleep来保证他的运行,但这是一种很不明智的做法,当我们就一个的时候我们可以写个等待一秒,但是当我们有许多个协程的时候呢?我们需要等待多久?或者说难道一个协程等一秒?不管哪个方案都是一种灾难
WaitGroup会阻塞线程直到一组协程结束,他使用add来衡量这组协程的规模,使用wait来进行阻塞直到计数器为0,使用done来减少计数器的值,因此done应在函数执行完之后再被执行。需要注意的是当add里面的计数器为0,所有被阻塞的线程都会被释放,而当小于0的时候会引发panic

package main

import (
    "fmt"
    "sync"
)

func print_something(s string, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println(s)
}

func main() {
    var wg sync.WaitGroup
    list := []string{
        "first",
        "second",
        "third",
        "four",
        "five",
        "six",
        "seven",
    }
    wg.Add(7)
    for i, x := range list {
        go print_something(fmt.Sprintf("%d:%s", i, x), &wg)
    }
    wg.Wait()
}

sync.Mutex

go协程的调度是由go来确定的,我们并不知道其具体的顺序,当多个协程竞争一个变量时很有可能会出现data race问题,这个时候得用互斥锁来保证最后结果的正确性
当一个协程使用lock的时候,这个被lock的资源直到他unlock释放之前,除了他其他的协程都不能动

package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup
var msg string

func update(s string, m *sync.Mutex) {
    defer wg.Done()

    m.Lock()
    msg = s
    m.Unlock()
}

func main() {
    msg = "init"
    var mutex sync.Mutex //mutex is a struct

    wg.Add(2)
    go update("hello,nice to see you", &mutex)
    go update("vice versa", &mutex)
    wg.Wait()
    fmt.Println(msg)
}

//go run ./example.go
//hello,nice to see you
//go run ./example.go
//hello,nice to see you
//go run ./example.go
//vice versa

让我们现在来看看如果没有锁会发生什么。。。

package main

import "testing"

func Test_update(t *testing.T) {
    msg = "nihao"

    wg.Add(2)
    go update("hello")
    go update("ohayo")
    wg.Wait()

    if msg != "ohayo" {
        t.Errorf("error value in msg")
    }
}
go test -race .
==================
WARNING: DATA RACE
Write at 0x0000012cb9e0 by goroutine 9:
  modulename.update()
      /Users/td/Documents/blog/example.go:13 +0x6f
  modulename.Test_update.func2()
      /Users/td/Documents/blog/example_test.go:10 +0x37

Previous write at 0x0000012cb9e0 by goroutine 8:
  modulename.update()
      /Users/td/Documents/blog/example.go:13 +0x6f
  modulename.Test_update.func1()
      /Users/td/Documents/blog/example_test.go:9 +0x37

Goroutine 9 (running) created at:
  modulename.Test_update()
      /Users/td/Documents/blog/example_test.go:10 +0x92
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1439 +0x213
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1486 +0x47

Goroutine 8 (finished) created at:
  modulename.Test_update()
      /Users/td/Documents/blog/example_test.go:9 +0x86
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1439 +0x213
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1486 +0x47
==================
--- FAIL: Test_update (0.00s)
    testing.go:1312: race detected during execution of test
FAIL
FAIL    modulename      0.021s
FAIL

dining philosophers

有五个哲学家绕着一张桌子吃面,这种面必须要两把叉子才能吃,现在每人面前有一个盘子,盘子两边各自有一个叉子,也就是说相邻的两个人不可能同时吃饭,现在设计一个程序来让这五个人都吃到饭

Trevor 的原设计方案是创建左右叉子同步锁,用循环来运行协程,除开第一个人,后面每个人的左手叉子是上一个人的右手叉子,而他的右手叉子则是一个新的锁。这样就会导致一个问题:理论上来讲最后一个人的右手叉子应该是第一个人的左手叉子,但是他这种设计方案使得最后一个的右手叉子变成了一个新的独立的锁,叉子数量实际上多了一个!

新方案:给每个叉子编号,跟人同号代表那人左手边的叉子,同时最大只允许四个人拿起左叉子

package main

import (
    "fmt"
    "sync"
    "time"
)

// The Dining Philosophers problem is well known in computer science circles.
// Five philosophers, numbered from 0 through 4, live in a house where the
// table is laid for them; each philosopher has their own place at the table.
// Their only difficulty – besides those of philosophy – is that the dish
// served is a very difficult kind of spaghetti which has to be eaten with
// two forks. There are two forks next to each plate, so that presents no
// difficulty. As a consequence, however, this means that no two neighbours
// may be eating simultaneously.

// constants
const hunger = 3

// variables
var philosophers = []string{"Plato", "Socrates", "Aristotle", "Pascal", "Locke"}
var wg sync.WaitGroup
var sleepTime = 1 * time.Second
var eatTime = 3 * time.Second
var leftlocknumber = 0

func diningProblem(philosopher string, leftfork *sync.Mutex, rightfork *sync.Mutex) {
    defer wg.Done()
    fmt.Println(philosopher, "is steated")

    time.Sleep(sleepTime)

    for i := hunger; i > 0; i-- {
        fmt.Println(philosopher, "feels so hungry")
        time.Sleep(sleepTime)

        if leftlocknumber <= 4 {

            leftfork.Lock()
            leftlocknumber++
            fmt.Println(philosopher, "gets the leftfork")

            rightfork.Lock()
            fmt.Println(philosopher, "gets the rightfork")

            fmt.Println(philosopher, "gets all forks needed,then start to eat")
            time.Sleep(sleepTime)

            rightfork.Unlock()
            fmt.Println(philosopher, "puts down rightfork")

            leftfork.Unlock()
            leftlocknumber--
            fmt.Println(philosopher, "puts down leftfork")
        } else {
            i++
            fmt.Printf("too many people!%d have to wait \n", philosopher)
            time.Sleep(sleepTime)
        }
    }

    fmt.Println(philosopher, "is satisfied")
    time.Sleep(sleepTime)
    fmt.Println(philosopher, "leaves")
}

func main() {
    fmt.Println("The dining philosopher problem")
    fmt.Println("------------------------------")
    forks := make([]*sync.Mutex, 5)

    for i := 0; i < len(philosophers); i++ {
        forks[i] = &sync.Mutex{}
    }

    wg.Add(len(philosophers))

    for i := 0; i < len(philosophers); i++ {
        if i == 4 {
            go diningProblem(philosophers[i], forks[i], forks[0])
        } else {
            go diningProblem(philosophers[i], forks[i], forks[i+1])
        }
    }
    wg.Wait()

    fmt.Println("the table is empty")

}

运行结果

go run .
The dining philosopher problem
------------------------------
Locke is steated
Plato is steated
Socrates is steated
Aristotle is steated
Pascal is steated
Pascal feels so hungry
Locke feels so hungry
Plato feels so hungry
Socrates feels so hungry
Aristotle feels so hungry
Aristotle gets the leftfork
Aristotle gets the rightfork
Locke gets the leftfork
Locke gets the rightfork
Locke gets all forks needed,then start to eat
Aristotle gets all forks needed,then start to eat
Socrates gets the leftfork
Locke puts down rightfork
Locke puts down leftfork
Locke feels so hungry
Aristotle puts down rightfork
Aristotle puts down leftfork
Aristotle feels so hungry
Pascal gets the leftfork
Pascal gets the rightfork
Pascal gets all forks needed,then start to eat
Plato gets the leftfork
Socrates gets the rightfork
Socrates gets all forks needed,then start to eat
Socrates puts down rightfork
Socrates puts down leftfork
Socrates feels so hungry
Plato gets the rightfork
Plato gets all forks needed,then start to eat
Aristotle gets the leftfork
Pascal puts down rightfork
Pascal puts down leftfork
Pascal feels so hungry
Aristotle gets the rightfork
Aristotle gets all forks needed,then start to eat
Locke gets the leftfork
Aristotle puts down rightfork
Aristotle puts down leftfork
Aristotle feels so hungry
Plato puts down rightfork
Plato puts down leftfork
Plato feels so hungry
Locke gets the rightfork
Locke gets all forks needed,then start to eat
Pascal gets the leftfork
Socrates gets the leftfork
Socrates gets the rightfork
Socrates gets all forks needed,then start to eat
Socrates puts down rightfork
Socrates puts down leftfork
Socrates feels so hungry
Locke puts down rightfork
Locke puts down leftfork
Locke feels so hungry
Pascal gets the rightfork
Pascal gets all forks needed,then start to eat
Plato gets the leftfork
Plato gets the rightfork
Plato gets all forks needed,then start to eat
Aristotle gets the leftfork
Plato puts down rightfork
Plato puts down leftfork
Plato feels so hungry
Socrates gets the leftfork
Locke gets the leftfork
Locke gets the rightfork
Locke gets all forks needed,then start to eat
Pascal puts down rightfork
Pascal puts down leftfork
Pascal feels so hungry
Aristotle gets the rightfork
Aristotle gets all forks needed,then start to eat
Aristotle puts down rightfork
Aristotle puts down leftfork
Aristotle is satisfied
Socrates gets the rightfork
Socrates gets all forks needed,then start to eat
Locke puts down rightfork
Locke puts down leftfork
Locke is satisfied
Plato gets the leftfork
Pascal gets the leftfork
Pascal gets the rightfork
Pascal gets all forks needed,then start to eat
Pascal puts down rightfork
Pascal puts down leftfork
Pascal is satisfied
Aristotle leaves
Socrates puts down rightfork
Socrates puts down leftfork
Socrates is satisfied
Plato gets the rightfork
Plato gets all forks needed,then start to eat
Locke leaves
Plato puts down rightfork
Plato puts down leftfork
Plato is satisfied
Pascal leaves
Socrates leaves
Plato leaves
the table is empty