Skip to content

Commit 5887c21

Browse files
fix: address review comments (round 2)
- Remove agent_name from EnrollResponse (agent knows it already) - Agent generates its own UUID and sends it in EnrollRequest - Rename api/agent_enrollment.rs → api/tunnel.rs (match endpoint) - Use backoff crate for reconnect loop (same pattern as subscriber.rs) - ALPN: "devolutions-agent-tunnel" → "gw-agent-tunnel/1" (versioned) - Protocol version: 2 → 1 (previous was experimental, start fresh) - Move session tests to integration test file (public API only) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a9772f2 commit 5887c21

10 files changed

Lines changed: 130 additions & 133 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/agent-tunnel-proto/src/session.rs

Lines changed: 0 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -60,99 +60,3 @@ impl ConnectResponse {
6060
}
6161
}
6262
}
63-
64-
#[cfg(test)]
65-
mod tests {
66-
use super::*;
67-
use crate::stream::SessionStream;
68-
69-
#[tokio::test]
70-
async fn roundtrip_connect_request() {
71-
let msg = ConnectRequest::new(Uuid::new_v4(), "192.168.1.100:3389".to_owned());
72-
73-
let mut buf = Vec::new();
74-
let mut stream = SessionStream::new(&mut buf, &[][..]);
75-
stream.send_request(&msg).await.expect("send should succeed");
76-
77-
let mut stream = SessionStream::new(tokio::io::sink(), buf.as_slice());
78-
let decoded = stream.recv_request().await.expect("recv should succeed");
79-
assert_eq!(msg, decoded);
80-
}
81-
82-
#[tokio::test]
83-
async fn roundtrip_connect_response_success() {
84-
let msg = ConnectResponse::success();
85-
86-
let mut buf = Vec::new();
87-
let mut stream = SessionStream::new(&mut buf, &[][..]);
88-
stream.send_response(&msg).await.expect("send should succeed");
89-
90-
let mut stream = SessionStream::new(tokio::io::sink(), buf.as_slice());
91-
let decoded = stream.recv_response().await.expect("recv should succeed");
92-
assert_eq!(msg, decoded);
93-
}
94-
95-
#[tokio::test]
96-
async fn roundtrip_connect_response_error() {
97-
let msg = ConnectResponse::error("connection refused");
98-
99-
let mut buf = Vec::new();
100-
let mut stream = SessionStream::new(&mut buf, &[][..]);
101-
stream.send_response(&msg).await.expect("send should succeed");
102-
103-
let mut stream = SessionStream::new(tokio::io::sink(), buf.as_slice());
104-
let decoded = stream.recv_response().await.expect("recv should succeed");
105-
assert_eq!(msg, decoded);
106-
}
107-
}
108-
109-
#[cfg(test)]
110-
mod proptests {
111-
use proptest::prelude::*;
112-
113-
use super::*;
114-
use crate::stream::SessionStream;
115-
116-
fn arb_connect_request() -> impl Strategy<Value = ConnectRequest> {
117-
("[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}:[0-9]{1,5}")
118-
.prop_map(|target| ConnectRequest::new(Uuid::new_v4(), target))
119-
}
120-
121-
fn arb_connect_response() -> impl Strategy<Value = ConnectResponse> {
122-
prop_oneof![Just(ConnectResponse::success()), ".*".prop_map(ConnectResponse::error),]
123-
}
124-
125-
proptest! {
126-
#[test]
127-
fn connect_request_roundtrip(msg in arb_connect_request()) {
128-
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("tokio runtime");
129-
rt.block_on(async {
130-
let mut buf = Vec::new();
131-
let mut stream = SessionStream::new(&mut buf, &[][..]);
132-
stream.send_request(&msg).await.expect("send should succeed");
133-
134-
let mut stream = SessionStream::new(tokio::io::sink(), buf.as_slice());
135-
let decoded = stream.recv_request().await.expect("recv should succeed");
136-
prop_assert_eq!(&msg.target, &decoded.target);
137-
prop_assert_eq!(msg.protocol_version, decoded.protocol_version);
138-
prop_assert_eq!(msg.session_id, decoded.session_id);
139-
Ok(())
140-
})?;
141-
}
142-
143-
#[test]
144-
fn connect_response_roundtrip(msg in arb_connect_response()) {
145-
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("tokio runtime");
146-
rt.block_on(async {
147-
let mut buf = Vec::new();
148-
let mut stream = SessionStream::new(&mut buf, &[][..]);
149-
stream.send_response(&msg).await.expect("send should succeed");
150-
151-
let mut stream = SessionStream::new(tokio::io::sink(), buf.as_slice());
152-
let decoded = stream.recv_response().await.expect("recv should succeed");
153-
prop_assert_eq!(msg, decoded);
154-
Ok(())
155-
})?;
156-
}
157-
}
158-
}

crates/agent-tunnel-proto/src/version.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/// Current protocol version.
2-
pub const CURRENT_PROTOCOL_VERSION: u16 = 2;
2+
pub const CURRENT_PROTOCOL_VERSION: u16 = 1;
33

44
/// Minimum protocol version that is still accepted.
5-
pub const MIN_SUPPORTED_VERSION: u16 = 2;
5+
pub const MIN_SUPPORTED_VERSION: u16 = 1;
66

77
/// Validate that a received protocol version is within the supported range.
88
pub fn validate_protocol_version(version: u16) -> Result<(), crate::error::ProtoError> {
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
use agent_tunnel_proto::{ConnectRequest, ConnectResponse, SessionStream};
2+
use uuid::Uuid;
3+
4+
#[tokio::test]
5+
async fn roundtrip_connect_request() {
6+
let msg = ConnectRequest::new(Uuid::new_v4(), "192.168.1.100:3389".to_owned());
7+
8+
let mut buf = Vec::new();
9+
let mut stream = SessionStream::new(&mut buf, &[][..]);
10+
stream.send_request(&msg).await.expect("send should succeed");
11+
12+
let mut stream = SessionStream::new(tokio::io::sink(), buf.as_slice());
13+
let decoded = stream.recv_request().await.expect("recv should succeed");
14+
assert_eq!(msg, decoded);
15+
}
16+
17+
#[tokio::test]
18+
async fn roundtrip_connect_response_success() {
19+
let msg = ConnectResponse::success();
20+
21+
let mut buf = Vec::new();
22+
let mut stream = SessionStream::new(&mut buf, &[][..]);
23+
stream.send_response(&msg).await.expect("send should succeed");
24+
25+
let mut stream = SessionStream::new(tokio::io::sink(), buf.as_slice());
26+
let decoded = stream.recv_response().await.expect("recv should succeed");
27+
assert_eq!(msg, decoded);
28+
}
29+
30+
#[tokio::test]
31+
async fn roundtrip_connect_response_error() {
32+
let msg = ConnectResponse::error("connection refused");
33+
34+
let mut buf = Vec::new();
35+
let mut stream = SessionStream::new(&mut buf, &[][..]);
36+
stream.send_response(&msg).await.expect("send should succeed");
37+
38+
let mut stream = SessionStream::new(tokio::io::sink(), buf.as_slice());
39+
let decoded = stream.recv_response().await.expect("recv should succeed");
40+
assert_eq!(msg, decoded);
41+
}
42+
43+
mod proptests {
44+
use agent_tunnel_proto::{ConnectRequest, ConnectResponse, SessionStream};
45+
use proptest::prelude::*;
46+
use uuid::Uuid;
47+
48+
fn arb_connect_request() -> impl Strategy<Value = ConnectRequest> {
49+
("[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}:[0-9]{1,5}")
50+
.prop_map(|target| ConnectRequest::new(Uuid::new_v4(), target))
51+
}
52+
53+
fn arb_connect_response() -> impl Strategy<Value = ConnectResponse> {
54+
prop_oneof![Just(ConnectResponse::success()), ".*".prop_map(ConnectResponse::error),]
55+
}
56+
57+
proptest! {
58+
#[test]
59+
fn connect_request_roundtrip(msg in arb_connect_request()) {
60+
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("tokio runtime");
61+
rt.block_on(async {
62+
let mut buf = Vec::new();
63+
let mut stream = SessionStream::new(&mut buf, &[][..]);
64+
stream.send_request(&msg).await.expect("send should succeed");
65+
66+
let mut stream = SessionStream::new(tokio::io::sink(), buf.as_slice());
67+
let decoded = stream.recv_request().await.expect("recv should succeed");
68+
prop_assert_eq!(&msg.target, &decoded.target);
69+
prop_assert_eq!(msg.protocol_version, decoded.protocol_version);
70+
prop_assert_eq!(msg.session_id, decoded.session_id);
71+
Ok(())
72+
})?;
73+
}
74+
75+
#[test]
76+
fn connect_response_roundtrip(msg in arb_connect_response()) {
77+
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("tokio runtime");
78+
rt.block_on(async {
79+
let mut buf = Vec::new();
80+
let mut stream = SessionStream::new(&mut buf, &[][..]);
81+
stream.send_response(&msg).await.expect("send should succeed");
82+
83+
let mut stream = SessionStream::new(tokio::io::sink(), buf.as_slice());
84+
let decoded = stream.recv_response().await.expect("recv should succeed");
85+
prop_assert_eq!(msg, decoded);
86+
Ok(())
87+
})?;
88+
}
89+
}
90+
}

devolutions-agent/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ workspace = true
1414
[dependencies]
1515
agent-tunnel-proto = { path = "../crates/agent-tunnel-proto" }
1616
anyhow = "1"
17+
backoff = "0.4"
1718
async-trait = "0.1"
1819
bincode = "1.3"
1920
base64 = "0.22"

devolutions-agent/src/enrollment.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ use crate::config;
1313
/// Request body for enrollment API
1414
#[derive(Serialize)]
1515
struct EnrollRequest {
16+
/// Agent-generated UUID (the agent owns its identity)
17+
agent_id: Uuid,
1618
/// Friendly name for the agent
1719
agent_name: String,
1820
/// PEM-encoded Certificate Signing Request
@@ -26,7 +28,6 @@ struct EnrollRequest {
2628
#[derive(Deserialize)]
2729
struct EnrollResponse {
2830
agent_id: Uuid,
29-
agent_name: String,
3031
client_cert_pem: String,
3132
gateway_ca_cert_pem: String,
3233
quic_endpoint: String,
@@ -70,7 +71,7 @@ pub async fn bootstrap_and_persist(
7071
let (key_pem, csr_pem) = generate_key_and_csr(agent_name)?;
7172

7273
let enroll_response = request_enrollment(gateway_url, enrollment_token, agent_name, &csr_pem).await?;
73-
persist_enrollment_response(advertise_subnets, enroll_response, &key_pem)
74+
persist_enrollment_response(agent_name, advertise_subnets, enroll_response, &key_pem)
7475
}
7576

7677
/// Generate an ECDSA P-256 key pair and a CSR containing the agent name as CN.
@@ -103,6 +104,7 @@ async fn request_enrollment(
103104
.post(&enroll_url)
104105
.bearer_auth(enrollment_token)
105106
.json(&EnrollRequest {
107+
agent_id: Uuid::new_v4(),
106108
agent_name: agent_name.to_owned(),
107109
csr_pem: csr_pem.to_owned(),
108110
agent_hostname: hostname::get()
@@ -124,6 +126,7 @@ async fn request_enrollment(
124126
}
125127

126128
fn persist_enrollment_response(
129+
agent_name: &str,
127130
advertise_subnets: Vec<String>,
128131
enroll_response: EnrollResponse,
129132
key_pem: &str,
@@ -192,7 +195,7 @@ fn persist_enrollment_response(
192195

193196
Ok(PersistedEnrollment {
194197
agent_id: enroll_response.agent_id,
195-
agent_name: enroll_response.agent_name,
198+
agent_name: agent_name.to_owned(),
196199
client_cert_path,
197200
client_key_path,
198201
gateway_ca_path,

devolutions-agent/src/tunnel.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -106,28 +106,25 @@ impl Task for TunnelTask {
106106
type Output = anyhow::Result<()>;
107107
const NAME: &'static str = "tunnel";
108108

109-
/// Reconnect loop with exponential backoff and jitter.
109+
/// Reconnect loop with exponential backoff (using the `backoff` crate).
110110
///
111-
/// Backoff strategy:
112-
/// - Starts at 1s, doubles each retry (with ±25% jitter), caps at 60s.
113-
/// - Resets to 1s after a connection survives 30s (considered stable).
114-
///
115-
/// Example progression (without jitter):
116-
/// attempt 1: fail immediately → wait ~1s
117-
/// attempt 2: fail immediately → wait ~2s
118-
/// attempt 3: fail immediately → wait ~4s
119-
/// attempt 4: fail immediately → wait ~8s
120-
/// ...
121-
/// attempt N: fail immediately → wait 60s (cap)
122-
/// attempt M: connected 45s → next backoff resets to 1s
111+
/// Resets to initial interval after a connection survives 30s (considered stable).
123112
async fn run(self, mut shutdown_signal: ShutdownSignal) -> anyhow::Result<()> {
124-
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
125-
const MAX_BACKOFF: Duration = Duration::from_secs(60);
113+
use backoff::backoff::Backoff as _;
114+
115+
const RETRY_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
116+
const RETRY_MAX_INTERVAL: Duration = Duration::from_secs(60);
117+
const RETRY_MULTIPLIER: f64 = 2.0;
126118
const CONNECTED_THRESHOLD: Duration = Duration::from_secs(30);
127119

128120
info!("Starting QUIC agent tunnel (with auto-reconnect)");
129121

130-
let mut backoff = INITIAL_BACKOFF;
122+
let mut backoff = backoff::ExponentialBackoffBuilder::default()
123+
.with_initial_interval(RETRY_INITIAL_INTERVAL)
124+
.with_max_interval(RETRY_MAX_INTERVAL)
125+
.with_multiplier(RETRY_MULTIPLIER)
126+
.with_max_elapsed_time(None) // retry forever
127+
.build();
131128

132129
loop {
133130
let start = std::time::Instant::now();
@@ -144,23 +141,25 @@ impl Task for TunnelTask {
144141

145142
// Reset backoff if the connection was stable long enough.
146143
if start.elapsed() > CONNECTED_THRESHOLD {
147-
backoff = INITIAL_BACKOFF;
144+
backoff.reset();
148145
}
149146

150-
info!(?backoff, "Reconnecting after backoff");
147+
let Some(wait) = backoff.next_backoff() else {
148+
// Should never happen with max_elapsed_time(None), but just in case.
149+
warn!("Backoff exhausted, resetting");
150+
backoff.reset();
151+
continue;
152+
};
153+
154+
info!(?wait, "Reconnecting after backoff");
151155

152156
tokio::select! {
153157
_ = shutdown_signal.wait() => {
154158
info!("Shutdown during reconnect backoff");
155159
return Ok(());
156160
}
157-
_ = tokio::time::sleep(backoff) => {}
161+
_ = tokio::time::sleep(wait) => {}
158162
}
159-
160-
// Exponential backoff with ±25% jitter to avoid thundering herd.
161-
let jitter_factor = rand::Rng::gen_range(&mut rand::thread_rng(), 0.75..1.25);
162-
backoff =
163-
Duration::from_secs_f64((backoff.as_secs_f64() * 2.0 * jitter_factor).min(MAX_BACKOFF.as_secs_f64()));
164163
}
165164
}
166165
}
@@ -290,7 +289,7 @@ async fn run_single_connection(conf_handle: &ConfHandle, shutdown_signal: &mut S
290289
.with_client_auth_cert(certs, key)
291290
.context("build rustls client config with client auth")?;
292291

293-
client_crypto.alpn_protocols = vec![b"devolutions-agent-tunnel".to_vec()];
292+
client_crypto.alpn_protocols = vec![b"gw-agent-tunnel/1".to_vec()];
294293

295294
let mut transport = quinn::TransportConfig::default();
296295
transport

devolutions-gateway/src/agent_tunnel/cert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ impl CaManager {
299299
.with_single_cert(server_cert_chain, server_private_key)
300300
.context("build rustls ServerConfig")?;
301301

302-
tls_config.alpn_protocols = vec![b"devolutions-agent-tunnel".to_vec()];
302+
tls_config.alpn_protocols = vec![b"gw-agent-tunnel/1".to_vec()];
303303

304304
Ok(tls_config)
305305
}

devolutions-gateway/src/api/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
pub mod agent_enrollment;
21
pub mod ai;
32
pub mod config;
43
pub mod diagnostics;
@@ -16,6 +15,7 @@ pub mod rdp;
1615
pub mod session;
1716
pub mod sessions;
1817
pub mod traffic;
18+
pub mod tunnel;
1919
pub mod update;
2020
pub mod webapp;
2121

@@ -36,7 +36,7 @@ pub fn make_router<S>(state: crate::DgwState) -> axum::Router<S> {
3636
.nest("/jet/webapp", webapp::make_router(state.clone()))
3737
.nest("/jet/net", net::make_router(state.clone()))
3838
.nest("/jet/traffic", traffic::make_router(state.clone()))
39-
.nest("/jet/tunnel", agent_enrollment::make_router(state.clone()))
39+
.nest("/jet/tunnel", tunnel::make_router(state.clone()))
4040
.nest("/jet/update", update::make_router(state.clone()));
4141

4242
if state.conf_handle.get_conf().web_app.enabled {

0 commit comments

Comments
 (0)