perf: move request submission off event loop thread in execute_concurrent #827
mykaul wants to merge 85 commits into scylladb:master
Conversation
Cache lookup_casstype_simple() and parse_casstype_args() with @functools.lru_cache() to avoid repeated string manipulation and regex scanning when the same type strings are resolved multiple times (common during schema parsing and query result deserialization). Also fixes an unused variable warning (prev_names -> _). Includes a pytest-benchmark comparison (cached vs uncached).
Add three fast paths to VectorType.serialize() for the common case of fixed-size numeric subtypes (float, double, int, bigint): 1. bytes/bytearray passthrough – skip all conversion when the caller already holds a correctly-sized blob (e.g. from serialize_numpy_bulk). 2. NumPy ndarray fast path – convert a 1-D numpy array to big-endian bytes via asarray(dtype=...).tobytes() instead of 768+ individual struct.pack + BytesIO.write calls. 3. serialize_numpy_bulk() classmethod – byte-swap an entire 2-D array (N rows × dim columns) once and slice the raw buffer, yielding one bytes object per row with zero per-element overhead. Benchmarks on 768-dim float32 vectors show 70-300× speedups depending on the path, directly benefiting bulk-insert workloads such as loading embeddings from Parquet files (VectorDBBench use case). NumPy remains an optional dependency; all new code is guarded by try/except ImportError. Variable-size subtypes (smallint, tinyint, text, etc.) are excluded and continue to use the original element-by-element path. Unit tests cover correctness, round-trip fidelity, error handling, and fallback behavior for all three paths.
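The NumPy paths above boil down to one big-endian conversion instead of per-element struct.pack calls. A minimal sketch under the assumption of float32 subtypes; serialize_vector_numpy and serialize_numpy_bulk here are simplified stand-ins, not the driver's actual methods:

```python
import struct
import numpy as np

def serialize_vector_numpy(arr):
    # Single-row fast path: convert a 1-D vector to big-endian float32
    # bytes in one C-level call instead of N struct.pack + write calls.
    return np.ascontiguousarray(arr, dtype=">f4").tobytes()

def serialize_numpy_bulk(matrix):
    # Bulk path: byte-swap the whole (N x dim) matrix once, then slice
    # the raw buffer into one bytes object per row.
    m = np.ascontiguousarray(matrix, dtype=">f4")
    raw = m.tobytes()
    row_size = m.shape[1] * 4  # 4 bytes per float32 element
    return [raw[i * row_size:(i + 1) * row_size] for i in range(m.shape[0])]

def serialize_vector_slow(values):
    # Element-by-element baseline the fast paths replace.
    return b"".join(struct.pack(">f", v) for v in values)
```

Both fast paths produce byte-identical output to the baseline, so the resulting blobs can also be bound directly via the bytes-passthrough path.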
Standalone benchmark comparing the four serialization paths for VectorType across dimensions (128, 768, 1536) and batch sizes (1, 100, 10000): - list (element-by-element) – baseline - numpy (per-row ndarray) – single-row fast path - bulk (serialize_numpy_bulk) – batch fast path - bytes passthrough (bind) – pre-serialized blob Includes auto-calibrated iteration counts and correctness verification.
Add a new section to docs/performance.rst covering the three fast serialization paths (single-row ndarray, bulk serialize_numpy_bulk, bytes passthrough) with usage examples and supported subtype list.
Add a fast path in ResultSet.was_applied that skips batch detection (isinstance checks + regex match) when the query has a known LWT status from the server PREPARE response. For BoundStatement queries where is_lwt() returns True, the batch_regex match on the query string is entirely avoided. This benefits the most common LWT use case: prepared INSERT/UPDATE IF statements executed via BoundStatement, where the driver already knows from the PREPARE response whether the statement is an LWT. The slow path (isinstance + regex) is preserved for: - BatchStatement queries (detected via isinstance) - SimpleStatement batch queries (detected via regex) - Any query where is_lwt() returns False Also adds explicit tests for the fast path, non-LWT fallback, and BatchStatement handling in was_applied. Part of: scylladb#751
Cache the results of _CassandraType.apply_parameters() in a class-level dict keyed by (cls, subtypes, names). This avoids the expensive type() metaclass machinery on repeated calls with the same type signature, which is the common case during result-set deserialization. Benchmark: 31.7x speedup (6.48 us/call -> 0.20 us/call) for cached hits.
Defer list allocation for _callbacks and _errbacks from __init__ to first use in add_callback()/add_errback(). On the synchronous execute path (session.execute()), no callbacks are registered, so both lists are never allocated — saving 112 bytes per request. All access is under _callback_lock; _set_final_result and _set_final_exception use 'or ()' guard to iterate safely when None. Benchmark: 2.2x faster init (0.06 -> 0.03 us), 112 bytes saved/request.
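A minimal sketch of the lazy-allocation pattern (hypothetical simplified class, not the driver's ResponseFuture):

```python
import threading

class LazyCallbacksFuture:
    def __init__(self):
        self._callback_lock = threading.Lock()
        self._callbacks = None   # list allocated only on first add_callback()
        self._errbacks = None

    def add_callback(self, fn):
        with self._callback_lock:
            if self._callbacks is None:
                self._callbacks = []
            self._callbacks.append(fn)

    def _set_final_result(self, result):
        with self._callback_lock:
            callbacks = self._callbacks
        # 'or ()' lets iteration proceed safely when no list was ever allocated
        for fn in callbacks or ():
            fn(result)
```

On the synchronous execute path neither list is touched, so no allocation ever happens.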
Benchmark shows ~1.1x speedup for known-LWT fast path vs regex batch detection slow path.
Replace per-call struct.pack(">H%dsB" % l, l, p, 0) with pre-compiled
uint16_pack(len(p)) + p + b'\\x00'. This eliminates the format string
interpolation and dynamic struct format creation on every call, using
the pre-compiled uint16_pack (struct.Struct('>H').pack) instead.
The routing key computation is called for every query when
TokenAwarePolicy is in use, making this a hot path.
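Side by side, the old and new packing look like this (pack_component_old/new are illustrative names):

```python
import struct

uint16_pack = struct.Struct(">H").pack  # pre-compiled once, reused per call

def pack_component_old(p):
    # Before: a new format string and Struct parsed on every call
    l = len(p)
    return struct.pack(">H%dsB" % l, l, p, 0)

def pack_component_new(p):
    # After: pre-compiled length pack + plain concatenation
    return uint16_pack(len(p)) + p + b"\x00"
```

Both produce identical wire bytes, so the change is purely a CPU saving on the routing-key hot path.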
_Frame is instantiated for every response frame received from the server. Adding __slots__ eliminates the per-instance __dict__ allocation (~104 bytes on CPython), reducing memory pressure on high-throughput workloads. _Frame only has 6 fixed attributes (version, flags, stream, opcode, body_offset, end_pos) and is never monkey-patched or dynamically extended.
In _set_final_result and _set_final_exception, skip building the tuple of partial(fn, ...) when the callbacks/errbacks list is empty. Most queries use the synchronous result() path and never register callbacks or errbacks, so this avoids constructing an empty tuple and the associated generator overhead on every query completion.
…n path When compression is active (protocol v4 and below), encode_message previously created two BytesIO objects: one for the uncompressed body, then another to write the header + compressed body. The second BytesIO is unnecessary -- use direct bytes concatenation (header + compressed_body) instead, which avoids one BytesIO allocation per compressed message. The non-compression path already used a single BytesIO and is unchanged.
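The change reduces to the following; encode_old/encode_new are illustrative stand-ins for the relevant part of encode_message:

```python
import io

def encode_old(header, compressed_body):
    # Before: a second BytesIO existed only to concatenate two byte strings
    buf = io.BytesIO()
    buf.write(header)
    buf.write(compressed_body)
    return buf.getvalue()

def encode_new(header, compressed_body):
    # After: direct concatenation, one BytesIO allocation fewer per message
    return header + compressed_body
```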
…in ResponseFuture - Remove 3 dead class attributes (default_timeout, _profile_manager, _warned_timeout) that were never read or written on ResponseFuture - Add prepared_statement and _continuous_paging_state as class-level defaults (both None), skip __init__ assignment when parameter is None - Conditionalize _metrics and _host assignments: only set when non-None - Saves 4 STORE_ATTR operations per query on the common path (simple statements, no metrics, no host targeting, no continuous paging)
Replace the per-parameter write_value(f, param) loop in _QueryMessage._write_query_params() with a buffer accumulation approach: list.append + b"".join + single f.write(). This reduces the number of f.write() calls from 2*N+1 to 1, which is significant for vector workloads with large parameters. Also removes the redundant ExecuteMessage._write_query_params() pass-through override to avoid extra MRO lookup per call. Includes 14 unit tests covering normal, NULL, UNSET, empty, large vector, and mixed parameter scenarios for both ExecuteMessage and QueryMessage. Includes a benchmark script (benchmarks/bench_execute_write_params.py).
Replace per-write_value()/write_byte()/write_short() calls in BatchMessage.send_body() with buffer accumulation (list.append + b"".join + single f.write()), reducing f.write() calls from Q*(4 + 2*P) + footer to 1 for Q queries with P params each. Benchmark results (Python 3.14, Cython .so, 50K iters, best of 3, quiet machine): Scenario Before After Speedup 10 queries x 2 params (128D vec) 8364 ns 4475 ns 1.87x 10 queries x 2 params (768D vec) 8081 ns 5516 ns 1.47x 50 queries x 2 params (128D vec) 32368 ns 16271 ns 1.99x 10 queries x 10 text params 19138 ns 9051 ns 2.11x 50 queries x 10 text params 86845 ns 40020 ns 2.17x 10 unprepared x 2 params 8666 ns 4252 ns 2.04x Also updates test_batch_message_with_keyspace to use BytesIO for byte-level verification (compatible with single-write output). Adds 7 batch-specific unit tests covering prepared, unprepared, mixed, empty, many-query, NULL/UNSET, and vector parameter scenarios. Includes benchmark script benchmarks/bench_batch_send_body.py.
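The buffer-accumulation pattern used by both the query-params and batch changes, sketched with a simplified int32 length-prefix framing (write_params_old/new are illustrative helpers, not driver code):

```python
import io
import struct

int32_pack = struct.Struct(">i").pack

def write_params_old(f, params):
    # Before: two f.write() calls per parameter (length prefix, then value)
    for p in params:
        f.write(int32_pack(len(p)))
        f.write(p)

def write_params_new(f, params):
    # After: accumulate fragments, join once, and issue a single f.write()
    parts = []
    for p in params:
        parts.append(int32_pack(len(p)))
        parts.append(p)
    f.write(b"".join(parts))
```

Output is byte-identical; only the number of write calls changes, from 2*N to 1.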
Replace per-call int32_pack(-1) and int32_pack(-2) with module-level _INT32_NEG1 and _INT32_NEG2 constants. Avoids redundant struct packing on every null or unset parameter in the inner write_value loop. Benchmark: ~11% speedup on the parameter serialization loop for a typical 12-param mix of values, nulls, and unsets.
…torType Add cassandra/serializers.pyx and cassandra/serializers.pxd implementing Cython-optimized serialization that mirrors the deserializers.pyx architecture. Implements type-specialized serializers for the three subtypes commonly used in vector columns: - SerFloatType: 4-byte big-endian IEEE 754 float - SerDoubleType: 8-byte big-endian double - SerInt32Type: 4-byte big-endian signed int32 SerVectorType pre-allocates a contiguous buffer and uses C-level byte swapping for float/double/int32 vectors, with a generic fallback for other subtypes. GenericSerializer delegates to the Python-level cqltype.serialize() classmethod. Factory functions find_serializer() and make_serializers() allow easy lookup and batch creation of serializers for column types. Benchmarks show ~30x speedup over the current io.BytesIO baseline and ~3x speedup over Python struct.pack for Vector<float, 1536> serialization. No setup.py changes needed - the existing cassandra/*.pyx glob already picks up new .pyx files.
…nt.bind() When Cython serializers (from cassandra.serializers) are available and no column encryption policy is active, BoundStatement.bind() now uses pre-built Serializer objects cached on the PreparedStatement instead of calling cqltype classmethods. This avoids per-value Python method dispatch overhead and enables the ~30x vector serialization speedup from the Cython serializers module. The bind loop is split into three paths: 1. Column encryption policy path (unchanged behavior) 2. Cython serializers path (new fast path) 3. Plain Python path (no CE, no Cython -- removes per-value ColDesc/CE check) Depends on PR scylladb#748 (Cython serializers module) and PR scylladb#630 (CE-policy bind split).
…sion Implement cython_lz4.pyx that calls LZ4_compress_default() and LZ4_decompress_safe() directly via Cython's cdef extern, bypassing the Python lz4 module's object allocation overhead in the hot compress/decompress path. Key design decisions: - Direct C linkage (cdef extern from "lz4.h") eliminates all intermediate Python object allocations for byte-order conversion - Zero-copy compress: uses _PyBytes_Resize to shrink the output bytes object in-place (CPython-specific; documented and safe because the object has refcount=1 during construction) - Wire-compatible with CQL binary protocol v4 format: [4 bytes big-endian uncompressed length][raw LZ4 compressed data] - Safety guards: LZ4_MAX_INPUT_SIZE check (prevents Py_ssize_t→int truncation), INT32_MAX compressed payload check, 256 MiB decompressed size cap, result size verification - bytes not None parameter typing rejects None/bytearray/memoryview - PyPy-safe: this is a Cython module (CPython only); PyPy users automatically fall back to the pure-Python lz4 wrappers via the import chain in connection.py Integration: - connection.py: Cython import with fallback; also enables LZ4 without the Python lz4 package when the Cython extension is built - setup.py: separate Extension with libraries=['lz4'], excluded from the .pyx glob (which lacks the -llz4 link flag) Benchmark results (taskset -c 0, CPython 3.14): Payload Operation Python (ns) Cython (ns) Speedup 1KB compress 596 360 1.66x 1KB decompress 313 136 2.30x 8KB compress 1192 722 1.65x 8KB decompress 1102 825 1.34x 64KB compress 8179 3976 2.06x 64KB decompress 6539 4890 1.34x
Add __slots__ to the Tablet class, removing the per-instance __dict__ allocation. Tablets are created frequently (one per token range per table) and are long-lived, so the cumulative memory savings are significant. Before: 416 bytes/tablet (48 instance + 96 __dict__ + 80 replicas + 192 tuples) After: 328 bytes/tablet (56 instance + 0 __dict__ + 80 replicas + 192 tuples) Saving: 88 bytes/tablet (21%) Scale impact (3 replicas/tablet): 12,800 tablets (100 tables x 128): saves 1.1 MB 128,000 tablets (1000 tables x 128): saves 10.7 MB 256,000 tablets (1000 tables x 256): saves 21.5 MB Tablet.from_row construction also improves: Before: 186 ns/call After: 147 ns/call (1.27x faster, -21%)
Replicas are never mutated after Tablet construction; convert to tuple in __init__ to save 8 bytes per tablet (list overallocates for future appends that never happen) and communicate immutability. Before: 328 bytes/tablet (replicas container: 80 bytes as list) After: 320 bytes/tablet (replicas container: 72 bytes as tuple) Saving: 8 bytes/tablet (2.4%) Combined with __slots__ (commit 1), total savings so far: 96 bytes/tablet. Scale impact (3 replicas/tablet): 128,000 tablets: saves ~1.0 MB (tuple) + 10.7 MB (slots) = 11.7 MB total 256,000 tablets: saves ~2.0 MB (tuple) + 21.5 MB (slots) = 23.5 MB total
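Both commits together can be sketched as follows (TabletDict/TabletSlots are toy stand-ins for before/after):

```python
import sys

class TabletDict:
    # Before: default class with per-instance __dict__ and a mutable list
    def __init__(self, first_token, last_token, replicas):
        self.first_token = first_token
        self.last_token = last_token
        self.replicas = list(replicas)

class TabletSlots:
    # After: __slots__ removes the __dict__; tuple communicates immutability
    # and avoids list overallocation for appends that never happen.
    __slots__ = ("first_token", "last_token", "replicas")
    def __init__(self, first_token, last_token, replicas):
        self.first_token = first_token
        self.last_token = last_token
        self.replicas = tuple(replicas)
```

Exact byte counts vary by CPython version, but the slotted instance always drops the dict and the tuple is always at least as compact as the equivalent list.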
Build a {host_id: shard_id} dict once at Tablet construction time so
that policies.py and pool.py can replace set(map(lambda ...)) and
linear scans with O(1) dict operations.
- Add _replica_dict to __slots__
- Build dict from the materialized tuple (not the raw replicas arg)
to avoid double-consuming a one-shot iterator
- Update DCAwareRoundRobinPolicy to use tablet._replica_dict keys
- Update HostConnection to use tablet._replica_dict.get() for shard
- Rewrite replica_contains_host_id() to use dict membership
- Add 7 unit tests covering dict construction, lookup, host membership,
tuple storage, and the iterator edge case
Remove the _is_valid_tablet staticmethod indirection and replace the two-step from_row -> _is_valid_tablet -> Tablet() chain with a single truthiness guard and direct construction. Saves ~54 ns/call (12%) by eliminating a staticmethod descriptor lookup, an extra function call, and redundant 'is not None' check (replicas from CQL deserialization is always a list or None).
Maintain parallel _first_tokens and _last_tokens dicts alongside _tablets, each mapping (keyspace, table) to a plain list[int]. This lets bisect_left run entirely in C on native ints instead of calling an attrgetter callback on every comparison during binary search. Follow-up to PR scylladb#757 which identified the opportunity: its own benchmarks showed bisect_left without key= is 2.7-5.7x faster than with key=attrgetter. Results (best-of-5, Python 3.14): get_tablet_for_key (hit): Tablets Before After Saved Speedup 10 293ns 216ns 78ns 1.36x 100 351ns 233ns 118ns 1.51x 1,000 448ns 267ns 181ns 1.68x 10,000 537ns 282ns 255ns 1.90x All three dicts are kept in sync by add_tablet, drop_tablets, and drop_tablets_by_host_id. The attrgetter imports are no longer needed and have been removed.
Remove 'results = None' (never assigned or read in production code), duplicate 'kind = None' (declared twice), and duplicate 'paging_state = None' (declared at lines 663 and 681). The canonical declarations at lines 672-685 are kept.
When trace_id, custom_payload, and warnings are None (the common case for non-traced, non-warned messages), skip the instance attribute assignment. The class-level defaults on _MessageType already provide None, so reading these attributes returns the correct value without the per-instance __dict__ write.
In TokenAwarePolicy.make_query_plan(), the tablet replica lookup was
creating a new set via {r[0] for r in tablet.replicas} on every query.
The Tablet object already maintains _replica_dict (host_id -> shard_id)
which supports the same 'in' membership test. Reusing it eliminates
per-query set allocation and redundant UUID hashing (observed as ~200K
set creations and ~600K uuid.__hash__ calls in profiling).
When tablets are in use, get_tablet_for_key() was called twice per request: once in TokenAwarePolicy.make_query_plan() to find the replica, and again in HostConnection._get_connection_for_routing_key() to determine the shard. Profiling showed ~400K bisect lookups for 200K rows. Stash the tablet found during query planning on the query object (query._tablet) and pass it through to borrow_connection(), which skips the second lookup when a tablet is already available. This eliminates ~200K redundant bisect_left calls and associated dict lookups per 200K requests.
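The stash-and-reuse pattern in miniature (Statement, make_query_plan, and borrow_connection here are simplified stand-ins for the driver's classes):

```python
class Statement:
    _tablet = None   # class-level default; set per-instance during planning

class Tablet:
    replicas = ("h1", "h2")

def make_query_plan(statement, tablet_lookup):
    tablet = tablet_lookup()        # the one and only bisect lookup
    statement._tablet = tablet      # stash for the connection pool
    return tablet.replicas

def borrow_connection(statement, tablet_lookup):
    tablet = statement._tablet
    if tablet is None:              # fallback when no query plan ran first
        tablet = tablet_lookup()
    return tablet
```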
…resolved) Resolved conflict with skip-empty-callbacks (scylladb#803): kept the 'if callbacks:' guard which is strictly better than 'or ()' — it skips both tuple allocation and iteration when no callbacks are registered. The __init__ lazy-init (None instead of []) from scylladb#795 was already in place.
ConcurrentExecutorListResults now uses a dedicated submitter thread instead of calling _execute_next inline from the event loop callback. This decouples I/O completion processing from new request serialization and enqueuing, yielding ~6-9% higher write throughput. The callback now just signals a threading.Event; the submitter thread drains a deque and calls session.execute_async in batches. This avoids blocking the libev event loop thread with request preparation work (query plan, serialization, tablet lookup) that takes ~27us per request. Bug fixes included: - Start submitter thread after initial batch (avoid race on _exec_count) - Write _exec_count under _condition lock (avoid race with _current) - Use exhausted flag to avoid repeated StopIteration on iterator
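The submitter-thread pattern can be sketched like this (SubmitterSketch is a minimal illustration, not the driver's executor):

```python
import threading
from collections import deque

class SubmitterSketch:
    def __init__(self, submit_fn):
        self._submit = submit_fn
        self._queue = deque()              # atomic append/popleft in CPython
        self._wakeup = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def signal(self, item):
        # Called from the event-loop callback: just enqueue and set the
        # Event -- minimal work on the hot path.
        self._queue.append(item)
        self._wakeup.set()

    def shutdown(self):
        self._queue.append(None)           # None tells the thread to exit
        self._wakeup.set()
        self._thread.join()

    def _run(self):
        while True:
            self._wakeup.wait()
            self._wakeup.clear()
            while self._queue:             # drain in a batch, off the loop
                item = self._queue.popleft()
                if item is None:
                    return
                self._submit(item)
```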
- Add warmup phase (--warmup-rows, default 1000) before timed runs - Add multiple timed runs (--runs, default 3) with best/avg/worst - Pre-load all data before timing to separate data prep from ingestion - Add variant A2 (stock master + numpy) and A3 (stock master + decoupled) - Fix venv isolation: use python -I flag in run_bench.sh to prevent the repo's cassandra/ directory from shadowing pip-installed packages - Fix concurrency closure bug in DecoupledExecutor: store as self.concurrency instead of relying on outer function scope - Refactor: extract ingest_execute_concurrent() and ingest_decoupled() as reusable functions, decouple from connection/schema setup - Remove unused --batch-size arg, default --max-rows to 100000 - Update run_bench.sh summary table for new JSON format (best/avg/worst)
v2 changes: reduce per-request lock overhead in ResponseFuture
Design notes
Test results
Correction on test failures: the 6 "pre-existing failures" reported in the v2 comment were all caused by a stale Cython build. 642 passed, 0 failures, 8 skipped. No pre-existing failures; clean test run.
Lazy Event: defer Event() creation until result() is actually called. For execute_concurrent (which never calls result()), this eliminates ~620ns of Event construction + Event.set() per request. Merged add_callbacks: register both callback and errback under a single _callback_lock acquisition instead of two separate ones (~80ns saved). _set_final_result/_set_final_exception: capture _event reference under _callback_lock for free-threaded Python safety; skip .set() when Event was never created. _wait_for_result: check result availability under _callback_lock before creating Event, avoiding Event creation entirely when the result arrived before the caller waits. _on_speculative_execute: check _final_result/_final_exception directly instead of relying on Event.is_set(), since Event may be None. All changes are safe under both GIL and free-threaded (PEP 703) Python.
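A minimal sketch of the lazy-Event idea (LazyEventFuture is a hypothetical simplified stand-in for ResponseFuture):

```python
import threading

class LazyEventFuture:
    def __init__(self):
        self._callback_lock = threading.Lock()
        self._event = None          # Event() deferred until result() needs it
        self._final_result = None
        self._done = False

    def _set_final_result(self, value):
        with self._callback_lock:
            self._final_result = value
            self._done = True
            event = self._event     # capture under the lock (free-threading safe)
        if event is not None:       # skip .set() when nobody ever waited
            event.set()

    def result(self, timeout=None):
        with self._callback_lock:
            if self._done:          # result already arrived: no Event needed
                return self._final_result
            if self._event is None:
                self._event = threading.Event()
            event = self._event
        event.wait(timeout)
        return self._final_result
```

For execute_concurrent-style usage, _set_final_result fires with _event still None, so both the Event construction and the .set() call are skipped entirely.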
Summary
- Moves execute_async() calls in execute_concurrent from the event-loop callback thread to a dedicated submitter thread
- The submitter thread performs the full work of execute_async(), which includes serialization — keeping that CPU work off the event loop
v2: Reduce per-request lock overhead in ResponseFuture
Second commit (7cae6a14e) reduces lock/synchronization cost per request in the execute_concurrent hot path:
- Lazy Event creation: ResponseFuture._event starts as None instead of Event(). The Event is only materialized in result() (the synchronous path). For execute_concurrent, which never calls result() on individual futures, this eliminates ~620ns per request (351ns Event construction + 267ns Event.set()).
- Merged add_callbacks(): registers both callback and errback under a single _callback_lock acquisition instead of two separate lock/unlock cycles. Saves ~80ns per request.
- _set_final_result/_set_final_exception: capture the _event reference under _callback_lock before calling .set() outside the lock. Skip .set() when the Event was never created. Null-check callback/errback lists before building the to_call tuple.
- _wait_for_result(): checks result availability under _callback_lock before creating the Event — avoids Event creation entirely when the result arrived before the caller waits.
- _on_speculative_execute: checks _final_result/_final_exception directly instead of Event.is_set(), since the Event may be None with lazy creation.
All changes are safe under both GIL and free-threaded (PEP 703) Python. No GIL assumptions.
Benchmark Results
On our vector ingestion benchmark (100K rows, 768-dim float32 vectors, ScyllaDB 2026.1.1):
How It Works
- _ConcurrentExecutorBase spawns a daemon submitter thread alongside the existing callback mechanism
- The event-loop callback does only deque.append(1); event.set() — minimal work on the hot path
- The submitter thread drains the queue and calls _execute_next() in a batch
- Synchronization uses collections.deque (atomic append/popleft in CPython) + threading.Event
- A None in the deque signals the thread to exit; joined via join() in wait()
- test_concurrent.py unit tests pass