NATS JetStream 是一个高性能、持久化、分布式消息队列系统,它为发布/订阅、队列和流式处理提供了丰富的功能。在 Go 中实现 NATS JetStream 队列可以通过 NATS 客户端库来完成。
以下是一个简单的示例,演示了如何使用 Go 中的 NATS 客户端库连接到 NATS JetStream,并创建队列进行消息发布和订阅:
package mainimport ("log""time""github.com/nats-io/nats.go"
)func main() {// 连接到 NATS 服务器nc, err := nats.Connect("nats://localhost:4222")if err != nil {log.Fatal(err)}defer nc.Close()// 创建 JetStream 连接js, err := nc.JetStream()if err != nil {log.Fatal(err)}// 定义队列名称和主题名称queueName := "my_queue"subject := "my_subject"// 创建持久化队列_, err = js.AddConsumer(queueName, &nats.ConsumerConfig{Durable: queueName,DeliverPolicy: nats.DeliverAllPolicy, // 消息持久化策略FilterSubject: subject, // 订阅的主题})if err != nil {log.Fatal(err)}// 发布消息到 JetStream 队列for i := 0; i < 10; i++ {msg := []byte("Message " + string(i))if err := js.Publish(subject, msg); err != nil {log.Fatal(err)}}// 订阅 JetStream 队列消息js.Subscribe(subject, func(msg *nats.Msg) {log.Printf("Received message: %s", msg.Data)})// 等待一段时间,以便观察消息time.Sleep(5 * time.Second)
}
在这个示例中,我们使用 github.com/nats-io/nats.go
包来连接到 NATS 服务器,并创建了一个 JetStream 连接。
然后,我们使用 JetStream 连接创建了一个持久化队列,并发布了一些消息到指定的主题。最后,我们通过 JetStream 连接订阅了相同的主题,以便接收发布的消息。