网易乐得技术团队

rocketMQ按key查询问题分析

按key查询问题排查分析

问题表现

通过mqadmin脚本使用key来查询message按照脚本提示,执行以下命令:

1
2
3
[a@localhost bin]$ sh mqadmin queryMsgByKey -n 127.0.0.1:9876 -k KEY95 -t TopicTestjjj
#Message ID #QID #Offset
0AEA60AC2F5473D16E938C72F9E9005F 5 9

获得messageId,再调用按照ID查询message脚本:

1
2
3
4
5
6
[a@localhost bin]$ sh mqadmin queryMsgById -i 0AEA60AC2F5473D16E938C72F9E9005F -n 127.0.0.1:9876
com.alibaba.rocketmq.client.exception.MQClientException: CODE: 208 DESC: query message by id finished, but no message.
For more information, please visit the url, http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception
at com.alibaba.rocketmq.client.impl.MQAdminImpl.viewMessage(MQAdminImpl.java:257)
at com.alibaba.rocketmq.tools.admin.DefaultMQAdminExtImpl.viewMessage(DefaultMQAdminExtImpl.java:895)
at com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt.viewMessage(DefaultMQAdminExt.java:124)

网上查到的资料也都是用这种方法来查询,但在当前环境中是这种错误
搭建的管理后台按key查询也是无结果
两种方式查都没结果,这里就比较悲剧了,对于一个消息,消息的key是唯一能作为客户端指定的信息来查询结果的,无法查询,就意味着无法追踪到消息,这是不能不解决的一个问题。
两种方式(官方脚本/管理后台)都查不到,mqadmin脚本是官方自带的。管理后台注明支持的版本号比目前咱们用的版本号低,问题可能会比较复杂,遂决定先解决官方自带脚本的问题。

问题分析解决ID问题

根据错误提示是无法查找到消息,但消息客户端反馈已经插入成功,这里比较奇怪,查看源码MQAdminImpl报错位置:

1
2
3
4
5
6
7
8
9
10
11
public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MessageId messageId = null;
try {
messageId = MessageDecoder.decodeMessageId(msgId);
} catch (Exception e) {
throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message."); //这里抛出的错误
}
return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
messageId.getOffset(), timeoutMillis);
}

错误信息经过封装,其实不能完全反馈错误,将messageId = MessageDecoder.decodeMessageId(msgId); 单独拿出来执行,确认报错为:

1
2
3
4
5
Exception in thread "main" java.lang.IllegalArgumentException: port out of range:794063825
at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
at java.net.InetSocketAddress.<init>(InetSocketAddress.java:188)
at com.alibaba.rocketmq.common.message.MessageDecoder.decodeMessageId(MessageDecoder.java:80)
at com.alibaba.rocketmq.example.ordermessage.Consumer.main(Consumer.java:71)

可以看出这个报错是从msgId解析出port异常导致,正常端口号范围最大为65535,该值非法,也就是说这个messageId应该是不对的
附decodeMessageId源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
SocketAddress address;
long offset;
byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
ByteBuffer bb = ByteBuffer.wrap(port);
int portInt = bb.getInt(0);
address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
// offset
byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
bb = ByteBuffer.wrap(data);
offset = bb.getLong(0);
return new MessageId(address, offset);
}

发送消息中客户端设置的key为消息的标识,目前rocketmq中消息不是很多,可以使用脚本获得:

1
2
[appuser@localhost bin]$ sh mqadmin printMsg -n "127.0.0.1:9876" -t TopicTestjjj -s TagA -c UTF-8 | grep KEY95
MSGID: 0AEA60AC2F5473D16E938C72F9E9005F MessageExt [queueId=5, storeSize=193, queueOffset=9, sysFlag=0, bornTimestamp=1477650745321, bornHost=xxx, storeTimestamp=1477650744167, storeHost=xxx, msgId=0A78F13600002A9F00000000000039FD, commitLogOffset=14845, bodyCRC=859062417, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTestjjj, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, KEYS=KEY95, UNIQ_KEY=0AEA60AC2F5473D16E938C72F9E9005F, WAIT=true, TAGS=TagA}, body=17]] BODY: Hello RocketMQ 95

返回中的相关信息:

1
2
3
msgId=0A78F13600002A9F00000000000039FD
KEYS=KEY95
UNIQ_KEY=0AEA60AC2F5473D16E938C72F9E9005F

可以看出之前queryMsgByKey返回表格中的#Message ID字段并不是msgId,而是UNIQ_KEY,这也就是端口号超限问题的原因,调用queryMsgById传的并不是msgid而是UNIQ_KEY
同时分析queryMsgByKey源码也可以获得返回有可能是UNIQ_KEY而不是msgId:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key)
throws MQClientException, InterruptedException {
admin.start();
QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);
System.out.printf("%-50s %4s %40s%n",//
"#Message ID",//
"#QID",//
"#Offset");
for (MessageExt msg : queryResult.getMessageList()) {
System.out.printf("%-50s %4d %40d%n", msg.getMsgId(), msg.getQueueId(), msg.getQueueOffset());
}
}
//MessageExt的子类MessageClientExt中的实现:
@Override
public String getMsgId() {
String uniqID = MessageClientIDSetter.getUniqID(this);
if (uniqID == null) {
return this.getOffsetMsgId();
}
else {
return uniqID;
}
}

问题分析解决UNIQ_KEY问题

既然已经搞清楚目前返回的是UNIQ_KEY,那么按照UNIQ_KEY查询呢,正好mqadmin脚本里有queryMsgByUniqueKey可以使用,执行来看看:

1
2
3
4
5
6
7
8
9
[a@localhostbin]$ sh mqadmin queryMsgByUniqueKey -i 0AEA60AC2F5473D16E938C72F9E9005F -n 127.0.0.1:9876 -t TopicTestjjj
com.alibaba.rocketmq.client.exception.MQClientException: CODE: 208 DESC: query message by key finished, but no message.
For more information, please visit the url, http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception
at com.alibaba.rocketmq.client.impl.MQAdminImpl.queryMessage(MQAdminImpl.java:416)
at com.alibaba.rocketmq.client.impl.MQAdminImpl.queryMessageByUniqKey(MQAdminImpl.java:270)
at com.alibaba.rocketmq.tools.admin.DefaultMQAdminExtImpl.viewMessage(DefaultMQAdminExtImpl.java:259)
at com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt.viewMessage(DefaultMQAdminExt.java:434)
at com.alibaba.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand.queryById(QueryMsgByUniqueKeySubCommand.java:85)
at com.alibaba.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand.execute(QueryMsgByUniqueKeySubCommand.java:198)

墨菲定律往往这时候得到证明,又报错了,还是查不到消息,这时候继续从源码入手,先来看看DefaultMQAdminExtImpl.java:259所在的方法:

1
2
3
4
5
6
7
8
9
10
@Override
public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId);
return this.viewMessage(msgId);
} catch (Exception e) {
log.warn("the msgid maybe created by new client", e);
}
return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId); //这里抛错
}

从这里log的warn报警来看,UNIQ_KEY为较新版本新加的一种改良,而程序这里对这也做了兼容处理。这里也得吐槽下脚本执行queryMsgByKey输出信息的误导,#Message ID简直坑爹
MQAdminImpl.java:270的代码如下:

1
2
3
4
5
6
7
8
9
10
11
public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException {
QueryResult qr = this.queryMessage(topic, uniqKey, 32,
MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
return qr.getMessageList().get(0);
}
else {
return null;
}
}

查看queryMessage的定义:

1
protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey)

以及
queryMessage中的报错位置:

1
2
3
4
5
if (!messageList.isEmpty()) {
return new QueryResult(indexLastUpdateTimestamp, messageList);
} else {
throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message.");
}

以及构造查询中的参数部分:

1
2
3
4
5
6
QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setKey(key);
requestHeader.setMaxNum(maxNum);
requestHeader.setBeginTimestamp(begin);
requestHeader.setEndTimestamp(end);

可以看出查询消息,除了指定的topic、uniquekey还有maxNum、begin、end,即最大数量,开始时间,结束时间三个值,最大数量根据queryMessage调用为32,开始时间为MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000,结束时间为Long.MAX_VALUE。
这里查不出来消息感觉上很可能是时间范围的错误,这时需要确认一下MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime()究竟是什么时间,将该段代码在eclipse中执行确认:

1
2
3
4
Date nearlyTimeFromID = MessageClientIDSetter.getNearlyTimeFromID("0AEA60AC2F5473D16E938C72F9E9005F");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS");
System.out.println(sdf.format(nearlyTimeFromID));
System.out.println(nearlyTimeFromID.getTime());

输出:

1
2
2016-10-28 18:32:25:321
1477650745321

这个时间有没有问题,又是什么时间,注意到之前message里的输出包含的两个时间:

1
bornTimestamp=1477650745321, bornHost=xxx, storeTimestamp=1477650744167, storeHost=xxx

对这两个时间转化成比较好识别的时间格式:

1
2
bornTimestamp=1477650745321, bornHost=xxx => 2016-10-28 18:32:25:321
storeTimestamp=1477650744167, storeHost=xxx => 2016-10-28 18:32:24:167

可以看出MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime()就是bornTimestamp,这里storeTimestamp比bornTimestamp少了1.1秒多,即这个时间比MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000还要小,推测不包括在:

1
2
QueryResult qr = this.queryMessage(topic, uniqKey, 32,
MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);

的查询范围内导致的无结果返回。而这个查询合理猜测应该是判断的是storeTimestamp,正常理解下bornTimestamp应该比storeTimestamp更早才对,这里代码中又对bornTimestamp减去了1s,正常来讲是不会有问题的,那么问题可能在哪?
这些数据是在自己电脑上生成的,bornTimestamp会不会是存的客户端本机生成时间呢,经过确认我自己的电脑确实比测试服务器快1s左右。将我本机电脑时间调慢3s,再插入一条数据确认bornTimestamp,确认该时间确实是本机电脑时间,即客户端时间。对于新插入的这条数据再按key查询就能正常查到了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[a@localhostbin]$ sh mqadmin queryMsgByKey -n 127.0.0.1:9876 -k theone -t TopicTestjjj
#Message ID #QID #Offset
0AEA60AA306073D16E930688FA710000 7
[a@localhost bin]$ sh mqadmin queryMsgByUniqueKey -i 0AEA60AA306073D16E930688FA710000 -n 127.0.0.1:9876 -t TopicTestjjj
Topic: TopicTestjjj
Tags: [TagA]
Keys: [theone]
Queue ID: 7
Queue Offset: 10
CommitLog Offset: 168449
Reconsume Times: 0
Born Timestamp: 2016-11-02 18:27:20,307
Store Timestamp: 2016-11-02 18:27:23,958
Born Host: xxx
Store Host: xxx
System Flag: 0
Properties: {KEYS=theone, UNIQ_KEY=0AEA60AA306073D16E930688FA710000, WAIT=true, TAGS=TagA}
Message Body Path: /tmp/rocketmq/msgbodys/0AEA60AA306073D16E930688FA710000
MessageTrack [consumerGroup=please_rename_unique_group_name_3, trackType=NOT_ONLINE, exceptionDesc=CODE:206 DESC:the consumer group[please_rename_unique_group_name_3] not online]

同时管理后台里也能正常查询了,看来都是这个时间戳问题导致的。

问题思考

这里rocketmq在这个逻辑的处理中用到了客户端时间,并且依赖客户端时间做了判断导致无法查询到具体的message,这里感觉不是特别好,整个查询过程做了一个假设和一个不恰当的判断:

  1. 假设客户端时间和服务器端时间是一致的,来确保bornTimestamp-1000ms<storeTimestamp
  2. 判断storeTimestamp,又使用bornTimestamp,而不直接使用storeTimestamp,即利用了第一个假设
    第一个假设并不能确保一定成立,服务器端程序应该健壮来避免这种假设;第二个判断中对一个变量storeTimestamp的判断,又使用了另一个变量bornTimestamp,这里感觉也不太好,也正是这里的判断使得问题暴露了出来
    在程序逻辑中,应该尽量避免这些问题,最好不对客户端情况做出假设,变量判断最好也不要引入额外变量使得问题复杂化
    最后顺便提一下,rocketmq这里的相关逻辑处理,只要系统时间差距不在1s以上就不会有问题,线上应用都应该使用对时服务来确保服务器时间差距不会在1s以上