Skip to content

Commit b20c649

Browse files
ryan-williams and claude authored and committed
`dvx run` now caches output blobs to local DVC cache
Previously, `dvx run` updated the `.dvc` file with new output hashes but did NOT copy output blobs into the local cache. Historical versions were lost as soon as the next run overwrote the working copy — `dvx checkout -R HEAD^` would fail with "cache missing". Added `cache_blob()` in `dvx.cache` (public wrapper over existing `_cache_file()`) and call it in executor after `compute_md5(out)` for both regular outputs and co-outputs. Idempotent — no-op if the blob is already cached. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b3ffe4d commit b20c649

File tree

4 files changed

+149
-0
lines changed

4 files changed

+149
-0
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# `dvx run` should add outputs to local cache
2+
3+
## Problem
4+
5+
After `dvx run` completes a stage that produces an output file, it updates the `.dvc` file with the new output hash but does NOT copy the output blob into the local DVX cache (`.dvc/cache/files/md5/...`).
6+
7+
Consequence: the output exists only in the working tree. If it gets overwritten by a subsequent stage (e.g. a daily pipeline overwriting `crash-log.parquet`), the previous hash's content is lost locally. Tools that look up historical versions by hash (e.g. `dxdc -R <commit>`, `dvx checkout`) fail with "cache missing".
8+
9+
## Reproduction
10+
11+
```bash
12+
# Initial state
13+
dvx run njsp/data/crash-log.parquet.dvc # produces hash A
14+
git commit -am "crash log A"
15+
16+
# Cache check
17+
ls .dvc/cache/files/md5/${A:0:2}/${A:2} # MISSING — bug
18+
19+
# Next day
20+
dvx run njsp/data/crash-log.parquet.dvc # produces hash B, overwrites file
21+
git commit -am "crash log B"
22+
23+
# Try to look up A
24+
dvx checkout -R HEAD^ njsp/data/crash-log.parquet
25+
# Error: Cache missing for 'HEAD^': dd0a841755c4afe199c3ca3f3853ec11
26+
```
27+
28+
## Compare: `dvx add`
29+
30+
`dvx add` (CLI) calls `add_to_cache()` in `dvx/cache.py`, which copies the file into the cache keyed by MD5. `dvx run` has no equivalent call in its executor path.
31+
32+
## Fix
33+
34+
After a non-side-effect stage produces its output successfully, call `add_to_cache()` on the output path before writing the `.dvc` file. Side-effect stages don't produce outputs, so no change there.
35+
36+
In `executor.py` around line 531-544, after computing `md5 = compute_md5(out)`:
37+
38+
```python
39+
from dvx.cache import add_to_cache
40+
try:
41+
add_to_cache(out, force=True) # idempotent if blob already present
42+
except Exception as e:
43+
self._log(f"{path}: couldn't cache output: {e}")
44+
```
45+
46+
## Related
47+
48+
Also consider: should `dvx run --push` push cached blobs to the remote? Currently `--push` refers to git push, but the S3 blob push is ambiguous. Could be `--push-cache` or similar.

src/dvx/cache.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,35 @@ def _hash_single_file(file_path) -> str:
483483
return md5.hexdigest()
484484

485485

486+
def cache_blob(file_path, md5: str, force: bool = False):
    """Copy *file_path* into the local DVC cache under its MD5 key.

    Idempotent: when the blob is already present in the cache this is a
    no-op, unless ``force`` is True, in which case the existing entry is
    overwritten.

    Args:
        file_path: File whose contents should be cached.
        md5: Precomputed MD5 hash of ``file_path``.
        force: Overwrite an existing cache entry.

    Returns:
        Path to the cached blob (``.dvc/cache/files/md5/<aa>/<rest>``).
    """
    from pathlib import Path

    from dvc.repo import Repo as DVCRepo

    # Locate the repo root; fall back to the current directory when we
    # are not inside a DVC repo (best-effort, mirrors `dvc` CLI behavior).
    try:
        repo_root = DVCRepo.find_root()
    except Exception:
        repo_root = "."

    blob_dir = Path(repo_root) / ".dvc" / "cache" / "files" / "md5"
    blob_dir.mkdir(parents=True, exist_ok=True)

    _cache_file(file_path, md5, blob_dir, force)
    return blob_dir / md5[:2] / md5[2:]
513+
514+
486515
def _cache_file(file_path, file_hash: str, cache_dir, force: bool = False):
487516
"""Copy a file to DVC cache atomically."""
488517
import shutil

src/dvx/run/executor.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,13 @@ def _execute_artifact(self, artifact: Artifact) -> ExecutionResult:
531531
md5 = compute_md5(out)
532532
size = compute_file_size(out)
533533

534+
# Cache the output blob so historical versions can be retrieved
535+
try:
536+
from dvx.cache import cache_blob
537+
cache_blob(out, md5)
538+
except Exception as e:
539+
self._log(f" ⚠ {path}: couldn't cache output: {e}")
540+
534541
dvc_file = write_dvc_file(
535542
output_path=out,
536543
md5=md5,
@@ -586,6 +593,13 @@ def _handle_co_output(self, artifact: Artifact, cmd: str) -> ExecutionResult:
586593
md5 = compute_md5(out)
587594
size = compute_file_size(out)
588595

596+
# Cache the co-output blob
597+
try:
598+
from dvx.cache import cache_blob
599+
cache_blob(out, md5)
600+
except Exception as e:
601+
self._log(f" ⚠ {path}: couldn't cache co-output: {e}")
602+
589603
deps_hashes = {}
590604
git_deps_hashes = {}
591605
if self.config.provenance and artifact.computation:

tests/test_executor.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,3 +422,61 @@ def test_after_ordering(tmp_path):
422422
assert a_level is not None
423423
assert b_level is not None
424424
assert a_level < b_level, f"stage_a (level {a_level}) should be before stage_b (level {b_level})"
425+
426+
427+
def test_run_caches_output_blob(tmp_path):
    """dvx run copies output blobs to local cache so historical versions persist."""
    import os

    # os.chdir is process-global; remember and restore the original CWD so
    # this test doesn't leak its working directory into later tests.
    prev_cwd = os.getcwd()
    os.chdir(tmp_path)
    try:
        # Initialize a DVC repo (need .dvc dir for cache_blob)
        subprocess.run(["dvc", "init", "--no-scm"], cwd=tmp_path, capture_output=True, check=True)

        output = tmp_path / "result.txt"
        artifact = Artifact(
            path=str(output),
            computation=Computation(cmd=f"echo 'hello world' > {output}"),
        )

        config = ExecutionConfig(max_workers=1)
        output_log = StringIO()
        executor = ParallelExecutor([artifact], config, output_log)
        results = executor.execute()

        assert all(r.success for r in results)
        assert output.exists()

        # Verify the output blob was added to cache at the MD5-keyed path.
        from dvx.run.hash import compute_md5

        md5 = compute_md5(output)
        cache_path = tmp_path / ".dvc" / "cache" / "files" / "md5" / md5[:2] / md5[2:]
        assert cache_path.exists(), f"Output blob should be cached at {cache_path}"
        assert cache_path.read_text() == output.read_text()
    finally:
        os.chdir(prev_cwd)
457+
458+
def test_run_cache_idempotent(tmp_path):
    """Caching is idempotent — re-running doesn't error if blob already cached."""
    import os

    # os.chdir is process-global; remember and restore the original CWD so
    # this test doesn't leak its working directory into later tests.
    prev_cwd = os.getcwd()
    os.chdir(tmp_path)
    try:
        subprocess.run(["dvc", "init", "--no-scm"], cwd=tmp_path, capture_output=True, check=True)

        output = tmp_path / "data.txt"
        artifact = Artifact(
            path=str(output),
            computation=Computation(cmd=f"echo 'data' > {output}"),
        )

        # Run twice: the second run re-caches the same blob and must not fail.
        for _ in range(2):
            config = ExecutionConfig(max_workers=1, force=True)
            executor = ParallelExecutor([artifact], config, StringIO())
            results = executor.execute()
            assert all(r.success for r in results)

        # Blob should still be cached
        from dvx.run.hash import compute_md5

        md5 = compute_md5(output)
        cache_path = tmp_path / ".dvc" / "cache" / "files" / "md5" / md5[:2] / md5[2:]
        assert cache_path.exists()
    finally:
        os.chdir(prev_cwd)

0 commit comments

Comments
 (0)