Skip to content

Commit 92f0ca7

Browse files
committed
fix: slave delete all topic config and subGroup config when sync config from master
1 parent 7fc5452 commit 92f0ca7

File tree

2 files changed

+55
-11
lines changed

2 files changed

+55
-11
lines changed

broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
package org.apache.rocketmq.broker.slave;
1818

1919
import java.io.IOException;
20-
import java.util.Iterator;
20+
import java.util.ArrayList;
21+
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.concurrent.ConcurrentMap;
@@ -89,14 +90,13 @@ private void syncTopicConfig() {
8990
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigManager.getTopicConfigTable();
9091

9192
//delete
92-
Iterator<Map.Entry<String, TopicConfig>> iterator = topicConfigTable.entrySet().iterator();
93-
while (iterator.hasNext()) {
94-
Map.Entry<String, TopicConfig> entry = iterator.next();
93+
List<String> toRemoveTopic = new ArrayList<>();
94+
for (Map.Entry<String, TopicConfig> entry : topicConfigTable.entrySet()) {
9595
if (!newTopicConfigTable.containsKey(entry.getKey())) {
96-
iterator.remove();
96+
toRemoveTopic.add(entry.getKey());
9797
}
98-
topicConfigManager.deleteTopicConfig(entry.getKey());
9998
}
99+
toRemoveTopic.forEach(topicConfigManager::deleteTopicConfig);
100100

101101
//update
102102
newTopicConfigTable.values().forEach(topicConfigManager::putTopicConfig);
@@ -184,14 +184,14 @@ private void syncSubscriptionGroupConfig() {
184184
ConcurrentMap<String, SubscriptionGroupConfig> newSubscriptionGroupTable =
185185
subscriptionWrapper.getSubscriptionGroupTable();
186186
// delete
187-
Iterator<Map.Entry<String, SubscriptionGroupConfig>> iterator = curSubscriptionGroupTable.entrySet().iterator();
188-
while (iterator.hasNext()) {
189-
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
187+
List<String> toRemoveSubscriptionGroup = new ArrayList<>();
188+
for (Map.Entry<String, SubscriptionGroupConfig> configEntry : curSubscriptionGroupTable.entrySet()) {
190189
if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) {
191-
iterator.remove();
190+
toRemoveSubscriptionGroup.add(configEntry.getKey());
192191
}
193-
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
194192
}
193+
toRemoveSubscriptionGroup.forEach(subscriptionGroupManager::deleteSubscriptionGroupConfig);
194+
195195
// update
196196
newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig);
197197
subscriptionGroupManager.updateDataVersion();

broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,50 @@ public void testSyncTimerCheckPoint() throws RemotingConnectException, RemotingS
162162
Assert.assertEquals(0, timerCheckpoint.getDataVersion().getStateVersion());
163163
}
164164

165+
@Test
166+
public void testSyncAllIncludesTopicConfig() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
167+
MQBrokerException, InterruptedException, RemotingCommandException, UnsupportedEncodingException {
168+
TopicConfig newTopicConfig = new TopicConfig("TestTopic");
169+
TopicConfigAndMappingSerializeWrapper topicWrapper = createTopicConfigWrapper(newTopicConfig);
170+
171+
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicWrapper);
172+
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
173+
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
174+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
175+
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
176+
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
177+
178+
Assert.assertEquals(0, topicConfigManager.getDataVersion().getStateVersion());
179+
slaveSynchronize.syncAll();
180+
181+
Assert.assertEquals(1, topicConfigManager.getDataVersion().getStateVersion());
182+
Assert.assertTrue(topicConfigManager.getTopicConfigTable().containsKey("TestTopic"));
183+
}
184+
185+
@Test
186+
public void testSyncTopicConfigWithTopicDeletion() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
187+
MQBrokerException, InterruptedException, RemotingCommandException, UnsupportedEncodingException {
188+
TopicConfig localTopic = new TopicConfig("LocalTopic");
189+
ConcurrentHashMap<String, TopicConfig> localTable = new ConcurrentHashMap<>();
190+
localTable.put("LocalTopic", localTopic);
191+
when(topicConfigManager.getTopicConfigTable()).thenReturn(localTable);
192+
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
193+
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
194+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
195+
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
196+
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
197+
198+
TopicConfig newTopicConfig = new TopicConfig("NewTopic");
199+
TopicConfigAndMappingSerializeWrapper topicWrapper = createTopicConfigWrapper(newTopicConfig);
200+
201+
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicWrapper);
202+
203+
slaveSynchronize.syncAll();
204+
205+
Assert.assertFalse(topicConfigManager.getTopicConfigTable().containsKey("LocalTopic"));
206+
Assert.assertTrue(topicConfigManager.getTopicConfigTable().containsKey("NewTopic"));
207+
}
208+
165209
private TopicConfigAndMappingSerializeWrapper createTopicConfigWrapper(TopicConfig topicConfig) {
166210
TopicConfigAndMappingSerializeWrapper wrapper = new TopicConfigAndMappingSerializeWrapper();
167211
wrapper.setTopicConfigTable(new ConcurrentHashMap<>());

0 commit comments

Comments
 (0)