Skip to content

Commit 6fb49f2

Browse files
committed
Storage daemon starts
1 parent 8f0203e commit 6fb49f2

2 files changed

Lines changed: 126 additions & 61 deletions

File tree

server/src/cli/cli.rs

Lines changed: 107 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ fn check_process_status_by_port(process_name: &str, port: u16) -> bool { // Re-a
9191
false
9292
}
9393

94-
// Define the StorageConfig struct to mirror storage_config.yaml
94+
// Define the StorageConfig struct to mirror the content under 'storage:' in storage_config.yaml.
95+
// This struct is now the inner representation of the storage configuration.
9596
#[derive(Debug, Deserialize)]
9697
pub struct StorageConfig {
9798
pub data_directory: String,
@@ -103,7 +104,15 @@ pub struct StorageConfig {
103104
pub use_raft_for_scale: bool,
104105
}
105106

107+
// Private wrapper mirroring the root of the storage YAML document, whose settings are nested under a top-level 'storage:' key.
108+
// `load_storage_config` deserializes the file into this type and returns only the inner `storage` value.
109+
#[derive(Debug, Deserialize)]
110+
struct StorageConfigWrapper {
111+
storage: StorageConfig,
112+
}
113+
106114
/// Loads the Storage daemon configuration from `storage_daemon_server/storage_config.yaml`.
115+
/// The file's settings must be nested under a top-level `storage:` key; only that inner section is returned.
107116
pub fn load_storage_config(config_file_path: Option<PathBuf>) -> Result<StorageConfig, anyhow::Error> {
108117
let default_config_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
109118
.parent() // Go up to the workspace root of the server crate
@@ -116,10 +125,11 @@ pub fn load_storage_config(config_file_path: Option<PathBuf>) -> Result<StorageC
116125
let config_content = fs::read_to_string(&path_to_use)
117126
.map_err(|e| anyhow::anyhow!("Failed to read storage config file {}: {}", path_to_use.display(), e))?;
118127

119-
let config: StorageConfig = serde_yaml::from_str(&config_content)
128+
// Parse into the wrapper struct which correctly handles the 'storage:' key
129+
let wrapper: StorageConfigWrapper = serde_yaml::from_str(&config_content)
120130
.map_err(|e| anyhow::anyhow!("Failed to parse storage config file {}: {}", path_to_use.display(), e))?;
121131

122-
Ok(config)
132+
Ok(wrapper.storage) // Return the inner StorageConfig
123133
}
124134

125135

@@ -550,17 +560,21 @@ async fn handle_command(
550560
let wait_timeout = Duration::from_secs(3);
551561
let poll_interval = Duration::from_millis(100);
552562
let mut port_freed = false;
553-
while start_time.elapsed() < wait_timeout {
554-
match std::net::TcpListener::bind(&addr) {
555-
Ok(_) => {
556-
port_freed = true;
557-
break;
558-
}
559-
Err(_) => {
560-
std::thread::sleep(poll_interval);
563+
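// Poll until the port can be bound again, sleeping asynchronously between attempts.
// NOTE(review): this code is inside `async fn handle_command`; if that function is driven by a Tokio runtime, creating a second Runtime and calling block_on here panics ("Cannot start a runtime from within a runtime"). The loop can presumably be awaited directly instead — confirm.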
564+
tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime for port check").block_on(async {
565+
while start_time.elapsed() < wait_timeout {
566+
match tokio::net::TcpListener::bind(&addr).await { // Added .await here
567+
Ok(_) => {
568+
port_freed = true;
569+
break;
570+
}
571+
Err(_) => {
572+
tokio::time::sleep(poll_interval).await;
573+
}
561574
}
562575
}
563-
}
576+
});
577+
564578
if !port_freed {
565579
eprintln!("Failed to free up port {} after killing processes. Try again.", rest_port);
566580
return;
@@ -570,27 +584,27 @@ async fn handle_command(
570584

571585
// Daemonize REST API server
572586
let mut daemonize_builder = DaemonizeBuilder::new()
573-
.working_directory("/tmp") // This is acceptable as a temporary working directory for the daemon process, not direct file access for data storage
574-
.umask(0o027)
575-
.process_name(&format!("rest-api-{}", rest_port))
576-
.host("127.0.0.1")
577-
.port(rest_port)
578-
.skip_ports(vec![]); // REST API server should bind this port
587+
.working_directory("/tmp") // This is acceptable as a temporary working directory for the daemon process, not direct file access for data storage
588+
.umask(0o027)
589+
.process_name(&format!("rest-api-{}", rest_port))
590+
.host("127.0.0.1")
591+
.port(rest_port)
592+
.skip_ports(vec![]); // REST API server should bind this port
579593

580594
match daemonize_builder.fork_only() {
581595
Ok(child_pid) => {
582596
if child_pid == 0 {
583-
// In REST API child daemon process
597+
// In REST API child daemon: RUN THE REST API SERVER!
584598
let child_rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime for REST API daemon child");
585-
let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); // Dummy channel for daemon child
599+
let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
586600
let result = child_rt.block_on(start_rest_server(rest_port, shutdown_rx, "".to_string()));
587601
if let Err(e) = result {
588602
eprintln!("REST API server failed: {:?}", e);
589-
std::process::exit(1);
603+
std::process::exit(1); // Exit child process on failure
590604
}
591-
std::process::exit(0);
605+
std::process::exit(0); // Exit child process on success
592606
} else {
593-
// In parent CLI process
607+
println!("REST API server daemonized with PID {}", child_pid);
594608
let (tx, rx) = oneshot::channel();
595609
*rest_tx_guard = Some(tx);
596610
*rest_api_port_guard = Some(rest_port); // Store the actual port it's running on
@@ -599,15 +613,13 @@ async fn handle_command(
599613
let _ = rx.await; // This will just wait indefinitely unless tx is dropped
600614
});
601615
*rest_handle_guard = Some(handle);
602-
603-
println!("REST API server daemonized with PID {}", child_pid);
604616
}
605617
}
606618
Err(e) => {
607619
eprintln!("Failed to daemonize REST API server: {:?}", e);
608620
}
609621
}
610-
}
622+
} // This closing brace correctly closes the CommandType::RestApiStart match arm
611623
CommandType::RestApiStop => {
612624
let mut rest_tx_guard = rest_api_shutdown_tx_opt.lock().await;
613625
let mut rest_handle_guard = rest_api_handle.lock().await;
@@ -730,7 +742,7 @@ fn print_help() {
730742
println!(" rest health - (Use HTTP client) Check REST API health.");
731743
println!(" rest version - (Use HTTP client) Get REST API version.");
732744
println!(" rest register-user - (Use HTTP client) Register a new user via REST API.");
733-
println!(" rest authenticate - (Use HTTP client) Authenticate a user via REST API.");
745+
println!(" rest authenticate - (Use HTTP client) Authenticate a user via REST API.");
734746
println!(" rest graph-query - (Use HTTP client) Send a graph query to the REST API.");
735747
println!(" rest storage-query - (Use HTTP client) Send a storage query to the REST API.");
736748
println!("");
@@ -747,7 +759,7 @@ fn print_help() {
747759
async fn main_loop() {
748760
let daemon_handles: Arc<Mutex<HashMap<u16, (tokio::task::JoinHandle<()>, oneshot::Sender<()>)>>> = Arc::new(Mutex::new(HashMap::new()));
749761
let rest_api_shutdown_tx_opt: Arc<Mutex<Option<oneshot::Sender<()>>>> = Arc::new(Mutex::new(None));
750-
let rest_api_port_arc: Arc<Mutex<Option<u16>>> = Arc::new(Mutex::new(None)); // Tracks the port if REST API is running
762+
let rest_api_port_arc: Arc<Mutex<Option<u16>>> = Arc::new(Mutex::<Option<u16>>::new(None)); // Tracks the port if REST API is running
751763
let rest_api_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>> = Arc::new(Mutex::new(None));
752764

753765

@@ -801,7 +813,7 @@ async fn main_loop() {
801813

802814
// Signal the spawned task (if it's still waiting)
803815
if let Some(tx) = rest_tx_guard.take() {
804-
let _ = tx.send(());
816+
let _ = tx.send(()); // Signal the handle if it's waiting
805817
}
806818

807819
// Attempt to kill the process directly using lsof/kill
@@ -1013,17 +1025,22 @@ pub fn start_cli() {
10131025
let wait_timeout = Duration::from_secs(3);
10141026
let poll_interval = Duration::from_millis(100);
10151027
let mut port_freed = false;
1016-
while start_time.elapsed() < wait_timeout {
1017-
match std::net::TcpListener::bind(&addr) {
1018-
Ok(_) => {
1019-
port_freed = true;
1020-
break;
1021-
}
1022-
Err(_) => {
1023-
std::thread::sleep(poll_interval);
1028+
// Wrap the port freeing logic in a block_on to allow async sleep
1029+
rt.block_on(async { // Moved this block_on to wrap the entire port freeing logic
1030+
while start_time.elapsed() < wait_timeout {
1031+
match tokio::net::TcpListener::bind(&addr).await { // Added .await here
1032+
Ok(_) => {
1033+
port_freed = true;
1034+
break;
1035+
}
1036+
Err(_) => {
1037+
tokio::time::sleep(poll_interval).await;
1038+
}
10241039
}
10251040
}
1026-
}
1041+
});
1042+
1043+
10271044
if !port_freed {
10281045
eprintln!("Failed to free up port {} after killing processes. Try again.", rest_port);
10291046
rest_api_status_msg = format!("Failed to free up port {}.", rest_port);
@@ -1098,22 +1115,28 @@ pub fn start_cli() {
10981115
let wait_timeout = Duration::from_secs(3);
10991116
let poll_interval = Duration::from_millis(100);
11001117
let mut port_freed = false;
1101-
while start_time.elapsed() < wait_timeout {
1102-
match std::net::TcpListener::bind(&addr) {
1103-
Ok(_) => {
1104-
port_freed = true;
1105-
break;
1106-
}
1107-
Err(_) => {
1108-
std::thread::sleep(poll_interval);
1118+
// Wrap the port freeing logic in a block_on to allow async sleep
1119+
rt.block_on(async { // Moved this block_on to wrap the entire port freeing logic
1120+
while start_time.elapsed() < wait_timeout {
1121+
match tokio::net::TcpListener::bind(&addr).await { // Added .await here
1122+
Ok(_) => {
1123+
port_freed = true;
1124+
break;
1125+
}
1126+
Err(_) => {
1127+
tokio::time::sleep(poll_interval).await;
1128+
}
11091129
}
11101130
}
1111-
}
1131+
});
1132+
11121133
if !port_freed {
11131134
eprintln!("Failed to free up storage port {} after killing processes. Try again.", s_port);
11141135
storage_status_msg = format!("Failed to free up port {}.", s_port);
11151136
} else {
11161137
println!("Starting Storage daemon on port {}...", s_port);
1138+
// The CLI's load_storage_config is for displaying metrics, not for the daemon's actual startup.
1139+
// We attempt to load it here to provide immediate feedback on the config file's format.
11171140
let loaded_storage_config = load_storage_config(storage_config_file.clone());
11181141
match loaded_storage_config {
11191142
Ok(cfg) => {
@@ -1128,7 +1151,8 @@ pub fn start_cli() {
11281151
println!(" Use Raft for Scale: {}", cfg.use_raft_for_scale);
11291152
}
11301153
Err(e) => {
1131-
eprintln!("Error loading storage config: {:?}", e);
1154+
eprintln!("Error loading storage config for CLI display: {:?}", e);
1155+
// Do not exit, allow daemonization to proceed, as the daemon will load its own config.
11321156
}
11331157
}
11341158

@@ -1143,7 +1167,7 @@ pub fn start_cli() {
11431167

11441168
// Daemonize Storage daemon
11451169
let mut daemonize_builder = DaemonizeBuilder::new()
1146-
.working_directory("/tmp") // Temporary working directory for the daemon process
1170+
.working_directory("/tmp") // Temporary working directory for the daemon process, not direct file access for data storage
11471171
.umask(0o027)
11481172
.process_name(&format!("storage-daemon-{}", s_port))
11491173
.host("127.0.0.1")
@@ -1156,15 +1180,45 @@ pub fn start_cli() {
11561180
// In Storage daemon child process: RUN THE STORAGE SERVER!
11571181
let child_rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime for Storage daemon child");
11581182
// The run_storage_daemon function handles its own Ctrl+C listener and shutdown channel.
1159-
let result = child_rt.block_on(start_storage_server(Some(s_port), actual_storage_config_path)); // Corrected arguments
1183+
// It expects `Option<u16>` for port and `PathBuf` for config file.
1184+
let result = child_rt.block_on(start_storage_server(Some(s_port), actual_storage_config_path));
11601185
if let Err(e) = result {
11611186
eprintln!("Storage daemon failed: {:?}", e);
11621187
std::process::exit(1); // Exit child process on failure
11631188
}
11641189
std::process::exit(0); // Exit child process on success
11651190
} else {
11661191
println!("Storage daemon daemonized with PID {}", child_pid);
1167-
storage_status_msg = format!("Running on port: {}", s_port);
1192+
// --- ADD HEALTH CHECK HERE IN PARENT PROCESS ---
1193+
let addr_check = format!("127.0.0.1:{}", s_port);
1194+
let health_check_timeout = Duration::from_secs(5); // Give it a few seconds
1195+
let poll_interval = Duration::from_millis(200);
1196+
let mut started_ok = false;
1197+
let start_time = Instant::now();
1198+
1199+
// Use rt.block_on to run the async TCP connect in the sync context
1200+
rt.block_on(async {
1201+
while start_time.elapsed() < health_check_timeout {
1202+
match tokio::net::TcpStream::connect(&addr_check).await { // Added .await here
1203+
Ok(_) => {
1204+
println!("Storage daemon on port {} responded to health check.", s_port);
1205+
started_ok = true;
1206+
break;
1207+
}
1208+
Err(_) => {
1209+
tokio::time::sleep(poll_interval).await;
1210+
}
1211+
}
1212+
}
1213+
});
1214+
1215+
if started_ok {
1216+
storage_status_msg = format!("Running on port: {}", s_port);
1217+
} else {
1218+
eprintln!("Warning: Storage daemon daemonized with PID {} but did not become reachable on port {} within {:?}. This might indicate an internal startup failure.",
1219+
child_pid, s_port, health_check_timeout);
1220+
storage_status_msg = format!("Daemonized but failed to become reachable on port {}", s_port);
1221+
}
11681222
}
11691223
}
11701224
Err(e) => {
@@ -1191,8 +1245,8 @@ pub fn start_cli() {
11911245
println!(" Use Raft for Scale: {}", cfg.use_raft_for_scale);
11921246
}
11931247
Err(e) => {
1194-
eprintln!("Error loading default storage config: {:?}", e);
1195-
storage_status_msg = format!("Failed to load default config ({:?})", e);
1248+
eprintln!("Error loading default storage config for CLI display: {:?}", e);
1249+
storage_status_msg = format!("Failed to load default config for display ({:?})", e);
11961250
}
11971251
}
11981252
}

0 commit comments

Comments
 (0)