RocketMQ的消费模式

RocketMQ 是基于发布订阅模型的消息中间件。所谓的发布订阅就是说,consumer 订阅了 broker 上的某个 topic,当 producer 发布消息到 broker 上的该 topic 时,consumer 就能收到该条消息。RocketMQ默认支持两种消费模式,分别是集群消费模式和广播消费模式,默认集群消费模式。

集群

MQ 约定使用相同 Consumer ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点。

消费同一类消息的多个 consumer 实例组成一个消费者组,也可以称为一个 consumer 集群,这些 consumer 实例使用同一个 group name。需要注意一点,除了使用同一个 group name,订阅的 tag 也必须是一样的,只有符合这两个条件的 consumer 实例才能组成 consumer 集群。

集群消费模式

当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。

集群消费模式
1
2
3
4
5
6
7
8
9
10
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");  

// 同样也要设置NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9876");
// 默认就是集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 设置consumer所订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
适用场景&注意事项
  • 消费端集群化部署,每条消息只需要被处理一次。
  • 由于消费进度在服务端维护,可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到consumer集群内任意一个的consumer实例消费处理,如果需要被集群下所有的consumer实例消费处理,请使用广播模式。
  • 集群消费模式下,不保证消息的每一次失败重投等逻辑都能路由到同一个consumer实例上,因此处理消息时不应该做任何确定性假设。
广播消费模式

当使用广播消费模式时,MQ 会将每条消息推送给集群内所有消费者,保证消息至少被每个消费者消费一次。

广播消费模式
1
2
3
4
5
6
7
8
9
10
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");  

// 同样也要设置NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9876");
// 设置为广播消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 设置consumer所订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
适用场景&注意事项
  • 每条消息都需要被consumer集群内所有的consumer 实例消费一次,也就是说每条消息至少被每一个consumer 实例消费一次。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  • 广播模式下,MQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  • 广播模式下,第一次启动时默认从最新消息消费,客户端的消费进度是被持久化在客户端本地的隐藏文件中,因此不建议删除该隐藏文件,否则会丢失部分消息。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 广播模式下服务端不维护消费进度,所以服务端不提供堆积查询和报警功能。

目前仅 Java 客户端支持广播模式。

虽然广播消费能保证集群内每个consumer实例都能消费消息,但是消费进度的维护、不具备消息重投的机制大大影响了实际的使用。因此,在实际使用中,更推荐使用集群消费,因为集群消费不仅拥有消费进度存储的可靠性,还具有消息重投的机制。而且,我们通过集群消费也可以达到广播消费的效果。

使用集群模式模拟广播模式

有些场景希望一个消息需要多个消费者消费,并且也希望消费者消费失败,消息能够重新投递。

集群模式模拟广播消费

订单支付完成通知下游系统,积分服务扣减订单使用的积分、优惠券服务将优惠券状态更新为已使用、库存服务扣减商品库存、仓库准备发货等等

适用场景&注意事项
  • 每条消息都需要被多个消费者处理,每个消费者的逻辑可以相同也可以不一样。
  • 消费进度在服务端维护,可靠性高于广播模式。

如果业务上确实需要使用广播消费,那么我们可以通过创建多个 consumer 实例,每个 consumer 实例属于不同的 consumer group,但是它们都订阅同一个 topic。

举个例子,我们创建 4 个 consumer 实例,consumer1(属于consumerGroup1)、consumer2(属于consumerGroup 2)、consumer 3(属于consumerGroup3)和 consumer4(属于consumerGroup4),它们都订阅了 topicA ,那么当 producer 发送一条消息到 topic A 上时,由于3个consumer 属于不同的 consumer group,所以 3 个consumer都能收到消息,也就达到了广播消费的效果了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

/**
* 设置不同的消费组名,实现集群模式模拟广播消费
* @param consumerGroup 消费组名
* @throws Exception
*/
public static void consumerMessage(String consumerGroup) throws Exception {

if (StringUtils.isBlank(consumerGroup)){
consumerGroup = "unique_group_name";
}
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
// Specify name server addresses.
consumer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9870");

// 重要:设置消费者消息最大重试次数
//consumer.setMaxReconsumeTimes(5);
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (MessageExt message : messages) {

int reconsumeTimes = message.getReconsumeTimes();
String msgId = message.getMsgId();

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = simpleDateFormat.format(new Date());
String messageContext = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消费时间:" + format + " ,消息消费次数=" + reconsumeTimes + ", msgId=" + msgId + ", 消息内容:" + messageContext);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

//Launch the consumer instance.
consumer.start();
}

除此之外,每个 consumer 实例的消费逻辑可以一样也可以不一样,每个consumer group还可以根据需要增加 consumer 实例,比起广播消费来说更加灵活。

  • 作者: Sam
  • 发布时间: 2021-01-18 23:32:39
  • 最后更新: 2021-01-18 23:41:25
  • 文章链接: https://ydstudios.gitee.io/post/122ee231.html
  • 版权声明: 本网所有文章除特别声明外, 禁止未经授权转载,违者依法追究相关法律责任!