RabbitMq-死信队列介绍和用法

死信和死信队列的概念

什么是死信?简单来说就是无法被消费和处理的消息。一般生产者将消息投递到broker或者queue,消费者直接从中取出消息进行消费。但有时因为某些原因导致消息不能被消费,导致消息积压在队列中,这样的消息如果没有后续的处理就会变成死信,那么专门存放死信的队列就是死信队列


什么是死信交换机?

那么什么是死信交换机呢?死信交换机是指专门将死信路由到死信队列的交换机。

产生死信的原因

  • 消息超过TTL,即消息过期
  • 消息被nack或reject,且不予重新入队
  • 队列达到最大长度

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

如图,一个消息被消费者拒绝了,变成了死信:\

pkoI1gJ.png

因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:

pkoIGuR.png

如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:

pkoIJD1.png

另外,队列将死信投递给死信交换机时,必须知道两个信息:

  • 死信交换机名称
  • 死信交换机与死信队列绑定的RoutingKey

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。

pkoINE6.png

TTL

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间
  • 消息本身设置了超时时间

pkoIK4U.png


死信队列实战和应用

死信队列的应用并不难,无非就是多定义了一个交换机、routingKey和队列罢了。在声明普通队列时传入Map参数,往Map中put死信队列名称、死信routingKey、消息TTL等参数即可完成死信自动投递到死信队列的流程。通过如下代码即可绑定普通队列和死信交换机了,而且还能设置routingKey和队列长度等参数,无需像传统的那样通过channel绑定。

1
2
3
4
5
Map<String, Object> arguments = new HashMap<>(); // 过期时间
arguments.put("x-message-ttl", 10000); // 正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 设置死信
routingKey arguments.put("x-dead-letter-routing-key", "lisi"); // 设置正常队列的长度限制 、
arguments.put("x-max-length", 10);

生产者Producer:
public class Producer { // 普通交换机名称 public static final String NORMAL_EXCHANGE = “normal_exchange”;

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtils.getChannel();
//死信消息 设置TTL时间
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration("10000").build();

// 延迟消息
for (int i = 0;i < 10;i++) {
String message = i + "info";
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
}
}


普通队列消费者C1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Consumer01 {
// 普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机名称
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列名称
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列名称
public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtils.getChannel();
// 声明死信和普通交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明普通队列
Map<String, Object> arguments = new HashMap<>();
// 过期时间
arguments.put("x-message-ttl", 10000);
// 正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-letter-routing-key", "lisi");
// 设置正常队列的长度限制
arguments.put("x-max-length", 10);

// 声明普通队列
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("consumer01等待接收消息");

DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
if (msg.equals("info5")) {
System.out.println("consumer01接收的消息:" + new String(message.getBody()));
System.out.println(msg + ":此消息是被拒绝的");
channel.basicReject(message.getEnvelope().getDeliveryTag(), false); //拒绝此消息并不放回普通队列
} else {
System.out.println("consumer01接收的消息:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};

CancelCallback cancelCallback = consumerTag -> {
System.out.println("C1取消消息");
};
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
}
}

死信队列消费者C2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Consumer02 {
// 死信队列名称
public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtils.getChannel();
System.out.println("consumer02等待接收消息");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("consumer02接收的消息:" + new String(message.getBody()));
};

CancelCallback cancelCallback = consumerTag -> {
System.out.println("C2取消消息");
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
}
}

依次启动生产者,和两个消费者,并停掉普通队列的消费者,我们发现生产者发送的消息被死信队列消费者C2给接收了。

在上面的代码中,我在普通队列中设置了消息的TTL为5s,但是我又在生产者设置发送的消息TTL为10s,那么RabbitMQ会以哪个为准呢?其实RabbitMQ会以较短的TTL为准


我的智能BI项目中运用死信交换机

声明交换机、队列和routingKey的配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package com.miku.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

import static com.miku.constant.MessageConstant.*;

@Configuration
public class RabbitMqConfig {


//声明生成普通交换机
@Bean("BiExchange")
public DirectExchange Biexchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange(BI_EXCHANGE, true, false);
}


/**
* 声明死信的交换机
* @return
*/
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
//声明队列
@Bean("BiChartQueue")
public Queue BiQueue(){
//使用QueueBulider构建队列,durable就是持久化的
Map<String,Object> map=new HashMap<>(3);
map.put("x-message-ttl",20000);
map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
map.put("x-dead-letter-routing-key",DEAD_LETTER_ROUTING_KEY);
return QueueBuilder.durable(BI_QUEUE).withArguments(map).build();
}

/**
* 生成文本转md的队列
* @return
*/
@Bean("BiToMdQueue")
public Queue BiToMdQUeue(){
//使用QueueBulider构建队列,durable就是持久化的
Map<String,Object> map=new HashMap<>(3);
map.put("x-message-ttl",20000);
map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
map.put("x-dead-letter-routing-key",DEAD_LETTER_ROUTING_KEY);
return QueueBuilder.durable(BI_TO_MD_QUEUE).withArguments(map).build();
}

/**
* 声明死信的队列
* @return
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}

/**
* 绑定生成图表交换机和队列
* @return
*/
@Bean
public Binding binding(@Qualifier("BiChartQueue") Queue queue,
@Qualifier("BiExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(BI_ROUTING_KEY);
}

/**
* 绑定文本转md的交换机和队列
* @return
*/
@Bean
public Binding biToMdBinding(@Qualifier("BiToMdQueue") Queue queue,
@Qualifier("BiExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(BI_TO_MD_ROUTING_KEY);
}


/**
* 将死信队列和死信交换机绑定
* @param deadLetterQueueu
* @param deadLetterExchage
* @return
*/
@Bean
public Binding deadQueueBingingDeadExchange(@Qualifier("deadLetterQueue") Queue deadLetterQueueu,
@Qualifier("deadLetterExchange") DirectExchange deadLetterExchage){
return BindingBuilder.bind(deadLetterQueueu).to(deadLetterExchage).with(DEAD_LETTER_ROUTING_KEY);
}


@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}


普通消费者(负责异步生成图表信息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@SneakyThrows
@RabbitListener(queues = BI_QUEUE)
public void receiveMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliverTag){
log.info("接收消息是:{}",message);
if (StringUtils.isNotBlank(message)){
//如果失败,消息拒绝
channel.basicNack(deliverTag,false,false);
log.info("消息为空拒绝接收,此消息正在被转发到死信队列中");
}
Long chartId=Long.parseLong(message);
//查询chart表
Chart chart = chartService.getById(chartId);
if (chart==null){
channel.basicNack(deliverTag,false,false);
log.info("chartId为:{}的chart不存在",chartId);
throw new BusinessException(ErrorCode.PARAMS_ERROR,"chartId为:"+chartId+"的chart不存在");
}
String goal = chart.getGoal();
String chartType = chart.getChartType();
String csvData = chart.getChartData();

StringBuilder userInput = new StringBuilder();
userInput.append("分析需求:").append("\n");
//拼接分析目标
String userGoal = goal;
if (StringUtils.isNotBlank(chartType)) {
userGoal += ",请使用:" + chartType;
}
userInput.append(userGoal).append("\n");
userInput.append("原始数据:").append("\n");
userInput.append(csvData).append("\n");
//将Chart表的状态改为执行中
boolean update = chartService.update(Wrappers.<Chart>lambdaUpdate()
.eq(Chart::getId, chart.getId())
.set(Chart::getStatus, ChartStatusEnum.RUNNING.getValue()));
if (!update) {
handleChartErrorByMq(chartId, "更新图表状态为运行中失败");
return;
}
String result = sparkUtils.synchronousCalls(AiConstant.MATH_MAP, userInput.toString());
//对返回结果做拆分,按照5个中括号进行拆分
String[] split = result.split("【【【【【");
//拆分完还需要校验
if (split.length < 3) {
handleChartErrorByMq(chartId,"ai生成数据格式不正确");
return;
}
String genChart = split[1].trim();
String genResult = split[2].trim();

//TODO: 使用redis缓存图表数据,提高查询效率
chart.setExecMessage("图表生成成功!!!");
chart.setStatus(ChartStatusEnum.SUCCEEd.getValue());
chart.setGenChart(genChart);
chart.setGenResult(genResult);
//执行成功
//修改数据库状态为执行成功
boolean updateById = chartService.updateById(chart);
if (!updateById) {
handleChartErrorByMq(chartId,"更新图表状态为成功中失败");
}
//消费积分-每次使用消费1积分
Boolean credits = creditService.consumeCredits(chart.getUserId(), -1L);
ThrowUtils.throwIf(!credits,ErrorCode.OPERATION_ERROR,"你的积分不足");
//存入缓存
redisTemplate.opsForValue().set(CHAHE_KEY+chartId,JSONUtil.toJsonStr(chart));
//如果任务执行成功,手动执行ack
channel.basicAck(deliverTag,false);
}

死信队列消费者(负责处理死信)

收到死信后我是直接确认了,这种方式可能不好,你也可以换成其他方式比如重新入队,或者写入数据库并打上日志等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
@Slf4j
public class TtlQueueConsumer {
@Resource
BIMessageProducer biMessageProducer;

@SneakyThrows
@RabbitListener(queues = "bi_dead_letter_queue", ackMode = "MANUAL")
public void doTTLMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
log.info("已经接受到死信消息:{}", message);
biMessageProducer.sendMessage(message);
channel.basicAck(deliveryTag, false);
}
}

文章结束!