Skip to content

Commit 5607981

Browse files
committed
feat: ttl support for atomic operations
1 parent c877189 commit 5607981

File tree

10 files changed

+522
-8
lines changed

10 files changed

+522
-8
lines changed

crates/storage-ledger/src/backend.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ impl LedgerBackend {
395395
///
396396
/// Returns [`StorageError::Internal`] if the system clock is set before
397397
/// the Unix epoch.
398-
fn compute_expiration_timestamp(ttl: Duration) -> StorageResult<u64> {
398+
pub(crate) fn compute_expiration_timestamp(ttl: Duration) -> StorageResult<u64> {
399399
std::time::SystemTime::now()
400400
.duration_since(std::time::UNIX_EPOCH)
401401
.map(|d| d.as_secs() + ttl.as_secs())
@@ -809,6 +809,70 @@ impl StorageBackend for LedgerBackend {
809809
result
810810
}
811811

812+
/// Performs a compare-and-set operation with TTL, retrying on transient errors.
813+
///
814+
/// Combines CAS precondition checking with automatic key expiration.
815+
/// Conflict errors (`FailedPrecondition`) are **not** retried.
816+
#[tracing::instrument(skip(self, key, expected, new_value), fields(key_len = key.len(), ttl_ms = ttl.as_millis() as u64))]
817+
async fn compare_and_set_with_ttl(
818+
&self,
819+
key: &[u8],
820+
expected: Option<&[u8]>,
821+
new_value: Vec<u8>,
822+
ttl: Duration,
823+
) -> StorageResult<()> {
824+
self.check_cancelled()?;
825+
self.check_circuit()?;
826+
self.check_sizes(key, &new_value)?;
827+
let start = std::time::Instant::now();
828+
let encoded_key = encode_key(key);
829+
let expires_at = Self::compute_expiration_timestamp(ttl)?;
830+
831+
let condition = match expected {
832+
None => SetCondition::NotExists,
833+
Some(expected_value) => SetCondition::ValueEquals(expected_value.to_vec()),
834+
};
835+
836+
use inferadb_ledger_sdk::SdkError;
837+
use tonic::Code;
838+
839+
let result = with_retry_timeout(
840+
&self.retry_config,
841+
self.timeout_config.write_timeout,
842+
None,
843+
"compare_and_set_with_ttl",
844+
|| async {
845+
match self
846+
.client
847+
.write(
848+
self.organization,
849+
self.vault,
850+
vec![Operation::SetEntity {
851+
key: encoded_key.clone(),
852+
value: new_value.clone(),
853+
expires_at: Some(expires_at),
854+
condition: Some(condition.clone()),
855+
}],
856+
)
857+
.await
858+
{
859+
Ok(_) => Ok(()),
860+
Err(SdkError::Rpc { code: Code::FailedPrecondition, .. }) => {
861+
Err(StorageError::conflict())
862+
},
863+
Err(e) => Err(StorageError::from(LedgerStorageError::from(e))),
864+
}
865+
},
866+
)
867+
.await;
868+
self.record_circuit_result(&result);
869+
self.metrics.record_set_org(start.elapsed(), &self.org_str());
870+
if result.is_err() {
871+
self.metrics.record_error_org(&self.org_str());
872+
}
873+
result
874+
}
875+
812876
/// Creates a new [`LedgerTransaction`] for buffered atomic writes.
813877
#[tracing::instrument(skip(self))]
814878
async fn transaction(&self) -> StorageResult<Box<dyn Transaction>> {

crates/storage-ledger/src/transaction.rs

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use std::{
88
collections::{HashMap, HashSet},
99
sync::Arc,
10+
time::Duration,
1011
};
1112

1213
use async_trait::async_trait;
@@ -27,6 +28,8 @@ struct CasOperation {
2728
expected: Option<Vec<u8>>,
2829
/// New value to set if condition is met.
2930
new_value: Vec<u8>,
31+
/// Optional TTL for the new value.
32+
ttl: Option<Duration>,
3033
}
3134

3235
/// Transaction for atomic operations on the Ledger backend.
@@ -120,6 +123,9 @@ pub struct LedgerTransaction {
120123
/// Pending delete operations: hex-encoded keys.
121124
pending_deletes: HashSet<String>,
122125

126+
/// Pending TTLs for set operations: hex-encoded key -> duration.
127+
pending_ttls: HashMap<String, Duration>,
128+
123129
/// Pending compare-and-set operations.
124130
pending_cas: Vec<CasOperation>,
125131
}
@@ -151,6 +157,7 @@ impl LedgerTransaction {
151157
read_consistency,
152158
pending_sets: HashMap::new(),
153159
pending_deletes: HashSet::new(),
160+
pending_ttls: HashMap::new(),
154161
pending_cas: Vec::new(),
155162
}
156163
}
@@ -199,6 +206,9 @@ impl Transaction for LedgerTransaction {
199206
// Remove from pending deletes if it was marked for deletion
200207
self.pending_deletes.remove(&encoded_key);
201208

209+
// Clear any pending TTL — a plain set produces a non-expiring key
210+
self.pending_ttls.remove(&encoded_key);
211+
202212
// Add to pending sets
203213
self.pending_sets.insert(encoded_key, value);
204214
}
@@ -207,6 +217,9 @@ impl Transaction for LedgerTransaction {
207217
fn delete(&mut self, key: Vec<u8>) {
208218
let encoded_key = encode_key(&key);
209219

220+
// Clear any pending TTL
221+
self.pending_ttls.remove(&encoded_key);
222+
210223
// Remove from pending sets if it was set in this transaction
211224
self.pending_sets.remove(&encoded_key);
212225

@@ -224,7 +237,40 @@ impl Transaction for LedgerTransaction {
224237
let encoded_key = encode_key(&key);
225238

226239
// Buffer the CAS operation - it will be applied at commit time
227-
self.pending_cas.push(CasOperation { key: encoded_key, expected, new_value });
240+
self.pending_cas.push(CasOperation { key: encoded_key, expected, new_value, ttl: None });
241+
Ok(())
242+
}
243+
244+
/// Buffers a set operation with TTL for atomic commit.
245+
fn set_with_ttl(&mut self, key: Vec<u8>, value: Vec<u8>, ttl: Duration) {
246+
let encoded_key = encode_key(&key);
247+
248+
// Remove from pending deletes if it was marked for deletion
249+
self.pending_deletes.remove(&encoded_key);
250+
251+
// Track the TTL for this key
252+
self.pending_ttls.insert(encoded_key.clone(), ttl);
253+
254+
// Add to pending sets
255+
self.pending_sets.insert(encoded_key, value);
256+
}
257+
258+
/// Buffers a compare-and-set operation with TTL for atomic commit.
259+
fn compare_and_set_with_ttl(
260+
&mut self,
261+
key: Vec<u8>,
262+
expected: Option<Vec<u8>>,
263+
new_value: Vec<u8>,
264+
ttl: Duration,
265+
) -> StorageResult<()> {
266+
let encoded_key = encode_key(&key);
267+
268+
self.pending_cas.push(CasOperation {
269+
key: encoded_key,
270+
expected,
271+
new_value,
272+
ttl: Some(ttl),
273+
});
228274
Ok(())
229275
}
230276

@@ -253,12 +299,24 @@ impl Transaction for LedgerTransaction {
253299
None => SetCondition::NotExists,
254300
Some(expected_value) => SetCondition::ValueEquals(expected_value),
255301
};
256-
operations.push(Operation::set_entity_if(cas.key, cas.new_value, condition));
302+
let expires_at =
303+
cas.ttl.map(crate::LedgerBackend::compute_expiration_timestamp).transpose()?;
304+
operations.push(Operation::SetEntity {
305+
key: cas.key,
306+
value: cas.new_value,
307+
expires_at,
308+
condition: Some(condition),
309+
});
257310
}
258311

259312
// Add regular set operations
260313
for (key, value) in self.pending_sets {
261-
operations.push(Operation::set_entity(key, value));
314+
if let Some(ttl) = self.pending_ttls.get(&key) {
315+
let expires_at = crate::LedgerBackend::compute_expiration_timestamp(*ttl)?;
316+
operations.push(Operation::set_entity_with_expiry(key, value, expires_at));
317+
} else {
318+
operations.push(Operation::set_entity(key, value));
319+
}
262320
}
263321

264322
// Add delete operations

crates/storage/src/backend.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use crate::{
5757
/// | [`get_range`](StorageBackend::get_range) | Retrieve multiple keys in a range |
5858
/// | [`clear_range`](StorageBackend::clear_range) | Delete multiple keys in a range |
5959
/// | [`set_with_ttl`](StorageBackend::set_with_ttl) | Store with automatic expiration |
60+
/// | [`compare_and_set_with_ttl`](StorageBackend::compare_and_set_with_ttl) | Atomic compare-and-set with expiration |
6061
/// | [`transaction`](StorageBackend::transaction) | Begin an atomic transaction |
6162
/// | [`health_check`](StorageBackend::health_check) | Verify backend availability |
6263
///
@@ -407,6 +408,42 @@ pub trait StorageBackend: Send + Sync {
407408
#[must_use = "storage operations may fail and errors must be handled"]
408409
async fn set_with_ttl(&self, key: Vec<u8>, value: Vec<u8>, ttl: Duration) -> StorageResult<()>;
409410

411+
/// Atomically sets a key's value if it matches the expected current value,
412+
/// with automatic expiration.
413+
///
414+
/// Combines the semantics of [`compare_and_set`](StorageBackend::compare_and_set)
415+
/// with [`set_with_ttl`](StorageBackend::set_with_ttl): the CAS precondition
416+
/// is evaluated first, and on success the new value is stored with the
417+
/// given TTL.
418+
///
419+
/// # Semantics
420+
///
421+
/// See [`compare_and_set`](StorageBackend::compare_and_set) for the full
422+
/// CAS precondition rules. The only difference is that on success the key
423+
/// is given a TTL instead of becoming non-expiring.
424+
///
425+
/// # Arguments
426+
///
427+
/// * `key` — The key to update.
428+
/// * `expected` — The expected current value. Use `None` for insert-if-absent.
429+
/// * `new_value` — The new value to set if the comparison succeeds.
430+
/// * `ttl` — Time-to-live duration after which the key expires.
431+
///
432+
/// # Errors
433+
///
434+
/// - [`StorageError::Conflict`](crate::StorageError) — the current value does not match
435+
/// `expected`.
436+
/// - [`StorageError::SizeLimitExceeded`] — `key` or `new_value` exceeds the configured size
437+
/// limits.
438+
#[must_use = "compare-and-set may fail with a conflict and errors must be handled"]
439+
async fn compare_and_set_with_ttl(
440+
&self,
441+
key: &[u8],
442+
expected: Option<&[u8]>,
443+
new_value: Vec<u8>,
444+
ttl: Duration,
445+
) -> StorageResult<()>;
446+
410447
/// Begins a new transaction.
411448
///
412449
/// Returns a [`Transaction`] handle that can be used to perform

crates/storage/src/batch.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,6 +1057,23 @@ mod tests {
10571057
self.inner.get_mut().expect("lock poisoned").compare_and_set(key, expected, new_value)
10581058
}
10591059

1060+
fn set_with_ttl(&mut self, key: Vec<u8>, value: Vec<u8>, ttl: std::time::Duration) {
1061+
self.inner.get_mut().expect("lock poisoned").set_with_ttl(key, value, ttl);
1062+
}
1063+
1064+
fn compare_and_set_with_ttl(
1065+
&mut self,
1066+
key: Vec<u8>,
1067+
expected: Option<Vec<u8>>,
1068+
new_value: Vec<u8>,
1069+
ttl: std::time::Duration,
1070+
) -> StorageResult<()> {
1071+
self.inner
1072+
.get_mut()
1073+
.expect("lock poisoned")
1074+
.compare_and_set_with_ttl(key, expected, new_value, ttl)
1075+
}
1076+
10601077
async fn commit(self: Box<Self>) -> StorageResult<()> {
10611078
if self.should_fail {
10621079
Err(StorageError::connection("simulated commit failure"))
@@ -1085,6 +1102,16 @@ mod tests {
10851102
self.inner.compare_and_set(key, expected, new_value).await
10861103
}
10871104

1105+
async fn compare_and_set_with_ttl(
1106+
&self,
1107+
key: &[u8],
1108+
expected: Option<&[u8]>,
1109+
new_value: Vec<u8>,
1110+
ttl: std::time::Duration,
1111+
) -> StorageResult<()> {
1112+
self.inner.compare_and_set_with_ttl(key, expected, new_value, ttl).await
1113+
}
1114+
10881115
async fn delete(&self, key: &[u8]) -> StorageResult<()> {
10891116
self.inner.delete(key).await
10901117
}

crates/storage/src/conformance.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,72 @@ pub async fn tx_cas_conflict_rejects_commit<B: StorageBackend>(backend: &B) {
308308
assert_eq!(val, Some(Bytes::from("v_concurrent")));
309309
}
310310

311+
// ============================================================================
312+
// Transaction TTL — set_with_ttl and compare_and_set_with_ttl in transactions (3 tests)
313+
// ============================================================================
314+
315+
/// `set_with_ttl` in a transaction applies the TTL on commit, and the key
316+
/// expires after the TTL elapses.
317+
pub async fn tx_set_with_ttl_applies_on_commit<B: StorageBackend>(backend: &B) {
318+
let mut tx = backend.transaction().await.expect("transaction");
319+
tx.set_with_ttl(
320+
b"txtl:expire".to_vec(),
321+
b"ephemeral".to_vec(),
322+
std::time::Duration::from_millis(50),
323+
);
324+
tx.commit().await.expect("commit");
325+
326+
// Immediately after commit the key should be readable
327+
let val = backend.get(b"txtl:expire").await.expect("get");
328+
assert_eq!(val, Some(Bytes::from("ephemeral")));
329+
330+
// After sleeping past the TTL the key should be gone
331+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
332+
let val = backend.get(b"txtl:expire").await.expect("get after ttl");
333+
assert_eq!(val, None);
334+
}
335+
336+
/// `set_with_ttl` in a transaction supports read-your-writes: a `get` in the
337+
/// same transaction returns the buffered value.
338+
pub async fn tx_set_with_ttl_read_your_writes<B: StorageBackend>(backend: &B) {
339+
let mut tx = backend.transaction().await.expect("transaction");
340+
tx.set_with_ttl(b"txtl:ryw".to_vec(), b"buffered".to_vec(), std::time::Duration::from_secs(60));
341+
342+
let val = tx.get(b"txtl:ryw").await.expect("get");
343+
assert_eq!(val, Some(Bytes::from("buffered")));
344+
345+
// Drop without commit — the key should not be visible
346+
drop(tx);
347+
let val = backend.get(b"txtl:ryw").await.expect("get after drop");
348+
assert_eq!(val, None);
349+
}
350+
351+
/// `compare_and_set_with_ttl` in a transaction applies both the CAS
352+
/// precondition and the TTL on commit.
353+
pub async fn tx_compare_and_set_with_ttl<B: StorageBackend>(backend: &B) {
354+
// Insert a key so we can CAS against it
355+
backend.set(b"txtl:cas".to_vec(), b"v1".to_vec()).await.expect("set");
356+
357+
let mut tx = backend.transaction().await.expect("transaction");
358+
tx.compare_and_set_with_ttl(
359+
b"txtl:cas".to_vec(),
360+
Some(b"v1".to_vec()),
361+
b"v2".to_vec(),
362+
std::time::Duration::from_millis(50),
363+
)
364+
.expect("buffer CAS with TTL");
365+
tx.commit().await.expect("commit");
366+
367+
// Immediately after commit the key should hold the new value
368+
let val = backend.get(b"txtl:cas").await.expect("get");
369+
assert_eq!(val, Some(Bytes::from("v2")));
370+
371+
// After sleeping past the TTL the key should be gone
372+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
373+
let val = backend.get(b"txtl:cas").await.expect("get after ttl");
374+
assert_eq!(val, None);
375+
}
376+
311377
// ============================================================================
312378
// CAS — compare_and_set precondition checks (4 tests)
313379
// ============================================================================
@@ -517,6 +583,11 @@ pub async fn run_all<B: StorageBackend + 'static>(backend: Arc<B>) {
517583
tx_delete_then_get_returns_none(backend.as_ref()).await;
518584
tx_cas_conflict_rejects_commit(backend.as_ref()).await;
519585

586+
// Transaction TTL
587+
tx_set_with_ttl_applies_on_commit(backend.as_ref()).await;
588+
tx_set_with_ttl_read_your_writes(backend.as_ref()).await;
589+
tx_compare_and_set_with_ttl(backend.as_ref()).await;
590+
520591
// CAS
521592
cas_insert_if_absent(backend.as_ref()).await;
522593
cas_insert_if_absent_fails_when_key_exists(backend.as_ref()).await;

0 commit comments

Comments
 (0)