# Node, virtual host, user and object inspection commands.
rabbitmqctl status # show broker/node status
rabbitmqctl list_vhosts # list virtual hosts
rabbitmqctl add_vhost myapp # create the "myapp" vhost
rabbitmqctl delete_vhost myapp # delete the "myapp" vhost
rabbitmqctl list_users # list users
rabbitmqctl add_user admin secret # create user "admin" with password "secret"
rabbitmqctl set_permissions -p myapp admin ".*" ".*" ".*" # configure/write/read patterns for "admin" in vhost "myapp"
rabbitmqctl set_user_tags admin administrator # tag "admin" as administrator
rabbitmqctl delete_user guest # delete the "guest" user
rabbitmqctl list_queues name messages consumers # list queues with message and consumer counts
rabbitmqctl list_exchanges name type # list exchanges with their type
rabbitmqctl list_bindings # list bindings
# The explanation is a positional argument; close_all_connections has no --reason option.
rabbitmqctl close_all_connections "restart" # close connections, reporting "restart" to clients
# reset only succeeds while the rabbit application is stopped, so wrap it in stop_app/start_app.
rabbitmqctl stop_app && rabbitmqctl reset && rabbitmqctl start_app # reset node (delete all data)
# Declare one exchange per exchange type, then delete the "orders" exchange.
rabbitmqadmin declare exchange name=orders type=direct durable=true # direct exchange "orders", durable
rabbitmqadmin declare exchange name=events type=topic durable=true # topic exchange "events", durable
rabbitmqadmin declare exchange name=logs type=fanout durable=true # fanout exchange "logs", durable
rabbitmqadmin declare exchange name=rpc type=headers durable=false # headers exchange "rpc", not durable
rabbitmqadmin delete exchange name=orders # delete the "orders" exchange
Exchange types: direct, topic, fanout, headers.
# Declare, delete and purge queues.
rabbitmqadmin declare queue name=order.new durable=true # durable queue "order.new"
rabbitmqadmin declare queue name=order.process durable=true arguments='{"x-max-length":10000,"x-message-ttl":3600000}' # durable queue with x-max-length and x-message-ttl arguments
rabbitmqadmin declare queue name=temp.queue durable=false auto_delete=true # non-durable queue with auto_delete enabled
rabbitmqadmin delete queue name=order.new # delete queue "order.new"
rabbitmqadmin purge queue name=order.new # purge the messages in "order.new"
# Bind queues to the exchanges, one binding per exchange.
rabbitmqadmin declare binding source=orders destination=order.new routing_key=order.new # "orders" -> order.new with a literal routing key
rabbitmqadmin declare binding source=events destination=order.process routing_key="order.#" # "events" -> order.process with a wildcard pattern
rabbitmqadmin declare binding source=logs destination=order.new routing_key="" # "logs" -> order.new with an empty routing key
rabbitmqadmin declare binding source=rpc destination=order.new arguments='{"x-match":"all","format":"pdf"}' # "rpc" -> order.new using binding arguments instead of a routing key
# Publish one message to each exchange.
rabbitmqadmin publish exchange=orders routing_key=order.new payload='{"id":1,"item":"book"}'
rabbitmqadmin publish exchange=events routing_key="order.created.eu" payload='{"event":"created"}'
rabbitmqadmin publish exchange=logs routing_key="" payload="log message"
# rabbitmq-diagnostics has no publish command; loop over rabbitmqadmin to send test messages.
for i in $(seq 1 10); do rabbitmqadmin publish exchange=orders routing_key=order.new payload="test message $i"; done # publish 10 test messages
rabbitmqadmin get queue=order.new ackmode=ack_requeue_false # get one message and remove it from the queue
rabbitmqadmin get queue=order.new count=5 ackmode=ack_requeue_true # peek 5 messages (they are requeued)
# RabbitMQ ships no "rabbitmq-consumer" binary. amqp-consume comes from the rabbitmq-c
# amqp-tools package and runs the given command (here: cat) once per message, body on stdin.
amqp-consume --url amqp://guest:guest@localhost:5672/ --queue order.new cat # CLI consumer
For production consumers, use a client library (Python, Go, Java, etc.) with proper ack handling.
rabbitmqctl list_queues name messages_ready messages_unacknowledged # per-queue ready and unacknowledged message counts
Ack modes:
# ack_requeue_false acknowledges the fetched message, so it is removed from the queue.
rabbitmqadmin get queue=order.new ackmode=ack_requeue_false # ack (message is consumed)
# Rejecting with requeue is the reject_requeue_true mode, not ack_requeue_true.
rabbitmqadmin get queue=order.new ackmode=reject_requeue_true # reject + requeue
direct — orders.new → queue bound with routing_key=orders.new
topic — orders.*.eu → orders.new.eu, orders.update.eu
topic — orders.# → orders.new, orders.new.eu, orders.update.eu.urgent
fanout — (ignore key) → all bound queues receive message
headers — x-match: all → match all header key-value pairs
headers — x-match: any → match any header key-value pair
# Dead-letter setup: an exchange, a queue bound to it, and a work queue whose arguments point at that exchange.
rabbitmqadmin declare exchange name=dlx.exchange type=direct durable=true # durable direct exchange used as the dead-letter exchange
rabbitmqadmin declare queue name=dlq.orders durable=true # durable queue that collects dead-lettered messages
rabbitmqadmin declare binding source=dlx.exchange destination=dlq.orders routing_key=dlq.orders # bind dlq.orders to dlx.exchange
rabbitmqadmin declare queue name=order.new durable=true arguments='{"x-dead-letter-exchange":"dlx.exchange","x-dead-letter-routing-key":"dlq.orders"}' # work queue with dead-letter exchange and routing key arguments
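Messages are dead-lettered when: a consumer rejects or nacks them with requeue=false, their TTL expires, or they are dropped because the queue exceeded its length limit.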
# Queue-level and message-level expiry settings.
rabbitmqadmin declare queue name=short.lived durable=true arguments='{"x-message-ttl":60000}' # queue with the x-message-ttl argument set to 60000
rabbitmqadmin declare queue name=expire.queue durable=true arguments='{"x-expires":3600000}' # queue with the x-expires argument set to 3600000
rabbitmqadmin declare queue name=lazy.queue durable=true arguments='{"x-queue-mode":"lazy"}' # queue with x-queue-mode set to "lazy"
rabbitmqadmin publish exchange=orders routing_key=order.new payload="ttl msg" properties='{"expiration":"5000"}' # message published with its own "expiration" property of "5000"
# Cluster membership commands.
rabbitmqctl join_cluster rabbit@node1 # join cluster via rabbit@node1
rabbitmqctl cluster_status # view cluster status
rabbitmqctl forget_cluster_node rabbit@node3 # remove rabbit@node3 from the cluster
rabbitmqctl stop_app && rabbitmqctl join_cluster rabbit@node1 && rabbitmqctl start_app # full join flow: stop the app, join, start the app
# Classic queue mirroring (the "ha-*" policy keys) was deprecated and then removed in
# RabbitMQ 4.0; on current releases use quorum queues instead of these policies.
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}' --apply-to queues # mirror every queue to all nodes
# Queues in this sheet are named order.* ("orders" is the exchange), so match on "^order\.".
rabbitmqctl set_policy ha-two "^order\." '{"ha-mode":"exactly","ha-params":2}' --apply-to queues # keep 2 replicas of order.* queues
# Management HTTP API queries (served by the rabbitmq_management plugin).
curl -u guest:guest http://localhost:15672/api/overview
curl -u guest:guest http://localhost:15672/api/queues
curl -u guest:guest http://localhost:15672/api/exchanges
curl -u guest:guest http://localhost:15672/api/connections
curl -u guest:guest http://localhost:15672/api/nodes
# There is no /api/users/<name>/password endpoint. A password is changed by PUT-ing the
# user object (password plus tags, which are required) to /api/users/<name>.
curl -u guest:guest -X PUT -H "content-type: application/json" http://localhost:15672/api/users/guest -d '{"password":"newpass","tags":"administrator"}'
rabbitmq-plugins enable rabbitmq_management # enable management UI on :15672
# rabbitmq.conf
# Listener ports: the default TCP listener and the management plugin's TCP port.
listeners.tcp.default = 5672
management.tcp.port = 15672
# Default user, password and virtual host.
default_user = guest
default_pass = guest
default_vhost = /
# File log level.
log.file.level = info
# Resource limits: absolute free-disk limit and relative memory high watermark.
disk_free_limit.absolute = 2GB
vm_memory_high_watermark.relative = 0.6
# advanced.config — consumer timeout
%% Erlang-term configuration: a list of {Application, Settings} tuples ending with a dot.
[
%% Settings for the rabbit application.
{rabbit, [
%% NOTE(review): value is presumably in milliseconds (600000 = 10 minutes) - confirm against the docs.
{consumer_timeout, 600000}
]}
].
# Runtime change of the TCP listen backlog. Start from the current option list so the other
# tcp_listen_options entries are kept instead of being replaced by a one-element list.
# For a persistent setting, configure tcp_listen_options.backlog in rabbitmq.conf.
rabbitmqctl eval 'Opts = application:get_env(rabbit, tcp_listen_options, []), application:set_env(rabbit, tcp_listen_options, lists:keystore(backlog, 1, Opts, {backlog, 4096})).'
# The async I/O thread pool is sized when the Erlang VM boots (+A flag), so os:setenv/2 on a
# running node changes nothing. Set it in the node's environment before starting the node
# (or as SERVER_ADDITIONAL_ERL_ARGS in rabbitmq-env.conf).
export RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+A 128"
# rabbitmq.conf
# Queue master placement strategy.
queue_master_locator = min-masters
# Soft limit for quorum queue commands.
quorum_commands_soft_limit = 256
# Channel limit and heartbeat setting.
# NOTE(review): heartbeat is presumably in seconds - confirm against the docs.
channel_max = 2047
heartbeat = 60
Key tuning knobs:
# Node, virtual host, user and object inspection commands.
rabbitmqctl status # show broker/node status
rabbitmqctl list_vhosts # list virtual hosts
rabbitmqctl add_vhost myapp # create the "myapp" vhost
rabbitmqctl delete_vhost myapp # delete the "myapp" vhost
rabbitmqctl list_users # list users
rabbitmqctl add_user admin secret # create user "admin" with password "secret"
rabbitmqctl set_permissions -p myapp admin ".*" ".*" ".*" # configure/write/read patterns for "admin" in vhost "myapp"
rabbitmqctl set_user_tags admin administrator # tag "admin" as administrator
rabbitmqctl delete_user guest # delete the "guest" user
rabbitmqctl list_queues name messages consumers # list queues with message and consumer counts
rabbitmqctl list_exchanges name type # list exchanges with their type
rabbitmqctl list_bindings # list bindings
# The explanation is a positional argument; close_all_connections has no --reason option.
rabbitmqctl close_all_connections "restart" # close connections, reporting "restart" to clients
# reset only succeeds while the rabbit application is stopped, so wrap it in stop_app/start_app.
rabbitmqctl stop_app && rabbitmqctl reset && rabbitmqctl start_app # reset node (delete all data)
# Declare one exchange per exchange type, then delete the "orders" exchange.
rabbitmqadmin declare exchange name=orders type=direct durable=true # direct exchange "orders", durable
rabbitmqadmin declare exchange name=events type=topic durable=true # topic exchange "events", durable
rabbitmqadmin declare exchange name=logs type=fanout durable=true # fanout exchange "logs", durable
rabbitmqadmin declare exchange name=rpc type=headers durable=false # headers exchange "rpc", not durable
rabbitmqadmin delete exchange name=orders # delete the "orders" exchange
交换机类型:direct、topic、fanout、headers。
# Declare, delete and purge queues.
rabbitmqadmin declare queue name=order.new durable=true # durable queue "order.new"
rabbitmqadmin declare queue name=order.process durable=true arguments='{"x-max-length":10000,"x-message-ttl":3600000}' # durable queue with x-max-length and x-message-ttl arguments
rabbitmqadmin declare queue name=temp.queue durable=false auto_delete=true # non-durable queue with auto_delete enabled
rabbitmqadmin delete queue name=order.new # delete queue "order.new"
rabbitmqadmin purge queue name=order.new # purge the messages in "order.new"
# Bind queues to the exchanges, one binding per exchange.
rabbitmqadmin declare binding source=orders destination=order.new routing_key=order.new # "orders" -> order.new with a literal routing key
rabbitmqadmin declare binding source=events destination=order.process routing_key="order.#" # "events" -> order.process with a wildcard pattern
rabbitmqadmin declare binding source=logs destination=order.new routing_key="" # "logs" -> order.new with an empty routing key
rabbitmqadmin declare binding source=rpc destination=order.new arguments='{"x-match":"all","format":"pdf"}' # "rpc" -> order.new using binding arguments instead of a routing key
# Publish one message to each exchange.
rabbitmqadmin publish exchange=orders routing_key=order.new payload='{"id":1,"item":"book"}'
rabbitmqadmin publish exchange=events routing_key="order.created.eu" payload='{"event":"created"}'
rabbitmqadmin publish exchange=logs routing_key="" payload="log message"
# rabbitmq-diagnostics has no publish command; loop over rabbitmqadmin to send test messages.
for i in $(seq 1 10); do rabbitmqadmin publish exchange=orders routing_key=order.new payload="test message $i"; done # publish 10 test messages
rabbitmqadmin get queue=order.new ackmode=ack_requeue_false # get one message and remove it from the queue
rabbitmqadmin get queue=order.new count=5 ackmode=ack_requeue_true # peek 5 messages (they are requeued)
# RabbitMQ ships no "rabbitmq-consumer" binary. amqp-consume comes from the rabbitmq-c
# amqp-tools package and runs the given command (here: cat) once per message, body on stdin.
amqp-consume --url amqp://guest:guest@localhost:5672/ --queue order.new cat # CLI consumer
生产环境建议使用客户端库(Python、Go、Java 等)并正确处理确认机制。
rabbitmqctl list_queues name messages_ready messages_unacknowledged # per-queue ready and unacknowledged message counts
确认模式:
# ack_requeue_false acknowledges the fetched message, so it is removed from the queue.
rabbitmqadmin get queue=order.new ackmode=ack_requeue_false # ack (message is consumed)
# Rejecting with requeue is the reject_requeue_true mode, not ack_requeue_true.
rabbitmqadmin get queue=order.new ackmode=reject_requeue_true # reject + requeue
direct — orders.new → 绑定 routing_key=orders.new 的队列
topic — orders.*.eu → 匹配 orders.new.eu, orders.update.eu
topic — orders.# → 匹配 orders.new, orders.new.eu, orders.update.eu.urgent
fanout — (忽略路由键) → 所有绑定队列都收到消息
headers — x-match: all → 匹配所有头部键值对
headers — x-match: any → 匹配任意头部键值对
# Dead-letter setup: an exchange, a queue bound to it, and a work queue whose arguments point at that exchange.
rabbitmqadmin declare exchange name=dlx.exchange type=direct durable=true # durable direct exchange used as the dead-letter exchange
rabbitmqadmin declare queue name=dlq.orders durable=true # durable queue that collects dead-lettered messages
rabbitmqadmin declare binding source=dlx.exchange destination=dlq.orders routing_key=dlq.orders # bind dlq.orders to dlx.exchange
rabbitmqadmin declare queue name=order.new durable=true arguments='{"x-dead-letter-exchange":"dlx.exchange","x-dead-letter-routing-key":"dlq.orders"}' # work queue with dead-letter exchange and routing key arguments
消息进入死信的条件:消费者以 requeue=false 拒绝(reject/nack)消息、消息 TTL 过期,或队列超出长度限制导致消息被丢弃。
# Queue-level and message-level expiry settings.
rabbitmqadmin declare queue name=short.lived durable=true arguments='{"x-message-ttl":60000}' # queue with the x-message-ttl argument set to 60000
rabbitmqadmin declare queue name=expire.queue durable=true arguments='{"x-expires":3600000}' # queue with the x-expires argument set to 3600000
rabbitmqadmin declare queue name=lazy.queue durable=true arguments='{"x-queue-mode":"lazy"}' # queue with x-queue-mode set to "lazy"
rabbitmqadmin publish exchange=orders routing_key=order.new payload="ttl msg" properties='{"expiration":"5000"}' # message published with its own "expiration" property of "5000"
# Cluster membership commands.
rabbitmqctl join_cluster rabbit@node1 # join cluster via rabbit@node1
rabbitmqctl cluster_status # view cluster status
rabbitmqctl forget_cluster_node rabbit@node3 # remove rabbit@node3 from the cluster
rabbitmqctl stop_app && rabbitmqctl join_cluster rabbit@node1 && rabbitmqctl start_app # full join flow: stop the app, join, start the app
# Classic queue mirroring (the "ha-*" policy keys) was deprecated and then removed in
# RabbitMQ 4.0; on current releases use quorum queues instead of these policies.
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}' --apply-to queues # mirror every queue to all nodes
# Queues in this sheet are named order.* ("orders" is the exchange), so match on "^order\.".
rabbitmqctl set_policy ha-two "^order\." '{"ha-mode":"exactly","ha-params":2}' --apply-to queues # keep 2 replicas of order.* queues
# Management HTTP API queries (served by the rabbitmq_management plugin).
curl -u guest:guest http://localhost:15672/api/overview
curl -u guest:guest http://localhost:15672/api/queues
curl -u guest:guest http://localhost:15672/api/exchanges
curl -u guest:guest http://localhost:15672/api/connections
curl -u guest:guest http://localhost:15672/api/nodes
# There is no /api/users/<name>/password endpoint. A password is changed by PUT-ing the
# user object (password plus tags, which are required) to /api/users/<name>.
curl -u guest:guest -X PUT -H "content-type: application/json" http://localhost:15672/api/users/guest -d '{"password":"newpass","tags":"administrator"}'
rabbitmq-plugins enable rabbitmq_management # enable management UI on :15672
# rabbitmq.conf
# Listener ports: the default TCP listener and the management plugin's TCP port.
listeners.tcp.default = 5672
management.tcp.port = 15672
# Default user, password and virtual host.
default_user = guest
default_pass = guest
default_vhost = /
# File log level.
log.file.level = info
# Resource limits: absolute free-disk limit and relative memory high watermark.
disk_free_limit.absolute = 2GB
vm_memory_high_watermark.relative = 0.6
# advanced.config — 消费者超时
%% Erlang-term configuration: a list of {Application, Settings} tuples ending with a dot.
[
%% Settings for the rabbit application.
{rabbit, [
%% NOTE(review): value is presumably in milliseconds (600000 = 10 minutes) - confirm against the docs.
{consumer_timeout, 600000}
]}
].
# Runtime change of the TCP listen backlog. Start from the current option list so the other
# tcp_listen_options entries are kept instead of being replaced by a one-element list.
# For a persistent setting, configure tcp_listen_options.backlog in rabbitmq.conf.
rabbitmqctl eval 'Opts = application:get_env(rabbit, tcp_listen_options, []), application:set_env(rabbit, tcp_listen_options, lists:keystore(backlog, 1, Opts, {backlog, 4096})).'
# The async I/O thread pool is sized when the Erlang VM boots (+A flag), so os:setenv/2 on a
# running node changes nothing. Set it in the node's environment before starting the node
# (or as SERVER_ADDITIONAL_ERL_ARGS in rabbitmq-env.conf).
export RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+A 128"
# rabbitmq.conf
# Queue master placement strategy.
queue_master_locator = min-masters
# Soft limit for quorum queue commands.
quorum_commands_soft_limit = 256
# Channel limit and heartbeat setting.
# NOTE(review): heartbeat is presumably in seconds - confirm against the docs.
channel_max = 2047
heartbeat = 60
关键调优项: