Skip to content

feat: add real WebSocket and gRPC streaming transports#90

Merged
vsilent merged 8 commits intotrydirect:masterfrom
vsilent:feature/streaming-pipes
Apr 18, 2026
Merged

feat: add real WebSocket and gRPC streaming transports#90
vsilent merged 8 commits intotrydirect:masterfrom
vsilent:feature/streaming-pipes

Conversation

@vsilent
Copy link
Copy Markdown
Collaborator

@vsilent vsilent commented Apr 16, 2026

  • Replace WS stub with real tokio-tungstenite client (ws_fetch_source, ws_send_target, connect_and_stream)
  • Add gRPC client transport (grpc_fetch_source, grpc_send_target) with tonic/prost and JSON↔prost_types conversion
  • Add proto/pipe.proto with PipeService (Send + Subscribe RPCs)
  • Add build.rs for tonic-build protobuf compilation
  • Route ws:// and grpc:// target URLs in handle_trigger_pipe to appropriate streaming transports
  • Add tokio-tungstenite, tonic, prost, prost-types dependencies
  • All 371 lib tests + 19 integration tests passing

- Replace WS stub with real tokio-tungstenite client (ws_fetch_source,
  ws_send_target, connect_and_stream)
- Add gRPC client transport (grpc_fetch_source, grpc_send_target) with
  tonic/prost and JSON↔prost_types conversion
- Add proto/pipe.proto with PipeService (Send + Subscribe RPCs)
- Add build.rs for tonic-build protobuf compilation
- Route ws:// and grpc:// target URLs in handle_trigger_pipe to
  appropriate streaming transports
- Add tokio-tungstenite, tonic, prost, prost-types dependencies
- All 371 lib tests + 19 integration tests passing

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings April 16, 2026 19:55
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements real streaming transports for the “trigger_pipe” mechanism by adding WebSocket (tokio-tungstenite) and gRPC (tonic/prost) client implementations, plus protobuf generation wiring, and routes ws:///grpc:// targets to the new transports.

Changes:

  • Replaced the WebSocket transport stub with real client send/fetch/stream functions using tokio-tungstenite.
  • Added a tonic-based gRPC client transport with a new PipeService proto and JSON ↔ prost_types::Struct conversion helpers.
  • Updated trigger-pipe routing to dispatch external targets based on URL scheme (ws://, wss://, grpc://) and added build/dependency plumbing for proto compilation.

Reviewed changes

Copilot reviewed 12 out of 13 changed files in this pull request and generated 14 comments.

Show a summary per file
File Description
src/transport/websocket.rs Implements real WS client functions (ws_fetch_source, ws_send_target, connect_and_stream).
src/transport/mod.rs Exposes the new grpc_client module.
src/transport/grpc_client.rs Adds tonic client + JSON/prost-types conversion helpers + a small unit test.
src/commands/stacker.rs Routes external trigger_pipe targets to WS or gRPC transports based on URL prefix.
proto/pipe.proto Defines PipeService with Send and Subscribe RPCs plus message types.
build.rs Adds tonic-build protobuf compilation for pipe.proto.
Cargo.toml Adds tokio-tungstenite/tonic/prost deps and tonic-build build dependency.
Cargo.lock Locks new dependency graph (including additional versions pulled by new deps).
CLAUDE.md Adds repository guidance / agent workflow documentation.
.claude/settings.local.json Adds local tool-permissions configuration for Claude tooling.
.claude/agents/tester.md Adds a “tester” agent spec for local automation.
.claude/agents/planner.md Adds a “planner” agent spec for local automation.
.claude/agents/code-reviewer.md Adds a “code-reviewer” agent spec for local automation.

Comment thread src/transport/websocket.rs Outdated
Comment on lines +64 to +75
let (_write, mut read) = ws_stream.split();

while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
debug!(len = text.len(), "stream message received");
}
Ok(Message::Ping(_)) => {
debug!("stream ping received");
}
Ok(Message::Close(_)) => {
info!("stream closed by server");
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connect_and_stream also drops the write half and only logs pings. Tungstenite does not automatically send Pong responses, so this can cause the server to close an otherwise healthy stream. Keep write and send Message::Pong(...) when receiving Message::Ping(...) (and consider handling Close handshake).

Suggested change
let (_write, mut read) = ws_stream.split();
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
debug!(len = text.len(), "stream message received");
}
Ok(Message::Ping(_)) => {
debug!("stream ping received");
}
Ok(Message::Close(_)) => {
info!("stream closed by server");
let (mut write, mut read) = ws_stream.split();
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
debug!(len = text.len(), "stream message received");
}
Ok(Message::Ping(payload)) => {
debug!(len = payload.len(), "stream ping received");
if let Err(e) = write.send(Message::Pong(payload)).await {
warn!(error = %e, "failed to send pong");
break;
}
}
Ok(Message::Close(frame)) => {
info!("stream closed by server");
if let Err(e) = write.send(Message::Close(frame)).await {
warn!(error = %e, "failed to acknowledge close frame");
}

Copilot uses AI. Check for mistakes.
Comment on lines +100 to +104
let kind = match value {
Value::Null => Kind::NullValue(0),
Value::Bool(b) => Kind::BoolValue(*b),
Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)),
Value::String(s) => Kind::StringValue(s.clone()),
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json_to_prost_value converts numbers via as_f64().unwrap_or(0.0), which silently corrupts values it can’t represent (e.g., large integers become 0.0). Prefer failing the conversion with a clear error (propagate Result) or at least preserve intent (e.g., stringify) rather than silently substituting 0.0.

Copilot uses AI. Check for mistakes.
Comment thread src/commands/stacker.rs
Comment on lines +2141 to +2156
if target_value.starts_with("ws://") || target_value.starts_with("wss://") {
crate::transport::websocket::ws_send_target(&target_value, &mapped_data)
.await
.map_err(|e| anyhow::anyhow!(e))
} else if target_value.starts_with("grpc://") {
let grpc_endpoint = target_value.replacen("grpc://", "http://", 1);
crate::transport::grpc_client::grpc_send_target(
&grpc_endpoint,
&data.pipe_instance_id,
"",
&mapped_data,
)
.await
.map_err(|e| anyhow::anyhow!(e))
} else {
send_trigger_pipe_request(&target_value, &data.target_method, &mapped_data).await
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New ws:// and grpc:// branches in handle_trigger_pipe are not covered by tests, but this file already has trigger_pipe_handler_tests. Add tests that exercise URL routing for WS and gRPC targets (at least verifying the correct transport function is invoked / correct error handling), so regressions are caught.

Copilot uses AI. Check for mistakes.
Comment thread build.rs
Comment on lines +1 to +5
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(false)
.build_client(true)
.compile_protos(&["proto/pipe.proto"], &["proto"])?;
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a build script that compiles protos introduces an implicit dependency on protoc being available in build environments (including CI and cross-compiles). Consider using a vendored protoc (e.g., via protoc-bin-vendored) or clearly documenting/enforcing the requirement; otherwise builds can fail unexpectedly.

Copilot uses AI. Check for mistakes.
Comment thread Cargo.toml Outdated
chrono = { version = "0.4", features = ["serde"] }
serde_yaml = "0.9"
futures-util = "0.3"
tokio-tungstenite = { version = "0.24", features = ["rustls-tls-webpki-roots"] }
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio-tungstenite = 0.24 introduces a second tokio-tungstenite/tungstenite version alongside Axum’s dependency (Cargo.lock shows 0.28 as well). This increases binary size and can cause subtle type/version mismatches. Prefer aligning your direct dependency version with the one pulled in by axum (or otherwise justify the pin).

Suggested change
tokio-tungstenite = { version = "0.24", features = ["rustls-tls-webpki-roots"] }
tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] }

Copilot uses AI. Check for mistakes.
Comment thread src/commands/stacker.rs Outdated
Comment on lines +2147 to +2154
crate::transport::grpc_client::grpc_send_target(
&grpc_endpoint,
&data.pipe_instance_id,
"",
&mapped_data,
)
.await
.map_err(|e| anyhow::anyhow!(e))
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gRPC path always passes an empty step_id to grpc_send_target. If step_id is required for routing/observability on the receiver side, this will break downstream behavior. Consider deriving step_id from existing TriggerPipeCommand fields (or extend the command schema) and fail early if it’s missing rather than sending an empty string.

Suggested change
crate::transport::grpc_client::grpc_send_target(
&grpc_endpoint,
&data.pipe_instance_id,
"",
&mapped_data,
)
.await
.map_err(|e| anyhow::anyhow!(e))
let step_id = data.pipe_instance_id.trim();
if step_id.is_empty() {
Err(anyhow::anyhow!(
"trigger_pipe gRPC target requires a non-empty step_id"
))
} else {
crate::transport::grpc_client::grpc_send_target(
&grpc_endpoint,
&data.pipe_instance_id,
step_id,
&mapped_data,
)
.await
.map_err(|e| anyhow::anyhow!(e))
}

Copilot uses AI. Check for mistakes.
Comment thread src/commands/stacker.rs Outdated
Comment on lines +2145 to +2146
} else if target_value.starts_with("grpc://") {
let grpc_endpoint = target_value.replacen("grpc://", "http://", 1);
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grpc:// targets are rewritten to http:// unconditionally, which forces plaintext even though the dependency enables TLS. Consider supporting grpcs:// (map to https://) and/or honoring grpc:// vs grpcs:// so operators can choose secure defaults.

Suggested change
} else if target_value.starts_with("grpc://") {
let grpc_endpoint = target_value.replacen("grpc://", "http://", 1);
} else if target_value.starts_with("grpc://")
|| target_value.starts_with("grpcs://")
{
let grpc_endpoint = if target_value.starts_with("grpcs://") {
target_value.replacen("grpcs://", "https://", 1)
} else {
target_value.replacen("grpc://", "http://", 1)
};

Copilot uses AI. Check for mistakes.
Comment thread build.rs
Comment on lines +1 to +5
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(false)
.build_client(true)
.compile_protos(&["proto/pipe.proto"], &["proto"])?;
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build.rs doesn’t emit any cargo:rerun-if-changed=... directives. This causes Cargo to rerun the build script more often than necessary, and makes incremental builds slower. Add println!("cargo:rerun-if-changed=proto/pipe.proto"); (and for any imported protos if applicable).

Copilot uses AI. Check for mistakes.
Comment thread .claude/settings.local.json Outdated
Comment on lines +3 to +11
"allow": [
"Bash(cargo test:*)",
"Bash(cargo build:*)",
"Bash(gh pr:*)",
"Bash(gh api:*)",
"Bash(cargo clippy:*)",
"Bash(cargo update:*)",
"Bash(cargo audit:*)"
]
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.claude/settings.local.json appears to be a local, developer-specific tool-permissions file. Committing it can unintentionally standardize local permissions or leak workflow assumptions. Consider removing it from version control and adding .claude/ (or at least settings.local.json) to .gitignore, or rename to a non-local template (e.g., settings.json.example).

Suggested change
"allow": [
"Bash(cargo test:*)",
"Bash(cargo build:*)",
"Bash(gh pr:*)",
"Bash(gh api:*)",
"Bash(cargo clippy:*)",
"Bash(cargo update:*)",
"Bash(cargo audit:*)"
]
"allow": []

Copilot uses AI. Check for mistakes.
Comment thread src/transport/websocket.rs Outdated
Comment on lines +14 to +32
let (_write, mut read) = ws_stream.split();

match read.next().await {
Some(Ok(Message::Text(text))) => {
debug!(len = text.len(), "ws_fetch_source: received text");
serde_json::from_str::<Value>(&text)
.with_context(|| "ws_fetch_source: failed to parse JSON")
}
Some(Ok(Message::Binary(bin))) => {
debug!(len = bin.len(), "ws_fetch_source: received binary");
serde_json::from_slice::<Value>(&bin)
.with_context(|| "ws_fetch_source: failed to parse binary JSON")
}
Some(Ok(other)) => Ok(serde_json::json!({ "raw": other.to_string() })),
Some(Err(e)) => Err(anyhow::anyhow!("ws_fetch_source read error: {e}")),
None => Err(anyhow::anyhow!(
"ws_fetch_source: stream closed without data"
)),
}
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ws_fetch_source drops the write half of the socket (let (_write, mut read) = ...), so it cannot respond to Ping frames. If the server sends a Ping before the first data message (common with some gateways), the connection can be closed and this function may return a raw ping/close instead of the expected payload. Keep the write half and explicitly reply with Pong (and ignore/continue on Ping/other control frames until a data message arrives).

Suggested change
let (_write, mut read) = ws_stream.split();
match read.next().await {
Some(Ok(Message::Text(text))) => {
debug!(len = text.len(), "ws_fetch_source: received text");
serde_json::from_str::<Value>(&text)
.with_context(|| "ws_fetch_source: failed to parse JSON")
}
Some(Ok(Message::Binary(bin))) => {
debug!(len = bin.len(), "ws_fetch_source: received binary");
serde_json::from_slice::<Value>(&bin)
.with_context(|| "ws_fetch_source: failed to parse binary JSON")
}
Some(Ok(other)) => Ok(serde_json::json!({ "raw": other.to_string() })),
Some(Err(e)) => Err(anyhow::anyhow!("ws_fetch_source read error: {e}")),
None => Err(anyhow::anyhow!(
"ws_fetch_source: stream closed without data"
)),
}
let (mut write, mut read) = ws_stream.split();
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
debug!(len = text.len(), "ws_fetch_source: received text");
return serde_json::from_str::<Value>(&text)
.with_context(|| "ws_fetch_source: failed to parse JSON");
}
Ok(Message::Binary(bin)) => {
debug!(len = bin.len(), "ws_fetch_source: received binary");
return serde_json::from_slice::<Value>(&bin)
.with_context(|| "ws_fetch_source: failed to parse binary JSON");
}
Ok(Message::Ping(payload)) => {
debug!(len = payload.len(), "ws_fetch_source: received ping");
write
.send(Message::Pong(payload))
.await
.with_context(|| "ws_fetch_source: failed to send pong")?;
}
Ok(Message::Pong(payload)) => {
debug!(len = payload.len(), "ws_fetch_source: received pong");
}
Ok(Message::Close(frame)) => {
return Err(anyhow::anyhow!(
"ws_fetch_source: stream closed before data: {:?}",
frame
));
}
Ok(other) => {
debug!(message = %other.to_string(), "ws_fetch_source: ignoring non-data frame");
}
Err(e) => return Err(anyhow::anyhow!("ws_fetch_source read error: {e}")),
}
}
Err(anyhow::anyhow!(
"ws_fetch_source: stream closed without data"
))

Copilot uses AI. Check for mistakes.
vsilent and others added 7 commits April 17, 2026 11:36
… tool-permissions file. Committing it can unintentionally standardize local permissions or leak workflow assumptions. Consider removing it from version control and adding .claude/ (or at least settings.local.json) to .gitignore, or rename to a non-local template. Fix
@vsilent vsilent merged commit fb04793 into trydirect:master Apr 18, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants