RabbitMQ是采用Erlang编程语言实现了高级消息队列协议AMQP (Advanced Message Queuing Protocol)的开源消息代理软件(消息队列中间件

RabbitMq

1. 我们看看在RabbitMQ中的几个主要概念

  1. Producer(生产者) : 消息的生产者,投递方

  2. Consumer(消费者) : 消息的消费者

  3. RabbitMQ Broker (RabbitMQ 代理) : RabbitMQ 服务节点(单机情况中,就是代表RabbitMQ服务器)

  4. Queue (队列) : 在RabbitMQ中Queue是存储消息数据的唯一形式

  5. Binding (绑定) : RabbitMQ中绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。

  6. RoutingKey (路由键) : 消息投递给交换器,通常会指定一个 RoutingKey ,通过这个路由键来明确消息的路由规则

    RoutingKey 通常是生产者和消费者有协商一致的key策略,消费者就可以合法从生产者手中获取数据。这个RoutingKey主要当Exchange交换机模式为设定为direct和topic模式的时候使用,fanout模式不使用RoutingKey

  7. Exchange (交换机) : 生产者将消息发送给交换器(交换机),再由交换器将消息路由导对应的队列中

    交换机四种类型 : fanout,direct,topic,headers

    1. fanout(扇形交换机)

      将发送到该类型交换机的消息(message)路由到所有的与该交换机绑定的队列中,如同一个"扇"状扩散给各个队列

      在这里插入图片描述

      fanout类型的交换机会忽略RoutingKey的存在,将message直接"广播"到绑定的所有队列中

    2. direct(直连交换机)

      根据消息携带的路由键(RoutingKey) 将消息投递到对应的队列中

      在这里插入图片描述

      direct类型的交换机(exchange)是RabbitMQ Broker的默认类型,它有一个特别的属性对一些简单的应用来说是非常有用的,在使用这个类型的Exchange时,可以不必指定routing key的名字,在此类型下创建的Queue有一个默认的routing key,这个routing key一般同Queue同名。

      交换器会对比路由键和绑定键,如果路由键和绑定键完全相同,则把消息转发到绑定键所对应的队列中。

    3. Topic(主题交换机)

      topic类型交换机在RoutingKey 和 BindKey 匹配规则上更加的灵活. 同样是将消息路由到RoutingKey 和 BindingKey 相匹配的队列中,但是匹配规则有如下的特点 :

      规则1: RoutingKey 是一个使用. 的字符串 例如: “go.log.info” , “java.log.error”

      规则2: BingingKey 也会一个使用 . 分割的字符串, 但是在 BindingKey 中可以使用两种特殊字符 * 和 # ,其中 “*” 用于匹配一个单词,"#“用于匹配多规格单词(零个或者多个单词)

      在这里插入图片描述

      RoutingKey和BindingKey 是一种"模糊匹配” ,那么一个消息Message可能 会被发送到一个或者多个队列中

      无法匹配的消息将会被丢弃或者返回者生产者

    4. Headers (头交换机)

      Headers类型的交换机使用不是很多

      关于Headers Exchange 摘取一段比较容易理解的解释 :

      有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

      我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。

      头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。

2. RabbitMQ工作流程

消息生成流程

  1. 消息生产者与RabbitMQ Broker 建立一个连接,建立好了连接之后,开启一个信道Channel

  2. 声明一个交换机,并设置其相关的属性(交换机类型,持久化等)

  3. 声明一个队列并设置其相关属性(排他性,持久化自动删除等)

  4. 通过路由键将交换机和队列绑定起来

  5. 消息生产者发送消息给 RabbitMQ Broker , 消息中包含了路由键,交换机等信息,交换机根据接收的路由键查找匹配对应的队列

  6. 查找匹配成功,则将消息存储到队列中

  7. 查找匹配失败,根据生产者配置的属性选择丢弃或者回退给生产者

  8. 关闭信道Channel , 关闭连接

消息消费流程

  1. 消息消费者与RabbitMQ Broker 建立一个连接,建立好了连接之后,开启一个信道Channel

  2. 消费者向RabbitMQ Broker 请求消费者相应队列中的消息

  3. 等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息

  4. 消费者确认(ack) 接收消息, RabbitMQ Broker 消除已经确认的消息

  5. 关闭信道Channel ,关闭连接

3. Golang使用RabbitMQ

rabbitmqctl list_queues可以检查队列

rabbitmqctl list_exchanges列出所有的交换机

rabbitmqctl list_bindings列出所有绑定

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
hello    1

使用rabbitmq/amqp091-go客户端库

1
go get github.com/rabbitmq/amqp091-go

amqp客户端中,当我们将队列名称作为空字符串提供时,我们会使用生成的名称创建一个非持久队列:

1
2
3
4
5
6
7
8
q, err := ch.QueueDeclare(
  "" ,     // 名称
  false , // 持久的
  false , // 未使用时删除
  true ,   // 独占
  false , // 无等待
  nil ,    // 参数
)

当方法返回时,队列实例包含一个由 RabbitMQ 生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

当声明它的连接关闭时,队列将被删除,因为它被声明为独占。

我们已经创建了一个扇出交换和一个队列。现在我们需要告诉交换机向我们的队列发送消息。交换和队列之间的这种关系称为绑定

1
2
3
4
5
6
7
err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "logs", // exchange
  false,
  nil,
)