Skip to content
74 changes: 72 additions & 2 deletions src/Queue/Broker/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,73 @@
class Redis implements Publisher, Consumer
{
private const int POP_TIMEOUT = 2;
private const int RECONNECT_BACKOFF_MS = 100;
private const int RECONNECT_MAX_BACKOFF_MS = 5_000;

private bool $closed = false;
/**
* @var (callable(Queue, \Throwable, int, int): void)|null
*/
private $reconnectCallback = null;
/**
* @var (callable(Queue, int): void)|null
*/
private $reconnectSuccessCallback = null;

public function __construct(private readonly Connection $connection)
{
}

public function setReconnectCallback(?callable $callback): self
{
$this->reconnectCallback = $callback;

return $this;
}

public function setReconnectSuccessCallback(?callable $callback): self
{
$this->reconnectSuccessCallback = $callback;

return $this;
}

public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
{
$reconnectBackoffMs = self::RECONNECT_BACKOFF_MS;
$reconnectAttempt = 0;

while (!$this->closed) {
/**
* Waiting for next Job.
*/
try {
$nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT);
} catch (\RedisException $e) {
if ($reconnectAttempt > 0) {
$this->triggerReconnectSuccessCallback($queue, $reconnectAttempt);
}

$reconnectBackoffMs = self::RECONNECT_BACKOFF_MS;
$reconnectAttempt = 0;
} catch (\RedisException|\RedisClusterException $e) {
if ($this->closed) {
break;
}

throw $e;
$reconnectAttempt++;

try {
$this->connection->close();
} catch (\Throwable) {
}

$sleepMs = \mt_rand(0, $reconnectBackoffMs);
$this->triggerReconnectCallback($queue, $e, $reconnectAttempt, $sleepMs);

\usleep($sleepMs * 1000);
$reconnectBackoffMs = \min(self::RECONNECT_MAX_BACKOFF_MS, $reconnectBackoffMs * 2);

continue;
}

if (!$nextMessage) {
Expand Down Expand Up @@ -104,6 +150,30 @@ public function close(): void
$this->closed = true;
}

private function triggerReconnectCallback(Queue $queue, \Throwable $error, int $attempt, int $sleepMs): void
{
if (!\is_callable($this->reconnectCallback)) {
return;
}

try {
($this->reconnectCallback)($queue, $error, $attempt, $sleepMs);
} catch (\Throwable) {
}
}

private function triggerReconnectSuccessCallback(Queue $queue, int $attempts): void
{
if (!\is_callable($this->reconnectSuccessCallback)) {
return;
}

try {
($this->reconnectSuccessCallback)($queue, $attempts);
} catch (\Throwable) {
}
}

public function enqueue(Queue $queue, array $payload, bool $priority = false): bool
{
$payload = [
Expand Down
67 changes: 59 additions & 8 deletions src/Queue/Connection/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

class Redis implements Connection
{
protected const int CONNECT_MAX_ATTEMPTS = 5;
protected const int CONNECT_BACKOFF_MS = 100;
protected const int CONNECT_MAX_BACKOFF_MS = 3_000;

protected string $host;
protected int $port;
protected ?string $user;
Expand Down Expand Up @@ -178,8 +182,12 @@ public function ping(): bool

public function close(): void
{
$this->redis?->close();
$this->redis = null;
try {
$this->redis?->close();
} catch (\Throwable) {
} finally {
$this->redis = null;
}
}

protected function getRedis(): \Redis
Expand All @@ -188,15 +196,58 @@ protected function getRedis(): \Redis
return $this->redis;
}

$this->redis = new \Redis();

$connectTimeout = $this->connectTimeout < 0 ? 0 : $this->connectTimeout;
$this->redis->connect($this->host, $this->port, $connectTimeout);

if ($this->readTimeout >= 0) {
$this->redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout);
for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) {
$redis = new \Redis();
$connected = false;

try {
$redis->connect($this->host, $this->port, $connectTimeout);
$connected = true;

if ($this->readTimeout >= 0) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout);
}

$this->redis = $redis;
return $this->redis;
Comment thread
greptile-apps[bot] marked this conversation as resolved.
} catch (\RedisException $e) {
if ($connected) {
try {
$redis->close();
} catch (\Throwable) {
}
}

if ($attempt === self::CONNECT_MAX_ATTEMPTS) {
throw new \RedisException(
\sprintf(
'Failed to connect to Redis at %s:%d after %d attempts: %s',
$this->host,
$this->port,
self::CONNECT_MAX_ATTEMPTS,
$e->getMessage(),
),
(int)$e->getCode(),
$e,
);
}

// Exponential backoff with full jitter to avoid thundering herd on recovery.
$backoffMs = \min(
self::CONNECT_MAX_BACKOFF_MS,
self::CONNECT_BACKOFF_MS * (2 ** ($attempt - 1)),
);
\usleep(\mt_rand(0, $backoffMs) * 1000);
}
}

return $this->redis;
throw new \RedisException(\sprintf(
'Unreachable: Redis connect loop for %s:%d exited after %d attempts without success or exception.',
$this->host,
$this->port,
self::CONNECT_MAX_ATTEMPTS,
));
}
}
47 changes: 43 additions & 4 deletions src/Queue/Connection/RedisCluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

class RedisCluster implements Connection
{
protected const int CONNECT_MAX_ATTEMPTS = 5;
protected const int CONNECT_BACKOFF_MS = 100;
protected const int CONNECT_MAX_BACKOFF_MS = 3_000;

protected array $seeds;
protected float $connectTimeout;
protected float $readTimeout;
Expand Down Expand Up @@ -175,8 +179,12 @@ public function ping(): bool

public function close(): void
{
$this->redis?->close();
$this->redis = null;
try {
$this->redis?->close();
} catch (\Throwable) {
} finally {
$this->redis = null;
}
}

protected function getRedis(): \RedisCluster
Expand All @@ -187,7 +195,38 @@ protected function getRedis(): \RedisCluster

$connectTimeout = $this->connectTimeout < 0 ? 0 : $this->connectTimeout;
$readTimeout = $this->readTimeout < 0 ? 0 : $this->readTimeout;
$this->redis = new \RedisCluster(null, $this->seeds, $connectTimeout, $readTimeout);
return $this->redis;

for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) {
try {
$this->redis = new \RedisCluster(null, $this->seeds, $connectTimeout, $readTimeout);
return $this->redis;
} catch (\RedisClusterException $e) {
if ($attempt === self::CONNECT_MAX_ATTEMPTS) {
throw new \RedisClusterException(
\sprintf(
'Failed to connect to Redis cluster nodes [%s] after %d attempts: %s',
\implode(', ', $this->seeds),
self::CONNECT_MAX_ATTEMPTS,
$e->getMessage(),
),
(int)$e->getCode(),
$e,
);
}

// Exponential backoff with full jitter to avoid thundering herd on recovery.
$backoffMs = \min(
self::CONNECT_MAX_BACKOFF_MS,
self::CONNECT_BACKOFF_MS * (2 ** ($attempt - 1)),
);
\usleep(\mt_rand(0, $backoffMs) * 1000);
}
}

throw new \RedisClusterException(\sprintf(
'Unreachable: Redis cluster connect loop for nodes [%s] exited after %d attempts without success or exception.',
\implode(', ', $this->seeds),
self::CONNECT_MAX_ATTEMPTS,
));
}
}
Loading
Loading