diff --git a/src/pyrecest/utils/__init__.py b/src/pyrecest/utils/__init__.py
index 839247db6..202bd4a47 100644
--- a/src/pyrecest/utils/__init__.py
+++ b/src/pyrecest/utils/__init__.py
@@ -9,8 +9,8 @@
 from .association_models import LogisticPairwiseAssociationModel
 from .history_recorder import HistoryRecorder
 from .metrics import (
-    anis,
     anees,
+    anis,
     chi_square_confidence_bounds,
     chi_square_confidence_interval,
     consistency_fraction,
diff --git a/src/pyrecest/utils/metrics.py b/src/pyrecest/utils/metrics.py
index 911294c2d..e37275a8f 100644
--- a/src/pyrecest/utils/metrics.py
+++ b/src/pyrecest/utils/metrics.py
@@ -67,7 +67,12 @@ def squared_error(estimates: ArrayLike, groundtruths: ArrayLike) -> np.ndarray:
     return np.sum(errors * errors, axis=-1)
 
 
-def mean_squared_error(estimates: ArrayLike, groundtruths: ArrayLike, *, axis: int | tuple[int, ...] | None = None):
+def mean_squared_error(
+    estimates: ArrayLike,
+    groundtruths: ArrayLike,
+    *,
+    axis: int | tuple[int, ...] | None = None,
+):
     """Return mean squared component error.
 
     ``axis=None`` averages all entries. Use ``axis=0`` to obtain a component-wise
@@ -77,12 +82,22 @@ def mean_squared_error(estimates: ArrayLike, groundtruths: ArrayLike, *, axis: i
     return np.mean(errors * errors, axis=axis)
 
 
-def root_mean_squared_error(estimates: ArrayLike, groundtruths: ArrayLike, *, axis: int | tuple[int, ...] | None = None):
+def root_mean_squared_error(
+    estimates: ArrayLike,
+    groundtruths: ArrayLike,
+    *,
+    axis: int | tuple[int, ...] | None = None,
+):
     """Return root mean squared component error."""
     return np.sqrt(mean_squared_error(estimates, groundtruths, axis=axis))
 
 
-def mean_absolute_error(estimates: ArrayLike, groundtruths: ArrayLike, *, axis: int | tuple[int, ...] | None = None):
+def mean_absolute_error(
+    estimates: ArrayLike,
+    groundtruths: ArrayLike,
+    *,
+    axis: int | tuple[int, ...] | None = None,
+):
     """Return mean absolute component error."""
     return np.mean(np.abs(error_vectors(estimates, groundtruths)), axis=axis)
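For reference, a minimal usage sketch of the reshaped error helpers (assuming the import path pyrecest.utils.metrics matching the file above; hypothetical data, not part of the patch):

import numpy as np

from pyrecest.utils.metrics import (
    mean_absolute_error,
    mean_squared_error,
    root_mean_squared_error,
)

estimates = np.array([[1.0, 2.0], [3.0, 4.0]])
groundtruths = np.array([[1.0, 1.0], [2.0, 2.0]])

# axis=None averages over all entries, axis=0 keeps one value per state component.
overall_mse = mean_squared_error(estimates, groundtruths)
per_component_rmse = root_mean_squared_error(estimates, groundtruths, axis=0)
per_component_mae = mean_absolute_error(estimates, groundtruths, axis=0)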
""" - errors = _as_numeric_array(estimates, "estimates") if groundtruths is None else error_vectors(estimates, groundtruths) + errors = ( + _as_numeric_array(estimates, "estimates") + if groundtruths is None + else error_vectors(estimates, groundtruths) + ) errors = _as_sample_matrix(errors, "errors") - covariances = _as_covariance_stack(uncertainties, errors.shape[0], errors.shape[1], "uncertainties") + covariances = _as_covariance_stack( + uncertainties, errors.shape[0], errors.shape[1], "uncertainties" + ) values = _quadratic_forms(errors, covariances) - return float(values[0]) if _is_single_vector(estimates) and groundtruths is None else values + return ( + float(values[0]) + if _is_single_vector(estimates) and groundtruths is None + else values + ) -def average_nees(estimates: ArrayLike, uncertainties: ArrayLike, groundtruths: ArrayLike | None = None) -> float: +def average_nees( + estimates: ArrayLike, + uncertainties: ArrayLike, + groundtruths: ArrayLike | None = None, +) -> float: """Return average normalized estimation error squared.""" - return float(np.mean(np.atleast_1d(normalized_estimation_error_squared(estimates, uncertainties, groundtruths)))) + return float( + np.mean( + np.atleast_1d( + normalized_estimation_error_squared( + estimates, uncertainties, groundtruths + ) + ) + ) + ) -def anees(estimates: ArrayLike, uncertainties: ArrayLike, groundtruths: ArrayLike) -> float: +def anees( + estimates: ArrayLike, uncertainties: ArrayLike, groundtruths: ArrayLike +) -> float: """Backward-compatible alias for average NEES.""" return average_nees(estimates, uncertainties, groundtruths) @@ -120,7 +163,11 @@ def anees(estimates: ArrayLike, uncertainties: ArrayLike, groundtruths: ArrayLik nees = normalized_estimation_error_squared -def normalized_innovation_squared(innovations_or_measurements: ArrayLike, predicted_or_covariances: ArrayLike, innovation_covariances: ArrayLike | None = None): +def normalized_innovation_squared( + innovations_or_measurements: ArrayLike, + predicted_or_covariances: ArrayLike, + innovation_covariances: ArrayLike | None = None, +): """Return normalized innovation squared values. 
@@ -120,7 +163,11 @@ def anees(estimates: ArrayLike, uncertainties: ArrayLike, groundtruths: ArrayLik
 nees = normalized_estimation_error_squared
 
 
-def normalized_innovation_squared(innovations_or_measurements: ArrayLike, predicted_or_covariances: ArrayLike, innovation_covariances: ArrayLike | None = None):
+def normalized_innovation_squared(
+    innovations_or_measurements: ArrayLike,
+    predicted_or_covariances: ArrayLike,
+    innovation_covariances: ArrayLike | None = None,
+):
     """Return normalized innovation squared values.
 
     Two call styles are supported:
@@ -132,24 +179,51 @@
         innovations = _as_numeric_array(innovations_or_measurements, "innovations")
         covariances = predicted_or_covariances
     else:
-        innovations = error_vectors(innovations_or_measurements, predicted_or_covariances)
+        innovations = error_vectors(
+            innovations_or_measurements, predicted_or_covariances
+        )
         covariances = innovation_covariances
 
     innovations = _as_sample_matrix(innovations, "innovations")
-    covariance_stack = _as_covariance_stack(covariances, innovations.shape[0], innovations.shape[1], "innovation_covariances")
+    covariance_stack = _as_covariance_stack(
+        covariances,
+        innovations.shape[0],
+        innovations.shape[1],
+        "innovation_covariances",
+    )
     values = _quadratic_forms(innovations, covariance_stack)
-    return float(values[0]) if innovations.shape[0] == 1 and _is_single_vector(innovations_or_measurements) else values
+    return (
+        float(values[0])
+        if innovations.shape[0] == 1 and _is_single_vector(innovations_or_measurements)
+        else values
+    )
 
 
-def average_nis(innovations_or_measurements: ArrayLike, predicted_or_covariances: ArrayLike, innovation_covariances: ArrayLike | None = None) -> float:
+def average_nis(
+    innovations_or_measurements: ArrayLike,
+    predicted_or_covariances: ArrayLike,
+    innovation_covariances: ArrayLike | None = None,
+) -> float:
     """Return average normalized innovation squared."""
-    return float(np.mean(np.atleast_1d(normalized_innovation_squared(innovations_or_measurements, predicted_or_covariances, innovation_covariances))))
+    return float(
+        np.mean(
+            np.atleast_1d(
+                normalized_innovation_squared(
+                    innovations_or_measurements,
+                    predicted_or_covariances,
+                    innovation_covariances,
+                )
+            )
+        )
+    )
 
 
 nis = normalized_innovation_squared
 anis = average_nis
 
 
-def chi_square_confidence_bounds(degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]:
+def chi_square_confidence_bounds(
+    degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95
+) -> tuple[float, float]:
     """Return two-sided chi-square bounds for an averaged NEES/NIS statistic."""
     if int(degrees_of_freedom) <= 0:
         raise ValueError("degrees_of_freedom must be positive")
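A sketch of pairing the averaged statistics with the chi-square bounds (hypothetical data; the ANIS value of 1.5 for these arrays matches the unit tests further below, not part of the patch):

import numpy as np

from pyrecest.utils.metrics import (
    average_nis,
    chi_square_confidence_bounds,
    is_chi_square_consistent,
)

innovations = np.array([[1.0, 0.0], [0.0, 2.0]])
innovation_covariances = np.array([np.eye(2), 2.0 * np.eye(2)])

anis_value = average_nis(innovations, innovation_covariances)  # 1.5 for this data

# Two-sided 95% interval for a NIS statistic averaged over two samples with
# two degrees of freedom each.
lower, upper = chi_square_confidence_bounds(2, n_samples=2, confidence=0.95)
is_consistent = is_chi_square_consistent(anis_value, 2, n_samples=2)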
@@ -164,40 +238,74 @@ def chi_square_confidence_bounds(degrees_of_freedom: int, *, n_samples: int = 1,
     return float(lower), float(upper)
 
 
-def chi_square_confidence_interval(degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]:
+def chi_square_confidence_interval(
+    degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95
+) -> tuple[float, float]:
     """Alias for :func:`chi_square_confidence_bounds`."""
-    return chi_square_confidence_bounds(degrees_of_freedom, n_samples=n_samples, confidence=confidence)
+    return chi_square_confidence_bounds(
+        degrees_of_freedom, n_samples=n_samples, confidence=confidence
+    )
 
 
-def nees_confidence_bounds(state_dim: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]:
+def nees_confidence_bounds(
+    state_dim: int, *, n_samples: int = 1, confidence: float = 0.95
+) -> tuple[float, float]:
     """Return chi-square consistency bounds for NEES or ANEES."""
-    return chi_square_confidence_bounds(state_dim, n_samples=n_samples, confidence=confidence)
+    return chi_square_confidence_bounds(
+        state_dim, n_samples=n_samples, confidence=confidence
+    )
 
 
-def nees_confidence_interval(state_dim: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]:
+def nees_confidence_interval(
+    state_dim: int, *, n_samples: int = 1, confidence: float = 0.95
+) -> tuple[float, float]:
     """Alias for :func:`nees_confidence_bounds`."""
     return nees_confidence_bounds(state_dim, n_samples=n_samples, confidence=confidence)
 
 
-def nis_confidence_bounds(measurement_dim: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]:
+def nis_confidence_bounds(
+    measurement_dim: int, *, n_samples: int = 1, confidence: float = 0.95
+) -> tuple[float, float]:
     """Return chi-square consistency bounds for NIS or ANIS."""
-    return chi_square_confidence_bounds(measurement_dim, n_samples=n_samples, confidence=confidence)
+    return chi_square_confidence_bounds(
+        measurement_dim, n_samples=n_samples, confidence=confidence
+    )
 
 
-def nis_confidence_interval(measurement_dim: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]:
+def nis_confidence_interval(
+    measurement_dim: int, *, n_samples: int = 1, confidence: float = 0.95
+) -> tuple[float, float]:
     """Alias for :func:`nis_confidence_bounds`."""
-    return nis_confidence_bounds(measurement_dim, n_samples=n_samples, confidence=confidence)
+    return nis_confidence_bounds(
+        measurement_dim, n_samples=n_samples, confidence=confidence
+    )
 
 
-def is_chi_square_consistent(statistic: float, degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95) -> bool:
+def is_chi_square_consistent(
+    statistic: float,
+    degrees_of_freedom: int,
+    *,
+    n_samples: int = 1,
+    confidence: float = 0.95,
+) -> bool:
     """Return whether a scalar statistic lies inside chi-square bounds."""
-    lower, upper = chi_square_confidence_bounds(degrees_of_freedom, n_samples=n_samples, confidence=confidence)
+    lower, upper = chi_square_confidence_bounds(
+        degrees_of_freedom, n_samples=n_samples, confidence=confidence
+    )
     return bool(lower <= float(statistic) <= upper)
 
 
-def is_within_chi_square_confidence_interval(statistic: float, degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95) -> bool:
+def is_within_chi_square_confidence_interval(
+    statistic: float,
+    degrees_of_freedom: int,
+    *,
+    n_samples: int = 1,
+    confidence: float = 0.95,
+) -> bool:
     """Alias for :func:`is_chi_square_consistent`."""
-    return is_chi_square_consistent(statistic, degrees_of_freedom, n_samples=n_samples, confidence=confidence)
+    return is_chi_square_consistent(
+        statistic, degrees_of_freedom, n_samples=n_samples, confidence=confidence
+    )
 
 
 def consistency_fraction(values: ArrayLike, lower: float, upper: float) -> float:
@@ -205,7 +313,9 @@ def consistency_fraction(values: ArrayLike, lower: float, upper: float) -> float
     values_array = _as_numeric_array(values, "values")
     if values_array.size == 0:
         return 0.0
-    return float(np.mean((values_array >= float(lower)) & (values_array <= float(upper))))
+    return float(
+        np.mean((values_array >= float(lower)) & (values_array <= float(upper)))
+    )
 
 
 def ospa_distance(
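A sketch of monitoring per-step NEES values with consistency_fraction (hypothetical numbers; not part of the patch):

import numpy as np

from pyrecest.utils.metrics import chi_square_confidence_bounds, consistency_fraction

# Per-time-step NEES values from a filter run (hypothetical).
nees_per_step = np.array([1.2, 2.5, 0.8, 7.9, 2.1])

# Bounds for a single sample, i.e. for each per-step statistic on its own.
lower, upper = chi_square_confidence_bounds(2, n_samples=1, confidence=0.95)

# Fraction of steps inside the two-sided 95% interval; for a consistent filter
# this should settle near 0.95 over long runs.
fraction_inside = consistency_fraction(nees_per_step, lower, upper)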
@@ -226,14 +336,25 @@
     if normalizer == 0:
         return _components_or_distance("ospa", 0.0, 0.0, 0.0, 0, return_components)
     if n_estimated == 0 or n_reference == 0:
-        return _components_or_distance("ospa", cutoff, 0.0, cutoff, 0, return_components)
+        return _components_or_distance(
+            "ospa", cutoff, 0.0, cutoff, 0, return_components
+        )
 
-    assignment_cost, assignments = _clipped_assignment_cost(estimated, reference, order=order, cutoff=cutoff, distance_fn=distance_fn)
+    assignment_cost, assignments = _clipped_assignment_cost(
+        estimated, reference, order=order, cutoff=cutoff, distance_fn=distance_fn
+    )
     cardinality_cost = cutoff**order * abs(n_estimated - n_reference)
     localization_component = assignment_cost / float(normalizer)
     cardinality_component = cardinality_cost / float(normalizer)
     distance = (localization_component + cardinality_component) ** (1.0 / order)
-    return _components_or_distance("ospa", distance, localization_component ** (1.0 / order), cardinality_component ** (1.0 / order), len(assignments), return_components)
+    return _components_or_distance(
+        "ospa",
+        distance,
+        localization_component ** (1.0 / order),
+        cardinality_component ** (1.0 / order),
+        len(assignments),
+        return_components,
+    )
 
 
 def mospa_distance(
@@ -247,9 +368,20 @@
 ) -> float | tuple[float, np.ndarray]:
     """Return mean OSPA over a sequence of finite-set estimates."""
     if len(estimated_point_sets) != len(reference_point_sets):
-        raise ValueError("estimated_point_sets and reference_point_sets must have the same length")
+        raise ValueError(
+            "estimated_point_sets and reference_point_sets must have the same length"
+        )
     distances = np.asarray(
-        [ospa_distance(estimated, reference, cutoff=cutoff, order=order, distance_fn=distance_fn) for estimated, reference in zip(estimated_point_sets, reference_point_sets)],
+        [
+            ospa_distance(
+                estimated,
+                reference,
+                cutoff=cutoff,
+                order=order,
+                distance_fn=distance_fn,
+            )
+            for estimated, reference in zip(estimated_point_sets, reference_point_sets)
+        ],
         dtype=float,
     )
     mean_distance = float(np.mean(distances)) if distances.size else 0.0
@@ -274,10 +406,21 @@ def gospa_distance(
     if estimated.shape[0] == 0 and reference.shape[0] == 0:
         return _components_or_distance("gospa", 0.0, 0.0, 0.0, 0, return_components)
 
-    assignment_cost, assignments = _clipped_assignment_cost(estimated, reference, order=order, cutoff=cutoff, distance_fn=distance_fn)
-    cardinality_cost = cutoff**order / float(alpha) * abs(estimated.shape[0] - reference.shape[0])
+    assignment_cost, assignments = _clipped_assignment_cost(
+        estimated, reference, order=order, cutoff=cutoff, distance_fn=distance_fn
+    )
+    cardinality_cost = (
+        cutoff**order / float(alpha) * abs(estimated.shape[0] - reference.shape[0])
+    )
     distance = (assignment_cost + cardinality_cost) ** (1.0 / order)
-    return _components_or_distance("gospa", distance, assignment_cost ** (1.0 / order), cardinality_cost ** (1.0 / order), len(assignments), return_components)
+    return _components_or_distance(
+        "gospa",
+        distance,
+        assignment_cost ** (1.0 / order),
+        cardinality_cost ** (1.0 / order),
+        len(assignments),
+        return_components,
+    )
 
 
 def iou_polygon(polygon1, polygon2):
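A small sketch contrasting OSPA and GOSPA on sets of unequal cardinality (the numbers mirror the unit tests further below; not part of the patch):

import numpy as np

from pyrecest.utils.metrics import gospa_distance, ospa_distance

estimated = np.array([[0.0], [2.0]])
reference = np.array([[0.0], [3.0]])

# Scalar OSPA distance with cutoff c=10 and order p=2.
d_ospa = ospa_distance(estimated, reference, cutoff=10.0, order=2.0)

# GOSPA component breakdown for a cardinality mismatch (one estimate, two references).
parts = gospa_distance(
    np.array([[0.0]]),
    np.array([[0.0], [4.0]]),
    cutoff=2.0,
    order=1.0,
    alpha=2.0,
    return_components=True,
)
localization_part = parts["localization"]
cardinality_part = parts["cardinality"]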
@@ -301,7 +444,14 @@ def extent_intersection_over_union(shape1, shape2) -> float:
     return eot_shape_iou(shape1, shape2)
 
 
-def gaussian_wasserstein_distance(mean1: ArrayLike, covariance1: ArrayLike, mean2: ArrayLike, covariance2: ArrayLike, *, squared: bool = False) -> float:
+def gaussian_wasserstein_distance(
+    mean1: ArrayLike,
+    covariance1: ArrayLike,
+    mean2: ArrayLike,
+    covariance2: ArrayLike,
+    *,
+    squared: bool = False,
+) -> float:
     """Return the 2-Wasserstein distance between Gaussian distributions."""
     mean1_np = _as_numeric_array(mean1, "mean1").reshape(-1)
     mean2_np = _as_numeric_array(mean2, "mean2").reshape(-1)
@@ -319,30 +469,46 @@ def gaussian_wasserstein_distance(mean1: ArrayLike, covariance1: ArrayLike, mean
     mean_error = mean1_np - mean2_np
     mean_term = float(mean_error @ mean_error)
     covariance1_sqrt = _symmetric_matrix_square_root(covariance1_np)
-    middle_sqrt = _symmetric_matrix_square_root(covariance1_sqrt @ covariance2_np @ covariance1_sqrt)
+    middle_sqrt = _symmetric_matrix_square_root(
+        covariance1_sqrt @ covariance2_np @ covariance1_sqrt
+    )
     trace_term = float(np.trace(covariance1_np + covariance2_np - 2.0 * middle_sqrt))
     squared_distance = max(mean_term + trace_term, 0.0)
     return float(squared_distance) if squared else float(np.sqrt(squared_distance))
 
 
-def extent_wasserstein_distance(estimated_extent: ArrayLike, reference_extent: ArrayLike, *, squared: bool = False) -> float:
+def extent_wasserstein_distance(
+    estimated_extent: ArrayLike, reference_extent: ArrayLike, *, squared: bool = False
+) -> float:
     """Return the covariance/extent part of the Gaussian 2-Wasserstein distance."""
     estimated = _as_numeric_array(estimated_extent, "estimated_extent")
     reference = _as_numeric_array(reference_extent, "reference_extent")
     _validate_square_matrix(estimated, "estimated_extent")
     _validate_square_matrix(reference, "reference_extent")
     if estimated.shape != reference.shape:
-        raise ValueError("estimated_extent and reference_extent must have the same shape")
+        raise ValueError(
+            "estimated_extent and reference_extent must have the same shape"
+        )
     zeros = np.zeros(estimated.shape[0], dtype=float)
-    return gaussian_wasserstein_distance(zeros, estimated, zeros, reference, squared=squared)
+    return gaussian_wasserstein_distance(
+        zeros, estimated, zeros, reference, squared=squared
+    )
 
 
-def extent_matrix_error(estimated_extent: ArrayLike, reference_extent: ArrayLike, *, ord: str | int | float = "fro", relative: bool = False) -> float:
+def extent_matrix_error(
+    estimated_extent: ArrayLike,
+    reference_extent: ArrayLike,
+    *,
+    ord: str | int | float = "fro",
+    relative: bool = False,
+) -> float:
     """Return matrix-norm error between estimated and reference extents."""
     estimated = _as_numeric_array(estimated_extent, "estimated_extent")
     reference = _as_numeric_array(reference_extent, "reference_extent")
     if estimated.shape != reference.shape:
-        raise ValueError("estimated_extent and reference_extent must have the same shape")
+        raise ValueError(
+            "estimated_extent and reference_extent must have the same shape"
+        )
     error = float(np.linalg.norm(estimated - reference, ord=ord))
     if not relative:
         return error
@@ -352,9 +518,17 @@ def extent_matrix_error(estimated_extent: ArrayLike, reference_extent: ArrayLike
     return error / denominator
 
 
-def extent_error(estimated_extent: ArrayLike, reference_extent: ArrayLike, *, ord: str | int | float = "fro", relative: bool = False) -> float:
+def extent_error(
+    estimated_extent: ArrayLike,
+    reference_extent: ArrayLike,
+    *,
+    ord: str | int | float = "fro",
+    relative: bool = False,
+) -> float:
     """Alias for :func:`extent_matrix_error`."""
-    return extent_matrix_error(estimated_extent, reference_extent, ord=ord, relative=relative)
+    return extent_matrix_error(
+        estimated_extent, reference_extent, ord=ord, relative=relative
+    )
 
 
 def _as_numeric_array(value: ArrayLike, name: str) -> np.ndarray:
@@ -392,7 +566,9 @@ def _is_single_vector(values: ArrayLike) -> bool:
     return _as_numeric_array(values, "values").ndim == 1
 
 
-def _as_covariance_stack(covariances: ArrayLike, n_samples: int, dim: int, name: str) -> np.ndarray:
+def _as_covariance_stack(
+    covariances: ArrayLike, n_samples: int, dim: int, name: str
+) -> np.ndarray:
     covariance_array = _as_numeric_array(covariances, name)
     if covariance_array.ndim == 2:
         if covariance_array.shape != (dim, dim):
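A sketch of the Gaussian Wasserstein and extent-error helpers for comparing elliptic extent estimates (hypothetical matrices; not part of the patch):

import numpy as np

from pyrecest.utils.metrics import (
    extent_matrix_error,
    extent_wasserstein_distance,
    gaussian_wasserstein_distance,
)

mean_est, mean_ref = np.array([1.0, 0.0]), np.array([0.0, 0.0])
extent_est, extent_ref = 2.0 * np.eye(2), np.eye(2)

# Full Gaussian 2-Wasserstein distance: mean offset plus covariance mismatch.
gw = gaussian_wasserstein_distance(mean_est, extent_est, mean_ref, extent_ref)

# Extent-only comparisons: Wasserstein part with zero means, and a relative
# Frobenius-norm error.
w_extent = extent_wasserstein_distance(extent_est, extent_ref)
rel_error = extent_matrix_error(extent_est, extent_ref, relative=True)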
@@ -420,7 +596,9 @@ def _validate_order_cutoff(order: float, cutoff: float) -> tuple[float, float]:
     return order, cutoff
 
 
-def _coerce_point_sets(points1: ArrayLike, points2: ArrayLike) -> tuple[np.ndarray, np.ndarray]:
+def _coerce_point_sets(
+    points1: ArrayLike, points2: ArrayLike
+) -> tuple[np.ndarray, np.ndarray]:
     first = _coerce_point_set(points1, "estimated_points")
     second = _coerce_point_set(points2, "reference_points")
     if first.shape[0] and second.shape[0] and first.shape[1] != second.shape[1]:
@@ -439,7 +617,14 @@ def _coerce_point_set(points: ArrayLike, name: str) -> np.ndarray:
     return points_np
 
 
-def _clipped_assignment_cost(estimated: np.ndarray, reference: np.ndarray, *, order: float, cutoff: float, distance_fn: DistanceFunction | None) -> tuple[float, list[tuple[int, int]]]:
+def _clipped_assignment_cost(
+    estimated: np.ndarray,
+    reference: np.ndarray,
+    *,
+    order: float,
+    cutoff: float,
+    distance_fn: DistanceFunction | None,
+) -> tuple[float, list[tuple[int, int]]]:
     if estimated.shape[0] == 0 or reference.shape[0] == 0:
         return 0.0, []
     distances = _pairwise_distances(estimated, reference, distance_fn)
@@ -449,7 +634,9 @@ def _clipped_assignment_cost(estimated: np.ndarray, reference: np.ndarray, *, or
     return assignment_cost, [(int(row), int(col)) for row, col in zip(row_ind, col_ind)]
 
 
-def _pairwise_distances(estimated: np.ndarray, reference: np.ndarray, distance_fn: DistanceFunction | None) -> np.ndarray:
+def _pairwise_distances(
+    estimated: np.ndarray, reference: np.ndarray, distance_fn: DistanceFunction | None
+) -> np.ndarray:
     if distance_fn is None:
         differences = estimated[:, None, :] - reference[None, :, :]
         return np.linalg.norm(differences, axis=-1)
@@ -460,7 +647,14 @@ def _pairwise_distances(estimated: np.ndarray, reference: np.ndarray, distance_f
     return distances
 
 
-def _components_or_distance(name: str, distance: float, localization_component: float, cardinality_component: float, assignments: int, return_components: bool) -> float | dict[str, float | int]:
+def _components_or_distance(
+    name: str,
+    distance: float,
+    localization_component: float,
+    cardinality_component: float,
+    assignments: int,
+    return_components: bool,
+) -> float | dict[str, float | int]:
     if not return_components:
         return float(distance)
     return {
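The set distances also accept an optional distance_fn; a sketch of plugging in a custom per-pair metric, assuming distance_fn is called with one estimated point and one reference point and returns a scalar (hypothetical helper, not part of the patch):

import numpy as np

from pyrecest.utils.metrics import ospa_distance


def wrapped_angle_distance(a: np.ndarray, b: np.ndarray) -> float:
    # Shortest arc length between two angles given in radians (assumed call
    # signature: one estimated point and one reference point per call).
    diff = float(np.abs(a[0] - b[0]) % (2.0 * np.pi))
    return min(diff, 2.0 * np.pi - diff)


estimated_angles = np.array([[0.1], [6.2]])
reference_angles = np.array([[0.0], [0.05]])
d = ospa_distance(
    estimated_angles,
    reference_angles,
    cutoff=np.pi,
    order=1.0,
    distance_fn=wrapped_angle_distance,
)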
""" - predicted, reference = _normalized_pair(predicted_track_matrix, reference_track_matrix) + predicted, reference = _normalized_pair( + predicted_track_matrix, reference_track_matrix + ) reference_lookup = _single_observation_lookup(reference) purities: list[float] = [] @@ -51,7 +55,11 @@ def score_track_purity(predicted_track_matrix: Any, reference_track_matrix: Any) empty_tracks += 1 continue total_observations += len(observations) - matched_reference_ids = [reference_lookup[observation] for observation in observations if observation in reference_lookup] + matched_reference_ids = [ + reference_lookup[observation] + for observation in observations + if observation in reference_lookup + ] counts = Counter(matched_reference_ids) labeled_count = int(sum(counts.values())) dominant_count = max(counts.values(), default=0) @@ -72,24 +80,38 @@ def score_track_purity(predicted_track_matrix: Any, reference_track_matrix: Any) "pure_tracks": int(pure_tracks), "impure_tracks": int(impure_tracks), "mean_track_purity": _mean_or_zero(purities), - "observation_weighted_track_purity": _zero_ratio(dominant_observations, total_observations), + "observation_weighted_track_purity": _zero_ratio( + dominant_observations, total_observations + ), "mean_labeled_track_purity": _mean_or_zero(labeled_purities), - "observation_weighted_labeled_track_purity": _zero_ratio(dominant_observations, labeled_observations), + "observation_weighted_labeled_track_purity": _zero_ratio( + dominant_observations, labeled_observations + ), "unreferenced_predicted_observations": int(unreferenced_observations), - "unreferenced_observation_rate": _zero_ratio(unreferenced_observations, total_observations), + "unreferenced_observation_rate": _zero_ratio( + unreferenced_observations, total_observations + ), } def track_purity(predicted_track_matrix: Any, reference_track_matrix: Any) -> float: """Return observation-weighted predicted-track purity.""" - return float(score_track_purity(predicted_track_matrix, reference_track_matrix)["observation_weighted_track_purity"]) + return float( + score_track_purity(predicted_track_matrix, reference_track_matrix)[ + "observation_weighted_track_purity" + ] + ) -def score_false_tracks(predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1) -> dict[str, float | int]: +def score_false_tracks( + predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1 +) -> dict[str, float | int]: """Return metrics for predicted tracks that contain no reference observation.""" if int(min_length) <= 0: raise ValueError("min_length must be positive") - predicted, reference = _normalized_pair(predicted_track_matrix, reference_track_matrix) + predicted, reference = _normalized_pair( + predicted_track_matrix, reference_track_matrix + ) reference_lookup = _single_observation_lookup(reference) evaluated_tracks = 0 @@ -102,7 +124,9 @@ def score_false_tracks(predicted_track_matrix: Any, reference_track_matrix: Any, if len(observations) < int(min_length): continue evaluated_tracks += 1 - matched = sum(1 for observation in observations if observation in reference_lookup) + matched = sum( + 1 for observation in observations if observation in reference_lookup + ) unreferenced_observations += len(observations) - matched if matched == 0: false_tracks += 1 @@ -114,20 +138,32 @@ def score_false_tracks(predicted_track_matrix: Any, reference_track_matrix: Any, "false_track_rate": _zero_ratio(false_tracks, evaluated_tracks), "false_track_observations": int(false_track_observations), 
"unreferenced_predicted_observations": int(unreferenced_observations), - "unreferenced_predicted_observation_rate": _zero_ratio(unreferenced_observations, total_observations), + "unreferenced_predicted_observation_rate": _zero_ratio( + unreferenced_observations, total_observations + ), } -def false_track_rate(predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1) -> float: +def false_track_rate( + predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1 +) -> float: """Return the fraction of evaluated predicted tracks that are false.""" - return float(score_false_tracks(predicted_track_matrix, reference_track_matrix, min_length=min_length)["false_track_rate"]) + return float( + score_false_tracks( + predicted_track_matrix, reference_track_matrix, min_length=min_length + )["false_track_rate"] + ) -def score_missed_tracks(predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1) -> dict[str, float | int]: +def score_missed_tracks( + predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1 +) -> dict[str, float | int]: """Return metrics for reference tracks with no predicted observation support.""" if int(min_length) <= 0: raise ValueError("min_length must be positive") - predicted, reference = _normalized_pair(predicted_track_matrix, reference_track_matrix) + predicted, reference = _normalized_pair( + predicted_track_matrix, reference_track_matrix + ) predicted_lookup = _single_observation_lookup(predicted) evaluated_tracks = 0 @@ -140,7 +176,9 @@ def score_missed_tracks(predicted_track_matrix: Any, reference_track_matrix: Any if len(observations) < int(min_length): continue evaluated_tracks += 1 - recovered = sum(1 for observation in observations if observation in predicted_lookup) + recovered = sum( + 1 for observation in observations if observation in predicted_lookup + ) missed_observations += len(observations) - recovered if recovered == 0: missed_tracks += 1 @@ -153,18 +191,34 @@ def score_missed_tracks(predicted_track_matrix: Any, reference_track_matrix: Any "missed_track_evaluated_reference_tracks": int(evaluated_tracks), "missed_track_rate": _zero_ratio(missed_tracks, evaluated_tracks), "missed_reference_observations": int(missed_observations), - "missed_reference_observation_rate": _zero_ratio(missed_observations, total_observations), + "missed_reference_observation_rate": _zero_ratio( + missed_observations, total_observations + ), } -def missed_track_rate(predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1) -> float: +def missed_track_rate( + predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1 +) -> float: """Return the fraction of evaluated reference tracks that are missed.""" - return float(score_missed_tracks(predicted_track_matrix, reference_track_matrix, min_length=min_length)["missed_track_rate"]) - - -def track_latencies(predicted_track_matrix: Any, reference_track_matrix: Any, *, session_times: Sequence[float] | None = None, missed_value=np.nan) -> np.ndarray: + return float( + score_missed_tracks( + predicted_track_matrix, reference_track_matrix, min_length=min_length + )["missed_track_rate"] + ) + + +def track_latencies( + predicted_track_matrix: Any, + reference_track_matrix: Any, + *, + session_times: Sequence[float] | None = None, + missed_value=np.nan, +) -> np.ndarray: """Return first-detection latency for each non-empty reference track.""" - predicted, reference = _normalized_pair(predicted_track_matrix, 
-    predicted, reference = _normalized_pair(predicted_track_matrix, reference_track_matrix)
+    predicted, reference = _normalized_pair(
+        predicted_track_matrix, reference_track_matrix
+    )
     predicted_lookup = _single_observation_lookup(predicted)
     times = _session_times(reference.shape[1], session_times)
     values: list[float] = []
@@ -172,17 +226,30 @@ def track_latencies(predicted_track_matrix: Any, reference_track_matrix: Any, *,
         if not observations:
             continue
         first_reference_session = min(session for session, _ in observations)
-        detected_sessions = [session for session, observation in observations if (session, observation) in predicted_lookup]
+        detected_sessions = [
+            session
+            for session, observation in observations
+            if (session, observation) in predicted_lookup
+        ]
         if not detected_sessions:
             values.append(float(missed_value))
             continue
-        values.append(float(times[min(detected_sessions)] - times[first_reference_session]))
+        values.append(
+            float(times[min(detected_sessions)] - times[first_reference_session])
+        )
     return np.asarray(values, dtype=float)
 
 
-def score_track_latency(predicted_track_matrix: Any, reference_track_matrix: Any, *, session_times: Sequence[float] | None = None) -> dict[str, float | int]:
+def score_track_latency(
+    predicted_track_matrix: Any,
+    reference_track_matrix: Any,
+    *,
+    session_times: Sequence[float] | None = None,
+) -> dict[str, float | int]:
     """Return aggregate first-detection latency metrics."""
-    values = track_latencies(predicted_track_matrix, reference_track_matrix, session_times=session_times)
+    values = track_latencies(
+        predicted_track_matrix, reference_track_matrix, session_times=session_times
+    )
     detected_values = values[np.isfinite(values)]
     return {
         "latency_reference_tracks": int(values.size),
@@ -190,32 +257,58 @@ def score_track_latency(predicted_track_matrix: Any, reference_track_matrix: Any
         "latency_missed_tracks": int(values.size - detected_values.size),
         "track_detection_rate": _zero_ratio(detected_values.size, values.size),
         "mean_track_latency": _mean_or_zero(detected_values),
-        "median_track_latency": float(np.median(detected_values)) if detected_values.size else 0.0,
-        "max_track_latency": float(np.max(detected_values)) if detected_values.size else 0.0,
+        "median_track_latency": (
+            float(np.median(detected_values)) if detected_values.size else 0.0
+        ),
+        "max_track_latency": (
+            float(np.max(detected_values)) if detected_values.size else 0.0
+        ),
     }
 
 
-def score_track_outcomes(predicted_track_matrix: Any, reference_track_matrix: Any, *, session_times: Sequence[float] | None = None) -> dict[str, float | int]:
+def score_track_outcomes(
+    predicted_track_matrix: Any,
+    reference_track_matrix: Any,
+    *,
+    session_times: Sequence[float] | None = None,
+) -> dict[str, float | int]:
     """Return fragmentation, purity, false-track, missed-track, and latency metrics."""
     scores: dict[str, float | int] = {}
-    scores.update(score_track_fragmentation(predicted_track_matrix, reference_track_matrix))
+    scores.update(
+        score_track_fragmentation(predicted_track_matrix, reference_track_matrix)
+    )
     scores.update(score_track_purity(predicted_track_matrix, reference_track_matrix))
     scores.update(score_false_tracks(predicted_track_matrix, reference_track_matrix))
     scores.update(score_missed_tracks(predicted_track_matrix, reference_track_matrix))
-    scores.update(score_track_latency(predicted_track_matrix, reference_track_matrix, session_times=session_times))
+    scores.update(
+        score_track_latency(
+            predicted_track_matrix, reference_track_matrix, session_times=session_times
+        )
+    )
     return scores
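A sketch of the latency and combined scoring entry points, using the same assumed matrix convention as above (session_times is optional and defaults to unit spacing; not part of the patch):

from pyrecest.utils.track_metrics import (
    score_track_latency,
    score_track_outcomes,
    track_latencies,
)

predicted = [[0, 2, 4], [None, 3, 5]]
reference = [[0, 2, 4], [1, 3, 5]]
session_times = [0.0, 5.0, 9.0]

# Per reference track: elapsed time from its first observation to the first
# session in which one of its observations appears in a predicted track
# (missed_value, NaN by default, if it is never picked up).
latencies = track_latencies(predicted, reference, session_times=session_times)

latency_summary = score_track_latency(predicted, reference, session_times=session_times)

# One dictionary combining fragmentation, purity, false-track, missed-track,
# and latency metrics.
all_scores = score_track_outcomes(predicted, reference)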
-def _normalized_pair(predicted_track_matrix: Any, reference_track_matrix: Any) -> tuple[np.ndarray, np.ndarray]:
+def _normalized_pair(
+    predicted_track_matrix: Any, reference_track_matrix: Any
+) -> tuple[np.ndarray, np.ndarray]:
     predicted = normalize_track_matrix(predicted_track_matrix)
     reference = normalize_track_matrix(reference_track_matrix)
     if predicted.shape[1] != reference.shape[1]:
-        raise ValueError("Predicted and reference matrices must have the same number of sessions")
+        raise ValueError(
+            "Predicted and reference matrices must have the same number of sessions"
+        )
     return predicted, reference
 
 
 def _observations_by_track(matrix: np.ndarray) -> list[list[Observation]]:
-    return [[(int(session_idx), int(value)) for session_idx, value in enumerate(row) if value is not None] for row in matrix]
+    return [
+        [
+            (int(session_idx), int(value))
+            for session_idx, value in enumerate(row)
+            if value is not None
+        ]
+        for row in matrix
+    ]
 
 
 def _single_observation_lookup(matrix: np.ndarray) -> dict[Observation, int]:
@@ -227,12 +320,16 @@ def _single_observation_lookup(matrix: np.ndarray) -> dict[Observation, int]:
     return lookup
 
 
-def _session_times(n_sessions: int, session_times: Sequence[float] | None) -> np.ndarray:
+def _session_times(
+    n_sessions: int, session_times: Sequence[float] | None
+) -> np.ndarray:
     if session_times is None:
         return np.arange(n_sessions, dtype=float)
     times = np.asarray(session_times, dtype=float)
     if times.shape != (n_sessions,):
-        raise ValueError("session_times must have length equal to the number of sessions")
+        raise ValueError(
+            "session_times must have length equal to the number of sessions"
+        )
     return times
diff --git a/tests/test_metrics.py b/tests/test_metrics.py
index 5c96bfe47..614d928da 100644
--- a/tests/test_metrics.py
+++ b/tests/test_metrics.py
@@ -90,13 +90,19 @@ def test_nees_nis_and_bounds(self):
         groundtruths = array([[0.0, 0.0], [0.0, 0.0]])
         covariances = array([np.eye(2), np.eye(2)])
 
-        npt.assert_allclose(nees(estimates, covariances, groundtruths), np.array([2.0, 4.0]))
+        npt.assert_allclose(
+            nees(estimates, covariances, groundtruths), np.array([2.0, 4.0])
+        )
         self.assertEqual(anees(estimates, covariances, groundtruths), 3.0)
 
         innovations = array([[1.0, 0.0], [0.0, 2.0]])
         innovation_covariances = array([np.eye(2), 2.0 * np.eye(2)])
-        npt.assert_allclose(nis(innovations, innovation_covariances), np.array([1.0, 2.0]))
-        npt.assert_allclose(nis(innovations, groundtruths, innovation_covariances), np.array([1.0, 2.0]))
+        npt.assert_allclose(
+            nis(innovations, innovation_covariances), np.array([1.0, 2.0])
+        )
+        npt.assert_allclose(
+            nis(innovations, groundtruths, innovation_covariances), np.array([1.0, 2.0])
+        )
         self.assertEqual(anis(innovations, innovation_covariances), 1.5)
 
         lower, upper = nees_confidence_bounds(2, n_samples=2)
@@ -105,16 +111,27 @@ def test_nees_nis_and_bounds(self):
         self.assertTrue(is_chi_square_consistent(3.0, 2, n_samples=2))
         nis_lower, nis_upper = nis_confidence_bounds(2, n_samples=2)
         self.assertEqual((lower, upper), (nis_lower, nis_upper))
-        self.assertEqual(consistency_fraction([lower - 1.0, 2.0, upper + 1.0], lower, upper), 1.0 / 3.0)
+        self.assertEqual(
+            consistency_fraction([lower - 1.0, 2.0, upper + 1.0], lower, upper),
+            1.0 / 3.0,
+        )
 
 
 class TestSetDistances(unittest.TestCase):
     def test_ospa_distance(self):
         estimates = array([[0.0], [2.0]])
         groundtruths = array([[0.0], [3.0]])
-        npt.assert_allclose(ospa_distance(estimates, groundtruths, cutoff=10.0, order=2.0), np.sqrt(0.5))
+        npt.assert_allclose(
+            ospa_distance(estimates, groundtruths, cutoff=10.0, order=2.0), np.sqrt(0.5)
+        )
 
-        cardinality_case = ospa_distance(array([[0.0]]), array([[0.0], [3.0]]), cutoff=2.0, order=1.0, return_components=True)
+        cardinality_case = ospa_distance(
+            array([[0.0]]),
+            array([[0.0], [3.0]]),
+            cutoff=2.0,
+            order=1.0,
+            return_components=True,
+        )
         self.assertEqual(cardinality_case["ospa"], 1.0)
         self.assertEqual(cardinality_case["localization"], 0.0)
         self.assertEqual(cardinality_case["cardinality"], 1.0)
@@ -123,13 +140,29 @@ def test_ospa_distance(self):
     def test_mospa_distance(self):
         estimated_sets = [array([[0.0]]), array([[1.0]])]
         groundtruth_sets = [array([[0.0]]), array([[3.0]])]
-        self.assertEqual(mospa_distance(estimated_sets, groundtruth_sets, cutoff=10.0, order=1.0), 1.0)
-        mean_distance, per_step = mospa_distance(estimated_sets, groundtruth_sets, cutoff=10.0, order=1.0, return_per_step=True)
+        self.assertEqual(
+            mospa_distance(estimated_sets, groundtruth_sets, cutoff=10.0, order=1.0),
+            1.0,
+        )
+        mean_distance, per_step = mospa_distance(
+            estimated_sets,
+            groundtruth_sets,
+            cutoff=10.0,
+            order=1.0,
+            return_per_step=True,
+        )
         self.assertEqual(mean_distance, 1.0)
         npt.assert_allclose(per_step, np.array([0.0, 2.0]))
 
     def test_gospa_distance(self):
-        result = gospa_distance(array([[0.0]]), array([[0.0], [4.0]]), cutoff=2.0, order=1.0, alpha=2.0, return_components=True)
+        result = gospa_distance(
+            array([[0.0]]),
+            array([[0.0], [4.0]]),
+            cutoff=2.0,
+            order=1.0,
+            alpha=2.0,
+            return_components=True,
+        )
         self.assertEqual(result["gospa"], 1.0)
         self.assertEqual(result["localization"], 0.0)
         self.assertEqual(result["cardinality"], 1.0)
@@ -142,7 +175,9 @@ def test_gaussian_wasserstein_and_extent_error(self):
         mean = array([0.0, 0.0])
         cov = array(np.eye(2))
         self.assertEqual(gaussian_wasserstein_distance(mean, cov, mean, cov), 0.0)
         self.assertEqual(extent_wasserstein_distance(cov, cov), 0.0)
-        npt.assert_allclose(gaussian_wasserstein_distance(mean, cov, array([1.0, 0.0]), cov), 1.0)
+        npt.assert_allclose(
+            gaussian_wasserstein_distance(mean, cov, array([1.0, 0.0]), cov), 1.0
+        )
         npt.assert_allclose(extent_matrix_error(cov, 2.0 * cov), np.sqrt(2.0))
         npt.assert_allclose(extent_error(cov, 2.0 * cov, relative=True), 0.5)
diff --git a/tests/test_track_metrics.py b/tests/test_track_metrics.py
index 57223c164..80cf4de0a 100644
--- a/tests/test_track_metrics.py
+++ b/tests/test_track_metrics.py
@@ -4,7 +4,6 @@
 import numpy as np
 import numpy.testing as npt
-
 from pyrecest.utils.track_metrics import (
     false_track_rate,
     missed_track_rate,
@@ -46,8 +45,12 @@ def test_track_purity_and_detection(self):
         npt.assert_allclose(missed_track_rate(self.predicted, self.reference), 0.0)
 
     def test_track_latency_and_combined_scores(self):
-        npt.assert_allclose(track_latencies(self.predicted, self.reference), np.array([0.0, 0.0]))
-        summary = score_track_latency(self.predicted, self.reference, session_times=[0.0, 5.0, 9.0])
+        npt.assert_allclose(
+            track_latencies(self.predicted, self.reference), np.array([0.0, 0.0])
+        )
+        summary = score_track_latency(
+            self.predicted, self.reference, session_times=[0.0, 5.0, 9.0]
+        )
         self.assertEqual(summary["latency_detected_tracks"], 2)
         self.assertEqual(summary["mean_track_latency"], 0.0)