Skip to content

Commit b3ab93f

Browse files
committed
fix: slave delete all topic config and subGroup config when sync config from master
1 parent 7fc5452 commit b3ab93f

File tree

2 files changed

+55
-10
lines changed

2 files changed

+55
-10
lines changed

broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package org.apache.rocketmq.broker.slave;
1818

1919
import java.io.IOException;
20+
import java.util.ArrayList;
2021
import java.util.Iterator;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.concurrent.ConcurrentHashMap;
2325
import java.util.concurrent.ConcurrentMap;
@@ -89,14 +91,13 @@ private void syncTopicConfig() {
8991
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigManager.getTopicConfigTable();
9092

9193
//delete
92-
Iterator<Map.Entry<String, TopicConfig>> iterator = topicConfigTable.entrySet().iterator();
93-
while (iterator.hasNext()) {
94-
Map.Entry<String, TopicConfig> entry = iterator.next();
94+
List<String> toRemoveTopic = new ArrayList<>();
95+
for (Map.Entry<String, TopicConfig> entry : topicConfigTable.entrySet()) {
9596
if (!newTopicConfigTable.containsKey(entry.getKey())) {
96-
iterator.remove();
97+
toRemoveTopic.add(entry.getKey());
9798
}
98-
topicConfigManager.deleteTopicConfig(entry.getKey());
9999
}
100+
toRemoveTopic.forEach(topicConfigManager::deleteTopicConfig);
100101

101102
//update
102103
newTopicConfigTable.values().forEach(topicConfigManager::putTopicConfig);
@@ -184,14 +185,14 @@ private void syncSubscriptionGroupConfig() {
184185
ConcurrentMap<String, SubscriptionGroupConfig> newSubscriptionGroupTable =
185186
subscriptionWrapper.getSubscriptionGroupTable();
186187
// delete
187-
Iterator<Map.Entry<String, SubscriptionGroupConfig>> iterator = curSubscriptionGroupTable.entrySet().iterator();
188-
while (iterator.hasNext()) {
189-
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
188+
List<String> toRemoveSubscriptionGroup = new ArrayList<>();
189+
for (Map.Entry<String, SubscriptionGroupConfig> configEntry : curSubscriptionGroupTable.entrySet()) {
190190
if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) {
191-
iterator.remove();
191+
toRemoveSubscriptionGroup.add(configEntry.getKey());
192192
}
193-
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
194193
}
194+
toRemoveSubscriptionGroup.forEach(subscriptionGroupManager::deleteSubscriptionGroupConfig);
195+
195196
// update
196197
newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig);
197198
subscriptionGroupManager.updateDataVersion();

broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,50 @@ public void testSyncTimerCheckPoint() throws RemotingConnectException, RemotingS
162162
Assert.assertEquals(0, timerCheckpoint.getDataVersion().getStateVersion());
163163
}
164164

165+
@Test
166+
public void testSyncAllIncludesTopicConfig() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
167+
MQBrokerException, InterruptedException, RemotingCommandException, UnsupportedEncodingException {
168+
TopicConfig newTopicConfig = new TopicConfig("TestTopic");
169+
TopicConfigAndMappingSerializeWrapper topicWrapper = createTopicConfigWrapper(newTopicConfig);
170+
171+
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicWrapper);
172+
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
173+
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
174+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
175+
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
176+
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
177+
178+
Assert.assertEquals(0, topicConfigManager.getDataVersion().getStateVersion());
179+
slaveSynchronize.syncAll();
180+
181+
Assert.assertEquals(1, topicConfigManager.getDataVersion().getStateVersion());
182+
Assert.assertTrue(topicConfigManager.getTopicConfigTable().containsKey("TestTopic"));
183+
}
184+
185+
@Test
186+
public void testSyncTopicConfigWithTopicDeletion() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
187+
MQBrokerException, InterruptedException, RemotingCommandException, UnsupportedEncodingException {
188+
TopicConfig localTopic = new TopicConfig("LocalTopic");
189+
ConcurrentHashMap<String, TopicConfig> localTable = new ConcurrentHashMap<>();
190+
localTable.put("LocalTopic", localTopic);
191+
when(topicConfigManager.getTopicConfigTable()).thenReturn(localTable);
192+
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
193+
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
194+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
195+
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
196+
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
197+
198+
TopicConfig newTopicConfig = new TopicConfig("NewTopic");
199+
TopicConfigAndMappingSerializeWrapper topicWrapper = createTopicConfigWrapper(newTopicConfig);
200+
201+
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicWrapper);
202+
203+
slaveSynchronize.syncAll();
204+
205+
Assert.assertFalse(topicConfigManager.getTopicConfigTable().containsKey("LocalTopic"));
206+
Assert.assertTrue(topicConfigManager.getTopicConfigTable().containsKey("NewTopic"));
207+
}
208+
165209
private TopicConfigAndMappingSerializeWrapper createTopicConfigWrapper(TopicConfig topicConfig) {
166210
TopicConfigAndMappingSerializeWrapper wrapper = new TopicConfigAndMappingSerializeWrapper();
167211
wrapper.setTopicConfigTable(new ConcurrentHashMap<>());

0 commit comments

Comments
 (0)