Skip to content
This repository was archived by the owner on Mar 12, 2026. It is now read-only.

Commit 5cc2541

Browse files
authored
fix: obkv wal open (#1117)
## Rationale In some cases(such as `Manifest::do_snapshot`), `actual_next_sequence` maybe less than `start_sequence`, and it will check this then return error in OBKV WAl. In this pr, I refactor this open logic. ## Detailed Changes Refactor wal open logic to allow `actual_next_sequence` less than `start_sequence`. ## Test Plan Test by exist tests.
1 parent 1607d1d commit 5cc2541

3 files changed

Lines changed: 39 additions & 18 deletions

File tree

wal/src/kv_encoder.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ pub struct MetaKeyEncoder {
263263

264264
#[derive(Clone, Debug)]
265265
pub struct MetaKey {
266-
pub region_id: u64,
266+
pub table_id: u64,
267267
}
268268

269269
impl MetaKeyEncoder {
@@ -291,7 +291,7 @@ impl Encoder<MetaKey> for MetaKeyEncoder {
291291
buf.try_put_u8(self.namespace as u8)
292292
.context(EncodeMetaKey)?;
293293
buf.try_put_u8(self.key_type as u8).context(EncodeMetaKey)?;
294-
buf.try_put_u64(meta_key.region_id).context(EncodeMetaKey)?;
294+
buf.try_put_u64(meta_key.table_id).context(EncodeMetaKey)?;
295295
buf.try_put_u8(self.version).context(EncodeMetaKey)?;
296296

297297
Ok(())
@@ -326,7 +326,7 @@ impl Decoder<MetaKey> for MetaKeyEncoder {
326326
}
327327
);
328328

329-
let region_id = buf.try_get_u64().context(DecodeMetaKey)?;
329+
let table_id = buf.try_get_u64().context(DecodeMetaKey)?;
330330

331331
// check version
332332
let version = buf.try_get_u8().context(DecodeMetaKey)?;
@@ -338,7 +338,7 @@ impl Decoder<MetaKey> for MetaKeyEncoder {
338338
}
339339
);
340340

341-
Ok(MetaKey { region_id })
341+
Ok(MetaKey { table_id })
342342
}
343343
}
344344

wal/src/rocks_impl/manager.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl TableUnit {
134134
.context(Delete)?;
135135

136136
// Update the max sequence number.
137-
let meta_key = MetaKey { region_id: self.id };
137+
let meta_key = MetaKey { table_id: self.id };
138138
let meta_value = MaxSeqMetaValue { max_seq };
139139
let (mut meta_key_buf, mut meta_value_buf) = (BytesMut::new(), BytesMut::new());
140140
self.max_seq_meta_encoding
@@ -440,7 +440,7 @@ impl RocksImpl {
440440
&self,
441441
table_max_seqs: &mut HashMap<TableId, SequenceNumber>,
442442
) -> Result<()> {
443-
let meta_key = MetaKey { region_id: 0 };
443+
let meta_key = MetaKey { table_id: 0 };
444444
let mut start_boundary_key_buf = BytesMut::new();
445445
self.max_seq_meta_encoding
446446
.encode_key(&mut start_boundary_key_buf, &meta_key)?;
@@ -462,9 +462,24 @@ impl RocksImpl {
462462

463463
let meta_key = self.max_seq_meta_encoding.decode_key(iter.key())?;
464464
let meta_value = self.max_seq_meta_encoding.decode_value(iter.value())?;
465+
#[rustfmt::skip]
466+
// FIXME: In some cases, the `flushed sequence`
467+
// may be greater than the `actual last sequence of written logs`.
468+
//
469+
// Such as following case:
470+
// + Write wal logs failed(last sequence stored in memory will increase when write failed).
471+
// + Get last sequence from memory(greater then actual last sequence now).
472+
// + Mark the got last sequence as flushed sequence.
465473
table_max_seqs
466-
.entry(meta_key.region_id)
474+
.entry(meta_key.table_id)
467475
.and_modify(|v| {
476+
if meta_value.max_seq > *v {
477+
warn!(
478+
"RocksDB WAL found flushed_seq greater than actual_last_sequence,
479+
flushed_sequence:{}, actual_last_sequence:{}, table_id:{}",
480+
meta_value.max_seq, *v, meta_key.table_id
481+
);
482+
}
468483
*v = meta_value.max_seq.max(*v);
469484
})
470485
.or_insert(meta_value.max_seq);

wal/src/table_kv_impl/table_unit.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::{
1616
use bytes_ext::BytesMut;
1717
use common_types::table::TableId;
1818
use generic_error::{BoxError, GenericError};
19-
use log::debug;
19+
use log::{debug, warn};
2020
use macros::define_result;
2121
use runtime::{self, Runtime};
2222
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
@@ -462,16 +462,22 @@ impl TableUnit {
462462
region_id,
463463
table_id,
464464
)? {
465-
ensure!(
466-
sequence + 1 >= start_sequence,
467-
LoadLastSequence {
468-
table_id,
469-
msg: format!(
470-
"found last sequence, but last sequence + 1 < start sequence,
471-
last sequence:{sequence}, start sequence:{start_sequence}",
472-
)
473-
}
474-
);
465+
#[rustfmt::skip]
466+
// FIXME: In some cases, the `flushed sequence`
467+
// may be greater than the `actual last sequence of written logs`.
468+
//
469+
// Such as following case:
470+
// + Write wal logs failed(last sequence stored in memory will increase when write failed).
471+
// + Get last sequence from memory(greater then actual last sequence now).
472+
// + Mark the got last sequence as flushed sequence.
473+
let actual_next_sequence = sequence + 1;
474+
if actual_next_sequence < start_sequence {
475+
warn!("TableKv WAL found start_sequence greater than actual_next_sequence,
476+
start_sequence:{start_sequence}, actual_next_sequence:{actual_next_sequence}, table_id:{table_id}, region_id:{region_id}");
477+
478+
break;
479+
}
480+
475481
return Ok(sequence);
476482
}
477483
}

0 commit comments

Comments
 (0)