消息队列

消息队列原理以及各种消息中间件
帅旋
关注
充电
IT宅站长,技术博主,架构师,全网id:arthinking。

RocketMQ特性详解

发布于 2021-10-17 | 更新于 2024-05-16

1 生产端

1.1 消息发布

RocketMQ中定义了如下三种消息通信的方式:

1
2
3
4
5
public enum CommunicationMode {
SYNC,
ASYNC,
ONEWAY,
}
  • SYNC:同步发送,生产端会阻塞等待发送结果;
    • 应用场景:这种方式应用场景非常广泛,如重要业务事件通知。
  • ASYNC:异步发送,生产端调用发送API之后,立刻返回,在拿到Broker的响应结果后,触发对应的SendCallback回调;
    • 应用场景:一般用于链路耗时较长,对 RT 较为敏感的业务场景;
  • ONEWAY:单向发送,发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别;
    • 应用场景:适用于耗时非常短,对可靠性要求不高的场景,如日志收集。

SYNC和ASYNC关注发送结果,ONEWAY不关注发送结果。发送结果如下:

1
2
3
4
5
6
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
  • SEND_OK:消息发送成功。SEND_OK并不意味着投递是可靠的,要确保消息不丢失,需要开启SYNC_MASTER同步或者SYNC_FLUSH同步写;
  • FLUSH_DISK_TIMEOUT:消息发送成功,但是刷盘超时。如果Broker的flushDiskType=SYNC_FLUSH,并且5秒内没有完成消息的刷盘,则会返回这个状态;
  • FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时。如果Broker的brokerRole=SYNC_MASTER,并且5秒内没有完成同步,则会返回这个状态;
  • SLAVE_NOT_AVAILABLE:消息发送成功,但是无可用的Slave节点。如果Broker的brokerRole=SYNC_MASTER,但是没有发现SLAVE节点或者SLAVE节点挂掉了,那么会返回这个状态。

源码内容更精彩,欢迎大家进一步阅读源码详细了解消息发送的内幕:

  • 同步发送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
  • 异步发送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.SendCallback)
  • 单向发送:org.apache.rocketmq.client.producer.DefaultMQProducer#sendOneway(org.apache.rocketmq.common.message.Message)

1.2 顺序消费

消息的有序性指的是一类消息消费的时候,可以按照发送顺序来消费,比如:在Java架构杂谈茶餐厅吃饭产生的消息:进入餐厅、点餐、下单、上菜、付款,消息要按照这个顺序消费才有意义,但是多个顾客产生的消息是可以并行消费的。顺序消费又分为全局顺序消费和分区顺序消费:

  • 全局顺序:同一个Topic下的消息,所有消息按照严格的FIFO顺序进行发布和消费。适用于:性能要求不高,所有消息严格按照FIFO进行发布和消费的场景;
  • 分区顺序:同一个Topic下,根据消息的特定业务ID进行sharding key分区,同一个分区内的消息按照严格的FIFO顺序进行发布和消费。适用于:性能要求高,在同一个分区中严格按照FIFO进行发布和消费的场景。

一般情况下,生产者是会以轮训的方式把消息发送到Topic的消息队列中的:

image-20211017213242909

在同一个Queue里面,消息的顺序性是可以得到保证的,但是如果一个Topic有多个Queue,以轮训的方式投递消息,那么就会导致消息乱序了。

为了保证消息的顺序性,需要把保持顺序性的消息投递到同一个Queue中。

1.2.1 如何保证消息投递的顺序性

RocketMQ提供了MessageQueueSelector接口,可以用来实现自定义的选择投递的消息队列的算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for (int i = 0; i < orderList.size(); i++) {
String content = "Hello itzhai.com. Java架构杂谈," + new Date();
Message msg = new Message("topic-itzhai-com", tags[i % tags.length], "KEY" + i,
content.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
// 订单号与消息队列个数取模,保证让同一个订单号的消息落入同一个消息队列
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());
System.out.printf("content: %s, sendResult: %s%n", content, sendResult);
}

如上图,我们实现了MessageQueueSelector接口,并在实现的select方法里面,指定了选择消息队列的算法:订单号与消息队列个数取模,保证让同一个订单号的消息落入同一个消息队列

image-20211017213318790

有个异常场景需要考虑:假设某一个Master节点挂掉了,导致Topic的消息队列数量发生了变化,那么继续使用以上的选择算法,就会导致在这个过程中同一个订单的消息会分散到不同的消息队列里面,最终导致消息不能顺序消费。

为了避免这种情况,只能选择牺牲failover特性了。

现在投递到消息队列中的消息保证了顺序,那如何保证消费也是顺序的呢?

1.2.2 如何保证消息消费的顺序性?

RocketMQ中提供了MessageListenerOrderly,该对象用于有顺序收异步传递的消息,一个队列对应一个消费线程,使用方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
consumer.registerMessageListener(new MessageListenerOrderly() {
// 消费次数,用于辅助模拟各种消费结果
AtomicLong consumeTimes = new AtomicLong(0);

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

return ConsumeOrderlyStatus.SUCCESS;
}
});

如果您使用的是MessageListenerConcurrently,表示并发消费,为了保证消息消费的顺序性,需要设置为单线程模式。

使用MessageListenerOrderly的问题:如果遇到某条消息消费失败,并且无法跳过,那么消息队列的消费进度就会停滞。

1.3 延迟队列(定时消息)

定时消费是指消息发送到Broker之后不会立即被消费,而是等待特定的时间之后才投递到Topic中。定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId=delayTimeLevel-1,一个queue只存相同延迟的消息,保证具有相同延迟的消息能够顺序消费。比如,我们设置1秒后把消息投递到topic-itzhai-comtopic,则存储的文件目录如下所示:

image-20211017213559746

Broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

定时消息的副作用:定时消息会在第一次写入Topic和调度写入实际的topic都会进行计数,因此发送数量,tps都会变高。

使用延迟队列的场景:提交了订单之后,如果等待超过约定的时间还未支付,则把订单设置为超时状态。

RocketMQ提供了以下几个固定的延迟级别:

1
2
3
4
5
6
7
public class MessageStoreConfig {
...
// 10个level,level:1~18
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
...
}

level = 0 表示不使用延迟消息。

另外,消息消费失败也会进入延迟队列,消息发送时间与设置的延迟级别和重试次数有关

以下是发送延迟消息的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ScheduledMessageProducer {

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 指定该消息在10秒后被消费者消费
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
}
}

1.4 数据完整性与事务消息

通过消息对系统进行解耦之后,势必会遇到分布式系统数据完整性的问题。

1.4.1 实现分布式事务的手段有哪些?

我们可以通过以下手段解决分布式系统数据最终一致性问题:

  • 数据库层面的2PC(Two-phase commit protocol),二阶段提交,同步阻塞,效率低下,存在协调者单点故障问题,极端情况下存在数据不一致的风险。对应技术上的XA、JTA/JTS。这是分布式环境下事务处理的典型模式;
  • 数据库层面的3PC,三阶段提交,引入了参与者超时机制,增加了预提交阶段,使得故障恢复之后协调者的决策复杂度降低,但整体的交互过程变得更长了,性能有所下降,仍旧会存在数据不一致的问题;
  • 业务层面的TCC ,Try - Confirm - Cancel。对业务的侵入较大,和业务紧耦合,对于每一个操作都需要定义三个动作分别对应:Try - Confirm - Cancel,将资源层的两阶段提交协议转换到业务层,成为业务模型中的一部分;
  • 本地消息表;
  • 事务消息;

RocketMQ事务消息(Transactional Message)则是通过事务消息来实现分布式事务的最终一致性。下面看看RocketMQ是如何实现事务消息的。

1.4.2 RocketMQ如何实现事务消息?

如下图:

image-20211017213817767

事务消息有两个流程:

  1. 事务消息发送及提交:
    1. 发送half消息;
    2. 服务端响应half消息写入结果;
    3. 根据half消息的发送结果执行本地事务。如果发送失败,此时half消息对业务不可见,本地事务不执行;
    4. 根据本地事务状态执行Commit或者Rollback。Commit操作会触发生成ConsumeQueue索引,此时消息对消费者可见
  2. 补偿流程:
    5. 对于没有Commit/Rollback的事务消息,会处于pending状态,这对这些消息,MQ Server发起一次回查;
    6. Producer收到回查消息,检查回查消息对应的本地事务的转塔体;
    7. 根据本地事务状态,重新执行Commit或者Rollback。

补偿阶段主要用于解决消息的Commit或者Rollback发生超时或者失败的情况。

half消息:并不是发送了一半的消息,而是指消息已经发送到了MQ Server,但是该消息未收到生产者的二次确认,此时该消息暂时不能投递到具体的ConsumeQueue中,这种状态的消息称为half消息。

1.4.3 RocketMQ事务消息是如何存储的?

发送到MQ Server的half消息对消费者是不可见的,为此,RocketMQ会先把half消息的Topic和Queue信息存储到消息的属性中,然后把该half消息投递到一个专门的处理事务消息的队列中:RMQ_SYS_TRANS_HALF_TOPIC,由于消费者没有订阅该Topic,所以无法消息half类型的消息。

image-20211017213932431

生产者执行Commit half消息的时候,会存储一条专门的Op消息,用于标识事务消息已确定的状态,如果一条事务消息还没有对应的Op消息,说明这个事务的状态还无法确定。RocketMQ会开启一个定时任务,对于pending状态的消息,会先向生产者发送回查事务状态请求,根据事务状态来决定是否提交或者回滚消息。

当消息被标记为Commit状态之后,会把half消息的Topic和Queue相关属性还原为原来的值,最终构建实际的消费索引(ConsumeQueue)。

**RocketMQ并不会无休止的尝试消息事务状态回查,默认查找15次,超过了15次还是无法获取事务状态,RocketMQ默认回滚该消息。**并打印错误日志,可以通过重写AbstractTransactionalMessageCheckListener类修改这个行为。

可以通过Broker的配置参数:transactionCheckMax来修改此值。

1.5 消息重投

如果消息发布方式是同步发送会重投,如果是异步发送会重试。

消息重投可以尽可能保证消息投递成功,但是可能会造成消息重复。

什么情况会造成重复消费消息?

  • 出现消息量大,网络抖动的时候;
  • 生产者主动重发;
  • 消费负载发生变化。

可以使用的消息重试策略:

  • retryTimesWhenSendFailed:设置同步发送失败的重投次数,默认为2。所以生产者最多会尝试发送retryTimesWhenSendFailed+1次。
    • 为了最大程度保证消息不丢失,重投的时候会尝试向其他broker发送消息;
    • 超过重投次数,抛出异常,让客户端自行处理;
    • 触发重投的异常:RemotingException、MQClientException和部分MQBrokerException;
  • retryTimesWhenSendAsyncFailed:设置异步发送失败重试次数,异步重试不会选择其他Broker,不保证消息不丢失;
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。重要的消息可以开启此选项。

oneway发布方式不支持重投。

1.6 批量消息

为了提高系统的吞吐量,提高发送效率,可以使用批量发送消息。

批量发送消息的限制:

  • 同一批批量消息的topic,waitStoreMsgOK属性必须保持一致;
  • 批量消息不支持延迟队列;
  • 批量消息一次课发送的上限是4MB。

发送批量消息的例子:

1
2
3
4
5
6
String topic = "itzhai-test-topic";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world itzhai.com 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world itzhai.com 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world itzhai.com 2".getBytes()));
producer.send(messages);

如果发送的消息比较多,会增加复杂性,为此,可以对大消息进行拆分。以下是拆分的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class ListSplitter implements Iterator<List<Message>> { 
// 限制最大大小
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(curIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length();
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes
return tmpSize;
}
}

// then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
// handle the error
}
}

1.7 消息过滤

RocketMQ的消费者可以根据Tag进行消息过滤来获取自己感兴趣的消息,也支持自定义属性过滤。

Tags是Topic下的次级消息类型/二级类型(注:Tags也支持TagA || TagB这样的表达式),可以在同一个Topic下基于Tags进行消息过滤。

消息过滤是在Broker端实现的,减少了对Consumer无用消息的网络传输,缺点是增加了Broker负担,实现相对复杂。

2 消费端

2.1 消费模型

消费端有两周消费模型:集群消费和广播消费。

集群消费

集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

广播消费

广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

2.2 消息重试

RocketMQ会为每个消费组都设置一个Topic名称为%RETRY%consumerGroupName的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。

考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。

RocketMQ对于重试消息的处理是先保存至Topic名称为SCHEDULE_TOPIC_XXXX的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至%RETRY%consumerGroupName的重试队列中。

比如,我们设置1秒后把消息投递到topic-itzhai-comtopic,则存储的文件目录如下所示:

image-20211017213559746

2.3 死信队列

当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)

在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费


由于RocketMQ是使用Java写的,所以它的代码特别适合拿来阅读消遣,我们继续来看看RocketMQ的源码结构…

不不,还是算了,一下子又到周末晚上了,时间差不多了,今天就写到这里了。有空再聊。


我精心整理了一份Redis宝典给大家,涵盖了Redis的方方面面,面试官懂的里面有,面试官不懂的里面也有,有了它,不怕面试官连环问,就怕面试官一上来就问你Redis的Redo Log是干啥的?毕竟这种问题我也不会。

image-20211007142531823

Java架构杂谈公众号发送Redis关键字获取pdf文件:

image-20211010220323135

References

\

本文作者: 帅旋

本文链接: https://www.itzhai.com/columns/mq/rocketmq/features.html

版权声明: 版权归作者所有,未经许可不得转载,侵权必究!联系作者请加公众号。

×
IT宅

关注公众号及时获取网站内容更新。

请帅旋喝一杯咖啡

咖啡=电量,给帅旋充杯咖啡,他会满电写代码!

IT宅

关注公众号及时获取网站内容更新。