Skip to content

Commit cd2862f

Browse files
committed
fix: slave delete all topic config and subGroup config when sync config from master
1 parent 7fc5452 commit cd2862f

File tree

2 files changed

+46
-4
lines changed

2 files changed

+46
-4
lines changed

broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,8 @@ private void syncTopicConfig() {
9393
while (iterator.hasNext()) {
9494
Map.Entry<String, TopicConfig> entry = iterator.next();
9595
if (!newTopicConfigTable.containsKey(entry.getKey())) {
96-
iterator.remove();
96+
topicConfigManager.deleteTopicConfig(entry.getKey());
9797
}
98-
topicConfigManager.deleteTopicConfig(entry.getKey());
9998
}
10099

101100
//update
@@ -188,9 +187,8 @@ private void syncSubscriptionGroupConfig() {
188187
while (iterator.hasNext()) {
189188
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
190189
if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) {
191-
iterator.remove();
190+
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
192191
}
193-
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
194192
}
195193
// update
196194
newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig);

broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,50 @@ public void testSyncTimerCheckPoint() throws RemotingConnectException, RemotingS
162162
Assert.assertEquals(0, timerCheckpoint.getDataVersion().getStateVersion());
163163
}
164164

165+
@Test
166+
public void testSyncAllIncludesTopicConfig() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
167+
MQBrokerException, InterruptedException, RemotingCommandException, UnsupportedEncodingException {
168+
TopicConfig newTopicConfig = new TopicConfig("TestTopic");
169+
TopicConfigAndMappingSerializeWrapper topicWrapper = createTopicConfigWrapper(newTopicConfig);
170+
171+
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicWrapper);
172+
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
173+
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
174+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
175+
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
176+
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
177+
178+
Assert.assertEquals(0, topicConfigManager.getDataVersion().getStateVersion());
179+
slaveSynchronize.syncAll();
180+
181+
Assert.assertEquals(1, topicConfigManager.getDataVersion().getStateVersion());
182+
Assert.assertTrue(topicConfigManager.getTopicConfigTable().containsKey("TestTopic"));
183+
}
184+
185+
@Test
186+
public void testSyncTopicConfigWithTopicDeletion() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
187+
MQBrokerException, InterruptedException, RemotingCommandException, UnsupportedEncodingException {
188+
TopicConfig localTopic = new TopicConfig("LocalTopic");
189+
ConcurrentHashMap<String, TopicConfig> localTable = new ConcurrentHashMap<>();
190+
localTable.put("LocalTopic", localTopic);
191+
when(topicConfigManager.getTopicConfigTable()).thenReturn(localTable);
192+
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
193+
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
194+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
195+
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
196+
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
197+
198+
TopicConfig newTopicConfig = new TopicConfig("NewTopic");
199+
TopicConfigAndMappingSerializeWrapper topicWrapper = createTopicConfigWrapper(newTopicConfig);
200+
201+
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicWrapper);
202+
203+
slaveSynchronize.syncAll();
204+
205+
Assert.assertFalse(topicConfigManager.getTopicConfigTable().containsKey("LocalTopic"));
206+
Assert.assertTrue(topicConfigManager.getTopicConfigTable().containsKey("NewTopic"));
207+
}
208+
165209
private TopicConfigAndMappingSerializeWrapper createTopicConfigWrapper(TopicConfig topicConfig) {
166210
TopicConfigAndMappingSerializeWrapper wrapper = new TopicConfigAndMappingSerializeWrapper();
167211
wrapper.setTopicConfigTable(new ConcurrentHashMap<>());

0 commit comments

Comments
 (0)