跟我学RocketMQ之消息消费源码解析(1)

news/2024/7/15 16:53:59 标签: java, 人工智能, netty

本文我们接着分析一下RocektMQ实现消息消费的源码细节,这部分的内容较多,因此拆分为几个章节分别进行讲解。

本章节重点讲解DefaultMQPushConsumer的代码逻辑。

DefaultMQPushConsumer使用样例

按照惯例还是先看一下DefaultMQPushConsumer的使用样例。

    @PostConstruct
    public void init() {
        defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
        defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
        // 从头开始消费
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 消费模式:集群模式
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册监听器
        defaultMQPushConsumer.registerMessageListener(messageListener);
        // 订阅所有消息
        try {
            defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*");
            defaultMQPushConsumer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("[订单结果通知消息消费者]--NotifySendConsumer加载异常!", e);
        }
        LOGGER.info("[订单结果通知消息消费者]--NotifySendConsumer加载完成!");
    }复制代码

初始化过程中需要调用registerMessageListener将具体的消费实现Listener注入。

    @Component(value = "notifySendListenerImpl")
    public class NotifySendListenerImpl implements MessageListenerConcurrently {复制代码

        ...省略部分代码...复制代码
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {复制代码
            try {
                for (MessageExt msg : msgs) {
                    // 消息解码
                    String message = new String(msg.getBody());
                    // 消费次数
                    int reconsumeTimes = msg.getReconsumeTimes();
                    String msgId = msg.getMsgId();
                    String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;复制代码
                    LOGGER.info("[通知发送消息消费者]-OrderNotifySendProducer-接收到消息,message={},{}", message, logSuffix);
                    // 请求组装
                    OrderResultNofityProtocol protocol = new OrderResultNofityProtocol();
                    protocol.decode(message);
                    // 参数加签,获取用户privatekey
                    String privateKey = protocol.getPrivateKey();
                    String notifyUrl = protocol.getMerchantNotifyUrl();
                    String purseId = protocol.getPurseId();
                    ChargeNotifyRequest chargeNotifyRequest = new ChargeNotifyRequest();
                    chargeNotifyRequest.setChannel_orderid(protocol.getChannelOrderId())
                            .setFinish_time(DateUtil.formatDate(new Date(System.currentTimeMillis())))
                            .setOrder_status(NotifyConstant.NOTIFY_SUCCESS)
                            .setPlat_orderid(protocol.getOrderId())
                            .setSign(chargeNotifyRequest.sign(privateKey));
                    LOGGER.info("[通知发送消息消费者]-OrderNotifySendProducer-订单结果通知入参:{},{}", chargeNotifyRequest.toString(), logSuffix);
                    // 通知发送
                    return sendNotifyByPost(reconsumeTimes, logSuffix, protocol, notifyUrl, purseId, chargeNotifyRequest);
                }
            } catch (Exception e) {
                LOGGER.error("[通知发送消息消费者]消费异常,e={}", LogExceptionWapper.getStackTrace(e));
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }复制代码

上面就是一个较为标准的在spring框架中使用RocektMQ的DefaultMQPushConsumer进行消费的主流程。

接下来我们重点分析一下源码实现。

初始化DefaultMQPushConsumer

首先看一下DefaultMQPushConsumer的初始化过程。

进入DefaultMQPushConsumer.java类,查看构造方法:

    public DefaultMQPushConsumer(final String consumerGroup) {
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
    }复制代码

调用了它的同名构造,采用AllocateMessageQueueAveragely策略(平均散列队列算法)

    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.consumerGroup = consumerGroup;
        this.namespace = namespace;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }复制代码

可以看到实际初始化是通过DefaultMQPushConsumerImpl实现的,DefaultMQPushConsumer持有一个defaultMQPushConsumerImpl的引用。

    [DefaultMQPushConsumerImpl.java]
    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
        // 初始化DefaultMQPushConsumerImpl,将defaultMQPushConsumer的实际引用传入
        this.defaultMQPushConsumer = defaultMQPushConsumer;
        // 传入rpcHook并指向本类的引用
        this.rpcHook = rpcHook;
    }
复制代码

注册消费监听MessageListener

我们接着看一下注册消费监听器的流程。

消费监听接口MessageListener有两个具体的实现,分别为

    MessageListenerConcurrently     -- 并行消费监听
    MessageListenerOrderly          -- 顺序消费监听复制代码

本文以MessageListenerConcurrently为主要讲解的对象。

查看MessageListenerConcurrently的注册过程。

    @Override
    public void registerMessageListener(
                MessageListenerConcurrently messageListener) {
        // 将实现指向本类引用
        this.messageListener = messageListener;
        // 进行真实注册
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }复制代码

接着看defaultMQPushConsumerImpl.registerMessageListener

    DefaultMQPushConsumerImpl.java
    public void registerMessageListener(MessageListener messageListener) {
        this.messageListenerInner = messageListener;
    }复制代码

可以看到DefaultMQPushConsumerImpl将真实的messageListener实现指向它本类的messageListener引用。

订阅topic

接着看一下订阅topic的主流程。

topic订阅主要通过方法subscribe实现,首先看一下DefaultMQPushConsumer的subscribe实现

    @Override
    public void subscribe(String topic, String subExpression) 
                                        throws MQClientException {
        this.defaultMQPushConsumerImpl
            .subscribe(withNamespace(topic), subExpression);
    }复制代码

可以看到是调用了DefaultMQPushConsumerImpl的subscribe方法。

    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            // 构建主题的订阅数据,默认为集群消费
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                topic, subExpression);
            // 将topic的订阅数据进行保存
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                // 如果MQClientInstance不为空,则向所有的broker发送心跳包,加锁
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }复制代码

看一下buildSubscriptionData代码逻辑

    [FilterAPI.java]
    public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
        String subString) throws Exception {
        // 构造一个SubscriptionData实体,设置topic、表达式(tag)
        SubscriptionData subscriptionData = new SubscriptionData();
        subscriptionData.setTopic(topic);
        subscriptionData.setSubString(subString);复制代码
        // 如果tag为空或者为"*",统一设置为"*",即订阅所有消息
        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
        } else {
            // tag不为空,则先按照‘|’进行分割
            String[] tags = subString.split("\\|\\|");
            if (tags.length > 0) {
                // 遍历tag表达式数组
                for (String tag : tags) {
                    if (tag.length() > 0) {
                        String trimString = tag.trim();
                        if (trimString.length() > 0) {
                            // 将每个tag的值设置到tagSet中
                            subscriptionData.getTagsSet().add(trimString);
                            subscriptionData.getCodeSet().add(trimString.hashCode());
                        }
                    }
                }
            } else {
                // tag解析异常
                throw new Exception("subString split error");
            }
        }
        return subscriptionData;
    }复制代码

看一下sendHeartbeatToAllBrokerWithLock代码逻辑

    [MQClientInstance.java]
    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                // 发送心跳包
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed.");
        }
    }复制代码

可以看到,同步发送心跳包给所有的broker,而该过程是通过RemotingClient统一实现的,通过调用RemotingClient.invokeSync实现心跳包的发送,底层是通过Netty实现的。具体细节本文不进行展开。

启动消费客户端

上述初始化流程执行完毕之后,通过start()方法启动消费客户端。

    @Override
    public void start() throws MQClientException {
        // 设置消费者组
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        // 启动消费客户端
        this.defaultMQPushConsumerImpl.start();
        // trace处理逻辑
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }复制代码

关于trace的处理逻辑,本文不再展开,感兴趣的同学可以移步 跟我学RocketMQ之消息轨迹实战与源码分析

接着看defaultMQPushConsumerImpl.start()方法逻辑

    [DefaultMQPushConsumerImpl.java]
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={},
                 isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;复制代码

首次启动后,执行配置检查,该方法为前置校验方法,主要进行消费属性校验。

                this.checkConfig();复制代码

将订阅关系配置信息进行复制

                this.copySubscription();复制代码

如果当前为集群消费模式,修改实例名为pid

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }复制代码

创建一个新的MQClientInstance实例,如果已经存在直接使用该存在的MQClientInstance

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);复制代码

为消费者负载均衡实现rebalanceImpl设置属性

                // 设置消费者组
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                // 设置消费模式
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                // 设置队列分配策略
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                // 设置当前的MQClientInstance实例
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
复制代码
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                // 注册消息过滤钩子
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);复制代码

处理offset存储方式

                // offsetStore不为空则使用当前的offsetStore方式
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    // 否则根据消费方式选择具体的offsetStore方式存储offset
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        // 如果是广播方式,则使用本地存储方式
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        // 如果是集群方式,则使用远端broker存储方式存储offset
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                 // 加载当前的offset
                this.offsetStore.load();复制代码

根据MessageListener的具体实现方式选取具体的消息拉取线程实现。

                // 如果是MessageListenerOrderly顺序消费接口实现
                // 消息消费服务选择:ConsumeMessageOrderlyService(顺序消息消费服务)
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } 
                // 如果是MessageListenerConcurrently并行消息消费接口实现
                // 消息消费服务选择:ConsumeMessageConcurrentlyService(并行消息消费服务)
                else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }复制代码

选择并初始化完成具体的消息消费服务之后,启动消息消费服务。consumeMessageService主要负责对消息进行消费,它的内部维护了一个线程池。

                // 启动消息消费服务
                this.consumeMessageService.start();复制代码

接着向MQClientInstance注册消费者,并启动MQClientInstance。这里再次强调

一个JVM中所有消费者、生产者持有同一个MQClientInstance,且MQClientInstance只会启动一次

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }复制代码
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;复制代码

如果MQClientInstance已经启动,或者已经关闭,或者启动失败,重复调用start会报错。这里也能直观的反映出:MQClientInstance的启动只有一次

            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }复制代码

启动完成执行后续收尾工作

        // 订阅关系改变,更新Nameserver的订阅关系表
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        // 检查客户端状态
        this.mQClientFactory.checkClientInBroker();
        // 发送心跳包
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 唤醒执行消费者负载均衡
        this.mQClientFactory.rebalanceImmediately();
    }
复制代码

copySubscription(),消息重试topic处理逻辑

消费者启动流程较为重要,我们接着对其中的重点方法展开讲解。这部分内容可以暂时跳过,不影响对主流程的把控。

我们研究一下copySubscription方法的实现细节。

    [DefaultMQPushConsumerImpl.java]
    private void copySubscription() throws MQClientException {
        try {复制代码
            // 首先获取订阅信息
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }复制代码
            // 为defaultMQPushConsumer设置具体的MessageListener实现
            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }复制代码

根据消费类型选择是否进行重试topic订阅

            switch (this.defaultMQPushConsumer.getMessageModel()) {复制代码
                // 如果是广播消费模式,则不进行任何处理,即无重试
                case BROADCASTING:
                    break;复制代码
                // 如果是集群消费模式,订阅重试主题消息
                case CLUSTERING:
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }复制代码

如果是集群消费模式,会订阅重试主题消息

获取重试topic,规则为 RETRYGROUPTOPIC_PREFIX + consumerGroup,即:"%RETRY%"+消费组名

为重试topic设置订阅关系,订阅所有的消息;

消费者启动的时候会自动订阅该重试主题,并参与该topic的消息队列负载过程。

小结

到此,我们就DefaultMQPushConsumer的初始化、启动、校验以及topic订阅、重试等代码实现细节进行了较为详细的讲解。

下一章节,我将带领读者对消息消费线程 consumeMessageService 的实现进行分析,我们下篇文章见。


版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

转载于:https://juejin.im/post/5d5522f2518825378d5d68f8


http://www.niftyadmin.cn/n/1151552.html

相关文章

使用Nginx负载均衡搭建高性能.NETweb应用程序一

一、遇到的问题 当我们用IIS服务器部署了一个web应用以后&#xff0c;当很多用户高并发访问的时候&#xff0c;客户端响应就会很慢&#xff0c;客户的体验就会很差&#xff0c;由于IIS接受到客户端请求的 时候&#xff0c;就会创建一个线程&#xff0c;当线程达到几千个时候&am…

【writing-mode与absolute,auto】垂直居中

实现垂直方向margin&#xff1a;auto居中 writing-mode:vertical-lr;改变垂直方向 &#xff0c;从而使类son垂直居中&#xff0c;<!DOCTYPE html> <html><head><meta charset"utf-8" /><title></title><style type"text…

CCF-CIDR合并 java代码(仅供参考)修改

题目&#xff1a; 试题编号&#xff1a; 201812-3试题名称&#xff1a; CIDR合并时间限制&#xff1a; 1.0s内存限制&#xff1a; 512.0MB问题描述&#xff1a; 样例输入 212 样例输出 1.0.0.0/82.0.0.0/8 样例输入 210/910.128/9 样例输出 10.0.0.0/8 样例输入 20/1128/1 样例…

delete指针以后应赋值为NULL——QT deletelater指针以后也同样要马上赋值为NULL

delete p后&#xff0c;只是释放了指针指向的内存空间。p并不会自动被置为NULL&#xff0c;而且指针还在&#xff0c;同时还指向了之前的地址 delete NULL编译器不会报错&#xff08;因为delete空指针是合法的&#xff09; 例&#xff1a; 对一个非空指针delete后&#xff0c;若…

面包屑导航栏控件----------WinForm控件开发系列

/// <summary>/// 导航导航栏控件/// </summary>[ToolboxItem(true)][DefaultProperty("Items")][DefaultEvent("NavigationItemClick")][Description("导航导航栏控件")]public partial class NavigationBarExt : Control{public d…

Apache Spark简单介绍、安装及使用

Apache Spark简介 Apache Spark是一个高速的通用型计算引擎&#xff0c;用来实现分布式的大规模数据的处理任务。 分布式的处理方式可以使以前单台计算机面对大规模数据时处理不了的情况成为可能。 Apache Spark安装及配置&#xff08;OS X下的Ubuntu虚拟机&#xff09; 学习…

走马灯图片轮播控件----------WinForm控件开发系列

控件的播放原理在动画播放前计算好要播放特图片的信息。动画过程是利用定时器更新图片的信息。 控件播放界面显示的图片可以是一张或多张&#xff0c;每次移动只有一张滑动方的方向一共有四种。 有些人会看到 Images属性的类型是自定义DisplayImageCollection &#xff0c;而Di…

Spring Struts Hibernate的工作流程

2019独角兽企业重金招聘Python工程师标准>>> 其实这个知识点已经在我的博客&#xff1a;Spring必备知识中提到了&#xff0c;但是比较分散&#xff0c;这里整理出单独一篇文章。 Spring Web MVC 处理Http请求的大致过程&#xff1a; 一旦Http请求到来&#xff0c;D…