实际运行中的系统,难免会遇到重新消费某条消息、跳过一段时间内的消息等情况。这些异常情况的处理,都和Offset有关。本节主要分析Offset的存储位置,以及如何根据需要调整Offset的值。
首先来明确一下Offset的含义,RocketMQ中,一种类型的消息会放到一个Topic里,为了能够并行,一般一个Topic会有多个Message Queue(也可以设置成一个),Offset是指某个Topic下的一条消息在某个Message Queue里的位置,通过Offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后继续处理。
如图1所示是Offset的类结构,主要分为本地文件类型和Broker代存的类型两种。对于DefaultMQPushConsumer来说,默认是CLUSTERING模式,也就是同一个Consumer group里的多个消费者每人消费一部分,各自收到的消息内容不一样。这种情况下,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore结构。
图1 OffsetStore的类结构
在DefaultMQPushConsumer里的BROADCASTING模式下,每个Consumer都收到这个Topic的全部消息,各个Consumer间相互没有干扰,RocketMQ使用LocalFileOffsetStore,把Offset存到本地。
OffsetStore使用Json格式存储,简洁明了,下面是个例子:
代码清单1 Offsetstore的内容示例
{"OffsetTable":{{"brokerName":"localhost", "QueueId":1,"Topic":"broker1" }: 1,{ "brokerName":"localhost", "QueueId":2,"Topic":"broker1" }:2, { "brokerName":"localhost", "QueueId":0, "Topic":"broker1" }:3 } }
在使用DefaultMQPushConsumer的时候,我们不用关心OffsetStore的事,但是如果PullConsumer,我们就要自己处理OffsetStore了。在前面文章中PullConsumer示例中,代码里把Offset存到了内存,没有持久化存储,这样就可能因为程序的异常或重启而丢失Offset,在实际应用中不推荐这样做。接下来给出在磁盘存储Offset的示例程序,参照LocalFileOffsetStore的源码编写,如代码清单2所示。
代码清单2 自定义持久存储OffsetStore
public class LocalOffsetStoreExt {
private final String groupName;
private final String storePath;
private ConcurrentMap<MessageQueue, AtomicLong> OffsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
public LocalOffsetStoreExt(String storePath, String groupName) {
this.groupName = groupName;
this.storePath = storePath;
}
public void load() {
OffsetSerializeWrapper OffsetSerializeWrapper = this.readLocal-Offset();
if (OffsetSerializeWrapper != null && OffsetSerializeWrapper.getOffsetTable() != null) {
OffsetTable.putAll(OffsetSerializeWrapper.getOffsetTable());
for (MessageQueue mq : OffsetSerializeWrapper.getOffsetTable().keySet()) {
AtomicLong Offset = OffsetSerializeWrapper.getOffset-Table().get(mq);
System.out.printf("load Consumer's Offset, {} {} {} \n", this.groupName, mq, Offset.get());
}
}
}
public void updateOffset(MessageQueue mq, long Offset) {
if (mq != null) {
AtomicLong OffsetOld = this.OffsetTable.get(mq);
if (null == OffsetOld) {
this.OffsetTable.putIfAbsent(mq, new AtomicLong(Offset));
} else {
OffsetOld.set(Offset);
}
}
}
public long readOffset(final MessageQueue mq) {
if (mq != null) {
AtomicLong Offset = this.OffsetTable.get(mq);
if (Offset != null) {
return Offset.get();
}
}
return 0;
}
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
OffsetSerializeWrapper OffsetSerializeWrapper = new Offset-SerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.OffsetTable.
entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong Offset = entry.getValue();
OffsetSerializeWrapper.getOffsetTable().put(entry.getKey(), Offset);
}
}
String jsonString = OffsetSerializeWrapper.toJson(true);
if (jsonString != null) {
try {
MixAll.string2File(jsonString, this.storePath);
} catch (IOException e) {
e.printStackTrace();
}
}
}
private OffsetSerializeWrapper readLocalOffset() {
String content = null;
try {
content = MixAll.file2String(this.storePath);
} catch (IOException e) {
e.printStackTrace();
}
if (null == content || content.length() == 0) {
return null;
} else {
OffsetSerializeWrapper OffsetSerializeWrapper = null;
try {
OffsetSerializeWrapper =
OffsetSerializeWrapper.fromJson(content, Offset-SerializeWrapper.class);
} catch (Exception e) {
e.printStackTrace();
}
return OffsetSerializeWrapper;
}
}
}
了解OffsetStore的存储机制以后,我们看看如何设置Consumer读取消息的初始位置。DefaultMQPushConsumer类里有个函数用来设置从哪儿开始消费消息:比如setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET),这个语句设置从最小的Offset开始读取。如果从队列开始到感兴趣的消息之间有很大的范围,用CONSUME_FROM_FIRST_OFFSET参数就不合适了,可以设置从某个时间开始消费消息,比如Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP),Consumer.setConsumeTimestamp("20131223171201"),时间戳格式是精确到秒的。注意设置读取位置不是每次都有效,它的优先级默认在Offset Store后面,比如在DefaultMQPushConsumer的BROADCASTING方式下,默认是从Broker里读取某个Topic对应ConsumerGroup的Offset,当读取不到Offset的时候,ConsumeFromWhere的设置才生效。大部分情况下这个设置在Consumer Group初次启动时有效。如果Consumer正常运行后被停止,然后再启动,会接着上次的Offset开始消费,ConsumeFromWhere的设置无效。