Purpose: Reference for implementing bidirectional connectors
Dual Runtime Support:
- Tokio (
std): Servers, edge, cloud - Embassy (
no_std): Embedded MCUs
Reference Implementation: MQTT connector (aimdb-mqtt-connector/)
pub trait ConnectorBuilder<R: aimdb_executor::Spawn + 'static>: Send + Sync {
fn build<'a>(&'a self, db: &'a AimDb<R>)
-> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>>;
fn scheme(&self) -> &str; // e.g., "mqtt", "kafka", "modbus"
}pub trait Connector: Send + Sync {
fn publish(&self, destination: &str, config: &ConnectorConfig, payload: &[u8])
-> Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + '_>>;
}Sequence (in ConnectorBuilder::build()):
- Collect inbound routes:
db.collect_inbound_routes(scheme) - Build router:
RouterBuilder::from_routes(inbound_routes).build() - Create connector instance
- Collect outbound routes:
db.collect_outbound_routes(scheme) - Spawn outbound publishers:
connector.spawn_outbound_publishers(db, outbound_routes) - Spawn background tasks (connection management, inbound event loop)
- Return
Arc<dyn Connector>
Critical: Must call spawn_outbound_publishers() or outbound data won't flow.
Outbound (AimDB → External):
Record → Consumer → Serializer → Connector.publish() → External
Inbound (External → AimDB):
External → Event Handler → Router.route() → Producer → Record
Required implementation:
impl MyConnector {
fn spawn_outbound_publishers<R: aimdb_executor::Spawn + 'static>(
&self,
db: &AimDb<R>,
routes: Vec<(String, Box<dyn ConsumerTrait>, SerializerFn, Vec<(String, String)>)>,
) -> DbResult<()> {
for (destination, consumer, serializer, _config) in routes {
let connector_clone = self.clone();
db.runtime().spawn(async move {
let mut reader = consumer.subscribe_any().await.unwrap();
while let Ok(value_any) = reader.recv_any().await {
let bytes = serializer(&*value_any).unwrap();
connector_clone.publish(&destination, &Default::default(), &bytes).await.ok();
}
})?;
}
Ok(())
}
}let db = AimDbBuilder::new()
.runtime(runtime)
.with_connector(MyConnectorBuilder::new("proto://host:port"))
.configure::<Temperature>(|reg| {
reg.link_to("proto://sensor/temp")
.with_serializer(|t| serde_json::to_vec(t).unwrap());
})
.configure::<Command>(|reg| {
reg.link_from("proto://cmd/device")
.with_deserializer(|b| serde_json::from_slice(b).unwrap())
.with_buffer(BufferCfg::SingleLatest);
})
.build().await?;Place in spawned background task, not in publish():
db.runtime().spawn(async move {
loop {
match connect_and_run(&url).await {
Ok(_) => break,
Err(e) => {
eprintln!("Connection failed: {e:?}, retrying...");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
});Use router for automatic dispatch:
// Subscribe to all router topics
let topics = router.resource_ids();
client.subscribe(&topics).await?;
// Route incoming messages
router.route(topic, payload).await?;Extract from ConnectorConfig:
fn publish(&self, dest: &str, config: &ConnectorConfig, payload: &[u8]) -> ... {
let qos = config.qos.unwrap_or(0);
let retain = config.retain.unwrap_or(false);
client.publish(dest, qos, retain, payload).await
}Dependencies:
[features]
tokio-runtime = ["std", "tokio", "protocol-client-crate"]
[dependencies]
tokio = { workspace = true, optional = true }
# Add protocol-specific client libraryKey patterns:
- Use
stdtypes:std::sync::Arc,std::string::String - Spawn:
tokio::spawn(async move { ... }) - Logging:
tracing::{info, warn, error} - Async client libraries (e.g.,
rumqttc)
See: aimdb-mqtt-connector/ for complete Tokio implementation
Dependencies:
[features]
embassy-runtime = ["aimdb-core/alloc", "embassy-net", "embassy-sync"]
[dependencies]
embassy-net = { workspace = true, optional = true }
embassy-sync = { workspace = true, optional = true }
static-cell = "2.0"Key patterns:
- Use
alloctypes:alloc::sync::Arc,alloc::string::String - Wrap futures:
SendFutureWrapper(async move { ... }) - Static allocation:
StaticCell<T> - Logging:
defmt::info!()(behind#[cfg(feature = "defmt")]) - Network access:
R: EmbassyNetworktrait bound - Unsafe
Send + Syncimpls for single-threaded safety
SendFutureWrapper helper:
struct SendFutureWrapper<F>(F);
unsafe impl<F> Send for SendFutureWrapper<F> {}
impl<F: Future> Future for SendFutureWrapper<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>)
-> core::task::Poll<Self::Output>
{
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll(cx) }
}
}See: examples/embassy-mqtt-connector-demo/ for Embassy patterns
Missing outbound publishers:
// ❌ Wrong - no data flows out
connector.build(db).await
// ✅ Correct - spawn publishers
let routes = db.collect_outbound_routes(scheme);
connector.spawn_outbound_publishers(db, routes)?;Not using Router:
// ❌ Manual routing
if topic == "sensor/temp" { temp_producer.send(data).await; }
// ✅ Router handles it
router.route(topic, data).await?;Embassy lifetime issues:
// ❌ Stack allocation
let channel = Channel::new();
// ✅ Static allocation
static CH: StaticCell<Channel<...>> = StaticCell::new();
let ch = CH.init(Channel::new());Missing Send wrapper (Embassy):
// ❌ Not Send
Box::pin(async move { ... })
// ✅ Send-wrapped
Box::pin(SendFutureWrapper(async move { ... }))MQTT Connector: aimdb-mqtt-connector/ - Complete production reference
Working Examples:
examples/tokio-mqtt-connector-demo/- Tokio runtimeexamples/embassy-mqtt-connector-demo/- Embassy runtime
Documentation:
Note: Always refer to the MQTT connector implementation in aimdb-mqtt-connector/ for complete, tested patterns. It demonstrates all the concepts in this guide for both Tokio and Embassy runtimes.
✅ Do: Same user-facing API
// Works with both Tokio and Embassy
.with_connector(MyConnectorBuilder::new("url"))❌ Don't: Different APIs per runtime
// Bad - different APIs
.with_tokio_connector(...)
.with_embassy_connector(...)Reconnection logic belongs in the spawned event loop task, not in publish().
✅ Do: Reconnect in background task
// In spawned background task
tokio::spawn(async move {
loop {
match connect_and_run(&broker_url).await {
Ok(_) => { /* Connection closed gracefully */ }
Err(e) => {
eprintln!("Connection failed: {:?}, reconnecting...", e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
});❌ Don't: Block publish() waiting for reconnection
fn publish(...) {
// Bad - blocks all publishers
if !self.connected {
reconnect().await?;
}
client.publish(...).await
}Why? The background task maintains the connection continuously. The publish() method should fail fast if disconnected, letting the application decide how to handle it.
// Automatically routes to correct producer
router.route(topic, payload).await?;let topics = router.resource_ids();
for topic in &topics {
client.subscribe(topic).await?;
}// Convert protocol errors to PublishError
.map_err(|e| {
eprintln!("Protocol error: {:?}", e);
PublishError::ConnectionFailed
})Always implement and call spawn_outbound_publishers() in ConnectorBuilder::build():
✅ Do: Spawn publishers explicitly
impl ConnectorBuilder for MyConnectorBuilder {
fn build<R: RuntimeAdapter + 'static>(&self, db: &AimDb<R>) -> DbResult<Arc<dyn Connector>> {
// ... setup code ...
let connector = MyConnector { /* fields */ };
// Collect and spawn outbound routes
let outbound_routes = db.collect_outbound_routes(self.protocol_name());
connector.spawn_outbound_publishers(db, outbound_routes)?;
Ok(Arc::new(connector))
}
}❌ Don't: Forget to spawn outbound publishers
impl ConnectorBuilder for MyConnectorBuilder {
fn build<R: RuntimeAdapter + 'static>(&self, db: &AimDb<R>) -> DbResult<Arc<dyn Connector>> {
// ... setup code ...
// Missing: No outbound publisher spawning!
Ok(Arc::new(MyConnector { /* fields */ }))
}
}Why? Outbound publishers consume from AimDB records and publish to external systems. Without explicit spawning, records configured with .link_to() won't actually send data.
Tokio (std):
use tracing::{info, warn, error};
info!("Connected to broker");Embassy (no_std):
#[cfg(feature = "defmt")]
defmt::info!("Connected to broker");// Good - uses StaticCell
static CHANNEL: StaticCell<Channel<...>> = StaticCell::new();
// Bad - stack allocation won't work
let channel = Channel::new(); // Doesn't live long enoughPass through ConnectorConfig to support protocol-specific options:
impl Connector for MyConnectorImpl {
fn publish(
&self,
destination: &str,
config: &ConnectorConfig, // ← Use this!
payload: &[u8],
) -> Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + '_>> {
// Extract QoS, retain, timeout, etc.
let qos = config.qos.unwrap_or(0);
let retain = config.retain.unwrap_or(false);
// Pass to protocol client
client.publish(destination, qos, retain, payload).await
}
}Users configure it per link:
.link_to("mqtt://sensors/temp")
.with_config(ConnectorConfig {
qos: Some(2), // Exactly-once delivery
retain: Some(true), // Keep last message
})- Create crate with
tokio-runtimeandembassy-runtimefeatures - Implement
ConnectorBuilder<R>trait withbuild()andscheme() - Implement
Connectortrait withpublish() - In
build(): Collect inbound routes viadb.collect_inbound_routes(scheme) - In
build(): BuildRouterfrom inbound routes - In
build(): Create protocol client instance - In
build(): Spawn background task for connection management - In
build(): Spawn inbound event router task - In
build(): Subscribe client to all router topics - In
build(): Collect outbound routes viadb.collect_outbound_routes(scheme) - In
build(): Callspawn_outbound_publishers(db, outbound_routes) - Implement
spawn_outbound_publishers()method - Implement reconnection logic in background task
- Add comprehensive error handling and logging
- Test cross-compilation for embedded targets (if supporting Embassy)
❌ Wrong:
// Task is never spawned - connector won't work!
async fn background_task() { /* ... */ }✅ Correct:
db.runtime().spawn(SendFutureWrapper(async move {
background_task().await
}));❌ Wrong:
// Manually finding producers - doesn't scale
if topic == "sensor/temp" {
temp_producer.produce(payload).await?;
}✅ Correct:
// Router automatically finds the right producer
router.route(topic, payload).await?;❌ Wrong:
Box::pin(async move { /* ... */ })
// Error: future is not `Send`✅ Correct:
Box::pin(SendFutureWrapper(async move { /* ... */ }))❌ Wrong:
let channel = Channel::new(); // Doesn't live long enough✅ Correct:
static CHANNEL: StaticCell<Channel<...>> = StaticCell::new();
let channel = CHANNEL.init(Channel::new());❌ Wrong:
impl ConnectorBuilder for MyConnectorBuilder {
fn build<R>(&self, db: &AimDb<R>) -> DbResult<Arc<dyn Connector>> {
let connector = MyConnector { /* ... */ };
// Missing spawn_outbound_publishers!
Ok(Arc::new(connector))
}
}
// Result: Records linked with .link_to() don't publish data✅ Correct:
impl ConnectorBuilder for MyConnectorBuilder {
fn build<R>(&self, db: &AimDb<R>) -> DbResult<Arc<dyn Connector>> {
let connector = MyConnector { /* ... */ };
// Always spawn outbound publishers
let routes = db.collect_outbound_routes(self.protocol_name());
connector.spawn_outbound_publishers(db, routes)?;
Ok(Arc::new(connector))
}
}Symptom: Inbound messages work (external → AimDB), but outbound messages fail silently (AimDB → external). Records configured with .link_to("mqtt", "topic") don't send data.
Why? The ConsumerTrait-based outbound routing requires explicit spawning. Unlike inbound routing (which works via Router), outbound publishers must be spawned during connector build.
- MQTT Connector - Complete reference implementation
- Tokio MQTT Example
- Embassy MQTT Example
For questions or contributions:
- GitHub Issues: https://github.com/aimdb-dev/aimdb/issues
- Discussions: https://github.com/aimdb-dev/aimdb/discussions
- Examples: See
examples/directory
Happy Connector Building! 🚀
The MQTT connector implementation serves as the reference for all patterns described in this guide. When in doubt, refer to aimdb-mqtt-connector/ for a complete, production-ready example.