Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 52 additions & 35 deletions src/player/filtergateway/src/manager.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice bug fix

Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,34 @@ impl FilterGatewayManager {
for scenario in etcd_scenario {
let scenario: Scenario = serde_yaml::from_str(&scenario)?;
println!("Scenario: {:?}", scenario);
let topic_name = scenario
.get_conditions()
.as_ref()
.map(|cond| cond.get_operand_value())
.unwrap_or_default();
let data_type_name = scenario
.get_conditions()
.as_ref()
.map(|cond| cond.get_operand_value())
.unwrap_or_default();
let mut vehicle_manager = self.vehicle_manager.lock().await;
if let Err(e) = vehicle_manager
.subscribe_topic(topic_name, data_type_name)
.await
{
eprintln!("Error subscribing to vehicle data: {:?}", e);

// Only subscribe to vehicle data if the scenario has conditions with valid topic
if let Some(conditions) = scenario.get_conditions().as_ref() {
let topic_name = conditions.get_operand_value();
let data_type_name = conditions.get_operand_value();

// Skip subscription if topic name is empty
if !topic_name.is_empty() {
let mut vehicle_manager = self.vehicle_manager.lock().await;
if let Err(e) = vehicle_manager
.subscribe_topic(topic_name, data_type_name)
.await
{
eprintln!("Error subscribing to vehicle data: {:?}", e);
}
} else {
println!(
"Skipping DDS subscription for scenario '{}': empty topic name",
scenario.get_name()
);
}
} else {
println!(
"Scenario '{}' has no conditions, skipping DDS subscription",
scenario.get_name()
);
}

self.launch_scenario_filter(scenario).await?;
}

Expand Down Expand Up @@ -185,25 +196,31 @@ impl FilterGatewayManager {
match param.action {
0 => {
// Allow
// Subscribe to vehicle data
let topic_name = param
.scenario
.get_conditions()
.as_ref()
.map(|cond| cond.get_operand_value())
.unwrap_or_default();
let data_type_name = param
.scenario
.get_conditions()
.as_ref()
.map(|cond| cond.get_operand_value())
.unwrap_or_default();
let mut vehicle_manager = self.vehicle_manager.lock().await;
if let Err(e) = vehicle_manager
.subscribe_topic(topic_name, data_type_name)
.await
{
eprintln!("Error subscribing to vehicle data: {:?}", e);
// Subscribe to vehicle data only if conditions exist and topic is valid
if let Some(conditions) = param.scenario.get_conditions().as_ref() {
let topic_name = conditions.get_operand_value();
let data_type_name = conditions.get_operand_value();

// Only subscribe if topic name is not empty
if !topic_name.is_empty() {
let mut vehicle_manager = self.vehicle_manager.lock().await;
if let Err(e) = vehicle_manager
.subscribe_topic(topic_name, data_type_name)
.await
{
eprintln!("Error subscribing to vehicle data: {:?}", e);
}
} else {
println!(
"Skipping DDS subscription for scenario '{}': empty topic name",
param.scenario.get_name()
);
}
} else {
println!(
"Scenario '{}' has no conditions, skipping DDS subscription",
param.scenario.get_name()
);
}
self.launch_scenario_filter(param.scenario).await?;
}
Expand Down
44 changes: 31 additions & 13 deletions src/server/apiserver/src/artifact/mod.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good.
However, no decision has yet been made on whether to allow multi-scenarios.

Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,32 @@ use common::spec::artifact::Volume;
/// ### Parametets
/// * `body: &str` - whole yaml string of piccolo artifact
/// ### Returns
/// * `Result(String, String)` - scenario and package yaml in downloaded artifact
/// * `Result(Vec<String>)` - vector of scenario yaml strings in downloaded artifact
/// ### Description
/// Write artifact in etcd
pub async fn apply(body: &str) -> common::Result<String> {
/// Note: Returns ALL scenarios from the YAML, not just the last one
pub async fn apply(body: &str) -> common::Result<Vec<String>> {
use std::time::Instant;
let total_start = Instant::now();

let docs: Vec<&str> = body.split("---").collect();
let mut scenario_str = String::new();
let mut package_str = String::new();
let mut scenario_strs: Vec<String> = Vec::new();
let mut has_package = false;

for doc in docs {
// Skip empty documents
if doc.trim().is_empty() {
continue;
}

let parse_start = Instant::now();
let value: serde_yaml::Value = serde_yaml::from_str(doc)?;
let value: serde_yaml::Value = match serde_yaml::from_str(doc) {
Ok(v) => v,
Err(e) => {
println!("apply: failed to parse YAML document: {:?}", e);
continue;
}
};
let artifact_str = serde_yaml::to_string(&value)?;
let parse_elapsed = parse_start.elapsed();
println!("apply: YAML parse elapsed = {:?}", parse_elapsed);
Expand All @@ -60,7 +72,8 @@ pub async fn apply(body: &str) -> common::Result<String> {

match kind {
"Scenario" => {
scenario_str = artifact_str;
// Collect ALL scenarios, not just the last one
scenario_strs.push(artifact_str);

// Set initial scenario state to idle via StateManager
println!("🔄 SCENARIO STATE INITIALIZATION: ApiServer Setting Initial State");
Expand Down Expand Up @@ -98,21 +111,22 @@ pub async fn apply(body: &str) -> common::Result<String> {
println!(" ✅ Successfully set scenario {} to idle state", name);
}
}
"Package" => package_str = artifact_str,
"Package" => has_package = true,
_ => continue,
};
}
}

let total_elapsed = total_start.elapsed();
println!("apply: total elapsed = {:?}", total_elapsed);
println!("apply: found {} scenarios", scenario_strs.len());

if scenario_str.is_empty() {
if scenario_strs.is_empty() {
Err("There is not any scenario in yaml string".into())
} else if package_str.is_empty() {
} else if !has_package {
Err("There is not any package in yaml string".into())
} else {
Ok(scenario_str)
Ok(scenario_strs)
}
}

Expand Down Expand Up @@ -241,9 +255,13 @@ spec:
result.err()
);

// Assert: scenario and package strings should not be empty
let scenario = result.unwrap();
assert!(!scenario.is_empty(), "Scenario YAML should not be empty");
// Assert: scenarios vector should not be empty
let scenarios = result.unwrap();
assert!(!scenarios.is_empty(), "Scenarios vector should not be empty");
assert!(
!scenarios[0].is_empty(),
"First scenario YAML should not be empty"
);
}

/// Test apply() with missing `action` field (invalid Scenario)
Expand Down
44 changes: 28 additions & 16 deletions src/server/apiserver/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ async fn reload() {
/// ### Description
/// write artifact in etcd
/// (optional) make yaml, kube files for Bluechi
/// send a gRPC message to gateway
/// send a gRPC message to gateway for ALL scenarios in the YAML
pub async fn apply_artifact(body: &str) -> common::Result<()> {
let scenario = crate::artifact::apply(body).await?;
let scenarios = crate::artifact::apply(body).await?;

let handle_yaml = HandleYamlRequest {
yaml: body.to_string(),
Expand Down Expand Up @@ -246,11 +246,21 @@ pub async fn apply_artifact(body: &str) -> common::Result<()> {
}
}

let req: HandleScenarioRequest = HandleScenarioRequest {
action: Action::Apply.into(),
scenario,
};
crate::grpc::sender::filtergateway::send(req).await?;
// Send ALL scenarios to filtergateway, not just the last one
println!(
"apply_artifact: Sending {} scenarios to FilterGateway",
scenarios.len()
);
for scenario in scenarios {
let req: HandleScenarioRequest = HandleScenarioRequest {
action: Action::Apply.into(),
scenario,
};
if let Err(e) = crate::grpc::sender::filtergateway::send(req).await {
eprintln!("Error sending scenario to FilterGateway: {:?}", e);
}
}

Ok(())
}

Expand Down Expand Up @@ -774,21 +784,23 @@ spec:
}

/// Mocked version of apply_artifact function.
/// Instead of full production logic, this sends a gRPC request to the mock server.
/// Instead of full production logic, this sends gRPC requests for ALL scenarios to the mock server.
async fn apply_artifact(
body: &str,
grpc_addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
let scenario = crate::artifact::apply(body).await?;
let scenarios = crate::artifact::apply(body).await?;

// Prepare the gRPC request with Apply action
let req = HandleScenarioRequest {
action: Action::Apply.into(),
scenario,
};
// Send ALL scenarios to the mock gRPC server
for scenario in scenarios {
let req = HandleScenarioRequest {
action: Action::Apply.into(),
scenario,
};
mock_send(req, grpc_addr).await?;
}

// Send request to the mock gRPC server
mock_send(req, grpc_addr).await
Ok(())
}

/// Mocked version of withdraw_artifact function.
Expand Down
43 changes: 41 additions & 2 deletions src/server/apiserver/src/node/node_lookup.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I deeply appreciate your contribution.

This section inserts a random node when the package fails to find a node to run on. As a result, the code shouldn't be executed, and it would be better to fundamentally change the part before this point.

Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,30 @@ use prost::Message;
use std::error::Error;

/// Find a node by IP address from simplified node keys
/// Filters out invalid IPs like 0.0.0.0 and empty strings
pub async fn find_node_by_simple_key() -> Option<String> {
println!("Checking simplified node keys in etcd...");
match etcd::get_all_with_prefix("nodes/").await {
Ok(kvs) => {
println!("Found {} simplified node keys", kvs.len());
if let Some(kv) = kvs.first() {
// Iterate through all nodes and find the first valid IP
for kv in kvs {
println!("Node key: {}", kv.key);
let ip_address = kv.key.trim_start_matches("nodes/");
println!("Found node IP directly from key: {}", ip_address);

// Skip invalid IPs: empty, 0.0.0.0, or localhost when looking for remote nodes
if ip_address.is_empty()
|| ip_address == "0.0.0.0"
|| ip_address == "127.0.0.1"
{
println!("Skipping invalid IP: {}", ip_address);
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes, IP can be 127.0.0.1 or 0.0.0.0.

}

println!("Found valid node IP from key: {}", ip_address);
return Some(ip_address.to_string());
}
println!("No valid node IPs found in simplified keys");
None
}
Err(e) => {
Expand All @@ -32,6 +45,32 @@ pub async fn find_node_by_simple_key() -> Option<String> {
}
}

/// Find a node by specific target IP address from simplified node keys
/// Returns the IP if it exists and is valid, None otherwise
pub async fn find_node_by_target_ip(target_ip: &str) -> Option<String> {
if target_ip.is_empty() || target_ip == "0.0.0.0" {
return None;
}

println!("Looking for specific node IP: {}", target_ip);
let key = format!("nodes/{}", target_ip);

match etcd::get(&key).await {
Ok(Some(_)) => {
println!("Found target node IP in etcd: {}", target_ip);
Some(target_ip.to_string())
}
Ok(None) => {
println!("Target node IP not found in etcd: {}", target_ip);
None
}
Err(e) => {
println!("Error checking for target IP {}: {}", target_ip, e);
None
}
}
}

/// Find a node directly from etcd using cluster/nodes/ prefix
pub async fn find_node_from_etcd() -> Option<String> {
println!("Checking cluster/nodes/ prefix in etcd...");
Expand Down