Skip to content
This repository was archived by the owner on Mar 12, 2026. It is now read-only.

Commit 951b2f7

Browse files
authored
chore: remove space_and_table from the table_impl (#1114)
## Rationale Some fields are not necessary in the `TableImpl`, so let's remove them. ## Detailed Changes Remove verbose fields in the `TableImpl`. ## Test Plan Existing tests.
1 parent 1a47fd9 commit 951b2f7

5 files changed

Lines changed: 30 additions & 43 deletions

File tree

analytic_engine/src/instance/read.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use crate::{
3737
merge::{MergeBuilder, MergeConfig, MergeIterator},
3838
IterOptions, RecordBatchWithKeyIterator,
3939
},
40-
space::SpaceAndTable,
4140
sst::factory::{ReadFrequency, SstReadOptions},
4241
table::{
4342
data::TableData,
@@ -85,20 +84,15 @@ impl Instance {
8584
/// `read_parallelism` output streams.
8685
pub async fn partitioned_read_from_table(
8786
&self,
88-
space_table: &SpaceAndTable,
87+
table_data: &TableData,
8988
request: ReadRequest,
9089
) -> Result<PartitionedStreams> {
9190
debug!(
9291
"Instance read from table, space_id:{}, table:{}, table_id:{:?}, request:{:?}",
93-
space_table.space().id,
94-
space_table.table_data().name,
95-
space_table.table_data().id,
96-
request
92+
table_data.space_id, table_data.name, table_data.id, request
9793
);
9894

99-
let table_data = space_table.table_data();
10095
let table_options = table_data.table_options();
101-
10296
// Collect metrics.
10397
table_data.metrics.on_read_request_begin();
10498
let need_merge_sort = need_merge_sort_streams(&table_options, &request);

analytic_engine/src/instance/write.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
},
2727
memtable::{key::KeySequence, PutContext},
2828
payload::WritePayload,
29-
space::{SpaceAndTable, SpaceRef},
29+
space::SpaceRef,
3030
table::{data::TableDataRef, version::MemTableForWrite},
3131
};
3232

@@ -249,15 +249,17 @@ pub struct Writer<'a> {
249249
impl<'a> Writer<'a> {
250250
pub fn new(
251251
instance: InstanceRef,
252-
space_table: SpaceAndTable,
252+
space: SpaceRef,
253+
table_data: TableDataRef,
253254
serial_exec: &'a mut TableOpSerialExecutor,
254255
) -> Writer<'a> {
255-
assert_eq!(space_table.table_data().id, serial_exec.table_id());
256+
// Ensure the writer has permission to handle the write of the table.
257+
assert_eq!(table_data.id, serial_exec.table_id());
256258

257259
Self {
258260
instance,
259-
space: space_table.space().clone(),
260-
table_data: space_table.table_data().clone(),
261+
space,
262+
table_data,
261263
serial_exec,
262264
}
263265
}

analytic_engine/src/memtable/factory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub struct Options {
2424
pub arena_block_size: u32,
2525
/// Log sequence at the memtable creation.
2626
pub creation_sequence: SequenceNumber,
27-
/// Memory usage colllector
27+
/// Memory usage collector
2828
pub collector: CollectorRef,
2929
}
3030

analytic_engine/src/space.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use crate::{
1919
table::data::{TableDataRef, TableDataSet},
2020
};
2121

22+
pub type SpaceId = u32;
23+
2224
/// Holds references to the table data and its space
2325
///
2426
/// REQUIRE: The table must belongs to the space
@@ -69,10 +71,6 @@ impl fmt::Debug for SpaceAndTable {
6971
}
7072
}
7173

72-
/// Space id
73-
// TODO(yingwen): Or just use something like uuid as space id?
74-
pub type SpaceId = u32;
75-
7674
#[derive(Debug)]
7775
pub struct SpaceContext {
7876
/// Catalog name

analytic_engine/src/table/mod.rs

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use trace_metric::MetricsCollector;
3838
use self::data::TableDataRef;
3939
use crate::{
4040
instance::{alter::Alterer, write::Writer, InstanceRef},
41-
space::{SpaceAndTable, SpaceId},
41+
space::{SpaceAndTable, SpaceRef},
4242
};
4343

4444
pub mod data;
@@ -53,21 +53,21 @@ const GET_METRICS_COLLECTOR_NAME: &str = "get";
5353
const ADDITIONAL_PENDING_WRITE_CAP_RATIO: usize = 10;
5454

5555
struct WriteRequests {
56-
pub space_table: SpaceAndTable,
57-
pub instance: InstanceRef,
56+
pub space: SpaceRef,
5857
pub table_data: TableDataRef,
58+
pub instance: InstanceRef,
5959
pub pending_writes: Arc<Mutex<PendingWriteQueue>>,
6060
}
6161

6262
impl WriteRequests {
6363
pub fn new(
64-
space_table: SpaceAndTable,
6564
instance: InstanceRef,
65+
space: SpaceRef,
6666
table_data: TableDataRef,
6767
pending_writes: Arc<Mutex<PendingWriteQueue>>,
6868
) -> Self {
6969
Self {
70-
space_table,
70+
space,
7171
instance,
7272
table_data,
7373
pending_writes,
@@ -77,15 +77,12 @@ impl WriteRequests {
7777

7878
/// Table trait implementation
7979
pub struct TableImpl {
80-
space_table: SpaceAndTable,
80+
space: SpaceRef,
8181
/// Instance
8282
instance: InstanceRef,
8383
/// Engine type
8484
engine_type: String,
8585

86-
space_id: SpaceId,
87-
table_id: TableId,
88-
8986
/// Holds a strong reference to prevent the underlying table from being
9087
/// dropped when this handle exist.
9188
table_data: TableDataRef,
@@ -98,13 +95,11 @@ impl TableImpl {
9895
pub fn new(instance: InstanceRef, space_table: SpaceAndTable) -> Self {
9996
let pending_writes = Mutex::new(PendingWriteQueue::new(instance.max_rows_in_write_queue));
10097
let table_data = space_table.table_data().clone();
101-
let space_id = space_table.space().space_id();
98+
let space = space_table.space().clone();
10299
Self {
103-
space_table,
100+
space,
104101
instance,
105102
engine_type: ANALYTIC_ENGINE_TYPE.to_string(),
106-
space_id,
107-
table_id: table_data.id,
108103
table_data,
109104
pending_writes: Arc::new(pending_writes),
110105
}
@@ -114,8 +109,8 @@ impl TableImpl {
114109
impl fmt::Debug for TableImpl {
115110
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116111
f.debug_struct("TableImpl")
117-
.field("space_id", &self.space_id)
118-
.field("table_id", &self.table_id)
112+
.field("space_id", &self.space.id)
113+
.field("table_id", &self.table_data.id)
119114
.finish()
120115
}
121116
}
@@ -285,8 +280,8 @@ impl TableImpl {
285280
// take responsibilities for merging and writing the
286281
// requests in the queue.
287282
let write_requests = WriteRequests::new(
288-
self.space_table.clone(),
289283
self.instance.clone(),
284+
self.space.clone(),
290285
self.table_data.clone(),
291286
self.pending_writes.clone(),
292287
);
@@ -341,7 +336,8 @@ impl TableImpl {
341336

342337
let mut writer = Writer::new(
343338
write_requests.instance,
344-
write_requests.space_table,
339+
write_requests.space,
340+
write_requests.table_data.clone(),
345341
&mut serial_exec,
346342
);
347343
let write_res = writer
@@ -424,11 +420,7 @@ impl Table for TableImpl {
424420
}
425421

426422
async fn write(&self, request: WriteRequest) -> Result<usize> {
427-
let _timer = self
428-
.space_table
429-
.table_data()
430-
.metrics
431-
.start_table_total_timer();
423+
let _timer = self.table_data.metrics.start_table_total_timer();
432424

433425
if self.should_queue_write_request(&request) {
434426
return self.write_with_pending_queue(request).await;
@@ -437,7 +429,8 @@ impl Table for TableImpl {
437429
let mut serial_exec = self.table_data.serial_exec.lock().await;
438430
let mut writer = Writer::new(
439431
self.instance.clone(),
440-
self.space_table.clone(),
432+
self.space.clone(),
433+
self.table_data.clone(),
441434
&mut serial_exec,
442435
);
443436
writer
@@ -451,7 +444,7 @@ impl Table for TableImpl {
451444
request.opts.read_parallelism = 1;
452445
let mut streams = self
453446
.instance
454-
.partitioned_read_from_table(&self.space_table, request)
447+
.partitioned_read_from_table(&self.table_data, request)
455448
.await
456449
.box_err()
457450
.context(Scan { table: self.name() })?;
@@ -542,7 +535,7 @@ impl Table for TableImpl {
542535
async fn partitioned_read(&self, request: ReadRequest) -> Result<PartitionedStreams> {
543536
let streams = self
544537
.instance
545-
.partitioned_read_from_table(&self.space_table, request)
538+
.partitioned_read_from_table(&self.table_data, request)
546539
.await
547540
.box_err()
548541
.context(Scan { table: self.name() })?;

0 commit comments

Comments
 (0)