diff --git a/modules/meteroid/tests/integration/e2e.rs b/modules/meteroid/tests/integration/e2e.rs index bde473566..53ef9e15b 100644 --- a/modules/meteroid/tests/integration/e2e.rs +++ b/modules/meteroid/tests/integration/e2e.rs @@ -58,14 +58,14 @@ async fn test_metering_e2e() { let postgres_connection_string = meteroid_it::container::create_test_database().await; - let (_kafka_container, kafka_port) = metering_it::container::start_kafka() + let kafka_setup = metering_it::container::start_kafka() .await .expect("Could not start kafka"); let (_clickhouse_container, clickhouse_http_port, clickhouse_tcp_port) = metering_it::container::start_clickhouse().await; - metering_it::kafka::create_topic(kafka_port, "meteroid-events-raw") + metering_it::kafka::create_topic(kafka_setup.port, "meteroid-events-raw") .await .expect("Could not create topic"); @@ -79,7 +79,8 @@ async fn test_metering_e2e() { metering_port, clickhouse_http_port, clickhouse_tcp_port, - kafka_port, + kafka_setup.port, + kafka_setup.internal_addr, "meteroid-events-raw".to_string(), "meteroid-events-preprocessed".to_string(), ); diff --git a/modules/meteroid/tests/integration/metering_it/config.rs b/modules/meteroid/tests/integration/metering_it/config.rs index b710520e1..8d8e9f8ad 100644 --- a/modules/meteroid/tests/integration/metering_it/config.rs +++ b/modules/meteroid/tests/integration/metering_it/config.rs @@ -13,6 +13,7 @@ pub fn mocked_config( clickhouse_http_port: HttpPort, clickhouse_tcp_port: TcpPort, kafka_port: u16, + kafka_internal_addr: String, kafka_raw_topic: String, kafka_preprocessed_topic: String, ) -> Config { @@ -41,7 +42,7 @@ pub fn mocked_config( sasl_username: None, sasl_password: None, }, - kafka_internal_addr: format!("it_redpanda:{}", 29092), + kafka_internal_addr, kafka_raw_topic, kafka_preprocessed_topic, kafka_producer_linger_ms: 5, diff --git a/modules/meteroid/tests/integration/metering_it/container.rs b/modules/meteroid/tests/integration/metering_it/container.rs index e226f05ce..80e8ee73d 100644 --- a/modules/meteroid/tests/integration/metering_it/container.rs +++ b/modules/meteroid/tests/integration/metering_it/container.rs @@ -1,7 +1,8 @@ +use backon::{ConstantBuilder, Retryable}; use std::time::Duration; use testcontainers::core::{IntoContainerPort, WaitFor}; use testcontainers::runners::AsyncRunner; -use testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use testcontainers::{ContainerAsync, GenericImage, ImageExt, TestcontainersError}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tonic::transport::Channel; @@ -63,25 +64,41 @@ impl Drop for MeteringSetup { } pub async fn start_clickhouse() -> (ContainerAsync, HttpPort, TcpPort) { - let local_http = free_local_port().expect("Could not get free http port"); let internal_http_port = 8123; - - let local_tcp = free_local_port().expect("Could not get free tcp port"); let internal_tcp_port = 9000; - - let container = GenericImage::new(clickhouse::CONTAINER_NAME, clickhouse::CONTAINER_VERSION) - .with_exposed_port(local_http.tcp()) - .with_exposed_port(local_tcp.tcp()) - .with_mapped_port(local_http, internal_http_port.tcp()) - .with_mapped_port(local_tcp, internal_tcp_port.tcp()) - .with_env_var("CLICKHOUSE_DB", "meteroid") - .with_env_var("CLICKHOUSE_USER", "default") - .with_env_var("CLICKHOUSE_PASSWORD", "default") - .with_container_name("it_clickhouse") - .with_network("meteroid_net") - .start() - .await - .unwrap(); + let suffix = uuid::Uuid::now_v7(); + + let container = (|| async { + let local_http = free_local_port().expect("Could not get free http port"); + let local_tcp = free_local_port().expect("Could not get free tcp port"); + + GenericImage::new(clickhouse::CONTAINER_NAME, clickhouse::CONTAINER_VERSION) + .with_exposed_port(local_http.tcp()) + .with_exposed_port(local_tcp.tcp()) + .with_mapped_port(local_http, internal_http_port.tcp()) + .with_mapped_port(local_tcp, internal_tcp_port.tcp()) + .with_env_var("CLICKHOUSE_DB", "meteroid") + .with_env_var("CLICKHOUSE_USER", "default") + .with_env_var("CLICKHOUSE_PASSWORD", "default") + .with_container_name(format!("it_clickhouse_{suffix}")) + .with_network(format!("meteroid_net_{suffix}")) + .start() + .await + }) + .retry( + ConstantBuilder::default() + .with_delay(Duration::from_secs(1)) + .with_max_times(3), + ) + .notify(|err: &TestcontainersError, dur: Duration| { + log::warn!( + "Retrying clickhouse container start after {:?}: {:?}", + dur, + err + ); + }) + .await + .unwrap(); let http_port = HttpPort( container @@ -106,43 +123,66 @@ pub async fn start_clickhouse() -> (ContainerAsync, HttpPort, TcpP (container, http_port, tcp_port) } -pub async fn start_kafka() -> anyhow::Result<(ContainerAsync, u16)> { - let kafka_port = free_local_port().expect("Could not get free port"); - let args = [ - "redpanda", - "start", - "--overprovisioned", - "--smp", - "1", - "--memory", - "512M", - "--reserve-memory=0M", - "--node-id=1", - "--check=false", - "--kafka-addr=INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092", - format!( - "--advertise-kafka-addr=INTERNAL://it_redpanda:29092,EXTERNAL://localhost:{kafka_port}" - ) - .as_str(), - "--set", - "redpanda.disable_metrics=true", - "--set", - "redpanda.enable_admin_api=false", - "--set", - "redpanda.developer_mode=true", - ] - .map(String::from); - - let container = GenericImage::new(CONTAINER_NAME, CONTAINER_VERSION) - .with_wait_for(WaitFor::message_on_stderr("Successfully started Redpanda!")) - .with_mapped_port(kafka_port, 9092_u16.tcp()) - .with_container_name("it_redpanda") - .with_network("meteroid_net") - .with_cmd(args) - .start() - .await?; +pub struct KafkaSetup { + pub container: ContainerAsync, + pub port: u16, + pub internal_addr: String, +} + +pub async fn start_kafka() -> anyhow::Result { + let suffix = uuid::Uuid::now_v7(); + let container_name = format!("it_redpanda_{suffix}"); + let network_name = format!("meteroid_net_{suffix}"); + let internal_addr = format!("{container_name}:29092"); + + let container = (|| async { + let kafka_port = free_local_port().expect("Could not get free port"); + + let args = [ + "redpanda", + "start", + "--overprovisioned", + "--smp", + "1", + "--memory", + "512M", + "--reserve-memory=0M", + "--node-id=1", + "--check=false", + "--kafka-addr=INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092", + format!( + "--advertise-kafka-addr=INTERNAL://{container_name}:29092,EXTERNAL://localhost:{kafka_port}" + ) + .as_str(), + "--set", + "redpanda.disable_metrics=true", + "--set", + "redpanda.enable_admin_api=false", + "--set", + "redpanda.developer_mode=true", + ] + .map(String::from); + + GenericImage::new(CONTAINER_NAME, CONTAINER_VERSION) + .with_wait_for(WaitFor::message_on_stderr("Successfully started Redpanda!")) + .with_mapped_port(kafka_port, 9092_u16.tcp()) + .with_container_name(container_name.clone()) + .with_network(network_name.clone()) + .with_cmd(args) + .start() + .await + }) + .retry(ConstantBuilder::default().with_delay(Duration::from_secs(1)).with_max_times(3)) + .notify(|err: &TestcontainersError, dur: Duration| { + log::warn!("Retrying redpanda container start after {:?}: {:?}", dur, err); + }) + .await?; let port = container.get_host_port_ipv4(9092).await?; - Ok((container, port)) + Ok(KafkaSetup { + container, + port, + internal_addr, + }) } diff --git a/modules/meteroid/tests/integration/test_metering_ingestion.rs b/modules/meteroid/tests/integration/test_metering_ingestion.rs index 3fb43cbef..108c8416e 100644 --- a/modules/meteroid/tests/integration/test_metering_ingestion.rs +++ b/modules/meteroid/tests/integration/test_metering_ingestion.rs @@ -30,14 +30,14 @@ async fn test_metering_ingestion() { let postgres_connection_string = meteroid_it::container::create_test_database().await; - let (_kafka_container, kafka_port) = metering_it::container::start_kafka() + let kafka_setup = metering_it::container::start_kafka() .await .expect("Could not start kafka"); let (_clickhouse_container, ch_http_port, ch_tcp_port) = metering_it::container::start_clickhouse().await; - metering_it::kafka::create_topic(kafka_port, "meteroid-events-raw") + metering_it::kafka::create_topic(kafka_setup.port, "meteroid-events-raw") .await .expect("Could not create topic"); @@ -51,7 +51,8 @@ async fn test_metering_ingestion() { metering_port, ch_http_port, ch_tcp_port, - kafka_port, + kafka_setup.port, + kafka_setup.internal_addr, "meteroid-events-raw".to_string(), "meteroid-events-preprocessed".to_string(), );