如何采集网站文章私人浏览器
文章目录
- 1. 前言
- 2. 消费者启动
- 2.1 DefaultMQPushConsumer#start 启动消费者
- 2.2 DefaultMQPushConsumerImpl#start
- 2.2.1 checkConfig 检查消费者配置
- 2.2.2 copySubscription 拷贝订阅关系
- 2.2.3 加载消费偏移量
- 2.2.4 更新消费者订阅 topic 路由信息
- 2.2.5 checkClientInBroker 校验 SQL92 表达式
- 2.2.6 broker 处理 CHECK_CLIENT_CONFIG 请求
- 3. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
上面几篇文章我们探讨了生产者的启动和发送消息的源码,这篇文章开始我们就来看下消费者的启动源码,也开始消费者的源码分析。
2. 消费者启动
在上面文章《同步、异步、单向发送消费消息》中我们也讲述了生产者如何发送消息,消费者如何消费消息,当然在文章中我们还是以 Push 模式为主,那所谓的消费者 Push 模式就是服务端主动推送消息给客户端,其实说是服务端推送,实际上 Push 就是 Pull 的一层封装, Pull 模式是用户自己去拉取消息,Push 模式就是客户端帮用户控制拉取频率,消费者需要做的就是消费,当然了这样坏处就是客户端如果没有做好流控,就有可能导致消息堆积导致系统崩溃,消费者的源码还是以 Push 为主,也就是 DefaultMQPushConsumer。
2.1 DefaultMQPushConsumer#start 启动消费者
/*** 启动消费者** @throws MQClientException if there is any client error.*/
@Override
public void start() throws MQClientException {// 根据命名空间设置消费者组setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));// 默认消费者启动this.defaultMQPushConsumerImpl.start();if (null != traceDispatcher) {try {// 消息轨迹追踪服务启动traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}
}
启动消费者的逻辑和生产者差不多,直接看 defaultMQPushConsumerImpl 的 start 方法。
2.2 DefaultMQPushConsumerImpl#start
/**
* 消费者启动
* @throws MQClientException
*/
public synchronized void start() throws MQClientException {// 根据不同状态走不同逻辑switch (this.serviceState) {// 消费服务刚刚创建出来就是这个状态case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());// 首先修改下服务状态为 START_FAILED, 等启动成功之后再修改服务状态为 RUNNINGthis.serviceState = ServiceState.START_FAILED;// 校验消费者配置this.checkConfig();// 拷贝订阅关系this.copySubscription();// 如果是集群消费模式,如果 instanceName 为默认值 "DEFAULT",那么改成 UtilAll.getPid() + "#" + System.nanoTime()// 简单来说就是集群模式下消费者的实例名称修改成进程 PID#当前时间, 也就是永远不会重复, 这玩意就是用来构建 clientID 的, 而// clientID 和消费者的消息队列分配有关, 一个 topic 会将消息队列分配给对应集群下面的消费者, 根据不同的策略进行分配, 如果是// 广播模式, 由于每一个消费者都会消费同一条消息, 自然就不需要考虑这些, clientID 有大用, 这里重点计下if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}// 获取 MQClientInstance,在里面会根据 clientId 获取或者创建 MQClientInstance 实例this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);// 设置负载均衡相关的属性: 消费者组、消费模式、分配逻辑、消费者客户端this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);// 消息拉取服务, 在里面去进行消息拉取以及处理消息拉取的结果this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);// 根据消息模式设置不同的 OffsetStore,用于实现消费者的消息消费偏移量 offset 的管理if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:// 广播模式会将消费者的消费进度存储到本地this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:// 如果是集群模式消费者的消费进度会存储到 broker 上面this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}// 设置消费进度管理类 OffsetStorethis.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}// 加载消费偏移量,LocalFileOffsetStore 会加载本地磁盘中的数据,RemoteBrokerOffsetStore 没有实现, RemoteBrokerOffsetStore// 的消费偏移量是存储到 broker 的, 所以不会在这里加载this.offsetStore.load();// 如果消费者的监听器是 MessageListenerOrderly, 说明这个消费者是顺序消费, 那么消费的逻辑就通过 ConsumeMessageOrderlyService 完成if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// 如果消费者的监听器是 MessageListenerConcurrently, 说明这个消费者是并发消费, 那么消费的逻辑就通过 ConsumeMessageConcurrentlyService 完成this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}// 启动消息消费服务this.consumeMessageService.start();// 将当前消费者注册到 MQClientInstance 的 consumerTable 中,key 是消费者组,这里也透露出一件事,一个进程里面的消费// 者必须都是不同消费者组的boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {// 这里就是发现了有多个 DefaultMQConsumerImpl 往一个消费者组里面配置this.serviceState = ServiceState.CREATE_JUST;// 关闭消费服务this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());// 抛出异常throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}// 启动 MQClientInstance 客户端实例mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());// 设置消费者服务状态为 RUNNINGthis.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}// 从 nameserver 中获取当前消费者订阅的 topic 路由信息并尝试更新本地缓存this.updateTopicSubscribeInfoWhenSubscriptionChanged();// 随机挑选一个 broker 发送请求, 判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确this.mQClientFactory.checkClientInBroker();// 发送心跳信息给 broker, 当消费者把自己的信息上报给 broker 之后, broker 会通知其他消费者进行负载均衡this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 唤醒负载均衡服务, 主动进行重平衡this.mQClientFactory.rebalanceImmediately();
}
跟生产者一样,首先需要检查下消费者配置,也就是 checkConfig 方法,这里面会去校验一些消费者的属性是否合法,当然里面的内容有点多,留到后面再去讲。
然后下面是 copySubscription,这个方法用于拷贝订阅关系,DefaultMQPushConsumer 里面有一个属性 subscription,这个就是消费者订阅关系,这个方法会把订阅关系拷贝到 RebalanceImpl 下面的 subscriptionInner 里面,消费者重平衡分配队列就是通过 RebalanceImpl 来实现的,且注意每个消费者都有自己的 RebalanceImpl,RebalanceImpl 是一个 protected 属性,然后顺便在 copySubscription 中将这个消费者所在消费者组的重传 topic 也设置到 subscriptionInner 中,这个也是后面再细说。
接下来如果是集群消费模式,如果 instanceName 为默认值 “DEFAULT”,那么改成 UtilAll.getPid() + “#” + System.nanoTime(),简单来说就是集群模式下消费者的实例名称修改成进程 PID#当前时间,也就是永远不会重复,这玩意就是用来构建 clientID 的,而 clientID 和消费者的消息队列分配有关,一个 topic 会将消息队列分配给对应集群下面的消费者,根据不同的策略进行分配,如果是广播模式,由于每一个消费者都会消费同一条消息,自然就不需要考虑这些。
接下来获取 MQClientInstance,在里面会根据 clientId 获取或者创建 MQClientInstance 实例,注意这里如果生产者和消费者运行在同一个进程中,比如启动一个 jar 包,这个 jar 包里面生产者消费者也不会共用一个 MQClientInstance,因为这个版本的 RocketMQ 设置 instanceName 的时候加上了当前时间,所以获取的 MQClientInstance 也是不同的,对于 MQClientInstance 如果想要了解更多的可以看这篇文章:【RocketMQ 生产者和消费者】- 生产者启动源码-创建 MQClientInstance(2)。
下面是负载均衡的一些相关属性,消费者组、消费模式、分配逻辑、消费者客户端,负载均衡是消费者很重要的一个功能,决定了消费者可以消费这个 topic 下面的哪些队列,关于负载均衡的原理后面文章会去细说。
创建消息拉取服务 pullAPIWrapper,在里面去进行消息拉取以及处理消息拉取的结果,消息拉取服务是消费者的重要服务,首先消费者不可能每一次拉取消息都向 broker 实时请求, 所以消费者都会有一个定时任务定时拉取消息下来存储到本地消息集合中,消费消息都是消费本地消息集合的。
下面根据消息模式设置不同的 OffsetStore,用于实现消费者的消息消费偏移量 offset 的管理,注意下这里消费者的偏移量值得是下一条要消费的消息在 ConsumeQueue 中的索引下标,如果要找到具体的消息在 ConsumeQueue 的位置,还需要 * 20。
接下来继续,最重要的来了,如果消费者的监听器是 MessageListenerOrderly, 说明这个消费者是顺序消费, 那么消费的逻辑就通过 ConsumeMessageOrderlyService 完成,也就是创建 ConsumeMessageOrderlyService,而如果消费者的监听器是 MessageListenerConcurrently, 说明这个消费者是并发消费, 那么消费的逻辑就通过 ConsumeMessageConcurrentlyService 完成,这两个消费模式有什么区别后面讲里面的消费源码的时候也会说的,接下来通过 this.consumeMessageService.start() 启动消费服务开始消费消息。
将当前消费者注册到 MQClientInstance 的 consumerTable 中,key 是消费者组,这里也透露出一件事,一个进程里面的消费者必须都是不同消费者组的。
最后通过 mQClientFactory.start()
启动客户端实例,设置消费者服务状态为 RUNNING,使用 updateTopicSubscribeInfoWhenSubscriptionChanged
方法从 nameserver 中获取当前消费者订阅的 topic 路由信息并尝试更新本地缓存,同时随机挑选一个 broker 发送请求, 判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确,调用 sendHeartbeatToAllBrokerWithLock
上报心跳信息(消费者的心跳信息就是订阅信息)。最后唤醒负载均衡服务, 主动进行重平衡。
上面的流程还是比较多的,重点是这个方法跟生产者启动的确实有点像,下面来单个看下里面的逻辑。
2.2.1 checkConfig 检查消费者配置
上面也说过了 checkConfig 会检查消费者启动的一些配置信息,首先就是校验消费者组是否合法,比如不能为空,长度不能超过 255,是否包含了非法字符,合法的字符只有下面中括号里面这些 [%|a-zA-Z0-9_-]。
// 校验消费者组属性是否合法(长度合法、字符合法)
Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());// 如果消费者组为空, 抛出异常
if (null == this.defaultMQPushConsumer.getConsumerGroup()) {throw new MQClientException("consumerGroup is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
接下来判断如果消费者组等于 DEFAULT_CONSUMER, 也抛出异常, 这是系统默认组名,用户不能设置消费者组成这个。
// 如果消费者组等于 DEFAULT_CONSUMER, 也抛出异常, 这是系统默认组名
if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {throw new MQClientException("consumerGroup can not equal "+ MixAll.DEFAULT_CONSUMER_GROUP+ ", please specify another one."+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
然后判断如果 messageModel 为空, 也抛出异常, messageModel 是 CLUSTERING 或者 BROADCASTING,代表集群模式还是广播模式,当然默认就是集群模式,也就是不设置的话默认就是集群。
// 如果 messageModel 为空, 也抛出异常, messageModel 是 CLUSTERING 或者 BROADCASTING
if (null == this.defaultMQPushConsumer.getMessageModel()) {throw new MQClientException("messageModel is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
接下来判断消费者的消费点位不能为空,消费者启动默认消费模式是 CONSUME_FROM_LAST_OFFSET,也就是默认从上次消费的点位开始继续消费。消费点位意思是消费者从哪个地方开始消费。
// 如果 messageModel 为空, 也抛出异常, messageModel 是 CLUSTERING 或者 BROADCASTING
if (null == this.defaultMQPushConsumer.getMessageModel()) {throw new MQClientException("messageModel is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
然后下面判断 consumeTimestamp 属性,一个新的订阅组第一次启动从指定时间点开始消费,和 consumer.setConsumeTimestamp() 配合使用,默认是启动的时间往前退半个小时,当消费模式使用 CONSUME_FROM_TIMESTAMP 的时候会用到这玩意来计算下一次消费者从哪里开始拉取消息。
// 一个新的订阅组第一次启动从指定时间点开始消费,和 consumer.setConsumeTimestamp() 配合使用,默认是半个小时以前
Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS);
if (null == dt) {throw new MQClientException("consumeTimestamp is invalid, the valid format is yyyyMMddHHmmss,but received "+ this.defaultMQPushConsumer.getConsumeTimestamp()+ " " + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
}
接着判断消费者的 MessageQueue 分配策略,这玩意默认是 AllocateMessageQueueAveragely,用来消费者重平衡分配队列。
// 消费者的 MessageQueue 分配策略
if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {throw new MQClientException("allocateMessageQueueStrategy is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
继续判断消费者的订阅信息是否为空,消费者订阅消息肯定不能为空的,毕竟消费者需要依靠订阅信息来获取消息消费,不过这个配置项应该不用了,但是这里也不用管,因为这个属性默认就是不为空的。
/*** 消费者订阅信息 topic -> 订阅表达式*/
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();// 消费者的订阅信息不能为空
if (null == this.defaultMQPushConsumer.getSubscription()) {throw new MQClientException("subscription is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
然后判断消息监听器,当有消息的时候就可以回调,不过一般都是实现这玩意的子接口 MessageListenerOrderly,回调 consumeMessage 方法。
// 消息监听器, 当有消息的时候就可以回调, 不过一般都是实现这玩意的子接口 MessageListenerOrderly, 回调 consumeMessage 方法
if (null == this.defaultMQPushConsumer.getMessageListener()) {throw new MQClientException("messageListener is null"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
消息监听器可以通过 register 方法注册。
接下来判断消息监听器有没有设置成 MessageListenerOrderly 或者 MessageListenerConcurrently 的子类,分别是顺序消费和并发消费。
// 这里就是判断消息监听器是 MessageListenerOrderly 或者 MessageListenerConcurrently 的子类, 分别是顺序消费和并发消费
boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
if (!orderly && !concurrently) {throw new MQClientException("messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
接着往下看,下面就是消费者的线程数判断,消费者消费线程最小值在 [1, 1000] 之间,消费者线程最大值在 [1, 1000] 之间,且消费者线程最小值不能大于消费者线程最大值。
// 消费者线程最小值在 [1, 1000] 之间
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {throw new MQClientException("consumeThreadMin Out of range [1, 1000]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}// 消费者线程最大值在 [1, 1000] 之间
if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {throw new MQClientException("consumeThreadMax Out of range [1, 1000]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}// 消费者线程最小值不能大于消费者线程最大值
if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {throw new MQClientException("consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "+ "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",null);
}
接下来判断并发消费的最大拉取跨度要在 [1, 65535] 之间,并发消费拉取消息的跨度意思是消费者会把消息存储到一个 msgTree 中,里面消息的偏移量跨度如果默认超过 2000,那么 RocketMQ 就会先暂停拉取消息, 避免消息堆积过多,用户可以自己设置,不设置默认就是 2000。
// 并发消费的最大拉取跨度要在 [1, 65535] 之间
if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1|| this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {throw new MQClientException("consumeConcurrentlyMaxSpan Out of range [1, 65535]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
除了上面的拉取跨度,还有流控阈值,消息的流控,主要是为了防止消费者处理消息的速度跟不上消息的生产速度,导致消息在消费者端过度堆积,从而影响系统性能甚至导致系统崩溃,默认情况下每个消息队列会缓存 1000 条消息,所以消费者最多会将 1000 条消息缓存起来等待处理,但是消息拉取也是要结合 pullBatchSize 的,这个参数是消费者一次拉取的消息数量,假设是 200,当缓存的消息数量达到 900 条时,再次拉取 200 条消息,此时缓存的消息数量就会达到 1100 条,超过了 1000 条的限制, 所以这里要用的话得考虑好这两个参数的关系。这里检查这个参数也是需要在 [1,65535] 之间。
// 消息队列流控阈值需要在 [1, 65535] 之间
if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {throw new MQClientException("pullThresholdForQueue Out of range [1, 65535]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
上面是消息队列的流控,下面还有 topic 层面的流控,所谓主题层面的流控,就是对消费者从整个主题中拉取并缓存的消息总量进行限制。其默认值为 -1,意味着不做限制,即消费者可以无限制地从该主题中拉取消息并缓存,若 pullThresholdForTopic 被设置为一个非 -1 的有效数值,也就是启用了主题级别的流控,那么上面的 pullThresholdForQueue(队列级别的流控阈值)会被覆盖, 新的 pullThresholdForQueue 值会根据 pullThresholdForTopic 的值和分配给该消费者的消息队列数量重新计算得出。
// topic 层面的流控, 默认是 -1, 就是不限制从某个 topic 上拉取的消息数, 范围是 [1, 6553500]
if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {throw new MQClientException("pullThresholdForTopic Out of range [1, 6553500]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}
}
除了流控,还有消息缓存的总大小,这里是从队列层面对缓存的消息大小做限制, 但是由于消费者一次可能会拉取多个消息(即批量拉取), 在拉取瞬间,消息队列中缓存的消息大小可能会超过 100 MiB,例如,当缓存的消息大小接近 100 MiB 时,若一次拉取的消息大小较大,就会使瞬时的缓存消息大小超过这个限制,这里是校验消费者拉取的消息缓存的总大小不能超过 pullThresholdSizeForQueue, 范围是 [1, 1024] MB。
// 消费者拉取的消息缓存的总大小不能超过 pullThresholdSizeForQueue, 范围是 [1, 1024] MB
if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {throw new MQClientException("pullThresholdSizeForQueue Out of range [1, 1024]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
上面是队列层面的,当然处理队列层面的还有 topic 层面的,topic 层面的限制就是 -1,也就是默认不限制这个 topic 下面的消息缓存大小。
// 这里就是 topic 层面去限制缓存中拉取的消息大小, 默认是 -1 就是不限
if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {// 如果限制, 这个值的范围需要在 [1, 102400] MBif (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {throw new MQClientException("pullThresholdSizeForTopic Out of range [1, 102400]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);}
}
消息拉取,需要有一定的时间间隔,下面的 pullInterval 就是用来控制时间间隔的,当然默认是 0,就是立刻提交消息拉取请求,后面讲消息拉取源码的时候也会讲到,这里先有个概念。
// 消息拉取时间间隔需要在 [0, 65535]
if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {throw new MQClientException("pullInterval Out of range [0, 65535]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
然后就是一次性可以批量消费多少条消息,消费者批量消费的消息数在 [1, 1024] 之间。
// 消费者批量消费的消息数在 [1, 1024] 之间
if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1|| this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {throw new MQClientException("consumeMessageBatchMaxSize Out of range [1, 1024]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
最后是消费者批量拉取消息的数量,也就是上面有提到的 pullBatchSize,配合上面的 pullThresholdSizeForQueue 来使用,数量需要在 [1, 1024] 之间。
// 消费者批量拉取消息的数量需要在 [1, 1024] 之间
if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {throw new MQClientException("pullBatchSize Out of range [1, 1024]"+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
}
2.2.2 copySubscription 拷贝订阅关系
这里的拷贝订阅关系其实是将 DefaultMQPushConsumer 中的 subscription 拷贝到 RebalanceImpl 的 subscriptionInner 属性中,但是 subscription 看了下源码里面设置的方法就是通过 set 方法去设置,而这个 set 方法已经是弃用了,所以这个方法的前面这段可以不看了。
下面来看下这个方法的源码。
/*** 拷贝订阅关系* @throws MQClientException*/
private void copySubscription() throws MQClientException {try {// 获取消费者的订阅表达式, 这里的表达式可以通过 consumer.setSubscription(xxx); 设置, 但是这个方法已经是 @DeprecatedMap<String, String> sub = this.defaultMQPushConsumer.getSubscription();if (sub != null) {// 遍历所有订阅表达式for (final Map.Entry<String, String> entry : sub.entrySet()) {// topic 名final String topic = entry.getKey();// 订阅表达式final String subString = entry.getValue();// 创建订阅信息SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);// 添加到 subscriptionInner 订阅信息集合中this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}// 设置消息监听器, 消费者可以通过 consumer.setMessageListener(); 来设置监听器if (null == this.messageListenerInner) {this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();}// 判断消费者的消费模式switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:// 广播模式, 广播模式下所有消费者都会消费同一条消息, 所以没有什么重传的, 万一有一个消费失败重传导致其他消费者又重新消费一遍就完蛋了break;case CLUSTERING:// 集群模式会把对这个消费者组的重传 topic 创建订阅信息, 当然是订阅所有 TAG 了final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);// 设置到 subscriptionInner 里面this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}} catch (Exception e) {throw new MQClientException("subscription exception", e);}
}
我们主要关注的还是后面的源码,首先就是设置消息的监听器,这个监听器在创建消费者的时候就会设置了,这里是将 DefaultMQPushConsumer 的 messageListenerInner 设置到 DefaultMQPushConsumerImpl 的 messageListenerInner,这个监听器就是用来创建并发(ConsumeMessageConcurrentlyService)还是顺序(ConsumeMessageOrderlyService)消费的类时候传进去的。
最后判断消费者的消费模式来设置重传 topic (%RETRY% + consumerGroup
)的订阅信息,如果消费者是广播模式,广播模式下所有消费者都会消费同一条消息,所以没有什么重传的,万一有一个消费失败重传导致其他消费者又重新消费一遍就重复消费了。但是在集群模式会对这个消费者的消费者组创建一个订阅信息,注意重传 topic 是消费者组维度的而不是消费者维度,也就是说一个消费者组里面的消费者都会共用一个重传 topic 来存储消息,重传 topic 肯定是默认订阅所有 tag 了。
2.2.3 加载消费偏移量
this.offsetStore.load()
用于加载消费偏移量,LocalFileOffsetStore 会加载本地磁盘中的数据,RemoteBrokerOffsetStore 没有实现,RemoteBrokerOffsetStore 的消费偏移量是存储到 broker 的,所以不会在这里加载,所以下面就看下 LocalFileOffsetStore 的加载偏移量源码。
/*** 本地偏移量加载* @throws MQClientException*/
@Override
public void load() throws MQClientException {// 从 #{user.home}/.rocketmq_offsets/{clientId}/{groupName}/{offsets.json} 中加载偏移量到 offsetTableOffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {AtomicLong offset = mqEntry.getValue();log.info("load consumer's offset, {} {} {}",this.groupName,mqEntry.getKey(),offset.get());}}
}
readLocalOffset 就不进去看了,里面就是从 {user.home}/.rocketmq_offsets/{clientId}/{groupName}/{offsets.json}
中加载偏移量到 offsetTable,不过一般我们都是向 broker 拉取偏移量,所以这里很少用到。
2.2.4 更新消费者订阅 topic 路由信息
updateTopicSubscribeInfoWhenSubscriptionChanged 就是从 nameserver 中获取当前消费者订阅的 topic 路由信息并尝试更新本地缓存,那这个方法源码如下:
/*** 从 NameServer 中拉取 topic 的订阅信息来更新本地缓存*/
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {// 获取消费者的订阅信息Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();// 从 NameServer 中拉取这个 topic 的订阅信息并且尝试去更新本地的相关缓存this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);}}
}
updateTopicRouteInfoFromNameServer 这个方法在前面的文章 【RocketMQ 生产者和消费者】- 生产者启动源码 - MQClientInstance 定时任务(4) 已经有详细介绍过了,所以这里不再多说,可以去看这篇文章,顺便说一下就是 MQClientInstance 定时任务不单单是生产者的,也是消费者的。
2.2.5 checkClientInBroker 校验 SQL92 表达式
这个方法就是随机挑选一个 broker 发送请求, 判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确。
/*** 随机挑选一个 broker 发送请求, 判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确* @throws MQClientException*/
public void checkClientInBroker() throws MQClientException {// 获取所有消费者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();// 遍历所有消费者while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();// 获取这个消费者的所有订阅信息, 一个消费者可以订阅多个 topic 信息的// 订阅第一个 topic,* 表示订阅该 topic 下的所有消息// consumer.subscribe("TopicTest1", "*");// 订阅第二个 topic// consumer.subscribe("TopicTest2", "*");Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions();if (subscriptionInner == null || subscriptionInner.isEmpty()) {return;}// 遍历所有的订阅信息for (SubscriptionData subscriptionData : subscriptionInner) {// 如果是订阅 TAG 类型的就不校验if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {continue;}// may need to check one broker every cluster...// assume that the configs of every broker in cluster are the the same.// 根据 topic 随机获取一个 broker 地址, 一个 topic 可以存储到多个 broker 集群下面, 所以这里会随机选择一个 broker 集群// 下面的主节点, 如果选不到主节点呢?那就随便选一个从节点地址String addr = findBrokerAddrByTopic(subscriptionData.getTopic());if (addr != null) {try {// 给 broker 发送请求判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确this.getMQClientAPIImpl().checkClientInBroker(addr, entry.getKey(), this.clientId, subscriptionData, clientConfig.getMqClientApiTimeout());} catch (Exception e) {if (e instanceof MQClientException) {throw (MQClientException) e;} else {throw new MQClientException("Check client in broker error, maybe because you use "+ subscriptionData.getExpressionType() + " to filter message, but server has not been upgraded to support!"+ "This error would not affect the launch of consumer, but may has impact on message receiving if you " +"have use the new features which are not supported by server, please check the log!", e);}}}}}
}
consumerTable 中存储当前进程里面的消费者,但是这个应该是每一个消费者的 consumerTable 都不同的,因为 consumerTable 在 MQClientInstance 中,而 MQClientInstance 在启动消费者的时候会创建出来,由于 instanceName 使用了当前时间去标识,所以每一个消费者启动的 instanceName 都会不同,所以 consumerTable 这里存储的应该就是启动的消费者信息。
findBrokerAddrByTopic 随意获取这个 topic 存储的 broker 集群中的其中一个,然后获取这个集群的主节点,给这个 broker 发送消息,来简单看下 findBrokerAddrByTopic 的源码。
/*** 一个 topic 可以存储在多个 broker, 这里就是随机获取一个 broker* @param topic* @return*/
public String findBrokerAddrByTopic(final String topic) {// 获取 topic 路由配置TopicRouteData topicRouteData = this.topicRouteTable.get(topic);if (topicRouteData != null) {// 这里就是存储着 topic 存储的 broker 信息List<BrokerData> brokers = topicRouteData.getBrokerDatas();if (!brokers.isEmpty()) {// 随机挑选一个int index = random.nextInt(brokers.size());BrokerData bd = brokers.get(index % brokers.size());// 有主节点就选主节点, 没有主节点就随便选一个地址return bd.selectBrokerAddr();}}return null;
}
可以看到这个方法就是直接从本地 topicRouteTable 缓存中获取路由信息,然后再从里面获取 broker,因为上面 2.2.4 节已经从 broker 中拉取了 topic 的路由信息来更新本地的 topicRouteTable 集合,所以这里可以直接从 topicRouteTable 中获取。
接下来就调用 checkClientInBroker 发送校验接口,下面就来看下这个方法。
/*** 给 broker 发送请求判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确* @param brokerAddr* @param consumerGroup* @param clientId* @param subscriptionData* @param timeoutMillis* @throws InterruptedException* @throws RemotingTimeoutException* @throws RemotingSendRequestException* @throws RemotingConnectException* @throws MQClientException*/
public void checkClientInBroker(final String brokerAddr, final String consumerGroup,final String clientId, final SubscriptionData subscriptionData,final long timeoutMillis)throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,RemotingConnectException, MQClientException {// 请求 CODE 是 CHECK_CLIENT_CONFIGRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null);// 构造消息体CheckClientRequestBody requestBody = new CheckClientRequestBody();// 设置消费者的客户端 IDrequestBody.setClientId(clientId);// 设置消费者组requestBody.setGroup(consumerGroup);// 设置消费者的订阅信息requestBody.setSubscriptionData(subscriptionData);request.setBody(requestBody.encode());// 通过 VIP 通道发送请求RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);assert response != null;if (ResponseCode.SUCCESS != response.getCode()) {// 如果 SQL92 校验失败, 抛出异常throw new MQClientException(response.getCode(), response.getRemark());}
}
可以看到请求 code 是 CHECK_CLIENT_CONFIG
,发送的时候传递的消息有三个:
- 消费者的客户端 ID。
- 消费者组。
- 消费者订阅信息,一个消费者组下面的消费者订阅是一样的。
发送的时候通过 VIP 通道发送,VIP 通道就是 10909 端口,性能会高一点,算是内部请求通道。
2.2.6 broker 处理 CHECK_CLIENT_CONFIG 请求
接着上面 2.2.5 小节,这一小节来看下 broker 如何处理 CHECK_CLIENT_CONFIG 请求。
/*** 校验消费者的 SQL92 配置* @param ctx* @param request* @return* @throws RemotingCommandException*/
public RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(),CheckClientRequestBody.class);if (requestBody != null && requestBody.getSubscriptionData() != null) {// 获取消费者订阅信息SubscriptionData subscriptionData = requestBody.getSubscriptionData();// 如果消费者订阅类型是 TAG 类型就直接返回 SUCCESSif (ExpressionType.isTagType(subscriptionData.getExpressionType())) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}// 判断 broker 是否支持 SQL92 过滤if (!this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {// 如果不支持就返回错误码 SYSTEM_ERRORresponse.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());return response;}try {// 通过 SqlFilter 校验 SQL92 表达式是否正确FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString());} catch (Exception e) {log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}",requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage());// 出异常就返回结果 SUBSCRIPTION_PARSE_FAILEDresponse.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);response.setRemark(e.getMessage());return response;}}// 校验通过了response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;
}
可以看到对于 Tag 过滤的就直接返回 SUCCESS,如果是 SQL92 过滤,先看 broker 有没有开启 enablePropertyFilter 这个配置项,在之前的文章 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息 我们也说过,如果 broker 要支持 SQL92 过滤,就需要在配置文件中设置 enablePropertyFilter=true
。
最后通过 compile 方法去解析 SQL92 过滤表达式,这个表达式后面再用文章单独说,这里就记住有这么个方法,如果校验不通过会抛出异常,然后就在里面返回 SUBSCRIPTION_PARSE_FAILED,返回之后在 checkClientInBroker 方法中由于不是 SUCCESS,就抛出 MQClientException 异常代表 SQL92 表达式校验失败。
3. 小结
好了,这篇文章就讲述了消费者启动的源码,可以看到里面大部分的方法都是跟生产者启动一样的,2.2 小结里面这些则是一些消费者的方法,不过像消费者重平衡服务,这些就留到后面讲,文章字数也差不多了。
如有错误,欢迎指出!!!!