-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcore_affinity_queue.py
More file actions
191 lines (153 loc) · 6.83 KB
/
core_affinity_queue.py
File metadata and controls
191 lines (153 loc) · 6.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# -*- coding: utf-8 -*-
# core_affinity_queue.py
"""
Core-affinity policy and reporting for token-managed routing.
This module defines task-weight categories, per-weight core preference rules,
and lightweight affinity metrics used to describe how work is distributed
across physical cores.
It does not own mailbox routing or execution. Actual token placement is
performed by the pinned worker queue layer.
"""
import threading
from dataclasses import dataclass
from enum import Enum
from typing import List
from .tg_print import tg_print
class TaskWeight(Enum):
"""Routing weight classes used by the affinity policy."""
HEAVY = "heavy" # High difficulty work gets Core 1
MEDIUM = "medium" # Balanced work gets Core 2+
LIGHT = "light" # Simple work gets Core 3+
@dataclass
class CorePreference:
"""Allowed-core set and preferred starting core for one weight class."""
allowed_cores: List[int] # Cores this weight can use
preferred_core: int # First choice
class CoreAffinityPolicy:
"""Builds and exposes per-weight core eligibility rules.
The policy derives allowed-core chains from detected physical core count
and preserves weight isolation rules where possible.
"""
def __init__(self, num_cores: int):
self.num_cores = num_cores
self._build_preferences()
def _build_preferences(self):
"""Construct per-weight core preference chains from available core count."""
# Heavy can use ALL cores, prefers Core 1
heavy_cores = list(range(1, self.num_cores + 1))
# Medium starts at Core 2 (NEVER Core 1)
medium_cores = list(range(2, self.num_cores + 1)) if self.num_cores >= 2 else []
if not medium_cores:
# Fallback for 1-core system (shouldn't happen but handle it)
medium_cores = [1]
# Light starts at Core 3 (NEVER Cores 1-2)
light_cores = list(range(3, self.num_cores + 1)) if self.num_cores >= 3 else []
if not light_cores:
# Fallback: use medium's cores
light_cores = medium_cores
self.preferences = {
TaskWeight.HEAVY: CorePreference(
allowed_cores=heavy_cores,
preferred_core=heavy_cores[0]
),
TaskWeight.MEDIUM: CorePreference(
allowed_cores=medium_cores,
preferred_core=medium_cores[0]
),
TaskWeight.LIGHT: CorePreference(
allowed_cores=light_cores,
preferred_core=light_cores[0]
)
}
tg_print('affinity', f'Policy built for {self.num_cores} cores')
tg_print('affinity', f'Heavy: {heavy_cores} preferred={heavy_cores[0]}')
tg_print('affinity', f'Medium: {medium_cores} preferred={medium_cores[0] if medium_cores else "N/A"}')
tg_print('affinity', f'Light: {light_cores} preferred={light_cores[0] if light_cores else "N/A"}')
def get_preference_chain(self, weight: TaskWeight) -> List[int]:
"""Return allowed cores for the given weight in preference order."""
return self.preferences[weight].allowed_cores
def can_use_core(self, weight: TaskWeight, core_id: int) -> bool:
"""Return whether the given core is eligible for the given weight."""
return core_id in self.preferences[weight].allowed_cores
class CoreAffinityQueue:
"""Policy and reporting layer for weight-based core affinity.
This class classifies tokens, exposes the allowed core chain for each
weight class, and records routing outcomes reported by the execution
queue layer.
It does not place tokens into mailboxes directly.
"""
def __init__(self, topology, workers_per_core: int = 4):
self.topology = topology
self.workers_per_core = workers_per_core
self.num_cores = topology.physical_cores
# Build affinity policy
self.policy = CoreAffinityPolicy(self.num_cores)
# Simple counters (no CoreQueue objects!)
self._affinity_counts = {
core_id: {'heavy': 0, 'medium': 0, 'light': 0}
for core_id in range(1, self.num_cores + 1)
}
# Metrics
self.total_routed = 0
self.routing_failures = 0
self._routing_lock = threading.Lock()
def get_valid_cores_for_weight(self, weight: TaskWeight) -> List[int]:
"""Return the allowed core chain for the given weight."""
return self.policy.get_preference_chain(weight)
def record_task_routed(self, core_id: int, weight: TaskWeight):
"""Record one completed routing decision reported by the queue layer."""
with self._routing_lock:
self.total_routed += 1
self._affinity_counts[core_id][weight.value] += 1
def get_affinity_report(self) -> dict:
"""Return per-core weight distribution percentages and totals."""
report = {}
for core_id, counts in self._affinity_counts.items():
total = sum(counts.values())
if total > 0:
# Calculate percentages
report[f'core_{core_id}'] = {
'heavy': (counts['heavy'] / total) * 100,
'medium': (counts['medium'] / total) * 100,
'light': (counts['light'] / total) * 100,
'total_tasks': total
}
else:
# No tasks yet
report[f'core_{core_id}'] = {
'heavy': 0.0,
'medium': 0.0,
'light': 0.0,
'total_tasks': 0
}
return report
def get_stats(self) -> dict:
"""Return a composite snapshot of affinity configuration and routing totals."""
affinity_report = self.get_affinity_report()
return {
'num_cores': self.num_cores,
'workers_per_core': self.workers_per_core,
'total_routed': self.total_routed,
'routing_failures': self.routing_failures,
'affinity_distribution': affinity_report
}
# TODO: Add this too the dashboard in a live core viewer
def print_affinity_report(self):
"""Print a human-readable per-core affinity distribution report."""
print()
print("=" * 70)
print("CORE AFFINITY REPORT")
print("=" * 70)
# Get report dict
report = self.get_affinity_report()
# Print each core's stats
for core_id in range(1, self.num_cores + 1):
core_key = f'core_{core_id}'
if core_key in report:
stats = report[core_key]
print(f"\nCore {core_id}:")
print(f" Heavy: {stats['heavy']:>5.1f}%")
print(f" Medium: {stats['medium']:>5.1f}%")
print(f" Light: {stats['light']:>5.1f}%")
print(f" Total: {stats['total_tasks']} tasks")
print("=" * 70)