RabbitMQ基础

介绍

RabbitMQ是一个在AMQP基础上完成的,可复用的企业级消息系统。

特性:

  • 可靠 持久化、生产者确认、消费端确认、镜像等机制保证可靠性。
  • 灵活 丰富的内置交换器、交换器捆绑、插件机制实现自己的交换器。
  • 可扩展 集群弹性伸缩。
  • 高可用 镜像机制。
  • 多协议支持 AMQP、STOMP、MQTT等协议支持。
  • 管理界面 提供了一个易用的用户界面,可以监控和管理消息、集群中的节点等。
  • 插件机制 提供了许多插件,也可以通过自定义插件进行扩展。

基本概念

Queue

队列,是RabbitMQ的内部对象,用于存储消息。

Exchange

交换器。 暂时可以理解成生产者将消息投递到队列中,实际上这个在RabbitMQ中不会发生。 真实情况是,生产者将消息发送到Exchange,由交换器将消息路由到一个或多个队列中。 如果路由不到,或许会返回给生产者,或许直接丢弃。 RabbitMQ常用的交换器类型有:

交换器类型 说明
fanout 发布订阅模式。
会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
direct 路由模式。
会把消息路由到哪些BindingKey和RoutingKey完全匹配的队列中。
topic 广播模式。
会把消息路由到哪些BindingKey和RoutingKey模糊匹配的队列中。
headers 不依赖于路由键的匹配规则来路由消息,而是根据发送消息内容中的headers属性进行匹配。
header类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

RoutingKey

路由键。 生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。 在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。

Binding

绑定。 RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。

RoutingKey与BindingKey的关系

  • 在direct类型的交换器中,RoutingKey与BindingKey是同一个东西。
  • 在topic交换器类型下,RoutingKey与BindingKey之间需要做模糊匹配,两者并不是相同的。

消费模式

RabbitMQ的消费模式分为两种:推模式和拉模式。 推模式调用Basic.Consume进行消费。 拉模式调用Basic.Get进行消费。

消息模式

消息模式 图示 介绍
简单消息模式(Hello World) Hello World 1. 一个生产者
2. 一个消费者
3. 绑定到一个队列上
工作队列模式(Work queue) Work queue 1. 一个生产者
2. 多个消费者
3. 绑定到一个队列中
4. 可以根据消费者的能力动态的调整任务分配
发布订阅模式(Publish/Subscribe) Publish/Subscribe 1. 一个生产者
2. 多个消费者
3. 每个消费者绑定不同的队列
4. 生产者通过交换机跟队列连接
5. 生产者生产的消息,消费者同一时间都会收到相同内容
注意:发布订阅模式,消息先发到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息会丢失。
路由模式(Routing) Routing 1. Publish/Subscribe模式的升级版
2. 与Publish/Subscribe不同的是,消费端可以根据key的不同,接收到的消息内容不同,而Publish/Subscribe消费端接收到的消息是相同的。
广播模式(Topics) Topics 1. Routing模式的升级版。
2. 与Routing模式不同的是,可以通过key的通配符进行消息匹配
RPC模式 RPC 不常用

持久化

持久化可以提高RabbitMQ的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。 RabbitMQ的持久化分三个部分:交换器的持久化、队列的持久化、消息的持久化。 队列的持久化能保证其本身的元数据不会因为异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。 要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。 需要同时设置队列的持久化和消息的持久化,单单设置消息持久化而不设置队列的持久化毫无意义。

存储机制

磁盘写入机制

不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。 持久化的消息在大道队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。 非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。 这两种类型的消息的落盘处理都在RabbitMQ的“持久层”中完成。

持久层

持久层是一个逻辑上的概念,实际包含两个部分:

  • 队列索引(rabbit_queue_index)
  • 消息存储(rabbit_msg_store)

队列索引负责维护队列中落盘消息的消息,包括消息的存储节点、是否已被交付给消费者、是否已被消费者ack等。 每个队列都有与之对应的一个队列索引。

消息存储以键值对的形式存储消息,它被所有队列共享,在每个节点中有且只有一个。 消息存储可以分为:

  • msg_store_persistent:负责持久化消息的持久化,重启后消息不会丢失。
  • msg_store_transient:负责非持久化消息的持久化,重启后消息会丢失。

消息存储位置

消息(消息体、属性、headers)可以直接存储在队列索引中,也可以被保存在消息存储中。 最佳的配备是较小的消息存储在队列索引中,较大的消息存储在消息存储中。这个消息大小界定可以通过queue_index_embed_msgs_bellow来配置,默认大小为4096,单位是B。

消息删除机制

消息的删除只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。 执行删除操作时,并不立即对在文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。 当一个文件中都是垃圾数据时可以将这个文件删除。 当检测到前后两个文件中的有效数据可以合并在一个文件中,并且所有的垃圾数据的大小可和所有文件(至少有3个文件存在的情况下)的数据大小比值超过设置的阈值GARBAGE_FRACTION(默认值为0.5)时才会触发垃圾回收将两个文件合并。 执行合并的两个文件一定是逻辑上相邻的两个文件。

队列的结构

队列由以下两部分组成:

  • rabbit_amqqueue_process:负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。
  • backing_queue:是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。

惰性队列

RabbitMQ从3.6.0版本开始引入了惰性队列(Lazy Queue)的概念。 惰性队列会尽可能地将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。 当消费者由于各种各样的原因(比如消费者下线、宕机,或者由于维护而关闭等)致使长时间内不能消费消息而造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能地存储在内存中,这样可以更加快速地将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。

惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的,这样可以减少了内存测消耗,但是会增加I/O的使用。 注意:如果惰性队列中存储的是非持久化的消息内存的使用率会一直很稳定,但是重启之后消息一样会丢失。

惰性队列和普通队列相比,只有很小的内存开销。

RabbitMQ中的流控

RabbitMQ中的流控概念

RabbitMQ中流控机制是用来避免消息的发送速率过快而导致服务器难以支撑的情形。

RabbitMQ中的流控类型

流控类型 流控机制 说明
Global Flow Control 内存和磁盘告警 RabbitMQ可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项恢复正常。
Per-Connection Flow Control 单个Connection的流控 从2.8.0版本开始,RabbitMQ还引入了流控(Flow Control)机制来确保稳定性。

RabbitMQ的流控原理

Erlang进程之间并不共享内存(binary类型的除外),而是通过消息传递来通信,每个进程都有自己的进程邮箱(mailbox)。 默认情况下,Erlang并没有对进程邮箱的大小进行限制,所以当有大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出并崩溃。 在RabbitMQ中,如果生产者持续高速发送,而消费者消费速度角度时,如果没有流控,很快会使内部进程邮箱的大小达到内存阈值。

RabbitMQ使用了一种基于信用证算法(credit-based algorithm)的流控机制来限制发送消息的速率以解决前面所提出的问题。 它通过监控各个进程的进程邮箱,当某个进程负载过高而来不及处理消息时,这个进程的邮箱就会开始堆积消息。 当堆积到一定量时,就会阻塞而不接收上游的新消息。

流控机制不只是作用于Connection,同样作用于信道(Channel)和队列。

镜像队列

镜像队列(Mirror Queue)是一种机制,可以将队列镜像到集群中的其它Broke节点上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

RabbitMQ的镜像队列同时支持事务机制和生产者确认机制(publisher confirm)。 在事务机制中,只有当前事务在全部镜像中执行之后,客户端才会收到Tx.Commit-OK的消息。 在生产者确认机制(publisher confirm)中,生产者进行当前消息确认的前提是该消息被全部接收了。

队列/消息

  • TTL 是 Time To Live 的缩写,也就是生存时间
  • RabbitMQ支持消息的过期时间设置,在消息发送时指定
  • RabbitMQ支持队列的过期时间设置,从消息入队时开始计算,只要超过队列的超时时间配置,消息自动清除

消费端限流

假设一个场景,首先,我们RabbitMQ服务器上有上万条未处理的消息,随便打开一个消费端,就会有巨量消息推送过来,压垮客户端。 RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动签收消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置Qos的值)未被确认前,不进行新消息的消费。 在自动签收消息的情况下 不生效。

Return Listener

Return Listener 用于处理一些不可路由的消息。 我们的生产者,通过指定一个Exchange和Routing key,把消息送达到某个消息队列,然后我们的消费者监听队列,进行消费操作。 但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的Routing key路由不到,此时我们需要监听这种不可达消息,就是用Return Listener。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//生产者
//省略 创建连接和channel
String exchange = "test.return.exchange";
String routingKey = "return.routing-key";
String message = "hello rabbitmq";

//声明交换机 不绑定队列 触发return 回调
channel.exchangeDeclare(exchange, "topic");

//添加 return 监听器
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyTest, String exchange, String routingKey,
AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
System.err.println("replyCode: " + replyCode);
System.err.println("replyTest: " + replyTest);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("basicProperties: " + basicProperties);
System.err.println("body: " + new String(body, "UTF-8"));
}
});
//第三个参数 是否打开 return 机制。如果为true,则监听器会收到路由不可达的消息,然后进行处理,如果为false,Broker会自动删除消息。
channel.basicPublish(exchange, routingKey, true,null, message.getBytes());
}

死信队列

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。 当消息在一个队列中变成死信之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。 消息变成死信一般是由于以下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false
  • 消息TTL过期
  • 队列达到最大长度

DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

对于RabbitMQ来说,DLX是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正常消费而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。 DLX配合TTL使用还可以实现延迟队列的功能。

延迟队列

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

优先级队列

具有高优先级的队列具有高的优先权,优先权高的消息具备优先被消费的特权。 可以通过设置队列的x-max-priority参数来实现。 优先级最低是0,最高位队列设置的最大优先级。

常见问题

是否设置持久化?

可以将所有的消息都设置为持久化,但是这样会严重影响RAbbitMQ的性能。 写入磁盘的速度比写入内存的速度慢得不是一点点。 对于可靠性不是那么高的消息,可以不采用持久化处理以提高整体的吞吐量。 在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

如何实现消费端限流

在非自动确认的模式下,可以采用限流模式,rabbitmq提供了服务质量保障qos机制来控制一次消费消息数量。

1
2
3
4
    ......
Channel channel = conn.createChannel();
//channel.basicQos(获取消息最大数[0-无限制], 依次获取数量, 作用域[true作用于整个channel,false作用于具体消费者]);
channel.basicQos(0, 2, false);

如何确保RabbitMQ中消息不丢失?

将交换器、队列、消息都设置了持久化之后,并不能百分之百保证数据不丢失。 ### 丢数据场景一 从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。 这种情况很好解决,将autoAck参数设置为false,并进行手动确认。 ### 丢数据场景二 在持久化的消息正确存入RabbitMQ之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。 RabbitMQ并不会为每条消息都进行同步存盘(调用内核的fsync方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。 (1)针对这个问题,可以引入RabbitMQ的镜像队列机制,相当于配置了副本,如果主节点在此特殊时间内挂掉,可以自动切换到从节点,这样有效地保证了高可用,除非整个集群都挂掉。 在实际生产环境中关键业务队列一般都会设置镜像队列。 (2)还可以在发送端引入事务机制或者发送方确认机制来保证消息已经正确地发送并存储至RabbitMQ中,前提还要保证在调用channel.basicPublish方法的时候交换器能够将消息正确路由到相应的队列之中。

如何确保生产者的消息正确的到达服务器?

RabbitMQ针对这个问题,提供了两种解决方式:

  • 通过事务机制实现
  • 通过发送方确认机制实现

事务机制

RabbitMQ客户端中与事务机制相关的方法有三个:

  • channel.txSelect:将当前信道设置成事务模式
  • channel.txCommit:用于提交事务
  • channel.txRollback:用于事务回滚

采用事务机制会严重降低RabbitMQ的消息吞吐量。

发送方确认机制(publisher confirm)

生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者直销消息已经正确到达目的地了。 如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。 RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理。 引入发送方确认机制可以弥补事务机制的缺陷,提高整体的吞吐量。

注意

  1. 事务机制和发送方确认机制两者是互斥的,不能共存。
  2. 事务机制和发送方确认机制确保的是消息能够正确地发送至RabbitMQ。这里的“发送至RabbitMQ”的含义是指消息被正确地发送至RabbitMQ的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列。
  3. 事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制的最大好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack命令。

为什么发送方确认机制比事务机制吞吐率要高

发送方确认模式是每发送一条消息就调用channel.waitForConfirm方法,之后等待服务端的确认,这实际上是一种串行同步等待的方式。 事务机制和它一样,发送消息之后等待服务端确认,之后再发送消息。两者的存储确认原理相同,尤其对于持久化的消息来说,两者都需要等待消息确认落盘之后才会返回(调用Linux内核的fsync方法)。 在同步等待的方式下,发送方确认机制发送一条消息需要通信交互的命令是2条:Basic.Publish和Basic.Ack,事务机制是3条:Basic.Publish、Tx.Commit/.Commit-OK(或者Tx.Rollback/.Rollback-Ok),事务机制多了一个命令帧报文的交互,所以QPS会略微下降。

消费端推拉模式的选择?

如果只想从队列获取单条消息而不是持续订阅,建议使用Basic.Get进行消费(拉模式)。 如果要实现高吞吐量,消费者应使用Basic.Consume进行消费(推模式)。

哪些情况下RabbitMQ的消息顺序会被打破?

场景一

生产者使用了事务机制,在发送消息之后遇到了异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么这个消息在生产者这个源头就出现了错序。

场景二

启用生产者确认时,如果发生超时、中断,又或者是收到了RabbitMQ的Basic.Nack命令时,需要补偿发送,结果与事务机制一样会错序。

场景三

如果生产者发送的消息设置了不同的超时时间,并且也设置了死信队列,整体上来说相当于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。

场景四

消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的。

场景五

一个队列前后顺序分别有msg1、msg2、msg3、msg4这4个消息,同时有ConsumerA和ConsumerB这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中,ConsumerA中的消息为msg1何msg3,ConsumerB中的消息为msg2和msg4。ConsumerA收到消息msg1之后并不想处理而调用了Basic.Nack/.Reject将消息拒绝,与此同时将requeue设置为true,这样这条消息就可以重新存入队列中。消息msg1之后被发送到了ConsumerB中,此时ConsumerB已经消费了msg2、msg4,之后再消费msg1,这样消息顺序性也就错乱了。

如何确保消费者得到的消息和发送者发布的消息的顺序是一致的?

在消息体内添加全局有序标识(类似于Sequence ID)来实现。

为什么应该弃用QueueingConsumer

QueueingConsumer在客户端3.x版本中用得如火如荼,但是在4.x版本开始就被标记为@Deprecated。 它存在内存溢出缺陷,例如: 由于某些原因,队列之中堆积了比较多的消息,就可能导致消费客户端内存溢出假死,于是发生恶行循环,队列消息不断堆积而得不到消化。 这个内存溢出的问题可以使用Basic.Qos来得到有效解决,Basic.Qos可以限制某个消费者所保持未确认消息的数量,也就是间接地限制了QueueingConsumer中的LinkedBlockingQueue的大小。 注意一定要在调用Basic.Comsume之前调用Basic.Qos才能生效。 为了不必要的麻烦,建议在消费的时候尽量使用继承DefaultConsumer的方式。

RabbitMQ提供了怎样的消息传输保障?

一般消息中间件的消息传输保障分为三个层级:

消息传输保障 说明 RabbitMQ支持情况
最多一次 消息可能会丢失,但绝不会重复传输 支持
最少一次 消息绝不会丢失,但可能会重复传输 支持
恰好一次 每条消息肯定会被传输一次且仅传输一次 不支持

重复消费是怎样造成的?

场景一

消费者在消费完一条消息之后向RabbitMQ发送确认(Basic.Ack)命令,此时由于网络断开或其它原因造成RabbitMQ没有收到这个确认命令,那么RabbitMQ不会将此条消息标记删除。 在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。

场景二

生产者在使用发送者确认机制的时候,发送完一条消息等待RabbitMQ返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样RabbitMQ中就有两条同样的消息,在消费的时候,消费者就会重复消费。

如何保障“恰好一次”,避免重复消费?

RabbitMQ没有去重机制来保证“恰好一次”,不仅是RabbitMQ,目前大多数主流的消息中间件都没有消息去重机制,也不保障“恰好一次”。 去重机制一般是在业务客户端实现,比如引入GUID的概念,针对GUID,如果从客户端的角度去重,那么需要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小也难以界定。 建议在实际生产环境中,业务方根据自身的业务特性进行去重,比如业务消息本身具备幂等性,或者借助Redis等其它产品进行去重处理。

如何提升RabbitMQ的性能?

  1. 开启Erlang语言的HiPE功能,这样保守估计可以提高30%~40%的性能,不过在较旧版本的Erlang中,这个功能不太稳定,建议使用较新版本的Erlang,版本至少是18.0。
  2. 多个rabbit_amqqueue_process替换单个rabbit_amqqueue_process,这样可以充分利用rabbit_reader或者rabbit_channel进程中被流控的性能。

如何进行消息追踪

在RabbitMQ中可以使用Firehose功能来实现消息追踪,Firehose可以记录每一次发送或者消费消息的记录,方便RabbitMQ的使用者进行调试、排错等。

消费端的手动ACK和NACK

消费端进行消费时,如果业务出现异常NACK消息,经历三到四次重试后依然异常,手动ACK,然后进行补偿。

消费端重回队列

将没有处理成功的消息重新投递到Queue的队尾中。 ### 生产端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//生产者
//省略 创建连接和channel

String exchange = "test.ack.exchange";
String routingKey = "ack.routing-key";

for (int i = 0; i < 5; i++) {
Map<String, Object> headers = new HashMap<>();
headers.put("index", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2) //持久化
.headers(headers)
.build();

String message = "hello rabbitmq " + i;
channel.basicPublish(exchange, routingKey, properties, message.getBytes());
}

消费端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//消费者
//省略 创建连接和channel
String queueName = "ack-queue";
String exchange = "test.ack.exchange";
String routingKey = "ack.routing-key";
//声明交换机
channel.exchangeDeclare(exchange, "topic");
//声明队列
channel.queueDeclare(queueName, false, false, false, null);
//队列、交换机、routing key 三者绑定
channel.queueBind(queueName, exchange, routingKey);

//必须手工签收 关闭autoACK
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.err.println(message);
if ((Integer) properties.getHeaders().get("index") == 0) {
//第三个参数 是否重回队列
channel.basicNack(envelope.getDeliveryTag(), false, true);
}else{
//第二个参数 是否批量签收
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
});

权限问题报错

问题描述

1
2
3
4
5
6
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - vhost sleuth1 not found, class-id=10, method-id=40)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ~[amqp-client-5.1.2.jar:5.1.2]

解决办法

检查权限。 一般是用户没有权限访问。

运维

安装

CentOS7安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# install erlang
$ cd /usr/local
$ wget -c https://github.com/rabbitmq/erlang-rpm/releases/download/v22.2.5/erlang-22.2.5-1.el7.x86_64.rpm
$ yum -y localinstall erlang-22.2.5-1.el7.x86_64.rpm

# install rabbitmq
$ wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.2/rabbitmq-server-3.8.2-1.el7.noarch.rpm
$ rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
$ yum -y localinstall rabbitmq-server-3.8.2-1.el7.noarch.rpm
$ vim /etc/rabbitmq/rabbitmq-env.conf
##### 添加如下内容 #####
NODENAME=rabbit@localhost
##### 内容结束 #####
$ chkconfig rabbitmq-server on
$ systemctl start rabbitmq-server
$ rabbitmq-plugins enable rabbitmq_management
$ rabbitmqctl add_user admin admin
$ rabbitmqctl set_user_tags admin administrator
$ rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
$ rabbitmqctl status

MacOS安装

1
2
3
4
5
$ brew update
$ brew install rabbitmq
$ brew services start rabbitmq
$ brew services stop rabbitmq
$ curl http://localhost:15672
字段 说明
用户名 guest
密码 guest

Docker(rabbitmq)

rabbitmq

1
2
3
4
$ docker pull rabbitmq:3-management
$ docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 rabbitmq:3-management
# 查看rabbitmq运行状况
$ docker logs some-rabbit

标题 说明
URL http://localhost:15672
用户名 guest
密码 guest

Docker(rabbitmq:management)

1
2
3
4
5
6
7
$ docker pull rabbitmq:management
# 创建并启动
$ docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 rabbitmq:management
# 启动同时设置用户和密码
$ docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
# 查看rabbitmq运行状况
$ docker logs rabbit
标题 说明
URL http://localhost:15672
用户名 guest
密码 guest

获取服务器状态信息

1
$ rabbitmqctl status

管理服务

1
2
3
4
5
6
7
8
# 启动方法1
$ service rabbitmq-server restart

# 启动方法2
$ rabbitmq-server –detached

# 停止
$ rabbitmqctl stop

管理队列

查看所有队列信息

1
$ rabbitmqctl list_queues

清除所有队列

1
$ rabbitmqctl reset

管理应用

关闭应用

1
$ rabbitmqctl stop_app

启动应用

1
$ rabbitmqctl start_app

管理用户

查看用户和角色

1
$ sudo rabbitmqctl list_users

新建用户

1
$ rabbitmqctl add_user xxxpwd

删除用户

1
$ rabbitmqctl delete_user xxx

改密码

1
$ rabbimqctlchange_password {username} {newpassword}

设置角色

1
2
$ rabbitmqctlset_user_tags {username} {tag ...}
# Tag可以为 administrator,monitoring, management

创建管理员用户

1
2
$ sudo rabbitmqctl add_user  user_admin  passwd_admin
$ sudo rabbitmqctl set_user_tags user_admin administrator

创建RabbitMQ监控用户

1
2
$ sudo rabbitmqctl add_user  user_monitoring  passwd_monitor
$ sudo rabbitmqctl set_user_tags user_monitoring monitoring

创建某个项目的专用用户,只能访问项目自己的virtual hosts

1
2
$ sudo rabbitmqctl  add_user  user_proj  passwd_proj
$ sudo rabbitmqctl set_user_tags user_proj management

插件管理

开启某个插件

1
2
$ rabbitmq-pluginsenable xxx
# 重启生效

关闭某个插件

1
2
$ rabbitmq-pluginsdisab xxx
# 重启生效

管理虚拟主机

新建虚拟主机

1
$ rabbitmqctladd_vhost  xxx

删除虚拟主机

1
$ rabbitmqctl  delete_vhost xxx

配置集群(未完待续)

环境说明

hostname IP CPU 内存 角色
node4 10.10.1.21 4核 8G HAProxy
Keepalived
node5 10.10.0.138 4核 8G HAProxy
Keepalived
node6 10.10.0.139 4核 8G RabbitMQ
node7 10.10.0.140 4核 8G RabbitMQ
node8 10.10.0.143 4核 8G RabbitMQ

操作系统统一为:CentOS7.4

安装步骤

安装RabbitMQ(node6、node7、node8)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
### 安装Erlang
$ cd /usr/local
$ yum install -y gcc gcc-c++ glibc-devel make ncurses-devel openssl-devel autoconf java-1.8.0-openjdk-devel git
$ wget http://erlang.org/download/otp_src_21.2.tar.gz
$ tar zxvf otp_src_21.2.tar.gz
$ cd otp_src_21.2
$ ./otp_build autoconf
$ ./configure && make && sudo make install
# 验证Erlang是否安装成功
$ erl

### 安装RabbitMQ
$ cd /usr/local
$ wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.10/rabbitmq-server-3.7.10-1.el7.noarch.rpm
$ rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
$ yum install rabbitmq-server-3.7.10-1.el7.noarch.rpm

安装HAPRoxy(node4、node5)

安装Keepalived(node4、node5)

监控(未完待续)

分布式部署(未完待续)

RabbitMQ可以通过3种方式实现分布式部署:集群、Federation和Shovel。 这3种方式不是互斥的,可以根据需要选择其中的一种或者以几种方式组合来达到分布式部署的目的。 Federation和Shovel可以为RabbitMQ的分布式部署提供更高的灵活性,但同时也提高了部署的复杂性。

附录

用户角色

角色 说明
none 不能访问 management plugin
management - 列出自己可以通过AMQP登入的virtual hosts
- 查看自己的virtual hosts中的queues, exchanges 和 bindings
- 查看和关闭自己的channels 和 connections
- 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
policymaker - management可以做的任何事
- 查看、创建和删除自己的virtual hosts所属的policies和parameters
monitoring - management可以做的任何事
- 列出所有virtual hosts,包括他们不能登录的virtual hosts
- 查看其他用户的connections和channels
- 查看节点级别的数据如clustering和memory使用情况
- 查看真正的关于所有virtual hosts的全局的统计信息
administrator - policymaker和monitoring可以做的任何事
- 创建和删除virtual hosts
- 查看、创建和删除users
- 查看创建和删除permissions
- 关闭其他用户的connections

参考

坚持原创技术分享,您的支持将鼓励我继续创作!
0%