Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Added
### Changed

- Bumping cloudevents dep to 2.0

### Deprecated
### Removed
### Fixed
Expand Down
48 changes: 24 additions & 24 deletions cmd/fcloudevent/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import argparse
import logging
from func_python.cloudevent import serve
from cloudevents.http import CloudEvent
from cloudevents.core.v1.event import CloudEvent

# Set the default logging level to INFO
logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -30,30 +30,30 @@ async def handle(scope, receive, send):
event = scope["event"]
if not event:
error_event = CloudEvent(
{"type": "dev.functions.error", "source": "/fcloudevent/error"},
{"message": "No CloudEvent found in scope"}
attributes={"type": "dev.functions.error", "source": "/fcloudevent/error"},
data={"message": "No CloudEvent found in scope"}
)
await send(error_event, 500)
return
logging.info(f"Received CloudEvent: type={event['type']}, source={event['source']}")

logging.info(f"Received CloudEvent: type={event.get_type()}, source={event.get_source()}")

# Handle event data - it might be bytes or dict
event_data = event.data
event_data = event.get_data()
if isinstance(event_data, bytes):
import json
event_data = json.loads(event_data)
logging.info(f"CloudEvent data: {event_data}")

response_event = CloudEvent(
{
attributes={
"type": "com.example.response.static",
"source": "/fcloudevent/static",
},
{
data={
"message": "OK: static CloudEvent handler",
"received_event_type": event['type'],
"received_event_source": event['source'],
"received_event_type": event.get_type(),
"received_event_source": event.get_source(),
"received_data": event_data
}
)
Expand All @@ -79,34 +79,34 @@ async def handle(self, scope, receive, send):
if not event:
# This shouldn't happen with our fix, but let's handle it gracefully
error_event = CloudEvent(
{"type": "dev.functions.error", "source": "/fcloudevent/error"},
{"message": "No CloudEvent found in scope"}
attributes={"type": "dev.functions.error", "source": "/fcloudevent/error"},
data={"message": "No CloudEvent found in scope"}
)
await send(error_event, 500)
return

self.event_count += 1
logging.info(f"Received CloudEvent #{self.event_count}: type={event['type']}, source={event['source']}")

logging.info(f"Received CloudEvent #{self.event_count}: type={event.get_type()}, source={event.get_source()}")

# Handle event data - it might be bytes or dict
event_data = event.data
event_data = event.get_data()
if isinstance(event_data, bytes):
import json
event_data = json.loads(event_data)
logging.info(f"CloudEvent data: {event_data}")
logging.info(f"Total events processed: {self.event_count}")

response_event = CloudEvent(
{
attributes={
"type": "com.example.response.instanced",
"source": "/fcloudevent/instanced",
},
{
data={
"message": "OK: instanced CloudEvent handler",
"event_count": self.event_count,
"received_event_type": event['type'],
"received_event_source": event['source'],
"received_event_type": event.get_type(),
"received_event_source": event.get_source(),
"received_data": event_data
}
)
Expand Down
59 changes: 42 additions & 17 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ license = "Apache-2.0"
repository = "https://github.com/knative-extensions/func-python"

[tool.poetry.dependencies]
python = "^3.8"
python = "^3.10"
hypercorn = "^0.17.3"
cloudevents = "^1.11.0"
cloudevents = "^2.0.0"

[tool.pytest.ini_options]
pythonpath = ["src"]
Expand Down
28 changes: 15 additions & 13 deletions src/func_python/cloudevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import hypercorn.config
import hypercorn.asyncio

from cloudevents.http import from_http, CloudEvent
from cloudevents.conversion import to_structured, to_binary
from cloudevents.exceptions import MissingRequiredFields, InvalidRequiredFields
from cloudevents.core.v1.event import CloudEvent
from cloudevents.core.bindings.http import (
from_http_event, to_structured_event, to_binary_event, HTTPMessage,
)
from cloudevents.core.exceptions import CloudEventValidationError

import func_python.sock

Expand Down Expand Up @@ -50,8 +52,8 @@ async def handle(self, scope, receive, send):


class ASGIApplication():
""" ASGIApplication is a wrapper around a Function instance which
exposes it as an ASGI Application.
""" ASGIApplication is a wrapper around a Function instance which
exposes it as an ASGI Application.
"""
def __init__(self, f):
self.f = f
Expand Down Expand Up @@ -153,12 +155,12 @@ async def __call__(self, scope, receive, send):
send = CloudEventSender(send)
# Delegate processing to user's Function
await self.f.handle(scope, receive, send)
except (MissingRequiredFields, InvalidRequiredFields) as e:
except (CloudEventValidationError, ValueError) as e:
# Log the non-CloudEvent request for debugging
logging.warning(f"Received non-CloudEvent request: {scope['method']} {scope['path']}")
headers_dict = {k.decode('utf-8'): v.decode('utf-8') for k, v in scope.get('headers', [])}
logging.debug(f"Request headers: {headers_dict}")

# Return 400 Bad Request for non-CloudEvent requests
await send({
'type': 'http.response.start',
Expand Down Expand Up @@ -231,7 +233,7 @@ async def decode_event(scope, receive):
k.decode("utf-8").lower(): v.decode("utf-8")
for k, v in scope.get("headers", [])
}
return from_http(headers, body)
return from_http_event(HTTPMessage(headers=headers, body=body))


async def receive_body(receive):
Expand Down Expand Up @@ -264,7 +266,7 @@ async def send_exception_cloudevent(send, status, message):

# Check if send is a CloudEventSender with structured method
if hasattr(send, 'structured'):
await send.structured(CloudEvent(attributes, data), status)
await send.structured(CloudEvent(attributes=attributes, data=data), status)
else:
# Fallback to plain HTTP error response if send is not a CloudEventSender
logging.warning("send_exception_cloudevent called with non-CloudEventSender, falling back to HTTP response")
Expand Down Expand Up @@ -293,13 +295,13 @@ async def __call__(self, event, status: int = 200):

async def structured(self, event, status=200):
"""send as a structured cloudevent"""
headers, body = to_structured(event)
await self._send_encoded_cloudevent(headers, body, status)
msg = to_structured_event(event)
await self._send_encoded_cloudevent(msg.headers, msg.body, status)

async def binary(self, event, status=200):
"""send as a binary cloudevent"""
headers, body = to_binary(event)
await self._send_encoded_cloudevent(headers, body, status)
msg = to_binary_event(event)
await self._send_encoded_cloudevent(msg.headers, msg.body, status)

async def http(self, message):
"""Send a raw http response, bypassing the automatic cloudevent
Expand Down
Loading
Loading