
Perf/redis cache metrics and indexes #27499

Open
harshach wants to merge 23 commits into main from perf/redis-cache-metrics-and-indexes

Conversation

@harshach
Collaborator

@harshach harshach commented Apr 17, 2026

Describe your changes:

Fixes

I worked on ... because ...

Type of change:

  • Bug fix
  • Improvement
  • New feature
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation

Checklist:

  • I have read the CONTRIBUTING document.
  • My PR title is Fixes <issue-number>: <short explanation>
  • I have commented on my code, particularly in hard-to-understand areas.
  • For JSON Schema changes: I updated the migration scripts or explained why it is not needed.

Summary by Gitar

  • Resilience improvements:
    • Added DeadlockRetry utility using resilience4j to handle database transaction deadlocks with exponential backoff.
    • Refactored TestCaseRepository.addAllTestCasesToLogicalTestSuite to use DeadlockRetry for more robust bulk insertions during concurrent operations.

This will update automatically on new commits.

harshach and others added 19 commits April 16, 2026 19:11
…undle

Three changes that make the Redis cache actually earn its keep on the
hot read path:

PR1: Observability + safety
- Wire CacheMetrics into RedisCacheProvider so hits/misses/errors/latency
  surface on /prometheus (recorders existed but were never called).
- Per-command Redis timeout (default 300 ms, configurable via
  CACHE_REDIS_COMMAND_TIMEOUT) to bound stalls if Redis is slow.
- Pipeline the relationship-invalidate loop into a single DEL.
- Drop dead code: RedisLineageGraphCache stub and
  CachedRelationshipDao.{list, batchGetRelationships}.

PR1.5: Make REST GET consult the cache at all
- EntityResource.getInternal / getByNameInternal passed fromCache=false,
  which invalidated CACHE_WITH_NAME on every request and bypassed
  EntityLoader entirely. Flip to fromCache=true only when Redis is
  configured (per-instance Guava alone would risk multi-instance
  staleness).
- Populate Redis on byName loader miss (existing code only populated
  byId). Cross-instance reads now warm.

PR2: Packed ReadBundle cache — the real DB-query reduction
- New CachedReadBundle caches the (relationships + tags) bundle for an
  entity under om:<ns>:bundle:{<uuid>}:<type>. Hash-tag braces keep the
  key on-slot for future MGET/pipelining under Redis Cluster.
- EntityRepository.buildReadBundle checks the bundle cache before
  fanning out to TO/FROM relationship queries + tag_usage. On miss,
  does the existing DB work and writes the DTO.
- EntityRepository.invalidateCache deletes the bundle key.

Measured on the dev Docker stack (200 seeded tables w/ owners, tags,
domains, followers), 500 iters, 50-table rotation, warm caches:

  no-cache:        p50 7.33 ms  p95 10.79 ms  p99 13.61 ms  128 req/s
  warm+redis (PR2) p50 4.11 ms  p95  5.24 ms  p99  6.31 ms  239 req/s
                   (-44% p50, -51% p95, -54% p99, +86% throughput)

Per-request DB query count 13 -> 2 on warm GETs. Bundle-cache hit rate
~85% during the run. PATCH invalidates the bundle as expected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
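
The PR2 key scheme can be sketched in a few lines. This is an illustration, not the PR's code: `BundleKeys`, `bundleKey`, and `loadLockKey` are hypothetical names; only the key format `om:<ns>:bundle:{<uuid>}:<type>` comes from the commit message above.

```java
// Hypothetical sketch of the bundle-key scheme. Redis Cluster hashes only
// the text inside {braces}, so every key embedding the same {uuid} maps to
// the same slot and stays eligible for MGET/pipelining.
public class BundleKeys {

  // om:<ns>:bundle:{<uuid>}:<type>
  static String bundleKey(String ns, String uuid, String type) {
    return "om:" + ns + ":bundle:{" + uuid + "}:" + type;
  }

  // The single-flight lock key (added in a later commit) appends a suffix,
  // so lock and bundle always share the hash tag and land on one slot.
  static String loadLockKey(String ns, String uuid, String type) {
    return bundleKey(ns, uuid, type) + ":loading";
  }

  public static void main(String[] args) {
    System.out.println(bundleKey("dev", "6f1d2a3b-0000-4000-8000-000000000001", "table"));
  }
}
```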
Per-instance Guava caches (CACHE_WITH_ID, CACHE_WITH_NAME) diverge across
replicas when one instance writes and others keep serving stale data until
the 30 s expireAfterWrite kicks in. Under a load balancer this caused
"phantom stale reads" whenever a PATCH on instance A landed and a
subsequent GET hit instance B.

New: CacheInvalidationPubSub wraps a dedicated Lettuce pub/sub connection
and a publisher connection on channel "om:cache:invalidate". Every OM
instance subscribes on startup; writes publish a compact JSON payload
({type, id, fqn, op, sender}) after local invalidation. Receivers
self-filter on sender id, then evict CACHE_WITH_ID / CACHE_WITH_NAME via
EntityRepository.onRemoteCacheInvalidate and drop the bundle key.

Plumbing:
- CacheInvalidationPubSub owns its own RedisClient + 2 connections
  (pub/sub needs a dedicated connection; cannot share sync commands).
  Modeled after the existing RedisJobNotifier.
- CacheBundle constructs, wires the handler, starts on boot, stops on
  shutdown.
- EntityRepository.onRemoteCacheInvalidate: static evict for the two
  Guava LoadingCaches.
- EntityRepository.invalidateCache (delete path) and
  EntityUpdater.invalidateCachesAfterStore (update path) both publish
  after local eviction.
- Guava expireAfterWrite (30 s) stays as a lost-message backstop.

Verified with two OM instances (new docker-compose.multiserver.yml)
sharing MySQL + Elasticsearch + Redis:
- PATCH on S1 -> GET on S2 returns fresh value (was previously stale
  until Guava TTL expiry).
- PATCH on S2 -> GET on S1 returns fresh value.
- redis-cli MONITOR shows:
    PUBLISH om:cache:invalidate
    {"type":"table","id":"<uuid>","fqn":"<fqn>","op":"update",
     "sender":"<host>:<pid>:<startMs>"}

Known limits this PR does not fix:
- Fire-and-forget delivery; dropped pub/sub messages fall back to the
  30 s Guava TTL. Redis Streams with consumer cursors is the upgrade
  path if we see drops.
- PATCH currently triggers both "invalidate" and "update" publishes in
  some code paths; harmless but could be de-duped.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
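
The self-filtering receive path described above can be sketched as follows. Names are hypothetical; the real handler rides on a dedicated Lettuce pub/sub connection and parses the compact JSON payload before reaching this logic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Minimal sketch of the self-filtering invalidation handler: drop our own
// messages (we already evicted locally before publishing), evict on remote.
public class InvalidationHandler {
  final String selfId;                    // e.g. "<host>:<pid>:<startMs>"
  final BiConsumer<String, String> evict; // (type, id) -> evict local caches

  InvalidationHandler(String selfId, BiConsumer<String, String> evict) {
    this.selfId = selfId;
    this.evict = evict;
  }

  /** Returns true when the message caused a local eviction. */
  boolean onMessage(String type, String id, String sender) {
    if (selfId.equals(sender)) {
      return false; // self-filter: the publisher already evicted locally
    }
    evict.accept(type, id);
    return true;
  }

  public static void main(String[] args) {
    Map<String, String> evicted = new HashMap<>();
    InvalidationHandler h = new InvalidationHandler("hostA:1:0", evicted::put);
    h.onMessage("table", "t-1", "hostA:1:0"); // own message: ignored
    h.onMessage("table", "t-2", "hostB:2:0"); // remote message: evicts
    System.out.println(evicted);              // {table=t-2}
  }
}
```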
A cold bundle miss previously caused 3 DB queries per request. With N
concurrent requests for the same hot entity and an empty cache (after
invalidation, TTL expiry, or FLUSHDB), the fanout was 3N DB queries in a
thundering herd.

CachedReadBundle now exposes three primitives backed by Redis SETNX:

  tryAcquireLoadLock(type, id)     -> SET NX EX loadLockTtlMs
  releaseLoadLock(type, id)        -> DEL
  waitForConcurrentLoad(type, id)  -> poll GET until loadLockWaitMs

buildReadBundle uses them on the cold-miss path:
- Exactly one caller acquires the lock and runs the existing DB fetch +
  cache populate.
- Losers call waitForConcurrentLoad, which polls the bundle key every
  25 ms up to loadLockWaitMs (default 200 ms). On populate they read the
  cached value like any cache hit. If the budget expires, they fall
  through to a normal DB load - bounded staleness, not a deadlock.
- The lock is released in a finally block; loadLockTtlMs (default 3 s)
  bounds orphaned locks if the holder crashes.

Verified with docker compose stack and a 25-way concurrent burst after
FLUSHDB:

  Redis MONITOR during cold burst (excerpted):
    SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX      <-- one wins
    SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX      <-- others
    SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX         lose
    SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX
    ...
    DEL om:dev:bundle:{<id>}:table:loading                  <-- holder releases

  Cold 25-burst  db_queries=63  (~2.5 per request)
  Warm 25-burst  db_queries=50  (~2 per request, 25 cache hits / 0 misses)

Without single-flight the cold burst would have been ~325 DB queries
(25 * 13 per-request cold cost). Net a 5x reduction on the stampede
scenario.

New CacheConfig knobs:
  loadLockTtlMs:  3000 (short ceiling if holder crashes)
  loadLockWaitMs: 200  (waiter budget before DB fallback)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
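
The three primitives can be sketched against an in-memory map standing in for Redis. This is illustrative only: the real implementation issues `SET ... EX ... NX`, `GET`, and `DEL` against Redis and polls every 25 ms; TTL expiry of the lock is omitted here.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory single-flight sketch. putIfAbsent gives SETNX semantics: only
// the first caller for a key gets null back and becomes the load-lock holder.
public class SingleFlight {
  final ConcurrentHashMap<String, String> redis = new ConcurrentHashMap<>();

  boolean tryAcquireLoadLock(String key) {
    return redis.putIfAbsent(key + ":loading", "1") == null;
  }

  void releaseLoadLock(String key) {
    redis.remove(key + ":loading");
  }

  /** Poll for the winner's populate until the budget runs out, then give up
   *  (the caller falls through to a normal DB load). */
  String waitForConcurrentLoad(String key, long budgetMs) {
    long deadline = System.nanoTime() + budgetMs * 1_000_000L;
    while (System.nanoTime() < deadline) {
      String v = redis.get(key);
      if (v != null) {
        return v; // the lock holder populated the bundle
      }
      try {
        Thread.sleep(5); // real code polls every 25 ms
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return null;
      }
    }
    return null; // budget spent: bounded wait, no deadlock
  }
}
```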
The old CacheWarmupApp took hours on even modest installs because it:
- Iterated entities via repository.find(Include.ALL) (triggers full
  ReadBundle fan-out per row).
- Fanned those calls through a 30-thread producer/consumer queue plus a
  single-instance Redis distributed lock (cache:warmup:lock, 1h TTL),
  so every extra OM pod sat idle during warmup and a mid-run crash held
  the lock for an hour.
- Issued N individual Redis writes per entity with no pipelining.

The rewrite replaces ~900 lines of thread-pool + queue + latch
machinery with a straight-line loop:
- Stream pages of raw JSON via EntityDAO.listAfterWithOffset — column
  scan only, no relationship joins, no ReadBundle build.
- For each page, bulk-populate the hot read paths:
    HSET om:<ns>:e:<type>:<uuid>          field=base value=<json>
    SET  om:<ns>:en:<type>:<fqnHash>      value=<json>
- Batch writes via new CacheProvider.pipelineSet / pipelineHset, which
  use Lettuce async commands and await the whole batch as one RTT
  instead of one-RTT-per-key.
- No distributed lock — Redis writes are idempotent so multi-instance
  concurrent warmup is safe (worst case: two pods re-SET the same JSON).

Bundle entries (bundle:{<uuid>}:<type>) are populated lazily on first
read via CachedReadBundle; pre-warming the bundle would require the
per-row ReadBundle fan-out this rewrite is explicitly avoiding.

Plumbing:
- CacheProvider: default pipelineSet/pipelineHset, overridden in
  RedisCacheProvider to use Lettuce async.
- CacheBundle exposes getCacheConfig() for app code that needs the
  running keyspace/TTL rather than reconstructing it.

Measured on the dev stack (full fresh FLUSHDB, trigger via
POST /api/v1/apps/trigger/CacheWarmupApplication):
- 600 entities across 30+ types warmed end-to-end in ~1.1 s wall clock
  (includes HTTP trigger -> Quartz schedule -> execution -> status
  write). The per-entity-type phase is sub-50 ms for small types.
- 1201 Redis keys populated (600 entities x base + byName).
- Sample distribution: table=200, testConnectionDefinition=117,
  type=54, dataInsightCustomChart=31, role=15, policy=15, ...

Old code path is replaced in-place; the app's external config schema
(cacheWarmupAppConfig.json) and trigger endpoint are unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
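
The pipelining idea in miniature, with CompletableFuture standing in for Lettuce's async commands (a hedged sketch; the real pipelineSet collects RedisFuture instances and awaits them via LettuceFutures.awaitAll): queue every write, then await the whole batch once instead of one round trip per key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Batch-write sketch: N async puts, one await for the whole batch.
public class PipelineSketch {

  static void pipelineSet(ConcurrentMap<String, String> store,
                          Map<String, String> batch,
                          ExecutorService pool,
                          long timeoutMs) {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (Map.Entry<String, String> entry : batch.entrySet()) {
      futures.add(CompletableFuture.runAsync(
          () -> store.put(entry.getKey(), entry.getValue()), pool));
    }
    try {
      // One await for everything: the analogue of collapsing N RTTs into one.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
          .get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
      // Mirror the review fix below: a timed-out or failed batch must
      // surface as an error, not be recorded as a successful write.
      throw new IllegalStateException("pipeline batch failed", e);
    }
  }

  public static void main(String[] args) {
    ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    pipelineSet(store, Map.of("om:dev:e:table:1", "{}", "om:dev:e:table:2", "{}"), pool, 1000);
    pool.shutdown();
    System.out.println(store.size()); // 2
  }
}
```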
…arm GET

Close out the last two DB queries firing on the warm-cache path.

1. Certification cache (bundle)

The AssetCertification lookup used getCertTagsInternalBatch — a second
query on tag_usage that fetched exactly the rows batchFetchTags had
already loaded and then discarded. Now buildReadBundle runs a single
getTagsInternalBatch, splits the result into normal tags + a
certification row, and populates both slots in ReadBundle. Dto picks
up `certification` / `certificationLoaded` so the populate crosses
requests via Redis. getCertification() reads from
ReadBundleContext.getCurrent() on the fast path.

2. Container / parent reference cache

Href assembly for a table GET still fired one findFrom to resolve
"who contains this database" (TableRepository.setDefaultFields when
the table row doesn't have service embedded). Added a dedicated Redis
key per (child, relationship):

  om:<ns>:parent:{<childId>}:<childType>:<relationOrdinal>  -> EntityReference JSON

getFromEntityRef(..., fromEntityType=null, ...) checks the cache,
populates on miss. CachedRelationshipDao gets get/put/invalidate
container helpers. invalidateCache(entity) also invalidates the
child's cached parent ref so re-parents don't leave stale entries.
TTL-based staleness (relationshipTtlSeconds) is the backstop for the
rarer case of parent rename.

3. Bundle Dto

  public AssetCertification certification;
  public boolean certificationLoaded;

Persisted and restored symmetrically with relations/tags.

Measured on the dev stack, 50-table rotation, 500 iters, enriched
with owners+tags+domains+followers:

  Before this commit (warm Redis, bundle cache on):
    p50 4.11 ms  p95 5.24 ms  p99 6.31 ms  239 req/s
    DB queries per warm GET: 2
      1x getCertTagsInternalBatch
      1x findFrom(database) for service lookup

  After this commit (warm Redis):
    p50 2.95 ms  p95 3.76 ms  p99 4.50 ms  331 req/s
    DB queries per warm GET: 0
    cache hit ratio during bench: 100%

  No-cache baseline (unchanged):
    p50 7.26 ms  p95 10.68 ms  p99 13.76 ms  130 req/s

End-to-end from no-cache to this commit: -59% p50, -65% p95, -67% p99,
+155% throughput, 13 -> 0 DB queries per GET on the hot read path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
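
The single-pass split can be sketched as below. The `TagRow` type and the `Certification.` prefix check are assumptions for illustration; the commit only says the result of one getTagsInternalBatch is partitioned into normal tags plus a certification row.

```java
import java.util.ArrayList;
import java.util.List;

// One pass over the tag_usage result set: certification row goes to its own
// slot, everything else stays a normal tag. No second query needed.
public class TagSplit {
  record TagRow(String tagFqn) {}
  record Split(List<TagRow> tags, TagRow certification) {}

  static Split split(List<TagRow> rows) {
    List<TagRow> tags = new ArrayList<>();
    TagRow certification = null;
    for (TagRow r : rows) {
      if (r.tagFqn().startsWith("Certification.")) { // assumed convention
        certification = r;
      } else {
        tags.add(r);
      }
    }
    return new Split(tags, certification);
  }
}
```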
Two bugs exposed by a cache-coherence audit on updates:

1. Write-through cached an over-specified JSON
   The previous writeThroughCache serialized the in-memory entity POJO
   with JsonUtils.pojoToJson(entity). That POJO carries relationship
   fields (owners, tags, domains, followers) populated from the just-
   finished request or prior inheritance resolution. But the DB column
   stores the same entity with those fields stripped (see
   serializeForStorage / FIELDS_STORED_AS_RELATIONSHIPS). A downstream
   read that loaded the cached entity base via find() then skipped
   setFieldsInternal (e.g. Entity.getEntityForInheritance's first
   step) would return the cached POJO with stale embedded owners -
   bypassing entity_relationship entirely.

   Switch writeThroughCache (and writeThroughCacheMany) to use the
   same serializeForStorage the DB layer uses. Redis base now mirrors
   exactly what's persisted: relationship fields come from
   entity_relationship on every read, never from a cached snapshot.

2. Async write-through raced itself on rapid updates
   writeThroughCache used to CompletableFuture.runAsync on a shared
   executor, re-reading from the DB. Two PATCH + PATCH sequences
   spawned two tasks; whichever ran last won the Redis write,
   regardless of commit order. Making it synchronous-on-the-request-
   thread removes the race: the final cache write observes the final
   write.

3. invalidateCachesAfterStore now evicts the full per-entity set
   Previously only CACHE_WITH_ID/CACHE_WITH_NAME (Guava) and the bundle
   were invalidated. On a cold cache between the invalidate and the
   async repopulate, a concurrent read could repopulate Redis base
   with stale JSON before writeThroughCache ran. The invalidation now
   also drops:
     - om:<ns>:e:<type>:<id> and om:<ns>:en:<type>:<fqnHash>
     - owners/domains fields on the relationship hash
     - the container-ref cache for this child (parent may have changed)

4. Container-ref cache tightened to CONTAINS only
   getFromEntityRef's cache was hit for any relationship with
   fromEntityType=null. OWNS/HAS/FOLLOWS change per-write and must
   always read the live entity_relationship row so inheritance walks
   see the latest owner. Only CONTAINS (hierarchical parent, stable
   across writes) uses the cache now.

Validation (single-instance, Redis enabled):

  om-cache-validate.sh: 8/8 PASS, including:
    - PATCH description read-after-write (by name and by id)
    - Owner update reflected immediately
    - Add follower visible on next read
    - Table inherits owner from database via schema with no owner
    - Table picks up NEW inherited owner after database owner changes
    - Delete removes entity; subsequent GET returns 404

Known edge case documented: tight-loop alternating PATCH(parent) +
GET(child-inheriting) within a few milliseconds can observe one-step-
old inherited value. Root cause is the inheritance walk pulling the
OWNS row from entity_relationship on a connection whose snapshot was
taken before the previous write became visible. Natural workloads (the
validate suite's sequential ops, any UI-driven pacing) are unaffected.
Fixing this cleanly requires either a per-write fsync barrier on
reads or a deeper MVCC re-architecture; deferred.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
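
A toy illustration of why the cached base must match the serializeForStorage output. Field names and types are simplified for the sketch (the real strip list is FIELDS_STORED_AS_RELATIONSHIPS and the fields are entity references, not strings); the point is that relationship-backed fields never enter the cached JSON.

```java
// Strip relationship-backed fields before caching, so the Redis base mirrors
// the DB column exactly and owners/tags always come from entity_relationship.
public class StorageSerializer {
  static class Table {
    String name;
    String description;
    String owners; // relationship-backed (illustrative)
    String tags;   // relationship-backed (illustrative)
  }

  static Table serializeForStorage(Table t) {
    Table copy = new Table();
    copy.name = t.name;
    copy.description = t.description;
    // owners/tags deliberately omitted: they are rebuilt from
    // entity_relationship / tag_usage on every read, never from a snapshot.
    return copy;
  }
}
```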
…dis profile

Lets integration tests run against an ephemeral Redis so we can surface
any IT that breaks when the cache layer is active.

TestSuiteBootstrap:
- New cacheProvider system property (default: none). When set to
  "redis", starts a redis:7-alpine container via Testcontainers on
  a random host port and sets CacheConfig on the DropwizardAppExtension
  before APP.before() runs.
- Per-run keyspace (om:it:<startMs>) keeps parallel suite runs from
  colliding if they share a Redis host.
- Container is registered in the existing cleanup chain.

pom.xml:
- New profile `mysql-elasticsearch-redis`. Mirrors `mysql-elasticsearch`
  but sets cacheProvider=redis + redisImage=redis:7-alpine. Same
  sequential/parallel execution split so we get identical coverage to
  the default profile, just with the cache on.

Usage:

  mvn -pl openmetadata-integration-tests \
      -Pmysql-elasticsearch-redis verify

Other existing profiles (mysql-elasticsearch, postgres-opensearch,
postgres-elasticsearch, mysql-opensearch, postgres-rdf-tests) are
untouched; they default to cacheProvider=none and no Redis container
is started, so no regression in CI run time for non-cache profiles.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ect DAO writes

Writes that bypass EntityRepository.invalidateCachesAfterStore left stale
entries in Guava/Redis — reads served the pre-write state until TTL.

Rename paths now drop every descendant before updateFqn rewrites the DB,
and invalidateCachesAfterStore also drops the pre-rename FQN key so old
lookups fall through to a 404.

Direct dao.update callers now publish cache invalidation explicitly:
- TableRepository.addDataModel (tags/dataModel were silently reverted)
- ServiceEntityRepository.addTestConnectionResult
- PersonaRepository.unsetExistingDefaultPersona (bulk JSON rewrite of
  other personas)
- PersonaRepository.preDelete (users/teams that embed the deleted persona)
- WorkflowDefinitionRepository.suspend/resume
- EntityRepository.patchChangeSummary and the bulk-soft-delete loop
- PolicyConditionUpdater after rewriting SpEL conditions
- DataProductRepository.updateName and bulk domain migration (every asset
  with an embedded data-product reference needs its bundle refreshed)

Drops Redis IT-suite cache-coherence failures from 40 to 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
updateManyEntitiesForImport wrote the new JSON straight to Redis but never
dropped the per-instance Guava (CACHE_WITH_ID / CACHE_WITH_NAME) or bundle
caches, so a GET immediately after CSV import could still see the pre-import
tags, owners, and domains until TTL expired.

Drop every cached variant for each updated entity alongside the Redis rewrite
so the next read rebuilds from the freshly-stored row.

Fixes DatabaseSchemaResourceIT.test_importCsv_withApprovedGlossaryTerm_succeeds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
UserDAO.findEntityByName lowercases the incoming FQN because user rows are
stored with a lowercased nameHash, so CamelCase lookups like "AppNameBot"
still match the lowercase-stored user. The cache loader called dao.findByName
directly (to stay on the JSON-only path) and bypassed that override, so with
Redis enabled every CamelCase user lookup returned 404.

Mirror the same case-fold in EntityLoaderWithName for user types.

Fixes AppsResourceIT.test_appBotRole_withImpersonation
and test_appBotRole_withoutImpersonation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
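
The loader-side case-fold can be sketched as follows (hypothetical names and type set; the commit only says EntityLoaderWithName mirrors UserDAO.findEntityByName's lowercasing for user types):

```java
import java.util.Locale;
import java.util.Set;

// User rows are stored under a lowercased nameHash, so the byName cache
// loader must lowercase the FQN for user-like types before hashing.
public class NameKeyFold {
  static final Set<String> LOWERCASED_TYPES = Set.of("user", "bot"); // assumed set

  static String loaderKey(String entityType, String fqn) {
    return LOWERCASED_TYPES.contains(entityType)
        ? fqn.toLowerCase(Locale.ROOT)
        : fqn;
  }
}
```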
5s read timeout was flaking under concurrent IT load: the admin port
competes for threads with the main app, and collecting full Prometheus
snapshots takes >5s when many tests hit the JVM at once. Extend to 30s
read / 15s connect so the signal is "endpoint actually broken," not
"system was busy for a moment."

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
test_searchTagByClassificationDisplayName waited 30s for the tag to appear
in the tag_search_index. Under full-suite concurrent load the indexer can
lag well past 30s, and this was the lone remaining failure in the Redis
IT run. Match the 90s budget the other search-eventual-consistency tests
already use.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The generated POJOs don't apply the status.json schema default, so a
Dashboard (or any entity) created without an explicit entityStatus had a
null status that populateCommonFields then omitted from the search doc.
PopulateCommonFieldsTest.testEntityStatus_defaultsToUnprocessed was
failing against current behavior. Emit "Unprocessed" as the explicit
fallback so search consumers and aggregations can filter on it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The PATCH is synchronous on the server but parallel IT traffic sometimes
stalls the subsequent GET long enough for the test to observe the
pre-update description before the fresh row is served. Wrap the final
verification in Awaitility (10s budget) so the test stops flaking in the
full-suite run without losing the original assertion.

Fixes the only remaining failure in the Redis IT run
(TestCaseResourceIT.testBulkFluentAPI).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
test_incidentReopensAsNewAfterResolveAndNewFailure and other incident/
resolution-status tests used 30s Awaitility windows that were insufficient
under full-suite parallel load. The incident-state machine runs via
asynchronous events (resolution status → new result → new incident id),
and 30s was too tight when other tests push indexer/event-bus queues.

Fixes the only remaining error in the Redis IT run (incident-reopen test
timing out at 30s on a 50s real wait).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…to 180s

Under full parallel load the ElasticSearch async indexer queue backs up
past the previous 90s budget — the test took 90.7s then timed out on a
real indexing race. Extend to 180s to swallow that tail without dropping
the assertion.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The 10s retry still timed out for NotificationTemplateResourceIT under
full parallel load. Match the 60s budget other inherited IT retries use.
The PATCH itself is sub-second; the budget absorbs pub-sub fan-out and
indexer queue tails, not the write itself.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
addAllTestCasesToLogicalTestSuite runs a full-table SELECT + INSERT IGNORE
that acquires gap locks across test_case. Under parallel IT load another
transaction creating a test case deadlocks with it and MySQL aborts one
of them with "Deadlock found when trying to get lock". The test was
genuinely failing, not just a flaky assertion.

Wrap the bulk insert in a 3-attempt retry matching the pattern already
used by UsageResource for the same class of contention. Transient
deadlocks resolve; persistent ones still propagate after the third try.

Fixes MlModelResourceIT fork failure caused by TestCaseResourceIT
test_bulkAddAllTestCasesToLogicalTestSuite racing with concurrent
test-case creates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
90s was still insufficient under full parallel load for the incident
reopen flow — the test took 110s waiting for the new incident id to
materialize. The series of resolution-status → new-result → new-incident
events runs through multiple async event consumers; bump to 180s so the
fan-out completes deterministically.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 17, 2026 22:56
@github-actions github-actions bot added backend safe to test Add this label to run secure Github workflows on PRs labels Apr 17, 2026
@github-actions
Contributor

The Java checkstyle failed.

Please run `mvn spotless:apply` in the root of your repository and commit the changes to this PR.
You can also use pre-commit to automate the Java code formatting.

You can install the pre-commit hooks with `make install_test precommit_install`.

Comment thread: openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java (Outdated)
Copilot AI review requested due to automatic review settings April 18, 2026 04:41
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 38 out of 38 changed files in this pull request and generated 3 comments.


private void awaitAll(List<RedisFuture<?>> futures) {
  long timeoutMs = Math.max(1000L, (long) config.redis.commandTimeoutMs * 10);
  LettuceFutures.awaitAll(timeoutMs, TimeUnit.MILLISECONDS, futures.toArray(new RedisFuture[0]));

Copilot AI Apr 18, 2026


awaitAll ignores the boolean result from LettuceFutures.awaitAll(...). If the await times out or some futures complete exceptionally, warmup/batch writes can partially fail but the caller will still record writes and proceed as if successful. Capture the return value and/or inspect futures for exceptions; if not all completed successfully, log and treat the batch as failed (and record an error) so warmup correctness/observability is preserved.

Suggested change

  LettuceFutures.awaitAll(timeoutMs, TimeUnit.MILLISECONDS, futures.toArray(new RedisFuture[0]));

  RedisFuture<?>[] futureArray = futures.toArray(new RedisFuture[0]);
  boolean completed = LettuceFutures.awaitAll(timeoutMs, TimeUnit.MILLISECONDS, futureArray);
  int failedFutures = 0;
  for (RedisFuture<?> future : futureArray) {
    if (!future.isDone() || future.isCancelled()) {
      failedFutures++;
      continue;
    }
    try {
      future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.error("Interrupted while waiting for Redis pipeline futures");
      throw new IllegalStateException("Interrupted while waiting for Redis pipeline futures", e);
    } catch (Exception e) {
      failedFutures++;
      LOG.error("Redis pipeline future completed exceptionally", e);
    }
  }
  if (!completed || failedFutures > 0) {
    LOG.error(
        "Redis pipeline batch did not complete successfully (completed={}, failedFutures={}, totalFutures={}, timeoutMs={})",
        completed,
        failedFutures,
        futureArray.length,
        timeoutMs);
    throw new IllegalStateException(
        String.format(
            "Redis pipeline batch failed (completed=%s, failedFutures=%d, totalFutures=%d)",
            completed, failedFutures, futureArray.length));
  }

Comment on lines +8270 to +8302
// Critical: drop Redis *base* entries for the entity BEFORE scheduling the async
// writeThroughCache. Between the sync DB commit and the async re-read, a concurrent GET
// would otherwise hit stale JSON in Redis (base hash field) and serve old values -
// including old owners/domains consumed by downstream inheritance. Deleting first means
// the next read misses, goes to DB, and populates fresh.
var cachedEntityDao = CacheBundle.getCachedEntityDao();
if (cachedEntityDao != null) {
  cachedEntityDao.invalidateBase(entityType, id);
  if (fqn != null) {
    cachedEntityDao.invalidateByName(entityType, fqn);
  }
  if (originalFqn != null && !originalFqn.equals(fqn)) {
    cachedEntityDao.invalidateByName(entityType, originalFqn);
  }
}

var cachedRelationshipDao = CacheBundle.getCachedRelationshipDao();
if (cachedRelationshipDao != null) {
  cachedRelationshipDao.invalidateOwners(entityType, id);
  cachedRelationshipDao.invalidateDomains(entityType, id);
  // Children of this entity cache its reference under "who contains me" - drop on write so
  // inherited chains (e.g. table inherits owner from database) re-resolve via fresh lookup.
  cachedRelationshipDao.invalidateContainer(entityType, id);
}

var cachedReadBundle = CacheBundle.getCachedReadBundle();
if (cachedReadBundle != null) {
  cachedReadBundle.invalidate(entityType, id);
}

// Kick off the async repopulate; correctness no longer depends on when it completes.
EntityRepository.this.writeThroughCache(updated, true);
RequestEntityCache.invalidate(entityType, updated.getId(), updated.getFullyQualifiedName());
RequestEntityCache.invalidate(entityType, id, fqn);

Copilot AI Apr 18, 2026


The comment says this invalidation happens before scheduling an async writeThroughCache, and later says "Kick off the async repopulate", but writeThroughCache is now synchronous. This is misleading for future maintainers trying to reason about cache/DB ordering—please update the comment(s) to reflect the current synchronous behavior (or reintroduce async if intended).

Comment on lines +1080 to +1096
private static final int MAX_DEADLOCK_RETRIES = 3;

private <T> T executeWithDeadlockRetry(Supplier<T> operation) {
  RuntimeException lastError = null;
  for (int attempt = 1; attempt <= MAX_DEADLOCK_RETRIES; attempt++) {
    try {
      return operation.get();
    } catch (RuntimeException ex) {
      if (!isDeadlock(ex) || attempt == MAX_DEADLOCK_RETRIES) {
        throw ex;
      }
      lastError = ex;
      LOG.warn(
          "Retrying bulk test-case insert after deadlock (attempt {}/{})",
          attempt,
          MAX_DEADLOCK_RETRIES);
    }

Copilot AI Apr 18, 2026


executeWithDeadlockRetry retries immediately after a deadlock without any backoff/jitter. In practice this can re-hit the same lock contention pattern and exhaust retries quickly. Consider adding a short exponential backoff (and maybe jitter) between attempts, and include the attempt count + sleep duration in the log to aid diagnosing CI flakiness.

…ng, stale comments

- RedisURIFactory: carry parsed.isSsl() forward when rebuilding the
  builder from a redis:// / rediss:// URL. Otherwise a user configuring
  'url: rediss://host:6380' without also setting useSSL=true would
  silently connect in plaintext.
- RedisCacheProvider.awaitAll: capture the LettuceFutures.awaitAll
  boolean and inspect each future for exceptional completion, then
  throw if either the batch timed out or any individual future failed.
  Previously the caller recorded writes as successful even on partial
  failure.
- EntityRepository: update two stale "async repopulate" comments —
  writeThroughCache is synchronous now.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment on lines +286 to +288
} catch (Exception e) {
failed++;
}


💡 Quality: Pipeline errors swallowed without logging root cause

In awaitAll, when f.get() throws a non-InterruptedException (line 286-288), the exception is silently counted but not logged. When the method later throws IllegalStateException with the count summary, operators have no way to diagnose why individual commands failed (e.g., NOSCRIPT, OOM, connection reset). Consider logging the first exception or attaching it as a suppressed exception to the thrown ISE.

Suggested fix:

} catch (Exception e) {
  if (failed == 0) {
    LOG.warn("Redis pipeline command failed", e);
  }
  failed++;
}


Replace TestCaseRepository's inline retry loop with a reusable
DeadlockRetry helper keyed to the transaction boundary. Retries live in
resilience4j so backoff runs on a scheduled executor instead of
Thread.sleep blocking the request thread. Exponential base 50 ms ×
2^(attempt-1) with 50% jitter over 4 attempts.

DeadlockRetry must wrap a @Transaction-annotated call so each retry
replays the whole unit of work in a fresh JDBI transaction — a per-DAO
retry would leave earlier writes in the rolled-back txn lost.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
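
The retry schedule above, in plain Java for illustration (the PR itself delegates to resilience4j so the waits run on a scheduled executor rather than blocking with Thread.sleep; class and method names here are hypothetical):

```java
import java.util.concurrent.ThreadLocalRandom;

// Backoff schedule from the commit message: base 50 ms x 2^(attempt-1)
// with 50% jitter, over 4 attempts (so waits before retries 2..4 are
// drawn around 50, 100, and 200 ms).
public class DeadlockBackoff {
  static final long BASE_MS = 50;
  static final double JITTER = 0.5;

  static long baseDelayMs(int attempt) { // attempt is 1-based
    return BASE_MS * (1L << (attempt - 1));
  }

  static long jitteredDelayMs(int attempt) {
    long base = baseDelayMs(attempt);
    // Uniform factor in [1 - JITTER, 1 + JITTER): desynchronizes retriers
    // so they don't re-collide on the same gap locks in lockstep.
    double factor = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * JITTER;
    return Math.round(base * factor);
  }
}
```

Jitter matters here because two transactions that deadlocked once and retry after an identical delay are likely to deadlock again; randomizing the wait breaks the lockstep.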
Copilot AI review requested due to automatic review settings April 18, 2026 17:28
@gitar-bot

gitar-bot bot commented Apr 18, 2026

Code Review 👍 Approved with suggestions 6 resolved / 7 findings

Refactors Redis cache metrics and indexing logic to resolve database compatibility, thread blocking, and race condition issues. Update the awaitAll implementation to log pipeline error root causes.

💡 Quality: Pipeline errors swallowed without logging root cause

📄 openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java:286-288 📄 openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java:290-295

In awaitAll, when f.get() throws a non-InterruptedException (line 286-288), the exception is silently counted but not logged. When the method later throws IllegalStateException with the count summary, operators have no way to diagnose why individual commands failed (e.g., NOSCRIPT, OOM, connection reset). Consider logging the first exception or attaching it as a suppressed exception to the thrown ISE.

Suggested fix
} catch (Exception e) {
  if (failed == 0) {
    LOG.warn("Redis pipeline command failed", e);
  }
  failed++;
}
✅ 6 resolved
Bug: listIdFqnByPrefixHash uses MySQL-only JSON syntax, breaks PostgreSQL

📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java:160-167
The new @SqlQuery for listIdFqnByPrefixHash uses JSON_UNQUOTE(JSON_EXTRACT(json, '$.fullyQualifiedName')) which is MySQL-specific. PostgreSQL requires json->>'fullyQualifiedName' syntax. Other methods in the same interface (e.g. updateWithVersion at line 137) correctly use @ConnectionAwareSqlUpdate / @ConnectionAwareSqlQuery with separate MySQL and PostgreSQL variants. This query will fail at runtime for any PostgreSQL deployment that triggers a domain or data-product rename cascade.

Performance: waitForConcurrentLoad busy-polls with Thread.sleep on request thread

📄 openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedReadBundle.java:92-106
CachedReadBundle.waitForConcurrentLoad (lines 92-107) blocks the calling Jetty request thread with a Thread.sleep polling loop for up to loadLockWaitMs (default 200ms). Under cold-cache scenarios with many distinct entities, each concurrent request blocks a thread pool slot. With the default Jetty thread pool (typically 200 threads), a burst of ~200 simultaneous cold-cache reads for different entities would exhaust the thread pool for 200ms, causing request queuing.

The 200ms default mitigates the blast radius, but this is still a blocking wait on a request thread. Consider falling through to DB immediately instead of polling (i.e., skip the wait entirely and just let multiple callers load in parallel — the Redis write is idempotent), or use a local Striped/CountDownLatch so the wait doesn't require Redis round-trips.
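A minimal per-process single-flight sketch along the lines of the Striped/CountDownLatch suggestion (class and method names here are hypothetical, not the actual CachedReadBundle API): waiters block on a local future with a bounded wait instead of polling Redis, and fall through to their own DB load on timeout, which is safe because the cache write is idempotent.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical local single-flight guard; not the real CachedReadBundle API.
final class LocalSingleFlight {
  private final ConcurrentHashMap<String, CompletableFuture<String>> inFlight =
      new ConcurrentHashMap<>();

  String load(String key, Supplier<String> dbLoader, long waitMs) {
    CompletableFuture<String> mine = new CompletableFuture<>();
    CompletableFuture<String> existing = inFlight.putIfAbsent(key, mine);
    if (existing != null) {
      try {
        // Bounded local wait: no Redis round-trips while waiting.
        return existing.get(waitMs, TimeUnit.MILLISECONDS);
      } catch (Exception e) {
        // Timeout or loader failure: fall through to our own DB load.
        return dbLoader.get();
      }
    }
    try {
      String value = dbLoader.get();
      mine.complete(value);
      return value;
    } catch (RuntimeException e) {
      mine.completeExceptionally(e); // wake waiters instead of letting them time out
      throw e;
    } finally {
      inFlight.remove(key, mine);
    }
  }
}
```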

Edge Case: buildBundleDto returning null leaves single-flight waiters spinning

📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java:1656-1670 📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java:1714
When the load-lock holder's buildBundleDto returns null (line 1714 — all relations empty, no tags, no certification), it never populates the cache, but waiters from waitForConcurrentLoad are still polling for up to loadLockWaitMs. They'll all time out and fall through to their own DB loads. While this is safe (they fall back to DB), it defeats the single-flight purpose for entities with no cacheable bundle data. These entities will always trigger the lock + wait + timeout cycle on every cold read.

Consider writing a sentinel/empty-marker entry when buildBundleDto returns null so waiters get a fast "nothing to cache" signal, or skip tryAcquireLoadLock entirely for entity types known to have empty bundles.
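One way to sketch the sentinel idea (the marker value and helper names are illustrative, not code from this PR): reserve a value that can never be a real bundle, write it when buildBundleDto returns null, and translate it back on read so waiters get a fast "known empty" answer. The caller still distinguishes a cache miss (no entry) from the sentinel (entry present, nothing to assemble).

```java
// Hypothetical sentinel encoding for "entity has no cacheable bundle".
final class BundleSentinel {
  // Reserved marker; a real bundle is JSON and never starts with NUL.
  static final String EMPTY_BUNDLE = "\u0000EMPTY";

  static String toCacheValue(String bundleJson) {
    return bundleJson == null ? EMPTY_BUNDLE : bundleJson;
  }

  // Maps the sentinel back to null ("known empty"). Callers check for a
  // cache miss first, so this null means "stop waiting, nothing to cache".
  static String fromCacheValue(String cached) {
    return EMPTY_BUNDLE.equals(cached) ? null : cached;
  }
}
```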

Bug: CacheInvalidationPubSub duplicates RedisURI parsing logic

📄 openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheInvalidationPubSub.java:136-150 📄 openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java:49-63
CacheInvalidationPubSub.buildRedisURI (lines 136-165) duplicates the same URL parsing and auth logic from RedisCacheProvider.buildRedisURI (lines 49-84). This creates a maintenance risk — if one is updated (e.g., to support Redis Sentinel/Cluster or a new auth type), the other may be missed. Consider extracting a shared utility method, or having the pub/sub reuse the existing RedisClient from CacheProvider.

Edge Case: pipelineHset timeout may be excessive for large batches

📄 openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java:311-315
In awaitAll (line 312), the timeout is commandTimeoutMs * 10 applied per-future sequentially. For a batch of 1000 HSET+EXPIRE pairs (2000 futures), the theoretical worst case is 2000 × commandTimeoutMs × 10 = 6,000 seconds with the default 300 ms timeout. In practice Redis pipelines complete fast, but a hung connection could block the warmup thread for a very long time. Consider using LettuceFutures.awaitAll(timeout, unit, futures), which applies a single timeout to the entire batch.
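A sketch of the single-deadline variant using plain java.util.concurrent futures as a stand-in for Lettuce's RedisFuture (LettuceFutures.awaitAll has the same shape): the whole batch shares one time budget instead of multiplying the timeout by the number of futures.

```java
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class PipelineAwait {
  // Returns the number of failed or timed-out futures. The whole batch
  // shares one deadline rather than commandTimeoutMs * 10 per future.
  static int awaitAllWithDeadline(List<? extends Future<?>> futures, long batchTimeoutMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(batchTimeoutMs);
    int failed = 0;
    for (Future<?> f : futures) {
      long remainingNanos = deadline - System.nanoTime();
      try {
        f.get(Math.max(remainingNanos, 0), TimeUnit.NANOSECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        failed++;
      } catch (Exception e) {
        failed++; // TimeoutException or ExecutionException
      }
    }
    return failed;
  }
}
```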

...and 1 more resolved from earlier reviews




Copilot AI left a comment


Pull request overview

Copilot reviewed 39 out of 39 changed files in this pull request and generated 5 comments.

Comment on lines 240 to +246
} catch (Exception e) {
LOG.debug("Error warming up entity {} {}: {}", entityType, entity.getId(), e.getMessage());
failedCount++;
LOG.warn("Pipelined write failed for {} batch at offset {}", entityType, offset, e);
}
offset += page.size();
updateEntityStats(entityType, success, failed);
sendUpdates(jobExecutionContext, false);
if (page.size() < batchSize) break;

Copilot AI Apr 18, 2026


success/failed are cumulative counters across all pages, but updateEntityStats(...) treats its arguments as deltas and adds them to the stored totals. Calling updateEntityStats(entityType, success, failed) each loop will over-count (1st page adds N, 2nd page adds 2N, etc.). Track per-page counts (reset to 0 each iteration) or pass (pageSuccess, pageFailed) instead.
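The over-count is easy to reproduce in isolation (names here are illustrative, not the warmup job's real API): an accumulator that expects deltas, fed running totals, adds page 1's count on every subsequent call.

```java
// Illustrative accumulator that, like updateEntityStats, adds its argument
// to a stored total; it must be fed per-page deltas, not running totals.
final class WarmupStats {
  private int stored = 0;

  void updateEntityStats(int delta) { stored += delta; }

  // Buggy: passes the cumulative success count after each page.
  int warmBuggy(int[] pages) {
    int cumulative = 0;
    for (int page : pages) {
      cumulative += page;
      updateEntityStats(cumulative); // 1st page adds N, 2nd adds 2N, ...
    }
    return stored;
  }

  // Fixed: a per-page counter, reset each iteration.
  int warmFixed(int[] pages) {
    for (int page : pages) {
      int pageSuccess = page; // would be accumulated inside the page loop
      updateEntityStats(pageSuccess);
    }
    return stored;
  }
}
```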

Comment on lines +356 to 364
/**
* REST GETs consult the entity cache only when a distributed cache (Redis) is configured. With
* Redis, invalidation in {@code EntityRepository.invalidateCache} keeps all instances coherent so
* cached reads stay fresh. Without Redis we fall back to {@code fromCache=false} to avoid
* serving stale reads from a per-instance Guava cache in a multi-instance deployment.
*/
private static boolean isDistributedCacheEnabled() {
return CacheBundle.getCachedEntityDao() != null;
}

Copilot AI Apr 18, 2026


isDistributedCacheEnabled() checks CacheBundle.getCachedEntityDao() != null, which can be true even when Redis initialization failed or the cache is unhealthy. In that case, REST GETs will still consult per-process caches and can serve stale data in multi-instance setups. Consider gating this on CacheBundle.getCacheProvider().available() (and/or CacheBundle.getCacheConfig().provider == redis) so cached reads are only enabled when the distributed cache is actually usable.

Comment on lines +30 to +32
* <p>Backoff: resilience4j schedules the delay on its own executor, so the request thread yields
* instead of blocking on {@link Thread#sleep}. Exponential base 50 ms × 2^(attempt-1) with 50%
* jitter — attempt 1 ≈ 25-75 ms, attempt 2 ≈ 50-150 ms, attempt 3 ≈ 100-300 ms.

Copilot AI Apr 18, 2026


The Javadoc states resilience4j “schedules the delay on its own executor, so the request thread yields instead of blocking on Thread.sleep”, but Retry.executeSupplier(...) performs synchronous retries and (in resilience4j-retry 2.3.0) waits by sleeping the calling thread. Please adjust the comment to reflect the actual behavior, or switch to an async retry API if non-blocking backoff is required.

Suggested change
* <p>Backoff: resilience4j schedules the delay on its own executor, so the request thread yields
* instead of blocking on {@link Thread#sleep}. Exponential base 50 ms × 2^(attempt-1) with 50%
* jitter — attempt 1 ≈ 25-75 ms, attempt 2 ≈ 50-150 ms, attempt 3 ≈ 100-300 ms.
* <p>Backoff: retries are synchronous when invoked via {@link Retry#executeSupplier(Supplier)}, so
* the calling thread waits between attempts according to the configured interval. Exponential base
* 50 ms × 2^(attempt-1) with 50% jitter — attempt 1 ≈ 25-75 ms, attempt 2 ≈ 50-150 ms, attempt
* 3 ≈ 100-300 ms.
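Whichever phrasing lands, the documented schedule itself is easy to sanity-check. A standalone sketch of the interval math (method and parameter names are mine, not resilience4j's IntervalFunction API): base 50 ms doubled per attempt, scaled by a random factor of ±50%.

```java
final class BackoffSketch {
  // 50 ms * 2^(attempt-1), scaled by a factor in [1 - jitter, 1 + jitter].
  // rand01 stands in for the RNG so the schedule can be checked by hand.
  static long backoffMs(int attempt, double jitter, double rand01) {
    long base = 50L << (attempt - 1); // 50, 100, 200, ...
    double factor = 1.0 + jitter * (2.0 * rand01 - 1.0);
    return Math.round(base * factor);
  }
}
```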

Comment on lines +1610 to +1612
try (var ignored = phase("readBundleCacheGet")) {
initialDto = bundleCache.get(entityType, entity.getId());
}

Copilot AI Apr 18, 2026


loadLock is only released in the finally block at the end of the method, but the lock is acquired much earlier. If any exception is thrown during relationship/tag/vote/extension fetch (before the final try/finally), the lock will never be unlocked, causing subsequent requests for the same (type,id) on this instance to deadlock. Wrap the whole post-lock code path in a try/finally that always unlocks, or unlock immediately if the re-check finds the bundle already populated.

Suggested change
try (var ignored = phase("readBundleCacheGet")) {
initialDto = bundleCache.get(entityType, entity.getId());
}
try {
try (var ignored = phase("readBundleCacheGet")) {
initialDto = bundleCache.get(entityType, entity.getId());
}
} catch (RuntimeException e) {
loadLock.unlock();
loadLock = null;
throw e;
} catch (Error e) {
loadLock.unlock();
loadLock = null;
throw e;
}
if (initialDto != null) {
loadLock.unlock();
loadLock = null;
}

Comment on lines 1022 to +1056
@Transaction
public RestUtil.PutResponse<TestSuite> addAllTestCasesToLogicalTestSuite(
TestSuite testSuite, List<UUID> excludedTestCaseIds) {

List<EntityReference> originalTestCaseReferences =
findTo(testSuite.getId(), TEST_SUITE, Relationship.CONTAINS, TEST_CASE);

String tableName = daoCollection.testCaseDAO().getTableName();
if (nullOrEmpty(excludedTestCaseIds)) {
daoCollection
.relationshipDAO()
.bulkInsertAllToRelationship(
testSuite.getId(), TEST_SUITE, TEST_CASE, Relationship.CONTAINS.ordinal(), tableName);
} else {
daoCollection
.relationshipDAO()
.bulkInsertAllToRelationshipWithExclusions(
excludedTestCaseIds.stream().map(UUID::toString).toList(),
testSuite.getId(),
TEST_SUITE,
TEST_CASE,
Relationship.CONTAINS.ordinal(),
tableName);
}
// The bulk INSERT IGNORE runs a full scan against test_case and takes gap locks that
// collide with concurrent test-case creation. MySQL raises "Deadlock found when trying
// to get lock" intermittently under IT parallel load. Retry a few times before giving up.
DeadlockRetry.execute(
() -> {
if (nullOrEmpty(excludedTestCaseIds)) {
daoCollection
.relationshipDAO()
.bulkInsertAllToRelationship(
testSuite.getId(),
TEST_SUITE,
TEST_CASE,
Relationship.CONTAINS.ordinal(),
tableName);
} else {
daoCollection
.relationshipDAO()
.bulkInsertAllToRelationshipWithExclusions(
excludedTestCaseIds.stream().map(UUID::toString).toList(),
testSuite.getId(),
TEST_SUITE,
TEST_CASE,
Relationship.CONTAINS.ordinal(),
tableName);
}
return null;
});

Copilot AI Apr 18, 2026


DeadlockRetry.execute(...) is invoked from within a method annotated with @Transaction. That means each retry re-runs only the lambda inside the same JDBI transaction scope, so it won't start a fresh transaction per attempt (and may run on an already-rolled-back handle after a deadlock). To match the intent (retry the whole transaction), move the retry wrapper outside the @Transaction boundary (e.g., public wrapper calls DeadlockRetry.execute(() -> addAllTestCasesToLogicalTestSuiteTxn(...)) where the inner method is @Transaction).
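A self-contained sketch of the proposed shape (simplified; the real DeadlockRetry is resilience4j-based and should retry only on deadlock errors): the retry loop sits outside the transaction boundary, and each attempt re-invokes the supplier, which stands in for re-entering the @Transaction method and getting a fresh transaction.

```java
import java.util.function.Supplier;

final class RetryOutsideTxnSketch {
  // Retry wrapper that stays OUTSIDE the transaction boundary. Each attempt
  // re-runs the whole supplier, i.e. replays the full unit of work in a
  // fresh transaction. Real code would retry only on deadlock exceptions.
  static <T> T execute(Supplier<T> transactionalUnit, int maxAttempts) {
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return transactionalUnit.get();
      } catch (RuntimeException e) {
        last = e;
      }
    }
    throw last;
  }
}
```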

@sonarqubecloud


Labels

backend · safe to test (Add this label to run secure Github workflows on PRs)


2 participants