|
72 | 72 | import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; |
73 | 73 | import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; |
74 | 74 | import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; |
75 | | -import org.apache.rocketmq.common.BoundaryType; |
76 | 75 | import org.apache.rocketmq.common.BrokerConfig; |
77 | 76 | import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; |
78 | 77 | import org.apache.rocketmq.common.KeyBuilder; |
79 | 78 | import org.apache.rocketmq.common.LockCallback; |
80 | 79 | import org.apache.rocketmq.common.MQVersion; |
81 | 80 | import org.apache.rocketmq.common.MixAll; |
82 | | -import org.apache.rocketmq.common.Pair; |
83 | 81 | import org.apache.rocketmq.common.PlainAccessConfig; |
84 | 82 | import org.apache.rocketmq.common.TopicAttributes; |
85 | 83 | import org.apache.rocketmq.common.TopicConfig; |
|
223 | 221 | import org.apache.rocketmq.store.MessageStore; |
224 | 222 | import org.apache.rocketmq.store.PutMessageResult; |
225 | 223 | import org.apache.rocketmq.store.PutMessageStatus; |
226 | | -import org.apache.rocketmq.store.RocksDBMessageStore; |
227 | 224 | import org.apache.rocketmq.store.SelectMappedBufferResult; |
228 | 225 | import org.apache.rocketmq.store.StoreType; |
229 | 226 | import org.apache.rocketmq.store.config.BrokerRole; |
230 | 227 | import org.apache.rocketmq.store.exception.ConsumeQueueException; |
231 | 228 | import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore; |
| 229 | +import org.apache.rocketmq.store.queue.CombineConsumeQueueStore; |
232 | 230 | import org.apache.rocketmq.store.queue.ConsumeQueueInterface; |
| 231 | +import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface; |
233 | 232 | import org.apache.rocketmq.store.queue.CqUnit; |
234 | 233 | import org.apache.rocketmq.store.queue.ReferredIterator; |
235 | 234 | import org.apache.rocketmq.store.timer.TimerCheckpoint; |
@@ -3479,129 +3478,24 @@ private boolean validateBlackListConfigExist(Properties properties) { |
3479 | 3478 | private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, |
3480 | 3479 | RemotingCommand request) throws RemotingCommandException { |
3481 | 3480 | CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class); |
3482 | | - String requestTopic = requestHeader.getTopic(); |
3483 | 3481 | MessageStore messageStore = brokerController.getMessageStore(); |
3484 | 3482 | DefaultMessageStore defaultMessageStore; |
3485 | 3483 | if (messageStore instanceof AbstractPluginMessageStore) { |
3486 | 3484 | defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext(); |
3487 | 3485 | } else { |
3488 | 3486 | defaultMessageStore = (DefaultMessageStore) messageStore; |
3489 | 3487 | } |
3490 | | - RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore(); |
3491 | | - CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); |
| 3488 | + ConsumeQueueStoreInterface consumeQueueStore = defaultMessageStore.getQueueStore(); |
3492 | 3489 |
|
3493 | | - if (defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType())) { |
3494 | | - result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need check"); |
| 3490 | + if (!(consumeQueueStore instanceof CombineConsumeQueueStore)) { |
| 3491 | + CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); |
| 3492 | + result.setCheckResult("is not CombineConsumeQueueStore, no need check"); |
3495 | 3493 | result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue()); |
3496 | 3494 | return result; |
3497 | 3495 | } |
3498 | 3496 |
|
3499 | | - if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { |
3500 | | - result.setCheckResult("rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid"); |
3501 | | - result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); |
3502 | | - return result; |
3503 | | - } |
3504 | | - |
3505 | | - ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable(); |
3506 | | - StringBuilder diffResult = new StringBuilder(); |
3507 | | - try { |
3508 | | - if (StringUtils.isNotBlank(requestTopic)) { |
3509 | | - boolean checkResult = processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime()); |
3510 | | - result.setCheckResult(diffResult.toString()); |
3511 | | - result.setCheckStatus(checkResult ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); |
3512 | | - return result; |
3513 | | - } |
3514 | | - int successNum = 0; |
3515 | | - int checkSize = 0; |
3516 | | - for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) { |
3517 | | - boolean checkResult = processConsumeQueuesForTopic(topicEntry.getValue(), topicEntry.getKey(), rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime()); |
3518 | | - successNum += checkResult ? 1 : 0; |
3519 | | - checkSize++; |
3520 | | - } |
3521 | | - // check all topic finish, all topic is ready, checkSize: 100, currentQueueNum: 110 -> ready (The currentQueueNum means when we do checking, new topics are added.) |
3522 | | - // check all topic finish, success/all : 89/100, currentQueueNum: 110 -> not ready |
3523 | | - boolean checkReady = successNum == checkSize; |
3524 | | - String checkResultString = checkReady ? String.format("all topic is ready, checkSize: %s, currentQueueNum: %s", checkSize, cqTable.size()) : |
3525 | | - String.format("success/all : %s/%s, currentQueueNum: %s", successNum, checkSize, cqTable.size()); |
3526 | | - diffResult.append("check all topic finish, ").append(checkResultString); |
3527 | | - result.setCheckResult(diffResult.toString()); |
3528 | | - result.setCheckStatus(checkReady ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue()); |
3529 | | - } catch (Exception e) { |
3530 | | - LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e); |
3531 | | - result.setCheckResult(e.getMessage() + Arrays.toString(e.getStackTrace())); |
3532 | | - result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue()); |
3533 | | - } |
3534 | | - return result; |
3535 | | - } |
3536 | | - |
3537 | | - private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, |
3538 | | - RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, |
3539 | | - long checkpointByStoreTime) { |
3540 | | - boolean processResult = true; |
3541 | | - for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) { |
3542 | | - Integer queueId = queueEntry.getKey(); |
3543 | | - ConsumeQueueInterface jsonCq = queueEntry.getValue(); |
3544 | | - ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId); |
3545 | | - if (printDetail) { |
3546 | | - String format = String.format("[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ", |
3547 | | - topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit()); |
3548 | | - diffResult.append(format).append("\n"); |
3549 | | - } |
3550 | | - |
3551 | | - long minOffsetByTime = 0L; |
3552 | | - try { |
3553 | | - minOffsetByTime = rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, queueId, checkpointByStoreTime, BoundaryType.UPPER); |
3554 | | - } catch (Exception e) { |
3555 | | - // ignore |
3556 | | - } |
3557 | | - long minOffsetInQueue = kvCq.getMinOffsetInQueue(); |
3558 | | - long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime); |
3559 | | - long checkTo = jsonCq.getMaxOffsetInQueue() - 1; |
3560 | | - /* |
3561 | | - checkTo(maxOffsetInQueue - 1) |
3562 | | - v |
3563 | | - fileCq +------------------------------------------------------+ |
3564 | | - kvCq +----------------------------------------------+ |
3565 | | - ^ ^ |
3566 | | - minOffsetInQueue minOffsetByTime |
3567 | | - ^ |
3568 | | - checkFrom = max(minOffsetInQueue, minOffsetByTime) |
3569 | | - */ |
3570 | | - // The latest message is earlier than the check time |
3571 | | - Pair<CqUnit, Long> fileLatestCq = jsonCq.getCqUnitAndStoreTime(checkTo); |
3572 | | - if (fileLatestCq != null) { |
3573 | | - if (fileLatestCq.getObject2() < checkpointByStoreTime) { |
3574 | | - continue; |
3575 | | - } |
3576 | | - } |
3577 | | - for (long i = checkFrom; i <= checkTo; i++) { |
3578 | | - Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i); |
3579 | | - Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i); |
3580 | | - if (fileCqUnit == null || kvCqUnit == null || !checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) { |
3581 | | - LOGGER.error(String.format("[topic: %s, queue: %s, offset: %s] \n file : %s \n kv : %s \n", |
3582 | | - topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null")); |
3583 | | - processResult = false; |
3584 | | - break; |
3585 | | - } |
3586 | | - } |
3587 | | - } |
3588 | | - return processResult; |
3589 | | - } |
3590 | | - |
3591 | | - private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) { |
3592 | | - if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) { |
3593 | | - return false; |
3594 | | - } |
3595 | | - if (cqUnit1.getSize() != cqUnit2.getSize()) { |
3596 | | - return false; |
3597 | | - } |
3598 | | - if (cqUnit1.getPos() != cqUnit2.getPos()) { |
3599 | | - return false; |
3600 | | - } |
3601 | | - if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) { |
3602 | | - return false; |
3603 | | - } |
3604 | | - return cqUnit1.getTagsCode() == cqUnit2.getTagsCode(); |
| 3497 | + return ((CombineConsumeQueueStore) consumeQueueStore). |
| 3498 | + doCheckCqWriteProgress(requestHeader.getTopic(), requestHeader.getCheckStoreTime(), StoreType.DEFAULT, StoreType.DEFAULT_ROCKSDB); |
3605 | 3499 | } |
3606 | 3500 |
|
3607 | 3501 | private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, RemotingCommand request) { |
|
0 commit comments