|
73 | 73 | import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap; |
74 | 74 | import org.apache.rocketmq.broker.filter.ConsumerFilterManager; |
75 | 75 | import org.apache.rocketmq.broker.latency.BrokerFastFailure; |
| 76 | +import org.apache.rocketmq.broker.lite.AbstractLiteLifecycleManager; |
| 77 | +import org.apache.rocketmq.broker.lite.LiteEventDispatcher; |
| 78 | +import org.apache.rocketmq.broker.lite.LiteSubscriptionRegistry; |
| 79 | +import org.apache.rocketmq.broker.lite.LiteSubscriptionRegistryImpl; |
| 80 | +import org.apache.rocketmq.broker.lite.LiteLifecycleManager; |
| 81 | +import org.apache.rocketmq.broker.lite.LiteSharding; |
| 82 | +import org.apache.rocketmq.broker.lite.LiteShardingImpl; |
| 83 | +import org.apache.rocketmq.broker.lite.RocksDBLiteLifecycleManager; |
76 | 84 | import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService; |
77 | 85 | import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener; |
78 | 86 | import org.apache.rocketmq.broker.longpolling.PullRequestHoldService; |
|
93 | 101 | import org.apache.rocketmq.broker.processor.ClientManageProcessor; |
94 | 102 | import org.apache.rocketmq.broker.processor.ConsumerManageProcessor; |
95 | 103 | import org.apache.rocketmq.broker.processor.EndTransactionProcessor; |
| 104 | +import org.apache.rocketmq.broker.processor.LiteManagerProcessor; |
| 105 | +import org.apache.rocketmq.broker.processor.LiteSubscriptionCtlProcessor; |
96 | 106 | import org.apache.rocketmq.broker.processor.NotificationProcessor; |
97 | 107 | import org.apache.rocketmq.broker.processor.PeekMessageProcessor; |
98 | 108 | import org.apache.rocketmq.broker.processor.PollingInfoProcessor; |
99 | 109 | import org.apache.rocketmq.broker.processor.PopInflightMessageCounter; |
| 110 | +import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor; |
100 | 111 | import org.apache.rocketmq.broker.processor.PopMessageProcessor; |
101 | 112 | import org.apache.rocketmq.broker.processor.PullMessageProcessor; |
102 | 113 | import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor; |
@@ -206,12 +217,19 @@ public class BrokerController { |
206 | 217 | protected final PullMessageProcessor pullMessageProcessor; |
207 | 218 | protected final PeekMessageProcessor peekMessageProcessor; |
208 | 219 | protected final PopMessageProcessor popMessageProcessor; |
| 220 | + protected final PopLiteMessageProcessor popLiteMessageProcessor; |
209 | 221 | protected final AckMessageProcessor ackMessageProcessor; |
210 | 222 | protected final ChangeInvisibleTimeProcessor changeInvisibleTimeProcessor; |
211 | 223 | protected final NotificationProcessor notificationProcessor; |
212 | 224 | protected final PollingInfoProcessor pollingInfoProcessor; |
213 | 225 | protected final QueryAssignmentProcessor queryAssignmentProcessor; |
214 | 226 | protected final ClientManageProcessor clientManageProcessor; |
| 227 | + protected final LiteSubscriptionCtlProcessor liteSubscriptionCtlProcessor; |
| 228 | + protected final LiteSharding liteSharding; |
| 229 | + protected final AbstractLiteLifecycleManager liteLifecycleManager; |
| 230 | + protected final LiteSubscriptionRegistry liteSubscriptionRegistry; |
| 231 | + protected final LiteEventDispatcher liteEventDispatcher; |
| 232 | + protected final LiteManagerProcessor liteManagerProcessor; |
215 | 233 | protected final SendMessageProcessor sendMessageProcessor; |
216 | 234 | protected final RecallMessageProcessor recallMessageProcessor; |
217 | 235 | protected final ReplyMessageProcessor replyMessageProcessor; |
@@ -376,18 +394,27 @@ public BrokerController( |
376 | 394 | this.topicQueueMappingManager = new TopicQueueMappingManager(this); |
377 | 395 | this.authenticationMetadataManager = AuthenticationFactory.getMetadataManager(this.authConfig); |
378 | 396 | this.authorizationMetadataManager = AuthorizationFactory.getMetadataManager(this.authConfig); |
| 397 | + this.topicRouteInfoManager = new TopicRouteInfoManager(this); |
| 398 | + this.liteSharding = new LiteShardingImpl(this, this.topicRouteInfoManager); |
| 399 | + this.liteLifecycleManager = this.messageStoreConfig.isEnableRocksDBStore() ? |
| 400 | + new RocksDBLiteLifecycleManager(this, this.liteSharding) : new LiteLifecycleManager(this, this.liteSharding); |
| 401 | + this.liteSubscriptionRegistry = new LiteSubscriptionRegistryImpl(this, liteLifecycleManager); |
| 402 | + this.liteSubscriptionCtlProcessor = new LiteSubscriptionCtlProcessor(this, liteSubscriptionRegistry); |
| 403 | + this.liteEventDispatcher = new LiteEventDispatcher(this, this.liteSubscriptionRegistry, this.liteLifecycleManager); |
| 404 | + this.liteManagerProcessor = new LiteManagerProcessor(this, liteLifecycleManager, liteSharding); |
379 | 405 | this.pullMessageProcessor = new PullMessageProcessor(this); |
380 | 406 | this.peekMessageProcessor = new PeekMessageProcessor(this); |
381 | 407 | this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this); |
382 | 408 | this.popMessageProcessor = new PopMessageProcessor(this); |
| 409 | + this.popLiteMessageProcessor = new PopLiteMessageProcessor(this, this.liteEventDispatcher); |
383 | 410 | this.notificationProcessor = new NotificationProcessor(this); |
384 | 411 | this.pollingInfoProcessor = new PollingInfoProcessor(this); |
385 | 412 | this.ackMessageProcessor = new AckMessageProcessor(this); |
386 | 413 | this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this); |
387 | 414 | this.sendMessageProcessor = new SendMessageProcessor(this); |
388 | 415 | this.recallMessageProcessor = new RecallMessageProcessor(this); |
389 | 416 | this.replyMessageProcessor = new ReplyMessageProcessor(this); |
390 | | - this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor); |
| 417 | + this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor, this.liteEventDispatcher); |
391 | 418 | this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); |
392 | 419 | this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig); |
393 | 420 | this.producerManager = new ProducerManager(this.brokerStatsManager); |
@@ -466,8 +493,6 @@ public boolean online(String instanceId, String group, String topic) { |
466 | 493 |
|
467 | 494 | this.escapeBridge = new EscapeBridge(this); |
468 | 495 |
|
469 | | - this.topicRouteInfoManager = new TopicRouteInfoManager(this); |
470 | | - |
471 | 496 | if (this.brokerConfig.isEnableSlaveActingMaster() && !this.brokerConfig.isSkipPreOnline()) { |
472 | 497 | this.brokerPreOnlineService = new BrokerPreOnlineService(this); |
473 | 498 | } |
@@ -950,6 +975,8 @@ public boolean recoverAndInitService() throws CloneNotSupportedException { |
950 | 975 |
|
951 | 976 | initialRequestPipeline(); |
952 | 977 |
|
| 978 | + initLiteService(); |
| 979 | + |
953 | 980 | if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { |
954 | 981 | // Register a listener to reload SslContext |
955 | 982 | try { |
@@ -1045,6 +1072,21 @@ public PutMessageResult executeBeforePutMessage(MessageExt msg) { |
1045 | 1072 | } |
1046 | 1073 | }); |
1047 | 1074 |
|
| 1075 | + putMessageHookList.add(new PutMessageHook() { |
| 1076 | + @Override |
| 1077 | + public String hookName() { |
| 1078 | + return "handleLmqQuota"; |
| 1079 | + } |
| 1080 | + |
| 1081 | + @Override |
| 1082 | + public PutMessageResult executeBeforePutMessage(MessageExt msg) { |
| 1083 | + if (msg instanceof MessageExtBrokerInner) { |
| 1084 | + return HookUtils.handleLmqQuota(BrokerController.this, (MessageExtBrokerInner) msg); |
| 1085 | + } |
| 1086 | + return null; |
| 1087 | + } |
| 1088 | + }); |
| 1089 | + |
1048 | 1090 | SendMessageBackHook sendMessageBackHook = new SendMessageBackHook() { |
1049 | 1091 | @Override |
1050 | 1092 | public boolean executeSendMessageBack(List<MessageExt> msgList, String brokerName, String brokerAddr) { |
@@ -1111,6 +1153,11 @@ private void initialRequestPipeline() { |
1111 | 1153 | } |
1112 | 1154 | } |
1113 | 1155 |
|
| 1156 | + private void initLiteService() { |
| 1157 | + this.liteEventDispatcher.init(); |
| 1158 | + this.liteLifecycleManager.init(); |
| 1159 | + } |
| 1160 | + |
1114 | 1161 | public void registerProcessor() { |
1115 | 1162 | RemotingServer remotingServer = remotingServerMap.get(TCP_REMOTING_SERVER); |
1116 | 1163 | RemotingServer fastRemotingServer = remotingServerMap.get(FAST_REMOTING_SERVER); |
@@ -1145,6 +1192,7 @@ public void registerProcessor() { |
1145 | 1192 | * PopMessageProcessor |
1146 | 1193 | */ |
1147 | 1194 | remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor); |
| 1195 | + remotingServer.registerProcessor(RequestCode.POP_LITE_MESSAGE, this.popLiteMessageProcessor, this.pullMessageExecutor); |
1148 | 1196 |
|
1149 | 1197 | /** |
1150 | 1198 | * AckMessageProcessor |
@@ -1196,10 +1244,12 @@ public void registerProcessor() { |
1196 | 1244 | remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor); |
1197 | 1245 | remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor); |
1198 | 1246 | remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor); |
| 1247 | + remotingServer.registerProcessor(RequestCode.LITE_SUBSCRIPTION_CTL, liteSubscriptionCtlProcessor, this.clientManageExecutor); |
1199 | 1248 |
|
1200 | 1249 | fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor); |
1201 | 1250 | fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor); |
1202 | 1251 | fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor); |
| 1252 | + fastRemotingServer.registerProcessor(RequestCode.LITE_SUBSCRIPTION_CTL, liteSubscriptionCtlProcessor, this.clientManageExecutor); |
1203 | 1253 |
|
1204 | 1254 | /** |
1205 | 1255 | * ConsumerManageProcessor |
@@ -1227,6 +1277,23 @@ public void registerProcessor() { |
1227 | 1277 | remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor); |
1228 | 1278 | fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor); |
1229 | 1279 |
|
| 1280 | + /* |
| 1281 | + * lite admin |
| 1282 | + */ |
| 1283 | + remotingServer.registerProcessor(RequestCode.GET_BROKER_LITE_INFO, liteManagerProcessor, adminBrokerExecutor); |
| 1284 | + remotingServer.registerProcessor(RequestCode.GET_PARENT_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor); |
| 1285 | + remotingServer.registerProcessor(RequestCode.GET_LITE_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor); |
| 1286 | + remotingServer.registerProcessor(RequestCode.GET_LITE_CLIENT_INFO, liteManagerProcessor, adminBrokerExecutor); |
| 1287 | + remotingServer.registerProcessor(RequestCode.GET_LITE_GROUP_INFO, liteManagerProcessor, adminBrokerExecutor); |
| 1288 | + remotingServer.registerProcessor(RequestCode.TRIGGER_LITE_DISPATCH, liteManagerProcessor, adminBrokerExecutor); |
| 1289 | + |
| 1290 | + fastRemotingServer.registerProcessor(RequestCode.GET_BROKER_LITE_INFO, liteManagerProcessor, adminBrokerExecutor); |
| 1291 | + fastRemotingServer.registerProcessor(RequestCode.GET_PARENT_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor); |
| 1292 | + fastRemotingServer.registerProcessor(RequestCode.GET_LITE_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor); |
| 1293 | + fastRemotingServer.registerProcessor(RequestCode.GET_LITE_CLIENT_INFO, liteManagerProcessor, adminBrokerExecutor); |
| 1294 | + fastRemotingServer.registerProcessor(RequestCode.GET_LITE_GROUP_INFO, liteManagerProcessor, adminBrokerExecutor); |
| 1295 | + fastRemotingServer.registerProcessor(RequestCode.TRIGGER_LITE_DISPATCH, liteManagerProcessor, adminBrokerExecutor); |
| 1296 | + |
1230 | 1297 | /* |
1231 | 1298 | * Default |
1232 | 1299 | */ |
@@ -1409,6 +1476,10 @@ public PopMessageProcessor getPopMessageProcessor() { |
1409 | 1476 | return popMessageProcessor; |
1410 | 1477 | } |
1411 | 1478 |
|
| 1479 | + public PopLiteMessageProcessor getPopLiteMessageProcessor() { |
| 1480 | + return popLiteMessageProcessor; |
| 1481 | + } |
| 1482 | + |
1412 | 1483 | public NotificationProcessor getNotificationProcessor() { |
1413 | 1484 | return notificationProcessor; |
1414 | 1485 | } |
@@ -1437,6 +1508,14 @@ public ChangeInvisibleTimeProcessor getChangeInvisibleTimeProcessor() { |
1437 | 1508 | return changeInvisibleTimeProcessor; |
1438 | 1509 | } |
1439 | 1510 |
|
| 1511 | + public LiteSubscriptionRegistry getLiteSubscriptionRegistry() { |
| 1512 | + return liteSubscriptionRegistry; |
| 1513 | + } |
| 1514 | + |
| 1515 | + public AbstractLiteLifecycleManager getLiteLifecycleManager() { |
| 1516 | + return liteLifecycleManager; |
| 1517 | + } |
| 1518 | + |
1440 | 1519 | protected void shutdownBasicService() { |
1441 | 1520 |
|
1442 | 1521 | shutdown = true; |
@@ -1474,6 +1553,13 @@ protected void shutdownBasicService() { |
1474 | 1553 | this.popMessageProcessor.getPopLongPollingService().shutdown(); |
1475 | 1554 | } |
1476 | 1555 |
|
| 1556 | + if (this.popLiteMessageProcessor != null) { |
| 1557 | + this.popLiteMessageProcessor.stopPopLiteLockManager(); |
| 1558 | + if (this.popLiteMessageProcessor.getPopLiteLongPollingService() != null) { |
| 1559 | + this.popLiteMessageProcessor.getPopLiteLongPollingService().shutdown(); |
| 1560 | + } |
| 1561 | + } |
| 1562 | + |
1477 | 1563 | if (this.popMessageProcessor.getQueueLockManager() != null) { |
1478 | 1564 | this.popMessageProcessor.getQueueLockManager().shutdown(); |
1479 | 1565 | } |
@@ -1637,6 +1723,18 @@ protected void shutdownBasicService() { |
1637 | 1723 | this.coldDataCgCtrService.shutdown(); |
1638 | 1724 | } |
1639 | 1725 |
|
| 1726 | + if (this.liteEventDispatcher != null) { |
| 1727 | + this.liteEventDispatcher.shutdown(); |
| 1728 | + } |
| 1729 | + |
| 1730 | + if (this.liteLifecycleManager != null) { |
| 1731 | + this.liteLifecycleManager.shutdown(); |
| 1732 | + } |
| 1733 | + |
| 1734 | + if (this.liteSubscriptionRegistry != null) { |
| 1735 | + this.liteSubscriptionRegistry.shutdown(); |
| 1736 | + } |
| 1737 | + |
1640 | 1738 | shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService); |
1641 | 1739 | shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService); |
1642 | 1740 |
|
@@ -1777,6 +1875,13 @@ protected void startBasicService() throws Exception { |
1777 | 1875 | this.popMessageProcessor.getQueueLockManager().start(); |
1778 | 1876 | } |
1779 | 1877 |
|
| 1878 | + if (this.popLiteMessageProcessor != null) { |
| 1879 | + this.popLiteMessageProcessor.startPopLiteLockManager(); |
| 1880 | + if (this.popLiteMessageProcessor.getPopLiteLongPollingService() != null) { |
| 1881 | + this.popLiteMessageProcessor.getPopLiteLongPollingService().start(); |
| 1882 | + } |
| 1883 | + } |
| 1884 | + |
1780 | 1885 | if (this.ackMessageProcessor != null) { |
1781 | 1886 | if (brokerConfig.isPopConsumerFSServiceInit()) { |
1782 | 1887 | this.ackMessageProcessor.startPopReviveService(); |
@@ -1838,6 +1943,18 @@ protected void startBasicService() throws Exception { |
1838 | 1943 | if (this.coldDataCgCtrService != null) { |
1839 | 1944 | this.coldDataCgCtrService.start(); |
1840 | 1945 | } |
| 1946 | + |
| 1947 | + if (this.liteEventDispatcher != null) { |
| 1948 | + this.liteEventDispatcher.start(); |
| 1949 | + } |
| 1950 | + |
| 1951 | + if (this.liteLifecycleManager != null) { |
| 1952 | + this.liteLifecycleManager.start(); |
| 1953 | + } |
| 1954 | + |
| 1955 | + if (this.liteSubscriptionRegistry != null) { |
| 1956 | + this.liteSubscriptionRegistry.start(); |
| 1957 | + } |
1841 | 1958 | } |
1842 | 1959 |
|
1843 | 1960 | public void start() throws Exception { |
@@ -2699,4 +2816,8 @@ public ConfigContext getConfigContext() { |
2699 | 2816 | public void setConfigContext(ConfigContext configContext) { |
2700 | 2817 | this.configContext = configContext; |
2701 | 2818 | } |
| 2819 | + |
| 2820 | + public LiteEventDispatcher getLiteEventDispatcher() { |
| 2821 | + return liteEventDispatcher; |
| 2822 | + } |
2702 | 2823 | } |
0 commit comments