From b4f5c2039ae0f0293986e9e7712acb5ffa56dc68 Mon Sep 17 00:00:00 2001 From: FlorianPfaff <6773539+FlorianPfaff@users.noreply.github.com> Date: Sun, 10 May 2026 14:31:15 +0000 Subject: [PATCH] [MegaLinter] Apply linters automatic fixes --- .../filters/association_hypotheses.py | 129 ++++++-- src/pyrecest/filters/out_of_sequence.py | 81 ++++- src/pyrecest/models/motion_models.py | 112 +++++-- src/pyrecest/models/sensor_models.py | 4 +- src/pyrecest/utils/__init__.py | 2 +- src/pyrecest/utils/metrics.py | 300 ++++++++++++++---- src/pyrecest/utils/track_metrics.py | 171 +++++++--- tests/test_dynamic_models.py | 66 +++- tests/test_metrics.py | 55 +++- tests/test_track_metrics.py | 9 +- 10 files changed, 741 insertions(+), 188 deletions(-) diff --git a/src/pyrecest/filters/association_hypotheses.py b/src/pyrecest/filters/association_hypotheses.py index 726c59eb6..33d0bb6c1 100644 --- a/src/pyrecest/filters/association_hypotheses.py +++ b/src/pyrecest/filters/association_hypotheses.py @@ -91,7 +91,9 @@ def missed_detection_hypothesis( ) -def hypothesis_cost(hypothesis: AssociationHypothesis, *, missing_cost: float = np.inf) -> float: +def hypothesis_cost( + hypothesis: AssociationHypothesis, *, missing_cost: float = np.inf +) -> float: """Return a scalar minimization cost for a hypothesis.""" if hypothesis.cost is not None: return float(hypothesis.cost) @@ -110,7 +112,13 @@ def hypothesis_cost(hypothesis: AssociationHypothesis, *, missing_cost: float = class NISGate: """Gate association hypotheses by normalized innovation squared.""" - def __init__(self, threshold: float | None = None, *, measurement_dim: int | None = None, confidence: float | None = None): + def __init__( + self, + threshold: float | None = None, + *, + measurement_dim: int | None = None, + confidence: float | None = None, + ): if threshold is None: if measurement_dim is None or confidence is None: raise ValueError( @@ -143,7 +151,10 @@ def __init__(self, threshold: float, *, missing_cost: float = np.inf): def accepts(self, hypothesis: AssociationHypothesis) -> bool: if hypothesis.is_missed_detection: return True - return hypothesis_cost(hypothesis, missing_cost=self.missing_cost) <= self.threshold + return ( + hypothesis_cost(hypothesis, missing_cost=self.missing_cost) + <= self.threshold + ) def __call__(self, hypothesis: AssociationHypothesis) -> bool: return self.accepts(hypothesis) @@ -176,7 +187,9 @@ def __call__(self, hypothesis: AssociationHypothesis) -> bool: class TopKGate: """Keep the best ``k`` hypotheses per track or per measurement.""" - def __init__(self, k: int, *, mode: GateMode = "track", missing_cost: float = np.inf): + def __init__( + self, k: int, *, mode: GateMode = "track", missing_cost: float = np.inf + ): self.k = int(k) if self.k <= 0: raise ValueError("k must be positive") @@ -185,7 +198,9 @@ def __init__(self, k: int, *, mode: GateMode = "track", missing_cost: float = np self.mode = mode self.missing_cost = float(missing_cost) - def filter(self, hypotheses: Sequence[AssociationHypothesis]) -> list[AssociationHypothesis]: + def filter( + self, hypotheses: Sequence[AssociationHypothesis] + ) -> list[AssociationHypothesis]: """Return hypotheses accepted by the top-k rule.""" accepted_keys = set() grouped: dict[int, list[AssociationHypothesis]] = defaultdict(list) @@ -194,23 +209,34 @@ def filter(self, hypotheses: Sequence[AssociationHypothesis]) -> list[Associatio if hypothesis.is_missed_detection: missed.append(hypothesis) continue - key = hypothesis.track_index if self.mode == "track" else 
_measurement_index(hypothesis) + key = ( + hypothesis.track_index + if self.mode == "track" + else _measurement_index(hypothesis) + ) grouped[key].append(hypothesis) for group in grouped.values(): sorted_group = sorted( group, - key=lambda hypothesis: hypothesis_cost(hypothesis, missing_cost=self.missing_cost), + key=lambda hypothesis: hypothesis_cost( + hypothesis, missing_cost=self.missing_cost + ), ) for hypothesis in sorted_group[: self.k]: - accepted_keys.add((hypothesis.track_index, hypothesis.measurement_index)) + accepted_keys.add( + (hypothesis.track_index, hypothesis.measurement_index) + ) result = [] for hypothesis in hypotheses: if hypothesis.is_missed_detection: result.append(hypothesis) continue - accepted = (hypothesis.track_index, hypothesis.measurement_index) in accepted_keys + accepted = ( + hypothesis.track_index, + hypothesis.measurement_index, + ) in accepted_keys result.append( hypothesis.with_acceptance( accepted, @@ -222,7 +248,9 @@ def filter(self, hypotheses: Sequence[AssociationHypothesis]) -> list[Associatio def accepts(self, hypothesis: AssociationHypothesis) -> bool: raise TypeError("TopKGate operates on a collection; use filter(...)") - def __call__(self, hypotheses: Sequence[AssociationHypothesis]) -> list[AssociationHypothesis]: + def __call__( + self, hypotheses: Sequence[AssociationHypothesis] + ) -> list[AssociationHypothesis]: return self.filter(hypotheses) @@ -242,7 +270,9 @@ def gate_hypotheses( result = [] for hypothesis in hypotheses: - accepted = bool(gate(hypothesis) if callable(gate) else gate.accepts(hypothesis)) + accepted = bool( + gate(hypothesis) if callable(gate) else gate.accepts(hypothesis) + ) reason = None if accepted else reject_reason or type(gate).__name__ result.append(hypothesis.with_acceptance(accepted, reason)) return result @@ -257,7 +287,11 @@ def filter_hypotheses( """Apply one or more gates and optionally drop rejected hypotheses.""" result = list(hypotheses) if gates is None: - return [hypothesis for hypothesis in result if hypothesis.accepted] if accepted_only else result + return ( + [hypothesis for hypothesis in result if hypothesis.accepted] + if accepted_only + else result + ) if not isinstance(gates, (list, tuple)): gates = [gates] for gate in gates: @@ -279,9 +313,13 @@ def linear_gaussian_association_hypotheses( metadata_builder: Callable[..., dict[str, Any] | None] | None = None, ) -> list[AssociationHypothesis]: """Build Gaussian innovation hypotheses for tracks and measurements.""" - measurement_vectors = _coerce_measurements(measurements, measurement_axis=measurement_axis) + measurement_vectors = _coerce_measurements( + measurements, measurement_axis=measurement_axis + ) measurement_matrix = np.asarray(measurement_matrix, dtype=float) - covariance_stack = _coerce_measurement_covariances(meas_noise, len(measurement_vectors)) + covariance_stack = _coerce_measurement_covariances( + meas_noise, len(measurement_vectors) + ) hypotheses = [] for track_index, track in enumerate(tracks): @@ -295,7 +333,9 @@ def linear_gaussian_association_hypotheses( measurement_matrix, covariance, ) - nis = float(normalized_innovation_squared(innovation, innovation_covariance)) + nis = float( + normalized_innovation_squared(innovation, innovation_covariance) + ) measurement_dim = int(measurement_matrix.shape[0]) log_likelihood = _linear_gaussian_log_likelihood( innovation, @@ -329,7 +369,9 @@ def linear_gaussian_association_hypotheses( ) if gates is not None: - hypotheses = filter_hypotheses(hypotheses, gates, accepted_only=not 
include_rejected) + hypotheses = filter_hypotheses( + hypotheses, gates, accepted_only=not include_rejected + ) elif not include_rejected: hypotheses = [hypothesis for hypothesis in hypotheses if hypothesis.accepted] return hypotheses @@ -435,7 +477,9 @@ def association_result_from_hypotheses( unassigned_measurement_cost: float | Sequence[float] | None = None, ): """Solve GNN assignment from hypotheses and return ``AssociationResult``.""" - from .track_manager import solve_global_nearest_neighbor # pylint: disable=import-outside-toplevel + from .track_manager import ( # pylint: disable=import-outside-toplevel + solve_global_nearest_neighbor, + ) cost_matrix = hypotheses_to_cost_matrix( hypotheses, @@ -474,10 +518,19 @@ def associator(tracks, measurements, **kwargs): return association_result_from_hypotheses( hypotheses, num_tracks=len(tracks), - num_measurements=len(_coerce_measurements(measurements, measurement_axis=kwargs.get("measurement_axis", measurement_axis))), + num_measurements=len( + _coerce_measurements( + measurements, + measurement_axis=kwargs.get("measurement_axis", measurement_axis), + ) + ), missing_cost=kwargs.get("missing_cost", missing_cost), - unassigned_track_cost=kwargs.get("unassigned_track_cost", unassigned_track_cost), - unassigned_measurement_cost=kwargs.get("unassigned_measurement_cost", unassigned_measurement_cost), + unassigned_track_cost=kwargs.get( + "unassigned_track_cost", unassigned_track_cost + ), + unassigned_measurement_cost=kwargs.get( + "unassigned_measurement_cost", unassigned_measurement_cost + ), ) return associator @@ -510,24 +563,36 @@ def _track_filter_state(track): return track.filter_state if hasattr(track, "single_target_filter"): return track.single_target_filter.filter_state - raise TypeError("track must expose filter_state or single_target_filter.filter_state") + raise TypeError( + "track must expose filter_state or single_target_filter.filter_state" + ) -def _coerce_measurements(measurements, *, measurement_axis: MeasurementAxis = "auto") -> list[np.ndarray]: +def _coerce_measurements( + measurements, *, measurement_axis: MeasurementAxis = "auto" +) -> list[np.ndarray]: if isinstance(measurements, np.ndarray): - return _coerce_measurement_array(measurements, measurement_axis=measurement_axis) + return _coerce_measurement_array( + measurements, measurement_axis=measurement_axis + ) try: from pyrecest.backend import to_numpy # pylint: disable=import-outside-toplevel maybe_array = to_numpy(measurements) if isinstance(maybe_array, np.ndarray): - return _coerce_measurement_array(maybe_array, measurement_axis=measurement_axis) + return _coerce_measurement_array( + maybe_array, measurement_axis=measurement_axis + ) except (ImportError, AttributeError, TypeError): pass - return [np.asarray(measurement, dtype=float).reshape(-1) for measurement in measurements] + return [ + np.asarray(measurement, dtype=float).reshape(-1) for measurement in measurements + ] -def _coerce_measurement_array(array, *, measurement_axis: MeasurementAxis) -> list[np.ndarray]: +def _coerce_measurement_array( + array, *, measurement_axis: MeasurementAxis +) -> list[np.ndarray]: array = np.asarray(array, dtype=float) if array.ndim == 1: return [array.reshape(-1)] @@ -540,13 +605,17 @@ def _coerce_measurement_array(array, *, measurement_axis: MeasurementAxis) -> li if measurement_axis == "sequence": return [array[index].reshape(-1) for index in range(array.shape[0])] if measurement_axis != "auto": - raise ValueError("measurement_axis must be 'auto', 'columns', 'rows', or 
'sequence'") + raise ValueError( + "measurement_axis must be 'auto', 'columns', 'rows', or 'sequence'" + ) if array.shape[0] <= array.shape[1]: return [array[:, index].reshape(-1) for index in range(array.shape[1])] return [array[index, :].reshape(-1) for index in range(array.shape[0])] -def _coerce_measurement_covariances(meas_noise, num_measurements: int) -> list[np.ndarray]: +def _coerce_measurement_covariances( + meas_noise, num_measurements: int +) -> list[np.ndarray]: covariance = np.asarray(meas_noise, dtype=float) if covariance.ndim == 2: return [covariance for _ in range(num_measurements)] @@ -558,7 +627,9 @@ def _coerce_measurement_covariances(meas_noise, num_measurements: int) -> list[n raise ValueError("meas_noise must have shape (m, m), (m, m, n), or (n, m, m)") -def _linear_gaussian_log_likelihood(innovation, innovation_covariance, measurement_dim: int) -> float: +def _linear_gaussian_log_likelihood( + innovation, innovation_covariance, measurement_dim: int +) -> float: innovation = np.asarray(innovation, dtype=float).reshape(-1) innovation_covariance = np.asarray(innovation_covariance, dtype=float) sign, logdet = np.linalg.slogdet(innovation_covariance) diff --git a/src/pyrecest/filters/out_of_sequence.py b/src/pyrecest/filters/out_of_sequence.py index aa7820d27..697db7265 100644 --- a/src/pyrecest/filters/out_of_sequence.py +++ b/src/pyrecest/filters/out_of_sequence.py @@ -130,12 +130,16 @@ def latest_at_or_before(self, time): def items_after(self, time): """Return all records with timestamp strictly greater than ``time``.""" query_time = _coerce_time(time) - return tuple(self._copy_item(item) for item in self._items if item.time > query_time) + return tuple( + self._copy_item(item) for item in self._items if item.time > query_time + ) def items_at_or_after(self, time): """Return all records with timestamp greater than or equal to ``time``.""" query_time = _coerce_time(time) - return tuple(self._copy_item(item) for item in self._items if item.time >= query_time) + return tuple( + self._copy_item(item) for item in self._items if item.time >= query_time + ) def _copy_item(self, item): return TimestampedItem( @@ -169,7 +173,9 @@ class MeasurementTimeBuffer: """Small helper for tracking measurement timestamps and OOSM status.""" def __init__(self, max_lag=None, maxlen=None, *, copy_values=True): - self._buffer = FixedLagBuffer(max_lag=max_lag, maxlen=maxlen, copy_values=copy_values) + self._buffer = FixedLagBuffer( + max_lag=max_lag, maxlen=maxlen, copy_values=copy_values + ) def __len__(self): return len(self._buffer) @@ -301,7 +307,9 @@ def _insert_and_apply(self, time, method_name, args=(), kwargs=None): if out_of_sequence: captured_result = self._replay(capture_event=event) - replayed_event_count = len([item for item in self._events if item.time >= event.time]) + replayed_event_count = len( + [item for item in self._events if item.time >= event.time] + ) else: captured_result = self._apply_event(event) self._latest_time = max(self._latest_time, event_time) @@ -309,7 +317,9 @@ def _insert_and_apply(self, time, method_name, args=(), kwargs=None): self._trim_to_lag() diagnostics = captured_result if isinstance(captured_result, dict) else None - accepted = True if diagnostics is None else bool(diagnostics.get("accepted", True)) + accepted = ( + True if diagnostics is None else bool(diagnostics.get("accepted", True)) + ) return OutOfSequenceResult( time=event_time, final_time=self._latest_time, @@ -362,12 +372,18 @@ class OutOfSequenceKalmanUpdater(_EventReplayMixin): 
"""Fixed-lag OOSM processor for :class:`KalmanFilter`.""" def __init__(self, kalman_filter, initial_time=0.0, max_lag=None): - filter_object = kalman_filter if isinstance(kalman_filter, KalmanFilter) else KalmanFilter(kalman_filter) + filter_object = ( + kalman_filter + if isinstance(kalman_filter, KalmanFilter) + else KalmanFilter(kalman_filter) + ) self._setup_replay(filter_object, initial_time=initial_time, max_lag=max_lag) def predict_linear(self, time, system_matrix, sys_noise_cov, sys_input=None): """Record/apply a timestamped linear-Gaussian prediction.""" - return self._insert_and_apply(time, "predict_linear", (system_matrix, sys_noise_cov, sys_input)) + return self._insert_and_apply( + time, "predict_linear", (system_matrix, sys_noise_cov, sys_input) + ) def predict_model(self, time, transition_model): """Record/apply a timestamped structural transition-model prediction.""" @@ -389,7 +405,11 @@ def update_linear( time, "update_linear", (measurement, measurement_matrix, meas_noise), - {"return_diagnostics": return_diagnostics, "scale": scale, "action": action}, + { + "return_diagnostics": return_diagnostics, + "scale": scale, + "action": action, + }, ) def update_linear_robust( @@ -421,18 +441,33 @@ def update_linear_robust( }, ) - def update_model(self, time, measurement_model, measurement, *, return_diagnostics=False, scale=1.0, action="updated"): + def update_model( + self, + time, + measurement_model, + measurement, + *, + return_diagnostics=False, + scale=1.0, + action="updated", + ): """Record/apply a timestamped structural measurement-model update.""" return self._insert_and_apply( time, "update_model", (measurement_model, measurement), - {"return_diagnostics": return_diagnostics, "scale": scale, "action": action}, + { + "return_diagnostics": return_diagnostics, + "scale": scale, + "action": action, + }, ) def update_model_robust(self, time, measurement_model, measurement, **kwargs): """Record/apply a timestamped robust structural measurement-model update.""" - return self._insert_and_apply(time, "update_model_robust", (measurement_model, measurement), kwargs) + return self._insert_and_apply( + time, "update_model_robust", (measurement_model, measurement), kwargs + ) class OutOfSequenceParticleUpdater(_EventReplayMixin): @@ -461,18 +496,28 @@ def predict_nonlinear( shift_instead_of_add=None, ): """Record/apply a timestamped nonlinear particle prediction.""" - kwargs = {"noise_distribution": noise_distribution, "function_is_vectorized": function_is_vectorized} + kwargs = { + "noise_distribution": noise_distribution, + "function_is_vectorized": function_is_vectorized, + } if shift_instead_of_add is not None: kwargs["shift_instead_of_add"] = shift_instead_of_add return self._insert_and_apply(time, "predict_nonlinear", (f,), kwargs) def update_model(self, time, measurement_model, measurement=None): """Record/apply a timestamped particle measurement-model update.""" - return self._insert_and_apply(time, "update_model", (measurement_model,), {"measurement": measurement}) + return self._insert_and_apply( + time, "update_model", (measurement_model,), {"measurement": measurement} + ) def update_nonlinear_using_likelihood(self, time, likelihood, measurement=None): """Record/apply a timestamped likelihood-based particle update.""" - return self._insert_and_apply(time, "update_nonlinear_using_likelihood", (likelihood,), {"measurement": measurement}) + return self._insert_and_apply( + time, + "update_nonlinear_using_likelihood", + (likelihood,), + {"measurement": measurement}, + ) 
update_with_likelihood = update_nonlinear_using_likelihood @@ -527,7 +572,9 @@ def retrodict_linear_gaussian( effective_covariance = covariance if remove_process_noise: if sys_noise_cov is None: - raise ValueError("sys_noise_cov is required when remove_process_noise is true") + raise ValueError( + "sys_noise_cov is required when remove_process_noise is true" + ) sys_noise_cov = _as_matrix(sys_noise_cov, "sys_noise_cov") if sys_noise_cov.shape != covariance.shape: raise ValueError("sys_noise_cov has incompatible shape") @@ -559,4 +606,6 @@ def retrodict_linear_gaussian_state( sys_noise_cov=sys_noise_cov, remove_process_noise=remove_process_noise, ) - return GaussianDistribution(previous_mean, previous_covariance, check_validity=False) + return GaussianDistribution( + previous_mean, previous_covariance, check_validity=False + ) diff --git a/src/pyrecest/models/motion_models.py b/src/pyrecest/models/motion_models.py index 2ce1fdd31..11874a048 100644 --- a/src/pyrecest/models/motion_models.py +++ b/src/pyrecest/models/motion_models.py @@ -12,13 +12,13 @@ from typing import Any import numpy as np -from scipy.linalg import expm # pylint: disable=no-name-in-module,no-member,too-many-arguments,too-many-positional-arguments from pyrecest.backend import abs as _abs from pyrecest.backend import asarray, cos, sin, stack, where, zeros from pyrecest.models.additive_noise import AdditiveNoiseTransitionModel from pyrecest.models.linear_gaussian import LinearGaussianTransitionModel +from scipy.linalg import expm __all__ = [ "constant_acceleration_model", @@ -48,7 +48,9 @@ ] -def kinematic_transition_matrix(dt: float, spatial_dim: int = 2, derivative_order: int = 1): +def kinematic_transition_matrix( + dt: float, spatial_dim: int = 2, derivative_order: int = 1 +): """Return a block kinematic transition matrix. 
``derivative_order=1`` yields constant velocity states ``[p, v]``; @@ -140,26 +142,51 @@ def integrated_white_noise_covariance( covariance[ _state_index(derivative_row, axis, int(spatial_dim)), _state_index(derivative_col, axis, int(spatial_dim)), - ] = float(density) * coefficient + ] = ( + float(density) * coefficient + ) return asarray(covariance) -def white_noise_acceleration_covariance(dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0): +def white_noise_acceleration_covariance( + dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0 +): """Return white-noise-acceleration covariance for constant-velocity states.""" - return integrated_white_noise_covariance(dt, spatial_dim=spatial_dim, derivative_order=1, spectral_density=spectral_density) + return integrated_white_noise_covariance( + dt, + spatial_dim=spatial_dim, + derivative_order=1, + spectral_density=spectral_density, + ) -def white_noise_jerk_covariance(dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0): +def white_noise_jerk_covariance( + dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0 +): """Return white-noise-jerk covariance for constant-acceleration states.""" - return integrated_white_noise_covariance(dt, spatial_dim=spatial_dim, derivative_order=2, spectral_density=spectral_density) + return integrated_white_noise_covariance( + dt, + spatial_dim=spatial_dim, + derivative_order=2, + spectral_density=spectral_density, + ) -def white_noise_snap_covariance(dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0): +def white_noise_snap_covariance( + dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0 +): """Return white-noise-snap covariance for constant-jerk states.""" - return integrated_white_noise_covariance(dt, spatial_dim=spatial_dim, derivative_order=3, spectral_density=spectral_density) + return integrated_white_noise_covariance( + dt, + spatial_dim=spatial_dim, + derivative_order=3, + spectral_density=spectral_density, + ) -def constant_velocity_model(dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0) -> LinearGaussianTransitionModel: +def constant_velocity_model( + dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0 +) -> LinearGaussianTransitionModel: """Return a linear Gaussian constant-velocity transition model.""" return LinearGaussianTransitionModel( constant_velocity_transition_matrix(dt, spatial_dim=spatial_dim), @@ -169,7 +196,9 @@ def constant_velocity_model(dt: float, spatial_dim: int = 2, spectral_density: f ) -def constant_acceleration_model(dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0) -> LinearGaussianTransitionModel: +def constant_acceleration_model( + dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0 +) -> LinearGaussianTransitionModel: """Return a linear Gaussian constant-acceleration transition model.""" return LinearGaussianTransitionModel( constant_acceleration_transition_matrix(dt, spatial_dim=spatial_dim), @@ -179,7 +208,9 @@ def constant_acceleration_model(dt: float, spatial_dim: int = 2, spectral_densit ) -def constant_jerk_model(dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0) -> LinearGaussianTransitionModel: +def constant_jerk_model( + dt: float, spatial_dim: int = 2, spectral_density: float | np.ndarray = 1.0 +) -> LinearGaussianTransitionModel: """Return a linear Gaussian constant-jerk transition model.""" return 
LinearGaussianTransitionModel( constant_jerk_transition_matrix(dt, spatial_dim=spatial_dim), @@ -202,7 +233,10 @@ def continuous_to_discrete_lti( zero matrix. """ continuous_matrix_np = np.asarray(continuous_matrix, dtype=float) - if continuous_matrix_np.ndim != 2 or continuous_matrix_np.shape[0] != continuous_matrix_np.shape[1]: + if ( + continuous_matrix_np.ndim != 2 + or continuous_matrix_np.shape[0] != continuous_matrix_np.shape[1] + ): raise ValueError("continuous_matrix must be square") dim = continuous_matrix_np.shape[0] transition = expm(continuous_matrix_np * float(dt)) @@ -261,7 +295,12 @@ def singer_transition_matrix(dt: float, spatial_dim: int = 2, tau: float = 20.0) return asarray(matrix) -def singer_process_noise_covariance(dt: float, spatial_dim: int = 2, tau: float = 20.0, acceleration_variance: float | np.ndarray = 1.0): +def singer_process_noise_covariance( + dt: float, + spatial_dim: int = 2, + tau: float = 20.0, + acceleration_variance: float | np.ndarray = 1.0, +): """Return Singer process noise covariance via Van Loan discretization.""" if float(tau) <= 0.0: raise ValueError("tau must be positive") @@ -272,7 +311,9 @@ def singer_process_noise_covariance(dt: float, spatial_dim: int = 2, tau: float elif acceleration_variance_array.shape == (int(spatial_dim),): variances = acceleration_variance_array else: - raise ValueError("acceleration_variance must be scalar or have shape (spatial_dim,)") + raise ValueError( + "acceleration_variance must be scalar or have shape (spatial_dim,)" + ) continuous_block = np.array( [[0.0, 1.0, 0.0], [0.0, 0.0, 1.0], [0.0, 0.0, -alpha]], @@ -297,7 +338,12 @@ def singer_process_noise_covariance(dt: float, spatial_dim: int = 2, tau: float return asarray(covariance) -def singer_model(dt: float, spatial_dim: int = 2, tau: float = 20.0, acceleration_variance: float | np.ndarray = 1.0) -> LinearGaussianTransitionModel: +def singer_model( + dt: float, + spatial_dim: int = 2, + tau: float = 20.0, + acceleration_variance: float | np.ndarray = 1.0, +) -> LinearGaussianTransitionModel: """Return a linear Gaussian Singer acceleration transition model.""" return LinearGaussianTransitionModel( singer_transition_matrix(dt, spatial_dim=spatial_dim, tau=tau), @@ -319,8 +365,16 @@ def coordinated_turn_transition(state, dt: float = 1.0, turn_threshold: float = cos_omega_dt = cos(omega_dt) safe_omega = where(_abs(omega) < float(turn_threshold), 1.0, omega) - turn_x = x_pos + sin_omega_dt / safe_omega * x_vel - (1.0 - cos_omega_dt) / safe_omega * y_vel - turn_y = y_pos + (1.0 - cos_omega_dt) / safe_omega * x_vel + sin_omega_dt / safe_omega * y_vel + turn_x = ( + x_pos + + sin_omega_dt / safe_omega * x_vel + - (1.0 - cos_omega_dt) / safe_omega * y_vel + ) + turn_y = ( + y_pos + + (1.0 - cos_omega_dt) / safe_omega * x_vel + + sin_omega_dt / safe_omega * y_vel + ) turn_vx = cos_omega_dt * x_vel - sin_omega_dt * y_vel turn_vy = sin_omega_dt * x_vel + cos_omega_dt * y_vel @@ -338,7 +392,9 @@ def coordinated_turn_transition(state, dt: float = 1.0, turn_threshold: float = ) -def coordinated_turn_model(dt: float = 1.0, noise_covariance: Any | None = None) -> AdditiveNoiseTransitionModel: +def coordinated_turn_model( + dt: float = 1.0, noise_covariance: Any | None = None +) -> AdditiveNoiseTransitionModel: """Return an additive-noise coordinated-turn model for ``[x, y, vx, vy, omega]``.""" if noise_covariance is None: noise_covariance = zeros((5, 5)) @@ -349,7 +405,11 @@ def coordinated_turn_model(dt: float = 1.0, noise_covariance: Any | None = None) ) -def 
nearly_coordinated_turn_model(dt: float = 1.0, position_spectral_density: float = 1.0, turn_rate_variance: float = 1e-4) -> AdditiveNoiseTransitionModel: +def nearly_coordinated_turn_model( + dt: float = 1.0, + position_spectral_density: float = 1.0, + turn_rate_variance: float = 1e-4, +) -> AdditiveNoiseTransitionModel: """Return a coordinated-turn model with a simple nearly-constant-turn covariance.""" covariance = np.zeros((5, 5), dtype=float) covariance[:4, :4] = np.asarray( @@ -376,7 +436,9 @@ def nearly_constant_speed_transition(state, dt: float = 1.0): ) -def nearly_constant_speed_model(dt: float = 1.0, noise_covariance: Any | None = None) -> AdditiveNoiseTransitionModel: +def nearly_constant_speed_model( + dt: float = 1.0, noise_covariance: Any | None = None +) -> AdditiveNoiseTransitionModel: """Return an additive-noise nearly-constant-speed model.""" if noise_covariance is None: noise_covariance = zeros((4, 4)) @@ -409,7 +471,9 @@ def se2_unicycle_transition(state, dt: float = 1.0, turn_threshold: float = 1e-8 ) -def se2_unicycle_model(dt: float = 1.0, noise_covariance: Any | None = None) -> AdditiveNoiseTransitionModel: +def se2_unicycle_model( + dt: float = 1.0, noise_covariance: Any | None = None +) -> AdditiveNoiseTransitionModel: """Return an additive-noise SE(2)-style unicycle transition model.""" if noise_covariance is None: noise_covariance = zeros((5, 5)) @@ -446,7 +510,9 @@ def se3_pose_twist_transition(state, dt: float = 1.0): ) -def se3_pose_twist_model(dt: float = 1.0, noise_covariance: Any | None = None) -> AdditiveNoiseTransitionModel: +def se3_pose_twist_model( + dt: float = 1.0, noise_covariance: Any | None = None +) -> AdditiveNoiseTransitionModel: """Return an additive-noise local SE(3) pose/twist transition model.""" if noise_covariance is None: noise_covariance = zeros((12, 12)) diff --git a/src/pyrecest/models/sensor_models.py b/src/pyrecest/models/sensor_models.py index c859f35bb..28793fbba 100644 --- a/src/pyrecest/models/sensor_models.py +++ b/src/pyrecest/models/sensor_models.py @@ -6,7 +6,9 @@ from typing import Any # pylint: disable=no-name-in-module,no-member,too-many-arguments,too-many-positional-arguments -from pyrecest.backend import arctan2, asarray, matvec, sqrt, stack, sum as _sum, zeros +from pyrecest.backend import arctan2, asarray, matvec, sqrt, stack +from pyrecest.backend import sum as _sum +from pyrecest.backend import zeros from pyrecest.models.additive_noise import AdditiveNoiseMeasurementModel __all__ = [ diff --git a/src/pyrecest/utils/__init__.py b/src/pyrecest/utils/__init__.py index 839247db6..202bd4a47 100644 --- a/src/pyrecest/utils/__init__.py +++ b/src/pyrecest/utils/__init__.py @@ -9,8 +9,8 @@ from .association_models import LogisticPairwiseAssociationModel from .history_recorder import HistoryRecorder from .metrics import ( - anis, anees, + anis, chi_square_confidence_bounds, chi_square_confidence_interval, consistency_fraction, diff --git a/src/pyrecest/utils/metrics.py b/src/pyrecest/utils/metrics.py index 911294c2d..e37275a8f 100644 --- a/src/pyrecest/utils/metrics.py +++ b/src/pyrecest/utils/metrics.py @@ -67,7 +67,12 @@ def squared_error(estimates: ArrayLike, groundtruths: ArrayLike) -> np.ndarray: return np.sum(errors * errors, axis=-1) -def mean_squared_error(estimates: ArrayLike, groundtruths: ArrayLike, *, axis: int | tuple[int, ...] | None = None): +def mean_squared_error( + estimates: ArrayLike, + groundtruths: ArrayLike, + *, + axis: int | tuple[int, ...] | None = None, +): """Return mean squared component error. 
``axis=None`` averages all entries. Use ``axis=0`` to obtain a component-wise @@ -77,12 +82,22 @@ def mean_squared_error(estimates: ArrayLike, groundtruths: ArrayLike, *, axis: i return np.mean(errors * errors, axis=axis) -def root_mean_squared_error(estimates: ArrayLike, groundtruths: ArrayLike, *, axis: int | tuple[int, ...] | None = None): +def root_mean_squared_error( + estimates: ArrayLike, + groundtruths: ArrayLike, + *, + axis: int | tuple[int, ...] | None = None, +): """Return root mean squared component error.""" return np.sqrt(mean_squared_error(estimates, groundtruths, axis=axis)) -def mean_absolute_error(estimates: ArrayLike, groundtruths: ArrayLike, *, axis: int | tuple[int, ...] | None = None): +def mean_absolute_error( + estimates: ArrayLike, + groundtruths: ArrayLike, + *, + axis: int | tuple[int, ...] | None = None, +): """Return mean absolute component error.""" return np.mean(np.abs(error_vectors(estimates, groundtruths)), axis=axis) @@ -92,7 +107,11 @@ def mean_absolute_error(estimates: ArrayLike, groundtruths: ArrayLike, *, axis: mae = mean_absolute_error -def normalized_estimation_error_squared(estimates: ArrayLike, uncertainties: ArrayLike, groundtruths: ArrayLike | None = None): +def normalized_estimation_error_squared( + estimates: ArrayLike, + uncertainties: ArrayLike, + groundtruths: ArrayLike | None = None, +): """Return normalized estimation error squared values. If ``groundtruths`` is omitted, ``estimates`` is interpreted as an error @@ -100,19 +119,43 @@ def normalized_estimation_error_squared(estimates: ArrayLike, uncertainties: Arr returned values have expected value equal to the state dimension for a consistent covariance. """ - errors = _as_numeric_array(estimates, "estimates") if groundtruths is None else error_vectors(estimates, groundtruths) + errors = ( + _as_numeric_array(estimates, "estimates") + if groundtruths is None + else error_vectors(estimates, groundtruths) + ) errors = _as_sample_matrix(errors, "errors") - covariances = _as_covariance_stack(uncertainties, errors.shape[0], errors.shape[1], "uncertainties") + covariances = _as_covariance_stack( + uncertainties, errors.shape[0], errors.shape[1], "uncertainties" + ) values = _quadratic_forms(errors, covariances) - return float(values[0]) if _is_single_vector(estimates) and groundtruths is None else values + return ( + float(values[0]) + if _is_single_vector(estimates) and groundtruths is None + else values + ) -def average_nees(estimates: ArrayLike, uncertainties: ArrayLike, groundtruths: ArrayLike | None = None) -> float: +def average_nees( + estimates: ArrayLike, + uncertainties: ArrayLike, + groundtruths: ArrayLike | None = None, +) -> float: """Return average normalized estimation error squared.""" - return float(np.mean(np.atleast_1d(normalized_estimation_error_squared(estimates, uncertainties, groundtruths)))) + return float( + np.mean( + np.atleast_1d( + normalized_estimation_error_squared( + estimates, uncertainties, groundtruths + ) + ) + ) + ) -def anees(estimates: ArrayLike, uncertainties: ArrayLike, groundtruths: ArrayLike) -> float: +def anees( + estimates: ArrayLike, uncertainties: ArrayLike, groundtruths: ArrayLike +) -> float: """Backward-compatible alias for average NEES.""" return average_nees(estimates, uncertainties, groundtruths) @@ -120,7 +163,11 @@ def anees(estimates: ArrayLike, uncertainties: ArrayLike, groundtruths: ArrayLik nees = normalized_estimation_error_squared -def normalized_innovation_squared(innovations_or_measurements: ArrayLike, predicted_or_covariances: 
ArrayLike, innovation_covariances: ArrayLike | None = None): +def normalized_innovation_squared( + innovations_or_measurements: ArrayLike, + predicted_or_covariances: ArrayLike, + innovation_covariances: ArrayLike | None = None, +): """Return normalized innovation squared values. Two call styles are supported: @@ -132,24 +179,51 @@ def normalized_innovation_squared(innovations_or_measurements: ArrayLike, predic innovations = _as_numeric_array(innovations_or_measurements, "innovations") covariances = predicted_or_covariances else: - innovations = error_vectors(innovations_or_measurements, predicted_or_covariances) + innovations = error_vectors( + innovations_or_measurements, predicted_or_covariances + ) covariances = innovation_covariances innovations = _as_sample_matrix(innovations, "innovations") - covariance_stack = _as_covariance_stack(covariances, innovations.shape[0], innovations.shape[1], "innovation_covariances") + covariance_stack = _as_covariance_stack( + covariances, + innovations.shape[0], + innovations.shape[1], + "innovation_covariances", + ) values = _quadratic_forms(innovations, covariance_stack) - return float(values[0]) if innovations.shape[0] == 1 and _is_single_vector(innovations_or_measurements) else values + return ( + float(values[0]) + if innovations.shape[0] == 1 and _is_single_vector(innovations_or_measurements) + else values + ) -def average_nis(innovations_or_measurements: ArrayLike, predicted_or_covariances: ArrayLike, innovation_covariances: ArrayLike | None = None) -> float: +def average_nis( + innovations_or_measurements: ArrayLike, + predicted_or_covariances: ArrayLike, + innovation_covariances: ArrayLike | None = None, +) -> float: """Return average normalized innovation squared.""" - return float(np.mean(np.atleast_1d(normalized_innovation_squared(innovations_or_measurements, predicted_or_covariances, innovation_covariances)))) + return float( + np.mean( + np.atleast_1d( + normalized_innovation_squared( + innovations_or_measurements, + predicted_or_covariances, + innovation_covariances, + ) + ) + ) + ) nis = normalized_innovation_squared anis = average_nis -def chi_square_confidence_bounds(degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]: +def chi_square_confidence_bounds( + degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95 +) -> tuple[float, float]: """Return two-sided chi-square bounds for an averaged NEES/NIS statistic.""" if int(degrees_of_freedom) <= 0: raise ValueError("degrees_of_freedom must be positive") @@ -164,40 +238,74 @@ def chi_square_confidence_bounds(degrees_of_freedom: int, *, n_samples: int = 1, return float(lower), float(upper) -def chi_square_confidence_interval(degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]: +def chi_square_confidence_interval( + degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95 +) -> tuple[float, float]: """Alias for :func:`chi_square_confidence_bounds`.""" - return chi_square_confidence_bounds(degrees_of_freedom, n_samples=n_samples, confidence=confidence) + return chi_square_confidence_bounds( + degrees_of_freedom, n_samples=n_samples, confidence=confidence + ) -def nees_confidence_bounds(state_dim: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]: +def nees_confidence_bounds( + state_dim: int, *, n_samples: int = 1, confidence: float = 0.95 +) -> tuple[float, float]: """Return chi-square consistency bounds for NEES or ANEES.""" - return 
chi_square_confidence_bounds(state_dim, n_samples=n_samples, confidence=confidence) + return chi_square_confidence_bounds( + state_dim, n_samples=n_samples, confidence=confidence + ) -def nees_confidence_interval(state_dim: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]: +def nees_confidence_interval( + state_dim: int, *, n_samples: int = 1, confidence: float = 0.95 +) -> tuple[float, float]: """Alias for :func:`nees_confidence_bounds`.""" return nees_confidence_bounds(state_dim, n_samples=n_samples, confidence=confidence) -def nis_confidence_bounds(measurement_dim: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]: +def nis_confidence_bounds( + measurement_dim: int, *, n_samples: int = 1, confidence: float = 0.95 +) -> tuple[float, float]: """Return chi-square consistency bounds for NIS or ANIS.""" - return chi_square_confidence_bounds(measurement_dim, n_samples=n_samples, confidence=confidence) + return chi_square_confidence_bounds( + measurement_dim, n_samples=n_samples, confidence=confidence + ) -def nis_confidence_interval(measurement_dim: int, *, n_samples: int = 1, confidence: float = 0.95) -> tuple[float, float]: +def nis_confidence_interval( + measurement_dim: int, *, n_samples: int = 1, confidence: float = 0.95 +) -> tuple[float, float]: """Alias for :func:`nis_confidence_bounds`.""" - return nis_confidence_bounds(measurement_dim, n_samples=n_samples, confidence=confidence) + return nis_confidence_bounds( + measurement_dim, n_samples=n_samples, confidence=confidence + ) -def is_chi_square_consistent(statistic: float, degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95) -> bool: +def is_chi_square_consistent( + statistic: float, + degrees_of_freedom: int, + *, + n_samples: int = 1, + confidence: float = 0.95, +) -> bool: """Return whether a scalar statistic lies inside chi-square bounds.""" - lower, upper = chi_square_confidence_bounds(degrees_of_freedom, n_samples=n_samples, confidence=confidence) + lower, upper = chi_square_confidence_bounds( + degrees_of_freedom, n_samples=n_samples, confidence=confidence + ) return bool(lower <= float(statistic) <= upper) -def is_within_chi_square_confidence_interval(statistic: float, degrees_of_freedom: int, *, n_samples: int = 1, confidence: float = 0.95) -> bool: +def is_within_chi_square_confidence_interval( + statistic: float, + degrees_of_freedom: int, + *, + n_samples: int = 1, + confidence: float = 0.95, +) -> bool: """Alias for :func:`is_chi_square_consistent`.""" - return is_chi_square_consistent(statistic, degrees_of_freedom, n_samples=n_samples, confidence=confidence) + return is_chi_square_consistent( + statistic, degrees_of_freedom, n_samples=n_samples, confidence=confidence + ) def consistency_fraction(values: ArrayLike, lower: float, upper: float) -> float: @@ -205,7 +313,9 @@ def consistency_fraction(values: ArrayLike, lower: float, upper: float) -> float values_array = _as_numeric_array(values, "values") if values_array.size == 0: return 0.0 - return float(np.mean((values_array >= float(lower)) & (values_array <= float(upper)))) + return float( + np.mean((values_array >= float(lower)) & (values_array <= float(upper))) + ) def ospa_distance( @@ -226,14 +336,25 @@ def ospa_distance( if normalizer == 0: return _components_or_distance("ospa", 0.0, 0.0, 0.0, 0, return_components) if n_estimated == 0 or n_reference == 0: - return _components_or_distance("ospa", cutoff, 0.0, cutoff, 0, return_components) + return _components_or_distance( + "ospa", cutoff, 
0.0, cutoff, 0, return_components + ) - assignment_cost, assignments = _clipped_assignment_cost(estimated, reference, order=order, cutoff=cutoff, distance_fn=distance_fn) + assignment_cost, assignments = _clipped_assignment_cost( + estimated, reference, order=order, cutoff=cutoff, distance_fn=distance_fn + ) cardinality_cost = cutoff**order * abs(n_estimated - n_reference) localization_component = assignment_cost / float(normalizer) cardinality_component = cardinality_cost / float(normalizer) distance = (localization_component + cardinality_component) ** (1.0 / order) - return _components_or_distance("ospa", distance, localization_component ** (1.0 / order), cardinality_component ** (1.0 / order), len(assignments), return_components) + return _components_or_distance( + "ospa", + distance, + localization_component ** (1.0 / order), + cardinality_component ** (1.0 / order), + len(assignments), + return_components, + ) def mospa_distance( @@ -247,9 +368,20 @@ def mospa_distance( ) -> float | tuple[float, np.ndarray]: """Return mean OSPA over a sequence of finite-set estimates.""" if len(estimated_point_sets) != len(reference_point_sets): - raise ValueError("estimated_point_sets and reference_point_sets must have the same length") + raise ValueError( + "estimated_point_sets and reference_point_sets must have the same length" + ) distances = np.asarray( - [ospa_distance(estimated, reference, cutoff=cutoff, order=order, distance_fn=distance_fn) for estimated, reference in zip(estimated_point_sets, reference_point_sets)], + [ + ospa_distance( + estimated, + reference, + cutoff=cutoff, + order=order, + distance_fn=distance_fn, + ) + for estimated, reference in zip(estimated_point_sets, reference_point_sets) + ], dtype=float, ) mean_distance = float(np.mean(distances)) if distances.size else 0.0 @@ -274,10 +406,21 @@ def gospa_distance( if estimated.shape[0] == 0 and reference.shape[0] == 0: return _components_or_distance("gospa", 0.0, 0.0, 0.0, 0, return_components) - assignment_cost, assignments = _clipped_assignment_cost(estimated, reference, order=order, cutoff=cutoff, distance_fn=distance_fn) - cardinality_cost = cutoff**order / float(alpha) * abs(estimated.shape[0] - reference.shape[0]) + assignment_cost, assignments = _clipped_assignment_cost( + estimated, reference, order=order, cutoff=cutoff, distance_fn=distance_fn + ) + cardinality_cost = ( + cutoff**order / float(alpha) * abs(estimated.shape[0] - reference.shape[0]) + ) distance = (assignment_cost + cardinality_cost) ** (1.0 / order) - return _components_or_distance("gospa", distance, assignment_cost ** (1.0 / order), cardinality_cost ** (1.0 / order), len(assignments), return_components) + return _components_or_distance( + "gospa", + distance, + assignment_cost ** (1.0 / order), + cardinality_cost ** (1.0 / order), + len(assignments), + return_components, + ) def iou_polygon(polygon1, polygon2): @@ -301,7 +444,14 @@ def extent_intersection_over_union(shape1, shape2) -> float: return eot_shape_iou(shape1, shape2) -def gaussian_wasserstein_distance(mean1: ArrayLike, covariance1: ArrayLike, mean2: ArrayLike, covariance2: ArrayLike, *, squared: bool = False) -> float: +def gaussian_wasserstein_distance( + mean1: ArrayLike, + covariance1: ArrayLike, + mean2: ArrayLike, + covariance2: ArrayLike, + *, + squared: bool = False, +) -> float: """Return the 2-Wasserstein distance between Gaussian distributions.""" mean1_np = _as_numeric_array(mean1, "mean1").reshape(-1) mean2_np = _as_numeric_array(mean2, "mean2").reshape(-1) @@ -319,30 +469,46 @@ 
def gaussian_wasserstein_distance(mean1: ArrayLike, covariance1: ArrayLike, mean mean_error = mean1_np - mean2_np mean_term = float(mean_error @ mean_error) covariance1_sqrt = _symmetric_matrix_square_root(covariance1_np) - middle_sqrt = _symmetric_matrix_square_root(covariance1_sqrt @ covariance2_np @ covariance1_sqrt) + middle_sqrt = _symmetric_matrix_square_root( + covariance1_sqrt @ covariance2_np @ covariance1_sqrt + ) trace_term = float(np.trace(covariance1_np + covariance2_np - 2.0 * middle_sqrt)) squared_distance = max(mean_term + trace_term, 0.0) return float(squared_distance) if squared else float(np.sqrt(squared_distance)) -def extent_wasserstein_distance(estimated_extent: ArrayLike, reference_extent: ArrayLike, *, squared: bool = False) -> float: +def extent_wasserstein_distance( + estimated_extent: ArrayLike, reference_extent: ArrayLike, *, squared: bool = False +) -> float: """Return the covariance/extent part of the Gaussian 2-Wasserstein distance.""" estimated = _as_numeric_array(estimated_extent, "estimated_extent") reference = _as_numeric_array(reference_extent, "reference_extent") _validate_square_matrix(estimated, "estimated_extent") _validate_square_matrix(reference, "reference_extent") if estimated.shape != reference.shape: - raise ValueError("estimated_extent and reference_extent must have the same shape") + raise ValueError( + "estimated_extent and reference_extent must have the same shape" + ) zeros = np.zeros(estimated.shape[0], dtype=float) - return gaussian_wasserstein_distance(zeros, estimated, zeros, reference, squared=squared) + return gaussian_wasserstein_distance( + zeros, estimated, zeros, reference, squared=squared + ) -def extent_matrix_error(estimated_extent: ArrayLike, reference_extent: ArrayLike, *, ord: str | int | float = "fro", relative: bool = False) -> float: +def extent_matrix_error( + estimated_extent: ArrayLike, + reference_extent: ArrayLike, + *, + ord: str | int | float = "fro", + relative: bool = False, +) -> float: """Return matrix-norm error between estimated and reference extents.""" estimated = _as_numeric_array(estimated_extent, "estimated_extent") reference = _as_numeric_array(reference_extent, "reference_extent") if estimated.shape != reference.shape: - raise ValueError("estimated_extent and reference_extent must have the same shape") + raise ValueError( + "estimated_extent and reference_extent must have the same shape" + ) error = float(np.linalg.norm(estimated - reference, ord=ord)) if not relative: return error @@ -352,9 +518,17 @@ def extent_matrix_error(estimated_extent: ArrayLike, reference_extent: ArrayLike return error / denominator -def extent_error(estimated_extent: ArrayLike, reference_extent: ArrayLike, *, ord: str | int | float = "fro", relative: bool = False) -> float: +def extent_error( + estimated_extent: ArrayLike, + reference_extent: ArrayLike, + *, + ord: str | int | float = "fro", + relative: bool = False, +) -> float: """Alias for :func:`extent_matrix_error`.""" - return extent_matrix_error(estimated_extent, reference_extent, ord=ord, relative=relative) + return extent_matrix_error( + estimated_extent, reference_extent, ord=ord, relative=relative + ) def _as_numeric_array(value: ArrayLike, name: str) -> np.ndarray: @@ -392,7 +566,9 @@ def _is_single_vector(values: ArrayLike) -> bool: return _as_numeric_array(values, "values").ndim == 1 -def _as_covariance_stack(covariances: ArrayLike, n_samples: int, dim: int, name: str) -> np.ndarray: +def _as_covariance_stack( + covariances: ArrayLike, n_samples: int, dim: int, 
name: str +) -> np.ndarray: covariance_array = _as_numeric_array(covariances, name) if covariance_array.ndim == 2: if covariance_array.shape != (dim, dim): @@ -420,7 +596,9 @@ def _validate_order_cutoff(order: float, cutoff: float) -> tuple[float, float]: return order, cutoff -def _coerce_point_sets(points1: ArrayLike, points2: ArrayLike) -> tuple[np.ndarray, np.ndarray]: +def _coerce_point_sets( + points1: ArrayLike, points2: ArrayLike +) -> tuple[np.ndarray, np.ndarray]: first = _coerce_point_set(points1, "estimated_points") second = _coerce_point_set(points2, "reference_points") if first.shape[0] and second.shape[0] and first.shape[1] != second.shape[1]: @@ -439,7 +617,14 @@ def _coerce_point_set(points: ArrayLike, name: str) -> np.ndarray: return points_np -def _clipped_assignment_cost(estimated: np.ndarray, reference: np.ndarray, *, order: float, cutoff: float, distance_fn: DistanceFunction | None) -> tuple[float, list[tuple[int, int]]]: +def _clipped_assignment_cost( + estimated: np.ndarray, + reference: np.ndarray, + *, + order: float, + cutoff: float, + distance_fn: DistanceFunction | None, +) -> tuple[float, list[tuple[int, int]]]: if estimated.shape[0] == 0 or reference.shape[0] == 0: return 0.0, [] distances = _pairwise_distances(estimated, reference, distance_fn) @@ -449,7 +634,9 @@ def _clipped_assignment_cost(estimated: np.ndarray, reference: np.ndarray, *, or return assignment_cost, [(int(row), int(col)) for row, col in zip(row_ind, col_ind)] -def _pairwise_distances(estimated: np.ndarray, reference: np.ndarray, distance_fn: DistanceFunction | None) -> np.ndarray: +def _pairwise_distances( + estimated: np.ndarray, reference: np.ndarray, distance_fn: DistanceFunction | None +) -> np.ndarray: if distance_fn is None: differences = estimated[:, None, :] - reference[None, :, :] return np.linalg.norm(differences, axis=-1) @@ -460,7 +647,14 @@ def _pairwise_distances(estimated: np.ndarray, reference: np.ndarray, distance_f return distances -def _components_or_distance(name: str, distance: float, localization_component: float, cardinality_component: float, assignments: int, return_components: bool) -> float | dict[str, float | int]: +def _components_or_distance( + name: str, + distance: float, + localization_component: float, + cardinality_component: float, + assignments: int, + return_components: bool, +) -> float | dict[str, float | int]: if not return_components: return float(distance) return { diff --git a/src/pyrecest/utils/track_metrics.py b/src/pyrecest/utils/track_metrics.py index 684298b51..f3e84fc01 100644 --- a/src/pyrecest/utils/track_metrics.py +++ b/src/pyrecest/utils/track_metrics.py @@ -25,7 +25,9 @@ ] -def score_track_purity(predicted_track_matrix: Any, reference_track_matrix: Any) -> dict[str, float | int]: +def score_track_purity( + predicted_track_matrix: Any, reference_track_matrix: Any +) -> dict[str, float | int]: """Return predicted-track identity purity metrics. A predicted track's purity is the fraction of its observations that belong @@ -33,7 +35,9 @@ def score_track_purity(predicted_track_matrix: Any, reference_track_matrix: Any) matrix count as impure in ``mean_track_purity`` and ``observation_weighted_track_purity``. 
""" - predicted, reference = _normalized_pair(predicted_track_matrix, reference_track_matrix) + predicted, reference = _normalized_pair( + predicted_track_matrix, reference_track_matrix + ) reference_lookup = _single_observation_lookup(reference) purities: list[float] = [] @@ -51,7 +55,11 @@ def score_track_purity(predicted_track_matrix: Any, reference_track_matrix: Any) empty_tracks += 1 continue total_observations += len(observations) - matched_reference_ids = [reference_lookup[observation] for observation in observations if observation in reference_lookup] + matched_reference_ids = [ + reference_lookup[observation] + for observation in observations + if observation in reference_lookup + ] counts = Counter(matched_reference_ids) labeled_count = int(sum(counts.values())) dominant_count = max(counts.values(), default=0) @@ -72,24 +80,38 @@ def score_track_purity(predicted_track_matrix: Any, reference_track_matrix: Any) "pure_tracks": int(pure_tracks), "impure_tracks": int(impure_tracks), "mean_track_purity": _mean_or_zero(purities), - "observation_weighted_track_purity": _zero_ratio(dominant_observations, total_observations), + "observation_weighted_track_purity": _zero_ratio( + dominant_observations, total_observations + ), "mean_labeled_track_purity": _mean_or_zero(labeled_purities), - "observation_weighted_labeled_track_purity": _zero_ratio(dominant_observations, labeled_observations), + "observation_weighted_labeled_track_purity": _zero_ratio( + dominant_observations, labeled_observations + ), "unreferenced_predicted_observations": int(unreferenced_observations), - "unreferenced_observation_rate": _zero_ratio(unreferenced_observations, total_observations), + "unreferenced_observation_rate": _zero_ratio( + unreferenced_observations, total_observations + ), } def track_purity(predicted_track_matrix: Any, reference_track_matrix: Any) -> float: """Return observation-weighted predicted-track purity.""" - return float(score_track_purity(predicted_track_matrix, reference_track_matrix)["observation_weighted_track_purity"]) + return float( + score_track_purity(predicted_track_matrix, reference_track_matrix)[ + "observation_weighted_track_purity" + ] + ) -def score_false_tracks(predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1) -> dict[str, float | int]: +def score_false_tracks( + predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1 +) -> dict[str, float | int]: """Return metrics for predicted tracks that contain no reference observation.""" if int(min_length) <= 0: raise ValueError("min_length must be positive") - predicted, reference = _normalized_pair(predicted_track_matrix, reference_track_matrix) + predicted, reference = _normalized_pair( + predicted_track_matrix, reference_track_matrix + ) reference_lookup = _single_observation_lookup(reference) evaluated_tracks = 0 @@ -102,7 +124,9 @@ def score_false_tracks(predicted_track_matrix: Any, reference_track_matrix: Any, if len(observations) < int(min_length): continue evaluated_tracks += 1 - matched = sum(1 for observation in observations if observation in reference_lookup) + matched = sum( + 1 for observation in observations if observation in reference_lookup + ) unreferenced_observations += len(observations) - matched if matched == 0: false_tracks += 1 @@ -114,20 +138,32 @@ def score_false_tracks(predicted_track_matrix: Any, reference_track_matrix: Any, "false_track_rate": _zero_ratio(false_tracks, evaluated_tracks), "false_track_observations": int(false_track_observations), 
"unreferenced_predicted_observations": int(unreferenced_observations), - "unreferenced_predicted_observation_rate": _zero_ratio(unreferenced_observations, total_observations), + "unreferenced_predicted_observation_rate": _zero_ratio( + unreferenced_observations, total_observations + ), } -def false_track_rate(predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1) -> float: +def false_track_rate( + predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1 +) -> float: """Return the fraction of evaluated predicted tracks that are false.""" - return float(score_false_tracks(predicted_track_matrix, reference_track_matrix, min_length=min_length)["false_track_rate"]) + return float( + score_false_tracks( + predicted_track_matrix, reference_track_matrix, min_length=min_length + )["false_track_rate"] + ) -def score_missed_tracks(predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1) -> dict[str, float | int]: +def score_missed_tracks( + predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1 +) -> dict[str, float | int]: """Return metrics for reference tracks with no predicted observation support.""" if int(min_length) <= 0: raise ValueError("min_length must be positive") - predicted, reference = _normalized_pair(predicted_track_matrix, reference_track_matrix) + predicted, reference = _normalized_pair( + predicted_track_matrix, reference_track_matrix + ) predicted_lookup = _single_observation_lookup(predicted) evaluated_tracks = 0 @@ -140,7 +176,9 @@ def score_missed_tracks(predicted_track_matrix: Any, reference_track_matrix: Any if len(observations) < int(min_length): continue evaluated_tracks += 1 - recovered = sum(1 for observation in observations if observation in predicted_lookup) + recovered = sum( + 1 for observation in observations if observation in predicted_lookup + ) missed_observations += len(observations) - recovered if recovered == 0: missed_tracks += 1 @@ -153,18 +191,34 @@ def score_missed_tracks(predicted_track_matrix: Any, reference_track_matrix: Any "missed_track_evaluated_reference_tracks": int(evaluated_tracks), "missed_track_rate": _zero_ratio(missed_tracks, evaluated_tracks), "missed_reference_observations": int(missed_observations), - "missed_reference_observation_rate": _zero_ratio(missed_observations, total_observations), + "missed_reference_observation_rate": _zero_ratio( + missed_observations, total_observations + ), } -def missed_track_rate(predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1) -> float: +def missed_track_rate( + predicted_track_matrix: Any, reference_track_matrix: Any, *, min_length: int = 1 +) -> float: """Return the fraction of evaluated reference tracks that are missed.""" - return float(score_missed_tracks(predicted_track_matrix, reference_track_matrix, min_length=min_length)["missed_track_rate"]) - - -def track_latencies(predicted_track_matrix: Any, reference_track_matrix: Any, *, session_times: Sequence[float] | None = None, missed_value=np.nan) -> np.ndarray: + return float( + score_missed_tracks( + predicted_track_matrix, reference_track_matrix, min_length=min_length + )["missed_track_rate"] + ) + + +def track_latencies( + predicted_track_matrix: Any, + reference_track_matrix: Any, + *, + session_times: Sequence[float] | None = None, + missed_value=np.nan, +) -> np.ndarray: """Return first-detection latency for each non-empty reference track.""" - predicted, reference = _normalized_pair(predicted_track_matrix, 
reference_track_matrix) + predicted, reference = _normalized_pair( + predicted_track_matrix, reference_track_matrix + ) predicted_lookup = _single_observation_lookup(predicted) times = _session_times(reference.shape[1], session_times) values: list[float] = [] @@ -172,17 +226,30 @@ def track_latencies(predicted_track_matrix: Any, reference_track_matrix: Any, *, if not observations: continue first_reference_session = min(session for session, _ in observations) - detected_sessions = [session for session, observation in observations if (session, observation) in predicted_lookup] + detected_sessions = [ + session + for session, observation in observations + if (session, observation) in predicted_lookup + ] if not detected_sessions: values.append(float(missed_value)) continue - values.append(float(times[min(detected_sessions)] - times[first_reference_session])) + values.append( + float(times[min(detected_sessions)] - times[first_reference_session]) + ) return np.asarray(values, dtype=float) -def score_track_latency(predicted_track_matrix: Any, reference_track_matrix: Any, *, session_times: Sequence[float] | None = None) -> dict[str, float | int]: +def score_track_latency( + predicted_track_matrix: Any, + reference_track_matrix: Any, + *, + session_times: Sequence[float] | None = None, +) -> dict[str, float | int]: """Return aggregate first-detection latency metrics.""" - values = track_latencies(predicted_track_matrix, reference_track_matrix, session_times=session_times) + values = track_latencies( + predicted_track_matrix, reference_track_matrix, session_times=session_times + ) detected_values = values[np.isfinite(values)] return { "latency_reference_tracks": int(values.size), @@ -190,32 +257,58 @@ def score_track_latency(predicted_track_matrix: Any, reference_track_matrix: Any "latency_missed_tracks": int(values.size - detected_values.size), "track_detection_rate": _zero_ratio(detected_values.size, values.size), "mean_track_latency": _mean_or_zero(detected_values), - "median_track_latency": float(np.median(detected_values)) if detected_values.size else 0.0, - "max_track_latency": float(np.max(detected_values)) if detected_values.size else 0.0, + "median_track_latency": ( + float(np.median(detected_values)) if detected_values.size else 0.0 + ), + "max_track_latency": ( + float(np.max(detected_values)) if detected_values.size else 0.0 + ), } -def score_track_outcomes(predicted_track_matrix: Any, reference_track_matrix: Any, *, session_times: Sequence[float] | None = None) -> dict[str, float | int]: +def score_track_outcomes( + predicted_track_matrix: Any, + reference_track_matrix: Any, + *, + session_times: Sequence[float] | None = None, +) -> dict[str, float | int]: """Return fragmentation, purity, false-track, missed-track, and latency metrics.""" scores: dict[str, float | int] = {} - scores.update(score_track_fragmentation(predicted_track_matrix, reference_track_matrix)) + scores.update( + score_track_fragmentation(predicted_track_matrix, reference_track_matrix) + ) scores.update(score_track_purity(predicted_track_matrix, reference_track_matrix)) scores.update(score_false_tracks(predicted_track_matrix, reference_track_matrix)) scores.update(score_missed_tracks(predicted_track_matrix, reference_track_matrix)) - scores.update(score_track_latency(predicted_track_matrix, reference_track_matrix, session_times=session_times)) + scores.update( + score_track_latency( + predicted_track_matrix, reference_track_matrix, session_times=session_times + ) + ) return scores -def 
_normalized_pair(predicted_track_matrix: Any, reference_track_matrix: Any) -> tuple[np.ndarray, np.ndarray]: +def _normalized_pair( + predicted_track_matrix: Any, reference_track_matrix: Any +) -> tuple[np.ndarray, np.ndarray]: predicted = normalize_track_matrix(predicted_track_matrix) reference = normalize_track_matrix(reference_track_matrix) if predicted.shape[1] != reference.shape[1]: - raise ValueError("Predicted and reference matrices must have the same number of sessions") + raise ValueError( + "Predicted and reference matrices must have the same number of sessions" + ) return predicted, reference def _observations_by_track(matrix: np.ndarray) -> list[list[Observation]]: - return [[(int(session_idx), int(value)) for session_idx, value in enumerate(row) if value is not None] for row in matrix] + return [ + [ + (int(session_idx), int(value)) + for session_idx, value in enumerate(row) + if value is not None + ] + for row in matrix + ] def _single_observation_lookup(matrix: np.ndarray) -> dict[Observation, int]: @@ -227,12 +320,16 @@ def _single_observation_lookup(matrix: np.ndarray) -> dict[Observation, int]: return lookup -def _session_times(n_sessions: int, session_times: Sequence[float] | None) -> np.ndarray: +def _session_times( + n_sessions: int, session_times: Sequence[float] | None +) -> np.ndarray: if session_times is None: return np.arange(n_sessions, dtype=float) times = np.asarray(session_times, dtype=float) if times.shape != (n_sessions,): - raise ValueError("session_times must have length equal to the number of sessions") + raise ValueError( + "session_times must have length equal to the number of sessions" + ) return times diff --git a/tests/test_dynamic_models.py b/tests/test_dynamic_models.py index 0e3f1f89f..45239bd55 100644 --- a/tests/test_dynamic_models.py +++ b/tests/test_dynamic_models.py @@ -4,7 +4,6 @@ import numpy as np import numpy.testing as npt - from pyrecest.backend import array, diag from pyrecest.models import ( bearing_only_measurement, @@ -31,7 +30,9 @@ class TestMotionModelCatalog(unittest.TestCase): def test_constant_velocity_matrix_and_covariance_use_derivative_grouped_order(self): transition = constant_velocity_transition_matrix(2.0, spatial_dim=2) - covariance = white_noise_acceleration_covariance(2.0, spatial_dim=2, spectral_density=3.0) + covariance = white_noise_acceleration_covariance( + 2.0, spatial_dim=2, spectral_density=3.0 + ) npt.assert_allclose( transition, @@ -58,10 +59,15 @@ def test_constant_velocity_matrix_and_covariance_use_derivative_grouped_order(se def test_linear_transition_models_predict_mean(self): model = constant_velocity_model(0.5, spatial_dim=2, spectral_density=0.1) - npt.assert_allclose(model.predict_mean(array([1.0, 2.0, 4.0, -2.0])), np.array([3.0, 1.0, 4.0, -2.0])) + npt.assert_allclose( + model.predict_mean(array([1.0, 2.0, 4.0, -2.0])), + np.array([3.0, 1.0, 4.0, -2.0]), + ) ca_transition = constant_acceleration_transition_matrix(2.0, spatial_dim=1) - npt.assert_allclose(ca_transition, np.array([[1.0, 2.0, 2.0], [0.0, 1.0, 2.0], [0.0, 0.0, 1.0]])) + npt.assert_allclose( + ca_transition, np.array([[1.0, 2.0, 2.0], [0.0, 1.0, 2.0], [0.0, 0.0, 1.0]]) + ) def test_continuous_to_discrete_lti(self): transition, covariance = continuous_to_discrete_lti( @@ -71,7 +77,9 @@ def test_continuous_to_discrete_lti(self): dt=1.0, ) npt.assert_allclose(transition, np.array([[1.0, 1.0], [0.0, 1.0]]), atol=1e-12) - npt.assert_allclose(covariance, np.array([[2.0 / 3.0, 1.0], [1.0, 2.0]]), atol=1e-12) + npt.assert_allclose( + covariance, 
np.array([[2.0 / 3.0, 1.0], [1.0, 2.0]]), atol=1e-12 + ) def test_singer_model_shapes(self): model = singer_model(1.0, spatial_dim=2, tau=5.0, acceleration_variance=0.5) @@ -79,39 +87,67 @@ def test_singer_model_shapes(self): self.assertEqual(tuple(model.noise_cov.shape), (6, 6)) def test_nonlinear_motion_transitions(self): - npt.assert_allclose(coordinated_turn_transition(array([0.0, 0.0, 1.0, 0.0, 0.0]), dt=2.0), np.array([2.0, 0.0, 1.0, 0.0, 0.0])) - npt.assert_allclose(nearly_constant_speed_transition(array([0.0, 0.0, 2.0, 0.0]), dt=3.0), np.array([6.0, 0.0, 2.0, 0.0])) - npt.assert_allclose(se2_unicycle_transition(array([0.0, 0.0, 0.0, 1.0, 0.0]), dt=2.0), np.array([2.0, 0.0, 0.0, 1.0, 0.0])) + npt.assert_allclose( + coordinated_turn_transition(array([0.0, 0.0, 1.0, 0.0, 0.0]), dt=2.0), + np.array([2.0, 0.0, 1.0, 0.0, 0.0]), + ) + npt.assert_allclose( + nearly_constant_speed_transition(array([0.0, 0.0, 2.0, 0.0]), dt=3.0), + np.array([6.0, 0.0, 2.0, 0.0]), + ) + npt.assert_allclose( + se2_unicycle_transition(array([0.0, 0.0, 0.0, 1.0, 0.0]), dt=2.0), + np.array([2.0, 0.0, 0.0, 1.0, 0.0]), + ) state = array([1.0, 2.0, 3.0, 0.1, 0.2, 0.3, 4.0, 5.0, 6.0, 0.01, 0.02, 0.03]) - npt.assert_allclose(se3_pose_twist_transition(state, dt=2.0)[:6], np.array([9.0, 12.0, 15.0, 0.12, 0.24, 0.36])) + npt.assert_allclose( + se3_pose_twist_transition(state, dt=2.0)[:6], + np.array([9.0, 12.0, 15.0, 0.12, 0.24, 0.36]), + ) class TestSensorModelCatalog(unittest.TestCase): def test_range_bearing_and_jacobian(self): state = array([3.0, 4.0, 0.0, 0.0]) - npt.assert_allclose(range_bearing_measurement(state), np.array([5.0, np.arctan2(4.0, 3.0)])) - npt.assert_allclose(bearing_only_measurement(state), np.array([np.arctan2(4.0, 3.0)])) + npt.assert_allclose( + range_bearing_measurement(state), np.array([5.0, np.arctan2(4.0, 3.0)]) + ) + npt.assert_allclose( + bearing_only_measurement(state), np.array([np.arctan2(4.0, 3.0)]) + ) npt.assert_allclose( range_bearing_jacobian(state), np.array([[0.6, 0.8, 0.0, 0.0], [-4.0 / 25.0, 3.0 / 25.0, 0.0, 0.0]]), ) model = range_bearing_model(diag(array([0.1, 0.2]))) - npt.assert_allclose(model.evaluate(state), np.array([5.0, np.arctan2(4.0, 3.0)])) + npt.assert_allclose( + model.evaluate(state), np.array([5.0, np.arctan2(4.0, 3.0)]) + ) def test_radar_tdoa_fdoa_and_camera_measurements(self): radar_state = array([3.0, 4.0, 3.0, 4.0]) - npt.assert_allclose(radar_range_bearing_doppler_measurement(radar_state), np.array([5.0, np.arctan2(4.0, 3.0), 5.0])) + npt.assert_allclose( + radar_range_bearing_doppler_measurement(radar_state), + np.array([5.0, np.arctan2(4.0, 3.0), 5.0]), + ) tdoa_state = array([0.0, 4.0, 0.0, 1.0]) sensors = array([[0.0, 0.0], [3.0, 0.0]]) npt.assert_allclose(tdoa_measurement(tdoa_state, sensors), np.array([1.0])) npt.assert_allclose(fdoa_measurement(tdoa_state, sensors), np.array([-0.2])) - npt.assert_allclose(camera_projection_measurement(array([2.0, 4.0, 2.0])), np.array([1.0, 2.0])) + npt.assert_allclose( + camera_projection_measurement(array([2.0, 4.0, 2.0])), np.array([1.0, 2.0]) + ) camera_matrix = array([[2.0, 0.0, 10.0], [0.0, 3.0, 20.0], [0.0, 0.0, 1.0]]) - npt.assert_allclose(camera_projection_measurement(array([2.0, 4.0, 2.0]), camera_matrix=camera_matrix), np.array([12.0, 26.0])) + npt.assert_allclose( + camera_projection_measurement( + array([2.0, 4.0, 2.0]), camera_matrix=camera_matrix + ), + np.array([12.0, 26.0]), + ) if __name__ == "__main__": diff --git a/tests/test_metrics.py b/tests/test_metrics.py index 5c96bfe47..614d928da 100644 
--- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -90,13 +90,19 @@ def test_nees_nis_and_bounds(self): groundtruths = array([[0.0, 0.0], [0.0, 0.0]]) covariances = array([np.eye(2), np.eye(2)]) - npt.assert_allclose(nees(estimates, covariances, groundtruths), np.array([2.0, 4.0])) + npt.assert_allclose( + nees(estimates, covariances, groundtruths), np.array([2.0, 4.0]) + ) self.assertEqual(anees(estimates, covariances, groundtruths), 3.0) innovations = array([[1.0, 0.0], [0.0, 2.0]]) innovation_covariances = array([np.eye(2), 2.0 * np.eye(2)]) - npt.assert_allclose(nis(innovations, innovation_covariances), np.array([1.0, 2.0])) - npt.assert_allclose(nis(innovations, groundtruths, innovation_covariances), np.array([1.0, 2.0])) + npt.assert_allclose( + nis(innovations, innovation_covariances), np.array([1.0, 2.0]) + ) + npt.assert_allclose( + nis(innovations, groundtruths, innovation_covariances), np.array([1.0, 2.0]) + ) self.assertEqual(anis(innovations, innovation_covariances), 1.5) lower, upper = nees_confidence_bounds(2, n_samples=2) @@ -105,16 +111,27 @@ def test_nees_nis_and_bounds(self): self.assertTrue(is_chi_square_consistent(3.0, 2, n_samples=2)) nis_lower, nis_upper = nis_confidence_bounds(2, n_samples=2) self.assertEqual((lower, upper), (nis_lower, nis_upper)) - self.assertEqual(consistency_fraction([lower - 1.0, 2.0, upper + 1.0], lower, upper), 1.0 / 3.0) + self.assertEqual( + consistency_fraction([lower - 1.0, 2.0, upper + 1.0], lower, upper), + 1.0 / 3.0, + ) class TestSetDistances(unittest.TestCase): def test_ospa_distance(self): estimates = array([[0.0], [2.0]]) groundtruths = array([[0.0], [3.0]]) - npt.assert_allclose(ospa_distance(estimates, groundtruths, cutoff=10.0, order=2.0), np.sqrt(0.5)) + npt.assert_allclose( + ospa_distance(estimates, groundtruths, cutoff=10.0, order=2.0), np.sqrt(0.5) + ) - cardinality_case = ospa_distance(array([[0.0]]), array([[0.0], [3.0]]), cutoff=2.0, order=1.0, return_components=True) + cardinality_case = ospa_distance( + array([[0.0]]), + array([[0.0], [3.0]]), + cutoff=2.0, + order=1.0, + return_components=True, + ) self.assertEqual(cardinality_case["ospa"], 1.0) self.assertEqual(cardinality_case["localization"], 0.0) self.assertEqual(cardinality_case["cardinality"], 1.0) @@ -123,13 +140,29 @@ def test_ospa_distance(self): def test_mospa_distance(self): estimated_sets = [array([[0.0]]), array([[1.0]])] groundtruth_sets = [array([[0.0]]), array([[3.0]])] - self.assertEqual(mospa_distance(estimated_sets, groundtruth_sets, cutoff=10.0, order=1.0), 1.0) - mean_distance, per_step = mospa_distance(estimated_sets, groundtruth_sets, cutoff=10.0, order=1.0, return_per_step=True) + self.assertEqual( + mospa_distance(estimated_sets, groundtruth_sets, cutoff=10.0, order=1.0), + 1.0, + ) + mean_distance, per_step = mospa_distance( + estimated_sets, + groundtruth_sets, + cutoff=10.0, + order=1.0, + return_per_step=True, + ) self.assertEqual(mean_distance, 1.0) npt.assert_allclose(per_step, np.array([0.0, 2.0])) def test_gospa_distance(self): - result = gospa_distance(array([[0.0]]), array([[0.0], [4.0]]), cutoff=2.0, order=1.0, alpha=2.0, return_components=True) + result = gospa_distance( + array([[0.0]]), + array([[0.0], [4.0]]), + cutoff=2.0, + order=1.0, + alpha=2.0, + return_components=True, + ) self.assertEqual(result["gospa"], 1.0) self.assertEqual(result["localization"], 0.0) self.assertEqual(result["cardinality"], 1.0) @@ -142,7 +175,9 @@ def test_gaussian_wasserstein_and_extent_error(self): cov = array(np.eye(2)) 
self.assertEqual(gaussian_wasserstein_distance(mean, cov, mean, cov), 0.0) self.assertEqual(extent_wasserstein_distance(cov, cov), 0.0) - npt.assert_allclose(gaussian_wasserstein_distance(mean, cov, array([1.0, 0.0]), cov), 1.0) + npt.assert_allclose( + gaussian_wasserstein_distance(mean, cov, array([1.0, 0.0]), cov), 1.0 + ) npt.assert_allclose(extent_matrix_error(cov, 2.0 * cov), np.sqrt(2.0)) npt.assert_allclose(extent_error(cov, 2.0 * cov, relative=True), 0.5) diff --git a/tests/test_track_metrics.py b/tests/test_track_metrics.py index 57223c164..80cf4de0a 100644 --- a/tests/test_track_metrics.py +++ b/tests/test_track_metrics.py @@ -4,7 +4,6 @@ import numpy as np import numpy.testing as npt - from pyrecest.utils.track_metrics import ( false_track_rate, missed_track_rate, @@ -46,8 +45,12 @@ def test_track_purity_and_detection(self): npt.assert_allclose(missed_track_rate(self.predicted, self.reference), 0.0) def test_track_latency_and_combined_scores(self): - npt.assert_allclose(track_latencies(self.predicted, self.reference), np.array([0.0, 0.0])) - summary = score_track_latency(self.predicted, self.reference, session_times=[0.0, 5.0, 9.0]) + npt.assert_allclose( + track_latencies(self.predicted, self.reference), np.array([0.0, 0.0]) + ) + summary = score_track_latency( + self.predicted, self.reference, session_times=[0.0, 5.0, 9.0] + ) self.assertEqual(summary["latency_detected_tracks"], 2) self.assertEqual(summary["mean_track_latency"], 0.0)
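
Below is a minimal usage sketch (an illustration appended to this patch, not part of it) for the track metrics reformatted above. The input layout is inferred from _observations_by_track and _single_observation_lookup in the hunks: one row per track, one column per session, an integer observation id where a track claims an observation and None otherwise. That normalize_track_matrix accepts plain nested lists is an assumption, as are the exact values shown in the comments.

# Hedged sketch: input format inferred from this patch, not verified
# against the installed library.
from pyrecest.utils.track_metrics import (
    score_track_latency,
    score_track_outcomes,
    track_latencies,
    track_purity,
)

# Two tracks over three sessions; None marks "no observation claimed".
predicted = [[0, 1, None], [2, 3, 4]]
reference = [[0, 1, None], [2, 3, 4]]

# Observation-weighted purity should be 1.0 here, since every predicted
# track maps onto a single reference track.
print(track_purity(predicted, reference))

# Each reference track is recovered in its first session, so both
# first-detection latencies should be 0.0.
print(track_latencies(predicted, reference))

# Non-uniform session_times reports latencies in time units rather than
# in session counts; the list length must equal the number of sessions.
print(score_track_latency(predicted, reference, session_times=[0.0, 5.0, 9.0]))

# Combined fragmentation/purity/false-track/missed-track/latency summary.
print(score_track_outcomes(predicted, reference))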