**高并发、微服务 、性能调优实战案例100讲，所有案例均源于个人工作实战，均配合代码落地**

加我微信：itsoku，所有案例均提供在线答疑。



# 第29节 MQ专题-如何确保消息的可靠性

视频可能有点长，请大家认证看完，本文对于工作或面试，都是非常有用的一节。

<span style="font-weight:bold; color:red">目前整个课程59块钱，100个案例，含所有源码 & 文档 & 技术支持，可点击左下角小黄车了解</span>

## 消息的使用过程

![image-20240428175659655](img/image-20240428175659655.png)



## 整个过程中消息可靠性需要考虑3个问题

1. 如何确保生产者这边业务执行成功，消息一定会投递到MQ？
2. 如何确保消息到达MQ后，MQ这边不会丢失？
3. 如何确保消费者一定能消费到这条消息？



## 如何确保生产者这边业务执行成功，消息一定投递成功？

这块涉及到消息投递的整个过程，下面咱们来通过一个案例来了解消息投递的整个过程，以及在这个过程中，如何确保业务执行成功，消息一定会投递成功。



## 电商中有这样的一个场景

下单成功之后送积分的操作，我们使用mq来实现

下单成功之后，投递一条消息到mq，积分系统消费消息，给用户增加积分

下面会介绍4种方式，来看下这个业务中消息投递的一个过程，这些方案，面试的时候，如果你能够说出来，会让面试官对你刮目先看。



## 消息投递方式一：业务事务中投递消息

### 正常流程

- **step1**：开启本地事务
- **step2**：生成购物订单
- **step3**：投递消息到mq
- **step4**：提交本地事务

这种方式是将发送消息放在了事务提交之前。

### 异常情况

- step3发生异常：导致step4失败，商品下单失败，直接影响到商品下单业务
- step4发生异常，其他step成功：商品下单失败，消息投递成功，给用户增加了积分



## 消息投递方式二：业务事务提交后、后投递消息

下面我们换种方式，我们将发送消息放到事务之后进行。

### 正常流程

- **step1**：开启本地事务
- **step2**：生成购物订单
- **step3**：提交本地事务
- **step4**：投递消息到mq

### 异常情况

step4发生异常，其他step成功：导致商品下单成功，投递消息失败，用户未增加积分

上面两种是比较常见的做法，也是最容易出错的。



## 消息投递方式三：事务消息（二阶段投递）

### 需要再本地业务库添加一张本地消息表（t_msg_record）

- id
- body：消息体
- status：消息状态：0：待投递，1：投递成功，2：投递失败

### 正常流程

- **step1**：开启本地事务
- **step2**：生成购物订单
- **step3**：本地库中插入一条需要发送消息的记录t_msg_record，status为0（待投递）
- **step4**：提交本地事务
- **step5**：若事务提交成功，则投递消息到MQ，然后将t_msg_record中的status置为1（投递成功）；若本地事务提交失败，则将t_msg_record表中的消息记录删掉

### 说明

这种方式借助了数据库的事务，业务和消息记录作为了一个原子操作，业务成功之后，消息记录必定是存在的。

### 异常情况

若step4成功，step5失败了，会导致业务执行成功，而消息投递失败，此时我们需要有个job对待发送的消息进行补偿投递。

### 消息投递补偿job

这个job负责从本地t_msg_record表中查询出状态为0记录，重新投递。

对于投递失败的，采用衰减的方式进行重试，比如第1次失败了，则10秒后，继续重试，若还是失败，则再过20秒，再次重试，需要设置一个最大重试次数，最终还是投递失败，则需要告警+人工干预。

## 建议

若公司微服务比较多，都会用到MQ这块，那么可以将这块做成一个springboot的starter，使用起来更容易。



## 消息投递方式四：独立出来一个消息服务

增加一个**消息服务**及**消息库**，负责消息的落库、将消息发送投递到mq，注意这里新增的一个消息服务可以是一个SpringBoot应用，如果你是做架构的可以考虑这种方式，或者面试的时候也可以给面试官介绍下这种方案

### 业务库需要添加一张消息日志表（t_msg_log）

- id
- bus_id：业务id
- bus_type：业务类型

### 消息服务需要一张消息表（t_msg)

- id：主键，消息id
- msg_log_id：业务方t_msg_log表的id
- body：消息体
- msg_log_url：业务方t_msg_log记录回查的接口
- status：状态，0：待投递，1：投递成功，2：投递失败
- fail_msg：投递失败原因

### 消息投递的过程

- **step1**：开启本地事务
- **step2**：生成购物订单
- **step3**：本地库t_msg_log表写入一条记录：insert into t_msg_log (bus_id,bus_type） values ('订单id','CREATE_ORDER')
- **step4**：调用消息服务，需携带（t_msg_log.id，消息体，消息日志回查的url），消息服务接收到请求后，向t_msg表插入记录（status=0，待发送），并返回消息id：msg_id
- **step5**：提交本地事务
- **step6**：如果上面都成功，使用step4中的msg_id调用消息服务，消息服务则将消息投递到mq中，修改消息记录状态为投递成功(t_msg.status=1)；如果上面有失败的情况，则消息服务将消息删掉

### 可能存在的问题

若step6失败，消息服务t_msg表中的这条消息，将处于待发送状态，但是业务库订单已经生成了，以及t_msg_log表也是有记录的，对于这种情况，消息服务需新增一个job，对于t_msg表中记录为0的消息，拿到t_msg表中的msg_log_id去回查msg_log_url这个接口，去查一下业务库中的t_msg_log 表是否有记录，有记录说明业务是执行成功的，此时消息服务补发消息到MQ就可以了；对于回查不到的，有可能业务方本地事务还未提交，不能认定为业务方本地事务执行失败了，建议等到1天之后，再清理下这种消息。



## 如何确保消息到达MQ后，在MQ这边不会丢失？

有些MQ为了性能，收到消息后，会将消息放在内存中，并没有立即持久化到磁盘，此时MQ挂了，消息会丢失。

若要确保MQ收到消息后，消息不会丢失，则收到投递过来的消息后，立即持久化，这个操作基本上所有的MQ都是支持的，使用的时候配置一下就可以了。

为了防止MQ单节点故障，MQ还需要做主备，这样才可以最大限度的确保消息不会丢失。



## 消费者如何确保消息一定会被消费？

消费者这边可以采用下面的过程，可以确保消息一定会被消费。

- **step1**：从MQ中拉取消息

- **step2**：执行业务逻辑（需要做幂等）

- **step3**：通知MQ删除这条消息

由于step3这个步骤涉及到网络通信，网络通信存在不可靠的因素，可能会失败，导致消息没有被删掉，就会出现该消息再次消费的情况，所以step2需要做幂等处理，这种方式可以确保消息必然会被成功消费一次。



## 下节预告

下节带大家实现事务消息的代码，还没有关注的朋友，先关注下。



# 高并发 & 微服务 & 性能调优实战案例100讲

## 已更新 29 节课

<span style="font-weight:bold; color:red">目前整个课程59块钱，含所有源码 & 文档 & 技术支持，一杯咖啡的价格，还没下手的朋友，赶紧了，马上要涨价了</span>。

```java
1. 分片上传实战
2. 通用并发处理工具类实战
3. 实现一个好用接口性能压测工具类
4. 超卖问题的4种解决方案，也是防止并发修改数据出错的通用方案
5. Semaphore实现接口限流实战
6. 并行查询，优化接口响应速度实战
7. 接口性能优化之大事务优化
8. 通用的Excel动态导出功能实战
9. 手写线程池管理器，管理&监控所有线程池
10. 动态线程池
11. SpringBoot实现动态Job实战
12. 并行查询，性能优化利器，可能有坑
13. 幂等的4种解决方案，吃透幂等性问题
14. 接口通用返回值设计与实现
15. 接口太多，各种dto、vo不计其数，如何命名？
16. 一个业务太复杂了，方法太多，如何传参？
17. 接口报错，如何快速定位日志？
18. 线程数据共享必学的3个工具类：ThreadLocal、InheritableThreadLocal、TransmittableThreadLocal
19. 通过AOP统一打印请求链路日志，排错效率飞升
20. 大批量任务处理常见的方案（模拟余额宝发放收益）
21. 并发环境下，如何验证代码是否正常？
22. MySql和Redis数据一致性
23. SpringBoot数据脱敏优雅设计与实现
24. 一行代码搞定系统操作日志
25. Aop简化MyBatis分页功能
26. ThreadLocal 遇到线程池有大坑 & 通用解决方案
27. SpringBoot读写分离实战（一个注解搞定读写分离 && 强制路由主库）
28. MQ专题-MQ典型的使用场景
29. MQ专题-如何确保消息的可靠性
```



## 课程部分大纲，连载中。。。。

以下课程均来源于个人多年的实战，均提供原理讲解 && 源码落地

1. 分片上传实战
2. 通用并发处理工具类实战
3. 实现一个好用接口性能压测工具类
4. 超卖问题的4种解决方案，也是防止并发修改数据出错的通用方案
5. Semaphore实现接口限流实战
6. 并行查询，优化接口响应速度实战
7. 接口性能优化之大事务优化
8. 通用的Excel动态导出功能实战
9. 手写线程池管理器，管理&监控所有线程池
10. 动态线程池
11. SpringBoot实现动态Job实战
12. 并行查询，性能优化利器，可能有坑
13. 幂等的4种解决方案，吃透幂等性问题
14. 接口通用返回值设计与实现
15. 接口太多，各种dto、vo不计其数，如何命名？
16. 一个业务太复杂了，方法太多，如何传参？
17. 接口报错，如何快速定位日志？
18. 线程数据共享必学的3个工具类：ThreadLocal、InheritableThreadLocal、TransmittableThreadLocal
19. 通过AOP统一打印请求链路日志，排错效率飞升
20. 大批量任务处理常见的方案（模拟余额宝发放收益）
21. 并发环境下，如何验证代码是否正常？
22. MySql和Redis数据一致性
23. SpringBoot数据脱敏优雅设计与实现
24. 一行代码搞定系统操作日志
25. Aop简化MyBatis分页功能
26. ThreadLocal 遇到线程池有大坑 & 通用解决方案
27. SpringBoot读写分离实战（一个注解搞定读写分离 && 强制路由主库）
28. MQ专题：MQ典型的7种使用场景
29. MQ专题：如何确保消息的可靠性
30. MQ专题：事务消息落地
31. MQ专题：消息幂等消费通用方案
32. MQ专题：延迟消息通用方案实战
33. MQ专题：顺序消息通用方案实战
34. MQ专题：消息积压问题
35. 分布式事务：事务消息实现事务最终一致性
36. 分布式事务：通用的TCC分布式事务生产级代码落地实战
37. 分布式锁案例实战
38. 微服务中如何传递上下文？实战
39. 微服务链路日志追踪实战（原理&代码落地）
40. SpringBoot实现租户数据隔离
41. MyBatis进阶：封装MyBatis，实现通用的无SQL版CRUD功能，架构师必备
42. MyBatis进阶：自己实现通用分表功能，架构师必备
43. MyBatis进阶：实现多租户隔离ORM框架
44. SpringBoot中实现自动监听PO的变化，自动生成表结构
45. 分布式专题：其他实战课程等
46. 性能调优：如何排查死锁？
47. 性能调优：如何排查内存溢出？
48. 性能调优：CPU被打满，如何排查？
49. 性能调优：生产代码没生效，如何定位？
50. 性能调优：接口太慢，如何定位？
51. 性能调优：如何查看生产上接口的入参和返回值？
52. 性能调优：远程debug
53. 生产上出现了各种故障，如何定位？
54. db和缓存一致性，常见的方案
55. Redis场景案例。。。
56. 系统资金账户设计案例（一些系统涉及到资金操作）
57. 其他等各种实战案例。。。





