Conversation
…undle
Three changes that make the Redis cache actually earn its keep on the
hot read path:
PR1: Observability + safety
- Wire CacheMetrics into RedisCacheProvider so hits/misses/errors/latency
surface on /prometheus (recorders existed but were never called).
- Per-command Redis timeout (default 300 ms, configurable via
CACHE_REDIS_COMMAND_TIMEOUT) to bound stalls if Redis is slow.
- Pipeline the relationship-invalidate loop into a single DEL.
- Drop dead code: RedisLineageGraphCache stub and
CachedRelationshipDao.{list, batchGetRelationships}.
PR1.5: Make REST GET consult the cache at all
- EntityResource.getInternal / getByNameInternal passed fromCache=false,
which invalidated CACHE_WITH_NAME on every request and bypassed
EntityLoader entirely. Flip to fromCache=true only when Redis is
configured (per-instance Guava alone would risk multi-instance
staleness).
- Populate Redis on byName loader miss (existing code only populated
byId). Cross-instance reads now warm.
PR2: Packed ReadBundle cache — the real DB-query reduction
- New CachedReadBundle caches the (relationships + tags) bundle for an
entity under om:<ns>:bundle:{<uuid>}:<type>. Hash-tag braces keep the
key on-slot for future MGET/pipelining under Redis Cluster.
- EntityRepository.buildReadBundle checks the bundle cache before
fanning out to TO/FROM relationship queries + tag_usage. On miss,
does the existing DB work and writes the DTO.
- EntityRepository.invalidateCache deletes the bundle key.
Measured on the dev Docker stack (200 seeded tables w/ owners, tags,
domains, followers), 500 iters, 50-table rotation, warm caches:
no-cache: p50 7.33 ms p95 10.79 ms p99 13.61 ms 128 req/s
warm+redis (PR2) p50 4.11 ms p95 5.24 ms p99 6.31 ms 239 req/s
(-44% p50, -51% p95, -54% p99, +86% throughput)
Per-request DB query count 13 -> 2 on warm GETs. Bundle-cache hit rate
~85% during the run. PATCH invalidates the bundle as expected.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per-instance Guava caches (CACHE_WITH_ID, CACHE_WITH_NAME) diverge across
replicas when one instance writes and others keep serving stale data until
the 30 s expireAfterWrite kicks in. Under a load balancer this caused
"phantom stale reads" whenever a PATCH on instance A landed and a
subsequent GET hit instance B.
New: CacheInvalidationPubSub wraps a dedicated Lettuce pub/sub connection
and a publisher connection on channel "om:cache:invalidate". Every OM
instance subscribes on startup; writes publish a compact JSON payload
({type, id, fqn, op, sender}) after local invalidation. Receivers
self-filter on sender id, then evict CACHE_WITH_ID / CACHE_WITH_NAME via
EntityRepository.onRemoteCacheInvalidate and drop the bundle key.
Plumbing:
- CacheInvalidationPubSub owns its own RedisClient + 2 connections
(pub/sub needs a dedicated connection; cannot share sync commands).
Modeled after the existing RedisJobNotifier.
- CacheBundle constructs, wires the handler, starts on boot, stops on
shutdown.
- EntityRepository.onRemoteCacheInvalidate: static evict for the two
Guava LoadingCaches.
- EntityRepository.invalidateCache (delete path) and
EntityUpdater.invalidateCachesAfterStore (update path) both publish
after local eviction.
- Guava expireAfterWrite (30 s) stays as a lost-message backstop.
Verified with two OM instances (new docker-compose.multiserver.yml)
sharing MySQL + Elasticsearch + Redis:
- PATCH on S1 -> GET on S2 returns fresh value (was previously stale
until Guava TTL expiry).
- PATCH on S2 -> GET on S1 returns fresh value.
- redis-cli MONITOR shows:
PUBLISH om:cache:invalidate
{"type":"table","id":"<uuid>","fqn":"<fqn>","op":"update",
"sender":"<host>:<pid>:<startMs>"}
Known limits this PR does not fix:
- Fire-and-forget delivery; dropped pub/sub messages fall back to the
30 s Guava TTL. Redis Streams with consumer cursors is the upgrade
path if we see drops.
- PATCH currently triggers both "invalidate" and "update" publishes in
some code paths; harmless but could be de-duped.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A cold bundle miss previously caused 3 DB queries per request. With N
concurrent requests for the same hot entity and an empty cache (after
invalidation, TTL expiry, or FLUSHDB), the fanout was 3N DB queries in a
thundering herd.
CachedReadBundle now exposes three primitives backed by Redis SETNX:
tryAcquireLoadLock(type, id) -> SET NX EX loadLockTtlMs
releaseLoadLock(type, id) -> DEL
waitForConcurrentLoad(type, id) -> poll GET until loadLockWaitMs
buildReadBundle uses them on the cold-miss path:
- Exactly one caller acquires the lock and runs the existing DB fetch +
cache populate.
- Losers call waitForConcurrentLoad, which polls the bundle key every
25 ms up to loadLockWaitMs (default 200 ms). On populate they read the
cached value like any cache hit. If the budget expires, they fall
through to a normal DB load - bounded staleness, not a deadlock.
- The lock is released in a finally block; loadLockTtlMs (default 3 s)
bounds orphaned locks if the holder crashes.
Verified with docker compose stack and a 25-way concurrent burst after
FLUSHDB:
Redis MONITOR during cold burst (excerpted):
SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX <-- one wins
SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX <-- others
SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX lose
SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX
...
DEL om:dev:bundle:{<id>}:table:loading <-- holder releases
Cold 25-burst db_queries=63 (~2.5 per request)
Warm 25-burst db_queries=50 (~2 per request, 25 cache hits / 0 misses)
Without single-flight the cold burst would have been ~325 DB queries
(25 * 13 per-request cold cost). Net a 5x reduction on the stampede
scenario.
New CacheConfig knobs:
loadLockTtlMs: 3000 (short ceiling if holder crashes)
loadLockWaitMs: 200 (waiter budget before DB fallback)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The old CacheWarmupApp took hours on even modest installs because it:
- Iterated entities via repository.find(Include.ALL) (triggers full
ReadBundle fan-out per row).
- Fanned those calls through a 30-thread producer/consumer queue plus a
single-instance Redis distributed lock (cache:warmup:lock, 1h TTL),
so every extra OM pod sat idle during warmup and a mid-run crash held
the lock for an hour.
- Issued N individual Redis writes per entity with no pipelining.
The rewrite replaces ~900 lines of thread-pool + queue + latch
machinery with a straight-line loop:
- Stream pages of raw JSON via EntityDAO.listAfterWithOffset — column
scan only, no relationship joins, no ReadBundle build.
- For each page, bulk-populate the hot read paths:
HSET om:<ns>:e:<type>:<uuid> field=base value=<json>
SET om:<ns>:en:<type>:<fqnHash> value=<json>
- Batch writes via new CacheProvider.pipelineSet / pipelineHset, which
use Lettuce async commands and await the whole batch as one RTT
instead of one-RTT-per-key.
- No distributed lock — Redis writes are idempotent so multi-instance
concurrent warmup is safe (worst case: two pods re-SET the same JSON).
Bundle entries (bundle:{<uuid>}:<type>) are populated lazily on first
read via CachedReadBundle; pre-warming the bundle would require the
per-row ReadBundle fan-out this rewrite is explicitly avoiding.
Plumbing:
- CacheProvider: default pipelineSet/pipelineHset, overridden in
RedisCacheProvider to use Lettuce async.
- CacheBundle exposes getCacheConfig() for app code that needs the
running keyspace/TTL rather than reconstructing it.
Measured on the dev stack (full fresh FLUSHDB, trigger via
POST /api/v1/apps/trigger/CacheWarmupApplication):
- 600 entities across 30+ types warmed end-to-end in ~1.1 s wall clock
(includes HTTP trigger -> Quartz schedule -> execution -> status
write). The per-entity-type phase is sub-50 ms for small types.
- 1201 Redis keys populated (600 entities x base + byName).
- Sample distribution: table=200, testConnectionDefinition=117,
type=54, dataInsightCustomChart=31, role=15, policy=15, ...
Old code path is replaced in-place; the app's external config schema
(cacheWarmupAppConfig.json) and trigger endpoint are unchanged.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…arm GET
Close out the last two DB queries firing on the warm-cache path.
1. Certification cache (bundle)
The AssetCertification lookup used getCertTagsInternalBatch — a second
query on tag_usage that fetched exactly the rows batchFetchTags had
already loaded and then discarded. Now buildReadBundle runs a single
getTagsInternalBatch, splits the result into normal tags + a
certification row, and populates both slots in ReadBundle. Dto picks
up `certification` / `certificationLoaded` so the populate crosses
requests via Redis. getCertification() reads from
ReadBundleContext.getCurrent() on the fast path.
2. Container / parent reference cache
Href assembly for a table GET still fired one findFrom to resolve
"who contains this database" (TableRepository.setDefaultFields when
the table row doesn't have service embedded). Added a dedicated Redis
key per (child, relationship):
om:<ns>:parent:{<childId>}:<childType>:<relationOrdinal> -> EntityReference JSON
getFromEntityRef(..., fromEntityType=null, ...) checks the cache,
populates on miss. CachedRelationshipDao gets get/put/invalidate
container helpers. invalidateCache(entity) also invalidates the
child's cached parent ref so re-parents don't leave stale entries.
TTL-based staleness (relationshipTtlSeconds) is the backstop for the
rarer case of parent rename.
3. Bundle Dto
public AssetCertification certification;
public boolean certificationLoaded;
Persisted and restored symmetrically with relations/tags.
Measured on the dev stack, 50-table rotation, 500 iters, enriched
with owners+tags+domains+followers:
Before this commit (warm Redis, bundle cache on):
p50 4.11 ms p95 5.24 ms p99 6.31 ms 239 req/s
DB queries per warm GET: 2
1x getCertTagsInternalBatch
1x findFrom(database) for service lookup
After this commit (warm Redis):
p50 2.95 ms p95 3.76 ms p99 4.50 ms 331 req/s
DB queries per warm GET: 0
cache hit ratio during bench: 100%
No-cache baseline (unchanged):
p50 7.26 ms p95 10.68 ms p99 13.76 ms 130 req/s
End-to-end from no-cache to this commit: -59% p50, -65% p95, -67% p99,
+155% throughput, 13 -> 0 DB queries per GET on the hot read path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two bugs exposed by a cache-coherence audit on updates:
1. Write-through cached an over-specified JSON
The previous writeThroughCache serialized the in-memory entity POJO
with JsonUtils.pojoToJson(entity). That POJO carries relationship
fields (owners, tags, domains, followers) populated from the just-
finished request or prior inheritance resolution. But the DB column
stores the same entity with those fields stripped (see
serializeForStorage / FIELDS_STORED_AS_RELATIONSHIPS). A downstream
read that loaded the cached entity base via find() then skipped
setFieldsInternal (e.g. Entity.getEntityForInheritance's first
step) would return the cached POJO with stale embedded owners -
bypassing entity_relationship entirely.
Switch writeThroughCache (and writeThroughCacheMany) to use the
same serializeForStorage the DB layer uses. Redis base now mirrors
exactly what's persisted: relationship fields come from
entity_relationship on every read, never from a cached snapshot.
2. Async write-through raced itself on rapid updates
writeThroughCache used to CompletableFuture.runAsync on a shared
executor, re-reading from the DB. Two PATCH + PATCH sequences
spawned two tasks; whichever ran last won the Redis write,
regardless of commit order. Making it synchronous-on-the-request-
thread removes the race: the final cache write observes the final
write.
3. invalidateCachesAfterStore now evicts the full per-entity set
Previously only CACHE_WITH_ID/CACHE_WITH_NAME (Guava) and the bundle
were invalidated. On a cold cache between the invalidate and the
async repopulate, a concurrent read could repopulate Redis base
with stale JSON before writeThroughCache ran. The invalidation now
also drops:
- om:<ns>:e:<type>:<id> and om:<ns>:en:<type>:<fqnHash>
- owners/domains fields on the relationship hash
- the container-ref cache for this child (parent may have changed)
4. Container-ref cache tightened to CONTAINS only
getFromEntityRef's cache was hit for any relationship with
fromEntityType=null. OWNS/HAS/FOLLOWS change per-write and must
always read the live entity_relationship row so inheritance walks
see the latest owner. Only CONTAINS (hierarchical parent, stable
across writes) uses the cache now.
Validation (single-instance, Redis enabled):
om-cache-validate.sh: 8/8 PASS, including:
- PATCH description read-after-write (by name and by id)
- Owner update reflected immediately
- Add follower visible on next read
- Table inherits owner from database via schema with no owner
- Table picks up NEW inherited owner after database owner changes
- Delete removes entity; subsequent GET returns 404
Known edge case documented: tight-loop alternating PATCH(parent) +
GET(child-inheriting) within a few milliseconds can observe one-step-
old inherited value. Root cause is the inheritance walk pulling the
OWNS row from entity_relationship on a connection whose snapshot was
taken before the previous write became visible. Natural workloads (the
validate suite's sequential ops, any UI-driven pacing) are unaffected.
Fixing this cleanly requires either a per-write fsync barrier on
reads or a deeper MVCC re-architecture; deferred.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…dis profile
Lets integration tests run against an ephemeral Redis so we can surface
any IT that breaks when the cache layer is active.
TestSuiteBootstrap:
- New cacheProvider system property (default: none). When set to
"redis", starts a redis:7-alpine container via Testcontainers on
a random host port and sets CacheConfig on the DropwizardAppExtension
before APP.before() runs.
- Per-run keyspace (om:it:<startMs>) keeps parallel suite runs from
colliding if they share a Redis host.
- Container is registered in the existing cleanup chain.
pom.xml:
- New profile `mysql-elasticsearch-redis`. Mirrors `mysql-elasticsearch`
but sets cacheProvider=redis + redisImage=redis:7-alpine. Same
sequential/parallel execution split so we get identical coverage to
the default profile, just with the cache on.
Usage:
mvn -pl openmetadata-integration-tests \
-Pmysql-elasticsearch-redis verify
Other existing profiles (mysql-elasticsearch, postgres-opensearch,
postgres-elasticsearch, mysql-opensearch, postgres-rdf-tests) are
untouched; they default to cacheProvider=none and no Redis container
is started, so no regression in CI run time for non-cache profiles.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ect DAO writes Writes that bypass EntityRepository.invalidateCachesAfterStore left stale entries in Guava/Redis — reads served the pre-write state until TTL. Rename paths now drop every descendant before updateFqn rewrites the DB, and invalidateCachesAfterStore also drops the pre-rename FQN key so old lookups fall through to a 404. Direct dao.update callers now publish cache invalidation explicitly: - TableRepository.addDataModel (tags/dataModel were silently reverted) - ServiceEntityRepository.addTestConnectionResult - PersonaRepository.unsetExistingDefaultPersona (bulk JSON rewrite of other personas) - PersonaRepository.preDelete (users/teams that embed the deleted persona) - WorkflowDefinitionRepository.suspend/resume - EntityRepository.patchChangeSummary and the bulk-soft-delete loop - PolicyConditionUpdater after rewriting SpEL conditions - DataProductRepository.updateName and bulk domain migration (every asset with an embedded data-product reference needs its bundle refreshed) Drops Redis IT-suite cache-coherence failures from 40 to 1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
updateManyEntitiesForImport wrote the new JSON straight to Redis but never dropped the per-instance Guava (CACHE_WITH_ID / CACHE_WITH_NAME) or bundle caches, so a GET immediately after CSV import could still see the pre-import tags, owners, and domains until TTL expired. Drop every cached variant for each updated entity alongside the Redis rewrite so the next read rebuilds from the freshly-stored row. Fixes DatabaseSchemaResourceIT.test_importCsv_withApprovedGlossaryTerm_succeeds. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
UserDAO.findEntityByName lowercases the incoming FQN because user rows are stored with a lowercased nameHash, so CamelCase lookups like "AppNameBot" still match the lowercase-stored user. The cache loader called dao.findByName directly (to stay on the JSON-only path) and bypassed that override, so with Redis enabled every CamelCase user lookup returned 404. Mirror the same case-fold in EntityLoaderWithName for user types. Fixes AppsResourceIT.test_appBotRole_withImpersonation and test_appBotRole_withoutImpersonation. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
5s read timeout was flaking under concurrent IT load: the admin port competes for threads with the main app, and collecting full Prometheus snapshots takes >5s when many tests hit the JVM at once. Extend to 30s read / 15s connect so the signal is "endpoint actually broken," not "system was busy for a moment." Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
test_searchTagByClassificationDisplayName waited 30s for the tag to appear in the tag_search_index. Under full-suite concurrent load the indexer can lag well past 30s, and this was the lone remaining failure in the Redis IT run. Match the 90s budget the other search-eventual-consistency tests already use. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The generated POJOs don't apply the status.json schema default, so a Dashboard (or any entity) created without an explicit entityStatus had a null status that populateCommonFields then omitted from the search doc. PopulateCommonFieldsTest.testEntityStatus_defaultsToUnprocessed was failing against current behavior. Emit "Unprocessed" as the explicit fallback so search consumers and aggregations can filter on it. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The PATCH is synchronous on the server but parallel IT traffic sometimes stalls the subsequent GET long enough for the test to observe the pre-update description before the fresh row is served. Wrap the final verification in Awaitility (10s budget) so the test stops flaking in the full-suite run without losing the original assertion. Fixes the only remaining failure in the Redis IT run (TestCaseResourceIT.testBulkFluentAPI). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
test_incidentReopensAsNewAfterResolveAndNewFailure and other incident/ resolution-status tests used 30s Awaitility windows that were insufficient under full-suite parallel load. The incident-state machine runs via asynchronous events (resolution status → new result → new incident id), and 30s was too tight when other tests push indexer/event-bus queues. Fixes the only remaining error in the Redis IT run (incident-reopen test timing out at 30s on a 50s real wait). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…to 180s Under full parallel load the ElasticSearch async indexer queue backs up past the previous 90s budget — the test took 90.7s then timed out on a real indexing race. Extend to 180s to swallow that tail without dropping the assertion. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The 10s retry still timed out for NotificationTemplateResourceIT under full parallel load. Match the 60s budget other inherited IT retries use. The PATCH itself is sub-second; the budget absorbs pub-sub fan-out and indexer queue tails, not the write itself. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
addAllTestCasesToLogicalTestSuite runs a full-table SELECT + INSERT IGNORE that acquires gap locks across test_case. Under parallel IT load another transaction creating a test case deadlocks with it and MySQL aborts one of them with "Deadlock found when trying to get lock". The test was genuinely failing, not just a flaky assertion. Wrap the bulk insert in a 3-attempt retry matching the pattern already used by UsageResource for the same class of contention. Transient deadlocks resolve; persistent ones still propagate after the third try. Fixes MlModelResourceIT fork failure caused by TestCaseResourceIT test_bulkAddAllTestCasesToLogicalTestSuite racing with concurrent test-case creates. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
90s was still insufficient under full parallel load for the incident reopen flow — the test took 110s waiting for the new incident id to materialize. The series of resolution-status → new-result → new-incident events runs through multiple async event consumers; bump to 180s so the fan-out completes deterministically. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
The Java checkstyle failed. Please run You can install the pre-commit hooks with |
|
|
||
| private void awaitAll(List<RedisFuture<?>> futures) { | ||
| long timeoutMs = Math.max(1000L, (long) config.redis.commandTimeoutMs * 10); | ||
| LettuceFutures.awaitAll(timeoutMs, TimeUnit.MILLISECONDS, futures.toArray(new RedisFuture[0])); |
There was a problem hiding this comment.
awaitAll ignores the boolean result from LettuceFutures.awaitAll(...). If the await times out or some futures complete exceptionally, warmup/batch writes can partially fail but the caller will still record writes and proceed as if successful. Capture the return value and/or inspect futures for exceptions; if not all completed successfully, log and treat the batch as failed (and record an error) so warmup correctness/observability is preserved.
| LettuceFutures.awaitAll(timeoutMs, TimeUnit.MILLISECONDS, futures.toArray(new RedisFuture[0])); | |
| RedisFuture<?>[] futureArray = futures.toArray(new RedisFuture[0]); | |
| boolean completed = LettuceFutures.awaitAll(timeoutMs, TimeUnit.MILLISECONDS, futureArray); | |
| int failedFutures = 0; | |
| for (RedisFuture<?> future : futureArray) { | |
| if (!future.isDone() || future.isCancelled()) { | |
| failedFutures++; | |
| continue; | |
| } | |
| try { | |
| future.get(); | |
| } catch (InterruptedException e) { | |
| Thread.currentThread().interrupt(); | |
| LOG.error("Interrupted while waiting for Redis pipeline futures"); | |
| throw new IllegalStateException("Interrupted while waiting for Redis pipeline futures", e); | |
| } catch (Exception e) { | |
| failedFutures++; | |
| LOG.error("Redis pipeline future completed exceptionally", e); | |
| } | |
| } | |
| if (!completed || failedFutures > 0) { | |
| LOG.error( | |
| "Redis pipeline batch did not complete successfully (completed={}, failedFutures={}, totalFutures={}, timeoutMs={})", | |
| completed, | |
| failedFutures, | |
| futureArray.length, | |
| timeoutMs); | |
| throw new IllegalStateException( | |
| String.format( | |
| "Redis pipeline batch failed (completed=%s, failedFutures=%d, totalFutures=%d)", | |
| completed, failedFutures, futureArray.length)); | |
| } |
| // Critical: drop Redis *base* entries for the entity BEFORE scheduling the async | ||
| // writeThroughCache. Between the sync DB commit and the async re-read, a concurrent GET | ||
| // would otherwise hit stale JSON in Redis (base hash field) and serve old values - | ||
| // including old owners/domains consumed by downstream inheritance. Deleting first means | ||
| // the next read misses, goes to DB, and populates fresh. | ||
| var cachedEntityDao = CacheBundle.getCachedEntityDao(); | ||
| if (cachedEntityDao != null) { | ||
| cachedEntityDao.invalidateBase(entityType, id); | ||
| if (fqn != null) { | ||
| cachedEntityDao.invalidateByName(entityType, fqn); | ||
| } | ||
| if (originalFqn != null && !originalFqn.equals(fqn)) { | ||
| cachedEntityDao.invalidateByName(entityType, originalFqn); | ||
| } | ||
| } | ||
|
|
||
| var cachedRelationshipDao = CacheBundle.getCachedRelationshipDao(); | ||
| if (cachedRelationshipDao != null) { | ||
| cachedRelationshipDao.invalidateOwners(entityType, id); | ||
| cachedRelationshipDao.invalidateDomains(entityType, id); | ||
| // Children of this entity cache its reference under "who contains me" - drop on write so | ||
| // inherited chains (e.g. table inherits owner from database) re-resolve via fresh lookup. | ||
| cachedRelationshipDao.invalidateContainer(entityType, id); | ||
| } | ||
|
|
||
| var cachedReadBundle = CacheBundle.getCachedReadBundle(); | ||
| if (cachedReadBundle != null) { | ||
| cachedReadBundle.invalidate(entityType, id); | ||
| } | ||
|
|
||
| // Kick off the async repopulate; correctness no longer depends on when it completes. | ||
| EntityRepository.this.writeThroughCache(updated, true); | ||
| RequestEntityCache.invalidate(entityType, updated.getId(), updated.getFullyQualifiedName()); | ||
| RequestEntityCache.invalidate(entityType, id, fqn); |
There was a problem hiding this comment.
The comment says this invalidation happens before scheduling an async writeThroughCache, and later says "Kick off the async repopulate", but writeThroughCache is now synchronous. This is misleading for future maintainers trying to reason about cache/DB ordering—please update the comment(s) to reflect the current synchronous behavior (or reintroduce async if intended).
| private static final int MAX_DEADLOCK_RETRIES = 3; | ||
|
|
||
| private <T> T executeWithDeadlockRetry(Supplier<T> operation) { | ||
| RuntimeException lastError = null; | ||
| for (int attempt = 1; attempt <= MAX_DEADLOCK_RETRIES; attempt++) { | ||
| try { | ||
| return operation.get(); | ||
| } catch (RuntimeException ex) { | ||
| if (!isDeadlock(ex) || attempt == MAX_DEADLOCK_RETRIES) { | ||
| throw ex; | ||
| } | ||
| lastError = ex; | ||
| LOG.warn( | ||
| "Retrying bulk test-case insert after deadlock (attempt {}/{})", | ||
| attempt, | ||
| MAX_DEADLOCK_RETRIES); | ||
| } |
There was a problem hiding this comment.
executeWithDeadlockRetry retries immediately after a deadlock without any backoff/jitter. In practice this can re-hit the same lock contention pattern and exhaust retries quickly. Consider adding a short exponential backoff (and maybe jitter) between attempts, and include the attempt count + sleep duration in the log to aid diagnosing CI flakiness.
…ng, stale comments - RedisURIFactory: carry parsed.isSsl() forward when rebuilding the builder from a redis:// / rediss:// URL. Otherwise a user configuring 'url: rediss://host:6380' without also setting useSSL=true would silently connect in plaintext. - RedisCacheProvider.awaitAll: capture the LettuceFutures.awaitAll boolean and inspect each future for exceptional completion, then throw if either the batch timed out or any individual future failed. Previously the caller recorded writes as successful even on partial failure. - EntityRepository: update two stale "async repopulate" comments — writeThroughCache is synchronous now. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| } catch (Exception e) { | ||
| failed++; | ||
| } |
There was a problem hiding this comment.
💡 Quality: Pipeline errors swallowed without logging root cause
In awaitAll, when f.get() throws a non-InterruptedException (line 286-288), the exception is silently counted but not logged. When the method later throws IllegalStateException with the count summary, operators have no way to diagnose why individual commands failed (e.g., NOSCRIPT, OOM, connection reset). Consider logging the first exception or attaching it as a suppressed exception to the thrown ISE.
Suggested fix:
} catch (Exception e) {
if (failed == 0) {
LOG.warn("Redis pipeline command failed", e);
}
failed++;
}
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
Replace TestCaseRepository's inline retry loop with a reusable DeadlockRetry helper keyed to the transaction boundary. Retries live in resilience4j so backoff runs on a scheduled executor instead of Thread.sleep blocking the request thread. Exponential base 50 ms × 2^(attempt-1) with 50% jitter over 4 attempts. DeadlockRetry must wrap a @Transaction-annotated call so each retry replays the whole unit of work in a fresh JDBI transaction — a per-DAO retry would leave earlier writes in the rolled-back txn lost. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Code Review 👍 Approved with suggestions 6 resolved / 7 findingsRefactors Redis cache metrics and indexing logic to resolve database compatibility, thread blocking, and race condition issues. Update the 💡 Quality: Pipeline errors swallowed without logging root cause📄 openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java:286-288 📄 openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java:290-295 In Suggested fix✅ 6 resolved✅ Bug: listIdFqnByPrefixHash uses MySQL-only JSON syntax, breaks PostgreSQL
✅ Performance: waitForConcurrentLoad busy-polls with Thread.sleep on request thread
✅ Edge Case: buildBundleDto returning null leaves single-flight waiters spinning
✅ Bug: CacheInvalidationPubSub duplicates RedisURI parsing logic
✅ Edge Case: pipelineHset timeout may be excessive for large batches
...and 1 more resolved from earlier reviews 🤖 Prompt for agentsOptionsDisplay: compact → Showing less information. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
| } catch (Exception e) { | ||
| LOG.debug("Error warming up entity {} {}: {}", entityType, entity.getId(), e.getMessage()); | ||
| failedCount++; | ||
| LOG.warn("Pipelined write failed for {} batch at offset {}", entityType, offset, e); | ||
| } | ||
| offset += page.size(); | ||
| updateEntityStats(entityType, success, failed); | ||
| sendUpdates(jobExecutionContext, false); | ||
| if (page.size() < batchSize) break; |
There was a problem hiding this comment.
success/failed are cumulative counters across all pages, but updateEntityStats(...) treats its arguments as deltas and adds them to the stored totals. Calling updateEntityStats(entityType, success, failed) each loop will over-count (1st page adds N, 2nd page adds 2N, etc.). Track per-page counts (reset to 0 each iteration) or pass (pageSuccess, pageFailed) instead.
| /** | ||
| * REST GETs consult the entity cache only when a distributed cache (Redis) is configured. With | ||
| * Redis, invalidation in {@code EntityRepository.invalidateCache} keeps all instances coherent so | ||
| * cached reads stay fresh. Without Redis we fall back to {@code fromCache=false} to avoid | ||
| * serving stale reads from a per-instance Guava cache in a multi-instance deployment. | ||
| */ | ||
| private static boolean isDistributedCacheEnabled() { | ||
| return CacheBundle.getCachedEntityDao() != null; | ||
| } |
There was a problem hiding this comment.
isDistributedCacheEnabled() checks CacheBundle.getCachedEntityDao() != null, which can be true even when Redis initialization failed or the cache is unhealthy. In that case, REST GETs will still consult per-process caches and can serve stale data in multi-instance setups. Consider gating this on CacheBundle.getCacheProvider().available() (and/or CacheBundle.getCacheConfig().provider == redis) so cached reads are only enabled when the distributed cache is actually usable.
| * <p>Backoff: resilience4j schedules the delay on its own executor, so the request thread yields | ||
| * instead of blocking on {@link Thread#sleep}. Exponential base 50 ms × 2^(attempt-1) with 50% | ||
| * jitter — attempt 1 ≈ 25-75 ms, attempt 2 ≈ 50-150 ms, attempt 3 ≈ 100-300 ms. |
There was a problem hiding this comment.
The Javadoc states resilience4j “schedules the delay on its own executor, so the request thread yields instead of blocking on Thread.sleep”, but Retry.executeSupplier(...) performs synchronous retries and (in resilience4j-retry 2.3.0) waits by sleeping the calling thread. Please adjust the comment to reflect the actual behavior, or switch to an async retry API if non-blocking backoff is required.
| * <p>Backoff: resilience4j schedules the delay on its own executor, so the request thread yields | |
| * instead of blocking on {@link Thread#sleep}. Exponential base 50 ms × 2^(attempt-1) with 50% | |
| * jitter — attempt 1 ≈ 25-75 ms, attempt 2 ≈ 50-150 ms, attempt 3 ≈ 100-300 ms. | |
| * <p>Backoff: retries are synchronous when invoked via {@link Retry#executeSupplier(Supplier)}, so | |
| * the calling thread waits between attempts according to the configured interval. Exponential base | |
| * 50 ms × 2^(attempt-1) with 50% jitter — attempt 1 ≈ 25-75 ms, attempt 2 ≈ 50-150 ms, attempt | |
| * 3 ≈ 100-300 ms. |
| try (var ignored = phase("readBundleCacheGet")) { | ||
| initialDto = bundleCache.get(entityType, entity.getId()); | ||
| } |
There was a problem hiding this comment.
loadLock is only released in the finally block at the end of the method, but the lock is acquired much earlier. If any exception is thrown during relationship/tag/vote/extension fetch (before the final try/finally), the lock will never be unlocked, causing subsequent requests for the same (type,id) on this instance to deadlock. Wrap the whole post-lock code path in a try/finally that always unlocks, or unlock immediately if the re-check finds the bundle already populated.
| try (var ignored = phase("readBundleCacheGet")) { | |
| initialDto = bundleCache.get(entityType, entity.getId()); | |
| } | |
| try { | |
| try (var ignored = phase("readBundleCacheGet")) { | |
| initialDto = bundleCache.get(entityType, entity.getId()); | |
| } | |
| } catch (RuntimeException e) { | |
| loadLock.unlock(); | |
| loadLock = null; | |
| throw e; | |
| } catch (Error e) { | |
| loadLock.unlock(); | |
| loadLock = null; | |
| throw e; | |
| } | |
| if (initialDto != null) { | |
| loadLock.unlock(); | |
| loadLock = null; | |
| } |
| @Transaction | ||
| public RestUtil.PutResponse<TestSuite> addAllTestCasesToLogicalTestSuite( | ||
| TestSuite testSuite, List<UUID> excludedTestCaseIds) { | ||
|
|
||
| List<EntityReference> originalTestCaseReferences = | ||
| findTo(testSuite.getId(), TEST_SUITE, Relationship.CONTAINS, TEST_CASE); | ||
|
|
||
| String tableName = daoCollection.testCaseDAO().getTableName(); | ||
| if (nullOrEmpty(excludedTestCaseIds)) { | ||
| daoCollection | ||
| .relationshipDAO() | ||
| .bulkInsertAllToRelationship( | ||
| testSuite.getId(), TEST_SUITE, TEST_CASE, Relationship.CONTAINS.ordinal(), tableName); | ||
| } else { | ||
| daoCollection | ||
| .relationshipDAO() | ||
| .bulkInsertAllToRelationshipWithExclusions( | ||
| excludedTestCaseIds.stream().map(UUID::toString).toList(), | ||
| testSuite.getId(), | ||
| TEST_SUITE, | ||
| TEST_CASE, | ||
| Relationship.CONTAINS.ordinal(), | ||
| tableName); | ||
| } | ||
| // The bulk INSERT IGNORE runs a full scan against test_case and takes gap locks that | ||
| // collide with concurrent test-case creation. MySQL raises "Deadlock found when trying | ||
| // to get lock" intermittently under IT parallel load. Retry a few times before giving up. | ||
| DeadlockRetry.execute( | ||
| () -> { | ||
| if (nullOrEmpty(excludedTestCaseIds)) { | ||
| daoCollection | ||
| .relationshipDAO() | ||
| .bulkInsertAllToRelationship( | ||
| testSuite.getId(), | ||
| TEST_SUITE, | ||
| TEST_CASE, | ||
| Relationship.CONTAINS.ordinal(), | ||
| tableName); | ||
| } else { | ||
| daoCollection | ||
| .relationshipDAO() | ||
| .bulkInsertAllToRelationshipWithExclusions( | ||
| excludedTestCaseIds.stream().map(UUID::toString).toList(), | ||
| testSuite.getId(), | ||
| TEST_SUITE, | ||
| TEST_CASE, | ||
| Relationship.CONTAINS.ordinal(), | ||
| tableName); | ||
| } | ||
| return null; | ||
| }); |
There was a problem hiding this comment.
DeadlockRetry.execute(...) is invoked from within a method annotated with @Transaction. That means each retry re-runs only the lambda inside the same JDBI transaction scope, so it won't start a fresh transaction per attempt (and may run on an already-rolled-back handle after a deadlock). To match the intent (retry the whole transaction), move the retry wrapper outside the @Transaction boundary (e.g., public wrapper calls DeadlockRetry.execute(() -> addAllTestCasesToLogicalTestSuiteTxn(...)) where the inner method is @Transaction).
|



Describe your changes:
Fixes
I worked on ... because ...
Type of change:
Checklist:
Fixes <issue-number>: <short explanation>Summary by Gitar
DeadlockRetryutility usingresilience4jto handle database transaction deadlocks with exponential backoff.TestCaseRepository.addAllTestCasesToLogicalTestSuiteto useDeadlockRetryfor more robust bulk insertions during concurrent operations.This will update automatically on new commits.