# NSQ has 3 components: nsqd, nsqlookupd, nsqadmin
# nsqd - daemon that receives, buffers, delivers messages
# nsqlookupd - service discovery daemon
# nsqadmin - web UI for monitoring
# typical topology:
# Producer → nsqd → Consumer (via nsqlookupd discovery)
# Multiple nsqd instances register with nsqlookupd
# Consumers query nsqlookupd to find nsqd instances for a topic
nsqd --lookupd-tcp-address=localhost:4160 --broadcast-address=127.0.0.1
nsqd --tcp-address=0.0.0.0:4150 \
--http-address=0.0.0.0:4151 \
--lookupd-tcp-address=localhost:4160 \
--data-path=/var/lib/nsq
nsqd --max-msg-size=1048576 \
--max-rdy-count=2500 \
--max-output-buffer-size=65536 \
--sync-every=100
curl -s http://localhost:4151/stats
curl -s http://localhost:4151/ping
nsqlookupd --tcp-address=0.0.0.0:4160 --http-address=0.0.0.0:4161
curl -s http://localhost:4161/nodes
curl -s http://localhost:4161/topics
curl -s 'http://localhost:4161/channels?topic=orders'
curl -s "http://localhost:4161/lookup?topic=orders"
nsqadmin --lookupd-http-address=localhost:4161
nsqadmin --lookupd-http-address=localhost:4161 \
--nsqd-http-address=localhost:4151 \
--http-address=0.0.0.0:4171
curl -s http://localhost:4171/ping
curl -d 'hello world' 'http://localhost:4151/pub?topic=orders'
curl -X POST 'http://localhost:4151/pub?topic=orders' \
-H 'Content-Type: application/json' \
-d '{"order_id":"123","amount":99.9}'
curl -X POST 'http://localhost:4151/mpub?topic=orders' \
-d $'message1\nmessage2\nmessage3'
curl -s 'http://localhost:4151/topic/create?topic=orders'
curl -s 'http://localhost:4151/topic/delete?topic=orders'
curl -s 'http://localhost:4151/channel/create?topic=orders&channel=worker'
nsq_to_nsq --topic=orders --channel=worker \
--lookupd-http-address=localhost:4161 \
--destination-nsqd-tcp-address=localhost:4150 \
--destination-topic=processed_orders
# HTTP endpoint for single message
curl -s 'http://localhost:4151/msg?topic=orders&channel=worker'
# topics hold messages, each topic can have multiple channels
# each channel receives a copy of every message (fan-out)
curl -s 'http://localhost:4151/stats' # all topics/channels
curl -s 'http://localhost:4151/topic/create?topic=events'
curl -s 'http://localhost:4151/topic/delete?topic=events'
curl -s 'http://localhost:4151/channel/create?topic=events&channel=archive'
curl -s 'http://localhost:4151/channel/delete?topic=events&channel=archive'
curl -s 'http://localhost:4151/channel/pause?topic=events&channel=archive'
curl -s 'http://localhost:4151/channel/unpause?topic=events&channel=archive'
curl -s 'http://localhost:4151/topic/pause?topic=events'
curl -s 'http://localhost:4151/topic/unpause?topic=events'
# NSQ guarantees at-least-once delivery
# messages are requeued if consumer does not FIN within timeout
# default timeout: 60000ms (60s)
# consumer protocol flow:
# RDY 100 # set ready count (prefetch)
# FIN <id> # acknowledge message
# REQ <id> <timeout_ms> # requeue with delay
# TOUCH <id> # extend timeout for this message
# nsqd flags for durability
nsqd --mem-queue-size=0 # all messages go straight to disk
nsqd --sync-every=1 # fsync every message
nsq_to_file --topic=orders --channel=logger \
--lookupd-http-address=localhost:4161 \
--output-dir=/var/log/nsq \
--gzip
nsq_to_file --topic=events --channel=archive \
--nsqd-tcp-address=localhost:4150 \
--output-dir=/tmp/nsq-archive \
--filename-format=2006-01-02_15
nsq_to_http --topic=webhooks --channel=sink \
--lookupd-http-address=localhost:4161 \
--post=http://api.example.com/webhook \
--content-type=application/json
# REQ a message with a delay (requeue back to queue)
# via the TCP protocol:
# REQ <message_id> <timeout_ms>
# defer processing by requeuing with delay
# timeout_ms=0 requeues the message immediately (no delay)
# nsqd requeue configuration
nsqd --max-requeue-delay=15m
nsqd --default-requeue-delay=90s
nsqd --max-attempts=5 # after 5 attempts the message is given up on (NSQ has no built-in dead-letter queue; handle failures in the consumer)
# check depth and requeue count
curl -s 'http://localhost:4151/stats?format=json'
nsqd --tls-cert=/etc/nsq/cert.pem \
--tls-key=/etc/nsq/key.pem \
--tls-root-ca-file=/etc/nsq/ca.pem \
--tls-required=tls-verify \
--tls-client-auth-policy=require-verify
nsqd --auth-http-address=http://auth-service:8080/auth
# nsqauthd example response:
# {
# "ttl": 3600,
# "authorizations": [
# {"topic": "orders", "channels": [".*"], "permissions": ["subscribe","publish"]}
# ]
# }
curl -s http://localhost:4151/stats
curl -s 'http://localhost:4151/stats?format=json'
curl -s 'http://localhost:4151/stats?topic=orders'
curl -s http://localhost:4151/ping
curl -s http://localhost:4161/nodes
curl -s http://localhost:4161/topics
# nsq_stat — live stats in terminal
nsq_stat --lookupd-http-address=localhost:4161
# nsq_stat refreshes every 2s by default
nsq_stat --lookupd-http-address=localhost:4161 --interval=5s
docker run -d --name nsqlookupd -p 4160:4160 -p 4161:4161 \
nsqio/nsq /nsqlookupd
docker run -d --name nsqd -p 4150:4150 -p 4151:4151 \
nsqio/nsq /nsqd --lookupd-tcp-address=host.docker.internal:4160 \
--broadcast-address=host.docker.internal
docker run -d --name nsqadmin -p 4171:4171 \
nsqio/nsq /nsqadmin --lookupd-http-address=host.docker.internal:4161
docker compose up -d
// producer
config := nsq.NewConfig()
p, _ := nsq.NewProducer("localhost:4150", config)
err := p.Publish("orders", []byte(`{"id":"123"}`))
p.Stop()
// consumer
config := nsq.NewConfig()
c, _ := nsq.NewConsumer("orders", "worker", config)
c.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
process(m.Body)
return nil
}))
c.ConnectToNSQD("localhost:4150")
// or: c.ConnectToNSQLookupd("localhost:4161")
# NSQ 有 3 个核心组件:nsqd、nsqlookupd、nsqadmin
# nsqd - 消息守护进程,负责接收、缓冲和投递消息
# nsqlookupd - 服务发现守护进程
# nsqadmin - 监控 Web 界面
# 典型拓扑:
# 生产者 → nsqd → 消费者(通过 nsqlookupd 发现)
# 多个 nsqd 实例向 nsqlookupd 注册
# 消费者查询 nsqlookupd 来查找 topic 对应的 nsqd 实例
nsqd --lookupd-tcp-address=localhost:4160 --broadcast-address=127.0.0.1
nsqd --tcp-address=0.0.0.0:4150 \
--http-address=0.0.0.0:4151 \
--lookupd-tcp-address=localhost:4160 \
--data-path=/var/lib/nsq
nsqd --max-msg-size=1048576 \
--max-rdy-count=2500 \
--max-output-buffer-size=65536 \
--sync-every=100
curl -s http://localhost:4151/stats
curl -s http://localhost:4151/ping
nsqlookupd --tcp-address=0.0.0.0:4160 --http-address=0.0.0.0:4161
curl -s http://localhost:4161/nodes
curl -s http://localhost:4161/topics
curl -s 'http://localhost:4161/channels?topic=orders'
curl -s "http://localhost:4161/lookup?topic=orders"
nsqadmin --lookupd-http-address=localhost:4161
nsqadmin --lookupd-http-address=localhost:4161 \
--nsqd-http-address=localhost:4151 \
--http-address=0.0.0.0:4171
curl -s http://localhost:4171/ping
curl -d 'hello world' 'http://localhost:4151/pub?topic=orders'
curl -X POST 'http://localhost:4151/pub?topic=orders' \
-H 'Content-Type: application/json' \
-d '{"order_id":"123","amount":99.9}'
curl -X POST 'http://localhost:4151/mpub?topic=orders' \
-d $'message1\nmessage2\nmessage3'
curl -s 'http://localhost:4151/topic/create?topic=orders'
curl -s 'http://localhost:4151/topic/delete?topic=orders'
curl -s 'http://localhost:4151/channel/create?topic=orders&channel=worker'
nsq_to_nsq --topic=orders --channel=worker \
--lookupd-http-address=localhost:4161 \
--destination-nsqd-tcp-address=localhost:4150 \
--destination-topic=processed_orders
# HTTP 端点获取单条消息
curl -s 'http://localhost:4151/msg?topic=orders&channel=worker'
# topic 保存消息,每个 topic 可以有多个 channel
# 每个 channel 都会收到所有消息的副本(扇出分发)
curl -s 'http://localhost:4151/stats' # 所有 topic/channel
curl -s 'http://localhost:4151/topic/create?topic=events'
curl -s 'http://localhost:4151/topic/delete?topic=events'
curl -s 'http://localhost:4151/channel/create?topic=events&channel=archive'
curl -s 'http://localhost:4151/channel/delete?topic=events&channel=archive'
curl -s 'http://localhost:4151/channel/pause?topic=events&channel=archive'
curl -s 'http://localhost:4151/channel/unpause?topic=events&channel=archive'
curl -s 'http://localhost:4151/topic/pause?topic=events'
curl -s 'http://localhost:4151/topic/unpause?topic=events'
# NSQ 保证至少一次投递
# 如果消费者未在超时时间内 FIN,消息会被重新入队
# 默认超时:60000ms(60 秒)
# 消费者协议流程:
# RDY 100 # 设置就绪数量(预取)
# FIN <id> # 确认消息
# REQ <id> <timeout_ms> # 延迟重新入队
# TOUCH <id> # 延长此消息的超时时间
# nsqd 持久化配置
nsqd --mem-queue-size=0 # 所有消息直接写入磁盘
nsqd --sync-every=1 # 每条消息都 fsync
nsq_to_file --topic=orders --channel=logger \
--lookupd-http-address=localhost:4161 \
--output-dir=/var/log/nsq \
--gzip
nsq_to_file --topic=events --channel=archive \
--nsqd-tcp-address=localhost:4150 \
--output-dir=/tmp/nsq-archive \
--filename-format=2006-01-02_15
nsq_to_http --topic=webhooks --channel=sink \
--lookupd-http-address=localhost:4161 \
--post=http://api.example.com/webhook \
--content-type=application/json
# 通过 REQ 延迟重新入队
# 通过 TCP 协议:
# REQ <message_id> <timeout_ms>
# timeout_ms=0 会发送到默认重入队队列
# nsqd 重入队配置
nsqd --max-requeue-delay=15m
nsqd --default-requeue-delay=90s
nsqd --max-attempts=5 # 超过 5 次尝试后进入错误队列
# 查看深度和重入队计数
curl -s 'http://localhost:4151/stats?format=json'
nsqd --tls-cert=/etc/nsq/cert.pem \
--tls-key=/etc/nsq/key.pem \
--tls-root-ca-file=/etc/nsq/ca.pem \
--tls-required=tls-verify \
--tls-client-auth-policy=require-verify
nsqd --auth-http-address=http://auth-service:8080/auth
# nsqauthd 示例响应:
# {
# "ttl": 3600,
# "authorizations": [
# {"topic": "orders", "channels": [".*"], "permissions": ["subscribe","publish"]}
# ]
# }
curl -s http://localhost:4151/stats
curl -s 'http://localhost:4151/stats?format=json'
curl -s 'http://localhost:4151/stats?topic=orders'
curl -s http://localhost:4151/ping
curl -s http://localhost:4161/nodes
curl -s http://localhost:4161/topics
# nsq_stat — 终端实时统计
nsq_stat --lookupd-http-address=localhost:4161
# nsq_stat 默认每 2 秒刷新
nsq_stat --lookupd-http-address=localhost:4161 --interval=5s
docker run -d --name nsqlookupd -p 4160:4160 -p 4161:4161 \
nsqio/nsq /nsqlookupd
docker run -d --name nsqd -p 4150:4150 -p 4151:4151 \
nsqio/nsq /nsqd --lookupd-tcp-address=host.docker.internal:4160 \
--broadcast-address=host.docker.internal
docker run -d --name nsqadmin -p 4171:4171 \
nsqio/nsq /nsqadmin --lookupd-http-address=host.docker.internal:4161
docker compose up -d
// 生产者
config := nsq.NewConfig()
p, _ := nsq.NewProducer("localhost:4150", config)
err := p.Publish("orders", []byte(`{"id":"123"}`))
p.Stop()
// 消费者
config := nsq.NewConfig()
c, _ := nsq.NewConsumer("orders", "worker", config)
c.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
process(m.Body)
return nil
}))
c.ConnectToNSQD("localhost:4150")
// 或:c.ConnectToNSQLookupd("localhost:4161")