feat(core): add fetch listener callbacks and async inter-broker requests#3248
feat(core): add fetch listener callbacks and async inter-broker requests#3248
Conversation
06b861a to
930015a
Compare
There was a problem hiding this comment.
Pull request overview
This PR introduces a new async inter-broker request sender utility and adds fetch lifecycle callbacks (per-partition fetch results + fetch session close notifications) to the ElasticKafkaApis path.
Changes:
- Add
AsyncSender+InterBrokerAsyncSenderto support async inter-broker requests backed byInterBrokerSendThread. - Introduce
FetchListenerand wire it intoElasticKafkaApisto report fetch offsets/timestamps, plus session-close notifications viaFetchSessionCacheShard. - Add/extend unit tests covering the sender behavior, fetch listener notifications, and session removal listener behavior.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| server-common/src/main/java/org/apache/kafka/server/util/AsyncSender.java | New async sender interface for inter-broker requests |
| server-common/src/main/java/org/apache/kafka/server/util/InterBrokerAsyncSender.java | Async wrapper around InterBrokerSendThread returning CompletableFuture |
| server-common/src/test/java/org/apache/kafka/server/util/InterBrokerAsyncSenderTest.java | Unit tests for async sender success/error/timeout paths |
| core/src/main/scala/kafka/server/streamaspect/FetchListener.java | New listener interface for fetch + session-close callbacks |
| core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala | Wire fetch listener notifications into fetch handling |
| core/src/main/scala/kafka/server/FetchSession.scala | Add removal listener hook to FetchSessionCacheShard |
| core/src/main/scala/kafka/server/BrokerServer.scala | Instantiate listener + hook session-close notifications + set fetch listener on ElasticKafkaApis |
| core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | Add tests validating fetch listener notifications |
| core/src/test/scala/unit/kafka/server/FetchSessionTest.scala | Add test for removal listener notifications |
| core/src/test/scala/unit/kafka/server/streamaspect/ElasticKafkaApisTest.scala | Update helper to accept/set fetch listener |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
e17325f to
03e253e
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
f275710 to
3129dd4
Compare
d8c5a65 to
c6e8f2a
Compare
203e776 to
ab84639
Compare
|
|
||
| def highestOffsetInRemoteStorage(): Long = _highestOffsetInRemoteStorage | ||
|
|
||
| @volatile private var _latestAppendState: Option[LatestAppendState] = None |
There was a problem hiding this comment.
If you modify the Kafka upstream code, please wrap the change with AutoMQ inject start & end. (And check other codes)
| } | ||
|
|
||
| @Override | ||
| public void close() { |
There was a problem hiding this comment.
Could we gracefully await the pendingRequest completion.
…tion - introduce AsyncSender interface for async request sending - add InterBrokerAsyncSender based on InterBrokerSendThread with CompletableFuture completion - add InterBrokerAsyncSenderTest covering success, readiness transitions, disconnect/auth failure, timeout, wakeup and close
…BrokerExtensionHandle
…ction - Add AsyncFetchListener wrapper to handle both onFetch and onSessionClosed asynchronously - Move fetchListenerExecutor from ElasticKafkaApis to BrokerServer for unified lifecycle management - Simplify notifyFetchListener in ElasticKafkaApis by delegating async logic to AsyncFetchListener - Ensure proper executor shutdown in BrokerServer.shutdown() This change addresses the issue where calling fetchListener.onSessionClosed() directly inside FetchSessionCacheShard.remove() (while holding synchronized lock) could block fetch session cache operations if the listener is slow or blocked.
Cherry-picked from 9d53374 (feat/consumer-lag). Adds LatestAppendState to UnifiedLog to capture maxTimestamp and shallowOffsetOfMaxTimestamp on each successful append, avoiding the expensive fetchOffsetByTimestamp lookup.
- Add @volatile to fetchListener for cross-thread visibility - Drain pending requests on InterBrokerAsyncSender close - Replace removed shallowOffsetOfMaxTimestamp with lastOffset
Use a bounded LinkedBlockingQueue(1024) with DiscardOldestPolicy to prevent unbounded memory growth under high load. Also batch session removal notifications into a single executor task to minimize lock hold time on FetchSessionCacheShard.
- Add volatile closed flag to InterBrokerAsyncSender to reject requests after close, preventing futures that never complete - Add awaitTermination for fetchListenerExecutor during broker shutdown
- Add onSessionClosedBatch to AsyncFetchListener to eliminate duplicated async dispatch logic in BrokerServer sessionRemovalListener - Skip notifyFetchListener when listener is NOOP to avoid unnecessary map allocation and record batch iteration on the fetch hot path - Remove speculative TODO comment in extractFetchOffsetAndTimestamp - Add missing license header to AsyncSender.java and trailing newlines
Delegate session removal notification to AsyncFetchListener batch method instead of duplicating the async dispatch logic inline.
ab84639 to
bf5f33d
Compare
No description provided.