RabbitMQ

RabbitMQ

同步调用的优势

  • 时效性,等待到结果返回后

同步调用的问题

  • 扩展性差
  • 性能下降
  • 级联失败问题

异步调用

  • 就是基于消息通知的方式 一般包含三个角色
  • 消息发送着:投递消息的人,就是原来的调用方
  • 消息代理:管理,缓存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接受和处理消息的人,就是原来的服务提供方

异步调用的优势

  • 解除耦合,扩展性强
  • 无需等待,性能好
  • 故障隔离
  • 缓存消息,流量削峰填谷

异步调用的问题是什么

  • 不能立即得到调用结果,时效性差
  • 不确定下游业务执行是否成功
  • 业务安全依赖于broker的可靠性

在docker中安装镜像

  • 执行
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
  • 访问15672端口 账号密码:guest

认识RabbitMQ

image-20231226202616352

  • 交换机是用来路由转发消息,不能保存消息

快速使用

需求

  • 新建队列hello.queque1和hello.queue2
  • 向默认的amp.fanout交换机发送一条消息
  • 查看消息是否到hello.queque1和hello.queque2
  1. 创建消息队列image-20231226203358984

  2. 选择交换机image-20231226203429342

  3. 将交换机和消息队列绑定image-20231226203959966

  4. 发送消息image-20231226204053537

  5. 查看消息image-20231226204406763

添加虚拟主机

需求

  • 在RabbitMQ的控制台完成下列操作:
  • 新建一个用户ya
  • 为ya用户创建一个virtual host
  • 测试不同virtual host之间的数据隔离现象

image-20231226205325183

SpringAMQPA

image-20231226210403311

使用

  1. 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在客户端创建一个新的队列
  2. 配置文件
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /yaya #虚拟主机
username: ya
password: 123 #这里密码要和虚拟主机的对应
  1. springAMQP 提供了RabbitTemplate 工具类
  2. 测试代码
package com.ya.publisher;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
* @program: untitled
* @description:
* @author: ChestnutDuck
* @create: 2023-12-26 21:28
**/
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
void testSendMessageQueue(){
String queueName="demo.queue";
String msg="hello ,我是publisher";
this.rabbitTemplate.convertAndSend(queueName,msg);
}
}

  1. 结果image-20231226213311532

  2. 监听消息队列

package com.ya.consumer.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @program: untitled
* @description:
* @author: ChestnutDuck
* @create: 2023-12-27 14:38
**/
@Slf4j
@Component
public class MqListener {
@RabbitListener(queues = "demo.queue")
public void ListenSimpleQueue(String msg){
System.out.println("收到了发送者的消息:"+msg);
}
}

  1. 发送消息,结果如下image-20231227145007665

模拟workQueue

  • image-20231227145202439

  • 消费者

@RabbitListener(queues = "work.queue")
public void ListenWorkQueue(String msg){
System.err.println("消费者1收到了发送者的消息:"+msg); //err表示错误 打印出来的是红色的字
}
@RabbitListener(queues = "work.queue")
public void ListenWorkQueue2(String msg){
System.out.println("消费者2收到了发送者的消息:"+msg);

}
  • 发送者
@Test
void testWorkQueue() throws InterruptedException {
String queueName="work.queue";
for (int i=0;i<50;i++){
String msg="hello ,我是publisher"+i;
this.rabbitTemplate.convertAndSend(queueName,msg);
Thread.sleep(20);
}
}
  • 测试结果image-20231227150006304

  • 如果有多个消费者,他会平均分配消息队列 工作方式像轮询

  • 现在假如两个消费者处理速度的能力不同分别在他们执行完打印语句后睡20ms和100ms

  • 发现只是处理速度变了,但还是一人执行一半image-20231227150743930

  • 这对于微服务来说并不合适,应该按照谁的处理能力强,谁多处理一些,我们可以通过设置application.yml 的preFetch的值为1,确保同一时刻最多投递给消费者1条消息

spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /yaya #虚拟主机
username: ya
password: 123
listener:
simple:
prefetch: 1 #表示处理完一条消息才能继续接受

  • 很明显的可以看到处理快的处理多image-20231227151449282

work模型

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

Fanout交换机

  • 真正的生产环境都会经过exchange来发送消息,而不是直接通过发送到队列
  • Fanout Exchange 会将接受到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
  • image-20231227153614081
@Test
void testSendFanout(){
String exchange="ya.fanout"; //自己创建的fanout
String msg="hello ,everyone";
this.rabbitTemplate.convertAndSend(exchange,null,msg);
}
  • 绑定的每个消费者都会收到广播image-20231227154340798

  • fanoutExchange会将消息路由到每个绑定的队列

Direct交换机

  • Direct Exchange 会将接受到的消息根据规则路由到指定的Queue。因此称为定向路由

  • 每一个Queue都与交换机设置一个bindingKey

  • 发布者发送消息时。指定消息的RoutingKey

  • 交换机将消息路由到BindKey与消息RoutingKey一致的队列

  • image-20231227155132715

  • image-20231227155600972

@Test
void testSendDirect(){
String exchange="ya.direct";
String msg="红色警报,";
this.rabbitTemplate.convertAndSend(exchange,"yellow",msg); //会根据你提供的不同的RoutingKey 去决定发送给内个消费者
}

Topic交换机

  • TopicExchange 与 DirectExchange类似,区别在与routingKey可以是多个单词的列表,并且以.分割。

  • Queue与Exchange指定BindingKey时可以使用通配符

  • #: 代指0个或多个单词

  • *: 代指一个单词

  • image-20231227160701291

@Test
void testSendTopic(){
String exchange="ya.topic";
String msg="红色警报,";
this.rabbitTemplate.convertAndSend(exchange,"china.news",msg); //这个只会绑定你设置好的通配符
}

声明队列和交换机

  • SpringAMQP提供了几个类,用来声明队列、交换机及绑定关系
  • Queue 用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Bindnig 用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

基于bean声明

package com.ya.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @program: untitled
* @description:
* @author: ChestnutDuck
* @create: 2023-12-27 16:19
**/
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange(){
FanoutExchange build = (FanoutExchange) ExchangeBuilder.fanoutExchange("ya.fanout").build();
// return new FanoutExchange("ya.fanout")
return build;
}
@Bean
public Queue FanoutQueue(){
// QueueBuilder.durable("").build();
return new Queue("fanout.queue");
}
@Bean
public Binding fanoutBinding(){
return BindingBuilder.bind(FanoutQueue()).to(fanoutExchange());
//不是调用的方法,是在容器中找bean
}

}

基于注解声明

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name="direct.queue1",durable = "true"),//是否可持续化
exchange = @Exchange(name = "ya.direcct",type= ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void ListenDirectQueue(String msg) throws InterruptedException {
System.err.println("消费者1收到了发送者的消息:"+msg);

}

消息转化器

  • image-20231227164439701

  • 对象会被序列化image-20231227165246426

  • image-20231227165332889

  • 所以用json序列化

  • 引入依赖

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
  • 注入json序列化工具
    @Bean
public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}
//注意消费端和发送端都要注入这个bean
  • 效果image-20231227170415147

MQ高级

消息可靠性

发送者可靠性问题

生产者重连

  • 有的时候由于网络波动,可能会出现客户端连接Q失败的情况。通过配置我们可以开启连接失败后的重连机制(注意是在发送者这边配置)
spring:
rabbitmq:
connection-timeout: 1s #设置Mq的连接超时问题
template:
retry:
enabled: true #开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长=initial-interval *mutiplier
max-attempts: 3 # 最大重视次数

  • 测试结果
2024-01-16 17:21:50.885  INFO 25860 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2024-01-16 17:21:53.925 INFO 25860 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2024-01-16 17:21:56.955 INFO 25860 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]

  • 从测试结果中我们可以看到重试了三次
  • 重试不是消息发送的重试,而是连接重试
  • 注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
    如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认

  • 侧重消息发送时失败改怎么办
  • 开启确认机制后。在MQ成功收到消息后会返回确认消息给生产者 返回结果有以下几种情况
    • 消息投递到了MQ ,但是路由失败。此时会通过PublisherReturndan返回路由异常原因,然后返回ACK,告知投递成功 (一般不会产生)
    • 临时消息投递到MQ,并且入队成功,返回ACK,告知投递成功
    • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
    • 其他情况都会返回NACK,告知投递失败

image-20240116174123496

实现生产者确认
  1. 在生产者的微服务的application.yml中添加配置
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
# publisher-confirm-type有三种模式可选:
# none :关闭confirm机制
#simple :同步阻塞等待MQ的回执消息
#correlated:MQ异步回调方式返回回执消息
publisher-returns: true #开启publisher return机制 专门用来返回路由消息
  1. 每个RabbitTemplate 只能配置一个ReturnCallback 因此需要在项目启动过程中配置
@Slf4j
@Configuration
public class ConfirmConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获RabbitTemplate
RabbitTemplate rabbitTemplate =applicationContext.getBean(RabbitTemplate.class);//设置Returncallback
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.debug("收到的消息的return callback,exchange:{},key:{},msg:{},code:{},text:{}",
returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getMessage(),
returnedMessage.getReplyCode(),returnedMessage.getReplyText());
}
});
}
}
  1. 发送消息,指定消息ID、消息ConfirmCallback
@Test
void testConfirmCallBack(){
//1. 创建cd
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString()); //获取一个随机的id
//2. 添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>(){

@Override
public void onSuccess(CorrelationData.Confirm result) {
if(result.isAck()){ // result.isAck() , boolean类型, true代表ack回执,false 代表nack回执
log.debug("发送消息成功,收到ack!");
}else {// result.getReason(), string类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack,reason : {}",result.getReason()); //之后应该是重发消息
}
}

@Override
public void onFailure(Throwable ex) {
log.error("消息回调失败",ex);
}
});
this.rabbitTemplate.convertAndSend("ya.direct","yellow","hello",cd);
Thread.sleep(2000); //因为回调是需要时间 ,但是测试在单元测试中 单元测试结束后就销毁
}

image-20240116182642863

  • 将路由故意写错会返回如下消息
2024-01-16 18:33:35.220 DEBUG 23152 --- [ 127.0.0.1:5672] com.ya.publisher.SpringAmqpTest          : 发送消息成功,收到ack!
2024-01-16 18:33:35.220 DEBUG 23152 --- [nectionFactory1] com.ya.publisher.config.ConfirmConfig : 收到的消息的return callback,exchange:ya.direct,key:yellow2,msg:(Body:'"hello"' MessageProperties [headers={spring_returned_message_correlation=6458d228-a7b6-41e8-a869-135af26cf093, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),code:312,text:NO_ROUTE
  • 将交换机写错后 返回消息如下
2024-01-16 18:34:58.710 ERROR 21632 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'ya.dir' in vhost '/yaya', class-id=60, method-id=40)
2024-01-16 18:34:58.710 ERROR 21632 --- [nectionFactory1] com.ya.publisher.SpringAmqpTest : 发送消息失败,收到 nack,reason : channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'ya.dir' in vhost '/yaya', class-id=60, method-id=40)

  • 如何确认消息发送可靠 首先可以在mq中配置生产者的重连机制,避免网络波动导致消息发送的失败,如果是其他原因导致的消息发送失败,Mq支持生产者确认机制,当我们开启消息确认机制,消息到Mq后,Mq会返回一个ACK回执,如果发送失败,Mq会返回一个nack,这样我们可以经过回执的情况做相应的操作,然后我们可以基本确认消息可靠性,但是以上手段会增加系统的负担和开销,因此在大多情况下不建议开启消息确认机制,除非对消息可靠性有较高要求

MQ可靠性

  • 在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题
    1. 一旦MQ宕机,内存中的消息会丢失
    2. 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

数据持久化

  • RabbitMq实现数据持久化包括3个方面

    • 交换机持久化
    • 队列持久化
  • 上面这两个spring会自动持久化

    • 消息持久化 重启之后消息仍然在,消息保存在磁盘,不会进行mq操作内存,引起Mq阻塞
    • 向Mq发送一百万条消息并将消息设置为非持久化
@Test
void testPageOut(){
Message build = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)) //手动设置消息的字符集
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build(); //设置消息为非持久化消息
for (int i=0;i<1000000;i++){
this.rabbitTemplate.convertAndSend("fanout.queue",build);
}

}

image-20240118184814999

  • In memory可以看到消息在内存中存放

  • 内个黄线表示接受消息的速率,当内存满的时候 PagedOut会往磁盘里放,接受的速率会变成0,产生mq阻塞

  • 将消息改为持久化

image-20240118190224124

  • 内存满的时候,曲线虽然会下降,但是不会到0,不会出现阻塞,性能相较之前有提高,但是不是特别好

Lazy Queue

  • 从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。惰性队列的特征如下:

    1. 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)

    2. 消费者要消费消息时才会从磁盘中读取并加载到内存

    3. 支持数百万条的消息存储

      在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

  • 响应速度会相应的慢一点

  • 要设置一个队列为惰性队列,只需要在声明队列时。指定x-queue-mode模式为lazy即可

  • 基于bean声明

@Bean
public Queue FanoutQueue(){
QueueBuilder.durable("")
.lazy() //开启Lazy模式
.build();

return new Queue("fanout.queue");
}
  • 基于注解声明
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void ListenLazyQueue(String msg) {
System.err.println("消费者1收到了发送者的消息:"+msg);
}
  • image-20240118194140429

  • 过来一条消息往磁盘中存一条,不会有阻塞,且性能变高

RabbitMQ如何保证消息的可靠性

  • 首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样
    队列中的消息会持久化到磁盘,MQ重启消息依然存在。
  • RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后会称为
    队列的默认模式。LazyQueue会将所有消息都持久化。
  • 开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才
    会给生产者返回ACK回执

消费者的可靠性

消费者确认机制

  • 为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
    1. ack:成功处理消息,RabbitMQ从队列中删除该消息
    2. nack:消息处理失败,RabbitMQ需要再次投递消息
    3. reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息 (消息本身有问题)

image-20240118204937030

  • spring 动态代理在接受到消息时 会调用你的消息处理逻辑 ,如果是成功的,会自动返回ack,如果抛出异常 spring根据异常类型返回相应状态码

  • SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式

    • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全不建议使用
    • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
    • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.当业务出现异常时,根据异常判断返回不同结果:
      • 如果是业务异常,会自动返回nack
      • 如果是消息处理或校验异常,自动返回reject
  • 在消费者微服务yml中配置

spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none #none,关闭ack;manual,手动ack;auto 自动ack


消息失败处理

  • 当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无线循环。导致mq的消息处理飙升,带来不必要的压力我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000ms #初始的失败等待时长为1秒
multiplier: 1 #下次失败的等待时长倍数,下次等待时间=multiplier * last-interval
max-attempts: 3 #最大重试次数
stateless: true #true 无状态;false有状态 如果业务中包含事务,这里改为false
  • 上面这种情况 如果多次重试失败,超过最大重试次数,消息会丢失
  • 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
    • RejestAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
    • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
    • ReoublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机,再进行相应处理,或者给开发人员发短信

image-20240118225218632

ReoublishMessageRecoverer

  1. 首先,定义接受失败消息的交换机、队列及其绑定关系
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",
name = "enabled",havingValue = "true") //只有这个spring.rabbitmq.listener.simple.retry.enabled=true 时,这个bean才会生效
public class ErrorConfiguration {
@Bean
public DirectExchange errorExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(Queue errorQueue,DirectExchange errorExchange){
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
}
  1. 然后定义RepublishMessageRecoverer
@Bean
public MessageRecoverer ReoublishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","erros"); //后面的时key
}
  1. 发生异常时,控制台打印信息(向交换机error.direct 发送消息 key为error)
ConsumerApplication in 1.027 seconds (JVM running for 1.696)
消费者2收到了发送者的消息:hello
消费者2收到了发送者的消息:hello
消费者2收到了发送者的消息:hello
2024-01-18 23:10:19.477 WARN 30344 --- [ntContainer#0-1] o.s.a.r.retry.RepublishMessageRecoverer : Republishing failed message to exchange 'error.direct' with routing key error
  1. mq中控台信息为,方便后续开发人员处理

image-20240118231449883

  • 消费者如何保证消息一定被消费?
    • 开启消费者确认机制为auto,由spring确认消息处理成功后返
      回ack,异常时返回nack
    • 开启消费者失败重试机制,并设置MessageRecoverer,多次重
      试失败后将消息投递到异常交换机,交由人工处理

业务幂等性

  • 在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
  • image-20240119183845036

唯一消息id

  • 方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:
    1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
    2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
    3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
@Bean
public MessageConverter messageConverter(){
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();//2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true) ;
return jjmc;
}
  • 源码实现image-20240119191524675

  • 只需设置为true 会自动产生id

  • image-20240119192042845

  • 下面这种是生产者开启后消息会带一个id

业务判断

  • 结合业务逻辑,基于业务本身做判断。以我们的业务为列:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其他状态不做处理

  • mybatisplus(技巧与此处无关)image-20240119193457988

  • 如何保证支付服务与交易服务之间的订单状态一致性?

    1. 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
    2. 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
    3. 最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。
  • 如果交易服务消息处理失败,有没有什么兜底方案?

    • 我们可以在交易服务设置定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

  • 生产者发送消息时指定一个时间。消费者不会立刻收到消息,而是在指定时间之后才收到消息

  • 延时任务:设置在一定时间之后才执行的任务

  • image-20240119195329596

  • 在一些秒杀 订票的场景下,如果你占着订单一直不支付、会影响其他顾客的利益或商家的利益、可以下单之后发送一个延时消息、在收到延时消息后删掉对应未支付的订单或修改订单为已过期

死信交换机

  • 当一个队列中的消息满足下列情况之一时,就会成为死信( dead letter)

    • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的regueue参数设置为false·
    • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
    • 要投递的队列消息堆积满了,最早的消息可能成为死信
  • 如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机( Dead Letter Exchange,简称DLX)。image-20240119200131745

  • 在代码中加入时效

    @Test
void testSend(){
// Message build = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
// .setExpiration("10000")
// .build();//这里不用消息转换器是因为上面的错误消息会发到之前的错误交换机中,这里silmle.queue没有绑定消费者mq会当成错误信息
this.rabbitTemplate.convertAndSend("simple.direct", "hi", "hello", new MessagePostProcessor() {//这里是后置消息处理器
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("3000");
return message;
}
});
log.info("消息发送成功");
}
}

image-20240119204433271

image-20240119204451321

  • 这里延时了3秒

延迟消息插件

  • RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

  • 下载地址(https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases)

  • 将下载好的插件复制到rabbit的插件下 image-20240119211532487

  • 进入容器内部 docker exec -it rabbit /bin/bash

  • 执行命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  • 重启容器

  • 在代码中声明延迟交换机时加入delayed 属性

  • 基于注解声明

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name="delay.queue",durable = "true"),//是否可持续化
exchange = @Exchange(name = "ya.direct",type= ExchangeTypes.DIRECT,delayed="true" ),
key = "delay"
))
  • 基于bean声明
   @Bean
public FanoutExchange fanoutExchange(){
FanoutExchange build = (FanoutExchange) ExchangeBuilder
.fanoutExchange("delay.fanout")
.delayed() //设置delayed属性为true
.durable(true) //持久化
.build();
// return new FanoutExchange("ya.fanout")
return build;
}
  • 发送消息时需要通过头x-delay来设置过期时间
this.rabbitTemplate.convertAndSend("simple.direct", "hi", "hello", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置延迟消息
message.getMessageProperties().setDelay(5000);
return message;
}
});
  • 如果延时消息过长可能会造成定时任务过多,会给服务器cpu带来压力,Mq的延时消息适用于短时

取消超时订单

  • 设置30分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
    • 如果并发较高,30分钟可能堆积消息过多,对MQ压力很大
    • 大多数订单在下单后1分钟内就会支付,但是却需要在MQ内等待30分钟,浪费资源

image-20240119215427663

  • 优化:可以把很长的延时消息分成几个,大部人在购物时可能会直接支付,挂着不支付这样占少数