Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions test/client/test_security_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,125 @@ def test_no_algo_set(self):
def _test_no_algo_set(self):
with self.assertRaises(NotImplementedError):
self.udsclient.unlock_security_access(0x07)


class TestSecurityAccessStateLifecycle(ClientServerTest):
def __init__(self, *args, **kwargs):
ClientServerTest.__init__(self, *args, **kwargs)

def dummy_algo(self, level, seed, params=None):
key = bytearray(seed)
for i in range(len(key)):
key[i] = (params + level + i + key[i])
return bytes(key)

def test_security_level_tracked_after_unlock(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b"\x27\x07")
self.conn.fromuserqueue.put(b"\x67\x07\x11\x22\x33\x44")
request = self.conn.touserqueue.get(timeout=0.2)
key = bytearray([(0x10 + 0x07 + 0 + 0x11), (0x10 + 0x07 + 1 + 0x22), (0x10 + 0x07 + 2 + 0x33), (0x10 + 0x07 + 3 + 0x44)])
self.assertEqual(request, b"\x27\x08" + bytes(key))
self.conn.fromuserqueue.put(b"\x67\x08")

def _test_security_level_tracked_after_unlock(self):
self.udsclient.config['security_algo'] = self.dummy_algo
self.udsclient.config['security_algo_params'] = 0x10

self.assertEqual(self.udsclient.get_unlocked_security_levels(), set())

response = self.udsclient.unlock_security_access(0x07)
self.assertTrue(response.positive)
self.assertIn(0x07, self.udsclient.get_unlocked_security_levels())

def test_security_level_tracked_after_send_key(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b"\x27\x06\x11\x22\x33\x44")
self.conn.fromuserqueue.put(b"\x67\x06")

def _test_security_level_tracked_after_send_key(self):
self.assertEqual(self.udsclient.get_unlocked_security_levels(), set())

response = self.udsclient.send_key(0x06, b"\x11\x22\x33\x44")
self.assertTrue(response.positive)
self.assertIn(0x05, self.udsclient.get_unlocked_security_levels())

def test_security_level_tracked_after_zero_seed(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b"\x27\x05")
self.conn.fromuserqueue.put(b"\x67\x05\x00\x00\x00\x00")

def _test_security_level_tracked_after_zero_seed(self):
self.udsclient.config['security_algo'] = self.dummy_algo
self.udsclient.config['security_algo_params'] = 0x10

self.assertEqual(self.udsclient.get_unlocked_security_levels(), set())

response = self.udsclient.unlock_security_access(0x05)
self.assertTrue(response.positive)
self.assertIn(0x05, self.udsclient.get_unlocked_security_levels())

def test_reset_after_session_change(self):
self.conn.fromuserqueue.put(b"\x50\x03\x00\x0A\x00\x14")
self.conn.fromuserqueue.put(b"\x27\x05")
self.conn.fromuserqueue.put(b"\x67\x05\x00\x00\x00\x00")

def _test_reset_after_session_change(self):
self.udsclient.config['security_algo'] = self.dummy_algo
self.udsclient.config['security_algo_params'] = 0x10
self.udsclient.config['standard_version'] = 2013

self.udsclient._unlocked_security_levels.add(0x05)
self.udsclient._unlocked_security_levels.add(0x01)
self.assertEqual(len(self.udsclient.get_unlocked_security_levels()), 2)

self.udsclient.change_session(0x03)
self.assertEqual(self.udsclient.get_unlocked_security_levels(), set())

def test_reset_after_ecu_reset(self):
self.conn.fromuserqueue.put(b"\x51\x55")

def _test_reset_after_ecu_reset(self):
self.udsclient._unlocked_security_levels.add(0x05)
self.udsclient._unlocked_security_levels.add(0x01)
self.assertEqual(len(self.udsclient.get_unlocked_security_levels()), 2)

self.udsclient.ecu_reset(0x55)
self.assertEqual(self.udsclient.get_unlocked_security_levels(), set())

def test_reset_after_invalid_key(self):
self.wait_request_and_respond(b"\x7F\x27\x35")

def _test_reset_after_invalid_key(self):
self.udsclient._unlocked_security_levels.add(0x05)
self.assertIn(0x05, self.udsclient.get_unlocked_security_levels())

with self.assertRaises(NegativeResponseException):
self.udsclient.send_key(0x06, b"\x11\x22\x33\x44")

self.assertNotIn(0x05, self.udsclient.get_unlocked_security_levels())

def test_reset_after_invalid_key_no_exception(self):
self.wait_request_and_respond(b"\x7F\x27\x35")

def _test_reset_after_invalid_key_no_exception(self):
self.udsclient._unlocked_security_levels.add(0x05)
self.assertIn(0x05, self.udsclient.get_unlocked_security_levels())

self.udsclient.config['exception_on_negative_response'] = False
response = self.udsclient.send_key(0x06, b"\x11\x22\x33\x44")

self.assertFalse(response.positive)
self.assertEqual(response.code, 0x35)

def test_reset_after_required_time_delay(self):
self.wait_request_and_respond(b"\x7F\x27\x37")

def _test_reset_after_required_time_delay(self):
self.udsclient._unlocked_security_levels.add(0x05)
self.assertIn(0x05, self.udsclient.get_unlocked_security_levels())

with self.assertRaises(NegativeResponseException):
self.udsclient.send_key(0x06, b"\x11\x22\x33\x44")

self.assertNotIn(0x05, self.udsclient.get_unlocked_security_levels())
69 changes: 57 additions & 12 deletions udsoncan/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import functools
import time

from typing import Callable, Optional, Union, Dict, List, Any, cast, Type
from typing import Callable, Optional, Union, Dict, List, Any, cast, Type, Set


class SessionTiming:
Expand Down Expand Up @@ -108,6 +108,7 @@ def get_overrided_payload(self, original_payload: bytes) -> bytes:
last_response: Optional[Response]
session_timing: SessionTiming
logger: logging.Logger
_unlocked_security_levels: Set[int]

def __init__(self, conn: BaseConnection, config: ClientConfig = default_client_config, request_timeout: Optional[float] = None):
self.conn = conn
Expand All @@ -121,6 +122,7 @@ def __init__(self, conn: BaseConnection, config: ClientConfig = default_client_c
self.last_response = None

self.session_timing = SessionTiming(p2_server_max=None, p2_star_server_max=None)
self._unlocked_security_levels = set()

self.refresh_config()

Expand All @@ -134,9 +136,27 @@ def __exit__(self, type, value, traceback):
def open(self) -> None:
if not self.conn.is_open():
self.conn.open()
self._reset_security_access_state()

def close(self) -> None:
self.conn.close()
self._reset_security_access_state()

def _reset_security_access_state(self, level: Optional[int] = None) -> None:
if level is None:
if len(self._unlocked_security_levels) > 0:
self.logger.info('Resetting all security access levels')
self._unlocked_security_levels.clear()
else:
normalized_level = services.SecurityAccess.normalize_level(
mode=services.SecurityAccess.Mode.RequestSeed, level=level
)
if normalized_level in self._unlocked_security_levels:
self.logger.info('Resetting security access level 0x%02x' % normalized_level)
self._unlocked_security_levels.remove(normalized_level)

def get_unlocked_security_levels(self) -> Set[int]:
return set(self._unlocked_security_levels)

def configure_logger(self) -> None:
logger_name = 'UdsClient'
Expand Down Expand Up @@ -259,6 +279,7 @@ def change_session(self, newsession: int) -> Optional[services.DiagnosticSession
self.session_timing.p2_server_max = response.service_data.p2_server_max
self.session_timing.p2_star_server_max = response.service_data.p2_star_server_max

self._reset_security_access_state()
return response

@standard_error_management
Expand Down Expand Up @@ -323,19 +344,30 @@ def send_key(self, level: int, key: bytes) -> Optional[services.SecurityAccess.I
(self.service_log_prefix(services.SecurityAccess), req.subfunction))
self.logger.debug('\tKey to send [%s]' % (binascii.hexlify(key).decode('ascii')))

response = self.send_request(req)
if response is None:
return None
normalized_level = services.SecurityAccess.normalize_level(
mode=services.SecurityAccess.Mode.RequestSeed, level=level
)

response = services.SecurityAccess.interpret_response(response, mode=services.SecurityAccess.Mode.SendKey)
try:
response = self.send_request(req)
if response is None:
return None

expected_level = services.SecurityAccess.normalize_level(mode=services.SecurityAccess.Mode.SendKey, level=level)
received_level = response.service_data.security_level_echo
if expected_level != received_level:
raise UnexpectedResponseException(
response, "Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)" % (received_level, expected_level))
response = services.SecurityAccess.interpret_response(response, mode=services.SecurityAccess.Mode.SendKey)

return response
expected_level = services.SecurityAccess.normalize_level(mode=services.SecurityAccess.Mode.SendKey, level=level)
received_level = response.service_data.security_level_echo
if expected_level != received_level:
raise UnexpectedResponseException(
response, "Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)" % (received_level, expected_level))

if response.positive:
self._unlocked_security_levels.add(normalized_level)

return response
except NegativeResponseException:
self._reset_security_access_state(level)
raise

@standard_error_management
def unlock_security_access(self, level, seed_params=bytes()) -> Optional[services.SecurityAccess.InterpretedResponse]:
Expand All @@ -360,9 +392,14 @@ def unlock_security_access(self, level, seed_params=bytes()) -> Optional[service

response = self.request_seed._func_no_error_management(self, level, data=seed_params)
seed = response.service_data.seed
normalized_level = services.SecurityAccess.normalize_level(
mode=services.SecurityAccess.Mode.RequestSeed, level=level
)

if len(seed) > 0 and seed == b'\x00' * len(seed):
self.logger.info('%s - Security access level 0x%02x is already unlocked, no key will be sent.' %
(self.service_log_prefix(services.SecurityAccess), level))
self._unlocked_security_levels.add(normalized_level)
return response

params = self.config['security_algo_params'] if 'security_algo_params' in self.config else None
Expand All @@ -383,7 +420,14 @@ def unlock_security_access(self, level, seed_params=bytes()) -> Optional[service
algo_params = {'seed': seed, 'params': params, 'level': level}

key = self.config['security_algo'].__call__(**algo_params) # type: ignore
return self.send_key._func_no_error_management(self, level, key)
try:
result = self.send_key._func_no_error_management(self, level, key)
if result is not None and result.positive:
self._unlocked_security_levels.add(normalized_level)
return result
except NegativeResponseException:
self._reset_security_access_state(level)
raise

@standard_error_management
def tester_present(self) -> Optional[services.TesterPresent.InterpretedResponse]:
Expand Down Expand Up @@ -578,6 +622,7 @@ def ecu_reset(self, reset_type: int) -> Optional[services.ECUReset.InterpretedRe
assert response.service_data.powerdown_time is not None
self.logger.info('Server will shutdown in %d seconds.' % (response.service_data.powerdown_time))

self._reset_security_access_state()
return response

@standard_error_management
Expand Down