diff --git a/tmll/common/models/experiment.py b/tmll/common/models/experiment.py index 2e984ec..3d44543 100644 --- a/tmll/common/models/experiment.py +++ b/tmll/common/models/experiment.py @@ -35,26 +35,36 @@ def from_tsp_experiment(cls, tsp_experiment) -> 'Experiment': def assign_outputs(self, outputs: List[Output]) -> None: self.outputs = outputs + # If there is multiple outputs with the same name, use number to differentiate them + name_counts = {} + for output in self.outputs: + name = output.name + count = name_counts.get(name, 0) + 1 + name_counts[name] = count + + if count > 1 or name in [o.name for o in self.outputs if o != output]: + output.name = f"{name} ({count})" + def __repr__(self) -> str: return f"Experiment(name={self.name}, uuid={self.uuid}, start={self.start}, end={self.end}, num_events={self.num_events}, indexing={self.indexing}, traces={self.traces}, outputs={self.outputs})" def find_outputs(self, keyword: Optional[Union[str, List[str]]] = None, type: Optional[Union[str, List[str]]] = None, match_any: bool = False) -> List[Output]: """ Find outputs based on various criteria with flexible matching logic. - + Examples: # AND logic (default) find_outputs(keyword="cpu", type="time_series") # Must match both - + # OR logic find_outputs(keyword="cpu", type="event", match_any=True) # Can match either - + # Multiple values for each criteria find_outputs( keyword=["cpu", "memory"], # With match_any=False: must contain both words type=["time_graph", "xy"], # With match_any=True: can contain either word ) - + :param keyword: The keyword(s) to search for in the output name, description, and id :type keyword: str or List[str], optional :param type: The type(s) to search for in the output type @@ -67,7 +77,7 @@ def find_outputs(self, keyword: Optional[Union[str, List[str]]] = None, type: Op # Convert single strings to lists for consistent processing keywords = [keyword] if isinstance(keyword, str) else keyword types = [type] if isinstance(type, str) else type - + # If no criteria provided, return all outputs if not any([keywords, types]): return self.outputs @@ -79,7 +89,7 @@ def matches_keywords(text: str, keys: List[str], match_any: bool) -> bool: if not keys: return True - + if match_any: return any(k in text for k in keys) return all(k in text for k in keys) @@ -89,25 +99,54 @@ def matches_keywords(text: str, keys: List[str], match_any: bool) -> bool: # Check keywords in name, description, and id search_text = f"{output.name} {output.description} {output.id}" keywords_match = matches_keywords(search_text, keywords or [], match_any) - + # Check type type_match = matches_keywords(output.type, types or [], match_any) - + if keywords_match and type_match: matches.append(output) - + return sorted(matches, key=lambda x: x.name) - - def get_output(self, output_id: str) -> Optional[Output]: + + def get_output_by_id(self, output_id: str) -> Optional[Output]: """ Get the output with the given ID. - + :param output_id: The ID of the output to get :type output_id: str :return: The output with the given ID, or None if not found :rtype: Optional[Output] """ - for output in self.outputs: - if output.id == output_id: - return output - return None \ No newline at end of file + return next((o for o in self.outputs if o.id == output_id), None) + + def get_output_by_name(self, output_name: str, partial_match: bool = False) -> Optional[Output]: + """ + Get the output with the given name (case insensitive). + + :param output_name: The name of the output to get + :type output_name: str + :param partial_match: Whether to allow partial matches + :type partial_match: bool, optional + :return: The output with the given name, or None if not found + :rtype: Optional[Output] + """ + if partial_match: + return next((o for o in self.outputs if output_name.lower() in o.name.lower()), None) + + return next((o for o in self.outputs if o.name.lower() == output_name.lower()), None) + + def get_outputs_by_name(self, output_name: str, partial_match: bool = False) -> List[Output]: + """ + Get all outputs with the given name (case insensitive). + + :param output_name: The name of the output to get + :type output_name: str + :param partial_match: Whether to allow partial matches + :type partial_match: bool, optional + :return: The list of outputs with the given name + :rtype: List[Output] + """ + if partial_match: + return [o for o in self.outputs if output_name.lower() in o.name.lower()] + + return [o for o in self.outputs if o.name.lower() == output_name.lower()] diff --git a/tmll/ml/modules/anomaly_detection/anomaly_detection_module.py b/tmll/ml/modules/anomaly_detection/anomaly_detection_module.py index b03b6d0..3bfa402 100644 --- a/tmll/ml/modules/anomaly_detection/anomaly_detection_module.py +++ b/tmll/ml/modules/anomaly_detection/anomaly_detection_module.py @@ -145,7 +145,9 @@ def plot_anomalies(self, anomaly_detection_results: Optional[AnomalyDetectionRes fig_dpi = kwargs.get("fig_dpi", 500) colors = plt.colormaps.get_cmap("tab10") - for idx, (name, dataframe) in enumerate(self.dataframes.items()): + for idx, (id, dataframe) in enumerate(self.dataframes.items()): + output = self.experiment.get_output_by_id(id) + name = output.name if output else id plots = [] # Plot the original data plots.append({ @@ -158,8 +160,7 @@ def plot_anomalies(self, anomaly_detection_results: Optional[AnomalyDetectionRes }) # Append the anomaly periods to the plots as span plot - for start, end in anomaly_detection_results.anomaly_periods[name]: - # print(f"Anomaly detected from {start} to {end}") + for start, end in anomaly_detection_results.anomaly_periods[id]: plots.append({ "label": "Anomaly Period", "plot_type": "span", @@ -172,13 +173,13 @@ def plot_anomalies(self, anomaly_detection_results: Optional[AnomalyDetectionRes }) anomaly_points_list = [] - for point in anomaly_detection_results.anomalies[name].index: - if not anomaly_detection_results.anomalies[name].loc[point].any(): + for point in anomaly_detection_results.anomalies[id].index: + if not anomaly_detection_results.anomalies[id].loc[point].any(): continue # Check if the point is within any anomaly period in_anomaly_period = False - for start, end in anomaly_detection_results.anomaly_periods[name]: + for start, end in anomaly_detection_results.anomaly_periods[id]: if start <= point <= end: in_anomaly_period = True break diff --git a/tmll/ml/modules/anomaly_detection/memory_leak_detection_module.py b/tmll/ml/modules/anomaly_detection/memory_leak_detection_module.py index 5679040..91de929 100644 --- a/tmll/ml/modules/anomaly_detection/memory_leak_detection_module.py +++ b/tmll/ml/modules/anomaly_detection/memory_leak_detection_module.py @@ -98,23 +98,29 @@ def _process(self, outputs: Optional[List[Output]] = None, **kwargs) -> None: **kwargs) def _post_process(self, **kwargs) -> None: - if "Events Table" in self.dataframes: - df = self.dataframes["Events Table"] - if not all(col in df.columns for col in ["size", "ptr"]): + events_df = self._get_events_dataframe() + if events_df is not None: + events_output = self.experiment.get_output_by_name("Events Table") + id = events_output.id if events_output is not None else None + if id is None: + return + + if not all(col in events_df.columns for col in ["size", "ptr"]): self.logger.warning("Events table does not contain necessary columns for memory leak analysis") - self.dataframes["Events Table"] = pd.DataFrame() return - df["event_category"] = "other" - df.loc[df["Event type"].str.contains("malloc", na=False), "event_category"] = "allocation" - df.loc[df["Event type"].str.contains("free", na=False), "event_category"] = "deallocation" - df = df[df["event_category"] != "other"] + events_df["event_category"] = "other" + events_df.loc[events_df["Event type"].str.contains("malloc", na=False), "event_category"] = "allocation" + events_df.loc[events_df["Event type"].str.contains("free", na=False), "event_category"] = "deallocation" + events_df = events_df[events_df["event_category"] != "other"] - df = df.rename({"size": "allocation_size"}, axis=1) - df["allocation_size"] = df["allocation_size"].astype(float) - df["ptr"] = df["ptr"].astype(str) + events_df = events_df.rename({"size": "allocation_size"}, axis=1) + events_df["allocation_size"] = events_df["allocation_size"].astype(float) + events_df["ptr"] = events_df["ptr"].astype(str) - self.dataframes["Events Table"] = df + events_df = self._separate_events(events_df) + + self.dataframes[id] = events_df def _separate_events(self, dataframe: pd.DataFrame) -> pd.DataFrame: """ @@ -138,6 +144,31 @@ def _separate_events(self, dataframe: pd.DataFrame) -> pd.DataFrame: return dataframe + def _get_memory_usage_dataframe(self) -> Optional[pd.DataFrame]: + """ + Get the memory usage dataframe from the experiment. + + :return: The memory usage dataframe + :rtype: Optional[pd.DataFrame] + """ + mu_outputs = self.experiment.get_outputs_by_name("Memory Usage", True) + ust_mu_output = next((o for o in mu_outputs if "ust" in o.id.lower()), None) + if ust_mu_output is not None: + return self.dataframes.get(ust_mu_output.id, None) + return None + + def _get_events_dataframe(self) -> Optional[pd.DataFrame]: + """ + Get the events table dataframe from the experiment. + + :return: The events dataframe + :rtype: Optional[pd.DataFrame] + """ + et_output = self.experiment.get_output_by_name("Events Table") + if et_output is not None: + return self.dataframes.get(et_output.id, None) + return None + def analyze_memory_leaks(self, window_size: str = MemoryThresholds.window_size, fragmentation_threshold: float = MemoryThresholds.fragmentation, slope_threshold: float = MemoryThresholds.growth_slope) -> LeakAnalysisResult: @@ -161,9 +192,20 @@ def analyze_memory_leaks(self, window_size: str = MemoryThresholds.window_size, :return: The results of the memory leak analysis :rtype: LeakAnalysisResult """ - if any([self.dataframes.get("Events Table") is None or self.dataframes["Events Table"].empty, - self.dataframes.get("Memory Usage") is None or self.dataframes["Memory Usage"].empty]): - self.logger.warning("Insufficient data available for memory leak analysis") + et_df = self._get_events_dataframe() + if et_df is None: + self.logger.warning("Events table output not found in the experiment") + return LeakAnalysisResult( + severity=MemoryLeakSeverity.NONE, + confidence_score=0.0, + metrics=MemoryMetrics(0, 0, 0, 0, 0, 0, 0, 0), + detected_patterns=[], + suspicious_locations=pd.DataFrame() + ) + + mu_df = self._get_memory_usage_dataframe() + if mu_df is None: + self.logger.warning("Memory usage output not found in the experiment") return LeakAnalysisResult( severity=MemoryLeakSeverity.NONE, confidence_score=0.0, @@ -201,12 +243,11 @@ def _track_pointer_lifecycle(self) -> pd.DataFrame: :return: A DataFrame containing information about memory leaks :rtype: pd.DataFrame """ - if "Events Table" not in self.dataframes or self.dataframes["Events Table"].empty: + events_df = self._get_events_dataframe() + if events_df is None or events_df.empty: self.logger.warning("No events data available for memory leak analysis") return pd.DataFrame() - events_df = self.dataframes["Events Table"] - # Separate allocations and deallocations allocations = events_df[events_df["event_category"] == "allocation"] deallocations = events_df[events_df["event_category"] == "deallocation"] @@ -246,22 +287,23 @@ def _analyze_memory_trend(self) -> Dict[str, Any]: :return: The results of the memory usage trend analysis :rtype: Dict[str, Any] """ - if "Memory Usage" not in self.dataframes or self.dataframes["Memory Usage"].empty: + memory_df = self._get_memory_usage_dataframe() + if memory_df is None or memory_df.empty: self.logger.warning("No memory usage data available for trend analysis") return {} - memory_df = self.data_preprocessor.normalize(self.dataframes["Memory Usage"]) + memory_df = self.data_preprocessor.normalize(memory_df) # Convert timestamps to seconds from start time_seconds = (memory_df.index - memory_df.index[0]).total_seconds() # Calculate rolling statistics window_size = pd.Timedelta(self.thresholds.window_size) - rolling_mean = memory_df["Memory Usage"].rolling(window=window_size).mean() - rolling_std = memory_df["Memory Usage"].rolling(window=window_size).std() + rolling_mean = memory_df.iloc[:, 0].rolling(window=window_size).mean() + rolling_std = memory_df.iloc[:, 0].rolling(window=window_size).std() # Perform linear regression with actual time intervals - slope, intercept, r_value, p_value, _ = stats.linregress(time_seconds, memory_df["Memory Usage"].values) + slope, intercept, r_value, p_value, _ = stats.linregress(time_seconds, memory_df.iloc[:, 0].values) slope = cast(float, slope) intercept = cast(float, intercept) r_value = cast(float, r_value) @@ -290,11 +332,11 @@ def _analyze_allocation_patterns(self) -> Dict[str, Any]: :return: The results of the allocation pattern analysis :rtype: Dict[str, Any] """ - if "Events Table" not in self.dataframes or self.dataframes["Events Table"].empty: + events_df = self._get_events_dataframe() + if events_df is None or events_df.empty: self.logger.warning("No events data available for allocation pattern analysis") return {} - events_df = self.dataframes["Events Table"] allocation_events = events_df[events_df["event_category"] == "allocation"] # Check for null values in allocation size @@ -430,12 +472,11 @@ def _identify_suspicious_locations(self, ptr_tracking: pd.DataFrame) -> pd.DataF :return: The top suspicious memory allocation locations :rtype: pd.DataFrame """ - if "Events Table" not in self.dataframes or self.dataframes["Events Table"].empty: + events_df = self._get_events_dataframe() + if events_df is None or events_df.empty: self.logger.warning("No events data available for suspicious location analysis") return pd.DataFrame() - events_df = self.dataframes["Events Table"] - # Find allocations without matching deallocations unfreed_ptrs = ptr_tracking[ptr_tracking["deallocation_time"].isna()]["ptr"] @@ -523,31 +564,34 @@ def interpret(self, analysis_result: LeakAnalysisResult) -> None: "Top 5 Suspicious Locations" ) - memory_df = self.dataframes["Memory Usage"] - peak_val, peak_unit = Formatter.format_bytes(memory_df["Memory Usage"].max()) - avg_val, avg_unit = Formatter.format_bytes(memory_df["Memory Usage"].mean()) - std_val, std_unit = Formatter.format_bytes(memory_df["Memory Usage"].std()) - DocumentGenerator.metrics_group("Memory Usage Statistics", { - "Peak Memory Usage": f"{peak_val:.2f} {peak_unit}", - "Average Memory Usage": f"{avg_val:.2f} {avg_unit}", - "Memory Usage Std Dev": f"{std_val:.2f} {std_unit}" - }) + memory_df = self._get_memory_usage_dataframe() + if memory_df is not None: + peak_val, peak_unit = Formatter.format_bytes(memory_df.iloc[:, 0].max()) + avg_val, avg_unit = Formatter.format_bytes(memory_df.iloc[:, 0].mean()) + std_val, std_unit = Formatter.format_bytes(memory_df.iloc[:, 0].std()) + DocumentGenerator.metrics_group("Memory Usage Statistics", { + "Peak Memory Usage": f"{peak_val:.2f} {peak_unit}", + "Average Memory Usage": f"{avg_val:.2f} {avg_unit}", + "Memory Usage Std Dev": f"{std_val:.2f} {std_unit}" + }) - allocation_events = self.dataframes["Events Table"][ - self.dataframes["Events Table"]["event_category"] == "allocation" - ] - deallocation_events = self.dataframes["Events Table"][ - self.dataframes["Events Table"]["event_category"] == "deallocation" - ] - unmatched_allocations = allocation_events.loc[ - ~allocation_events["ptr"].isin(deallocation_events["ptr"]) - ]["ptr"].unique() - - DocumentGenerator.metrics_group("Allocation Statistics", { - "Total Allocations": f"{len(allocation_events):,}", - "Total Deallocations": f"{len(deallocation_events):,}", - "Unmatched Allocations": f"{len(unmatched_allocations):,}" - }) + events_df = self._get_events_dataframe() + if events_df is not None: + allocation_events = events_df[ + events_df["event_category"] == "allocation" + ] + deallocation_events = events_df[ + events_df["event_category"] == "deallocation" + ] + unmatched_allocations = allocation_events.loc[ + ~allocation_events["ptr"].isin(deallocation_events["ptr"]) + ]["ptr"].unique() + + DocumentGenerator.metrics_group("Allocation Statistics", { + "Total Allocations": f"{len(allocation_events):,}", + "Total Deallocations": f"{len(deallocation_events):,}", + "Unmatched Allocations": f"{len(unmatched_allocations):,}" + }) ptr_tracking = self._track_pointer_lifecycle() lifetimes = ptr_tracking["lifetime"].dropna() @@ -569,19 +613,19 @@ def plot_memory_leak_analysis(self, analysis_result: LeakAnalysisResult, **kwarg :type analysis_result: LeakAnalysisResult :param kwargs: Additional keyword arguments """ - memory_df = self.dataframes.get("Memory Usage", pd.DataFrame()) - events_df = self.dataframes.get("Events Table", pd.DataFrame()) + memory_df = self._get_memory_usage_dataframe() + events_df = self._get_events_dataframe() fig_size = kwargs.get("fig_size", (15, 5)) fig_dpi = kwargs.get("fig_dpi", 100) colors = plt.get_cmap("tab10") - if not memory_df.empty: + if memory_df is not None and not memory_df.empty: points_per_window = max(len(memory_df) // 10, 1) # Scale the trend line as it was trained on normalized data, so we need to scale it back - analysis_result.metrics.regression_intercept = analysis_result.metrics.regression_intercept * memory_df["Memory Usage"].max() - analysis_result.metrics.regression_slope = analysis_result.metrics.regression_slope * memory_df["Memory Usage"].max() + analysis_result.metrics.regression_intercept = analysis_result.metrics.regression_intercept * memory_df.iloc[:, 0].max() + analysis_result.metrics.regression_slope = analysis_result.metrics.regression_slope * memory_df.iloc[:, 0].max() trend_line = pd.DataFrame({ "timestamp": memory_df.index, "trend_line": analysis_result.metrics.regression_slope * @@ -594,7 +638,6 @@ def plot_memory_leak_analysis(self, analysis_result: LeakAnalysisResult, **kwarg { "plot_type": "time_series", "data": memory_df, - "y": "Memory Usage", "label": "Memory Usage", "alpha": 0.8, "linewidth": 2, @@ -603,7 +646,6 @@ def plot_memory_leak_analysis(self, analysis_result: LeakAnalysisResult, **kwarg { "plot_type": "time_series", "data": memory_df.rolling(window=points_per_window, min_periods=1, center=True).mean(), - "y": "Memory Usage", "label": "Rolling Mean", "alpha": 0.9, "linewidth": 2, @@ -625,7 +667,7 @@ def plot_memory_leak_analysis(self, analysis_result: LeakAnalysisResult, **kwarg self._plot(plots, plot_size=fig_size, dpi=fig_dpi, fig_title="Memory Usage Over Time", fig_xlabel="Time", fig_ylabel="Memory Usage", grid=True) - if not events_df.empty: + if events_df is not None and not events_df.empty: allocation_events = events_df[events_df["event_category"] == "allocation"] deallocation_events = events_df[events_df["event_category"] == "deallocation"] ptr_tracking = self._track_pointer_lifecycle() diff --git a/tmll/ml/modules/base_module.py b/tmll/ml/modules/base_module.py index ebba2f3..f6b64d8 100644 --- a/tmll/ml/modules/base_module.py +++ b/tmll/ml/modules/base_module.py @@ -145,14 +145,14 @@ def _process(self, outputs: Optional[List[Output]] = None, **kwargs) -> None: # Process each output for output_key, output_data in data.items(): - shortened = output_key.split("$")[0] - converted = next(iter(output for output in outputs if output.id == shortened), None) if outputs else None - shortened = converted.name if converted else shortened + output = next(iter(output for output in outputs if output.id == output_key.split("$")[0]), None) if outputs else None + if output is None: + continue - if shortened not in self.dataframes: + if output.id not in self.dataframes: df = output_data - if converted and converted.type == "TIME_GRAPH": + if output.type == "TIME_GRAPH": df = df.rename({"start_time": "timestamp"}, axis=1) df["end_time"] = pd.to_datetime(df["end_time"]) @@ -166,7 +166,7 @@ def _process(self, outputs: Optional[List[Output]] = None, **kwargs) -> None: if kwargs.get("remove_minimum", False): df = self.data_preprocessor.remove_minimum(df) - self.dataframes[shortened] = df + self.dataframes[output.id] = df # Filter out dataframes with less than min_size instances min_size = kwargs.get("min_size", 1) diff --git a/tmll/ml/modules/common/data_fetch.py b/tmll/ml/modules/common/data_fetch.py index 17f98be..0abaf08 100644 --- a/tmll/ml/modules/common/data_fetch.py +++ b/tmll/ml/modules/common/data_fetch.py @@ -60,7 +60,7 @@ def fetch_data(self, experiment: Experiment, target_outputs: Optional[List[Outpu for key, value in data[output.id].items(): if isinstance(value, pd.DataFrame): dataframe = value - dataframe = dataframe.rename(columns={'y': output.name, 'x': 'timestamp'}) + dataframe = dataframe.rename(columns={'y': output.id, 'x': 'timestamp'}) if dataframes.get(f"{output.id}${key}", None) is None: dataframes[f"{output.id}${key}"] = dataframe else: diff --git a/tmll/ml/modules/performance_trend/change_point_module.py b/tmll/ml/modules/performance_trend/change_point_module.py index ddf14c5..ba0b36a 100644 --- a/tmll/ml/modules/performance_trend/change_point_module.py +++ b/tmll/ml/modules/performance_trend/change_point_module.py @@ -266,6 +266,16 @@ def get_change_points(self, metrics: Optional[List[str]] = None, self.window_size = window_size results = ChangePointAnalysisResult(metrics={}) + + if metrics: + for idx, metric in enumerate(metrics): + output = self.experiment.get_output_by_name(metric) + if output is not None: + metrics[idx] = output.id + else: + metrics[idx] = "unknown" + metrics = [metric for metric in metrics if metric != "unknown"] + metrics_to_analyze = metrics if metrics else list(self.dataframes.keys()) if not methods: @@ -474,48 +484,33 @@ def plot_change_points(self, results: Optional[ChangePointAnalysisResult] = None fig_size = kwargs.get("fig_size", (15, 3)) fig_dpi = kwargs.get("fig_dpi", 100) - - combined_plots = [] colors = plt.get_cmap("tab10") - for idx, metric in enumerate(self.combined_df.columns): - # Add time series plot for each metric (combined) - combined_plots.append({ - "plot_type": "time_series", - "data": pd.DataFrame({metric: self.combined_df[metric]}, index=self.combined_df.index), - "label": metric, - "title": metric, - "y": metric, - "color": colors(idx % 10), - "linewidth": 1.5, - "xlabel": "Time", - "ylabel": metric - }) - self._plot(plots=combined_plots, plot_size=fig_size, dpi=fig_dpi, - fig_title="Time Series of Metrics", - fig_xlabel="Time", - fig_ylabel="Value (normalized)") - def _get_metric_data(metric_name: str) -> Tuple[str, Optional[pd.Series]]: - match metric_name: + def _get_metric_data(metric: str) -> Tuple[str, Optional[pd.Series]]: + match metric: case "zscore": return "Combined z-score Analysis", \ - results.metrics[metric_name].kwargs.get("combined_score") if ( + results.metrics[metric].kwargs.get("combined_score") if ( "zscore" in results.metrics and results.metrics["zscore"].kwargs is not None) else None case "voting": return "Voting-based Analysis", \ - results.metrics[metric_name].kwargs.get("vote_matrix", None) if ( + results.metrics[metric].kwargs.get("vote_matrix", None) if ( "voting" in results.metrics and results.metrics["voting"].kwargs is not None) else None case "pca": return "PCA Analysis", \ - results.metrics[metric_name].kwargs.get("principal_component", None) if ( + results.metrics[metric].kwargs.get("principal_component", None) if ( "pca" in results.metrics and results.metrics["pca"].kwargs is not None) else None case _: - return f"Change Points for {metric_name}", \ - self.dataframes[metric_name].iloc[:, 0] if metric_name in self.dataframes else None + return f"Change Points for {metric}", \ + self.dataframes[metric].iloc[:, 0] if metric in self.dataframes else None - for idx, (metric_name, metric_result) in enumerate(results.metrics.items()): + for idx, (metric, result) in enumerate(results.metrics.items()): plots = [] - title, series = _get_metric_data(metric_name) + title, series = _get_metric_data(metric) + output = self.experiment.get_output_by_id(metric) + metric_name = output.name if output else metric + title = title.replace(metric, metric_name) + # Add individual metric plots plots.append({ "plot_type": "time_series", @@ -528,7 +523,7 @@ def _get_metric_data(metric_name: str) -> Tuple[str, Optional[pd.Series]]: }) # Add change points to the plot - for cp, mag in zip(metric_result.change_points, metric_result.magnitudes): + for cp, mag in zip(result.change_points, result.magnitudes): change_time = self.combined_df.index[cp] # Normalize magnitude based on the metric diff --git a/tmll/ml/modules/predictive_maintenance/capacity_planning_module.py b/tmll/ml/modules/predictive_maintenance/capacity_planning_module.py index 4186c17..2e424c0 100644 --- a/tmll/ml/modules/predictive_maintenance/capacity_planning_module.py +++ b/tmll/ml/modules/predictive_maintenance/capacity_planning_module.py @@ -438,13 +438,10 @@ def forecast_capacity(self, resource_types: Optional[List[ResourceType]] = None, for resource_type, columns in resource_columns_by_type.items(): if resource_type == ResourceType.CPU: threshold = self.thresholds.cpu_threshold - units = "%" elif resource_type == ResourceType.MEMORY: threshold = self.thresholds.memory_threshold - units = "bytes" else: threshold = self.thresholds.disk_threshold - units = "bytes/s" type_metrics = {} forecast_timestamps = [] @@ -452,8 +449,7 @@ def forecast_capacity(self, resource_types: Optional[List[ResourceType]] = None, original_series = self.combined_df[f"{name}_original"] forecast_values, forecast_timestamps = resource_metrics[name] - forecast_values = self.scalers[name].inverse_transform(np.array(forecast_values) - .reshape(-1, 1)).flatten().tolist() + forecast_values = self.scalers[name].inverse_transform(np.array(forecast_values).reshape(-1, 1)).flatten().tolist() # type: ignore violations = self._detect_threshold_violations( forecast_values, @@ -513,7 +509,7 @@ def interpret(self, forecast_results: Dict[ResourceType, CapacityForecastResult] DocumentGenerator.metrics_group("Analysis Parameters", parameters) # Resource metrics - for resource_name, metrics in result.resource_metrics.items(): + for resource_id, metrics in result.resource_metrics.items(): resource_metrics = { "Current Usage": self._get_formatted_resource_property(metrics.current_usage, resource_type), "Peak Usage": self._get_formatted_resource_property(metrics.peak_usage, resource_type), @@ -533,6 +529,8 @@ def interpret(self, forecast_results: Dict[ResourceType, CapacityForecastResult] ) resource_metrics["Total Violations"] = str(len(metrics.threshold_violations)) + output = self.experiment.get_output_by_id(resource_id) + resource_name = output.name if output else resource_id DocumentGenerator.metrics_group(f"Resource: {resource_name}", resource_metrics) if metrics.threshold_violations: @@ -572,12 +570,15 @@ def interpret(self, forecast_results: Dict[ResourceType, CapacityForecastResult] } for resource_type, result in forecast_results.items(): - for resource_name, metrics in result.resource_metrics.items(): + for resource_id, metrics in result.resource_metrics.items(): average_usage = metrics.average_usage peak_usage = metrics.peak_usage pattern = metrics.utilization_pattern violations = metrics.threshold_violations + output = self.experiment.get_output_by_id(resource_id) + resource_name = output.name if output else resource_id + threshold = ( result.thresholds_used.cpu_threshold if resource_type.name == ResourceType.CPU.name else result.thresholds_used.memory_threshold if resource_type.name == ResourceType.MEMORY.name @@ -685,8 +686,8 @@ def plot_capacity_forecast(self, forecast_results: Dict[ResourceType, CapacityFo colors = plt.get_cmap("tab10") for idx, (resource_type, result) in enumerate(forecast_results.items()): - for resource_name, metrics in result.resource_metrics.items(): - historical_data = self.combined_df[f"{resource_name}_original"] + for resource_id, metrics in result.resource_metrics.items(): + historical_data = self.combined_df[f"{resource_id}_original"] # Get the last 10% of the data for zoomed view if zoomed: @@ -695,7 +696,7 @@ def plot_capacity_forecast(self, forecast_results: Dict[ResourceType, CapacityFo forecast_data = pd.Series( metrics.forecast_values, index=metrics.forecast_timestamps, - name=resource_name + name=resource_id ) plots = [] @@ -771,6 +772,8 @@ def plot_capacity_forecast(self, forecast_results: Dict[ResourceType, CapacityFo usage_str = self._get_formatted_resource_property(metrics.average_usage, resource_type) peak_str = self._get_formatted_resource_property(metrics.peak_usage, resource_type) + output = self.experiment.get_output_by_id(resource_id) + resource_name = output.name if output else resource_id title = ( f"{resource_type.name} Capacity Forecast: {resource_name}\n" f"(Average Usage: {usage_str}, " diff --git a/tmll/ml/modules/resource_optimization/idle_resource_detection_module.py b/tmll/ml/modules/resource_optimization/idle_resource_detection_module.py index 2d994f5..466c3ff 100644 --- a/tmll/ml/modules/resource_optimization/idle_resource_detection_module.py +++ b/tmll/ml/modules/resource_optimization/idle_resource_detection_module.py @@ -101,8 +101,13 @@ def _process(self, outputs: Optional[List[Output]] = None, **kwargs) -> None: **kwargs) def _post_process(self, **kwargs) -> None: - resources_df: Optional[pd.DataFrame] = next((df for name, df in self.dataframes.items() if "resources status" in name.lower()), None) - if resources_df is None: + resources_output = self.experiment.get_output_by_name("Resources Status") + if resources_output is None: + self.logger.warning("No Resources Status output found") + return + + resources_df = self.dataframes.get(resources_output.id, pd.DataFrame()) + if resources_df.empty: self.logger.warning("No CPU resources found in data") return @@ -156,7 +161,7 @@ def _post_process(self, **kwargs) -> None: # Set multi-index with timestamp and cpu_id resources_df = resources_df.set_index(["timestamp", "cpu_id"]) - self.dataframes["Resources Status"] = resources_df + self.dataframes[resources_output.id] = resources_df def _get_formatted_resource_property(self, resource_property: float, resource_type: ResourceType) -> str: """ @@ -245,10 +250,14 @@ def analyze_cpu_scheduling(self) -> Dict[int, SchedulingMetrics]: :return: Analysis results for each CPU core :rtype: Dict[int, SchedulingMetrics] """ - resources_df = self.dataframes.get("Resources Status", pd.DataFrame()) + resources_output = self.experiment.get_output_by_name("Resources Status") + if resources_output is None: + self.logger.warning("No Resources Status output found") + return {} + resources_df = self.dataframes.get(resources_output.id, pd.DataFrame()) if resources_df.empty: - self.logger.warning("Empty \"Resources Status\" dataframe provided") + self.logger.warning("No Resources Status data found") return {} cpu_ids = resources_df.index.get_level_values("cpu_id").unique() @@ -285,8 +294,8 @@ def analyze_cpu_scheduling(self) -> Dict[int, SchedulingMetrics]: unique_tasks=len(task_counts), most_common_tasks=task_counts.head(5).to_dict(), number_of_idle_periods=len(idle_period_lengths), - longest_idle_period=idle_period_lengths.max() if not idle_period_lengths.empty else 0, - average_idle_period=idle_period_lengths.mean() if not idle_period_lengths.empty else 0, + longest_idle_period=idle_period_lengths.max() if not idle_period_lengths.empty else 0, # type: ignore + average_idle_period=idle_period_lengths.mean() if not idle_period_lengths.empty else 0, # type: ignore context_switches=task_changes, context_switches_per_second=task_changes / total_duration if total_duration > 0 else 0, task_distribution={ @@ -408,7 +417,7 @@ def interpret(self, idle_results: Optional[Dict[ResourceType, IdleResourceAnalys # Overall Resource Utilization - Show for all resource types overall_metrics = {} for resource_type in ResourceType: - if not any(resource_type.name.lower() in name.lower() for name in self.dataframes): + if not any(resource_type.name.lower() in id.lower() for id in self.dataframes): overall_metrics[f"{resource_type.name} Average Usage"] = "N/A (No data available)" overall_metrics[f"{resource_type.name} Monitoring Duration"] = "N/A (No data available)" continue @@ -424,7 +433,7 @@ def interpret(self, idle_results: Optional[Dict[ResourceType, IdleResourceAnalys usage_str = f"{self._get_formatted_resource_property(sum(m.average_usage for m in result.idle_resources.values()), resource_type)} (Idle periods detected)" else: # Calculate metrics for resources without idle periods - df = next((df for name, df in self.dataframes.items() if resource_type.name.lower() in name.lower()), None) + df = next((df for id, df in self.dataframes.items() if resource_type.name.lower() in id.lower()), None) if df is not None: usage_str = self._get_formatted_resource_property(df.mean().mean(), resource_type) total_duration = (df.index[-1] - df.index[0]).total_seconds() @@ -456,7 +465,7 @@ def interpret(self, idle_results: Optional[Dict[ResourceType, IdleResourceAnalys DocumentGenerator.metrics_group("Analysis Parameters", threshold_metrics) - for resource_name, metrics in result.idle_resources.items(): + for resource_id, metrics in result.idle_resources.items(): resource_metrics = {} resource_metrics["Average Usage"] = self._get_formatted_resource_property(metrics.average_usage, resource_type) resource_metrics["Peak Usage"] = self._get_formatted_resource_property(metrics.peak_usage, resource_type) @@ -472,6 +481,8 @@ def interpret(self, idle_results: Optional[Dict[ResourceType, IdleResourceAnalys longest_idle = max((end - start).total_seconds() for start, end in metrics.idle_periods) resource_metrics["Longest Idle Period"] = f"{longest_idle:.2f}s" + output = self.experiment.get_output_by_id(resource_id) + resource_name = output.name if output else resource_id DocumentGenerator.metrics_group(f"Resource: {resource_name}", resource_metrics) if metrics.idle_periods: @@ -556,7 +567,9 @@ def interpret(self, idle_results: Optional[Dict[ResourceType, IdleResourceAnalys variable_cpus = [] high_usage_cpus = [] - for resource_name, metrics in cpu_result.idle_resources.items(): + for resource_id, metrics in cpu_result.idle_resources.items(): + output = self.experiment.get_output_by_id(resource_id) + resource_name = output.name if output else resource_id idle_cpus.append(f"{resource_name} ({metrics.idle_percentage:.1f}% idle)") if metrics.usage_pattern == "Highly variable": @@ -587,7 +600,9 @@ def interpret(self, idle_results: Optional[Dict[ResourceType, IdleResourceAnalys idle_memory = [] inefficient_memory = [] - for resource_name, metrics in memory_result.idle_resources.items(): + for resource_id, metrics in memory_result.idle_resources.items(): + output = self.experiment.get_output_by_id(resource_id) + resource_name = output.name if output else resource_id val, unit = Formatter.format_bytes(metrics.average_usage) peak_val, peak_unit = Formatter.format_bytes(metrics.peak_usage) @@ -686,8 +701,10 @@ def plot_resource_utilization(self, analysis_results: Dict[ResourceType, IdleRes if not result.idle_resources: continue - for resource_name, metrics in result.idle_resources.items(): - df = self.dataframes[resource_name] + for resource_id, metrics in result.idle_resources.items(): + df = self.dataframes[resource_id] + output = self.experiment.get_output_by_id(resource_id) + resource_name = output.name if output else resource_id plots = [] # Resource usage plot @@ -762,9 +779,14 @@ def plot_cpu_scheduling(self, scheduling_results: Dict[int, SchedulingMetrics], self.logger.warning("No CPU scheduling results to plot") return - resources_df = self.dataframes.get("Resources Status", pd.DataFrame()) + resources_output = self.experiment.get_output_by_name("Resources Status") + if resources_output is None: + self.logger.warning("No Resources Status output found") + return + + resources_df = self.dataframes.get(resources_output.id, pd.DataFrame()) if resources_df.empty: - self.logger.warning("No time graph data available for plotting") + self.logger.warning("No Resources Status data found") return fig_size = kwargs.get("fig_size", (15, 4)) diff --git a/tmll/ml/modules/root_cause/correlation_module.py b/tmll/ml/modules/root_cause/correlation_module.py index e60ef39..458c80b 100644 --- a/tmll/ml/modules/root_cause/correlation_module.py +++ b/tmll/ml/modules/root_cause/correlation_module.py @@ -118,40 +118,40 @@ def analyze_correlations(self, start_time: Optional[pd.Timestamp] = None, return None period_data = { - name: df[(df.index >= start_time) & (df.index <= end_time)] - for name, df in self.dataframes.items() + id: df[(df.index >= start_time) & (df.index <= end_time)] + for id, df in self.dataframes.items() } else: period_data = self.dataframes # Remove dataframes with no data in the period - period_data = {name: df for name, df in period_data.items() if not df.empty} + period_data = {id: df for id, df in period_data.items() if not df.empty} # Calculate all pairwise correlations - names = list(period_data.keys()) - correlation_matrix = pd.DataFrame(index=names, columns=names, dtype=float) - p_values_matrix = pd.DataFrame(index=names, columns=names, dtype=float) - methods_matrix = pd.DataFrame(index=names, columns=names, dtype=str) + ids = list(period_data.keys()) + correlation_matrix = pd.DataFrame(index=ids, columns=ids, dtype=float) + p_values_matrix = pd.DataFrame(index=ids, columns=ids, dtype=float) + methods_matrix = pd.DataFrame(index=ids, columns=ids, dtype=str) - for i, name1 in enumerate(names): - for j, name2 in enumerate(names): + for i, id1 in enumerate(ids): + for j, id2 in enumerate(ids): if i < j: # Only calculate upper triangle - series1 = period_data[name1][name1] - series2 = period_data[name2][name2] + series1 = period_data[id1][id1] + series2 = period_data[id2][id2] corr, p_value = self._calculate_correlation(series1, series2, method) used_method = method or Statistics.get_correlation_method(series1, series2) - correlation_matrix.loc[name1, name2] = corr - correlation_matrix.loc[name2, name1] = corr - p_values_matrix.loc[name1, name2] = p_value - p_values_matrix.loc[name2, name1] = p_value - methods_matrix.loc[name1, name2] = used_method - methods_matrix.loc[name2, name1] = used_method + correlation_matrix.loc[id1, id2] = corr + correlation_matrix.loc[id2, id1] = corr + p_values_matrix.loc[id1, id2] = p_value + p_values_matrix.loc[id2, id1] = p_value + methods_matrix.loc[id1, id2] = used_method + methods_matrix.loc[id2, id1] = used_method elif i == j: - correlation_matrix.loc[name1, name2] = 1.0 - p_values_matrix.loc[name1, name2] = 0.0 - methods_matrix.loc[name1, name2] = "identity" + correlation_matrix.loc[id1, id2] = 1.0 + p_values_matrix.loc[id1, id2] = 0.0 + methods_matrix.loc[id1, id2] = "identity" results["correlations"] = correlation_matrix results["p_values"] = p_values_matrix @@ -176,12 +176,14 @@ def analyze_lags(self, series1_name: str, series2_name: str, :return: Lag analysis results :rtype: Optional[LagAnalysisResult] """ - if series1_name not in self.dataframes or series2_name not in self.dataframes: + output1 = self.experiment.get_output_by_name(series1_name) + output2 = self.experiment.get_output_by_name(series2_name) + if output1 is None or output2 is None: self.logger.warning("Series not found in dataframes.") return None - series1 = self.dataframes[series1_name][series1_name] - series2 = self.dataframes[series2_name][series2_name] + series1 = self.dataframes[output1.id][output1.id] + series2 = self.dataframes[output2.id][output2.id] lag_results = [] for lag in range(-max_lag, max_lag + 1): @@ -207,8 +209,8 @@ def analyze_lags(self, series1_name: str, series2_name: str, return LagAnalysisResult(lag_analysis=lag_results, optimal_lag=optimal_lag, - series1=series1_name, - series2=series2_name) + series1=output1.id, + series2=output2.id) def plot_correlation_matrix(self, results: Optional[CorrelationAnalysisResult], **kwargs) -> None: """Plot correlation matrix heatmap from analysis results. @@ -234,12 +236,17 @@ def plot_correlation_matrix(self, results: Optional[CorrelationAnalysisResult], fig_size = kwargs.get("fig_size", (10, 8)) fig_dpi = kwargs.get("fig_dpi", 100) - y_ticks = results.correlations.index - x_ticks = results.correlations.columns + correlation_matrix = results.correlations.copy().fillna(0) + get_name = lambda x: (output := self.experiment.get_output_by_id(x)) and output.name or x + correlation_matrix.columns = pd.Index([get_name(col) for col in correlation_matrix.columns]) + correlation_matrix.index = pd.Index([get_name(idx) for idx in correlation_matrix.index]) + + y_ticks = correlation_matrix.index + x_ticks = correlation_matrix.columns self._plot([{ "plot_type": "heatmap", - "data": results.correlations.fillna(0), + "data": correlation_matrix, "mask": mask, "cmap": "RdBu", }], plot_size=fig_size, dpi=fig_dpi, fig_title="Correlation Matrix", grid=False, @@ -306,10 +313,11 @@ def plot_lag_analysis(self, lag_results: Optional[LagAnalysisResult], **kwargs) } ] + get_name = lambda x: (output := self.experiment.get_output_by_id(x)) and output.name or x fig_size = kwargs.get("fig_size", (10, 6)) fig_dpi = kwargs.get("fig_dpi", 100) self._plot(plots, plot_size=fig_size, dpi=fig_dpi, - fig_title=f"Lag Analysis: {lag_results.series1} vs {lag_results.series2}", + fig_title=f"Lag Analysis: {get_name(lag_results.series1)} vs {get_name(lag_results.series2)}", fig_xlabel="Lag", fig_ylabel="Correlation") def plot_time_series(self, series: List[str], @@ -335,19 +343,20 @@ def plot_time_series(self, series: List[str], plots = [] for idx, name in enumerate(series): - if name not in self.dataframes: + output = self.experiment.get_output_by_name(name) + if output is None: self.logger.warning(f"Series {name} not found in dataframes.") continue - df = self.dataframes[name].copy() + df = self.dataframes[output.id].copy() if start_time and end_time: df = df[(df.index >= start_time) & (df.index <= end_time)] plots.append({ "plot_type": "time_series", "data": df, - "y": name, - "label": name, + "y": output.id, + "label": output.name, "color": colors(idx % 10), "alpha": 0.85, "linewidth": 2