Rabbitmq解析与应用

Rabbitmq

一直在学习和了解Rabbitmq相关的内容,但是之前的项目年代比较久远和其它一些原因,因此在项目没有使用到过Rabbitmq相关的技术。最近,趁着项目要大刀阔斧的重构,经过对项目的实际实际分析与考虑,发现针对业务中的某些场景可以使用Rabbitmq技术,使得项目变得更易于扩展和使用,并且可以是项目的影响可以更加迅速。

Rabbitmq的实现采用AMQP协议的标准。AMQP(Advanced Message Queuing Protocol),即高级消息队列协议,本身属于应用层协议,主要是为了满足消息发送者与消费者实现解耦,消息发送者无需知道消息消费者是否存在或着是谁,只需要将消息发送到特定的地点,而消息消费者也无需知道该条消息来源于什么地方,只需要否则消费该消息做相应的操作。Rabbitmq本身采用erlang语言编写,支持多种客户端,而且本身支持集群部署,具有较高的容错性和可扩展性。

特点

  1. 可靠性:Rabbitmq具有消费者确认、发送者确认机制,备份交换器、死信队列、延迟队列等功能来保证消息的正确发送与消费,Rabbitmq并且具有具有消息(Message)、队列(Queue)、交换器(Exchange)的持久化,从而使得即使Rabbitmq节点出现问题,也能事后进行修复而不丢失数据。
  2. 灵活的路由:Rabbitmq在发送消息队列到消费方时,会先经过交换器(Exchange),交换器用于决定将该消息发送到什么地方。并且针对复杂的场景,可以将多个交换器绑定在一起或使用插件来是想复杂的Exchange。
  3. 消息集群:Rabbitmq本身即支持集群部署,这个会在后边的章节做详细介绍。
  4. 支持多种客户端:如:java、python。目前本人只使用java的客户端RabbitmqTemplate。
  5. 高可用:Rabbit支持镜像队列,从而实现高可用。
  6. 提供网页版的管理界面,便于管理和使用等。

相关概念

  1. * *Connection(连接) **:
    Connection是一个网络的连接,可以是一个TCP连接,也可以是其它相关连接。

  2. * *Channel(信道) **: 信道是多路复用连接中一条双向数据流通道,是一种虚拟的连接。为了避免创建和销毁TCP连接的昂贵开销,所以引入了信道的概念,一条连接可以拥有多条信道。

  3. * *Message(消息) **:

  4. * *Queue(队列) **: 队列是Rabbitmq重要组成部分之一,队列的特性包含很多,包括持久化、排他性、是否自动删除、过期时间、优先级等。之后会在高级特性中详细介绍其特性。

  5. * *Exchange(交换器) **: Exchange是Rabbitmq的重要组成部分之一,根据路由键来决定将消息具体的发送到哪一条或者哪几条队列中。Exchange根据不同类型的分发策略具有4种不同的交换器类型:driect(直接)、topic(主题)、fanout(广播)、header(根据头部),header和direct完全一致,并且性能没有direct好,因此现在已基本不是用header交换器。我们接下来只分析其余3中交换器,并且附有代码例子

    • Direct Exchange:
      消息的路由键(Routing Key)和绑定键Binding Key完全一致,则将消息发送到对应的队列中。如一个消息的路由键为”single.message”,一个队列和交换器的绑定键也为”singleMessage”,则将这条消息发送到绑定键为”single.message”的队列中。
      1
      2
      3
      4
      channel.exchangeDeclare("exchangeName", ExchangeTypes.DIRECT, true, false, null);
      channel.queueDeclare("queueName", true, true, false, null);
      channel.queueBind("queueName", "exchangeName", "simple");
      channel.basicPublish("echangeName", "simple", true, new AMQP.BasicProperties().builder().deliveryMode(2).expiration("60000").build(), "hello world!".getBytes());
    • Topic Exchange:
      消息的路由键(Routing Key)满足绑定键(Binding Key)的表达式,则将消息发送到对应的队列中。在这个类型中,绑定键(Binding Key)是一个正则表达式,支持两种特殊的通配符“#”和“*”,“#”匹配0个或多个单词,“*”匹配一个单词。如一个消息的路由键为”single.messgae”,
      而绑定键为“*.messgae”,这条消息将会发送到绑定键为“*.message”的队列中,同样消息路由键为“multi.message”也会发送到改队列中。
      1
      2
      3
      4
      channel.exchangeDeclare("exchangeName", ExchangeTypes.TOPIC, true, false, null);
      channel.queueDeclare("queueName", true, true, false, null);
      channel.queueBind("queueName", "exchangeName", "*.message");
      channel.basicPublish("exchangeName", "", true, new AMQP.BasicProperties().builder().deliveryMode(2).expiration("60000").build(), "hello world!".getBytes());
    • Fanout Exchange:
      Fanout Exchange一般不需要定义消息的路由键,它会将所有的消息发送到该队列器绑定的队列中去。
      1
      2
      3
      4
      channel.exchangeDeclare("exchangeName", ExchangeTypes.FANOUT, true, false, null);
      channel.queueDeclare("queueName", true, true, false, null);
      channel.queueBind("queueName", "exchangeName", "");
      channel.basicPublish("exchangeName", "simple.message", true, new AMQP.BasicProperties().builder().deliveryMode(2).expiration("60000").build(), "hello world!".getBytes());
  6. * *Routing Key(路由键): **对于大部分的消息发送都应该提供路由键,用于决定该消息需要发送到哪条队列中。由于广播交换器会将消息发布到与之绑定的所有队列中,所以广播队列器可以不需要配置Routing Key,因为无意义。

  7. * *Binding Key(绑定): ** 用于将Queue和Exchange进行关联起来,从而决定需要将该消息发布到哪条队列中。

  8. * *virtual host(虚拟机): ** 表示具有一系列交换器、消息队列和相关对象,具有共同的身份认证和加密环境的独立服务器域,其本身是一个服务器,默认的为“/”。

  9. * *broker(服务器实体): **表示一个消息队列服务器的实体。即拥有自己的队列、交换器、绑定,可以实现发送和接收消息。

  10. * *完整代码实现: ** 由SpringBoot+Rabbitmq实现

    • 队列和交换器的配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    package com.marchen.mapperdemo.config;

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitConfig {

    /**
    * 声明具有持久性的traceQueue队列
    * @return Queue
    */
    @Bean
    public Queue traceQueue(){
    return QueueBuilder.durable("traceQueue").build();
    }

    /**
    * 声明具有持久性、排他性和当队列中不存在消息时自动删除特性的队列。
    * @return Queue
    */
    @Bean
    public Queue infoQueue(){
    return QueueBuilder.durable("infoQueue").exclusive().autoDelete().build();
    }

    /**
    * 声明具有持久性、排他性的debugQueue队列
    * @return Queue
    */
    @Bean
    public Queue debugQueue(){
    return QueueBuilder.durable("debugQueue").exclusive().build();
    }

    /**
    * 声明具有持久性、排他性的队列,队列名称使用自动生产的队列名称
    * @return Queue
    */
    @Bean
    public Queue warnQueue(){
    return QueueBuilder.durable("warnQueue").build();
    }

    /**
    * 声明具有持久性、排他性的队列,队列名称使用自动生产的队列名称
    * @return Queue
    */
    @Bean
    public Queue errorQueue(){
    return QueueBuilder.durable("errorQueue").build();
    }

    /**
    * 声明具有持久性并且直接分发消息的交换器
    * @return Exchange
    */
    @Bean
    public Exchange directExchange(){
    return ExchangeBuilder.directExchange("directExchange").durable(true).build();
    }

    /**
    * 声明具有持久性并且根据主题分发消息的交换器
    * @return Exchange
    */
    @Bean
    public Exchange topicExchange(){
    return ExchangeBuilder.topicExchange("topicExchange").build();
    }

    /**
    * 声明具有持久性并且直接分发消息的交换器
    * @return Exchange
    */
    @Bean
    public Exchange fanoutExchange(){
    return ExchangeBuilder.fanoutExchange("fanoutExchange").build();
    }

    /**
    * 将debugQueue和直接分发消息的交换器绑定起来,路由键为debugLogger
    * @return Binding
    */
    @Bean
    public Binding debugBinding(){
    return BindingBuilder.bind(debugQueue()).to(directExchange()).with("debugLogger").noargs();
    }

    /**
    * 将debugQueue和根据主题分发消息的交换器绑定起来,路由键为*.logger;当发送的debug.logger、info.logger时,都能分发到warnQueue中
    * @return Binding
    */
    @Bean
    public Binding warnBinding(){
    return BindingBuilder.bind(warnQueue()).to(topicExchange()).with("*.logger").noargs();
    }

    /**
    * 将infoQueue和根据主题分发消息的交换器绑定起来,路由键为*.logger;当发送的debug.logger、info.logger时,都能分发到infoQueue中
    * @return Binding
    */
    @Bean
    public Binding infoBinding(){
    return BindingBuilder.bind(infoQueue()).to(topicExchange()).with("*.logger").noargs();
    }

    /**
    * 将traceQueue和广播分发消息的交换器绑定起来,路由键为空;当发送任何消息时,都能分发到traceQueue中
    * @return Binding
    */
    @Bean
    public Binding traceBinding(){
    return BindingBuilder.bind(traceQueue()).to(fanoutExchange()).with("").noargs();
    }

    /**
    * 将errorQueue和广播分发消息的交换器绑定起来,路由键为空;当发送任何消息时,都能分发到errorQueue中
    * @return Binding
    */
    @Bean
    public Binding errorBinding(){
    return BindingBuilder.bind(errorQueue()).to(fanoutExchange()).with("").noargs();
    }
    }
    • 消费者的代码实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53

    //直接交换器的消费者实现

    package com.marchen.mapperdemo.consumer;

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @RabbitListener(queues={"debugQueue"})
    @Component
    public class DebugConsumer {

    @RabbitHandler
    public void print(String msg){
    System.out.println("this is debug message:"+msg);
    }
    }

    //广播交换器的消费者实现

    package com.marchen.mapperdemo.consumer;

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @RabbitListener(queues = {"errorQueue", "traceQueue"})
    @Component
    public class TraceConsumer {

    @RabbitHandler
    public void print(String message){
    System.out.println("this is trace/error message:"+message);
    }
    }

    //主题模式交换器的消费者实现
    package com.marchen.mapperdemo.consumer;

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @RabbitListener(queues = {"warnQueue", "infoQueue"})
    @Component
    public class WarnConsumer {

    @RabbitHandler
    public void print(String msg){
    System.out.println("this is warn/info message:"+msg);
    }
    }
    • 测试上述案例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    package com.marchen.mapperdemo.action;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class loggerAction {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
    * 输出结果:
    * this is warn/info message:i am debug message
    */
    @RequestMapping("/sendDebug")
    public void sendDebugMessage(){
    this.rabbitTemplate.convertAndSend("directExchange", "debugLogger", "i am debug message");
    }

    /**
    * 输出结果:
    * this is warn/info message:i am warn message
    * this is warn/info message:i am warn message
    * 输出两次,因为warn.logger满足键绑定的队列的两个(warnQueue、infoQueue)
    */
    @RequestMapping("/sendWarn")
    public void sendWarnMessage(){
    this.rabbitTemplate.convertAndSend("topicExchange", "warn.logger", "i am warn message");
    }

    /**
    * 输出结果:
    * this is warn/info message:i am info message
    * this is warn/info message:i am info message
    * 输出两次,因为info.logger满足键绑定的队列的两个(warnQueue、infoQueue)
    */
    @RequestMapping("/sendInfo")
    public void sendInfoMessage(){
    this.rabbitTemplate.convertAndSend("topicExchange", "info.logger", "i am info message");
    }

    /**
    * 输出结果:
    * this is trace/error message:i am trace message
    * this is trace/error message:i am trace message
    * 输出结果为两次,因为满足条件的队列有两个(traceQueue, errorQueue)
    */
    @RequestMapping("/sendError")
    public void sendErrorMessage(){
    this.rabbitTemplate.convertAndSend("fanoutExchange", "errorLogger", "i am error message");
    }

    /**
    * 输出结果:
    * this is trace/error message:i am trace message
    * this is trace/error message:i am trace message
    * 输出结果为两次,因为满足条件的队列有两个(traceQueue, errorQueue)
    */
    @RequestMapping("/sendTrace")
    public void sendTraceMessage(){
    this.rabbitTemplate.convertAndSend("fanoutExchange", "traceLogger", "i am trace message");
    }
    }

高级特性

  1. * *持久化机制: **持久化分为消息持久化、队列持久化以及交换器的持久化。

    • 交换器持久化:将交换器设置为持久化可以保证服务器重启后交换器的元数据不会丢失;若未设置交换器持久化,虽然不会造成消息体丢失,但是消息不能再发送到该交换器中了。对于经常使用到的交换器应将其设置持久化,具体代码如下:
    1
    2
    3
    4
    5
    6
    /**
    * exchangeName: 交换器名称
    * ExchangeTypes.DIRECT: 代表交换器的类型;direct、topic、fanout
    * true: 代表交换器是否持久化
    */
    channel.exchangeDeclare("exchangeName", ExchangeTypes.DIRECT, true);
    • 队列持久化:队列持久化可以保证队列中的元数据在服务器重启后不会丢失;若未设置队列持久化,即使设置了消息持久化,也有可能造成消息体丢失。就相当于:人都没了、要钱干啥。
    1
    2
    3
    4
    5
    6
    7
    8
    /**
    * queueName: 为队列的名称
    * 第一个true:代表是否队列持久化。
    * 第二个true:排外性, 用于当连接关闭时,是否删除该队列;其二是其它连接创建的的通道是否可以访问该队列中的数据。
    * 第三个false:代表当队列中没有消息时,是否自动删除该队列。
    * 最后一个null: 代表其它的队列相关配置包括“队列的过期时间、队列最大消息长度等,之后会在后边小节中介绍到”
    */
    channel.queueDeclare("queueName", true, true, false, null);
    • 消息持久化:消息持久化可以保证在设置了队列持久化的基础上,如果服务器重启,则消息还会存在;若队列设置了持久化但消息未设置,则会造成“人活着钱没了”。
    1
    2
    3
    4
    5
    6
    7
    8
    /**
    * exchangeName: 交换器名称
    * routingKey: 路由键名称
    * MessageProperties.PERSISTENT_TEXT_PLAIN: 代表消息体的持久化。
    * MessageProperties.PERSISTENT_TEXT_PLAIN本质上是返回了一个BasicProperties实例,该实例将内容类型设置文本格式并且持久化deliverModel设置为2(持久化)返回。
    * msg.getBytes: 代表发送内容的字节码
    */
    channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

    注意: 要提高Rabbitmq的可靠性,需要将上述都设置持久化,但是设置持久化必然会造成性能的影响,导致处理速度变慢。在Rabbitmq的集群部署中,一般会将其中的一个实体设置为持久化、这样当消息在该实体中已同步到磁盘中时,才会给生产者发送响应成功的标--生产者确认

  2. ** 生产者确认:** 针对上述情况,我们考虑下,如果一个消息没有到达队列中,那么会存在什么情况?即使我们设置了各种持久化,但是消息根本没有发布成功,也会造成消息的丢失。为此,Rabbitmq提供了生产者确认机制,当消息体成功到达队列中并且对于设置了持久化的,消息应已同步到磁盘中时,需要告知生产者成功或失败。生产者确认机制主要通过publisher confirm和事务来实现生产者确认。

    • publisher confirm:有三种方式来实现confirm功能:

      • 单条同步确认:即没发布一条消息,等待其生产者确认。
      1
      2
      3
      4
      5
      6
      7
      //首先进行消息发布,然后调用channel.waitForConfirms方法等待生产者确认回馈并且做相应的处理
      channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
      if(channel.waitForConfirms()){
      System.out.println("发送消息成功!");
      }else{
      System.out.println("发送消息失败!");
      }
      • 多条同步确认:多条同步确认相比单条确认大大提高了的处理和相应速率,但是也存在着缺陷,如果这批数据有一笔发送失败,则需要将这批数据重新发送。若一个服务器及其不稳定,则不推荐该方式。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      //首先进行批量确认的选择;然后发送消息,最后对这批消息是否发布成功进行判断。
      channel.confirmSelect();
      for(int i=0; i< 100; i++){
      channel.basicPublish(RabbitDemo.simleExchangeName, RabbitDemo.simpleRoutingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, (msg+i).getBytes());
      }
      if(channel.waitForConfirms(10000)){
      System.out.println("发送消息成功!");
      }else{
      System.out.println("发送消息失败!");
      }
      • 异步确认:异步确认通过向信道中添加监听器实现,可以实现单条的消息的确认,也满足多条消息的确认。个人认为考虑性能和服务器稳定的情况,该方式是实现生产者确认的最好方式。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      //添加确认机制,如果消息到队列中,则调用改方法
      channel.addConfirmListener((deliveryTag, multiple) -> {
      System.out.println("消息序号为"+deliveryTag+"发布消息成功!");
      }, (deliveryTag, multiple) -> {
      System.out.println("消息序号为"+deliveryTag+"发布消息成功!");
      });
      //添加返回机制,如果消息到达交换器却到队列失败时,调用;需要设置manadatory为true,否则消息会丢失。
      channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
      System.out.println("交换器为"+exchange+",在发布消息路由键为"+routingKey+"消息体为"+body+"时出现问题,返回码是"+replyCode+"返回信息为"+replyText);
      });
    • 事务实现: Rabbitmq事务实现和Jdbc的事务实现非常相似,甚至比jdbc的事务使用更加简单。下边的代码例子只是对单条的消息事务的控制,如果需要多条一起进行事务控制的话,将”channel.txSelect”放到循环的外层,其余放在内在内部即可,这里就不具体写相关的代码了。

      1
      2
      3
      4
      5
      6
      7
      8
      //首先打开事务;其次发布消息,若未出现异常则提交事务否则回滚。
      try{
      channel.txSelect();
      channel.basicPublish(RabbitDemo.simleExchangeName, RabbitDemo.simpleRoutingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, (msg+i).getBytes());
      channel.txCommit();
      }catch (Exception e){
      channel.txRollback();
      }

      注意:生产者的确认保证了消息从生产者到队列这一阶段的可靠性,对于事务和同步确认机制,它会影响业务的处理速度,若不涉及到太过于重要的业务数据或者消息,则可以不需要设置生产者确认机制。

  3. ** 消费者确认: **消费者确认存在两种确认方式:自动确认和手动确认。

    • 自动确认:自动确认认为消费者只要接收到消息,就会将消息从队列中删除。这个会有个明显的缺点就是在消费者收到消息之后可能因为某些原因而造成没有及时处理消息,此时消费者崩溃,则该消息就会永久丢失,这在一些对可靠性性要求高的情况下,是不允许的。代码如下:
    1
    2
    3
    4
    5
    6
    /**
    * queueName:队列名称;
    * autoAck:是否自动确认;
    * consumer:消费者处理对象
    */
    channel.basicConsume("queueName", autoAck=true, consumer);
    • 手动确认:收到消息后需要消费端手动进行确认,手动确认有三种方式:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
       //手动确认需要将第二个参数设置为false。不采用自动确认,然后在consumer的类中进行手动确认。
    channel.basicConsume("queueName", false, new DefaultConsumer(channel){
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body,"UTF-8");
    System.out.println("message="+message);
    if(StringUtils.contains(message,":delete")){
    /**
    * 用于否定确认,一次只能确认一条消息。
    * envelope.getDeliveryTag():消息的唯一Id
    * false:是否将该条消息继续放回队列中
    */
    channel.basicReject(envelope.getDeliveryTag(), false);
    }else if(StringUtils.contains(message, ":reject")){
    /**
    * 用于否定确认,一次可以确认一批消息。
    * envelope.getDeliveryTag():消息的唯一Id
    * true: 是否批量确认,此时会确认比envelope.getDeliveryTag()小的所有消息
    * false:是否将该条消息继续放回队列中
    */
    channel.basicNack(envelope.getDeliveryTag(), true, true);
    }else{
    /**
    * 用于肯定确认,一次可以确认一条消息。
    * envelope.getDeliveryTag():消息的唯一Id
    * false:是否将该条消息继续放回队列中
    */
    channel.basicAck(envelope.getDeliveryTag(), false);
    }
    }
    });

    注意:消息手动确认除了单条确认也可以批量确认,可以通过设置multi为true来实现,批量确认会将比当前消息投递唯一码小的所有消息都确认;如果multi为false,只会确认当前消息,例如如果为5,6,7,8的4条消息,而设置了multi为false的只会确认为8的消息,这块请注意,它不是最小的哪一个消息。其二:手动确认会比自动确认的性能差点,但是也弥补了自动确认存在的缺陷,对于可靠性要求较高的建议采用手动确认。

  4. * *过期时间: **过期时间可以针对队列和消息设置,如果一个队列设置了过期时间,则这个队列中的消息存在的时间都会小于过期时间,当队列中的消息超过了过期时间,就会删除;针对消息设置,只是针对单条的消息。

    • 队列过期时间:

      1
      2
      3
      4
      //针对队列过期时间、优先级、私信队列都是通过一个HashMap对象封装的参数来设置
      Map<String, Object> params = new HashMap<>();
      params.put("x-message-ttl",1000);
      channel.queueDeclare(RabbitDemo.simleQueueName, true, true, false, params);
    • 消息过期时间:

      1
      2
      //通过实例化BasicProperties的实现类,采用的是建造者模式,设置deliverModel为2(持久化),超时时间为1分钟(时间单位为ms)。
      channel.basicPublish("exchangeName", "routingKey", new AMQP.BasicProperties().builder().deliveryMode(2).expiration("60000").build(), (msg).getBytes());

    注意:第一种方式是消息如果超过了超时时间,就会从该队列中删除,至于之后是加到死信队列还是不做操作,根据具体的场景来实现;第二种方式是在消息进行投递的时候才会判断是否超过过期时间,如果超过就从该队列删除,后续操作由具体的实现来做。为什么两种方式不一样,前者因为在队列中,消息的过期时间一样,最快过期的位于队列头部,这样只需要扫描头部就能发现过期的消息;而后者因为每个消息的过期时间不一样,扫描全部的队列会造成性能的浪费,所以在投递消息时判断最好。

  5. * *消息路由失败处理: **在消息的投递过程中,可能会发生找不到目的地的情况,此时如果不针对此种情况做处理,则有可能会造成消息丢失。之前我们已经学习了生产者确认,这小节我们学习另外的处理方案:

    • manadatory: manadatory主要针对的是消息在交换器发送到队列的时候没有找到任何一个满足的队列,此时会返回一个return命令给生产者,生产者可以通过给信道channel添加ReturnListener来处理。若未设置mandatory为true时,会直接将该消息删除。

      1
      2
      3
      4
      5
      6
      //给信道添加返回监听
      channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
      System.out.println("交换器为"+exchange+",在发布消息路由键为"+routingKey+"消息体为"+body+"时出现问题,返回码是"+replyCode+"返回信息为"+replyText);
      });
      //这里主要看第三个参数,用于设置mandatory为true还是false。
      channel.basicPublish("exchangeName", "routingKey", true, new AMQP.BasicProperties().builder().deliveryMode(2).expiration("60000").build(), (msg).getBytes());
    • immediate: immediate针对于当消息发布到队列中时,发现和路由键匹配的所有队列上不存在任何消费者,就会给生产者返回一个return命令。在Rabbitmq3.0之后已经删除了immediate参数,因为该参数会严重影响镜像队列的性能,所以这个就没有写代码了,想了解的可以自己去搜下,有很多这个例子的。

  1. * *备份交换器: ** 备份交换器是为解决上述的需要在生产者代码中添加ReturnListener的处理业务逻辑,造成生产者的代码比较复杂。而备份交换器只需要在声明交换器时设置一个备份的交换器,则设置了mandatory参数时出现了消息不能路由到队列时,就会将消息路由到备份的交换器。具体的代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    //设置备份交换器的名称
    Map<String, Object> exchangeParams = new HashMap<>();
    exchangeParams.put("alternate-exchange", "alternateExchange");
    //声明普通的交换器,将备份交换器的参数传入
    channel.exchangeDeclare("exchangeName", ExchangeTypes.DIRECT, true, false, exchangeParams);
    //声明备份的交换器
    channel.exchangeDeclare("alternateExchange", ExchangeTypes.FANOUT, true);
    //声明普通具有延迟时间的队列
    Map<String, Object> params = new HashMap<>();
    params.put("x-message-ttl",1000);
    channel.queueDeclare("queueName", true, true, false, params);
    //将普通的交换器和普通队列绑定起来
    channel.queueBind("queueName", "exchangeName", "routingKey");
    //声明备份队列
    channel.queueDeclare("alternateQueue", true, true, false, null);
    //将备份队列和备份交换器绑定起来
    channel.queueBind("alternateQueue", "alternateExchange", "");
    //发布消息
    channel.basicPublish("exchangeName", "queueName", true, new AMQP.BasicProperties().builder().deliveryMode(2).expiration("60000").build(), (msg).getBytes());

    注意:如果备份交换器和mandatory参数一起使用时,mandatory参数则会不起作用。其二是备份交换器不存在绑定的队列或者匹配的队列,对不会抛出异常,只是消息丢失而已。

  2. * *死信队列: ** 死信队列也被称之为死信交换器,即在声明队列的时候会在声明队列的参数中添加一个x-dead-letter-message参数,值为另一个交换器名称。当一条消息在队列中变为死信之后,就会将该消息发送到死信交换器。消息变为死信消息有以下几种情况:消息被消费者手动拒绝;消息设置了过期,并且超时了;队列达到最大长度,不能再向队列中插入消息。

    1
    2
    3
    4
    5
    6
    Map<String, Object> queueParams = new HashMap<>();
    //设置死信交换器的名称
    queueParams.put("x-dead-letter-message", "deadExchange");
    //设置队列的过期时间
    queueParams.put("x-message-ttl", 1000);
    channel.queueDeclare("queueName", true, true, false, queueParams);
  3. * *延迟队列: ** Rabbitmq本身是没有支持延迟队列的参数的,但是我们可以灵活的使用消息的过期时间和死信队列,读取消息的队列为死信交换器对应的队列,这样便可以实现延迟队列的功能。这个就不在这里写具体的代码实现了,到之后的整体的代码实现中会写一个完整的例子。

  4. * *优先级队列: ** Rabbitmq本身支持队列的优先级高低,对于优先级高的消息优先被消费。

    1
    2
    3
    4
    5
    6
    Map<String, Object> queueParams = new HashMap<>();
    queueParams.put("x-dead-letter-message", "deadExchange");
    queueParams.put("x-message-ttl", 1000);
    //设置队列的优先级为10
    queueParams.put("x-max-priority", 10);
    channel.queueDeclare("queueName", true, true, false, queueParams);
  5. * *RPC实现: ** 在实际的开发中我们经常会遇到生产者和消费者双向交互,即生产者需要发送消息给消费者,消费者接受消息回复生产者,此时我们就需要使用远程过程调用(RPC)。RPC提供了强大的远程调用能力并且不会丧失语言的简洁性,生产者用来发布消息,消费者收到消息之后回复生产者。实现方式是主要在生产者端添加一个回调队列(replyQueue),当消费者回复生产者时,会将消息发送到回调队列中,并且为了区别是对那一条消息进行的回复,生产者会向消息中会保存一个唯一属性值correlationId,这样当消费者回复消息的时候,生产者可以根据correlationId来区分这条消息是不是自己发送的。

    1
     
  6. * *完整代码: ** 完整代码涉及了上述所有的高阶特性,并且采用SpringBoot+Rabbitmq的方式实现。如下:

    • 服务端:
    1
     
    • 客户端:
    1
    2


相关命令

  1. 启动: ./sbin/rabbitmq-server
  2. 后台启动: ./sbin/rabbitmq-server -detachhed
  3. 开启管理界:./sbin/rabbitmq-plugins enable rabbitmq_management
  4. 查看状态:./sbin/rabbitmqctl status
  5. 关闭: ./sbin/rabbitmqctl stop
  6. 关闭远程节点:./sbin/rabbitmqctl -n nodeName stop
  7. 开启应用程序:./sbin/rabbitmqctl start_app
  8. 关闭应用程序:./sbin/rabbitmqctl stop_app
  9. 重置所有队列:./sbin/rabbitmqctl reset
  10. 查看声明的队列:./sbin/rabbitctl list_queues
  11. 查看声明的交换器: ./sbin/rabbitmqctl list_exchanges name type durable auto_delete
  12. 查看绑定: ./sbin/rabbitmqcyl list_bindings

适用场景

根据Rabbitmq的特性、我们可以使用Rabbitmq实现以下几种效果:

  • 异步处理:当我们流程处理扭转到参与人的时候,我们需要给参与人发送消息,发送消息的方式可以为短信、邮件。如果当发送完成再返回客户时,就可能会消耗很长时间。我们则可以将发送的信息保存到一个队列中,消费者处理队列中的信息发给具体的参与人。
  • 应用解耦:用最典型的淘宝购物系统来说,订单系统和库存系统的应用解耦,订单系统向Rabbitmq写入订单信息,库存系统从Rabbitmq读取信息,并且处理库存和发货。优点是订单系统与库存系统解耦,即使库存系统出现问题,只要后期修复,还是可以从Rabbitmq中读取到信息进行库存处理。
  • 流量削峰:当某一时间突然拥有大量请求的时候,可能会造成系统的雪崩。此时可以将请求写入到Rabbitmq消息队列中,如果超过队列的最大长度则不进行写入,而业务处理系统可以Rabbitmq读取请求信息,然后根据相应的请求处理业务逻辑。这样可以有效的避免系统在某一个雪崩。
  • 任务分发:就像上述异步处理所述,当需要发送的信息需要通过短信、邮件的方式处理。此时我们可以采用Rabbitmq的主题模式去将信息存储到Rabbitmq队列中,邮件消费者和短信消费者可以分别去做相应的处理。