+
Skip to content

zhaoli2333/rocketmq

Repository files navigation

RocketMq支持任意延迟的延时消息方案

主要特性

支持任意延迟的延时消息,精确到秒,最长延迟时间为1年。基于Apache RocketMQ 4.5.2版本。

使用方法

配置broker.conf

segmentScale=60  ##每个时间桶的时间范围,单位分钟,默认60,可选值0-60之间,延迟消息并发数越高,配置的值应该越小,如:10
dispatchLogKeepTime=72 ##dispatchLog过期后保存的小时数,默认72

producer

	DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.start();

        for (int i = 0; i < 100000; i++) {
            try {

                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

		/*
                 * 设置延迟时间,单位:秒
                 */
                msg.setDelayTime(5);
                
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();


consumer

和普通 consumer相同

设计思路

延时消息设计思路主要来自于去哪儿网的qmq,使用两层hash wheel来实现。第一层位于磁盘上,默认每个小时为一个刻度,可以根据实际情况在配置里进行调整(segmentScale)。每个刻度会生成一个日志文件(schedulelog),支持一年内的延迟消息,因此最多会生成8784个文件。第二层在内存中,当消息的投递时间即将到来的时候,会将这个小时的消息索引(索引包括消息在schedulelog中的offset和size)从磁盘文件加载到内存中的hash wheel上,内存中的hash wheel则是以500ms为一个刻度。之所以存索引而不存消息本身是因为当消息并发量很大时把1小时的消息放到内存消耗太大不可接受,所以当投递时间到达时,需要根据消息索引再去schedulelog中回查出消息本身,再投递。

整体架构

主要文件

schedulelog:按照投递时间组织,文件名就是投递的时间区间(如2020020213)。写schedulelog的入口是在DefaultMessageStore的dispatcher链中新增了一环,当发现是延迟消息时,会修改原始消息的topic为“DELAY_TOPIC_XXXX”,且跳过consumequeue文件,直接写schedulelog。schedulelog里是包含完整的消息内容的,所以commitlog过期被删除也不影响延迟消息的投递。当延迟时间到达时再重新修改topic,投递到原本的commitlog和consumequeue。

dispatchlog:在延时消息投递成功后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatchlog里写入的是消息在schedulelog的offset,不包含消息内容。其用途是当延时server中途重启时,我们需要判断出当前这个刻度(比如一个小时)里的消息有哪些已经投递了则不重复投递,没投递的则扔进内存的hash wheel准备投递。

schedule_offset_checkpoint:在broker正常退出时写入,主要记录每个schedulelog文件的当前写入位置wrotePosition和已写入的最新消息在commitlog中的位移maxCommitLogOffset。前者主要用于broker重启后快速校验schedulelog文件的完整性。后者是由于rocketmq broker重启后的消息重放任务ReputService会重放大量已投递过的消息,因此写schedulelog前必须做去重,去重方式就是如果收到的消息的commitlog offest小于等于maxCommitLogOffset,则表明是重复消息,无需再写。

主要修改点

MessageStoreConfig

扩展了broker.conf配置文件,新增了延迟消息相关配置

DefaultMessageStore

新增CommitLogDispatcherBuildDelayLog用于转发延迟消息保存任务。

org.apache.rocketmq.store

新增delay目录,提供delay log的存储和消费进度保存等功能。

About

RocketMq支持任意延迟时间的延时消息

Resources

License

Contributing

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 165

Languages

点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载