Skip to content
This repository was archived by the owner on Mar 12, 2026. It is now read-only.

Commit 8f9698c

Browse files
authored
refactor: remove async remote write decode logic (#1631)
## Rationale Deserialization is a CPU-intensive task, so introducing async does not provide significant benefits. In addition, using async causes function contagion issues. Therefore, it is necessary to remove the async decode logic. ## Detailed Changes - Remove `decode_async` interface - Change the back-end object pool dependency from `deadpool` to `object-pool` - Change benchmark logic accordingly ## Test Plan Manual test
1 parent 9cec563 commit 8f9698c

17 files changed

+1093
-875
lines changed

Cargo.lock

Lines changed: 758 additions & 139 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,13 @@ columnar_storage = { path = "src/columnar_storage" }
5151
common = { path = "src/common" }
5252
criterion = "0.5"
5353
datafusion = "43"
54-
deadpool = "0.10"
5554
futures = "0.3"
55+
hotpath = "0.5.2"
5656
itertools = "0.3"
5757
lazy_static = "1"
5858
metric_engine = { path = "src/metric_engine" }
59+
num_cpus = "1"
60+
object-pool = "0.6"
5961
object_store = { version = "0.11" }
6062
once_cell = "1"
6163
parquet = { version = "53" }
-116 KB
Loading
-61.7 KB
Loading
-12.5 KB
Loading

src/benchmarks/Cargo.toml

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,40 +29,38 @@ description.workspace = true
2929
name = "parser_mem"
3030
path = "src/bin/parser_mem.rs"
3131

32-
[[bin]]
33-
name = "pool_stats"
34-
path = "src/bin/pool_stats.rs"
35-
3632
[features]
33+
default = ["hotpath", "hotpath-alloc-bytes-total"]
34+
hotpath = []
35+
hotpath-alloc-bytes-total = ["hotpath", "hotpath/hotpath-alloc-bytes-total"]
3736
unsafe-split = ["remote_write/unsafe-split"]
3837

3938
[dependencies]
39+
anyhow = { workspace = true }
4040
bytes = { workspace = true }
4141
columnar_storage = { workspace = true }
4242
common = { workspace = true }
43-
deadpool = { workspace = true }
43+
hotpath = { workspace = true }
44+
num_cpus = { workspace = true }
4445
pb_types = { workspace = true }
4546
prost = { workspace = true }
4647
protobuf = "3.7"
4748
quick-protobuf = "0.8"
4849
remote_write = { workspace = true }
4950
serde = { workspace = true }
5051
serde_json = { workspace = true }
51-
tikv-jemalloc-ctl = "0.5"
5252
tokio = { workspace = true }
5353
toml = { workspace = true }
5454
tracing = { workspace = true }
5555
tracing-subscriber = { workspace = true }
5656

5757
[target.'cfg(not(target_env = "msvc"))'.dependencies]
58-
tikv-jemallocator = "0.5"
5958

6059
[build-dependencies]
6160
protobuf-codegen = "3.7"
6261

6362
[dev-dependencies]
6463
criterion = { workspace = true }
65-
num_cpus = "1.16"
6664

6765
[[bench]]
6866
name = "bench"

src/benchmarks/benches/bench.rs

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,6 @@ fn bench_remote_write(c: &mut Criterion) {
6464
let concurrent_scales = config.remote_write.concurrent_scales.clone();
6565
let bench = RefCell::new(RemoteWriteBench::new(config.remote_write));
6666

67-
let rt = tokio::runtime::Builder::new_multi_thread()
68-
.worker_threads(num_cpus::get())
69-
.enable_all()
70-
.build()
71-
.unwrap();
72-
7367
// Sequential parse bench.
7468
let mut group = c.benchmark_group("remote_write_sequential");
7569

@@ -85,10 +79,10 @@ fn bench_remote_write(c: &mut Criterion) {
8579

8680
group.bench_with_input(
8781
BenchmarkId::new("pooled", n),
88-
&(&bench, &rt, n),
89-
|b, (bench, rt, scale)| {
82+
&(&bench, n),
83+
|b, (bench, scale)| {
9084
let bench = bench.borrow();
91-
b.iter(|| rt.block_on(bench.pooled_parser_sequential(*scale)).unwrap())
85+
b.iter(|| bench.pooled_parser_sequential(*scale).unwrap())
9286
},
9387
);
9488

@@ -118,43 +112,37 @@ fn bench_remote_write(c: &mut Criterion) {
118112
for &scale in &concurrent_scales {
119113
group.bench_with_input(
120114
BenchmarkId::new("prost", scale),
121-
&(&bench, &rt, scale),
122-
|b, (bench, rt, scale)| {
115+
&(&bench, scale),
116+
|b, (bench, scale)| {
123117
let bench = bench.borrow();
124-
b.iter(|| rt.block_on(bench.prost_parser_concurrent(*scale)).unwrap())
118+
b.iter(|| bench.prost_parser_concurrent(*scale).unwrap())
125119
},
126120
);
127121

128122
group.bench_with_input(
129123
BenchmarkId::new("pooled", scale),
130-
&(&bench, &rt, scale),
131-
|b, (bench, rt, scale)| {
124+
&(&bench, scale),
125+
|b, (bench, scale)| {
132126
let bench = bench.borrow();
133-
b.iter(|| rt.block_on(bench.pooled_parser_concurrent(*scale)).unwrap())
127+
b.iter(|| bench.pooled_parser_concurrent(*scale).unwrap())
134128
},
135129
);
136130

137131
group.bench_with_input(
138132
BenchmarkId::new("quick_protobuf", scale),
139-
&(&bench, &rt, scale),
140-
|b, (bench, rt, scale)| {
133+
&(&bench, scale),
134+
|b, (bench, scale)| {
141135
let bench = bench.borrow();
142-
b.iter(|| {
143-
rt.block_on(bench.quick_protobuf_parser_concurrent(*scale))
144-
.unwrap()
145-
})
136+
b.iter(|| bench.quick_protobuf_parser_concurrent(*scale).unwrap())
146137
},
147138
);
148139

149140
group.bench_with_input(
150141
BenchmarkId::new("rust_protobuf", scale),
151-
&(&bench, &rt, scale),
152-
|b, (bench, rt, scale)| {
142+
&(&bench, scale),
143+
|b, (bench, scale)| {
153144
let bench = bench.borrow();
154-
b.iter(|| {
155-
rt.block_on(bench.rust_protobuf_parser_concurrent(*scale))
156-
.unwrap()
157-
})
145+
b.iter(|| bench.rust_protobuf_parser_concurrent(*scale).unwrap())
158146
},
159147
);
160148
}

0 commit comments

Comments
 (0)