From f69894d1b1205a0947247d651b10c3c458aa1514 Mon Sep 17 00:00:00 2001 From: lovely90133 Date: Sun, 3 May 2026 16:33:52 +0800 Subject: [PATCH] =?UTF-8?q?feat(security=5Faccess):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=AE=89=E5=85=A8=E8=AE=BF=E9=97=AE=E7=8A=B6=E6=80=81=E7=94=9F?= =?UTF-8?q?=E5=91=BD=E5=91=A8=E6=9C=9F=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实现安全访问级别的状态跟踪和重置功能 - 添加_unlocked_security_levels集合跟踪已解锁的安全级别 - 在会话变更、ECU复位等情况下自动重置安全状态 - 处理无效密钥响应时清除对应安全级别 - 提供get_unlocked_security_levels方法查询当前状态 --- test/client/test_security_access.py | 122 ++++++++++++++++++++++++++++ udsoncan/client.py | 69 +++++++++++++--- 2 files changed, 179 insertions(+), 12 deletions(-) diff --git a/test/client/test_security_access.py b/test/client/test_security_access.py index 55fb892..7460601 100755 --- a/test/client/test_security_access.py +++ b/test/client/test_security_access.py @@ -322,3 +322,125 @@ def test_no_algo_set(self): def _test_no_algo_set(self): with self.assertRaises(NotImplementedError): self.udsclient.unlock_security_access(0x07) + + +class TestSecurityAccessStateLifecycle(ClientServerTest): + def __init__(self, *args, **kwargs): + ClientServerTest.__init__(self, *args, **kwargs) + + def dummy_algo(self, level, seed, params=None): + key = bytearray(seed) + for i in range(len(key)): + key[i] = (params + level + i + key[i]) + return bytes(key) + + def test_security_level_tracked_after_unlock(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x07") + self.conn.fromuserqueue.put(b"\x67\x07\x11\x22\x33\x44") + request = self.conn.touserqueue.get(timeout=0.2) + key = bytearray([(0x10 + 0x07 + 0 + 0x11), (0x10 + 0x07 + 1 + 0x22), (0x10 + 0x07 + 2 + 0x33), (0x10 + 0x07 + 3 + 0x44)]) + self.assertEqual(request, b"\x27\x08" + bytes(key)) + self.conn.fromuserqueue.put(b"\x67\x08") + + def _test_security_level_tracked_after_unlock(self): + self.udsclient.config['security_algo'] = self.dummy_algo + self.udsclient.config['security_algo_params'] = 0x10 + + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + response = self.udsclient.unlock_security_access(0x07) + self.assertTrue(response.positive) + self.assertIn(0x07, self.udsclient.get_unlocked_security_levels()) + + def test_security_level_tracked_after_send_key(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x06\x11\x22\x33\x44") + self.conn.fromuserqueue.put(b"\x67\x06") + + def _test_security_level_tracked_after_send_key(self): + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + response = self.udsclient.send_key(0x06, b"\x11\x22\x33\x44") + self.assertTrue(response.positive) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + def test_security_level_tracked_after_zero_seed(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x05") + self.conn.fromuserqueue.put(b"\x67\x05\x00\x00\x00\x00") + + def _test_security_level_tracked_after_zero_seed(self): + self.udsclient.config['security_algo'] = self.dummy_algo + self.udsclient.config['security_algo_params'] = 0x10 + + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + response = self.udsclient.unlock_security_access(0x05) + self.assertTrue(response.positive) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + def test_reset_after_session_change(self): + self.conn.fromuserqueue.put(b"\x50\x03\x00\x0A\x00\x14") + self.conn.fromuserqueue.put(b"\x27\x05") + self.conn.fromuserqueue.put(b"\x67\x05\x00\x00\x00\x00") + + def _test_reset_after_session_change(self): + self.udsclient.config['security_algo'] = self.dummy_algo + self.udsclient.config['security_algo_params'] = 0x10 + self.udsclient.config['standard_version'] = 2013 + + self.udsclient._unlocked_security_levels.add(0x05) + self.udsclient._unlocked_security_levels.add(0x01) + self.assertEqual(len(self.udsclient.get_unlocked_security_levels()), 2) + + self.udsclient.change_session(0x03) + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + def test_reset_after_ecu_reset(self): + self.conn.fromuserqueue.put(b"\x51\x55") + + def _test_reset_after_ecu_reset(self): + self.udsclient._unlocked_security_levels.add(0x05) + self.udsclient._unlocked_security_levels.add(0x01) + self.assertEqual(len(self.udsclient.get_unlocked_security_levels()), 2) + + self.udsclient.ecu_reset(0x55) + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + def test_reset_after_invalid_key(self): + self.wait_request_and_respond(b"\x7F\x27\x35") + + def _test_reset_after_invalid_key(self): + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + with self.assertRaises(NegativeResponseException): + self.udsclient.send_key(0x06, b"\x11\x22\x33\x44") + + self.assertNotIn(0x05, self.udsclient.get_unlocked_security_levels()) + + def test_reset_after_invalid_key_no_exception(self): + self.wait_request_and_respond(b"\x7F\x27\x35") + + def _test_reset_after_invalid_key_no_exception(self): + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + self.udsclient.config['exception_on_negative_response'] = False + response = self.udsclient.send_key(0x06, b"\x11\x22\x33\x44") + + self.assertFalse(response.positive) + self.assertEqual(response.code, 0x35) + + def test_reset_after_required_time_delay(self): + self.wait_request_and_respond(b"\x7F\x27\x37") + + def _test_reset_after_required_time_delay(self): + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + with self.assertRaises(NegativeResponseException): + self.udsclient.send_key(0x06, b"\x11\x22\x33\x44") + + self.assertNotIn(0x05, self.udsclient.get_unlocked_security_levels()) diff --git a/udsoncan/client.py b/udsoncan/client.py index 489d11e..71674cd 100755 --- a/udsoncan/client.py +++ b/udsoncan/client.py @@ -21,7 +21,7 @@ import functools import time -from typing import Callable, Optional, Union, Dict, List, Any, cast, Type +from typing import Callable, Optional, Union, Dict, List, Any, cast, Type, Set class SessionTiming: @@ -108,6 +108,7 @@ def get_overrided_payload(self, original_payload: bytes) -> bytes: last_response: Optional[Response] session_timing: SessionTiming logger: logging.Logger + _unlocked_security_levels: Set[int] def __init__(self, conn: BaseConnection, config: ClientConfig = default_client_config, request_timeout: Optional[float] = None): self.conn = conn @@ -121,6 +122,7 @@ def __init__(self, conn: BaseConnection, config: ClientConfig = default_client_c self.last_response = None self.session_timing = SessionTiming(p2_server_max=None, p2_star_server_max=None) + self._unlocked_security_levels = set() self.refresh_config() @@ -134,9 +136,27 @@ def __exit__(self, type, value, traceback): def open(self) -> None: if not self.conn.is_open(): self.conn.open() + self._reset_security_access_state() def close(self) -> None: self.conn.close() + self._reset_security_access_state() + + def _reset_security_access_state(self, level: Optional[int] = None) -> None: + if level is None: + if len(self._unlocked_security_levels) > 0: + self.logger.info('Resetting all security access levels') + self._unlocked_security_levels.clear() + else: + normalized_level = services.SecurityAccess.normalize_level( + mode=services.SecurityAccess.Mode.RequestSeed, level=level + ) + if normalized_level in self._unlocked_security_levels: + self.logger.info('Resetting security access level 0x%02x' % normalized_level) + self._unlocked_security_levels.remove(normalized_level) + + def get_unlocked_security_levels(self) -> Set[int]: + return set(self._unlocked_security_levels) def configure_logger(self) -> None: logger_name = 'UdsClient' @@ -259,6 +279,7 @@ def change_session(self, newsession: int) -> Optional[services.DiagnosticSession self.session_timing.p2_server_max = response.service_data.p2_server_max self.session_timing.p2_star_server_max = response.service_data.p2_star_server_max + self._reset_security_access_state() return response @standard_error_management @@ -323,19 +344,30 @@ def send_key(self, level: int, key: bytes) -> Optional[services.SecurityAccess.I (self.service_log_prefix(services.SecurityAccess), req.subfunction)) self.logger.debug('\tKey to send [%s]' % (binascii.hexlify(key).decode('ascii'))) - response = self.send_request(req) - if response is None: - return None + normalized_level = services.SecurityAccess.normalize_level( + mode=services.SecurityAccess.Mode.RequestSeed, level=level + ) - response = services.SecurityAccess.interpret_response(response, mode=services.SecurityAccess.Mode.SendKey) + try: + response = self.send_request(req) + if response is None: + return None - expected_level = services.SecurityAccess.normalize_level(mode=services.SecurityAccess.Mode.SendKey, level=level) - received_level = response.service_data.security_level_echo - if expected_level != received_level: - raise UnexpectedResponseException( - response, "Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)" % (received_level, expected_level)) + response = services.SecurityAccess.interpret_response(response, mode=services.SecurityAccess.Mode.SendKey) - return response + expected_level = services.SecurityAccess.normalize_level(mode=services.SecurityAccess.Mode.SendKey, level=level) + received_level = response.service_data.security_level_echo + if expected_level != received_level: + raise UnexpectedResponseException( + response, "Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)" % (received_level, expected_level)) + + if response.positive: + self._unlocked_security_levels.add(normalized_level) + + return response + except NegativeResponseException: + self._reset_security_access_state(level) + raise @standard_error_management def unlock_security_access(self, level, seed_params=bytes()) -> Optional[services.SecurityAccess.InterpretedResponse]: @@ -360,9 +392,14 @@ def unlock_security_access(self, level, seed_params=bytes()) -> Optional[service response = self.request_seed._func_no_error_management(self, level, data=seed_params) seed = response.service_data.seed + normalized_level = services.SecurityAccess.normalize_level( + mode=services.SecurityAccess.Mode.RequestSeed, level=level + ) + if len(seed) > 0 and seed == b'\x00' * len(seed): self.logger.info('%s - Security access level 0x%02x is already unlocked, no key will be sent.' % (self.service_log_prefix(services.SecurityAccess), level)) + self._unlocked_security_levels.add(normalized_level) return response params = self.config['security_algo_params'] if 'security_algo_params' in self.config else None @@ -383,7 +420,14 @@ def unlock_security_access(self, level, seed_params=bytes()) -> Optional[service algo_params = {'seed': seed, 'params': params, 'level': level} key = self.config['security_algo'].__call__(**algo_params) # type: ignore - return self.send_key._func_no_error_management(self, level, key) + try: + result = self.send_key._func_no_error_management(self, level, key) + if result is not None and result.positive: + self._unlocked_security_levels.add(normalized_level) + return result + except NegativeResponseException: + self._reset_security_access_state(level) + raise @standard_error_management def tester_present(self) -> Optional[services.TesterPresent.InterpretedResponse]: @@ -578,6 +622,7 @@ def ecu_reset(self, reset_type: int) -> Optional[services.ECUReset.InterpretedRe assert response.service_data.powerdown_time is not None self.logger.info('Server will shutdown in %d seconds.' % (response.service_data.powerdown_time)) + self._reset_security_access_state() return response @standard_error_management