When you query Cassandra for potentially large result sets, you need to understand how data flows from Cassandra to your application. This guide explains how async-cassandra's streaming works with Cassandra's native paging to efficiently process large amounts of data.
- How Cassandra Paging Works
- The Memory Problem
- How Streaming Solves This
- Basic Usage
- Understanding Fetch Size
- Page-by-Page Processing
- Advanced Patterns
- Performance Guidelines
- Common Pitfalls
- True Async Paging
First, let's understand what happens when you query Cassandra:
sequenceDiagram
participant App as Your App
participant Driver as Cassandra Driver
participant Cassandra as Cassandra DB
App->>Driver: execute("SELECT * FROM large_table")
Driver->>Cassandra: Request (fetch_size=5000 = page size)
Note over Cassandra: Reads 5000 rows<br/>(one page)
Cassandra-->>Driver: Page 1 (5000 rows) + has_more_pages=true
Driver-->>App: ResultSet with Page 1
Note over App: Process rows...
App->>Driver: Get next page
Driver->>Cassandra: Request next page
Note over Cassandra: Reads next 5000 rows
Cassandra-->>Driver: Page 2 (5000 rows) + has_more_pages=true
Driver-->>App: ResultSet with Page 2
Note over App: Continue until has_more_pages=false
- Fetch Size = Page Size: The `fetch_size` parameter controls how many rows Cassandra returns in each "page" (default: 5000). This is the same as setting `statement.fetch_size` in the standard driver.
- Page: A batch of rows returned by Cassandra
- Paging State: A token that tells Cassandra where to continue reading (the sketch below shows how this token can be used directly)
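To make these terms concrete, here is a minimal sketch of saving and reusing a paging state with the plain synchronous driver. The contact point, keyspace, and table names are placeholders; `fetch_size`, `current_rows`, and `paging_state` are standard cassandra-driver features:

```python
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

cluster = Cluster(['localhost'])          # placeholder contact point
session = cluster.connect('my_keyspace')  # placeholder keyspace

statement = SimpleStatement("SELECT * FROM users", fetch_size=1000)  # page size = 1000

# Execute and read the first page
result = session.execute(statement)
page_1 = result.current_rows          # up to 1000 rows
paging_state = result.paging_state    # opaque token marking where the next page starts

# Later (even from another request), resume from the saved token
if paging_state is not None:
    result = session.execute(statement, paging_state=paging_state)
    page_2 = result.current_rows
```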
When you iterate over a result set from execute(), the driver fetches pages transparently as you iterate. However, if you materialize the entire result (using .all() or list()), ALL pages are fetched into memory:
# With the standard cassandra-driver (sync):
result = session.execute("SELECT * FROM billion_row_table")
# Iteration fetches pages on-demand (good for memory)
for row in result:
process(row) # Pages fetched as needed
# BUT if you materialize the result:
all_rows = result.all() # Driver fetches ALL pages! 💥
# or
all_rows = list(result) # Same problem! 💥
# With async-cassandra (async):
result = await session.execute("SELECT * FROM billion_row_table")
# SAME BEHAVIOR - materialization fetches all pages
all_rows = result.all() # Still fetches all pages into memory 💥

According to the DataStax documentation, "Whenever there are no more rows in the current page, the next page will be fetched transparently."
Important: The driver does NOT prefetch pages. Based on our analysis of the driver source code, pages are fetched on-demand only when:
- You've consumed all rows in the current page
- You attempt to get the next row (causing a StopIteration exception internally)
- Only then does the driver synchronously fetch the next page
This means there's no background prefetching or read-ahead behavior - each page is fetched exactly when needed, blocking the iteration until the page arrives.
Performance Implications:
- There's a network round-trip delay between finishing one page and starting the next
- No wasted bandwidth fetching pages that might not be needed
- Memory usage is predictable - only one page at a time
- For async applications, this blocking fetch is why we need streaming wrappers (the sketch below shows one way to observe the per-page delay)
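If you want to observe this on-demand behavior directly, a rough diagnostic (not production code; contact point, keyspace, and table are placeholders) is to time each row during iteration. Gaps cluster at page boundaries, because that is exactly when the blocking fetch happens:

```python
import time
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

cluster = Cluster(['localhost'])          # placeholder contact point
session = cluster.connect('my_keyspace')  # placeholder keyspace

statement = SimpleStatement("SELECT * FROM events", fetch_size=1000)
result = session.execute(statement)

last = time.perf_counter()
for i, row in enumerate(result, start=1):
    now = time.perf_counter()
    gap_ms = (now - last) * 1000
    last = now
    # Rows within a page arrive almost instantly; the first row of each
    # new page carries the full round trip to Cassandra.
    if gap_ms > 5:
        print(f"row {i}: {gap_ms:.1f} ms gap (likely a page boundary)")
```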
Here's what happens behind the scenes:
sequenceDiagram
participant App as Your App
participant Driver as Driver Memory
participant Cassandra
App->>Driver: result.all() or list(result)
Note over Driver: Starts iterating internally
loop Automatically fetches ALL pages
Driver->>Cassandra: Get next page (5000 rows)
Cassandra-->>Driver: Page data
Note over Driver: Accumulates in memory<br/>(not releasing previous pages!)
end
Driver-->>App: List with ALL rows in memory
Note over App: 💥 Out of Memory!
The cassandra-driver does provide manual paging control. Note that even with manual control, there's no prefetching:
# Using the driver's manual paging (WITHOUT async-cassandra)
from cassandra.cluster import Cluster
cluster = Cluster(['localhost'])
session = cluster.connect()
# Set fetch size (page size)
statement = SimpleStatement("SELECT * FROM billion_row_table")
statement.fetch_size = 1000 # Each page will have 1000 rows
# Execute and get first page
result = session.execute(statement)
page1 = list(result.current_rows) # First 1000 rows
# Manually fetch next page
if result.has_more_pages:
result.fetch_next_page() # Blocks until page arrives
page2 = list(result.current_rows) # Next 1000 rows
# Alternative: start_fetching_next_page() - but still not true prefetch
if result.has_more_pages:
result.response_future.start_fetching_next_page() # "Async" start
# But you still must wait for it to complete:
result.response_future.result() # Blocks here
page2 = list(result.current_rows)

This works, but has limitations:
- Not truly async - `fetch_next_page()` blocks the event loop
- Complex iteration - You must manually manage page fetching
- Error-prone - Easy to accidentally load all pages into memory
To understand why manual paging doesn't work well with async applications, you need to understand how async/await works:
In async Python applications, there's a single "event loop" that manages all concurrent operations:
# Simplified view of what happens in an async app
async def handle_request_1():
data = await database.query() # Event loop switches to other tasks
return process(data)
async def handle_request_2():
result = await api.call() # Event loop switches to other tasks
return format(result)
# Event loop runs both concurrently by switching between them

The magic happens when you use await - it tells the event loop "I'm waiting for something, go handle other requests!"
Here's what happens when you try manual paging in an async application:
# ❌ BAD: Blocks the event loop!
async def process_with_manual_paging():
# Even with async-cassandra, manual paging isn't truly async
result = await session.execute("SELECT * FROM large_table")
while True:
# Process current page
for row in result.current_rows:
await process_row(row) # ✅ Async - doesn't block
if not result.has_more_pages:
break
# This BLOCKS the event loop! 😱
result.fetch_next_page() # ❌ Synchronous - blocks EVERYTHING

When fetch_next_page() is called:
- It makes a network request to Cassandra (could take 10-100ms)
- During this time, the event loop is FROZEN
- NO other requests can be processed
- Your entire application becomes unresponsive
sequenceDiagram
participant R1 as Request 1
participant R2 as Request 2
participant EL as Event Loop
participant DB as Cassandra
Note over EL: With blocking fetch_next_page()
R1->>EL: Process page 1
R2->>EL: New request arrives
EL->>DB: fetch_next_page() [BLOCKING]
Note over EL,R2: Event loop frozen!<br/>Request 2 must wait!
DB-->>EL: Page 2 (after 50ms)
EL-->>R1: Continue processing
EL-->>R2: Finally handle request 2
Note over EL: With async streaming
R1->>EL: Process page 1
R2->>EL: New request arrives
EL->>DB: await fetch_next (non-blocking)
EL->>R2: Handle request 2 immediately
DB-->>EL: Page 2 ready
EL-->>R1: Continue with page 2
Imagine a web server handling 100 requests/second:
- Each `fetch_next_page()` blocks for 50ms
- During those 50ms, 5 new requests arrive but can't be processed
- Those requests timeout or pile up
- Your server appears slow or unresponsive
This is why the lack of an async counterpart to fetch_next_page() is a critical limitation. The sketch below shows the manual workaround that async-cassandra automates for you.
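A rough sketch of that workaround: run the blocking driver calls in a thread pool via `run_in_executor` so the event loop stays free. It assumes an existing synchronous cassandra-driver session and an async `process_row` coroutine (both placeholders):

```python
import asyncio
from cassandra.query import SimpleStatement

async def process_with_paging_in_executor(session, process_row):
    loop = asyncio.get_running_loop()

    statement = SimpleStatement("SELECT * FROM large_table")
    statement.fetch_size = 1000  # page size

    # execute() blocks, so run it in the default thread pool
    result = await loop.run_in_executor(None, session.execute, statement)

    while True:
        for row in result.current_rows:
            await process_row(row)          # async processing, never blocks the loop
        if not result.has_more_pages:
            break
        # fetch_next_page() still blocks, but only a worker thread;
        # the event loop keeps serving other requests in the meantime.
        await loop.run_in_executor(None, result.fetch_next_page)
```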
With execute_stream(), you get automatic async page management:
# DO THIS for large tables
result = await session.execute_stream(
"SELECT * FROM billion_row_table",
stream_config=StreamConfig(fetch_size=1000)
)
# Rows are fetched on-demand as you iterate
async for row in result:
process_row(row) # Process one row at a time
# Old rows are garbage collected

Here's the streaming flow (showing how async-cassandra bridges sync to async):
sequenceDiagram
participant App as Your App
participant Stream as Async Streaming
participant ThreadPool as Thread Pool
participant Driver as Cassandra Driver
participant Cassandra
App->>Stream: execute_stream()
Stream->>ThreadPool: Run execute() in thread
ThreadPool->>Driver: execute() with fetch_size
Driver->>Cassandra: Request first page (1000 rows)
Cassandra-->>Driver: Page 1
Driver-->>ThreadPool: ResultSet
ThreadPool-->>Stream: ResultSet (wrapped)
loop For each row in page
App->>Stream: async: Get next row
Stream->>ThreadPool: Get row from ResultSet
ThreadPool-->>Stream: Row data
Stream-->>App: Return row
Note over App: Process single row
end
Note over Stream: Page 1 exhausted
App->>Stream: async: Get next row
Stream->>ThreadPool: Continue iteration
Note over ThreadPool: ResultSet.__next__() called
ThreadPool->>Driver: fetch_next_page() (blocking)
Driver->>Cassandra: Request page 2
Cassandra-->>Driver: Page 2
Driver-->>ThreadPool: Page 2 ready
ThreadPool-->>Stream: First row of page 2
Stream-->>App: Return row
Key Points About This Flow:
- async-cassandra runs the synchronous driver operations in a thread pool
- The thread pool prevents blocking the event loop
- Page fetching still happens synchronously within the thread
- But from the app's perspective, it's all async/await
IMPORTANT: Streaming result sets MUST be properly closed. The streaming implementation uses callbacks that create circular references between the result set and the response future, so failing to close the result leaks memory.
The context manager pattern is the preferred and safest approach as it guarantees cleanup even if exceptions occur:
# ✅ BEST PRACTICE: Using context manager (ALWAYS RECOMMENDED)
async with await session.execute_stream(query) as result:
async for row in result:
await process_row(row)
# Automatically closed when exiting the context, even on exceptions!

If you cannot use a context manager for some reason, you MUST ensure cleanup with try/finally:
# ✅ ALTERNATIVE: Manual close with try/finally
result = await session.execute_stream(query)
try:
async for row in result:
await process_row(row)
if some_condition:
break # Even with early exit, finally ensures cleanup
finally:
await result.close() # CRITICAL: Always close in finally block!

# ❌ WRONG: No cleanup - MEMORY LEAK!
result = await session.execute_stream(query)
async for row in result:
process_row(row)
# Result not closed - callbacks remain in memory forever!
# ❌ WRONG: Close without try/finally - risky!
result = await session.execute_stream(query)
async for row in result:
process_row(row) # If this throws, close() is never called!
await result.close() # Won't run if exception occurs above

The streaming implementation registers callbacks with the Cassandra driver's response future. These callbacks hold references to the result set, creating a circular reference:
AsyncStreamingResultSet ←→ Callbacks ←→ ResponseFuture
Without explicit cleanup, Python's garbage collector cannot break this cycle, causing both the result set and any data it holds to remain in memory indefinitely.
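A simplified, hypothetical model of that cycle (not async-cassandra's actual internals) looks like this; the point is that an explicit `close()` is what unhooks the callback and lets both objects go:

```python
# Simplified model of the reference cycle - NOT async-cassandra's real code.
class FakeResponseFuture:
    def __init__(self):
        self._callbacks = []

    def add_callback(self, fn):
        # future -> callback -> (bound) result set
        self._callbacks.append(fn)

class FakeStreamingResultSet:
    def __init__(self, future):
        self._future = future                   # result set -> future
        future.add_callback(self._handle_page)  # future -> result set (via bound method)

    def _handle_page(self, rows):
        pass  # would buffer the incoming page here

    async def close(self):
        # Explicit cleanup unhooks the callback and drops the future,
        # breaking the cycle so both objects can be released promptly.
        if self._future is not None:
            self._future._callbacks.clear()
            self._future = None
```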
📖 Deep Dive: For a comprehensive explanation of how context managers solve this problem and why they're guaranteed to work, see Understanding Context Managers.
from async_cassandra import AsyncCluster
from async_cassandra.streaming import StreamConfig
async def process_large_table():
cluster = AsyncCluster(['localhost'])
session = await cluster.connect('my_keyspace')
# Configure streaming
config = StreamConfig(
fetch_size=1000 # Same as statement.fetch_size - sets page size to 1000 rows
)
# Start streaming query - ALWAYS USE CONTEXT MANAGER
async with await session.execute_stream(
"SELECT user_id, email, created_at FROM users",
stream_config=config
) as result:
# Process rows one at a time
rows_processed = 0
async for row in result:
# At this point, only one page (1000 rows) is in memory
# When we finish a page, the next one is fetched automatically
await send_email(row.email)
rows_processed += 1
if rows_processed % 1000 == 0:
print(f"Processed {rows_processed} users...")
print(f"Done! Processed {rows_processed} total users")
# Result automatically closed here
await cluster.shutdown()

What happens here, step by step:
- Query executes with `fetch_size=1000`
- Cassandra returns first 1000 rows (Page 1)
- You iterate through these 1000 rows
- When you need row 1001, the driver fetches Page 2
- Page 1 can now be garbage collected
- Process continues until no more pages
The fetch_size parameter in StreamConfig is exactly the same as the fetch_size on Cassandra statements - it controls the page size. This parameter is crucial for performance:
# Small fetch_size = More network requests, less memory
config_small = StreamConfig(fetch_size=100)
# - Fetches 100 rows at a time (page size = 100)
# - Good for: Large rows, limited memory
# - Bad for: Network latency, small rows
# Large fetch_size = Fewer network requests, more memory
config_large = StreamConfig(fetch_size=10000)
# - Fetches 10,000 rows at a time (page size = 10,000)
# - Good for: Small rows, good network
# - Bad for: Large rows, limited memory
# This is equivalent to:
# statement = SimpleStatement("SELECT * FROM table")
# statement.fetch_size = 10000
# How to choose?
row_size_bytes = 1024 # Estimate your average row size
memory_per_page = row_size_bytes * fetch_size
# For 1KB rows and fetch_size=5000: ~5MB per page

graph LR
subgraph "fetch_size=100"
A1[Page 1<br/>100 rows] --> A2[Page 2<br/>100 rows] --> A3[Page 3<br/>100 rows] --> A4[...]
end
subgraph "fetch_size=5000"
B1[Page 1<br/>5000 rows] --> B2[Page 2<br/>5000 rows] --> B3[...]
end
subgraph "Memory Usage"
A1 -.-> M1[~100KB]
B1 -.-> M2[~5MB]
end
Sometimes you want to process entire pages at once (e.g., bulk operations):
async def bulk_process_users():
config = StreamConfig(fetch_size=1000)
# ALWAYS use context manager for streaming
async with await session.execute_stream(
"SELECT * FROM users WHERE active = true",
stream_config=config
) as result:
# Process entire pages instead of individual rows
async for page in result.pages():
# 'page' is a list of up to 1000 rows
print(f"Processing page {result.page_number} with {len(page)} rows")
# Bulk operation on entire page
user_ids = [row.user_id for row in page]
await bulk_update_last_seen(user_ids)
# The page is garbage collected after this iteration
# Result automatically closed here

sequenceDiagram
participant App
participant Stream
participant Cassandra
App->>Stream: async for page in pages()
Stream->>Cassandra: Fetch page (1000 rows)
Cassandra-->>Stream: Page data
Stream-->>App: List of 1000 rows
Note over App: Process entire page
App->>Stream: Next page
Stream->>Cassandra: Fetch next page
Note over Stream: Previous page can be<br/>garbage collected
async def export_with_progress():
# First, get total count (if needed)
count_result = await session.execute(
"SELECT COUNT(*) FROM events WHERE year = 2024"
)
total_rows = count_result.one()[0]
print(f"Exporting {total_rows:,} events...")
# Stream with progress callback
rows_processed = 0
def progress_callback(page_num: int, total_fetched: int):
percent = (total_fetched / total_rows * 100) if total_rows > 0 else 0
print(f"Page {page_num}: {total_fetched:,}/{total_rows:,} ({percent:.1f}%)")
config = StreamConfig(
fetch_size=5000,
page_callback=progress_callback
)
async with await session.execute_stream(
"SELECT * FROM events WHERE year = 2024",
stream_config=config
) as result:
async for row in result:
await process_event(row)
rows_processed += 1

async def export_to_parquet(table_name: str, output_file: str):
"""Export large table to Parquet without loading all data in memory"""
import pyarrow as pa
import pyarrow.parquet as pq
config = StreamConfig(fetch_size=10000)
schema = None
writer = None
try:
# ALWAYS use context manager for streaming
async with await session.execute_stream(
f"SELECT * FROM {table_name}",
stream_config=config
) as result:
# Process page by page for efficient memory use
async for page in result.pages():
# Convert page to PyArrow table
if schema is None:
# Define schema from first page
schema = pa.schema([
(field, pa.string()) for field in page[0]._fields
])
writer = pq.ParquetWriter(output_file, schema)
# Convert page to columnar format
data = {
field: [getattr(row, field) for row in page]
for field in page[0]._fields
}
# Write batch to Parquet
batch = pa.record_batch(data, schema=schema)
writer.write_batch(batch)
print(f"Exported page {result.page_number} "
f"({result.total_rows_fetched:,} rows total)")
# Result automatically closed here
finally:
if writer:
writer.close()

async def parallel_partition_processing():
"""Process multiple partitions in parallel with controlled concurrency"""
partitions = ['2024-01', '2024-02', '2024-03', '2024-04']
async def process_partition(partition: str):
config = StreamConfig(
fetch_size=2000,
page_callback=lambda p, t: print(
f"Partition {partition}: Page {p}, {t:,} rows"
)
)
stmt = await session.prepare(
"SELECT * FROM events WHERE partition_date = ?"
)
async with await session.execute_stream(
stmt,
parameters=[partition],
stream_config=config
) as result:
count = 0
async for row in result:
await process_event(row)
count += 1
return partition, count
# Process up to 3 partitions concurrently
# Each partition streams independently
import asyncio
from asyncio import Semaphore
sem = Semaphore(3) # Limit concurrent partitions
async def limited_process(partition):
async with sem:
return await process_partition(partition)
results = await asyncio.gather(*[
limited_process(p) for p in partitions
])
for partition, count in results:
print(f"Processed {count:,} events from partition {partition}")

| Scenario | Recommended fetch_size | Reasoning |
|---|---|---|
| Small rows (<100 bytes) | 5000-10000 | Minimize network round trips |
| Medium rows (100-1KB) | 1000-5000 | Balance memory and network |
| Large rows (>1KB) | 100-1000 | Prevent memory spikes |
| Limited memory | 100-500 | Keep pages small |
| High latency network | 10000+ | Reduce round trips |
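If you prefer to start from a memory budget rather than the table above, one possible heuristic (purely illustrative; the budget and bounds are assumptions, not library defaults) is to divide the per-page budget by your average row size:

```python
def suggest_fetch_size(avg_row_size_bytes: int,
                       page_memory_budget_mb: float = 5.0,
                       minimum: int = 100,
                       maximum: int = 10_000) -> int:
    """Pick a fetch_size so that a single page stays within the memory budget."""
    budget_bytes = page_memory_budget_mb * 1024 * 1024
    suggested = int(budget_bytes // max(avg_row_size_bytes, 1))
    return max(minimum, min(maximum, suggested))

print(suggest_fetch_size(100))     # small rows   -> 10000 (hits the cap)
print(suggest_fetch_size(1024))    # ~1KB rows    -> 5120
print(suggest_fetch_size(10_240))  # ~10KB rows   -> 512
```

Working in the other direction, you can estimate what a given fetch size costs in memory: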
def calculate_memory_usage(avg_row_size_bytes: int, fetch_size: int) -> dict:
"""Calculate memory usage for different fetch sizes"""
page_memory_mb = (avg_row_size_bytes * fetch_size) / (1024 * 1024)
# For a table with 1 million rows
total_rows = 1_000_000
total_pages = total_rows // fetch_size
return {
"memory_per_page_mb": round(page_memory_mb, 2),
"total_pages": total_pages,
"network_round_trips": total_pages,
"max_memory_mb": round(page_memory_mb, 2), # Only one page in memory
}
# Example calculations
small_rows = calculate_memory_usage(100, 5000) # 100-byte rows
print(f"Small rows: {small_rows}")
# Output: {'memory_per_page_mb': 0.48, 'total_pages': 200,
# 'network_round_trips': 200, 'max_memory_mb': 0.48}
large_rows = calculate_memory_usage(10240, 500) # 10KB rows
print(f"Large rows: {large_rows}")
# Output: {'memory_per_page_mb': 4.88, 'total_pages': 2000,
# 'network_round_trips': 2000, 'max_memory_mb': 4.88}

# ❌ WRONG - Memory leak!
result = await session.execute_stream("SELECT * FROM huge_table")
async for row in result:
process_row(row)
# Callbacks not cleaned up - circular reference remains!
# ✅ CORRECT - Always use context manager
async with await session.execute_stream("SELECT * FROM huge_table") as result:
async for row in result:
process_row(row)
# Automatically cleaned up

# DON'T DO THIS - defeats the purpose of streaming!
async with await session.execute_stream(
"SELECT * FROM huge_table",
stream_config=StreamConfig(fetch_size=1000)
) as result:
# This loads everything into memory!
all_rows = []
async for row in result:
all_rows.append(row) # ❌ Accumulating in memory
# DO THIS INSTEAD
async with await session.execute_stream(
"SELECT * FROM huge_table",
stream_config=StreamConfig(fetch_size=1000)
) as result:
async for row in result:
await process_row(row) # ✅ Process and discard

# Unnecessary - LIMIT already bounds the result
result = await session.execute_stream(
"SELECT * FROM users LIMIT 100", # Only 100 rows!
stream_config=StreamConfig(fetch_size=1000)
)
# Just use regular execute for small results
result = await session.execute("SELECT * FROM users LIMIT 100")

# DO implement retry logic for large streams
async def reliable_stream_processing():
last_processed_id = None
while True:
try:
query = "SELECT * FROM events"
if last_processed_id:
query += f" WHERE id > '{last_processed_id}'"
query += " ALLOW FILTERING" # Be careful with this!
async with await session.execute_stream(
query,
stream_config=StreamConfig(fetch_size=5000)
) as result:
async for row in result:
await process_event(row)
last_processed_id = row.id
break # Success, exit loop
except Exception as e:
print(f"Stream failed after {last_processed_id}, retrying: {e}")
await asyncio.sleep(5)

# Streaming large tables takes time!
result = await session.execute_stream(
"SELECT * FROM billion_row_table",
timeout=10.0, # ❌ 10 seconds is too short!
stream_config=StreamConfig(fetch_size=5000)
)
# Use appropriate timeout or no timeout
result = await session.execute_stream(
"SELECT * FROM billion_row_table",
timeout=None, # ✅ No timeout for streaming
stream_config=StreamConfig(fetch_size=5000)
)

graph TD
A[Query Result Size] --> B{"More than 5000 rows?"}
B -->|Yes| C{"Memory constrained?"}
B -->|No| D["Use regular execute()"]
C -->|Yes| E["Use execute_stream()<br/>with small fetch_size"]
C -->|No| F{"Need progress tracking?"}
F -->|Yes| G["Use execute_stream()<br/>with callbacks"]
F -->|No| H{"Process incrementally?"}
H -->|Yes| G
H -->|No| I{"Network latency high?"}
I -->|Yes| J["Consider regular execute()<br/>or large fetch_size"]
I -->|No| G
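In code, that decision tree often collapses to a branch on the expected result size. A rough sketch (the 5,000-row threshold mirrors the default page size; `handle_user` and `expected_rows` are placeholders you would supply):

```python
from async_cassandra.streaming import StreamConfig

async def fetch_active_users(session, expected_rows: int, handle_user):
    """Choose regular execute() or execute_stream() based on the expected result size."""
    query = "SELECT * FROM users WHERE active = true"

    if expected_rows <= 5_000:
        # Small result: at most one page, so regular execute() is simpler
        result = await session.execute(query)
        return result.all()

    # Large result: stream it so only one page is in memory at a time
    processed = 0
    async with await session.execute_stream(
        query,
        stream_config=StreamConfig(fetch_size=1000)
    ) as result:
        async for row in result:
            await handle_user(row)  # placeholder processing coroutine
            processed += 1
    return processed
```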
Here's a complete example showing how to build a robust ETL pipeline:
import asyncio
from datetime import datetime
from typing import AsyncIterator
async def etl_pipeline(
source_table: str,
transform_func,
destination_table: str,
batch_size: int = 1000
):
"""
Extract, transform, and load data efficiently
"""
start_time = datetime.now()
total_processed = 0
failed_rows = []
# Prepare insert statement for destination
insert_stmt = await session.prepare(
f"INSERT INTO {destination_table} (id, data, processed_at) "
"VALUES (?, ?, ?)"
)
# Stream from source table
config = StreamConfig(
fetch_size=batch_size,
page_callback=lambda p, t: print(
f"[ETL] Page {p}: Extracted {t:,} rows from {source_table}"
)
)
# Stream from source - ALWAYS use context manager
async with await session.execute_stream(
f"SELECT * FROM {source_table}",
stream_config=config
) as result:
# Process in batches for efficiency
batch = []
async for row in result:
try:
# Transform
transformed = await transform_func(row)
# Add to batch
batch.append((
transformed['id'],
transformed['data'],
datetime.now()
))
# Load batch when full
if len(batch) >= batch_size:
await load_batch(insert_stmt, batch)
total_processed += len(batch)
batch = []
except Exception as e:
failed_rows.append((row.id, str(e)))
# Load final partial batch
if batch:
await load_batch(insert_stmt, batch)
total_processed += len(batch)
# Result automatically closed here
# Report results
duration = (datetime.now() - start_time).total_seconds()
print(f"\nETL Pipeline Complete:")
print(f"- Processed: {total_processed:,} rows")
print(f"- Failed: {len(failed_rows)} rows")
print(f"- Duration: {duration:.1f} seconds")
print(f"- Rate: {total_processed/duration:.1f} rows/second")
return {
'processed': total_processed,
'failed': failed_rows,
'duration': duration
}
async def load_batch(prepared_stmt, batch):
"""Load a batch of transformed data"""
# Use execute_concurrent for parallel inserts
from cassandra.concurrent import execute_concurrent_with_args
await asyncio.get_event_loop().run_in_executor(
None,
lambda: execute_concurrent_with_args(
session._session,
prepared_stmt,
batch,
concurrency=50
)
)

For detailed information about True Async Paging behavior, common misconceptions, and best practices, see our dedicated guide: True Async Paging.
Key points covered:
- Critical importance of context managers
- How paging actually works (on-demand, not pre-fetched)
- When LIMIT is needed (hint: rarely with paging!)
- Page size recommendations for different use cases
- Common patterns and anti-patterns
The key to understanding streaming in async-cassandra is recognizing that:
- Cassandra returns data in pages (controlled by `fetch_size`)
- Regular execute() pulls ALL pages into memory when you materialize the result with .all() or list()
- execute_stream() fetches pages on-demand as you iterate
- Memory usage is bounded to approximately one page at a time
Choose streaming when you need to process large result sets without overwhelming your application's memory. The slight complexity is worth it for the memory efficiency and control it provides.