RocketMQ学习教程:07.RocketMQ消息查询【云图智联】

在实际开发中,经常需要排查一条消息是否成功发送到底层MQ中,或者查看MQ中消息的内容,以及如何将消息发送给指定的/所有的消费者组重新消费。本文对RocketMQ提供到的查询机制和背后原理进行深入的介绍。文章主要包括4个部分:

  • 消息查询介绍:介绍消息查询中使用到的Message Key 、Unique Key、Message Id 的区别

  • 消息查询工具:分别介绍命令行工具、管理平台、客户端API这三种工具的详细用法,以及如何让消费者重新消费特定的消息。

  • 核心实现原理:介绍Message Key & Unique Key与Message Id的实现机制上区别,Unique Key在Exactly Once语义下的作用,以及为什么Message Id查询效率更高。

  • 索引机制:介绍Message Key & Unique Key底层使用的哈希索引机制

 

1 消息查询介绍

RocketMQ提供了3种消息查询方式:           

  • 按照Message Key 查询:消息的key是业务开发同学在发送消息之前自行指定的,通常会把具有业务含义,区分度高的字段作为消息的key,如用户id,订单id等。

  • 按照Unique Key查询:除了业务开发同学明确的指定消息中的key,RocketMQ生产者客户端在发送发送消息之前,会自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一代表一条消息。

  • 按照Message Id 查询:Message Id 是消息发送后,在Broker端生成的,其包含了Broker的地址,和在CommitLog中的偏移信息,并会将Message Id作为发送结果的一部分进行返回。Message Id中属于精确匹配,可以唯一定位一条消息,不需要使用哈希索引机制,查询效率更高。

RocketMQ有意弱化Unique Key与Message Id的区别,对外都称之为Message Id。在通过RocketMQ的命令行工具或管理平台进行查询时,二者可以通用。在根据Unique Key进行查询时,本身是有可能查询到多条消息的,但是查询工具会进行过滤,只会返回一条消息。种种情况导致很多RocketMQ的用户,并未能很好对二者进行区分。

业务开发同学在使用RocketMQ时,应该养成良好的习惯,在发送/消费消息时,将这些信息记录下来,通常是记录到日志文件中,以便在出现问题时进行排查。

以生产者在发送消息为例,通常由以下3步组成:

  1. //1 构建消息对象Message
  2. Message msg = new Message();
  3. msg.setTopic("TopicA");
  4. msg.setKeys("Key1");
  5. msg.setBody("message body".getBytes());
  6. try{
  7.     //2 发送消息
  8.     SendResult result = producer.send(msg);
  9.     
  10.     //3 打印发送结果
  11.     System.out.println(result);
  12. }catch (Exception e){
  13.     e.printStackTrace();
  14. }

第1步:构建消息

构建消息对象Message,在这里我们通过setKeys方法设置消息的key,如果有多个key可以使用空格" "进行分割

第2步:发送消息

发送消息,会返回一个SendResult对象表示消息发送结果。

第3步:打印发送结果

结果中包含Unique Key和Message Id,如下所示:

  1. SendResult [
  2. sendStatus=SEND_OK, 
  3. msgId=C0A801030D4B18B4AAC247DE4A0D0000,
  4. offsetMsgId=C0A8010300002A9F000000000007BEE9,
  5. messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0], 
  6. queueOffset=0]

其中:

  • sendStatus:表示消息发送结果的状态       

  • msgId:注意这里的命名虽然是msgId,但实际上其是Unique Key

  • offsetMsgId:Broker返回的Message ID 。在后文中,未进行特殊说明的情况下,Message ID总是表示offsetMsgId。

  • messageQueue:消息发送到了哪个的队列,如上图显示发送到broker-a的第0个的队列 

  • queueOffset:消息在队列中的偏移量,每次发送到一个队列时,offset+1 

事实上,用户主动设置的Key以及客户端自动生成的Unique Key,最终都会设置到Message对象的properties属性中,如下图所示:

下载 (1).png

 

其中:

  • KEYS:表示用户通过setKeys方法设置的消息key,

  • UNIQ_KEY:表示消息发送之前由RocketMQ客户端自动生成的Unique Key。细心的读者发现了其值与上述打印SendResult结果中的msgId字段的值是一样的,这验证了前面所说的msgId表示的实际上就是Unique Key的说法。

在了解如何主动设置Key,以及如何获取RocketMQ自动生成的Unique Key和Message Id后,就可以利用一些工具来进行查询。

 

2 消息查询工具

RocketMQ提供了3种方式来根据Message Key、Unique Key、Message Id来查询消息,包括:

  • 命令行工具:主要是运维同学使用

  • 管理平台:运维和开发同学都可以使用

  • 客户端API:主要是开发同学使用

这些工具除了可以查询某条消息的内容,还支持将查询到的历史消息让消费者重新进行消费,下面分别进行讲述。

2.1 命令行工具

RocketMQ自带的mqadmin命令行工具提供了一些子命令,用于查询消息,如下:

  1. $ sh bin/mqadmin 
  2. The most commonly used mqadmin commands are: 
  3. ...
  4.    queryMsgById         按照Message Id查询消息
  5.    queryMsgByKey        按照Key查询消息 
  6.    queryMsgByUniqueKey  按照UNIQ_KEY查询消息
  7. ...

此外,还有一个queryMsgByOffset子命令,不在本文讲述范畴内

2.1.1 按照Message Key查询

mqadmin工具的queryMsgByKey子命令提供了根据key进行查询消息的功能。注意,由于一个key可能对应多条消息,查询结果只会展示出这些消息对应的Unique Key,需要根据Unique Key再次进行查询。

queryMsgByKey子命令使用方法如下所示:

  1. $ sh bin/mqadmin queryMsgByKey -h
  2. usage: mqadmin queryMsgByKey [-h] -k  [-n ] -t 
  3.  -h,--help                打印帮助信息
  4.  -k,--msgKey         指定消息的key,必须提供
  5.  -n,--namesrvAddr    指定nameserver地址
  6.  -t,--topic          指定topic,必须提供

例如,要查询在TopicA中,key为Key1的消息

  1. $ sh bin/mqadmin queryMsgByKey -k Key1 -t TopicA -n localhost:9876
  2. #Message ID                           #QID         #Offset
  3. C0A80103515618B4AAC2429A6E970000         0               0
  4. C0A80103511B18B4AAC24296D2CB0000         0               0
  5. C0A8010354C418B4AAC242A281360000         1               0
  6. C0A8010354C718B4AAC242A2B5340000         1               1

这里,我们看到输出结果中包含了4条记录。其中:

  • Message ID列:这里这一列的名字显示有问题,实际上其代表的是Unique Key

  • QID列&


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部