Skip to content

Commit 706ac00

Browse files
humkum韩坤明
authored andcommitted
fix: slave delete all topic config and subGroup config when sync config from master
1 parent 9879968 commit 706ac00

File tree

2 files changed

+77
-13
lines changed

2 files changed

+77
-13
lines changed

broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
package org.apache.rocketmq.broker.slave;
1818

1919
import java.io.IOException;
20-
import java.util.Iterator;
20+
import java.util.ArrayList;
21+
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.concurrent.ConcurrentMap;
@@ -87,14 +88,13 @@ private void syncTopicConfig() {
8788
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigManager.getTopicConfigTable();
8889

8990
//delete
90-
Iterator<Map.Entry<String, TopicConfig>> iterator = topicConfigTable.entrySet().iterator();
91-
while (iterator.hasNext()) {
92-
Map.Entry<String, TopicConfig> entry = iterator.next();
91+
List<String> toRemoveTopic = new ArrayList<>();
92+
for (Map.Entry<String, TopicConfig> entry : topicConfigTable.entrySet()) {
9393
if (!newTopicConfigTable.containsKey(entry.getKey())) {
94-
iterator.remove();
95-
topicConfigManager.deleteTopicConfig(entry.getKey());
94+
toRemoveTopic.add(entry.getKey());
9695
}
9796
}
97+
toRemoveTopic.forEach(topicConfigManager::deleteTopicConfig);
9898

9999
//update
100100
newTopicConfigTable.values().forEach(topicConfigManager::putTopicConfig);
@@ -181,14 +181,14 @@ private void syncSubscriptionGroupConfig() {
181181
ConcurrentMap<String, SubscriptionGroupConfig> newSubscriptionGroupTable =
182182
subscriptionWrapper.getSubscriptionGroupTable();
183183
// delete
184-
Iterator<Map.Entry<String, SubscriptionGroupConfig>> iterator = curSubscriptionGroupTable.entrySet().iterator();
185-
while (iterator.hasNext()) {
186-
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
184+
List<String> toRemoveSubscriptionGroup = new ArrayList<>();
185+
for (Map.Entry<String, SubscriptionGroupConfig> configEntry : curSubscriptionGroupTable.entrySet()) {
187186
if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) {
188-
iterator.remove();
189-
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
187+
toRemoveSubscriptionGroup.add(configEntry.getKey());
190188
}
191189
}
190+
toRemoveSubscriptionGroup.forEach(subscriptionGroupManager::deleteSubscriptionGroupConfig);
191+
192192
// update
193193
newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig);
194194
subscriptionGroupManager.setDataVersion(subscriptionWrapper.getDataVersion());

broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,10 @@ public void testSyncAll() throws RemotingConnectException, RemotingSendRequestEx
143143
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
144144
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
145145

146-
TopicConfigManager topicConfigManager = new TopicConfigManager();
146+
TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController);
147147
TopicConfigManager spiedTopicConfigManager = spy(topicConfigManager);
148148
doNothing().when(spiedTopicConfigManager).persist();
149-
SubscriptionGroupManager groupConfigManager = new SubscriptionGroupManager();
149+
SubscriptionGroupManager groupConfigManager = new SubscriptionGroupManager(brokerController);
150150
SubscriptionGroupManager spiedGroupConfigManager = spy(groupConfigManager);
151151
doNothing().when(spiedGroupConfigManager).persist();
152152
when(brokerController.getTopicConfigManager()).thenReturn(spiedTopicConfigManager);
@@ -174,6 +174,70 @@ public void testSyncTimerCheckPoint() throws RemotingConnectException, RemotingS
174174
Assert.assertEquals(0, timerCheckpoint.getDataVersion().getStateVersion());
175175
}
176176

177+
@Test
178+
public void testSyncAllIncludesTopicConfig() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
179+
MQBrokerException, InterruptedException, RemotingCommandException, UnsupportedEncodingException {
180+
TopicConfig newTopicConfig = new TopicConfig("TestTopic");
181+
TopicConfigAndMappingSerializeWrapper topicWrapper = createTopicConfigWrapper(newTopicConfig);
182+
183+
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicWrapper);
184+
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
185+
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
186+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
187+
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
188+
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
189+
190+
TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController);
191+
TopicConfigManager spiedTopicConfigManager = spy(topicConfigManager);
192+
doNothing().when(spiedTopicConfigManager).persist();
193+
when(brokerController.getTopicConfigManager()).thenReturn(spiedTopicConfigManager);
194+
SubscriptionGroupManager groupConfigManager = new SubscriptionGroupManager(brokerController);
195+
SubscriptionGroupManager spiedGroupConfigManager = spy(groupConfigManager);
196+
doNothing().when(spiedGroupConfigManager).persist();
197+
when(brokerController.getSubscriptionGroupManager()).thenReturn(spiedGroupConfigManager);
198+
199+
TopicConfigManager topicConfigManager1 = brokerController.getTopicConfigManager();
200+
DataVersion dataVersion = topicConfigManager1.getDataVersion();
201+
Assert.assertEquals(0, dataVersion.getStateVersion());
202+
slaveSynchronize.syncAll();
203+
204+
Assert.assertEquals(5, brokerController.getTopicConfigManager().getDataVersion().getStateVersion());
205+
Assert.assertTrue(brokerController.getTopicConfigManager().getTopicConfigTable().containsKey("TestTopic"));
206+
}
207+
208+
@Test
209+
public void testSyncTopicConfigWithTopicDeletion() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
210+
MQBrokerException, InterruptedException, RemotingCommandException, UnsupportedEncodingException {
211+
TopicConfig localTopic = new TopicConfig("LocalTopic");
212+
ConcurrentHashMap<String, TopicConfig> localTable = new ConcurrentHashMap<>();
213+
localTable.put("LocalTopic", localTopic);
214+
215+
TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController);
216+
TopicConfigManager spiedTopicConfigManager = spy(topicConfigManager);
217+
spiedTopicConfigManager.setTopicConfigTable(localTable);
218+
doNothing().when(spiedTopicConfigManager).persist();
219+
when(brokerController.getTopicConfigManager()).thenReturn(spiedTopicConfigManager);
220+
SubscriptionGroupManager groupConfigManager = new SubscriptionGroupManager(brokerController);
221+
SubscriptionGroupManager spiedGroupConfigManager = spy(groupConfigManager);
222+
doNothing().when(spiedGroupConfigManager).persist();
223+
when(brokerController.getSubscriptionGroupManager()).thenReturn(spiedGroupConfigManager);
224+
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
225+
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
226+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
227+
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
228+
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
229+
230+
TopicConfig newTopicConfig = new TopicConfig("NewTopic");
231+
TopicConfigAndMappingSerializeWrapper topicWrapper = createTopicConfigWrapper(newTopicConfig);
232+
233+
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicWrapper);
234+
235+
slaveSynchronize.syncAll();
236+
237+
Assert.assertFalse(brokerController.getTopicConfigManager().getTopicConfigTable().containsKey("LocalTopic"));
238+
Assert.assertTrue(brokerController.getTopicConfigManager().getTopicConfigTable().containsKey("NewTopic"));
239+
}
240+
177241
private TopicConfigAndMappingSerializeWrapper createTopicConfigWrapper(TopicConfig topicConfig) {
178242
TopicConfigAndMappingSerializeWrapper wrapper = new TopicConfigAndMappingSerializeWrapper();
179243
wrapper.setTopicConfigTable(new ConcurrentHashMap<>());

0 commit comments

Comments
 (0)