Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repos:
- id: check-merge-conflict
- id: check-ast
fail_fast: True
language_version: python3.12
- id: check-json
- id: check-added-large-files
args: ['--maxkb=200']
Expand All @@ -35,4 +36,5 @@ repos:
hooks:
- id: mypy
files: 'src/.*\.py$'
additional_dependencies: ['types-setuptools==57.0.2']
additional_dependencies: ['types-setuptools>=57.0.2']
language_version: python3.12
12 changes: 2 additions & 10 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,10 @@ Workflows
:target: https://pypi.org/project/workflows/
:alt: Supported Python versions

.. image:: https://img.shields.io/badge/code%20style-black-000000.svg
:target: https://github.com/psf/black
.. image:: https://img.shields.io/badge/code%20style-ruff-000000.svg
:target: https://github.com/astral-sh/ruff
:alt: Code style: black

.. image:: https://img.shields.io/lgtm/grade/python/g/DiamondLightSource/python-workflows.svg?logo=lgtm&logoWidth=18
:target: https://lgtm.com/projects/g/DiamondLightSource/python-workflows/context:python
:alt: Language grade: Python

.. image:: https://img.shields.io/lgtm/alerts/g/DiamondLightSource/python-workflows.svg?logo=lgtm&logoWidth=18
:target: https://lgtm.com/projects/g/DiamondLightSource/python-workflows/alerts/
:alt: Total alerts

Workflows enables light-weight services to process tasks in a message-oriented
environment.

Expand Down
10 changes: 8 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,20 @@ filename = "pyproject.toml"
filename = "src/workflows/__init__.py"

[tool.ruff.lint]
select = ["E", "F", "W", "C4", "I"]
select = ["E", "F", "W", "C4", "I", "ANN"]
unfixable = ["F841"]
# E501 line too long (handled by formatter)
ignore = ["E501"]
ignore = ["E501", "ANN204", "ANN401"]

[tool.ruff.lint.isort]
known-first-party = ["dxtbx_*", "dxtbx"]
required-imports = ["from __future__ import annotations"]

[tool.ruff.lint.per-file-ignores]
"tests/**" = ["ANN"]

[tool.mypy]
mypy_path = "src/"

[tool.pyright]
exclude = ["tests", ".venv"]
44 changes: 27 additions & 17 deletions src/workflows/contrib/start_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import optparse
import sys
from collections.abc import Callable
from optparse import SUPPRESS_HELP, OptionParser
from typing import Any

import workflows
import workflows.frontend
Expand All @@ -17,20 +19,22 @@ class ServiceStarter:
used in a number of scenarios."""

@staticmethod
def on_parser_preparation(parser):
def on_parser_preparation(parser: OptionParser) -> OptionParser | None:
"""Plugin hook to manipulate the OptionParser object before command line
parsing. If a value is returned here it will replace the OptionParser
object."""

@staticmethod
def on_parsing(options, args):
def on_parsing(
options: optparse.Values, args: list[str]
) -> tuple[optparse.Values, list[str]] | None:
"""Plugin hook to manipulate the command line parsing results.
A tuple of values can be returned, which will replace (options, args).
"""

@staticmethod
def on_transport_factory_preparation(
transport_factory,
transport_factory: Callable[[], CommonTransport],
) -> Callable[[], CommonTransport] | None:
"""Plugin hook to intercept/manipulate newly created Transport factories
before first invocation."""
Expand All @@ -41,28 +45,32 @@ def on_transport_preparation(transport: CommonTransport) -> CommonTransport | No
before connecting."""

@staticmethod
def before_frontend_construction(kwargs):
def before_frontend_construction(kwargs: dict[str, Any]) -> dict[str, Any] | None:
"""Plugin hook to manipulate the Frontend object constructor arguments. If
a value is returned here it will replace the keyword arguments
dictionary passed to the constructor."""

@staticmethod
def on_frontend_preparation(frontend):
def on_frontend_preparation(
frontend: workflows.frontend.Frontend,
) -> workflows.frontend.Frontend | None:
"""Plugin hook to manipulate the Frontend object before starting it. If a
value is returned here it will replace the Frontend object."""

def run(
self,
cmdline_args=None,
program_name="start_service",
version=None,
cmdline_args: list[str] | None = None,
program_name: str = "start_service",
version: str | None = None,
add_metrics_option: bool = False,
**kwargs,
):
**kwargs: Any,
) -> None:
"""Example command line interface to start services.
:param cmdline_args: List of command line arguments to pass to parser
:param program_name: Name of the command line tool to display in help
:param version: Version number to print when run with '--version'

Args:
cmdline_args: List of command line arguments to pass to parser
program_name: Name of the command line tool to display in help
version: Version number to print when run with '--version'
"""

# Enumerate all known services
Expand Down Expand Up @@ -163,10 +171,12 @@ def on_transport_preparation_hook() -> CommonTransport:
if options.service not in known_services:
# First check whether the provided service name is a case-insensitive match.
service_lower = options.service.lower()
match = {s.lower(): s for s in known_services}.get(service_lower, None)
match = (
[match]
if match
exact_match = {s.lower(): s for s in known_services}.get(
service_lower, None
)
match: list[str] = (
[exact_match]
if exact_match
# Next, check whether the provided service name is a partial
# case-sensitive match.
else [s for s in known_services if s.startswith(options.service)]
Expand Down
43 changes: 25 additions & 18 deletions src/workflows/contrib/status_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import curses
import threading
import time
from typing import Any
from collections.abc import Mapping
from typing import Any, Callable

import workflows.transport
from workflows.services.common_service import CommonService

basestring = (str, bytes)
from workflows.transport.common_transport import CommonTransport


class Monitor: # pragma: no cover
Expand All @@ -19,29 +19,29 @@ class Monitor: # pragma: no cover
shutdown = False
"""Set to true to end the main loop and shut down the service monitor."""

cards: dict[Any, Any] = {}
cards: list
"""Register card shown for seen services"""

border_chars = ()
"""Characters used for frame borders."""
border_chars_text = ("|", "|", "=", "=", "/", "\\", "\\", "/")
"""Example alternative set of frame border characters."""

def __init__(self, transport=None):
def __init__(self, transport: Callable[[], CommonTransport] | str | None = None):
"""Set up monitor and connect to the network transport layer"""
if transport is None or isinstance(transport, basestring):
self._transport = workflows.transport.lookup(transport)()
else:
if callable(transport):
self._transport = transport()
else:
self._transport = workflows.transport.lookup(transport)()
assert self._transport.connect(), "Could not connect to transport layer"
self._lock = threading.RLock()
self._node_status = {}
self.message_box = None
self._node_status: dict = {}
self.message_box: curses.window | None = None
self._transport.subscribe_broadcast(
"transient.status", self.update_status, retroactive=True
)

def update_status(self, header, message):
def update_status(self, header: Mapping[str, Any], message: Any) -> None:
"""Process incoming status message. Acquire lock for status dictionary before updating."""
with self._lock:
if self.message_box:
Expand Down Expand Up @@ -70,14 +70,21 @@ def update_status(self, header, message):
self._node_status[message["host"]] = message
self._node_status[message["host"]]["last_seen"] = receipt_time

def run(self):
def run(self) -> None:
"""A wrapper for the real _run() function to cleanly enable/disable the
curses environment."""
curses.wrapper(self._run)

def _boxwin(
self, height, width, row, column, title=None, title_x=7, color_pair=None
):
self,
height: int,
width: int,
row: int,
column: int,
title: str | None = None,
title_x: int = 7,
color_pair: int | None = None,
) -> curses.window:
with self._lock:
box = curses.newwin(height, width, row, column)
box.clear()
Expand All @@ -91,7 +98,7 @@ def _boxwin(
box.noutrefresh()
return curses.newwin(height - 2, width - 2, row + 1, column + 1)

def _redraw_screen(self, stdscr):
def _redraw_screen(self, stdscr: curses.window) -> None:
"""Redraw screen. This could be to initialize, or to redraw after resizing."""
with self._lock:
stdscr.clear()
Expand All @@ -105,7 +112,7 @@ def _redraw_screen(self, stdscr):
self.message_box.scrollok(True)
self.cards = []

def _get_card(self, number):
def _get_card(self, number: int) -> curses.window:
with self._lock:
if number < len(self.cards):
return self.cards[number]
Expand All @@ -123,7 +130,7 @@ def _get_card(self, number):
return self.cards[number]
raise RuntimeError("Card number too high")

def _erase_card(self, number):
def _erase_card(self, number: int) -> None:
"""Destroy cards with this or higher number."""
with self._lock:
if number < (len(self.cards) - 1):
Expand All @@ -141,7 +148,7 @@ def _erase_card(self, number):
obliterate.noutrefresh()
del self.cards[number]

def _run(self, stdscr):
def _run(self, stdscr: curses.window) -> None:
"""Start the actual service monitor"""
with self._lock:
curses.use_default_colors()
Expand Down
Loading