- https://javabetter.cn/sidebar/sanfene/rocketmq.html
- https://www.cnblogs.com/crazymakercircle/p/17895128.html
- https://rocketmq.apache.org/zh/docs/featureBehavior/01normalmessage
- https://blog.csdn.net/qq_24950043/article/details/139741449
1、基础
1.为什么要使用消息队列?
消息队列是一种很重要的中间件技术,广泛存在分布式系统中, 以提高系统的可用性,解耦能力和异步通讯效率。
1.1 解耦
生产者将消息放入队列,消费中,从队列里消费消息,这样实现了生产者和消费者就实现了解耦,不直接交互;
1.2 异步
系统可以将那些耗时的任务放在消息队列中异步处理,从而快手响应用户的请求;
1.3 削峰
削峰填谷是一种应对系统高并发请求的瞬时流量高峰,通过消息队列,可以将瞬时高峰流量转为持续的低流量,从而保护系统的高可用性。
2、为什么选择rocketMQ?
3、rocketMQ的优缺点
- 优点:
- 单机吞吐量: 十万级
- 可用性: 非常高,分布式架构
- 消息可靠性: 经过参数优化配置,消息可以做到0丢失
- 功能支持: MQ功能完善,分布式,扩展性好
- RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ
- 缺点:
- 配置相对比较复杂,学习成本比较高
- 没有在 MQ 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码
4、消息队列的模型
4.1 队列模型
生产者向某个队列发送消息,一个队列存在多个生产者的消息,一个队列也可以有多个消费者; 但是消费者之间是竞争关系,每条消息只能被一个消费者消费
4.2 发布/订阅模型
在这个模型中,消息的发送方是Publisher,消息的订阅方是Subscribe, 服务端存放消息的地方是Topic。
生产者发送消息给Topic,订阅了这个Topic的消费者全部都会收到这个消息;
这个消息,就可以被多个消费者消费了~
5、RocketMq的消息模型
RocketMq使用的是标准的发布订阅模型! RocketMq本身由一下几部分组成
- Message: 传输的消息。 一条Message必须包含Topic,可选的Tag和额外的键值对,用于在Broker查找此消息
- Topic(主题): 可以认为是消息的归类,是一级类型;Topic和生产者、消费者关系松散,一个Topic可以有多个消费者,一个生产者也可以同时向不同的Topic发送消息
- Tag(标签):可以看做是子主题,二级类型; 同一个业务模块不同目的地可以用相同的Topic,不同的tag来标识;
- Group:每个消费组中都有消费主题中一份完整的消息,不同消费组不受到影响;同一个消费组内,消费者之间存在竞争关系,只能被消费组的一个消费者消费
- Message queue(消息队列):一个Topic下可以有多个消息队列,Topic包括多个Message Queue, 如果一个Consumer需要获取Topic下的所有消息,就需要遍历所有的Message Queue
- Offset: 在Topic的消费过程中,由于消息需要被不同的消费者多次消费,所以消费完的消息不会立即删除; 这需要RocketMQ在每个消费组上维护一个消费位置(consumer Offset),每
消费一次,下标就+1;
6、消息的消费模式
- 集群模式:默认模式,一个消费者组共同消费一个主题的多个队列,一个队列只会被一个消费者消费
- 广播模式:会给消费者组的每一个发送进行消费
7、Rocket的基本架构
8、基本架构的四个组成部分
8.1 NameServer
NameServer是一个无状态的服务器
- 功能:
- NameServer和Broker节点保持长连接
- 维护Topic的路由信息;服务发现功能, Broker 的心跳信息,确保其可用性
- 特点:
- 每个NameServer节点是相互独立的,彼此之前没有信息交互
- Nameserver 被设计成几乎是无状态的,通过部署多个结点来标识自己是一个伪集群,Producer 在发送消息前从 NameServer 中获取 Topic 的路由信息也就是发往哪个 Broker,Consumer 也会定时从 NameServer 获取 Topic 的路由信息,Broker 在启动时会向 NameServer 注册,并定时进行心跳连接,且定时同步维护的 Topic 到 NameServer
8.2 Broker
消息存储和中转角色,负责存储和转发消息
8.3 Producer
消息生产者,用户端负责发送消息,由用户自行实现和分布式部署
Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
RocketMQ 提供了三种方式发送消息:同步、异步和单向
同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集
8.4 Consumer
消息消费者,负责消费消息, 一般是后台系统负责异步消费
- 消费模式:集群消费和广播消息
- 消息的获取模式: pull和push
- 拉取型:主动从服务器拉取消息,只要批量拉取到消息,用户应用就会启动消费过程
- 推送型:封装了消息的拉取,消费精度和其他内部维护工作,将消息到达时执行的回调留给用户程序来执行
进阶
9、如何保证消息的可用性、可靠性、不丢失呢?
消息可能会在三个阶段发生丢失: 生产阶段、存储阶段、消费阶段
9.1 生产阶段
在生产阶段,主要通过请求确认机制,来保证消息的可靠传递
- 同步发送的时候,要注意处理响应结果和异常。 如果返回响应都OK,表示消息成功发送到broker; 如果响应失败或者发生其他异常,都应该重试。
- 异步发送的时候,应该在回调方法检查。如果发生失败或者异常,都应该进行重试。
- 如果发生超时异常,也可以通过查询日志的API,来检查是否在Broker存储成功。
9.2 存储阶段
在存储阶段,通过配置可靠性优先的Broker参数,来避免因宕机丢消息,简单的说可靠性优先的场景都应该使用同步;
- 消息只要持久化到CommitLog(日志文件),即使Broker宕机,未能消费的消息也能恢复以后再消息。
- Broker的刷盘机制: 同步刷盘和异步刷盘,不管哪种方式,都能保证消息一定存储在pagecache中,但是同步刷盘更可靠, Producer发送消息等数据持久化到磁盘之后再返回响应给productor。
- Broker通过主从模式来保证高可用,Broker支持master和slave同步复制、master和salve异步复制模式。同步复制模式可以保证即使 Master 宕机,消息肯定在 Slave 中有备份,保证了消息不会丢失。
9.3 消费阶段
从Consumer角度分析,如何确保消息被成功消费?
- Consumer保证消息成功消费的关键在确认的时机,不要在收到消息就立马发送消息确认, 而是在执行完所有消费的业务逻辑之后再确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去消息队列拉取消息,就还是之前的那一条。
10.如何处理消息重复的问题?
RocketMQ可以保证消息一定投递且不丢失,但是无法保证消息不重复消费
因此,需要再业务端做好幂等处理,或者消息去重。
11、怎么处理消息积压?
-
消费者扩容:如果当前topic的Message Queue的数量大于消费者数量,直接扩容消费者的数量,提高消费能力,尽快把积压的消息消费完。
-
消息迁移Queue扩容:如果当前topic的MessageQueue的数量小于消费者, 队列长度过小了。可以先临时的Topic,临时的topic多设置一些MessageQueue,然后先用一些消费者把消费的数据丢到临时的 Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的 Topic 里的数据,消费完了之后,恢复原状。
-
假设某个topic积压了100w的消息
-
1、短期方案:
- 新增5个Consumer实例
- 每个实例设置批量拉取参数pullbatchSize =1000
- 在消费逻辑中引入异步处理,减少每条消息的处理时间
-
2、长期方案:
- 分析积压原因,由于生产速率过快还是消费性能不足,比如慢sql,调用时间等长耗时的操作。
- 对系统进行限流或者扩容,根据监控进行动态调整。
12.顺序消息如何实现?
14.延时消息?
电商订单的超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息,1h以后去检查订单状态,若果还未付款就取消订单
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
但是,目前RocketMQ支持的延时级别是有限的:private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
RocketMQ如何实现延时消息的,临时存储+定时任务!
15.怎么实现分布式消息事务?
半消息: 是指暂时还不能被Consumer消费的消息, Produce成功发生到Broker端的消息,但是此消息被标记为“暂不可投递”状态,只有等Producer端执行完本地事务二次确人之后Consumer才能消费此消息。
1.Producer 向 broker 发送半消息
2.Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
3.Producer 端执行本地事务。
4.正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
5.异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况。
6.Producer 端查询本地事务的状态
7.根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
8.消费者段消费到消息之后,执行本地事务。
16. 死信队列是什么?
死信队列用于存储哪些无法被着正常处理的消息,这些消息被称为死信
产生死信的原因: 消费者在处理时发生异常,且达到了最大重试次数。 当消费失败的原因被找到后,可以重发这些消息,让消费者重新消费!如果暂时无法处理,为避免到期后被删除,可以先将死信消息导出进行保存。
17. rocketMQ的整体工作流程
rocketMQ是一个分布式消息队列,也就是消息队列+分布式系统
主要有四部分组成: NameServer注册中心集群、 Producer生产者集群、Consumer消费者集群、Broker组成:
- 1.Broker启动时向所有的NameServer注册,保存长连接,没30s发生一次心跳。
- 2.Producer在发生消息的时候向NameServer获取Broker的服务器地址,根据负载均衡算法选择一台服务器来发生消息
- 3.Consumer消费消息同样先从 向NameServer获取Broker的服务器地址,然后主动拉取消息来消费。
https://cdn.tobebetterjavaer.com/tobebetterjavaer/images/nice-article/weixin-mianznxrocketmqessw-ec571bd4-fa24-4ada-87ab-f761a7dfdf3f.jpg
__END__