NATS

CLI Basics

nats --help                               # general help
nats server --help                        # server subcommands
nats pub --help                           # publish help
nats sub --help                           # subscribe help
nats req --help                           # request help
nats account info                        # account information
nats context save local --server nats://localhost:4222  # save context
nats context select local                 # switch context

Server Setup

nats-server                              # start server (default :4222)
nats-server -p 5222                      # custom port
nats-server -a 0.0.0.0                   # bind address
nats-server -c nats.conf                 # config file
nats-server -js                          # enable JetStream
nats-server -sd /data/nats               # store directory
nats-server --cluster nats://0.0.0.0:6222 --routes nats://node2:6222  # cluster
docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:latest -js

Publish / Subscribe

nats pub subject.hello "Hello World"     # publish message
nats pub subject.data '{"key":"value"}'  # publish JSON
nats sub subject.hello                   # subscribe to subject
nats sub "subject.>"                     # subscribe with wildcard
nats sub subject.hello --queue workers   # queue subscribe

Queue Groups

nats sub orders.new --queue order-workers   # subscriber in queue group
nats sub orders.new --queue order-workers   # another member (load balanced)
nats pub orders.new "order-123"             # message delivered to one member

Messages published to a subject are delivered to only one subscriber within the same queue group, enabling load balancing.

Request / Reply

nats reply service.time "Received at $(date)"   # register replier
nats req service.time ""                          # send request, wait for reply
nats req service.echo "ping" --timeout 5s         # request with timeout

JetStream Streams

nats stream add ORDERS --subjects "orders.>" --storage file --retention limits --max-msgs 100000 --max-age 72h
nats stream info ORDERS
nats stream list
nats stream purge ORDERS
nats stream delete ORDERS
nats stream edit ORDERS --max-msgs 200000
nats stream add EVENTS --subjects "events.>" --storage memory --retention interest --max-bytes 1GB --replicas 3

JetStream Consumers

nats consumer add ORDERS pull-cons --pull --deliver all --replay instant
nats consumer add ORDERS push-cons --target push.ords --deliver last --ack explicit
nats consumer info ORDERS pull-cons
nats consumer list ORDERS
nats consumer next ORDERS pull-cons          # fetch next message
nats consumer delete ORDERS pull-cons

JetStream KV Store

nats kv add config                          # create KV bucket
nats kv put config db.host "db.internal"    # put key-value
nats kv get config db.host                  # get value
nats kv del config db.host                  # delete key
nats kv history config db.host              # revision history
nats kv status config                       # bucket info
nats kv list                                # list all buckets

Wildcards

nats sub "orders.*"                        # single token wildcard
nats sub "orders.>"                        # multi-token wildcard (catch-all)
nats sub ">"                               # subscribe to everything
nats sub "orders.*.eu"                     # match orders.new.eu, orders.update.eu
nats sub "orders.>"                        # match orders.new, orders.new.eu, ...

Wildcard tokens:

  • `*` matches exactly one token
  • `>` matches one or more tokens (must be last)

Retained Messages / Durables

nats consumer add ORDERS durable-1 --pull --deliver all --ack explicit --durable
nats consumer next ORDERS durable-1 --wait 5s
nats stream add STATUS --subjects "status.>" --storage file --max-msgs-per-subject 1

Durable consumers survive client disconnections and resume from where they left off.

Clustering

nats-server --cluster nats://0.0.0.0:6222 --cluster_name my-cluster --routes nats://node1:6222,nats://node2:6222
# nats.conf (cluster block)
cluster {
  name: "my-cluster"
  listen: "0.0.0.0:6222"
  routes: [
    nats://node1:6222
    nats://node2:6222
  ]
}

Monitoring

nats-server -m 8222                       # enable monitoring port
curl http://localhost:8222/varz            # server variables
curl http://localhost:8222/connz           # connection info
curl http://localhost:8222/routez          # route info
curl http://localhost:8222/subsz           # subscription info
curl http://localhost:8222/jsz             # JetStream info

TLS & Auth

nats-server --tls --tlscert cert.pem --tlskey key.pem --tlscacert ca.pem
nats-server --user admin --pass secret
# nats.conf
tls {
  cert_file: "/etc/nats/cert.pem"
  key_file: "/etc/nats/key.pem"
  ca_file: "/etc/nats/ca.pem"
}
authorization {
  users: [
    { user: "admin", password: "$2a$11$..." }
  ]
  token: "my-token"
}

Go Client Examples

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Publish("subject", []byte("hello"))
sub, _ := nc.Subscribe("subject", func(m *nats.Msg) {
    fmt.Printf("Received: %s\n", string(m.Data))
})
defer sub.Unsubscribe()
msg, _ := nc.Request("service.time", nil, 2*time.Second)
fmt.Printf("Reply: %s\n", string(msg.Data))
js, _ := jetstream.New(nc)
s, _ := js.CreateStream(context.Background(), jetstream.StreamConfig{
    Name:     "ORDERS",
    Subjects: []string{"orders.>"},
})
kv, _ := js.CreateKeyValue(context.Background(), jetstream.KeyValueConfig{
    Bucket: "config",
})
kv.Put(context.Background(), "key", []byte("value"))
entry, _ := kv.Get(context.Background(), "key")

CLI 基础

nats --help                               # 通用帮助
nats server --help                        # 服务端子命令
nats pub --help                           # 发布帮助
nats sub --help                           # 订阅帮助
nats req --help                           # 请求帮助
nats account info                        # 账户信息
nats context save local --server nats://localhost:4222  # 保存上下文
nats context select local                 # 切换上下文

服务器配置

nats-server                              # 启动服务器(默认 :4222)
nats-server -p 5222                      # 自定义端口
nats-server -a 0.0.0.0                   # 绑定地址
nats-server -c nats.conf                 # 指定配置文件
nats-server -js                          # 启用 JetStream
nats-server -sd /data/nats               # 存储目录
nats-server --cluster nats://0.0.0.0:6222 --routes nats://node2:6222  # 集群模式
docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:latest -js

发布 / 订阅

nats pub subject.hello "Hello World"     # 发布消息
nats pub subject.data '{"key":"value"}'  # 发布 JSON
nats sub subject.hello                   # 订阅主题
nats sub "subject.>"                     # 通配符订阅
nats sub subject.hello --queue workers   # 队列订阅

队列组

nats sub orders.new --queue order-workers   # 队列组订阅者
nats sub orders.new --queue order-workers   # 另一个成员(负载均衡)
nats pub orders.new "order-123"             # 消息仅投递给一个成员

发布到同一主题的消息在同一个队列组内只投递给一个订阅者,实现负载均衡。

请求 / 回复

nats reply service.time "Received at $(date)"   # 注册回复者
nats req service.time ""                          # 发送请求并等待回复
nats req service.echo "ping" --timeout 5s         # 带超时的请求

JetStream 流

nats stream add ORDERS --subjects "orders.>" --storage file --retention limits --max-msgs 100000 --max-age 72h
nats stream info ORDERS
nats stream list
nats stream purge ORDERS
nats stream delete ORDERS
nats stream edit ORDERS --max-msgs 200000
nats stream add EVENTS --subjects "events.>" --storage memory --retention interest --max-bytes 1GB --replicas 3

JetStream 消费者

nats consumer add ORDERS pull-cons --pull --deliver all --replay instant
nats consumer add ORDERS push-cons --target push.ords --deliver last --ack explicit
nats consumer info ORDERS pull-cons
nats consumer list ORDERS
nats consumer next ORDERS pull-cons          # 拉取下一条消息
nats consumer delete ORDERS pull-cons

JetStream KV 存储

nats kv add config                          # 创建 KV 存储桶
nats kv put config db.host "db.internal"    # 写入键值
nats kv get config db.host                  # 读取值
nats kv del config db.host                  # 删除键
nats kv history config db.host              # 查看修订历史
nats kv status config                       # 存储桶信息
nats kv list                                # 列出所有存储桶

通配符

nats sub "orders.*"                        # 单层通配符
nats sub "orders.>"                        # 多层通配符(匹配所有子级)
nats sub ">"                               # 订阅所有消息
nats sub "orders.*.eu"                     # 匹配 orders.new.eu, orders.update.eu
nats sub "orders.>"                        # 匹配 orders.new, orders.new.eu, ...

通配符规则:

  • `*` 匹配恰好一个层级
  • `>` 匹配一个或多个层级(必须是最后一个 token)

持久订阅 / 持久消费者

nats consumer add ORDERS durable-1 --pull --deliver all --ack explicit --durable
nats consumer next ORDERS durable-1 --wait 5s
nats stream add STATUS --subjects "status.>" --storage file --max-msgs-per-subject 1

持久消费者在客户端断开后仍然存在,重新连接后从上次位置继续消费。

集群

nats-server --cluster nats://0.0.0.0:6222 --cluster_name my-cluster --routes nats://node1:6222,nats://node2:6222
# nats.conf(集群配置块)
cluster {
  name: "my-cluster"
  listen: "0.0.0.0:6222"
  routes: [
    nats://node1:6222
    nats://node2:6222
  ]
}

监控

nats-server -m 8222                       # 启用监控端口
curl http://localhost:8222/varz            # 服务器变量
curl http://localhost:8222/connz           # 连接信息
curl http://localhost:8222/routez          # 路由信息
curl http://localhost:8222/subsz           # 订阅信息
curl http://localhost:8222/jsz             # JetStream 信息

TLS 与认证

nats-server --tls --tlscert cert.pem --tlskey key.pem --tlscacert ca.pem
nats-server --user admin --pass secret
# nats.conf
tls {
  cert_file: "/etc/nats/cert.pem"
  key_file: "/etc/nats/key.pem"
  ca_file: "/etc/nats/ca.pem"
}
authorization {
  users: [
    { user: "admin", password: "$2a$11$..." }
  ]
  token: "my-token"
}

Go 客户端示例

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Publish("subject", []byte("hello"))
sub, _ := nc.Subscribe("subject", func(m *nats.Msg) {
    fmt.Printf("收到消息: %s\n", string(m.Data))
})
defer sub.Unsubscribe()
msg, _ := nc.Request("service.time", nil, 2*time.Second)
fmt.Printf("回复: %s\n", string(msg.Data))
js, _ := jetstream.New(nc)
s, _ := js.CreateStream(context.Background(), jetstream.StreamConfig{
    Name:     "ORDERS",
    Subjects: []string{"orders.>"},
})
kv, _ := js.CreateKeyValue(context.Background(), jetstream.KeyValueConfig{
    Bucket: "config",
})
kv.Put(context.Background(), "key", []byte("value"))
entry, _ := kv.Get(context.Background(), "key")