Skip to content

feat(core): add fetch listener callbacks and async inter-broker requests#3248

Open
Gezi-lzq wants to merge 20 commits into1.7from
feat/consumer-lag-hook
Open

feat(core): add fetch listener callbacks and async inter-broker requests#3248
Gezi-lzq wants to merge 20 commits into1.7from
feat/consumer-lag-hook

Conversation

@Gezi-lzq
Copy link
Copy Markdown
Contributor

No description provided.

@Gezi-lzq Gezi-lzq force-pushed the feat/consumer-lag-hook branch 2 times, most recently from 06b861a to 930015a Compare March 11, 2026 08:26
@Gezi-lzq Gezi-lzq changed the title feat(core): add fetch/session-close hooks feat(core): add fetch listener callbacks and async inter-broker requests Mar 11, 2026
@Gezi-lzq Gezi-lzq requested a review from Copilot March 11, 2026 08:45
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new async inter-broker request sender utility and adds fetch lifecycle callbacks (per-partition fetch results + fetch session close notifications) to the ElasticKafkaApis path.

Changes:

  • Add AsyncSender + InterBrokerAsyncSender to support async inter-broker requests backed by InterBrokerSendThread.
  • Introduce FetchListener and wire it into ElasticKafkaApis to report fetch offsets/timestamps, plus session-close notifications via FetchSessionCacheShard.
  • Add/extend unit tests covering the sender behavior, fetch listener notifications, and session removal listener behavior.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
server-common/src/main/java/org/apache/kafka/server/util/AsyncSender.java New async sender interface for inter-broker requests
server-common/src/main/java/org/apache/kafka/server/util/InterBrokerAsyncSender.java Async wrapper around InterBrokerSendThread returning CompletableFuture
server-common/src/test/java/org/apache/kafka/server/util/InterBrokerAsyncSenderTest.java Unit tests for async sender success/error/timeout paths
core/src/main/scala/kafka/server/streamaspect/FetchListener.java New listener interface for fetch + session-close callbacks
core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala Wire fetch listener notifications into fetch handling
core/src/main/scala/kafka/server/FetchSession.scala Add removal listener hook to FetchSessionCacheShard
core/src/main/scala/kafka/server/BrokerServer.scala Instantiate listener + hook session-close notifications + set fetch listener on ElasticKafkaApis
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala Add tests validating fetch listener notifications
core/src/test/scala/unit/kafka/server/FetchSessionTest.scala Add test for removal listener notifications
core/src/test/scala/unit/kafka/server/streamaspect/ElasticKafkaApisTest.scala Update helper to accept/set fetch listener

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/scala/kafka/server/BrokerServer.scala Outdated
Comment thread core/src/main/scala/kafka/server/FetchSession.scala
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/scala/kafka/log/UnifiedLog.scala
Comment thread core/src/main/scala/kafka/server/FetchSession.scala
Comment thread core/src/main/scala/kafka/server/BrokerServer.scala Outdated
@Gezi-lzq Gezi-lzq force-pushed the feat/consumer-lag-hook branch 2 times, most recently from f275710 to 3129dd4 Compare April 9, 2026 09:27
@Gezi-lzq Gezi-lzq changed the base branch from 1.6 to 1.7 April 9, 2026 09:27
@Gezi-lzq Gezi-lzq force-pushed the feat/consumer-lag-hook branch 2 times, most recently from d8c5a65 to c6e8f2a Compare April 10, 2026 03:24
@Gezi-lzq Gezi-lzq marked this pull request as ready for review April 10, 2026 06:12
Comment thread core/src/main/scala/kafka/server/streamaspect/AsyncFetchListener.scala Outdated
Comment thread core/src/main/scala/kafka/server/BrokerServer.scala Outdated
@Gezi-lzq Gezi-lzq force-pushed the feat/consumer-lag-hook branch 2 times, most recently from 203e776 to ab84639 Compare April 14, 2026 01:45

def highestOffsetInRemoteStorage(): Long = _highestOffsetInRemoteStorage

@volatile private var _latestAppendState: Option[LatestAppendState] = None
Copy link
Copy Markdown
Collaborator

@superhx superhx Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you modify the Kafka upstream code, please wrap the change with AutoMQ inject start & end. (And check other codes)

}

@Override
public void close() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we gracefully await the pendingRequest completion.

…tion

- introduce AsyncSender interface for async request sending

- add InterBrokerAsyncSender based on InterBrokerSendThread with CompletableFuture completion

- add InterBrokerAsyncSenderTest covering success, readiness transitions, disconnect/auth failure, timeout, wakeup and close
…ction

- Add AsyncFetchListener wrapper to handle both onFetch and onSessionClosed asynchronously
- Move fetchListenerExecutor from ElasticKafkaApis to BrokerServer for unified lifecycle management
- Simplify notifyFetchListener in ElasticKafkaApis by delegating async logic to AsyncFetchListener
- Ensure proper executor shutdown in BrokerServer.shutdown()

This change addresses the issue where calling fetchListener.onSessionClosed()
directly inside FetchSessionCacheShard.remove() (while holding synchronized lock)
could block fetch session cache operations if the listener is slow or blocked.
Gezi-lzq added 14 commits April 22, 2026 10:37
Cherry-picked from 9d53374 (feat/consumer-lag).
Adds LatestAppendState to UnifiedLog to capture maxTimestamp and
shallowOffsetOfMaxTimestamp on each successful append, avoiding the
expensive fetchOffsetByTimestamp lookup.
- Add @volatile to fetchListener for cross-thread visibility
- Drain pending requests on InterBrokerAsyncSender close
- Replace removed shallowOffsetOfMaxTimestamp with lastOffset
Use a bounded LinkedBlockingQueue(1024) with DiscardOldestPolicy to
prevent unbounded memory growth under high load. Also batch session
removal notifications into a single executor task to minimize lock
hold time on FetchSessionCacheShard.
- Add volatile closed flag to InterBrokerAsyncSender to reject requests
  after close, preventing futures that never complete
- Add awaitTermination for fetchListenerExecutor during broker shutdown
- Add onSessionClosedBatch to AsyncFetchListener to eliminate duplicated
  async dispatch logic in BrokerServer sessionRemovalListener
- Skip notifyFetchListener when listener is NOOP to avoid unnecessary
  map allocation and record batch iteration on the fetch hot path
- Remove speculative TODO comment in extractFetchOffsetAndTimestamp
- Add missing license header to AsyncSender.java and trailing newlines
Delegate session removal notification to AsyncFetchListener batch
method instead of duplicating the async dispatch logic inline.
@Gezi-lzq Gezi-lzq force-pushed the feat/consumer-lag-hook branch from ab84639 to bf5f33d Compare April 22, 2026 02:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants