Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ _Changes in the next release_

### Changed
- Improved WS msg processing with dedicated consumer, producer and router tasks with asyncio queues ([#47](https://github.com/unfoldedcircle/integration-python-library/pull/47)).
- Sanitize log messages to prevent sensitive information exposure.
- Updated GitHub actions.

---
Expand Down
60 changes: 50 additions & 10 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import unittest
from copy import deepcopy

from ucapi.api import filter_log_msg_data
from ucapi.api import sanitize_json_message
from ucapi.media_player import Attributes


class TestFilterLogMsgData(unittest.TestCase):
class TestSanitizeJsonMessage(unittest.TestCase):

def test_no_modification_when_no_msg_data(self):
data = {}
result = filter_log_msg_data(data)
result = sanitize_json_message(data)
self.assertEqual(result, {}, "The result should be an empty dictionary")

def test_no_changes_when_media_image_url_not_present(self):
data = {"msg_data": {"attributes": {"state": "playing", "volume": 50}}}
original = deepcopy(data)

result = filter_log_msg_data(data)
result = sanitize_json_message(data)

self.assertEqual(
result,
Expand All @@ -36,9 +36,9 @@ def test_filtering_media_image_url_in_dict(self):
expected_result = deepcopy(data)
expected_result["msg_data"]["attributes"][
Attributes.MEDIA_IMAGE_URL
] = "data:***"
] = "data:..."

result = filter_log_msg_data(data)
result = sanitize_json_message(data)

self.assertEqual(
result, expected_result, "The MEDIA_IMAGE_URL attribute should be filtered"
Expand All @@ -65,12 +65,12 @@ def test_filtering_media_image_url_in_list(self):
expected_result = deepcopy(data)
expected_result["msg_data"][0]["attributes"][
Attributes.MEDIA_IMAGE_URL
] = "data:***"
] = "data:..."
expected_result["msg_data"][1]["attributes"][
Attributes.MEDIA_IMAGE_URL
] = "data:***"
] = "data:..."

result = filter_log_msg_data(data)
result = sanitize_json_message(data)

self.assertEqual(
result,
Expand All @@ -88,8 +88,48 @@ def test_input_is_not_modified(self):
}
original_data = deepcopy(data)

filter_log_msg_data(data)
sanitize_json_message(data)

self.assertEqual(
data, original_data, "The input data should not be modified by the function"
)

def test_generic_sensitive_keys_redaction(self):
    """Each known sensitive key is redacted while an unrelated key is kept."""
    redacted = "***REDACTED***"
    public = "public-value"

    for key in (
        "token",
        "token_id",
        "access_token",
        "refresh_token",
        "id_token",
        "authorization_code",
        "client_secret",
        "secret",
        "auth_url",
        "client_data",
        "password",
    ):
        # One sensitive field next to one harmless field per message.
        result = sanitize_json_message({key: "sensitive-value", "other": public})

        self.assertEqual(result[key], redacted, f"{key} should be redacted")
        self.assertEqual(
            result["other"], public, "public fields should remain intact"
        )

def test_recursive_redaction(self):
    """Redaction reaches nested dicts and dicts inside lists; other values stay."""
    redacted = "***REDACTED***"
    inner = {"secret": "secret2", "public": "data"}
    msg = {
        "level1": {"token": "secret1", "level2": inner},
        "array": [{"refresh_token": "secret3"}, "plain-string"],
    }

    sanitized = sanitize_json_message(msg)
    level1 = sanitized["level1"]
    level2 = level1["level2"]
    items = sanitized["array"]

    self.assertEqual(level1["token"], redacted)
    self.assertEqual(level2["secret"], redacted)
    self.assertEqual(level2["public"], "data")
    self.assertEqual(items[0]["refresh_token"], redacted)
    self.assertEqual(items[1], "plain-string")
116 changes: 78 additions & 38 deletions ucapi/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ async def _enqueue_ws_payload(self, websocket, payload: dict[str, Any]) -> None:

if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug(
"[%s] ->: %s", websocket.remote_address, filter_log_msg_data(payload)
"[%s] <-: %s", websocket.remote_address, sanitize_json_message(payload)
)

match payload.get("kind"):
Expand Down Expand Up @@ -513,7 +513,10 @@ async def _send_ws_event(
await self._enqueue_ws_payload(websocket, data)

async def _process_ws_message(self, websocket, data: dict[str, Any]) -> None:
_LOG.debug("[%s] <-: %s", websocket.remote_address, data)
if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug(
"[%s] ->: %s", websocket.remote_address, sanitize_json_message(data)
)

kind = data["kind"]
req_id = data.get("id")
Expand Down Expand Up @@ -630,7 +633,7 @@ async def _process_ws_binary_message(self, websocket, data: bytes) -> None:
"""
if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug(
"[%s] <-: <binary %d bytes>", websocket.remote_address, len(data)
"[%s] ->: <binary %d bytes>", websocket.remote_address, len(data)
)

# Parse IntegrationMessage from bytes
Expand Down Expand Up @@ -1777,46 +1780,83 @@ def local_hostname() -> str:
)


def filter_log_msg_data(data: dict[str, Any]) -> dict[str, Any]:
# Placeholder written in place of the value of a sensitive field.
_REDACTED_VALUE = "***REDACTED***"
# Dictionary keys whose values are replaced with _REDACTED_VALUE when a message
# is sanitized for logging. All entries are lowercase snake_case.
_SENSITIVE_KEYS = {
"token",
"token_id",
"access_token",
"refresh_token",
"id_token",
"authorization_code",
"client_secret",
"secret",
"auth_url",
"client_data",
"password",
}


def _filter_base64_images(json_data: Any) -> Any:
    """
    Filter out base64 encoded images from a JSON object.

    Only ``json_data["msg_data"]`` is inspected. It may be a single dict or a list
    of dicts. In each of them, an ``attributes`` dict whose
    ``MediaAttr.MEDIA_IMAGE_URL`` value is a string starting with ``data:`` gets
    that value replaced by the placeholder ``"data:..."``. Everything else is left
    untouched, including image URLs that are ``None`` or not a string.

    **Attention:** the provided JSON object is modified in-place!

    :param json_data: The JSON object to filter.
    :returns: The filtered JSON object (the same object that was passed in).
    """
    if not isinstance(json_data, dict) or "msg_data" not in json_data:
        return json_data

    msg_data = json_data["msg_data"]
    # Handle the single-dict and the list form with the same loop.
    candidates = msg_data if isinstance(msg_data, list) else [msg_data]

    for item in candidates:
        if not isinstance(item, dict):
            continue
        attributes = item.get("attributes")
        if not isinstance(attributes, dict):
            continue
        image_url = attributes.get(MediaAttr.MEDIA_IMAGE_URL)
        # The attribute can be present with a None / non-string value; calling
        # startswith() on it unconditionally would raise AttributeError.
        if isinstance(image_url, str) and image_url.startswith("data:"):
            attributes[MediaAttr.MEDIA_IMAGE_URL] = "data:..."

    return json_data

- Attributes are filtered in `data["msg_data"]`:
- dict object: key `attributes`
- list object: every list item `attributes`
- Filtered attributes: `MEDIA_IMAGE_URL`

:param data: the message data dict
:return: copy of the message data dict with filtered attributes
def sanitize_json_message(data: Any) -> Any:
    """
    Sanitize a JSON message by redacting sensitive fields such as tokens and secrets.

    The value of every key listed in ``_SENSITIVE_KEYS`` is replaced with
    ``_REDACTED_VALUE``. This is applied recursively to nested dicts and to dicts
    contained in lists. Key names are compared case-insensitively, so ``"Token"``
    is redacted just like ``"token"``.

    Base64 encoded images starting with `data:` are removed in
    `msg_data.attributes.media_image_url` fields to limit log output.

    The original message is not modified, the returned redacted message is a
    deepcopy.

    :param data: The JSON object to be sanitized.
    :return: The sanitized JSON object with sensitive information redacted.
             ``None`` yields an empty dict. Other empty values (``{}``, ``[]``,
             ``""``) are returned as a copy of themselves.
    """
    # Historic behaviour: a missing message is logged as an empty object.
    if data is None:
        return {}

    def _is_sensitive(key: Any) -> bool:
        # _SENSITIVE_KEYS holds lowercase names; non-string keys never match.
        return isinstance(key, str) and key.lower() in _SENSITIVE_KEYS

    def _redact(value: Any) -> Any:
        if isinstance(value, list):
            return [_redact(item) for item in value]
        if isinstance(value, dict):
            return {
                key: _REDACTED_VALUE if _is_sensitive(key) else _redact(item)
                for key, item in value.items()
            }
        # Scalars (str, int, None, ...) carry no keys to redact.
        return value

    # _filter_base64_images works in-place, so it must operate on a copy to keep
    # the caller's message untouched.
    json_upd = deepcopy(data)
    return _redact(_filter_base64_images(json_upd))