@@ -320,15 +320,15 @@ public boolean getLastMappedFile(final long startOffset) {
320320 *
321321 * @throws RocksDBException only in rocksdb mode
322322 */
323- public void recoverNormally (long maxPhyOffsetOfConsumeQueue ) throws RocksDBException {
323+ public void recoverNormally (long dispatchFromPhyOffset ) throws RocksDBException {
324324 boolean checkCRCOnRecover = this .defaultMessageStore .getMessageStoreConfig ().isCheckCRCOnRecover ();
325325 boolean checkDupInfo = this .defaultMessageStore .getMessageStoreConfig ().isDuplicationEnable ();
326326 final List <MappedFile > mappedFiles = this .mappedFileQueue .getMappedFiles ();
327327 if (!mappedFiles .isEmpty ()) {
328328 int index = mappedFiles .size () - 1 ;
329329 while (index > 0 ) {
330330 MappedFile mappedFile = mappedFiles .get (index );
331- if (mappedFile . getFileFromOffset () <= maxPhyOffsetOfConsumeQueue ) {
331+ if (isMappedFileMatchedRecover ( mappedFile , true ) ) {
332332 // It's safe to recover from this mapped file
333333 break ;
334334 }
@@ -344,7 +344,7 @@ public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExcep
344344 while (true ) {
345345 DispatchRequest dispatchRequest = this .checkMessageAndReturnSize (byteBuffer , checkCRCOnRecover , checkDupInfo );
346346 int size = dispatchRequest .getMsgSize ();
347- boolean doDispatch = dispatchRequest .getCommitLogOffset () > maxPhyOffsetOfConsumeQueue ;
347+ boolean doDispatch = dispatchRequest .getCommitLogOffset () > dispatchFromPhyOffset ;
348348 // Normal data
349349 if (dispatchRequest .isSuccess () && size > 0 ) {
350350 lastValidMsgPhyOffset = processOffset + mappedFileOffset ;
@@ -394,10 +394,7 @@ else if (!dispatchRequest.isSuccess()) {
394394 }
395395
396396 // Clear ConsumeQueue redundant data
397- if (maxPhyOffsetOfConsumeQueue >= processOffset ) {
398- log .warn ("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files" , maxPhyOffsetOfConsumeQueue , processOffset );
399- this .defaultMessageStore .truncateDirtyLogicFiles (processOffset );
400- }
397+ this .defaultMessageStore .truncateDirtyLogicFiles (processOffset );
401398
402399 this .mappedFileQueue .setFlushedWhere (processOffset );
403400 this .mappedFileQueue .setCommittedWhere (processOffset );
@@ -715,7 +712,7 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc
715712 MappedFile mappedFile = null ;
716713 for (; index >= 0 ; index --) {
717714 mappedFile = mappedFiles .get (index );
718- if (this .isMappedFileMatchedRecover (mappedFile )) {
715+ if (this .isMappedFileMatchedRecover (mappedFile , false )) {
719716 log .info ("recover from this mapped file " + mappedFile .getFileName ());
720717 break ;
721718 }
@@ -802,10 +799,7 @@ else if (size == 0) {
802799 }
803800
804801 // Clear ConsumeQueue redundant data
805- if (maxPhyOffsetOfConsumeQueue >= processOffset ) {
806- log .warn ("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files" , maxPhyOffsetOfConsumeQueue , processOffset );
807- this .defaultMessageStore .truncateDirtyLogicFiles (processOffset );
808- }
802+ this .defaultMessageStore .truncateDirtyLogicFiles (processOffset );
809803
810804 this .mappedFileQueue .setFlushedWhere (processOffset );
811805 this .mappedFileQueue .setCommittedWhere (processOffset );
@@ -839,7 +833,8 @@ protected void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult
839833 this .getMessageStore ().onCommitLogAppend (msg , result , commitLogFile );
840834 }
841835
842- private boolean isMappedFileMatchedRecover (final MappedFile mappedFile ) throws RocksDBException {
836+ private boolean isMappedFileMatchedRecover (final MappedFile mappedFile ,
837+ boolean recoverNormally ) throws RocksDBException {
843838 ByteBuffer byteBuffer = mappedFile .sliceByteBuffer ();
844839
845840 int magicCode = byteBuffer .getInt (MessageDecoder .MESSAGE_MAGIC_CODE_POSITION );
@@ -854,16 +849,16 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) throws R
854849 if (0 == storeTimestamp ) {
855850 return false ;
856851 }
852+ long phyOffset = byteBuffer .getLong (MessageDecoder .MESSAGE_PHYSIC_OFFSET_POSITION );
857853
858- if (this .defaultMessageStore .getMessageStoreConfig ().isMessageIndexEnable ()
859- && this .defaultMessageStore .getMessageStoreConfig ().isMessageIndexSafe ()
860- && storeTimestamp > this .defaultMessageStore .getStoreCheckpoint ().getIndexMsgTimestamp ()) {
854+ if (this .defaultMessageStore .getMessageStoreConfig ().isMessageIndexEnable () &&
855+ this .defaultMessageStore .getMessageStoreConfig ().isMessageIndexSafe () &&
856+ storeTimestamp > this .defaultMessageStore .getStoreCheckpoint ().getIndexMsgTimestamp ()) {
861857 return false ;
862858 }
863859
864- long phyOffset = byteBuffer .getLong (MessageDecoder .MESSAGE_PHYSIC_OFFSET_POSITION );
865860 return this .defaultMessageStore .getQueueStore ()
866- .isMappedFileMatchedRecover (phyOffset , storeTimestamp );
861+ .isMappedFileMatchedRecover (phyOffset , storeTimestamp , recoverNormally );
867862 }
868863
869864 public boolean resetOffset (long offset ) {
0 commit comments