Skip to content

Commit 6ce8bc5

Browse files
authored
store rooms scoped (#49)
* room scoped * pep
1 parent 4b43b9b commit 6ce8bc5

6 files changed

Lines changed: 70 additions & 27 deletions

File tree

.github/workflows/ci.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ jobs:
1313
strategy:
1414
max-parallel: 4
1515
matrix:
16-
python-version: ["3.10", "3.11", "3.12", "3.13"]
16+
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
1717
steps:
18-
- uses: actions/checkout@v4
18+
- uses: actions/checkout@v6
1919

2020
- name: Set up Python ${{ matrix.python-version }}
21-
uses: actions/setup-python@v5
21+
uses: actions/setup-python@v6
2222
with:
2323
python-version: ${{ matrix.python-version }}
2424
- name: Install dependencies

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@ dist/
1212

1313
# Temporary sample files
1414
sample*.py
15+
16+
17+
# vscode
18+
.vscode/

thingsdb/client/client.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def __init__(
5858
self._scope = '@t' # default to thingsdb scope
5959
self._pool_idx = 0
6060
self._reconnecting = False
61-
self._rooms: dict[int, RoomBase] = dict()
61+
self._rooms: dict[str, dict[int, RoomBase]] = defaultdict(dict)
6262
self._rooms_lock = asyncio.Lock()
6363

6464
if ssl is True:
@@ -74,7 +74,10 @@ def get_rooms(self) -> tuple[RoomBase, ...]:
7474
Returns:
7575
a tuple with unique Room instances.
7676
"""
77-
return tuple(self._rooms.values())
77+
total: list[RoomBase] = []
78+
for rooms in self._rooms.values():
79+
total.extend(rooms.values())
80+
return tuple(total)
7881

7982
def get_event_loop(self) -> asyncio.AbstractEventLoop:
8083
"""Can be used to get the event loop.
@@ -632,10 +635,19 @@ async def _connect(self, timeout: int | None = 5):
632635
self._pool_idx += 1
633636
self._pool_idx %= len(self._pool)
634637

635-
async def _on_room(self, room_id: int, pkg: Package):
638+
async def _on_room(self, scope: str | None, room_id: int, pkg: Package):
636639
async with self._rooms_lock:
637640
try:
638-
room = self._rooms[room_id]
641+
if scope is None:
642+
# Fallback for ThingsDB < 1.8.6
643+
for rooms in self._rooms.values():
644+
room = rooms.get(room_id)
645+
if room is not None:
646+
break
647+
else:
648+
raise KeyError
649+
else:
650+
room = self._rooms[scope][room_id]
639651
except KeyError:
640652
logging.warning(
641653
f'Got an event (tp:{pkg.tp}) for room Id {room_id} but '
@@ -658,7 +670,8 @@ def _on_event(self, pkg: Package):
658670
return
659671

660672
try:
661-
room_id = pkg.data['id']
673+
scope = pkg.data.get('scope') # ThingsDB < 1.8.6
674+
room_id: int = pkg.data['id']
662675
except KeyError:
663676
if pkg.tp == Proto.ON_WARN:
664677
warn = pkg.data
@@ -669,7 +682,7 @@ def _on_event(self, pkg: Package):
669682
logging.warning(
670683
f'Unexpected event: tp:{pkg.tp} data:{pkg.data}')
671684
else:
672-
asyncio.ensure_future(self._on_room(room_id, pkg),
685+
asyncio.ensure_future(self._on_room(scope, room_id, pkg),
673686
loop=self.get_event_loop())
674687

675688
def _on_connection_lost(self, protocol: asyncio.Protocol, exc: Exception):
@@ -728,9 +741,10 @@ async def _rejoin(self):
728741

729742
# re-arrange the rooms per scope to combine joins in a less requests
730743
scopes: dict[str, list[int]] = defaultdict(list)
731-
for room in self._rooms.values():
732-
if room.id and room.scope:
733-
scopes[room.scope].append(room.id)
744+
for rooms in self._rooms.values():
745+
for room in rooms.values():
746+
if room.id and room.scope:
747+
scopes[room.scope].append(room.id)
734748

735749
# join request per scope, each for one or more rooms
736750
await asyncio.gather(*[

thingsdb/room/roombase.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from ..client import Client
77
from ..client.protocol import Proto
88
from ..util.is_name import is_name
9+
from ..util.cnscope import cnscope
10+
from ..util.fcscope import fcscope
911
if TYPE_CHECKING:
1012
from ..client.package import Package
1113

@@ -47,11 +49,13 @@ def __init__(
4749
"""
4850
self._client: Client | None = None
4951
self._id = room
50-
self._scope = scope
52+
self._key = None
53+
self._scope = \
54+
None if scope is None else f'@collection:{cnscope(scope)}'
5155
self._wait_join: bool | asyncio.Future[None] | None = False
5256

5357
@property
54-
def id(self):
58+
def id(self) -> int | None:
5559
return self._id if isinstance(self._id, int) else None
5660

5761
@property
@@ -72,7 +76,8 @@ async def no_join(self, client: Client):
7276
"""
7377
async with client._rooms_lock:
7478
if self._scope is None:
75-
self._scope = client.get_default_scope()
79+
scope = client.get_default_scope()
80+
self._scope = f'@collection:{cnscope(scope)}'
7681
self._client = client
7782

7883
if isinstance(self._id, str):
@@ -96,6 +101,7 @@ async def no_join(self, client: Client):
96101
'!is_err(try(room(id)));', id=id, scope=self._scope)
97102
if not is_room:
98103
raise TypeError(f'Id `{id}` is not a room')
104+
assert isinstance(id, int)
99105
self._id = id
100106

101107
async def join(self, client: Client, wait: float | None = 60.0):
@@ -116,7 +122,9 @@ async def join(self, client: Client, wait: float | None = 60.0):
116122
# is created inside the dict *before* the on_join is called.
117123
async with client._rooms_lock:
118124
if self._scope is None:
119-
self._scope = client.get_default_scope()
125+
scope = client.get_default_scope()
126+
self._scope = f'@collection:{cnscope(scope)}'
127+
120128
self._client = client
121129

122130
if isinstance(self._id, str):
@@ -141,19 +149,23 @@ async def join(self, client: Client, wait: float | None = 60.0):
141149
f'the room Id has been returned using the ThingsDB '
142150
f'code `{code}` using scope `{self._scope}`')
143151
self._id = id
152+
assert isinstance(self._id, int)
144153
else:
145154
assert isinstance(self._id, int)
146155
res = await client._join(self._id, scope=self._scope)
147156
if res[0] is None:
148157
raise LookupError(f'room with Id {self._id} not found')
149158

150-
if self._id in client._rooms:
151-
prev = client._rooms[self._id]
159+
self._scope = fcscope(self._scope)
160+
try:
161+
prev = client._rooms[self._scope][self._id]
152162
logging.warning(
153-
f'Room Id {self._id} is previously registered by {prev} '
163+
f'Room Id {self._key} is previously registered by {prev} '
154164
f'and will be overwritten with {self}')
165+
except KeyError:
166+
pass
155167

156-
client._rooms[self._id] = self
168+
client._rooms[self._scope][self._id] = self
157169
self.on_init()
158170
if wait:
159171
self._wait_join = asyncio.Future()
@@ -255,12 +267,16 @@ def _on_join(self, _data: Any) -> asyncio.Task[None] | None:
255267
asyncio.ensure_future(self.on_join(), loop=loop)
256268

257269
def _on_stop(self, func: Callable[[], None]):
258-
try:
259-
assert self._client
260-
if isinstance(self._id, int):
261-
del self._client._rooms[self._id]
262-
except KeyError:
263-
pass
270+
assert self._client
271+
if isinstance(self._id, int) and isinstance(self._scope, str):
272+
rooms = self._client._rooms.get(self._scope)
273+
if rooms is not None:
274+
try:
275+
del rooms[self._id]
276+
except KeyError:
277+
pass
278+
if not rooms:
279+
del self._client._rooms[self._scope]
264280
func()
265281

266282
def _emit_handler(self, data: _TEvent):

thingsdb/util/fcscope.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from .cnscope import cnscope
2+
3+
4+
def fcscope(scope: str) -> str:
5+
if scope.startswith("@collection:"):
6+
return scope
7+
8+
cn = cnscope(scope)
9+
return f'@collection:{cn}'

thingsdb/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '1.3.2'
1+
__version__ = '1.4.0'

0 commit comments

Comments
 (0)