|
69 | 69 | import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; |
70 | 70 | import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; |
71 | 71 | import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; |
72 | | -import org.apache.rocketmq.common.BoundaryType; |
73 | 72 | import org.apache.rocketmq.common.BrokerConfig; |
74 | 73 | import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; |
75 | 74 | import org.apache.rocketmq.common.KeyBuilder; |
76 | 75 | import org.apache.rocketmq.common.LockCallback; |
77 | 76 | import org.apache.rocketmq.common.MQVersion; |
78 | 77 | import org.apache.rocketmq.common.MixAll; |
79 | | -import org.apache.rocketmq.common.Pair; |
80 | 78 | import org.apache.rocketmq.common.TopicAttributes; |
81 | 79 | import org.apache.rocketmq.common.TopicConfig; |
82 | 80 | import org.apache.rocketmq.common.UnlockCallback; |
|
214 | 212 | import org.apache.rocketmq.store.MessageStore; |
215 | 213 | import org.apache.rocketmq.store.PutMessageResult; |
216 | 214 | import org.apache.rocketmq.store.PutMessageStatus; |
217 | | -import org.apache.rocketmq.store.RocksDBMessageStore; |
218 | 215 | import org.apache.rocketmq.store.SelectMappedBufferResult; |
219 | 216 | import org.apache.rocketmq.store.StoreType; |
220 | 217 | import org.apache.rocketmq.store.config.BrokerRole; |
221 | 218 | import org.apache.rocketmq.store.exception.ConsumeQueueException; |
222 | 219 | import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore; |
| 220 | +import org.apache.rocketmq.store.queue.CombineConsumeQueueStore; |
223 | 221 | import org.apache.rocketmq.store.queue.ConsumeQueueInterface; |
| 222 | +import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface; |
224 | 223 | import org.apache.rocketmq.store.queue.CqUnit; |
225 | 224 | import org.apache.rocketmq.store.queue.ReferredIterator; |
226 | 225 | import org.apache.rocketmq.store.timer.TimerCheckpoint; |
@@ -3321,129 +3320,24 @@ private boolean validateBlackListConfigExist(Properties properties) { |
3321 | 3320 | private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, |
3322 | 3321 | RemotingCommand request) throws RemotingCommandException { |
3323 | 3322 | CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class); |
3324 | | - String requestTopic = requestHeader.getTopic(); |
3325 | 3323 | MessageStore messageStore = brokerController.getMessageStore(); |
3326 | 3324 | DefaultMessageStore defaultMessageStore; |
3327 | 3325 | if (messageStore instanceof AbstractPluginMessageStore) { |
3328 | 3326 | defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext(); |
3329 | 3327 | } else { |
3330 | 3328 | defaultMessageStore = (DefaultMessageStore) messageStore; |
3331 | 3329 | } |
3332 | | - RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore(); |
3333 | | - CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); |
| 3330 | + ConsumeQueueStoreInterface consumeQueueStore = defaultMessageStore.getQueueStore(); |
3334 | 3331 |
|
3335 | | - if (defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType())) { |
3336 | | - result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need check"); |
| 3332 | + if (!(consumeQueueStore instanceof CombineConsumeQueueStore)) { |
| 3333 | + CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); |
| 3334 | + result.setCheckResult("It is not CombineConsumeQueueStore, no need check"); |
3337 | 3335 | result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue()); |
3338 | 3336 | return result; |
3339 | 3337 | } |
3340 | 3338 |
|
3341 | | - if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { |
3342 | | - result.setCheckResult("rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid"); |
3343 | | - result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); |
3344 | | - return result; |
3345 | | - } |
3346 | | - |
3347 | | - ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable(); |
3348 | | - StringBuilder diffResult = new StringBuilder(); |
3349 | | - try { |
3350 | | - if (StringUtils.isNotBlank(requestTopic)) { |
3351 | | - boolean checkResult = processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime()); |
3352 | | - result.setCheckResult(diffResult.toString()); |
3353 | | - result.setCheckStatus(checkResult ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); |
3354 | | - return result; |
3355 | | - } |
3356 | | - int successNum = 0; |
3357 | | - int checkSize = 0; |
3358 | | - for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) { |
3359 | | - boolean checkResult = processConsumeQueuesForTopic(topicEntry.getValue(), topicEntry.getKey(), rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime()); |
3360 | | - successNum += checkResult ? 1 : 0; |
3361 | | - checkSize++; |
3362 | | - } |
3363 | | - // check all topic finish, all topic is ready, checkSize: 100, currentQueueNum: 110 -> ready (The currentQueueNum means when we do checking, new topics are added.) |
3364 | | - // check all topic finish, success/all : 89/100, currentQueueNum: 110 -> not ready |
3365 | | - boolean checkReady = successNum == checkSize; |
3366 | | - String checkResultString = checkReady ? String.format("all topic is ready, checkSize: %s, currentQueueNum: %s", checkSize, cqTable.size()) : |
3367 | | - String.format("success/all : %s/%s, currentQueueNum: %s", successNum, checkSize, cqTable.size()); |
3368 | | - diffResult.append("check all topic finish, ").append(checkResultString); |
3369 | | - result.setCheckResult(diffResult.toString()); |
3370 | | - result.setCheckStatus(checkReady ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); |
3371 | | - } catch (Exception e) { |
3372 | | - LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e); |
3373 | | - result.setCheckResult(e.getMessage() + Arrays.toString(e.getStackTrace())); |
3374 | | - result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue()); |
3375 | | - } |
3376 | | - return result; |
3377 | | - } |
3378 | | - |
3379 | | - private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, |
3380 | | - RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, |
3381 | | - long checkpointByStoreTime) { |
3382 | | - boolean processResult = true; |
3383 | | - for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) { |
3384 | | - Integer queueId = queueEntry.getKey(); |
3385 | | - ConsumeQueueInterface jsonCq = queueEntry.getValue(); |
3386 | | - ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId); |
3387 | | - if (printDetail) { |
3388 | | - String format = String.format("[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ", |
3389 | | - topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit()); |
3390 | | - diffResult.append(format).append("\n"); |
3391 | | - } |
3392 | | - |
3393 | | - long minOffsetByTime = 0L; |
3394 | | - try { |
3395 | | - minOffsetByTime = rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, queueId, checkpointByStoreTime, BoundaryType.UPPER); |
3396 | | - } catch (Exception e) { |
3397 | | - // ignore |
3398 | | - } |
3399 | | - long minOffsetInQueue = kvCq.getMinOffsetInQueue(); |
3400 | | - long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime); |
3401 | | - long checkTo = jsonCq.getMaxOffsetInQueue() - 1; |
3402 | | - /* |
3403 | | - checkTo(maxOffsetInQueue - 1) |
3404 | | - v |
3405 | | - fileCq +------------------------------------------------------+ |
3406 | | - kvCq +----------------------------------------------+ |
3407 | | - ^ ^ |
3408 | | - minOffsetInQueue minOffsetByTime |
3409 | | - ^ |
3410 | | - checkFrom = max(minOffsetInQueue, minOffsetByTime) |
3411 | | - */ |
3412 | | - // The latest message is earlier than the check time |
3413 | | - Pair<CqUnit, Long> fileLatestCq = jsonCq.getCqUnitAndStoreTime(checkTo); |
3414 | | - if (fileLatestCq != null) { |
3415 | | - if (fileLatestCq.getObject2() < checkpointByStoreTime) { |
3416 | | - continue; |
3417 | | - } |
3418 | | - } |
3419 | | - for (long i = checkFrom; i <= checkTo; i++) { |
3420 | | - Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i); |
3421 | | - Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i); |
3422 | | - if (fileCqUnit == null || kvCqUnit == null || !checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) { |
3423 | | - LOGGER.error(String.format("[topic: %s, queue: %s, offset: %s] \n file : %s \n kv : %s \n", |
3424 | | - topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null")); |
3425 | | - processResult = false; |
3426 | | - break; |
3427 | | - } |
3428 | | - } |
3429 | | - } |
3430 | | - return processResult; |
3431 | | - } |
3432 | | - |
3433 | | - private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) { |
3434 | | - if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) { |
3435 | | - return false; |
3436 | | - } |
3437 | | - if (cqUnit1.getSize() != cqUnit2.getSize()) { |
3438 | | - return false; |
3439 | | - } |
3440 | | - if (cqUnit1.getPos() != cqUnit2.getPos()) { |
3441 | | - return false; |
3442 | | - } |
3443 | | - if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) { |
3444 | | - return false; |
3445 | | - } |
3446 | | - return cqUnit1.getTagsCode() == cqUnit2.getTagsCode(); |
| 3339 | + return ((CombineConsumeQueueStore) consumeQueueStore). |
| 3340 | + doCheckCqWriteProgress(requestHeader.getTopic(), requestHeader.getCheckStoreTime(), StoreType.DEFAULT, StoreType.DEFAULT_ROCKSDB); |
3447 | 3341 | } |
3448 | 3342 |
|
3449 | 3343 | private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, RemotingCommand request) { |
|
0 commit comments