Skip to content
This repository was archived by the owner on Mar 12, 2026. It is now read-only.

Commit 1607d1d

Browse files
authored
feat: query requests dedup (#1100)
## Rationale Reduce repeated execution of identical query requests, which in turn reduces memory consumption when querying. ## Detailed Changes - Use a hashmap to dedup query requests. - Only the first query request will be executed, and other identical query requests will wait for the execution result of the first request. ## Test Plan Existing tests
1 parent 951b2f7 commit 1607d1d

12 files changed

Lines changed: 339 additions & 27 deletions

File tree

common_types/src/column.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ pub enum Error {
8787

8888
pub type Result<T> = std::result::Result<T, Error>;
8989

90-
#[derive(Debug)]
90+
#[derive(Debug, Clone)]
9191
pub struct NullColumn(NullArray);
9292

9393
impl NullColumn {
@@ -109,7 +109,7 @@ impl NullColumn {
109109
macro_rules! define_numeric_column {
110110
($($Kind: ident), *) => {
111111
$(paste! {
112-
#[derive(Debug)]
112+
#[derive(Debug, Clone)]
113113
pub struct [<$Kind Column>]([<$Kind Array>]);
114114

115115
#[inline]
@@ -131,24 +131,24 @@ define_numeric_column!(
131131
Float, Double, UInt64, UInt32, UInt16, UInt8, Int64, Int32, Int16, Int8, Boolean
132132
);
133133

134-
#[derive(Debug)]
134+
#[derive(Debug, Clone)]
135135
pub struct TimestampColumn(TimestampMillisecondArray);
136136

137-
#[derive(Debug)]
137+
#[derive(Debug, Clone)]
138138
pub struct VarbinaryColumn(BinaryArray);
139139

140-
#[derive(Debug)]
140+
#[derive(Debug, Clone)]
141141
pub struct StringColumn(StringArray);
142142

143143
/// dictionary encode type is different from other types, need to implement
144144
/// without macro
145-
#[derive(Debug)]
145+
#[derive(Debug, Clone)]
146146
pub struct StringDictionaryColumn(DictionaryArray<Int32Type>);
147147

148-
#[derive(Debug)]
148+
#[derive(Debug, Clone)]
149149
pub struct DateColumn(DateArray);
150150

151-
#[derive(Debug)]
151+
#[derive(Debug, Clone)]
152152
pub struct TimeColumn(TimeArray);
153153

154154
#[inline]
@@ -749,7 +749,7 @@ impl_column_block!(
749749
macro_rules! define_column_block {
750750
($($Kind: ident), *) => {
751751
paste! {
752-
#[derive(Debug)]
752+
#[derive(Debug, Clone)]
753753
pub enum ColumnBlock {
754754
Null(NullColumn),
755755
StringDictionary(StringDictionaryColumn),

common_types/src/projected_schema.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ impl ProjectedSchema {
134134
self.0.is_all_projection()
135135
}
136136

137+
/// Returns whatever `projection()` on the wrapped inner value yields.
///
/// NOTE(review): presumably the selected column indexes, with `None`
/// meaning "all columns" (compare `is_all_projection`) — confirm against
/// the inner type.
pub fn projection(&self) -> Option<Vec<usize>> {
138+
self.0.projection()
139+
}
140+
137141
/// Returns the [RowProjector] to project the rows with source schema to
138142
/// rows with [RecordSchemaWithKey].
139143
///
@@ -260,6 +264,10 @@ impl ProjectedSchemaInner {
260264
self.projection.is_none()
261265
}
262266

267+
/// Returns an owned copy of the stored `projection` field (cloned on every
/// call).
fn projection(&self) -> Option<Vec<usize>> {
268+
self.projection.clone()
269+
}
270+
263271
// TODO(yingwen): We can fill missing not null column with default value instead
264272
// of returning error.
265273
fn try_project_with_key(&self, source_schema: &Schema) -> Result<RowProjector> {

common_types/src/record_batch.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ pub enum Error {
102102

103103
pub type Result<T> = std::result::Result<T, Error>;
104104

105-
#[derive(Debug)]
105+
#[derive(Debug, Clone)]
106106
pub struct RecordBatchData {
107107
arrow_record_batch: ArrowRecordBatch,
108108
column_blocks: Vec<ColumnBlock>,
@@ -192,7 +192,7 @@ impl TryFrom<ArrowRecordBatch> for RecordBatchData {
192192

193193
// TODO(yingwen): The schema in RecordBatch should be much simple because it may
194194
// lack some information.
195-
#[derive(Debug)]
195+
#[derive(Debug, Clone)]
196196
pub struct RecordBatch {
197197
schema: RecordSchema,
198198
data: RecordBatchData,

server/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ pub struct ServerConfig {
121121

122122
/// Config of remote engine client
123123
pub remote_client: remote_engine_client::Config,
124+
125+
/// Whether to deduplicate requests
126+
pub enable_query_dedup: bool,
124127
}
125128

126129
impl Default for ServerConfig {
@@ -140,6 +143,7 @@ impl Default for ServerConfig {
140143
route_cache: router::RouteCacheConfig::default(),
141144
hotspot: hotspot::Config::default(),
142145
remote_client: remote_engine_client::Config::default(),
146+
enable_query_dedup: false,
143147
}
144148
}
145149
}

server/src/dedup_requests.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
2+
3+
use std::{collections::HashMap, hash::Hash, sync::RwLock};
4+
5+
use tokio::sync::mpsc::Sender;
6+
7+
/// Alias for the `tokio::sync::mpsc::Sender` stored for each registered
/// request.
type Notifier<T> = Sender<T>;
8+
9+
/// The notifiers registered under a single key.
///
/// The vector sits behind an `RwLock` so that it can be extended through a
/// shared reference.
#[derive(Debug)]
10+
struct Notifiers<T> {
11+
// All senders registered so far for this key.
notifiers: RwLock<Vec<Notifier<T>>>,
12+
}
13+
14+
impl<T> Notifiers<T> {
15+
pub fn new(notifier: Notifier<T>) -> Self {
16+
let notifiers = vec![notifier];
17+
Self {
18+
notifiers: RwLock::new(notifiers),
19+
}
20+
}
21+
22+
pub fn add_notifier(&self, notifier: Notifier<T>) {
23+
self.notifiers.write().unwrap().push(notifier);
24+
}
25+
}
26+
27+
/// Registry that groups notifiers by request key.
///
/// Both the map and each per-key collection are guarded by `RwLock`s, so
/// the registry is mutated through `&self`.
#[derive(Debug)]
28+
pub struct RequestNotifiers<K, T>
29+
where
30+
K: PartialEq + Eq + Hash,
31+
{
32+
// Notifiers currently registered, keyed by request key.
notifiers_by_key: RwLock<HashMap<K, Notifiers<T>>>,
33+
}
34+
35+
impl<K, T> Default for RequestNotifiers<K, T>
36+
where
37+
K: PartialEq + Eq + Hash,
38+
{
39+
fn default() -> Self {
40+
Self {
41+
notifiers_by_key: RwLock::new(HashMap::new()),
42+
}
43+
}
44+
}
45+
46+
impl<K, T> RequestNotifiers<K, T>
47+
where
48+
K: PartialEq + Eq + Hash,
49+
{
50+
/// Insert a notifier for the given key.
51+
pub fn insert_notifier(&self, key: K, notifier: Notifier<T>) -> RequestResult {
52+
// First try to read the notifiers, if the key exists, add the notifier to the
53+
// notifiers.
54+
let notifiers_by_key = self.notifiers_by_key.read().unwrap();
55+
if let Some(notifiers) = notifiers_by_key.get(&key) {
56+
notifiers.add_notifier(notifier);
57+
return RequestResult::Wait;
58+
}
59+
drop(notifiers_by_key);
60+
61+
// If the key does not exist, try to write the notifiers.
62+
let mut notifiers_by_key = self.notifiers_by_key.write().unwrap();
63+
// double check, if the key exists, add the notifier to the notifiers.
64+
if let Some(notifiers) = notifiers_by_key.get(&key) {
65+
notifiers.add_notifier(notifier);
66+
return RequestResult::Wait;
67+
}
68+
69+
//the key is not existed, insert the key and the notifier.
70+
notifiers_by_key.insert(key, Notifiers::new(notifier));
71+
RequestResult::First
72+
}
73+
74+
/// Take the notifiers for the given key, and remove the key from the map.
75+
pub fn take_notifiers(&self, key: &K) -> Option<Vec<Notifier<T>>> {
76+
self.notifiers_by_key
77+
.write()
78+
.unwrap()
79+
.remove(key)
80+
.map(|notifiers| notifiers.notifiers.into_inner().unwrap())
81+
}
82+
}
83+
84+
/// Outcome of registering a notifier for a request key.
pub enum RequestResult {
85+
/// The first request for this key, need to handle this request.
86+
First,
87+
/// There are other requests for this key, just wait for the result.
88+
Wait,
89+
}

server/src/grpc/metrics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ make_auto_flush_static_metric! {
4444
write_succeeded_row,
4545
write_failed_row,
4646
query_succeeded_row,
47+
dedupped_stream_query,
4748
}
4849

4950
pub struct RemoteEngineGrpcHandlerCounterVec: LocalIntCounter {

server/src/grpc/mod.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use ceresdbproto::{
1616
storage::storage_service_server::StorageServiceServer,
1717
};
1818
use cluster::ClusterRef;
19-
use common_types::column_schema;
19+
use common_types::{column_schema, record_batch::RecordBatch};
2020
use futures::FutureExt;
2121
use generic_error::GenericError;
2222
use log::{info, warn};
@@ -34,9 +34,13 @@ use table_engine::engine::EngineRuntimes;
3434
use tokio::sync::oneshot::{self, Sender};
3535
use tonic::transport::Server;
3636

37-
use crate::grpc::{
38-
meta_event_service::MetaServiceImpl, remote_engine_service::RemoteEngineServiceImpl,
39-
storage_service::StorageServiceImpl,
37+
use crate::{
38+
dedup_requests::RequestNotifiers,
39+
grpc::{
40+
meta_event_service::MetaServiceImpl,
41+
remote_engine_service::{error, RemoteEngineServiceImpl, StreamReadReqKey},
42+
storage_service::StorageServiceImpl,
43+
},
4044
};
4145

4246
mod meta_event_service;
@@ -196,6 +200,7 @@ pub struct Builder<Q> {
196200
cluster: Option<ClusterRef>,
197201
opened_wals: Option<OpenedWals>,
198202
proxy: Option<Arc<Proxy<Q>>>,
203+
request_notifiers: Option<Arc<RequestNotifiers<StreamReadReqKey, error::Result<RecordBatch>>>>,
199204
}
200205

201206
impl<Q> Builder<Q> {
@@ -208,6 +213,7 @@ impl<Q> Builder<Q> {
208213
cluster: None,
209214
opened_wals: None,
210215
proxy: None,
216+
request_notifiers: None,
211217
}
212218
}
213219

@@ -246,6 +252,13 @@ impl<Q> Builder<Q> {
246252
self.proxy = Some(proxy);
247253
self
248254
}
255+
256+
/// Builder step that controls whether a request-notifier registry is set.
///
/// When `enable_query_dedup` is `true`, stores a fresh default
/// `RequestNotifiers` wrapped in an `Arc`; otherwise the field is left as
/// it was. Returns the builder for chaining.
pub fn request_notifiers(mut self, enable_query_dedup: bool) -> Self {
257+
if enable_query_dedup {
258+
self.request_notifiers = Some(Arc::new(RequestNotifiers::default()));
259+
}
260+
self
261+
}
249262
}
250263

251264
impl<Q: QueryExecutor + 'static> Builder<Q> {
@@ -269,6 +282,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
269282
let service = RemoteEngineServiceImpl {
270283
instance,
271284
runtimes: runtimes.clone(),
285+
request_notifiers: self.request_notifiers,
272286
};
273287
RemoteEngineServiceServer::new(service)
274288
};

0 commit comments

Comments
 (0)