perf: move request submission off event loop thread in execute_concurrent #827

Draft
mykaul wants to merge 85 commits into scylladb:master from mykaul:perf/concurrent-submitter-thread

Conversation

mykaul commented Apr 21, 2026

Summary

  • Moves execute_async() calls in execute_concurrent from the event-loop callback thread to a dedicated submitter thread
  • The event loop callback now only appends to a deque and signals an Event, reducing per-callback overhead from ~27μs to ~100ns
  • The submitter thread drains the deque in batches and calls execute_async(), which includes serialization — keeping that CPU work off the event loop

v2: Reduce per-request lock overhead in ResponseFuture

Second commit (7cae6a14e) reduces lock/synchronization cost per request in the execute_concurrent hot path:

  1. Lazy Event creation: ResponseFuture._event starts as None instead of Event(). The Event is only materialized in result() (the synchronous path). For execute_concurrent, which never calls result() on individual futures, this eliminates ~620ns per request (351ns Event construction + 267ns Event.set()).

  2. Merged add_callbacks(): Registers both callback and errback under a single _callback_lock acquisition instead of two separate lock/unlock cycles. Saves ~80ns per request.

  3. _set_final_result / _set_final_exception: Capture _event reference under _callback_lock before calling .set() outside the lock. Skip .set() when Event was never created. Null-check callback/errback lists before building to_call tuple.

  4. _wait_for_result(): Checks result availability under _callback_lock before creating Event — avoids Event creation entirely when the result arrived before the caller waits.

  5. _on_speculative_execute: Checks _final_result/_final_exception directly instead of Event.is_set(), since Event may be None with lazy creation.

All changes are safe under both GIL and free-threaded (PEP 703) Python. No GIL assumptions.
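The lazy-Event and capture-under-lock ideas from points 1, 3, and 4 can be sketched as a minimal stand-alone class. This is illustrative only (LazyFuture, _set_final_result, and result are stand-in names, not the driver's actual ResponseFuture API): the Event is built only when a caller actually blocks, and the setter captures the Event reference under the lock but calls .set() outside it.

```python
import threading

class LazyFuture:
    """Illustrative sketch of lazy Event creation (not the driver's code)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._event = None          # lazily created, only if someone waits
        self._final = None
        self._done = False

    def _set_final_result(self, value):
        with self._lock:
            self._final = value
            self._done = True
            event = self._event     # capture reference under the lock
        if event is not None:       # skip .set() when nobody ever waited
            event.set()

    def result(self, timeout=None):
        with self._lock:
            if self._done:          # result already arrived: no Event needed
                return self._final
            if self._event is None:
                self._event = threading.Event()
            event = self._event
        event.wait(timeout)
        return self._final
```

On the execute_concurrent-style path, where result() is never called on individual futures, no Event object is ever allocated.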

Benchmark Results

On our vector ingestion benchmark (100K rows, 768-dim float32 vectors, ScyllaDB 2026.1.1):

  • Stock master + execute_concurrent: ~7,500 rows/s
  • Enhanced driver + this change: +6-9% throughput improvement (additive with Cython serializer gains)
  • The improvement is modest because serialization still dominates; with Cython serializers reducing serialization cost, this change becomes more impactful

How It Works

  • _ConcurrentExecutorBase spawns a daemon submitter thread alongside the existing callback mechanism
  • Callbacks do deque.append(1); event.set() — minimal work on the hot path
  • Submitter thread wakes on the event, drains pending count, and calls _execute_next() in a batch
  • Thread-safe via collections.deque (atomic append/popleft in CPython) + threading.Event
  • Graceful shutdown: sentinel None in deque signals the thread to exit; join() in wait()
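The bullets above can be condensed into a minimal stand-alone sketch of the pattern, with SubmitterQueue, notify, and submit_fn as illustrative names rather than the driver's API. The hot-path method only appends and signals; a daemon thread drains in batches and a None sentinel triggers shutdown.

```python
import collections
import threading

class SubmitterQueue:
    """Minimal sketch of the submitter-thread pattern (names are illustrative)."""

    def __init__(self, submit_fn):
        self._pending = collections.deque()   # atomic append/popleft in CPython
        self._event = threading.Event()
        self._submit_fn = submit_fn
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def notify(self):
        """Called from the event-loop callback: near-constant-time hot path."""
        self._pending.append(1)
        self._event.set()

    def shutdown(self):
        self._pending.append(None)            # sentinel: tells the thread to exit
        self._event.set()
        self._thread.join()

    def _run(self):
        while True:
            self._event.wait()
            self._event.clear()
            # Drain everything currently queued in one batch.
            while self._pending:
                item = self._pending.popleft()
                if item is None:
                    return
                self._submit_fn()             # stand-in for execute_async()
```

Because the event is set after every append and the drain loop re-checks the deque after each wakeup, no signal can be lost even if set() races with clear().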

Testing

  • 642 unit tests pass, 0 failures
  • All 10 existing test_concurrent.py unit tests pass
  • Tested with real ScyllaDB cluster under sustained load (100K+ inserts)

mykaul added 30 commits April 2, 2026 16:18
Cache lookup_casstype_simple() and parse_casstype_args() with
@functools.lru_cache() to avoid repeated string manipulation and
regex scanning when the same type strings are resolved multiple times
(common during schema parsing and query result deserialization).

Also fixes an unused variable warning (prev_names -> _).

Includes a pytest-benchmark comparison (cached vs uncached).
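The caching described here is plain functools.lru_cache applied to a pure string-parsing function. A hedged stdlib-only sketch (parse_type_string is a hypothetical stand-in for lookup_casstype_simple/parse_casstype_args, not the driver's real parser):

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def parse_type_string(s):
    """Hypothetical stand-in: split 'Outer(Inner, ...)' into (name, args)."""
    name, _, args = s.partition("(")
    if args:
        subtypes = tuple(a.strip() for a in args.rstrip(")").split(","))
    else:
        subtypes = ()
    return (name.strip(), subtypes)
```

Repeated lookups of the same type string become a single dict hit, so the cached call returns the exact same tuple object, with no string manipulation or regex scanning on the hot path.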
Add three fast paths to VectorType.serialize() for the common case of
fixed-size numeric subtypes (float, double, int, bigint):

1. bytes/bytearray passthrough – skip all conversion when the caller
   already holds a correctly-sized blob (e.g. from serialize_numpy_bulk).

2. NumPy ndarray fast path – convert a 1-D numpy array to big-endian
   bytes via asarray(dtype=...).tobytes() instead of 768+ individual
   struct.pack + BytesIO.write calls.

3. serialize_numpy_bulk() classmethod – byte-swap an entire 2-D array
   (N rows × dim columns) once and slice the raw buffer, yielding one
   bytes object per row with zero per-element overhead.

Benchmarks on 768-dim float32 vectors show 70-300× speedups depending
on the path, directly benefiting bulk-insert workloads such as loading
embeddings from Parquet files (VectorDBBench use case).

NumPy remains an optional dependency; all new code is guarded by
try/except ImportError.  Variable-size subtypes (smallint, tinyint,
text, etc.) are excluded and continue to use the original
element-by-element path.

Unit tests cover correctness, round-trip fidelity, error handling, and
fallback behavior for all three paths.
Standalone benchmark comparing the four serialization paths for
VectorType across dimensions (128, 768, 1536) and batch sizes
(1, 100, 10000):

  - list (element-by-element)   – baseline
  - numpy (per-row ndarray)     – single-row fast path
  - bulk (serialize_numpy_bulk) – batch fast path
  - bytes passthrough (bind)    – pre-serialized blob

Includes auto-calibrated iteration counts and correctness verification.
Add a new section to docs/performance.rst covering the three fast
serialization paths (single-row ndarray, bulk serialize_numpy_bulk,
bytes passthrough) with usage examples and supported subtype list.
Add a fast path in ResultSet.was_applied that skips batch detection
(isinstance checks + regex match) when the query has a known LWT status
from the server PREPARE response. For BoundStatement queries where
is_lwt() returns True, the batch_regex match on the query string is
entirely avoided.

This benefits the most common LWT use case: prepared INSERT/UPDATE IF
statements executed via BoundStatement, where the driver already knows
from the PREPARE response whether the statement is an LWT.

The slow path (isinstance + regex) is preserved for:
- BatchStatement queries (detected via isinstance)
- SimpleStatement batch queries (detected via regex)
- Any query where is_lwt() returns False

Also adds explicit tests for the fast path, non-LWT fallback, and
BatchStatement handling in was_applied.

Part of: scylladb#751
Cache the results of _CassandraType.apply_parameters() in a class-level
dict keyed by (cls, subtypes, names). This avoids the expensive type()
metaclass machinery on repeated calls with the same type signature,
which is the common case during result-set deserialization.

Benchmark: 31.7x speedup (6.48 us/call -> 0.20 us/call) for cached hits.
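The memoization described is a dict keyed by the full argument signature, so the type() metaclass machinery runs only on a cache miss. A hedged sketch (apply_parameters_cached and the cache name are illustrative, not the driver's internals):

```python
_apply_params_cache = {}

def apply_parameters_cached(cls, subtypes, names):
    """Return a parameterized subclass, memoized by (cls, subtypes, names)."""
    key = (cls, subtypes, names)          # all components must be hashable
    try:
        return _apply_params_cache[key]
    except KeyError:
        # Expensive path: dynamic class creation via type() on a miss only.
        result = type("%s(%s)" % (cls.__name__, ",".join(subtypes)), (cls,),
                      {"subtypes": subtypes, "names": names})
        _apply_params_cache[key] = result
        return result
```

Because the same class object is returned on every hit, downstream isinstance checks and identity comparisons keep working unchanged.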
Defer list allocation for _callbacks and _errbacks from __init__ to
first use in add_callback()/add_errback(). On the synchronous execute
path (session.execute()), no callbacks are registered, so both lists
are never allocated — saving 112 bytes per request.

All access is under _callback_lock; _set_final_result and
_set_final_exception use 'or ()' guard to iterate safely when None.

Benchmark: 2.2x faster init (0.06 -> 0.03 us), 112 bytes saved/request.
Benchmark shows ~1.1x speedup for known-LWT fast path vs regex
batch detection slow path.
Replace per-call struct.pack(">H%dsB" % l, l, p, 0) with pre-compiled
uint16_pack(len(p)) + p + b'\\x00'. This eliminates the format string
interpolation and dynamic struct format creation on every call, using
the pre-compiled uint16_pack (struct.Struct('>H').pack) instead.

The routing key computation is called for every query when
TokenAwarePolicy is in use, making this a hot path.
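The before/after difference can be reproduced directly, since both forms are plain struct calls; encode_component_slow/encode_component_fast are illustrative names for the routing-key component encoding:

```python
import struct

uint16_pack = struct.Struct(">H").pack    # compiled once at import time

def encode_component_slow(p):
    # Old: format string interpolated and struct format compiled on every call
    return struct.pack(">H%dsB" % len(p), len(p), p, 0)

def encode_component_fast(p):
    # New: pre-compiled length pack + plain bytes concatenation
    return uint16_pack(len(p)) + p + b"\x00"
```

The two produce identical wire bytes; only the per-call format-string construction is eliminated.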
_Frame is instantiated for every response frame received from the server.
Adding __slots__ eliminates the per-instance __dict__ allocation (~104 bytes
on CPython), reducing memory pressure on high-throughput workloads.

_Frame only has 6 fixed attributes (version, flags, stream, opcode,
body_offset, end_pos) and is never monkey-patched or dynamically extended.
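The effect of __slots__ on a fixed-attribute class like _Frame can be demonstrated with a minimal pair (FrameDict/FrameSlots are illustrative stand-ins, not the driver's _Frame):

```python
class FrameDict:
    """Without __slots__: every instance carries a __dict__."""
    def __init__(self, version, flags, stream, opcode, body_offset, end_pos):
        self.version, self.flags, self.stream = version, flags, stream
        self.opcode, self.body_offset, self.end_pos = opcode, body_offset, end_pos

class FrameSlots:
    """With __slots__: attributes live in fixed slots, no per-instance __dict__."""
    __slots__ = ("version", "flags", "stream", "opcode", "body_offset", "end_pos")
    def __init__(self, version, flags, stream, opcode, body_offset, end_pos):
        self.version, self.flags, self.stream = version, flags, stream
        self.opcode, self.body_offset, self.end_pos = opcode, body_offset, end_pos
```

The slotted class also rejects accidental dynamic attributes, which matches the commit's observation that _Frame is never monkey-patched.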
In _set_final_result and _set_final_exception, skip building the
tuple of partial(fn, ...) when the callbacks/errbacks list is empty.

Most queries use the synchronous result() path and never register
callbacks or errbacks, so this avoids constructing an empty tuple
and the associated generator overhead on every query completion.
…n path

When compression is active (protocol v4 and below), encode_message
previously created two BytesIO objects: one for the uncompressed body,
then another to write the header + compressed body. The second BytesIO
is unnecessary -- use direct bytes concatenation (header + compressed_body)
instead, which avoids one BytesIO allocation per compressed message.

The non-compression path already used a single BytesIO and is unchanged.
…in ResponseFuture

- Remove 3 dead class attributes (default_timeout, _profile_manager,
  _warned_timeout) that were never read or written on ResponseFuture
- Add prepared_statement and _continuous_paging_state as class-level
  defaults (both None), skip __init__ assignment when parameter is None
- Conditionalize _metrics and _host assignments: only set when non-None
- Saves 4 STORE_ATTR operations per query on the common path (simple
  statements, no metrics, no host targeting, no continuous paging)
Replace the per-parameter write_value(f, param) loop in
_QueryMessage._write_query_params() with a buffer accumulation approach:
list.append + b"".join + single f.write().

This reduces the number of f.write() calls from 2*N+1 to 1, which is
significant for vector workloads with large parameters.

Also removes the redundant ExecuteMessage._write_query_params()
pass-through override to avoid extra MRO lookup per call.

Includes 14 unit tests covering normal, NULL, UNSET, empty, large vector,
and mixed parameter scenarios for both ExecuteMessage and QueryMessage.

Includes a benchmark script (benchmarks/bench_execute_write_params.py).
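The buffer-accumulation change (also used by the BatchMessage commit below) reduces to the same pattern: collect chunks in a list, b"".join once, write once. A hedged sketch with illustrative names and a simplified length-prefixed wire layout:

```python
import io
import struct

int32_pack = struct.Struct(">i").pack

def write_values_slow(f, params):
    """Old shape: 2*N + 1 separate f.write() calls."""
    f.write(struct.pack(">H", len(params)))
    for p in params:
        f.write(int32_pack(len(p)))
        f.write(p)

def write_values_fast(f, params):
    """New shape: accumulate parts, join, single f.write()."""
    parts = [struct.pack(">H", len(params))]
    for p in params:
        parts.append(int32_pack(len(p)))
        parts.append(p)
    f.write(b"".join(parts))
```

The output bytes are identical; only the number of BytesIO method calls changes, which matters most when N is large (e.g. wide vector parameters).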
Replace per-write_value()/write_byte()/write_short() calls in
BatchMessage.send_body() with buffer accumulation (list.append +
b"".join + single f.write()), reducing f.write() calls from
Q*(4 + 2*P) + footer to 1 for Q queries with P params each.

Benchmark results (Python 3.14, Cython .so, 50K iters, best of 3,
quiet machine):

  Scenario                              Before    After    Speedup
  10 queries x 2 params (128D vec)      8364 ns   4475 ns  1.87x
  10 queries x 2 params (768D vec)      8081 ns   5516 ns  1.47x
  50 queries x 2 params (128D vec)     32368 ns  16271 ns  1.99x
  10 queries x 10 text params          19138 ns   9051 ns  2.11x
  50 queries x 10 text params          86845 ns  40020 ns  2.17x
  10 unprepared x 2 params              8666 ns   4252 ns  2.04x

Also updates test_batch_message_with_keyspace to use BytesIO for
byte-level verification (compatible with single-write output).

Adds 7 batch-specific unit tests covering prepared, unprepared, mixed,
empty, many-query, NULL/UNSET, and vector parameter scenarios.

Includes benchmark script benchmarks/bench_batch_send_body.py.
Replace per-call int32_pack(-1) and int32_pack(-2) with module-level
_INT32_NEG1 and _INT32_NEG2 constants. Avoids redundant struct packing
on every null or unset parameter in the inner write_value loop.

Benchmark: ~11% speedup on the parameter serialization loop for a
typical 12-param mix of values, nulls, and unsets.
…torType

Add cassandra/serializers.pyx and cassandra/serializers.pxd implementing
Cython-optimized serialization that mirrors the deserializers.pyx architecture.

Implements type-specialized serializers for the three subtypes commonly used
in vector columns:
- SerFloatType: 4-byte big-endian IEEE 754 float
- SerDoubleType: 8-byte big-endian double
- SerInt32Type: 4-byte big-endian signed int32

SerVectorType pre-allocates a contiguous buffer and uses C-level byte swapping
for float/double/int32 vectors, with a generic fallback for other subtypes.
GenericSerializer delegates to the Python-level cqltype.serialize() classmethod.

Factory functions find_serializer() and make_serializers() allow easy lookup
and batch creation of serializers for column types.

Benchmarks show ~30x speedup over the current io.BytesIO baseline and ~3x
speedup over Python struct.pack for Vector<float, 1536> serialization.

No setup.py changes needed - the existing cassandra/*.pyx glob already picks
up new .pyx files.
…nt.bind()

When Cython serializers (from cassandra.serializers) are available and no
column encryption policy is active, BoundStatement.bind() now uses
pre-built Serializer objects cached on the PreparedStatement instead of
calling cqltype classmethods. This avoids per-value Python method dispatch
overhead and enables the ~30x vector serialization speedup from the Cython
serializers module.

The bind loop is split into three paths:
1. Column encryption policy path (unchanged behavior)
2. Cython serializers path (new fast path)
3. Plain Python path (no CE, no Cython -- removes per-value ColDesc/CE check)

Depends on PR scylladb#748 (Cython serializers module) and PR scylladb#630 (CE-policy
bind split).
…sion

Implement cython_lz4.pyx that calls LZ4_compress_default() and
LZ4_decompress_safe() directly via Cython's cdef extern, bypassing
the Python lz4 module's object allocation overhead in the hot
compress/decompress path.

Key design decisions:
- Direct C linkage (cdef extern from "lz4.h") eliminates all
  intermediate Python object allocations for byte-order conversion
- Zero-copy compress: uses _PyBytes_Resize to shrink the output
  bytes object in-place (CPython-specific; documented and safe
  because the object has refcount=1 during construction)
- Wire-compatible with CQL binary protocol v4 format:
  [4 bytes big-endian uncompressed length][raw LZ4 compressed data]
- Safety guards: LZ4_MAX_INPUT_SIZE check (prevents Py_ssize_t→int
  truncation), INT32_MAX compressed payload check, 256 MiB
  decompressed size cap, result size verification
- bytes not None parameter typing rejects None/bytearray/memoryview
- PyPy-safe: this is a Cython module (CPython only); PyPy users
  automatically fall back to the pure-Python lz4 wrappers via the
  import chain in connection.py

Integration:
- connection.py: Cython import with fallback; also enables LZ4
  without the Python lz4 package when the Cython extension is built
- setup.py: separate Extension with libraries=['lz4'], excluded
  from the .pyx glob (which lacks the -llz4 link flag)

Benchmark results (taskset -c 0, CPython 3.14):
  Payload  Operation     Python (ns)  Cython (ns)  Speedup
  1KB      compress            596        360       1.66x
  1KB      decompress          313        136       2.30x
  8KB      compress           1192        722       1.65x
  8KB      decompress         1102        825       1.34x
  64KB     compress           8179       3976       2.06x
  64KB     decompress         6539       4890       1.34x
Add __slots__ to the Tablet class, removing the per-instance __dict__
allocation. Tablets are created frequently (one per token range per table)
and are long-lived, so the cumulative memory savings are significant.

Before: 416 bytes/tablet (48 instance + 96 __dict__ + 80 replicas + 192 tuples)
After:  328 bytes/tablet (56 instance +  0 __dict__ + 80 replicas + 192 tuples)
Saving: 88 bytes/tablet (21%)

Scale impact (3 replicas/tablet):
  12,800 tablets (100 tables x 128): saves 1.1 MB
 128,000 tablets (1000 tables x 128): saves 10.7 MB
 256,000 tablets (1000 tables x 256): saves 21.5 MB

Tablet.from_row construction also improves:
  Before: 186 ns/call
  After:  147 ns/call (1.27x faster, -21%)
Replicas are never mutated after Tablet construction; convert to tuple
in __init__ to save 8 bytes per tablet (list overallocates for future
appends that never happen) and communicate immutability.

Before: 328 bytes/tablet (replicas container: 80 bytes as list)
After:  320 bytes/tablet (replicas container: 72 bytes as tuple)
Saving: 8 bytes/tablet (2.4%)

Combined with __slots__ (commit 1), total savings so far: 96 bytes/tablet.

Scale impact (3 replicas/tablet):
  128,000 tablets: saves ~1.0 MB (tuple) + 10.7 MB (slots) = 11.7 MB total
  256,000 tablets: saves ~2.0 MB (tuple) + 21.5 MB (slots) = 23.5 MB total
Build a {host_id: shard_id} dict once at Tablet construction time so
that policies.py and pool.py can replace set(map(lambda ...)) and
linear scans with O(1) dict operations.

- Add _replica_dict to __slots__
- Build dict from the materialized tuple (not the raw replicas arg)
  to avoid double-consuming a one-shot iterator
- Update DCAwareRoundRobinPolicy to use tablet._replica_dict keys
- Update HostConnection to use tablet._replica_dict.get() for shard
- Rewrite replica_contains_host_id() to use dict membership
- Add 7 unit tests covering dict construction, lookup, host membership,
  tuple storage, and the iterator edge case
Remove the _is_valid_tablet staticmethod indirection and replace the
two-step from_row -> _is_valid_tablet -> Tablet() chain with a single
truthiness guard and direct construction.  Saves ~54 ns/call (12%)
by eliminating a staticmethod descriptor lookup, an extra function
call, and redundant 'is not None' check (replicas from CQL
deserialization is always a list or None).
Maintain parallel _first_tokens and _last_tokens dicts alongside
_tablets, each mapping (keyspace, table) to a plain list[int].  This
lets bisect_left run entirely in C on native ints instead of calling
an attrgetter callback on every comparison during binary search.

Follow-up to PR scylladb#757 which identified the opportunity: its own
benchmarks showed bisect_left without key= is 2.7-5.7x faster than
with key=attrgetter.

Results (best-of-5, Python 3.14):

  get_tablet_for_key (hit):
  Tablets    Before    After    Saved   Speedup
       10    293ns    216ns     78ns     1.36x
      100    351ns    233ns    118ns     1.51x
    1,000    448ns    267ns    181ns     1.68x
   10,000    537ns    282ns    255ns     1.90x

All three dicts are kept in sync by add_tablet, drop_tablets, and
drop_tablets_by_host_id.  The attrgetter imports are no longer needed
and have been removed.
Remove 'results = None' (never assigned or read in production code),
duplicate 'kind = None' (declared twice), and duplicate 'paging_state = None'
(declared at lines 663 and 681). The canonical declarations at lines 672-685
are kept.
When trace_id, custom_payload, and warnings are None (the common case
for non-traced, non-warned messages), skip the instance attribute
assignment. The class-level defaults on _MessageType already provide
None, so reading these attributes returns the correct value without
the per-instance __dict__ write.
mykaul added 21 commits April 20, 2026 14:34
In TokenAwarePolicy.make_query_plan(), the tablet replica lookup was
creating a new set via {r[0] for r in tablet.replicas} on every query.
The Tablet object already maintains _replica_dict (host_id -> shard_id)
which supports the same 'in' membership test. Reusing it eliminates
per-query set allocation and redundant UUID hashing (observed as ~200K
set creations and ~600K uuid.__hash__ calls in profiling).
When tablets are in use, get_tablet_for_key() was called twice per
request: once in TokenAwarePolicy.make_query_plan() to find the replica,
and again in HostConnection._get_connection_for_routing_key() to
determine the shard. Profiling showed ~400K bisect lookups for 200K
rows.

Stash the tablet found during query planning on the query object
(query._tablet) and pass it through to borrow_connection(), which
skips the second lookup when a tablet is already available. This
eliminates ~200K redundant bisect_left calls and associated dict
lookups per 200K requests.
…resolved)

Resolved conflict with skip-empty-callbacks (scylladb#803): kept the 'if
callbacks:' guard which is strictly better than 'or ()' — it skips
both tuple allocation and iteration when no callbacks are registered.
The __init__ lazy-init (None instead of []) from scylladb#795 was already in
place.
ConcurrentExecutorListResults now uses a dedicated submitter thread
instead of calling _execute_next inline from the event loop callback.
This decouples I/O completion processing from new request serialization
and enqueuing, yielding ~6-9% higher write throughput.

The callback now just signals a threading.Event; the submitter thread
drains a deque and calls session.execute_async in batches. This avoids
blocking the libev event loop thread with request preparation work
(query plan, serialization, tablet lookup) that takes ~27us per request.

Bug fixes included:
- Start submitter thread after initial batch (avoid race on _exec_count)
- Write _exec_count under _condition lock (avoid race with _current)
- Use exhausted flag to avoid repeated StopIteration on iterator
- Add warmup phase (--warmup-rows, default 1000) before timed runs
- Add multiple timed runs (--runs, default 3) with best/avg/worst
- Pre-load all data before timing to separate data prep from ingestion
- Add variant A2 (stock master + numpy) and A3 (stock master + decoupled)
- Fix venv isolation: use python -I flag in run_bench.sh to prevent
  the repo's cassandra/ directory from shadowing pip-installed packages
- Fix concurrency closure bug in DecoupledExecutor: store as
  self.concurrency instead of relying on outer function scope
- Refactor: extract ingest_execute_concurrent() and ingest_decoupled()
  as reusable functions, decouple from connection/schema setup
- Remove unused --batch-size arg, default --max-rows to 100000
- Update run_bench.sh summary table for new JSON format (best/avg/worst)
mykaul force-pushed the perf/concurrent-submitter-thread branch 2 times, most recently from fd9be81 to b759d4a on April 21, 2026 at 11:00
mykaul (Author) commented Apr 21, 2026

v2 changes: reduce per-request lock overhead in ResponseFuture

New commit 7cae6a14e on top of the submitter thread change. Focuses on reducing lock/synchronization cost per request in the execute_concurrent hot path.

Changes

  1. Lazy Event creation (cluster.py): ResponseFuture._event starts as None instead of Event(). The Event is only materialized in _wait_for_result() (the synchronous result() path). For execute_concurrent, which never calls result() on individual futures, this eliminates ~620ns per request (351ns Event construction + 267ns Event.set()).

  2. Merged add_callbacks() (cluster.py): Registers both callback and errback under a single _callback_lock acquisition instead of two separate lock/unlock cycles. Saves ~80ns per request.

  3. _set_final_result / _set_final_exception (cluster.py): Capture _event reference under _callback_lock before calling .set() outside the lock. Skips .set() when Event was never created. Null-checks callback/errback lists before building to_call tuple. All safe under free-threaded Python (PEP 703).

  4. _wait_for_result() (cluster.py): New extracted method. Checks result availability under _callback_lock before creating Event — avoids Event creation entirely when the result arrived before the caller waits. Thread-safe under both GIL and no-GIL.

  5. _on_speculative_execute (cluster.py): Checks _final_result/_final_exception directly instead of relying on Event.is_set(), since Event may be None with lazy creation.

  6. Properties / paging (cluster.py): warnings, custom_payload properties and start_fetching_next_page handle _event is None.

Design notes

  • All changes are safe under both GIL and free-threaded (PEP 703) Python. No GIL assumptions.
  • _callback_lock is reused as the synchronization point for lazy Event creation (no new locks).
  • pool.py is not modified — an earlier attempt to optimize _stream_available_condition.notify() was reverted due to lost-wakeup risk under free-threaded Python.

Test results

mykaul (Author) commented Apr 21, 2026

Correction on test failures: The 6 "pre-existing failures" reported in the v2 comment were all caused by a stale Cython .so in the working tree taking precedence over the .py source. After rebuilding (uv sync --reinstall-package scylla-driver):

642 passed, 0 failures, 8 skipped

No pre-existing failures. Clean test run.

mykaul added 3 commits April 22, 2026 09:57
Lazy Event: defer Event() creation until result() is actually called.
For execute_concurrent (which never calls result()), this eliminates
~620ns of Event construction + Event.set() per request.

Merged add_callbacks: register both callback and errback under a single
_callback_lock acquisition instead of two separate ones (~80ns saved).

_set_final_result/_set_final_exception: capture _event reference under
_callback_lock for free-threaded Python safety; skip .set() when Event
was never created.

_wait_for_result: check result availability under _callback_lock before
creating Event, avoiding Event creation entirely when the result arrived
before the caller waits.

_on_speculative_execute: check _final_result/_final_exception directly
instead of relying on Event.is_set(), since Event may be None.

All changes are safe under both GIL and free-threaded (PEP 703) Python.
mykaul force-pushed the perf/concurrent-submitter-thread branch from 7cae6a1 to d054886 on April 22, 2026 at 07:19