From c9e25dbeff962d843e0f03cc52479ed1a8844d8c Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 13:09:56 +0300 Subject: [PATCH 01/19] Replace `SynchronousAdapter` with `SynchronousPushHandler` --- README.md | 10 +- docs/guide/en/README.md | 2 +- docs/guide/en/adapter-list.md | 10 +- docs/guide/en/adapter-sync.md | 21 ---- docs/guide/en/configuration-manual.md | 33 +---- docs/guide/en/configuration-with-config.md | 5 +- docs/guide/en/queue-names-advanced.md | 2 +- docs/guide/en/queue-names.md | 15 ++- docs/guide/en/synchronous-mode.md | 39 ++++++ .../Push/SynchronousPushHandler.php | 27 ++++ src/Queue.php | 43 ++++++- .../App/InMemoryAdapter.php | 48 +++----- tests/Benchmark/QueueBench.php | 2 +- tests/Integration/MiddlewareTest.php | 11 +- tests/TestCase.php | 35 +----- tests/Unit/Adapter/SynchronousAdapterTest.php | 64 ---------- tests/Unit/QueueTest.php | 116 ++++++++++-------- 17 files changed, 224 insertions(+), 259 deletions(-) delete mode 100644 docs/guide/en/adapter-sync.md create mode 100644 docs/guide/en/synchronous-mode.md create mode 100644 src/Middleware/Push/SynchronousPushHandler.php rename src/Adapter/SynchronousAdapter.php => tests/App/InMemoryAdapter.php (60%) delete mode 100644 tests/Unit/Adapter/SynchronousAdapterTest.php diff --git a/README.md b/README.md index ee7df188..a9cd8c98 100644 --- a/README.md +++ b/README.md @@ -36,9 +36,11 @@ composer require yiisoft/queue For production use, you should install an adapter package that matches your message broker ([AMQP](https://github.com/yiisoft/queue-amqp), [Kafka](https://github.com/g41797/queue-kafka), [NATS](https://github.com/g41797/queue-nats), and [others](docs/guide/en/adapter-list.md)). See the [adapter list](docs/guide/en/adapter-list.md) and follow the adapter-specific documentation for installation and configuration details. -> For development and testing, you can start without an external broker using the built-in [`SynchronousAdapter`](docs/guide/en/adapter-sync.md). -> This adapter processes messages immediately in the same process, so it won't provide true async execution, -> but it's useful for getting started and writing tests. +> If you don't have an external broker — whether for development, testing, or because you want to +> design around `QueueInterface` from day one and add a real broker later — you can run the queue +> in [synchronous mode](docs/guide/en/synchronous-mode.md) (the adapter argument is optional). +> In this mode messages are processed immediately in the same process, so it won't provide true +> async execution, but call sites stay the same when you switch to a real adapter. ### 2. Configure the queue @@ -127,7 +129,7 @@ By default, Yii Framework uses [yiisoft/yii-console](https://github.com/yiisoft/ See [Console commands](docs/guide/en/console-commands.md) for more details. -> In case you're using the `SynchronousAdapter` for development purposes, you should not use these commands, as you have no asynchronous processing available. The messages are processed immediately when pushed. +> In case you're running the queue in synchronous mode (no adapter) for development purposes, you should not use these commands, as you have no asynchronous processing available. The messages are processed immediately when pushed. ## Documentation diff --git a/docs/guide/en/README.md b/docs/guide/en/README.md index 8c8ad5f5..2e091754 100644 --- a/docs/guide/en/README.md +++ b/docs/guide/en/README.md @@ -5,7 +5,7 @@ - [Prerequisites and installation](prerequisites-and-installation.md) - [Configuration with yiisoft/config](configuration-with-config.md) - [Adapter list](adapter-list.md) -- [Synchronous adapter](adapter-sync.md) +- [Synchronous mode](synchronous-mode.md) - [Queue names](queue-names.md) ## Build and handle messages diff --git a/docs/guide/en/adapter-list.md b/docs/guide/en/adapter-list.md index a24b199f..a0306d14 100644 --- a/docs/guide/en/adapter-list.md +++ b/docs/guide/en/adapter-list.md @@ -1,9 +1,11 @@ -Queue Adapters -------------- +# Queue Adapters -* [Synchronous](adapter-sync.md) - adapter for development and test environments -* [AMQP](https://github.com/yiisoft/queue-amqp) - adapter over AMQP protocol via [amqplib](https://github.com/php-amqplib/php-amqplib) +If you don't need (or don't yet have) a real broker, the queue can be used in +[synchronous mode](synchronous-mode.md) — no adapter is required. + +For asynchronous processing pick one of the adapters below. +* [AMQP](https://github.com/yiisoft/queue-amqp) - adapter over AMQP protocol via [amqplib](https://github.com/php-amqplib/php-amqplib) There are other queue adapters contributed and maintained by the community and available on GitHub, such as: * [NATS](https://github.com/g41797/queue-nats) - [NATS](https://nats.io/) JetStream adapter diff --git a/docs/guide/en/adapter-sync.md b/docs/guide/en/adapter-sync.md deleted file mode 100644 index 27bcb267..00000000 --- a/docs/guide/en/adapter-sync.md +++ /dev/null @@ -1,21 +0,0 @@ -Synchronous Adapter -================== - -Run tasks synchronously in the same process. The adapter is intended for use when developing and debugging an application. - -Configuration example: - -```php -$logger = $DIContainer->get(\Psr\Log\LoggerInterface::class); - -$worker = $DIContainer->get(\Yiisoft\Queue\Worker\WorkerInterface::class); -$loop = $DIContainer->get(\Yiisoft\Queue\Cli\LoopInterface::class); -$adapter = new Yiisoft\Queue\Adapter\SynchronousAdapter($loop, $worker); - -$queue = new Yiisoft\Queue\Queue( - $adapter, - $worker, - $loop, - $logger -); -``` diff --git a/docs/guide/en/configuration-manual.md b/docs/guide/en/configuration-manual.md index 23c8b689..ab5b902c 100644 --- a/docs/guide/en/configuration-manual.md +++ b/docs/guide/en/configuration-manual.md @@ -16,7 +16,6 @@ To use the queue, you need to create instances of the following classes: use Psr\Container\ContainerInterface; use Psr\Log\NullLogger; use Yiisoft\Injector\Injector; -use Yiisoft\Queue\Adapter\SynchronousAdapter; use Yiisoft\Queue\Cli\SimpleLoop; use Yiisoft\Queue\Middleware\CallableFactory; use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; @@ -70,7 +69,8 @@ $worker = new Worker( // Create loop (SignalLoop requires ext-pcntl; SimpleLoop works without it) $loop = new SimpleLoop(); -// Create queue (adapter is wired in a second step due to mutual dependency) +// Create queue. Without an adapter the queue runs in synchronous mode (messages are processed +// immediately on push). Pass an adapter (e.g., AMQP, Redis) for asynchronous processing. $queue = new Queue( $worker, $loop, @@ -78,12 +78,6 @@ $queue = new Queue( $pushMiddlewareDispatcher, ); -// SynchronousAdapter needs a queue reference — create it after the queue -$adapter = new SynchronousAdapter($worker, $queue); - -// Attach the adapter to the queue (returns a new Queue instance) -$queue = $queue->withAdapter($adapter); - // Now you can push messages $message = new \Yiisoft\Queue\Message\Message('file-download', ['url' => 'https://example.com/file.pdf']); $queue->push($message); @@ -91,28 +85,7 @@ $queue->push($message); ## Using Queue Provider -For multiple queue names, use `AdapterFactoryQueueProvider` (maps queue names to adapter definitions) or `PredefinedQueueProvider` (maps queue names to pre-built queue instances): - -```php -use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider; -use Yiisoft\Queue\Adapter\SynchronousAdapter; - -// AdapterFactoryQueueProvider: each queue name maps to an adapter definition. -// The provider wraps each adapter in a Queue with the given name. -$definitions = [ - 'queue1' => new SynchronousAdapter($worker, $queue), - 'queue2' => SynchronousAdapter::class, -]; - -$provider = new AdapterFactoryQueueProvider( - $queue, - $definitions, - $container, -); - -$queueForQueue1 = $provider->get('queue1'); -$queueForQueue2 = $provider->get('queue2'); -``` +For multiple queue names, use `PredefinedQueueProvider` (maps queue names to pre-built queue instances): ```php use Yiisoft\Queue\Provider\PredefinedQueueProvider; diff --git a/docs/guide/en/configuration-with-config.md b/docs/guide/en/configuration-with-config.md index a0e5f583..49ffb64a 100644 --- a/docs/guide/en/configuration-with-config.md +++ b/docs/guide/en/configuration-with-config.md @@ -15,5 +15,6 @@ Advanced applications eventually need the following tweaks: - **Named handlers or callable definitions** — map a short message type to a callable in [`yiisoft/queue.handlers` config](message-handler-advanced.md) when another application is the message producer and you cannot use FQCN as message type. - **Middleware pipelines** — adjust push/consume/failure behavior: collect metrics, modify messages, and so on. See [Middleware pipelines](middleware-pipelines.md) for details. -For development and testing you can start with the synchronous adapter. -For production you have to use a [real backend adapter](adapter-list.md) (AMQP, Kafka, SQS, etc.). If you do not have any preference, it's simpler to start with [yiisoft/queue-amqp](https://github.com/yiisoft/queue-amqp) and [RabbitMQ](https://www.rabbitmq.com/). +If you don't have a broker yet (for development, testing, or as a stepping stone before introducing +async processing), you can run the queue in [synchronous mode](synchronous-mode.md). +For real asynchronous processing pick a [backend adapter](adapter-list.md) (AMQP, Kafka, SQS, etc.). If you do not have any preference, it's simpler to start with [yiisoft/queue-amqp](https://github.com/yiisoft/queue-amqp) and [RabbitMQ](https://www.rabbitmq.com/). diff --git a/docs/guide/en/queue-names-advanced.md b/docs/guide/en/queue-names-advanced.md index df67b1fa..21cdd8cc 100644 --- a/docs/guide/en/queue-names-advanced.md +++ b/docs/guide/en/queue-names-advanced.md @@ -58,7 +58,7 @@ $provider = new QueueFactoryProvider( [ 'emails' => [ 'class' => Queue::class, - '__construct()' => [$worker, $pushDispatcher, $eventDispatcher, $adapter], + '__construct()' => [$worker, $loop, $logger, $pushDispatcher, $adapter], ], ], $container, diff --git a/docs/guide/en/queue-names.md b/docs/guide/en/queue-names.md index 35d0d49f..9e2dfca5 100644 --- a/docs/guide/en/queue-names.md +++ b/docs/guide/en/queue-names.md @@ -30,19 +30,22 @@ If you use only a single queue, you can inject `QueueInterface` directly. #### 1.1 Configure an Adapter -Adapter is what actually sends messages to a queue broker. - -Minimal DI configuration example: +An adapter is what actually delivers messages to a queue broker. Pick one from the +[adapter list](adapter-list.md), install it, and bind it in DI: ```php -use Yiisoft\Queue\Adapter\SynchronousAdapter; use Yiisoft\Queue\Adapter\AdapterInterface; return [ - AdapterInterface::class => SynchronousAdapter::class, + AdapterInterface::class => YourBrokerAdapter::class, ]; ``` -> `SynchronousAdapter` is for learning/testing only. For production, install a real adapter, see adapter list: [adapter-list](adapter-list.md). + +Refer to the chosen adapter's documentation for connection settings and any additional bindings. + +If you don't have a broker yet, you can skip this step — the queue will run in +[synchronous mode](synchronous-mode.md) and process messages immediately on `push()`. You can +plug in a real adapter later without changing any call sites. #### 1.2. Configure a default queue name diff --git a/docs/guide/en/synchronous-mode.md b/docs/guide/en/synchronous-mode.md new file mode 100644 index 00000000..e96ddfa4 --- /dev/null +++ b/docs/guide/en/synchronous-mode.md @@ -0,0 +1,39 @@ +Synchronous Mode +================ + +Run tasks synchronously in the same process. Useful for: + +- developing and debugging an application; +- writing tests; +- production setups where the application is built around `QueueInterface` from day one but + doesn't have an external broker yet — you can switch to a real adapter later without touching + the call sites. + +To enable it, construct the queue without an adapter (the `adapter` argument defaults to `null`): + +```php +$logger = $DIContainer->get(\Psr\Log\LoggerInterface::class); + +$worker = $DIContainer->get(\Yiisoft\Queue\Worker\WorkerInterface::class); +$loop = $DIContainer->get(\Yiisoft\Queue\Cli\LoopInterface::class); +$pushMiddlewareDispatcher = $DIContainer->get( + \Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher::class +); + +$queue = new Yiisoft\Queue\Queue( + $worker, + $loop, + $logger, + $pushMiddlewareDispatcher, +); +``` + +In synchronous mode every message passed to `push()` is processed immediately by the worker. +The value returned from `push()` is the message after push-middlewares — without an `IdEnvelope`, +since no adapter is involved to assign an ID. + +Limitations: + +- `run()` does nothing and returns `0`. +- `listen()` throws `BadMethodCallException`. +- `status()` throws `BadMethodCallException` — there is no message storage to track IDs. diff --git a/src/Middleware/Push/SynchronousPushHandler.php b/src/Middleware/Push/SynchronousPushHandler.php new file mode 100644 index 00000000..d625e2b2 --- /dev/null +++ b/src/Middleware/Push/SynchronousPushHandler.php @@ -0,0 +1,27 @@ +worker->process($message, $this->queue); + + return $message; + } +} diff --git a/src/Queue.php b/src/Queue.php index 7eab5c47..32db983c 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -5,6 +5,7 @@ namespace Yiisoft\Queue; use BackedEnum; +use BadMethodCallException; use Psr\Log\LoggerInterface; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Cli\LoopInterface; @@ -13,6 +14,7 @@ use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface; use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface; use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; +use Yiisoft\Queue\Middleware\Push\SynchronousPushHandler; use Yiisoft\Queue\Worker\WorkerInterface; use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Queue\Provider\QueueProviderInterface; @@ -23,21 +25,23 @@ final class Queue implements QueueInterface * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[] */ private array $middlewareDefinitions; - private AdapterPushHandler $adapterPushHandler; + private MessageHandlerPushInterface $finalPushHandler; private string $name; public function __construct( - private readonly AdapterInterface $adapter, private readonly WorkerInterface $worker, private readonly LoopInterface $loop, private readonly LoggerInterface $logger, private readonly PushMiddlewareDispatcher $pushMiddlewareDispatcher, + private readonly ?AdapterInterface $adapter = null, string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE, MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions, ) { $this->name = StringNormalizer::normalize($name); $this->middlewareDefinitions = $middlewareDefinitions; - $this->adapterPushHandler = new AdapterPushHandler($this->adapter); + $this->finalPushHandler = $adapter === null + ? new SynchronousPushHandler($worker, $this) + : new AdapterPushHandler($adapter); } public function getName(): string @@ -59,6 +63,14 @@ public function push( $this->createPushHandler(...$middlewareDefinitions), ); + if ($this->adapter === null) { + $this->logger->info( + 'Processed message with message type "{messageType}" synchronously.', + ['messageType' => $message->getType()], + ); + return $message; + } + /** @var string $messageId */ $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null'; $this->logger->info( @@ -71,6 +83,13 @@ public function push( public function run(int $max = 0): int { + if ($this->adapter === null) { + $this->logger->debug( + 'Queue is in synchronous mode (no adapter). Messages are processed on push. run() does nothing.', + ); + return 0; + } + $this->logger->debug('Start processing queue messages.'); $count = 0; @@ -95,6 +114,12 @@ public function run(int $max = 0): int public function listen(): void { + if ($this->adapter === null) { + throw new BadMethodCallException( + 'Cannot listen without an adapter. Queue is in synchronous mode.', + ); + } + $this->logger->info('Start listening to the queue.'); $this->adapter->subscribe(fn(MessageInterface $message) => $this->handle($message)); $this->logger->info('Finish listening to the queue.'); @@ -102,6 +127,12 @@ public function listen(): void public function status(string|int $id): MessageStatus { + if ($this->adapter === null) { + throw new BadMethodCallException( + 'Cannot get message status without an adapter. Queue is in synchronous mode.', + ); + } + return $this->adapter->status($id); } @@ -131,12 +162,12 @@ private function handle(MessageInterface $message): bool private function createPushHandler(MiddlewarePushInterface|callable|array|string ...$middlewares): MessageHandlerPushInterface { return new class ( - $this->adapterPushHandler, + $this->finalPushHandler, $this->pushMiddlewareDispatcher, array_merge($this->middlewareDefinitions, $middlewares), ) implements MessageHandlerPushInterface { public function __construct( - private readonly AdapterPushHandler $adapterPushHandler, + private readonly MessageHandlerPushInterface $finishHandler, private readonly PushMiddlewareDispatcher $dispatcher, /** * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[] @@ -148,7 +179,7 @@ public function handlePush(MessageInterface $message): MessageInterface { return $this->dispatcher ->withMiddlewares($this->middlewares) - ->dispatch($message, $this->adapterPushHandler); + ->dispatch($message, $this->finishHandler); } }; } diff --git a/src/Adapter/SynchronousAdapter.php b/tests/App/InMemoryAdapter.php similarity index 60% rename from src/Adapter/SynchronousAdapter.php rename to tests/App/InMemoryAdapter.php index 21193b04..90dcd88a 100644 --- a/src/Adapter/SynchronousAdapter.php +++ b/tests/App/InMemoryAdapter.php @@ -2,34 +2,28 @@ declare(strict_types=1); -namespace Yiisoft\Queue\Adapter; +namespace Yiisoft\Queue\Tests\App; use InvalidArgumentException; -use Yiisoft\Queue\MessageStatus; -use Yiisoft\Queue\Message\MessageInterface; -use Yiisoft\Queue\QueueInterface; -use Yiisoft\Queue\Worker\WorkerInterface; +use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Message\IdEnvelope; +use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\MessageStatus; use function count; -final class SynchronousAdapter implements AdapterInterface +final class InMemoryAdapter implements AdapterInterface { + /** @var array */ private array $messages = []; private int $current = 0; - public function __construct( - private readonly WorkerInterface $worker, - private readonly QueueInterface $queue, - ) {} - - public function __destruct() + public function push(MessageInterface $message): MessageInterface { - $this->runExisting(function (MessageInterface $message): bool { - $this->worker->process($message, $this->queue); + $id = count($this->messages) + $this->current; + $this->messages[] = $message; - return true; - }); + return new IdEnvelope($message, $id); } public function runExisting(callable $handlerCallback): void @@ -42,12 +36,17 @@ public function runExisting(callable $handlerCallback): void } } - public function status(string|int $id): MessageStatus + public function subscribe(callable $handlerCallback): void + { + $this->runExisting($handlerCallback); + } + + public function status(int|string $id): MessageStatus { $id = (int) $id; if ($id < 0) { - throw new InvalidArgumentException('This adapter IDs start with 0.'); + throw new InvalidArgumentException('IDs start with 0.'); } if ($id < $this->current) { @@ -60,17 +59,4 @@ public function status(string|int $id): MessageStatus throw new InvalidArgumentException('There is no message with the given ID.'); } - - public function push(MessageInterface $message): MessageInterface - { - $key = count($this->messages) + $this->current; - $this->messages[] = $message; - - return new IdEnvelope($message, $key); - } - - public function subscribe(callable $handlerCallback): void - { - $this->runExisting($handlerCallback); - } } diff --git a/tests/Benchmark/QueueBench.php b/tests/Benchmark/QueueBench.php index 1bfd072f..0f2e73ab 100644 --- a/tests/Benchmark/QueueBench.php +++ b/tests/Benchmark/QueueBench.php @@ -57,11 +57,11 @@ public function __construct() $this->adapter = new VoidAdapter($this->serializer); $this->queue = new Queue( - $this->adapter, $worker, new SimpleLoop(0), $logger, new PushMiddlewareDispatcher(new MiddlewareFactoryPush($container, $callableFactory)), + $this->adapter, ); } diff --git a/tests/Integration/MiddlewareTest.php b/tests/Integration/MiddlewareTest.php index 5da141f9..70d2f736 100644 --- a/tests/Integration/MiddlewareTest.php +++ b/tests/Integration/MiddlewareTest.php @@ -11,7 +11,6 @@ use Yiisoft\Injector\Injector; use Yiisoft\Test\Support\Container\SimpleContainer; use Yiisoft\Test\Support\Log\SimpleLogger; -use Yiisoft\Queue\Adapter\SynchronousAdapter; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Message\Message; use Yiisoft\Queue\Message\MessageInterface; @@ -58,16 +57,14 @@ public function testFullStackPush(): void new TestMiddleware('common 1'), new TestMiddleware('common 2'), ); + $worker = $this->createMock(WorkerInterface::class); + $worker->method('process')->willReturnArgument(0); $queue = new Queue( - new SynchronousAdapter( - $this->createMock(WorkerInterface::class), - $this->createMock(QueueInterface::class), - ), - $this->createMock(WorkerInterface::class), + $worker, $this->createMock(LoopInterface::class), $this->createMock(LoggerInterface::class), $pushMiddlewareDispatcher, - 'test', + name: 'test', ); $queue = $queue ->withMiddlewares(new TestMiddleware('Won\'t be executed')) diff --git a/tests/TestCase.php b/tests/TestCase.php index 0f327e29..a37f8888 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -5,17 +5,14 @@ namespace Yiisoft\Queue\Tests; use BackedEnum; -use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase as BaseTestCase; use Psr\Container\ContainerInterface; use Psr\Log\NullLogger; use RuntimeException; use Yiisoft\Injector\Injector; use Yiisoft\Queue\Provider\QueueProviderInterface; -use Yiisoft\Queue\Stubs\StubAdapter; use Yiisoft\Test\Support\Container\SimpleContainer; use Yiisoft\Queue\Adapter\AdapterInterface; -use Yiisoft\Queue\Adapter\SynchronousAdapter; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Cli\SimpleLoop; use Yiisoft\Queue\Middleware\CallableFactory; @@ -36,7 +33,6 @@ abstract class TestCase extends BaseTestCase { protected ?ContainerInterface $container = null; protected ?Queue $queue = null; - protected ?AdapterInterface $adapter = null; protected ?LoopInterface $loop = null; protected ?WorkerInterface $worker = null; protected array $eventHandlers = []; @@ -48,7 +44,6 @@ protected function setUp(): void $this->container = null; $this->queue = null; - $this->adapter = null; $this->loop = null; $this->worker = null; $this->eventHandlers = []; @@ -67,18 +62,6 @@ protected function getQueue(): Queue return $this->queue; } - /** - * @return AdapterInterface|MockObject - */ - protected function getAdapter(): AdapterInterface - { - if ($this->adapter === null) { - $this->adapter = $this->createAdapter($this->needsRealAdapter()); - } - - return $this->adapter; - } - protected function getLoop(): LoopInterface { if ($this->loop === null) { @@ -107,28 +90,19 @@ protected function getContainer(): ContainerInterface } protected function createQueue( - AdapterInterface $adapter = new StubAdapter(), + ?AdapterInterface $adapter = null, string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE, ): Queue { return new Queue( - $adapter, $this->getWorker(), $this->getLoop(), new NullLogger(), $this->getPushMiddlewareDispatcher(), + $adapter, $name, ); } - protected function createAdapter(bool $realAdapter = false): AdapterInterface - { - if ($realAdapter) { - return new SynchronousAdapter($this->getWorker(), $this->createQueue()); - } - - return $this->createMock(AdapterInterface::class); - } - protected function createLoop(): LoopInterface { return new SimpleLoop(); @@ -184,11 +158,6 @@ protected function getMessageHandlers(): array ]; } - protected function needsRealAdapter(): bool - { - return false; - } - protected function getPushMiddlewareDispatcher(): PushMiddlewareDispatcher { return new PushMiddlewareDispatcher( diff --git a/tests/Unit/Adapter/SynchronousAdapterTest.php b/tests/Unit/Adapter/SynchronousAdapterTest.php deleted file mode 100644 index 467f5106..00000000 --- a/tests/Unit/Adapter/SynchronousAdapterTest.php +++ /dev/null @@ -1,64 +0,0 @@ -push($message); - - self::assertArrayHasKey(IdEnvelope::MESSAGE_ID_KEY, $envelope->getMetadata()); - $id = $envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]; - - $wrongId = "$id "; - self::assertSame(MessageStatus::WAITING, $adapter->status($wrongId)); - } - - public function testIdSetting(): void - { - $message = new Message('simple', []); - $adapter = new SynchronousAdapter(new StubWorker(), new StubQueue()); - - $ids = []; - $envelope = $adapter->push($message); - $ids[] = $envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]; - $envelope = $adapter->push($message); - $ids[] = $envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]; - $envelope = $adapter->push($message); - $ids[] = $envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]; - - self::assertCount(3, array_unique($ids)); - } - - public function testStatusIdLessZero(): void - { - $adapter = new SynchronousAdapter(new StubWorker(), new StubQueue()); - - $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage('This adapter IDs start with 0.'); - $adapter->status('-1'); - } - - public function testStatusNotMessage(): void - { - $adapter = new SynchronousAdapter(new StubWorker(), new StubQueue()); - - $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage('There is no message with the given ID.'); - $adapter->status('1'); - } -} diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index 77c8e13f..7566df9b 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -4,12 +4,14 @@ namespace Yiisoft\Queue\Tests\Unit; +use BadMethodCallException; use Yiisoft\Queue\Cli\SignalLoop; -use Yiisoft\Queue\MessageStatus; +use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Queue\Message\Message; +use Yiisoft\Queue\MessageStatus; use Yiisoft\Queue\Tests\App\FakeAdapter; +use Yiisoft\Queue\Tests\App\InMemoryAdapter; use Yiisoft\Queue\Tests\TestCase; -use Yiisoft\Queue\Message\IdEnvelope; use function extension_loaded; @@ -22,79 +24,103 @@ enum TestQueue: string final class QueueTest extends TestCase { - private bool $needsRealAdapter = true; + public function testPushSuccessful(): void + { + $adapter = new FakeAdapter(); + $queue = $this->createQueue($adapter); + $message = new Message('simple', null); + $queue->push($message); - protected function setUp(): void + self::assertSame([$message], $adapter->pushMessages); + } + + public function testPushSynchronouslyProcessesMessage(): void { - parent::setUp(); + $queue = $this->createQueue(); + $message = new Message('simple', null); - $this->needsRealAdapter = true; + $queue->push($message); + $queue->push(clone $message); + + self::assertSame(2, $this->executionTimes); } - public function testPushSuccessful(): void + public function testRunWithoutAdapterReturnsZero(): void { - $adapter = new FakeAdapter(); - $queue = $this->createQueue($adapter); + $queue = $this->createQueue(); $message = new Message('simple', null); $queue->push($message); + $queue->push(clone $message); - self::assertSame([$message], $adapter->pushMessages); + self::assertSame(0, $queue->run()); + self::assertSame(2, $this->executionTimes); + } + + public function testListenThrowsWithoutAdapter(): void + { + $queue = $this->createQueue(); + + $this->expectException(BadMethodCallException::class); + $this->expectExceptionMessage('Cannot listen without an adapter. Queue is in synchronous mode.'); + $queue->listen(); + } + + public function testStatusThrowsWithoutAdapter(): void + { + $queue = $this->createQueue(); + + $this->expectException(BadMethodCallException::class); + $this->expectExceptionMessage('Cannot get message status without an adapter. Queue is in synchronous mode.'); + $queue->status('1'); } - public function testRun(): void + public function testRunWithAdapter(): void { - $queue = $this->createQueue($this->getAdapter()); + $queue = $this->createQueue(new InMemoryAdapter()); $message = new Message('simple', null); - $message2 = clone $message; $queue->push($message); - $queue->push($message2); - $queue->run(); + $queue->push(clone $message); - self::assertEquals(2, $this->executionTimes); + self::assertSame(2, $queue->run()); + self::assertSame(2, $this->executionTimes); } - public function testRunPartly(): void + public function testRunPartlyWithAdapter(): void { + $queue = $this->createQueue(new InMemoryAdapter()); $message = new Message('simple', null); - $queue = $this->createQueue($this->getAdapter()); - $message2 = clone $message; $queue->push($message); - $queue->push($message2); - $queue->run(1); + $queue->push(clone $message); - self::assertEquals(1, $this->executionTimes); + self::assertSame(1, $queue->run(1)); + self::assertSame(1, $this->executionTimes); } - public function testListen(): void + public function testListenWithAdapter(): void { - $queue = $this->createQueue($this->getAdapter()); + $queue = $this->createQueue(new InMemoryAdapter()); $message = new Message('simple', null); - $message2 = clone $message; $queue->push($message); - $queue->push($message2); + $queue->push(clone $message); + $queue->listen(); - self::assertEquals(2, $this->executionTimes); + self::assertSame(2, $this->executionTimes); } - public function testStatus(): void + public function testStatusWithAdapter(): void { - $queue = $this->createQueue($this->getAdapter()); - $message = new Message('simple', null); - $envelope = $queue->push($message); + $queue = $this->createQueue(new InMemoryAdapter()); + $envelope = $queue->push(new Message('simple', null)); self::assertArrayHasKey(IdEnvelope::MESSAGE_ID_KEY, $envelope->getMetadata()); - /** - * @var int|string $id - */ + /** @var int|string $id */ $id = $envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]; - $status = $queue->status($id); - self::assertSame(MessageStatus::WAITING, $status); + self::assertSame(MessageStatus::WAITING, $queue->status($id)); $queue->run(); - $status = $queue->status($id); - self::assertSame(MessageStatus::DONE, $status); + self::assertSame(MessageStatus::DONE, $queue->status($id)); } public function testRunWithSignalLoop(): void @@ -104,14 +130,13 @@ public function testRunWithSignalLoop(): void } $this->loop = new SignalLoop(); - $queue = $this->createQueue($this->getAdapter()); + $queue = $this->createQueue(); $message = new Message('simple', null); - $message2 = clone $message; $queue->push($message); - $queue->push($message2); - $queue->run(); + $queue->push(clone $message); - self::assertEquals(2, $this->executionTimes); + self::assertSame(0, $queue->run()); + self::assertSame(2, $this->executionTimes); } public function testGetName(): void @@ -127,9 +152,4 @@ public function testGetNameWithBackedEnum(): void $this->assertSame('high-priority', $queue->getName()); } - - protected function needsRealAdapter(): bool - { - return $this->needsRealAdapter; - } } From ee6e8c35d31df171d31287442f0012c0bc5d8559 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 14:37:39 +0300 Subject: [PATCH 02/19] Update README.md Co-authored-by: Alexander Makarov --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a9cd8c98..c9eb6880 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ See the [adapter list](docs/guide/en/adapter-list.md) and follow the adapter-spe > design around `QueueInterface` from day one and add a real broker later — you can run the queue > in [synchronous mode](docs/guide/en/synchronous-mode.md) (the adapter argument is optional). > In this mode messages are processed immediately in the same process, so it won't provide true -> async execution, but call sites stay the same when you switch to a real adapter. +> async execution, but the code stays the same when you switch to a real adapter. ### 2. Configure the queue From 0a1022b04f4b316c458a57dad448d91d1af10b1b Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 17:33:19 +0300 Subject: [PATCH 03/19] Don't throw exception on `listen()` in synchronous mode --- README.md | 2 +- docs/guide/en/console-commands.md | 2 ++ docs/guide/en/synchronous-mode.md | 2 +- src/Queue.php | 5 ++--- tests/Unit/QueueTest.php | 6 +++--- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index a9cd8c98..91b9926c 100644 --- a/README.md +++ b/README.md @@ -129,7 +129,7 @@ By default, Yii Framework uses [yiisoft/yii-console](https://github.com/yiisoft/ See [Console commands](docs/guide/en/console-commands.md) for more details. -> In case you're running the queue in synchronous mode (no adapter) for development purposes, you should not use these commands, as you have no asynchronous processing available. The messages are processed immediately when pushed. +> In case you're running the queue in synchronous mode (no adapter), `queue:listen` logs an info message and exits. The messages are processed immediately when pushed. ## Documentation diff --git a/docs/guide/en/console-commands.md b/docs/guide/en/console-commands.md index 36f90310..0157e7f5 100644 --- a/docs/guide/en/console-commands.md +++ b/docs/guide/en/console-commands.md @@ -33,6 +33,8 @@ The following command launches a daemon, which infinitely consumes messages from yii queue:listen [queueName] ``` +> **Note:** If the queue is not configured with an adapter (synchronous mode), the command logs an info message and exits gracefully. + ## 3. Listen to multiple queues The following command iterates through multiple queues and is meant to be used in development environment only, as it consumes a lot of CPU for iterating through queues. You can pass to it: diff --git a/docs/guide/en/synchronous-mode.md b/docs/guide/en/synchronous-mode.md index e96ddfa4..d190223f 100644 --- a/docs/guide/en/synchronous-mode.md +++ b/docs/guide/en/synchronous-mode.md @@ -35,5 +35,5 @@ since no adapter is involved to assign an ID. Limitations: - `run()` does nothing and returns `0`. -- `listen()` throws `BadMethodCallException`. +- `listen()` logs an info message and returns without listening. - `status()` throws `BadMethodCallException` — there is no message storage to track IDs. diff --git a/src/Queue.php b/src/Queue.php index 32db983c..c35ced1b 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -115,9 +115,8 @@ public function run(int $max = 0): int public function listen(): void { if ($this->adapter === null) { - throw new BadMethodCallException( - 'Cannot listen without an adapter. Queue is in synchronous mode.', - ); + $this->logger->info('Cannot listen without an adapter. Queue is in synchronous mode.'); + return; } $this->logger->info('Start listening to the queue.'); diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index 7566df9b..abd1f58b 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -56,13 +56,13 @@ public function testRunWithoutAdapterReturnsZero(): void self::assertSame(2, $this->executionTimes); } - public function testListenThrowsWithoutAdapter(): void + public function testListenWithoutAdapter(): void { $queue = $this->createQueue(); - $this->expectException(BadMethodCallException::class); - $this->expectExceptionMessage('Cannot listen without an adapter. Queue is in synchronous mode.'); $queue->listen(); + + $this->expectNotToPerformAssertions(); } public function testStatusThrowsWithoutAdapter(): void From c89194da8d1a9bd8ffda475f4f63affdbd5d49a4 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 17:52:25 +0300 Subject: [PATCH 04/19] Introduce MessageStatus::NOT_FOUND --- docs/guide/en/message-status.md | 13 ++++++++----- docs/guide/en/synchronous-mode.md | 2 +- src/Adapter/AdapterInterface.php | 6 ++---- src/MessageStatus.php | 2 ++ src/Queue.php | 5 +---- src/QueueInterface.php | 4 +--- tests/App/InMemoryAdapter.php | 5 ++--- tests/Unit/QueueTest.php | 7 ++----- 8 files changed, 19 insertions(+), 25 deletions(-) diff --git a/docs/guide/en/message-status.md b/docs/guide/en/message-status.md index bd89b815..89ac619a 100644 --- a/docs/guide/en/message-status.md +++ b/docs/guide/en/message-status.md @@ -7,7 +7,7 @@ The API surface is: - `QueueInterface::status(string|int $id): MessageStatus` - `AdapterInterface::status(string|int $id): MessageStatus` -Status tracking support depends on the adapter. If an adapter doesn't keep status history, calling `status()` with that ID will throw `InvalidArgumentException`. +Status tracking support depends on the adapter. If an adapter doesn't support status tracking or can't find the message by ID, it returns `MessageStatus::NOT_FOUND`. ## Getting a message ID @@ -30,6 +30,9 @@ The ID type (`string` or `int`) and how long it stays queryable are adapter-spec Statuses are represented by the `Yiisoft\Queue\MessageStatus` enum: +- `MessageStatus::NOT_FOUND` + The message is not known to the queue, or the adapter doesn't support status tracking. + - `MessageStatus::WAITING` The message is in the queue and has not been picked up yet. @@ -42,7 +45,7 @@ Statuses are represented by the `Yiisoft\Queue\MessageStatus` enum: In addition to enum cases, `MessageStatus` provides a string key via `MessageStatus::key()`: ```php -$statusKey = $status->key(); // "waiting", "reserved" or "done" +$statusKey = $status->key(); // "not-found", "waiting", "reserved" or "done" ``` ## Querying a status @@ -73,10 +76,10 @@ if ($status === MessageStatus::DONE) { } ``` -## Errors and edge cases +## Edge cases -- **Unknown ID** - If an adapter can't find the message by ID, it must throw `InvalidArgumentException`. +- **Unknown ID or unsupported tracking** + If an adapter doesn't support status tracking or can't find the message by ID, it returns `MessageStatus::NOT_FOUND`. - **Timing** `RESERVED` can be short-lived and difficult to observe: depending on the adapter, a message may move from `WAITING` to `RESERVED` and then to `DONE` quickly. diff --git a/docs/guide/en/synchronous-mode.md b/docs/guide/en/synchronous-mode.md index d190223f..a8692e1a 100644 --- a/docs/guide/en/synchronous-mode.md +++ b/docs/guide/en/synchronous-mode.md @@ -36,4 +36,4 @@ Limitations: - `run()` does nothing and returns `0`. - `listen()` logs an info message and returns without listening. -- `status()` throws `BadMethodCallException` — there is no message storage to track IDs. +- `status()` always returns `MessageStatus::NOT_FOUND` — there is no message storage to track IDs. diff --git a/src/Adapter/AdapterInterface.php b/src/Adapter/AdapterInterface.php index 2d98df99..69491cb1 100644 --- a/src/Adapter/AdapterInterface.php +++ b/src/Adapter/AdapterInterface.php @@ -4,7 +4,6 @@ namespace Yiisoft\Queue\Adapter; -use InvalidArgumentException; use Yiisoft\Queue\MessageStatus; use Yiisoft\Queue\Message\MessageInterface; @@ -18,11 +17,10 @@ interface AdapterInterface public function runExisting(callable $handlerCallback): void; /** - * Returns status code of a message with the given id. + * Returns status code of a message with the given ID. + * Returns {@see MessageStatus::NOT_FOUND} when status tracking is not supported or there is no such id. * * @param int|string $id ID of a message. - * - * @throws InvalidArgumentException When there is no such id in the adapter. */ public function status(string|int $id): MessageStatus; diff --git a/src/MessageStatus.php b/src/MessageStatus.php index 7420ee1d..a8cf61e1 100644 --- a/src/MessageStatus.php +++ b/src/MessageStatus.php @@ -6,6 +6,7 @@ enum MessageStatus: int { + case NOT_FOUND = 0; case WAITING = 1; case RESERVED = 2; case DONE = 3; @@ -13,6 +14,7 @@ enum MessageStatus: int public function key(): string { return match ($this) { + self::NOT_FOUND => 'not-found', self::WAITING => 'waiting', self::RESERVED => 'reserved', self::DONE => 'done', diff --git a/src/Queue.php b/src/Queue.php index c35ced1b..c1a7488c 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -5,7 +5,6 @@ namespace Yiisoft\Queue; use BackedEnum; -use BadMethodCallException; use Psr\Log\LoggerInterface; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Cli\LoopInterface; @@ -127,9 +126,7 @@ public function listen(): void public function status(string|int $id): MessageStatus { if ($this->adapter === null) { - throw new BadMethodCallException( - 'Cannot get message status without an adapter. Queue is in synchronous mode.', - ); + return MessageStatus::NOT_FOUND; } return $this->adapter->status($id); diff --git a/src/QueueInterface.php b/src/QueueInterface.php index 9e9e7530..80de37c6 100644 --- a/src/QueueInterface.php +++ b/src/QueueInterface.php @@ -31,9 +31,7 @@ public function run(int $max = 0): int; public function listen(): void; /** - * @param int|string $id A message id - * - * @throws InvalidArgumentException when there is no such id in the adapter + * @param int|string $id A message ID. * * @return MessageStatus */ diff --git a/tests/App/InMemoryAdapter.php b/tests/App/InMemoryAdapter.php index 90dcd88a..9eaceb9c 100644 --- a/tests/App/InMemoryAdapter.php +++ b/tests/App/InMemoryAdapter.php @@ -4,7 +4,6 @@ namespace Yiisoft\Queue\Tests\App; -use InvalidArgumentException; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Queue\Message\MessageInterface; @@ -46,7 +45,7 @@ public function status(int|string $id): MessageStatus $id = (int) $id; if ($id < 0) { - throw new InvalidArgumentException('IDs start with 0.'); + return MessageStatus::NOT_FOUND; } if ($id < $this->current) { @@ -57,6 +56,6 @@ public function status(int|string $id): MessageStatus return MessageStatus::WAITING; } - throw new InvalidArgumentException('There is no message with the given ID.'); + return MessageStatus::NOT_FOUND; } } diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index abd1f58b..cbb724d6 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -4,7 +4,6 @@ namespace Yiisoft\Queue\Tests\Unit; -use BadMethodCallException; use Yiisoft\Queue\Cli\SignalLoop; use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Queue\Message\Message; @@ -65,13 +64,11 @@ public function testListenWithoutAdapter(): void $this->expectNotToPerformAssertions(); } - public function testStatusThrowsWithoutAdapter(): void + public function testStatusReturnsNotFoundWithoutAdapter(): void { $queue = $this->createQueue(); - $this->expectException(BadMethodCallException::class); - $this->expectExceptionMessage('Cannot get message status without an adapter. Queue is in synchronous mode.'); - $queue->status('1'); + self::assertSame(MessageStatus::NOT_FOUND, $queue->status('1')); } public function testRunWithAdapter(): void From b1ebd7b4b7657f202ed39b798fea0cdc87a64e3e Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 17:54:38 +0300 Subject: [PATCH 05/19] Refactor --- src/Queue.php | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/Queue.php b/src/Queue.php index c1a7488c..037e2eff 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -38,7 +38,7 @@ public function __construct( ) { $this->name = StringNormalizer::normalize($name); $this->middlewareDefinitions = $middlewareDefinitions; - $this->finalPushHandler = $adapter === null + $this->finalPushHandler = $this->isSynchronous() ? new SynchronousPushHandler($worker, $this) : new AdapterPushHandler($adapter); } @@ -62,7 +62,7 @@ public function push( $this->createPushHandler(...$middlewareDefinitions), ); - if ($this->adapter === null) { + if ($this->isSynchronous()) { $this->logger->info( 'Processed message with message type "{messageType}" synchronously.', ['messageType' => $message->getType()], @@ -82,7 +82,7 @@ public function push( public function run(int $max = 0): int { - if ($this->adapter === null) { + if ($this->isSynchronous()) { $this->logger->debug( 'Queue is in synchronous mode (no adapter). Messages are processed on push. run() does nothing.', ); @@ -113,7 +113,7 @@ public function run(int $max = 0): int public function listen(): void { - if ($this->adapter === null) { + if ($this->isSynchronous()) { $this->logger->info('Cannot listen without an adapter. Queue is in synchronous mode.'); return; } @@ -125,7 +125,7 @@ public function listen(): void public function status(string|int $id): MessageStatus { - if ($this->adapter === null) { + if ($this->isSynchronous()) { return MessageStatus::NOT_FOUND; } @@ -179,4 +179,9 @@ public function handlePush(MessageInterface $message): MessageInterface } }; } + + private function isSynchronous(): bool + { + return $this->adapter === null; + } } From 0e17a9cade7c830781170d2b8eaff02fac6fb248 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 18:00:13 +0300 Subject: [PATCH 06/19] fix rector-cs.yml --- .github/workflows/rector-cs.yml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.github/workflows/rector-cs.yml b/.github/workflows/rector-cs.yml index 03b7c75c..6307f295 100644 --- a/.github/workflows/rector-cs.yml +++ b/.github/workflows/rector-cs.yml @@ -1,7 +1,7 @@ name: Rector + PHP CS Fixer on: - pull_request_target: + pull_request: paths: - 'config/**' - 'src/**' @@ -21,8 +21,5 @@ concurrency: jobs: rector: uses: yiisoft/actions/.github/workflows/rector-cs.yml@master - secrets: - token: ${{ secrets.YIISOFT_GITHUB_TOKEN }} with: - repository: ${{ github.event.pull_request.head.repo.full_name }} php: '8.1' From 884ea35cf72382fa843a545d91dda6bf5d285531 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 18:03:55 +0300 Subject: [PATCH 07/19] fix psalm --- src/Queue.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Queue.php b/src/Queue.php index 037e2eff..3fcb2f59 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -40,7 +40,7 @@ public function __construct( $this->middlewareDefinitions = $middlewareDefinitions; $this->finalPushHandler = $this->isSynchronous() ? new SynchronousPushHandler($worker, $this) - : new AdapterPushHandler($adapter); + : new AdapterPushHandler($this->adapter); } public function getName(): string @@ -180,6 +180,9 @@ public function handlePush(MessageInterface $message): MessageInterface }; } + /** + * @psalm-assert-if-false !null $this->adapter + */ private function isSynchronous(): bool { return $this->adapter === null; From 0eeb85f55cda308d42b6da5b2cc0ba4459b0a0f7 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 18:05:38 +0300 Subject: [PATCH 08/19] fix --- .github/workflows/rector-cs.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/rector-cs.yml b/.github/workflows/rector-cs.yml index 6307f295..82e23f14 100644 --- a/.github/workflows/rector-cs.yml +++ b/.github/workflows/rector-cs.yml @@ -22,4 +22,5 @@ jobs: rector: uses: yiisoft/actions/.github/workflows/rector-cs.yml@master with: + repository: ${{ github.event.pull_request.head.repo.full_name }} php: '8.1' From 1cbb04ec5d2ae7c2cd9be6a1bf99686640e22dbc Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 18:12:21 +0300 Subject: [PATCH 09/19] fix --- .github/workflows/rector-cs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rector-cs.yml b/.github/workflows/rector-cs.yml index 82e23f14..8d6fdb5c 100644 --- a/.github/workflows/rector-cs.yml +++ b/.github/workflows/rector-cs.yml @@ -20,7 +20,7 @@ concurrency: jobs: rector: - uses: yiisoft/actions/.github/workflows/rector-cs.yml@master + uses: yiisoft/actions/.github/workflows/rector-cs.yml@dev-fix-rector-cs with: repository: ${{ github.event.pull_request.head.repo.full_name }} php: '8.1' From 62521e74a69140974c95bf8ab77718b5c191eb56 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 18:13:10 +0300 Subject: [PATCH 10/19] fix --- .github/workflows/rector-cs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rector-cs.yml b/.github/workflows/rector-cs.yml index 8d6fdb5c..9c6e18d1 100644 --- a/.github/workflows/rector-cs.yml +++ b/.github/workflows/rector-cs.yml @@ -20,7 +20,7 @@ concurrency: jobs: rector: - uses: yiisoft/actions/.github/workflows/rector-cs.yml@dev-fix-rector-cs + uses: yiisoft/actions/.github/workflows/rector-cs.yml@fix-rector-cs with: repository: ${{ github.event.pull_request.head.repo.full_name }} php: '8.1' From 7f8c23ee0da9a225ba42d839830751f2effa649e Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 18:15:49 +0300 Subject: [PATCH 11/19] test --- .github/workflows/{bechmark.yml => bechmark.yml_} | 0 .github/workflows/{build.yml => build.yml_} | 0 ...composer-require-checker.yml => composer-require-checker.yml_} | 0 .github/workflows/{mutation.yml => mutation.yml_} | 0 .github/workflows/{static.yml => static.yml_} | 0 5 files changed, 0 insertions(+), 0 deletions(-) rename .github/workflows/{bechmark.yml => bechmark.yml_} (100%) rename .github/workflows/{build.yml => build.yml_} (100%) rename .github/workflows/{composer-require-checker.yml => composer-require-checker.yml_} (100%) rename .github/workflows/{mutation.yml => mutation.yml_} (100%) rename .github/workflows/{static.yml => static.yml_} (100%) diff --git a/.github/workflows/bechmark.yml b/.github/workflows/bechmark.yml_ similarity index 100% rename from .github/workflows/bechmark.yml rename to .github/workflows/bechmark.yml_ diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml_ similarity index 100% rename from .github/workflows/build.yml rename to .github/workflows/build.yml_ diff --git a/.github/workflows/composer-require-checker.yml b/.github/workflows/composer-require-checker.yml_ similarity index 100% rename from .github/workflows/composer-require-checker.yml rename to .github/workflows/composer-require-checker.yml_ diff --git a/.github/workflows/mutation.yml b/.github/workflows/mutation.yml_ similarity index 100% rename from .github/workflows/mutation.yml rename to .github/workflows/mutation.yml_ diff --git a/.github/workflows/static.yml b/.github/workflows/static.yml_ similarity index 100% rename from .github/workflows/static.yml rename to .github/workflows/static.yml_ From b55ab088d0841672c7195e9b156dc9a8ea320415 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 18:19:28 +0300 Subject: [PATCH 12/19] test --- .github/workflows/rector-cs.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rector-cs.yml b/.github/workflows/rector-cs.yml index 9c6e18d1..42122476 100644 --- a/.github/workflows/rector-cs.yml +++ b/.github/workflows/rector-cs.yml @@ -12,7 +12,7 @@ on: - '.php-cs-fixer.dist.php' permissions: - contents: read + contents: write concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -20,7 +20,7 @@ concurrency: jobs: rector: - uses: yiisoft/actions/.github/workflows/rector-cs.yml@fix-rector-cs + uses: yiisoft/actions/.github/workflows/rector-cs.yml@master with: repository: ${{ github.event.pull_request.head.repo.full_name }} php: '8.1' From 0f4ac4123c163d87914465ffb3addb289751a613 Mon Sep 17 00:00:00 2001 From: vjik <525501+vjik@users.noreply.github.com> Date: Sun, 26 Apr 2026 15:20:42 +0000 Subject: [PATCH 13/19] Apply PHP CS Fixer and Rector changes (CI) --- src/QueueInterface.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/QueueInterface.php b/src/QueueInterface.php index 80de37c6..296cd7e6 100644 --- a/src/QueueInterface.php +++ b/src/QueueInterface.php @@ -4,7 +4,6 @@ namespace Yiisoft\Queue; -use InvalidArgumentException; use Yiisoft\Queue\Message\MessageInterface; use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface; From 02bf1963994ba7643fec231de7b0993c3ea6c30e Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sun, 26 Apr 2026 18:21:47 +0300 Subject: [PATCH 14/19] revert --- .github/workflows/{bechmark.yml_ => bechmark.yml} | 0 .github/workflows/{build.yml_ => build.yml} | 0 ...composer-require-checker.yml_ => composer-require-checker.yml} | 0 .github/workflows/{mutation.yml_ => mutation.yml} | 0 .github/workflows/{static.yml_ => static.yml} | 0 5 files changed, 0 insertions(+), 0 deletions(-) rename .github/workflows/{bechmark.yml_ => bechmark.yml} (100%) rename .github/workflows/{build.yml_ => build.yml} (100%) rename .github/workflows/{composer-require-checker.yml_ => composer-require-checker.yml} (100%) rename .github/workflows/{mutation.yml_ => mutation.yml} (100%) rename .github/workflows/{static.yml_ => static.yml} (100%) diff --git a/.github/workflows/bechmark.yml_ b/.github/workflows/bechmark.yml similarity index 100% rename from .github/workflows/bechmark.yml_ rename to .github/workflows/bechmark.yml diff --git a/.github/workflows/build.yml_ b/.github/workflows/build.yml similarity index 100% rename from .github/workflows/build.yml_ rename to .github/workflows/build.yml diff --git a/.github/workflows/composer-require-checker.yml_ b/.github/workflows/composer-require-checker.yml similarity index 100% rename from .github/workflows/composer-require-checker.yml_ rename to .github/workflows/composer-require-checker.yml diff --git a/.github/workflows/mutation.yml_ b/.github/workflows/mutation.yml similarity index 100% rename from .github/workflows/mutation.yml_ rename to .github/workflows/mutation.yml diff --git a/.github/workflows/static.yml_ b/.github/workflows/static.yml similarity index 100% rename from .github/workflows/static.yml_ rename to .github/workflows/static.yml From bacf410359b2283258ffd5e2e4e3ecd3620c3c01 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Wed, 29 Apr 2026 13:37:44 +0300 Subject: [PATCH 15/19] Update docs/guide/en/synchronous-mode.md Co-authored-by: Alexander Makarov --- docs/guide/en/synchronous-mode.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/guide/en/synchronous-mode.md b/docs/guide/en/synchronous-mode.md index a8692e1a..316d55a8 100644 --- a/docs/guide/en/synchronous-mode.md +++ b/docs/guide/en/synchronous-mode.md @@ -1,5 +1,4 @@ -Synchronous Mode -================ +# Synchronous Mode Run tasks synchronously in the same process. Useful for: From 2b0fa1eabd5f8bf02228f4d97c0acb54e75bf2d9 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Wed, 29 Apr 2026 13:38:04 +0300 Subject: [PATCH 16/19] Update docs/guide/en/synchronous-mode.md Co-authored-by: Alexander Makarov --- docs/guide/en/synchronous-mode.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/guide/en/synchronous-mode.md b/docs/guide/en/synchronous-mode.md index 316d55a8..8fe4493a 100644 --- a/docs/guide/en/synchronous-mode.md +++ b/docs/guide/en/synchronous-mode.md @@ -8,7 +8,7 @@ Run tasks synchronously in the same process. Useful for: doesn't have an external broker yet — you can switch to a real adapter later without touching the call sites. -To enable it, construct the queue without an adapter (the `adapter` argument defaults to `null`): +To enable it, create the queue instance without an adapter (the `adapter` argument defaults to `null`): ```php $logger = $DIContainer->get(\Psr\Log\LoggerInterface::class); From d0e84b460cca31a03952bc6500ef8eaea7d44e60 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Wed, 29 Apr 2026 13:47:20 +0300 Subject: [PATCH 17/19] improve phpdoc --- src/Middleware/Push/MiddlewarePushStack.php | 3 +-- src/Queue.php | 11 +++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/Middleware/Push/MiddlewarePushStack.php b/src/Middleware/Push/MiddlewarePushStack.php index 97972461..e26f65a6 100644 --- a/src/Middleware/Push/MiddlewarePushStack.php +++ b/src/Middleware/Push/MiddlewarePushStack.php @@ -19,8 +19,7 @@ final class MiddlewarePushStack implements MessageHandlerPushInterface /** * @param Closure[] $middlewares Middlewares. - * @param MessageHandlerPushInterface $finishHandler Fallback handler - * events. + * @param MessageHandlerPushInterface $finishHandler Final handler invoked after all middlewares are processed. */ public function __construct( private readonly array $middlewares, diff --git a/src/Queue.php b/src/Queue.php index 3fcb2f59..25fc4127 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -24,7 +24,14 @@ final class Queue implements QueueInterface * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[] */ private array $middlewareDefinitions; + + /** + * @var MessageHandlerPushInterface $finalPushHandler The final push handler in the middleware chain, responsible + * for actually sending the message. Uses {@see SynchronousPushHandler} in synchronous mode or + * {@see AdapterPushHandler} otherwise. + */ private MessageHandlerPushInterface $finalPushHandler; + private string $name; public function __construct( @@ -164,6 +171,10 @@ private function createPushHandler(MiddlewarePushInterface|callable|array|string ) implements MessageHandlerPushInterface { public function __construct( private readonly MessageHandlerPushInterface $finishHandler, + /** + * @var MessageHandlerPushInterface $finishHandler Final handler invoked after all middlewares are + * processed. + */ private readonly PushMiddlewareDispatcher $dispatcher, /** * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[] From e94de0168352597894f87cdaf6ca1519ffd2705e Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Wed, 29 Apr 2026 13:48:26 +0300 Subject: [PATCH 18/19] Rename `InMemoryAdapter` to `MemoryAdapter` --- tests/App/{InMemoryAdapter.php => MemoryAdapter.php} | 2 +- tests/Unit/QueueTest.php | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) rename tests/App/{InMemoryAdapter.php => MemoryAdapter.php} (96%) diff --git a/tests/App/InMemoryAdapter.php b/tests/App/MemoryAdapter.php similarity index 96% rename from tests/App/InMemoryAdapter.php rename to tests/App/MemoryAdapter.php index 9eaceb9c..53c1c5cc 100644 --- a/tests/App/InMemoryAdapter.php +++ b/tests/App/MemoryAdapter.php @@ -11,7 +11,7 @@ use function count; -final class InMemoryAdapter implements AdapterInterface +final class MemoryAdapter implements AdapterInterface { /** @var array */ private array $messages = []; diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index cbb724d6..2f9236f9 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -9,7 +9,7 @@ use Yiisoft\Queue\Message\Message; use Yiisoft\Queue\MessageStatus; use Yiisoft\Queue\Tests\App\FakeAdapter; -use Yiisoft\Queue\Tests\App\InMemoryAdapter; +use Yiisoft\Queue\Tests\App\MemoryAdapter; use Yiisoft\Queue\Tests\TestCase; use function extension_loaded; @@ -73,7 +73,7 @@ public function testStatusReturnsNotFoundWithoutAdapter(): void public function testRunWithAdapter(): void { - $queue = $this->createQueue(new InMemoryAdapter()); + $queue = $this->createQueue(new MemoryAdapter()); $message = new Message('simple', null); $queue->push($message); $queue->push(clone $message); @@ -84,7 +84,7 @@ public function testRunWithAdapter(): void public function testRunPartlyWithAdapter(): void { - $queue = $this->createQueue(new InMemoryAdapter()); + $queue = $this->createQueue(new MemoryAdapter()); $message = new Message('simple', null); $queue->push($message); $queue->push(clone $message); @@ -95,7 +95,7 @@ public function testRunPartlyWithAdapter(): void public function testListenWithAdapter(): void { - $queue = $this->createQueue(new InMemoryAdapter()); + $queue = $this->createQueue(new MemoryAdapter()); $message = new Message('simple', null); $queue->push($message); $queue->push(clone $message); @@ -107,7 +107,7 @@ public function testListenWithAdapter(): void public function testStatusWithAdapter(): void { - $queue = $this->createQueue(new InMemoryAdapter()); + $queue = $this->createQueue(new MemoryAdapter()); $envelope = $queue->push(new Message('simple', null)); self::assertArrayHasKey(IdEnvelope::MESSAGE_ID_KEY, $envelope->getMetadata()); From 2479f7cbf64572970ccb3ba9f83b751a2d78f9ac Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Wed, 29 Apr 2026 13:51:43 +0300 Subject: [PATCH 19/19] fix --- src/Queue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Queue.php b/src/Queue.php index 25fc4127..63ee0bbf 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -170,11 +170,11 @@ private function createPushHandler(MiddlewarePushInterface|callable|array|string array_merge($this->middlewareDefinitions, $middlewares), ) implements MessageHandlerPushInterface { public function __construct( - private readonly MessageHandlerPushInterface $finishHandler, /** * @var MessageHandlerPushInterface $finishHandler Final handler invoked after all middlewares are * processed. */ + private readonly MessageHandlerPushInterface $finishHandler, private readonly PushMiddlewareDispatcher $dispatcher, /** * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[]