RabbitMq-死信队列介绍和用法
死信和死信队列的概念
什么是死信?简单来说就是无法被消费和处理的消息。一般生产者将消息投递到broker或者queue,消费者直接从中取出消息进行消费。但有时因为某些原因导致消息不能被消费,导致消息积压在队列中,这样的消息如果没有后续的处理就会变成死信,那么专门存放死信的队列就是死信队列。
什么是死信交换机?
那么什么是死信交换机呢?死信交换机是指专门将死信路由到死信队列的交换机。
产生死信的原因
- 消息超过TTL,即消息过期
- 消息被nack或reject,且不予重新入队
- 队列达到最大长度
如果这个包含死信的队列配置了dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。
如图,一个消息被消费者拒绝了,变成了死信:\

因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:

如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:

另外,队列将死信投递给死信交换机时,必须知道两个信息:
- 死信交换机名称
- 死信交换机与死信队列绑定的RoutingKey
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。

TTL
一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:
- 消息所在的队列设置了超时时间
- 消息本身设置了超时时间

死信队列实战和应用
死信队列的应用并不难,无非就是多定义了一个交换机、routingKey和队列罢了。在声明普通队列时传入Map参数,往Map中put死信队列名称、死信routingKey、消息TTL等参数即可完成死信自动投递到死信队列的流程。通过如下代码即可绑定普通队列和死信交换机了,而且还能设置routingKey和队列长度等参数,无需像传统的那样通过channel绑定。
1 2 3 4 5
| Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 10000); arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); routingKey arguments.put("x-dead-letter-routing-key", "lisi"); arguments.put("x-max-length", 10);
|
生产者Producer:
public class Producer { // 普通交换机名称 public static final String NORMAL_EXCHANGE = “normal_exchange”;
1 2 3 4 5 6 7 8 9 10 11 12 13
| public static void main(String[] args) throws IOException { Channel channel = RabbitMQUtils.getChannel(); AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build();
for (int i = 0;i < 10;i++) { String message = i + "info"; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes()); } }
|
普通队列消费者C1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public class Consumer01 { public static final String NORMAL_EXCHANGE = "normal_exchange"; public static final String DEAD_EXCHANGE = "dead_exchange"; public static final String NORMAL_QUEUE = "normal_queue"; public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 10000); arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key", "lisi"); arguments.put("x-max-length", 10);
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("consumer01等待接收消息");
DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); if (msg.equals("info5")) { System.out.println("consumer01接收的消息:" + new String(message.getBody())); System.out.println(msg + ":此消息是被拒绝的"); channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("consumer01接收的消息:" + new String(message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } };
CancelCallback cancelCallback = consumerTag -> { System.out.println("C1取消消息"); }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback); } }
|
死信队列消费者C2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Consumer02 { public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException { Channel channel = RabbitMQUtils.getChannel(); System.out.println("consumer02等待接收消息"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("consumer02接收的消息:" + new String(message.getBody())); };
CancelCallback cancelCallback = consumerTag -> { System.out.println("C2取消消息"); }; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback); } }
|
依次启动生产者,和两个消费者,并停掉普通队列的消费者,我们发现生产者发送的消息被死信队列消费者C2给接收了。
在上面的代码中,我在普通队列中设置了消息的TTL为5s,但是我又在生产者设置发送的消息TTL为10s,那么RabbitMQ会以哪个为准呢?其实RabbitMQ会以较短的TTL为准
我的智能BI项目中运用死信交换机
声明交换机、队列和routingKey的配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
| package com.miku.config;
import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.HashMap; import java.util.Map;
import static com.miku.constant.MessageConstant.*;
@Configuration public class RabbitMqConfig {
@Bean("BiExchange") public DirectExchange Biexchange(){ return new DirectExchange(BI_EXCHANGE, true, false); }
@Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } @Bean("BiChartQueue") public Queue BiQueue(){ Map<String,Object> map=new HashMap<>(3); map.put("x-message-ttl",20000); map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE); map.put("x-dead-letter-routing-key",DEAD_LETTER_ROUTING_KEY); return QueueBuilder.durable(BI_QUEUE).withArguments(map).build(); }
@Bean("BiToMdQueue") public Queue BiToMdQUeue(){ Map<String,Object> map=new HashMap<>(3); map.put("x-message-ttl",20000); map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE); map.put("x-dead-letter-routing-key",DEAD_LETTER_ROUTING_KEY); return QueueBuilder.durable(BI_TO_MD_QUEUE).withArguments(map).build(); }
@Bean("deadLetterQueue") public Queue deadLetterQueue(){ return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); }
@Bean public Binding binding(@Qualifier("BiChartQueue") Queue queue, @Qualifier("BiExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(BI_ROUTING_KEY); }
@Bean public Binding biToMdBinding(@Qualifier("BiToMdQueue") Queue queue, @Qualifier("BiExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(BI_TO_MD_ROUTING_KEY); }
@Bean public Binding deadQueueBingingDeadExchange(@Qualifier("deadLetterQueue") Queue deadLetterQueueu, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchage){ return BindingBuilder.bind(deadLetterQueueu).to(deadLetterExchage).with(DEAD_LETTER_ROUTING_KEY); }
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
|
普通消费者(负责异步生成图表信息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| @SneakyThrows @RabbitListener(queues = BI_QUEUE) public void receiveMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliverTag){ log.info("接收消息是:{}",message); if (StringUtils.isNotBlank(message)){ channel.basicNack(deliverTag,false,false); log.info("消息为空拒绝接收,此消息正在被转发到死信队列中"); } Long chartId=Long.parseLong(message); Chart chart = chartService.getById(chartId); if (chart==null){ channel.basicNack(deliverTag,false,false); log.info("chartId为:{}的chart不存在",chartId); throw new BusinessException(ErrorCode.PARAMS_ERROR,"chartId为:"+chartId+"的chart不存在"); } String goal = chart.getGoal(); String chartType = chart.getChartType(); String csvData = chart.getChartData();
StringBuilder userInput = new StringBuilder(); userInput.append("分析需求:").append("\n"); String userGoal = goal; if (StringUtils.isNotBlank(chartType)) { userGoal += ",请使用:" + chartType; } userInput.append(userGoal).append("\n"); userInput.append("原始数据:").append("\n"); userInput.append(csvData).append("\n"); boolean update = chartService.update(Wrappers.<Chart>lambdaUpdate() .eq(Chart::getId, chart.getId()) .set(Chart::getStatus, ChartStatusEnum.RUNNING.getValue())); if (!update) { handleChartErrorByMq(chartId, "更新图表状态为运行中失败"); return; } String result = sparkUtils.synchronousCalls(AiConstant.MATH_MAP, userInput.toString()); String[] split = result.split("【【【【【"); if (split.length < 3) { handleChartErrorByMq(chartId,"ai生成数据格式不正确"); return; } String genChart = split[1].trim(); String genResult = split[2].trim();
chart.setExecMessage("图表生成成功!!!"); chart.setStatus(ChartStatusEnum.SUCCEEd.getValue()); chart.setGenChart(genChart); chart.setGenResult(genResult); boolean updateById = chartService.updateById(chart); if (!updateById) { handleChartErrorByMq(chartId,"更新图表状态为成功中失败"); } Boolean credits = creditService.consumeCredits(chart.getUserId(), -1L); ThrowUtils.throwIf(!credits,ErrorCode.OPERATION_ERROR,"你的积分不足"); redisTemplate.opsForValue().set(CHAHE_KEY+chartId,JSONUtil.toJsonStr(chart)); channel.basicAck(deliverTag,false); }
|
死信队列消费者(负责处理死信)
收到死信后我是直接确认了,这种方式可能不好,你也可以换成其他方式比如重新入队,或者写入数据库并打上日志等等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component @Slf4j public class TtlQueueConsumer { @Resource BIMessageProducer biMessageProducer;
@SneakyThrows @RabbitListener(queues = "bi_dead_letter_queue", ackMode = "MANUAL") public void doTTLMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { log.info("已经接受到死信消息:{}", message); biMessageProducer.sendMessage(message); channel.basicAck(deliveryTag, false); } }
|