Skip to content

Commit f925961

Browse files
fix: review agent findings — SAN type, UUID dedup, proxy safety
- SanType::Rfc822Name → SanType::URI for urn:uuid: (correct X.509 type) - GeneralName::RFC822Name → GeneralName::URI in extraction - Reject duplicate agent UUID on enrollment (409 Conflict) - tokio::join! instead of select! for session proxy (prevents data loss) - JoinSet instead of Vec<JoinHandle> (prevents unbounded growth) - Timeout (30s) on session handshake recv_request/recv_response - Fix typos: "redeemd" → "redeemed" Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 5887c21 commit f925961

5 files changed

Lines changed: 33 additions & 25 deletions

File tree

devolutions-agent/src/tunnel.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,10 @@ async fn run_session_proxy(advertise_subnets: Vec<Ipv4Network>, send: quinn::Sen
447447
let _: anyhow::Result<()> = async {
448448
let mut session: SessionStream<_, _> = (send, recv).into();
449449

450-
let connect_msg = session.recv_request().await.context("recv ConnectRequest")?;
450+
let connect_msg = tokio::time::timeout(Duration::from_secs(30), session.recv_request())
451+
.await
452+
.context("session handshake timeout")?
453+
.context("recv ConnectRequest")?;
451454

452455
info!(
453456
session_id = %connect_msg.session_id,
@@ -483,14 +486,14 @@ async fn run_session_proxy(advertise_subnets: Vec<Ipv4Network>, send: quinn::Sen
483486
let (mut send, mut recv) = session.into_inner();
484487
let (mut tcp_read, mut tcp_write) = tcp_stream.into_split();
485488

486-
tokio::select! {
487-
r = tokio::io::copy(&mut recv, &mut tcp_write) => {
488-
r.inspect_err(|e| debug!(%e, "QUIC->TCP copy ended"))?;
489-
}
490-
r = tokio::io::copy(&mut tcp_read, &mut send) => {
491-
r.inspect_err(|e| debug!(%e, "TCP->QUIC copy ended"))?;
492-
}
493-
}
489+
// Use join! (not select!) to wait for BOTH directions to finish.
490+
// select! would cancel in-flight data when one direction closes first.
491+
let (r1, r2) = tokio::join!(
492+
tokio::io::copy(&mut recv, &mut tcp_write),
493+
tokio::io::copy(&mut tcp_read, &mut send),
494+
);
495+
r1.inspect_err(|e| debug!(%e, "QUIC->TCP copy ended"))?;
496+
r2.inspect_err(|e| debug!(%e, "TCP->QUIC copy ended"))?;
494497

495498
Ok(())
496499
}

devolutions-gateway/src/agent_tunnel/cert.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ impl CaManager {
141141
agent_params
142142
.distinguished_name
143143
.push(DnType::OrganizationName, CA_ORG_NAME);
144-
agent_params.subject_alt_names.push(SanType::Rfc822Name(
144+
agent_params.subject_alt_names.push(SanType::URI(
145145
format!("urn:uuid:{agent_id}").try_into().context("SAN URI")?,
146146
));
147147
if let Some(hostname) = agent_hostname {
@@ -364,7 +364,7 @@ pub fn extract_agent_id_from_der(der_bytes: &[u8]) -> anyhow::Result<Uuid> {
364364
for ext in cert.extensions() {
365365
if let x509_parser::extensions::ParsedExtension::SubjectAlternativeName(san) = ext.parsed_extension() {
366366
for name in &san.general_names {
367-
if let x509_parser::extensions::GeneralName::RFC822Name(val) = name
367+
if let x509_parser::extensions::GeneralName::URI(val) = name
368368
&& let Some(uuid_str) = val.strip_prefix("urn:uuid:")
369369
{
370370
return uuid_str.parse().context("parse UUID from SAN");

devolutions-gateway/src/agent_tunnel/enrollment_store.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! In-memory store for one-time enrollment tokens.
22
//!
3-
//! Tokens are generated by the webapp enrollment-string endpoint and redeemd
3+
//! Tokens are generated by the webapp enrollment-string endpoint and redeemed
44
//! by the agent enrollment endpoint. Each token is single-use and has an expiry.
55
66
use std::time::{SystemTime, UNIX_EPOCH};
@@ -50,7 +50,7 @@ impl EnrollmentTokenStore {
5050

5151
/// Consumes a token if it exists and is not expired.
5252
///
53-
/// Returns `true` if the token was valid and has been redeemd (removed).
53+
/// Returns `true` if the token was valid and has been redeemed (removed).
5454
/// Returns `false` if the token doesn't exist or is expired.
5555
pub fn redeem(&self, token: &str) -> bool {
5656
let now = current_time_secs();

devolutions-gateway/src/agent_tunnel/listener.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ impl AgentTunnelHandle {
7575
.await
7676
.map_err(|e| anyhow::anyhow!("send ConnectRequest: {e}"))?;
7777

78-
// Read ConnectResponse.
79-
let response = session
80-
.recv_response()
78+
// Read ConnectResponse (with timeout to prevent stalled peers).
79+
let response = tokio::time::timeout(Duration::from_secs(30), session.recv_response())
8180
.await
81+
.map_err(|_| anyhow::anyhow!("session handshake timeout (30s)"))?
8282
.map_err(|e| anyhow::anyhow!("recv ConnectResponse: {e}"))?;
8383

8484
if !response.is_success() {
@@ -174,7 +174,7 @@ impl devolutions_gateway_task::Task for AgentTunnelListener {
174174
let local_addr = self.endpoint.local_addr()?;
175175
info!(%local_addr, "Agent tunnel listener started");
176176

177-
let mut conn_handles: Vec<tokio::task::JoinHandle<()>> = Vec::new();
177+
let mut conn_handles = tokio::task::JoinSet::new();
178178

179179
loop {
180180
tokio::select! {
@@ -195,19 +195,17 @@ impl devolutions_gateway_task::Task for AgentTunnelListener {
195195
let registry = Arc::clone(&self.registry);
196196
let agent_connections = Arc::clone(&self.agent_connections);
197197

198-
conn_handles.push(tokio::spawn(
198+
conn_handles.spawn(
199199
run_agent_connection(registry, agent_connections, incoming),
200-
));
200+
);
201201
}
202+
203+
// Reap completed connection tasks to prevent unbounded growth.
204+
Some(_) = conn_handles.join_next() => {}
202205
}
203206
}
204207

205-
for handle in &conn_handles {
206-
handle.abort();
207-
}
208-
for handle in conn_handles {
209-
let _ = handle.await;
210-
}
208+
conn_handles.shutdown().await;
211209

212210
Ok(())
213211
}

devolutions-gateway/src/api/tunnel.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ async fn enroll_agent(
114114

115115
let agent_id = req.agent_id;
116116

117+
// Reject duplicate agent IDs to prevent identity shadowing.
118+
if handle.registry().get(&agent_id).is_some() {
119+
return Err(
120+
crate::http::HttpErrorBuilder::new(axum::http::StatusCode::CONFLICT).msg("agent ID already registered")
121+
);
122+
}
123+
117124
let signed = handle
118125
.ca_manager()
119126
.sign_agent_csr(agent_id, &req.agent_name, &req.csr_pem, req.agent_hostname.as_deref())

0 commit comments

Comments
 (0)