diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ed5bb80..c757ff5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,6 +16,8 @@ jobs: adapter: [ AMQP, + AMQPAck, + AMQPSwoole, Pool, SwooleRedisCluster, Swoole, diff --git a/composer.json b/composer.json index 076dad8..b767ab2 100644 --- a/composer.json +++ b/composer.json @@ -19,13 +19,13 @@ }, "scripts":{ "test": "phpunit", - "check": "vendor/bin/phpstan analyse", + "check": "vendor/bin/phpstan --memory-limit=2G analyse", "format": "vendor/bin/pint", "lint": "vendor/bin/pint --test" }, "require": { "php": ">=8.3", - "appwrite-labs/php-amqplib": "^0.1", + "appwrite-labs/php-amqplib": "0.1.*", "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", "utopia-php/telemetry": "0.1.*", @@ -33,14 +33,15 @@ "utopia-php/fetch": "0.4.*" }, "require-dev": { - "ext-redis": "*", - "swoole/ide-helper": "4.8.8", - "phpunit/phpunit": "^9.5.5", - "laravel/pint": "^0.2.3", - "workerman/workerman": "^4.0", - "phpstan/phpstan": "^1.8" + "swoole/ide-helper": "5.1.7", + "phpunit/phpunit": "9.*", + "laravel/pint": "1.*", + "workerman/workerman": "4.*", + "phpstan/phpstan": "1.*" }, "suggest": { + "ext-redis": "Needed to support Redis.", + "ext-amqp": "Needed to support AMQP.", "ext-swoole": "Needed to support Swoole.", "workerman/workerman": "Needed to support Workerman." }, diff --git a/composer.lock b/composer.lock index b0efdc6..4528bc6 100644 --- a/composer.lock +++ b/composer.lock @@ -4,20 +4,20 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "9de2edbb13039237d2a64acf9578bc19", + "content-hash": "350529d3b55908338187ee3a1f284071", "packages": [ { "name": "appwrite-labs/php-amqplib", - "version": "0.1.1", + "version": "0.1.2", "source": { "type": "git", "url": "https://github.com/appwrite-labs/php-amqplib.git", - "reference": "bd380cbd63c8c0f063a3893b7a0b889d40876861" + "reference": "c8e043045388ddad5ddab5f48df2b9046ca6873f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/appwrite-labs/php-amqplib/zipball/bd380cbd63c8c0f063a3893b7a0b889d40876861", - "reference": "bd380cbd63c8c0f063a3893b7a0b889d40876861", + "url": "https://api.github.com/repos/appwrite-labs/php-amqplib/zipball/c8e043045388ddad5ddab5f48df2b9046ca6873f", + "reference": "c8e043045388ddad5ddab5f48df2b9046ca6873f", "shasum": "" }, "require": { @@ -95,9 +95,9 @@ "swoole" ], "support": { - "source": "https://github.com/appwrite-labs/php-amqplib/tree/0.1.1" + "source": "https://github.com/appwrite-labs/php-amqplib/tree/0.1.2" }, - "time": "2025-06-24T18:12:57+00:00" + "time": "2025-07-04T20:54:22+00:00" }, { "name": "brick/math", @@ -430,16 +430,16 @@ }, { "name": "open-telemetry/api", - "version": "1.3.0", + "version": "1.4.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/api.git", - "reference": "4e3bb38e069876fb73c2ce85c89583bf2b28cd86" + "reference": "b3a9286f9c1c8247c83493c5b1fa475cd0cec7f7" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/4e3bb38e069876fb73c2ce85c89583bf2b28cd86", - "reference": "4e3bb38e069876fb73c2ce85c89583bf2b28cd86", + "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/b3a9286f9c1c8247c83493c5b1fa475cd0cec7f7", + "reference": "b3a9286f9c1c8247c83493c5b1fa475cd0cec7f7", "shasum": "" }, "require": { @@ -459,7 +459,7 @@ ] }, "branch-alias": { - "dev-main": "1.1.x-dev" + "dev-main": "1.4.x-dev" } }, "autoload": { @@ -496,7 +496,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-05-07T12:32:21+00:00" + "time": "2025-06-19T23:36:51+00:00" }, { "name": "open-telemetry/context", @@ -559,16 +559,16 @@ }, { "name": "open-telemetry/exporter-otlp", - "version": "1.3.1", + "version": "1.3.2", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/exporter-otlp.git", - "reference": "8b3ca1f86d01429c73b407bf1a2075d9c187001e" + "reference": "196f3a1dbce3b2c0f8110d164232c11ac00ddbb2" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/8b3ca1f86d01429c73b407bf1a2075d9c187001e", - "reference": "8b3ca1f86d01429c73b407bf1a2075d9c187001e", + "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/196f3a1dbce3b2c0f8110d164232c11ac00ddbb2", + "reference": "196f3a1dbce3b2c0f8110d164232c11ac00ddbb2", "shasum": "" }, "require": { @@ -619,7 +619,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-05-21T12:02:20+00:00" + "time": "2025-06-16T00:24:51+00:00" }, { "name": "open-telemetry/gen-otlp-protobuf", @@ -686,22 +686,22 @@ }, { "name": "open-telemetry/sdk", - "version": "1.5.0", + "version": "1.6.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sdk.git", - "reference": "cd0d7367599717fc29e04eb8838ec061e6c2c657" + "reference": "1c0371794e4c0700afd4a9d4d8511cb5e3f78e6a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/cd0d7367599717fc29e04eb8838ec061e6c2c657", - "reference": "cd0d7367599717fc29e04eb8838ec061e6c2c657", + "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/1c0371794e4c0700afd4a9d4d8511cb5e3f78e6a", + "reference": "1c0371794e4c0700afd4a9d4d8511cb5e3f78e6a", "shasum": "" }, "require": { "ext-json": "*", "nyholm/psr7-server": "^1.1", - "open-telemetry/api": "~1.0 || ~1.1", + "open-telemetry/api": "~1.4.0", "open-telemetry/context": "^1.0", "open-telemetry/sem-conv": "^1.0", "php": "^8.1", @@ -724,6 +724,10 @@ "type": "library", "extra": { "spi": { + "OpenTelemetry\\API\\Configuration\\ConfigEnv\\EnvComponentLoader": [ + "OpenTelemetry\\API\\Instrumentation\\Configuration\\General\\ConfigEnv\\EnvComponentLoaderHttpConfig", + "OpenTelemetry\\API\\Instrumentation\\Configuration\\General\\ConfigEnv\\EnvComponentLoaderPeerConfig" + ], "OpenTelemetry\\API\\Instrumentation\\AutoInstrumentation\\HookManagerInterface": [ "OpenTelemetry\\API\\Instrumentation\\AutoInstrumentation\\ExtensionHookManager" ] @@ -772,20 +776,20 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-05-22T02:33:34+00:00" + "time": "2025-06-19T23:36:51+00:00" }, { "name": "open-telemetry/sem-conv", - "version": "1.32.0", + "version": "1.32.1", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sem-conv.git", - "reference": "16585cc0dbc3032a318e274043454679430d2ebf" + "reference": "94daa85ea61a8e2b7e1b0af6be0e875bedda7c22" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sem-conv/zipball/16585cc0dbc3032a318e274043454679430d2ebf", - "reference": "16585cc0dbc3032a318e274043454679430d2ebf", + "url": "https://api.github.com/repos/opentelemetry-php/sem-conv/zipball/94daa85ea61a8e2b7e1b0af6be0e875bedda7c22", + "reference": "94daa85ea61a8e2b7e1b0af6be0e875bedda7c22", "shasum": "" }, "require": { @@ -829,7 +833,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-05-05T03:58:53+00:00" + "time": "2025-06-24T02:32:27+00:00" }, { "name": "paragonie/constant_time_encoding", @@ -1029,16 +1033,16 @@ }, { "name": "phpseclib/phpseclib", - "version": "3.0.45", + "version": "3.0.46", "source": { "type": "git", "url": "https://github.com/phpseclib/phpseclib.git", - "reference": "bd81b90d5963c6b9d87de50357585375223f4dd8" + "reference": "56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/bd81b90d5963c6b9d87de50357585375223f4dd8", - "reference": "bd81b90d5963c6b9d87de50357585375223f4dd8", + "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6", + "reference": "56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6", "shasum": "" }, "require": { @@ -1119,7 +1123,7 @@ ], "support": { "issues": "https://github.com/phpseclib/phpseclib/issues", - "source": "https://github.com/phpseclib/phpseclib/tree/3.0.45" + "source": "https://github.com/phpseclib/phpseclib/tree/3.0.46" }, "funding": [ { @@ -1135,7 +1139,7 @@ "type": "tidelift" } ], - "time": "2025-06-22T22:54:43+00:00" + "time": "2025-06-26T16:29:55+00:00" }, { "name": "psr/container", @@ -1478,21 +1482,20 @@ }, { "name": "ramsey/uuid", - "version": "4.8.1", + "version": "4.9.0", "source": { "type": "git", "url": "https://github.com/ramsey/uuid.git", - "reference": "fdf4dd4e2ff1813111bd0ad58d7a1ddbb5b56c28" + "reference": "4e0e23cc785f0724a0e838279a9eb03f28b092a0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/ramsey/uuid/zipball/fdf4dd4e2ff1813111bd0ad58d7a1ddbb5b56c28", - "reference": "fdf4dd4e2ff1813111bd0ad58d7a1ddbb5b56c28", + "url": "https://api.github.com/repos/ramsey/uuid/zipball/4e0e23cc785f0724a0e838279a9eb03f28b092a0", + "reference": "4e0e23cc785f0724a0e838279a9eb03f28b092a0", "shasum": "" }, "require": { "brick/math": "^0.8.8 || ^0.9 || ^0.10 || ^0.11 || ^0.12 || ^0.13", - "ext-json": "*", "php": "^8.0", "ramsey/collection": "^1.2 || ^2.0" }, @@ -1551,9 +1554,9 @@ ], "support": { "issues": "https://github.com/ramsey/uuid/issues", - "source": "https://github.com/ramsey/uuid/tree/4.8.1" + "source": "https://github.com/ramsey/uuid/tree/4.9.0" }, - "time": "2025-06-01T06:28:46+00:00" + "time": "2025-06-25T14:20:11+00:00" }, { "name": "symfony/deprecation-contracts", @@ -1624,16 +1627,16 @@ }, { "name": "symfony/http-client", - "version": "v7.3.0", + "version": "v7.3.1", "source": { "type": "git", "url": "https://github.com/symfony/http-client.git", - "reference": "57e4fb86314015a695a750ace358d07a7e37b8a9" + "reference": "4403d87a2c16f33345dca93407a8714ee8c05a64" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/http-client/zipball/57e4fb86314015a695a750ace358d07a7e37b8a9", - "reference": "57e4fb86314015a695a750ace358d07a7e37b8a9", + "url": "https://api.github.com/repos/symfony/http-client/zipball/4403d87a2c16f33345dca93407a8714ee8c05a64", + "reference": "4403d87a2c16f33345dca93407a8714ee8c05a64", "shasum": "" }, "require": { @@ -1645,6 +1648,7 @@ }, "conflict": { "amphp/amp": "<2.5", + "amphp/socket": "<1.1", "php-http/discovery": "<1.15", "symfony/http-foundation": "<6.4" }, @@ -1657,7 +1661,6 @@ "require-dev": { "amphp/http-client": "^4.2.1|^5.0", "amphp/http-tunnel": "^1.0|^2.0", - "amphp/socket": "^1.1", "guzzlehttp/promises": "^1.4|^2.0", "nyholm/psr7": "^1.0", "php-http/httplug": "^1.0|^2.0", @@ -1699,7 +1702,7 @@ "http" ], "support": { - "source": "https://github.com/symfony/http-client/tree/v7.3.0" + "source": "https://github.com/symfony/http-client/tree/v7.3.1" }, "funding": [ { @@ -1715,7 +1718,7 @@ "type": "tidelift" } ], - "time": "2025-05-02T08:23:16+00:00" + "time": "2025-06-28T07:58:39+00:00" }, { "name": "symfony/http-client-contracts", @@ -2037,16 +2040,16 @@ }, { "name": "tbachert/spi", - "version": "v1.0.3", + "version": "v1.0.5", "source": { "type": "git", "url": "https://github.com/Nevay/spi.git", - "reference": "506a79c98e1a51522e76ee921ccb6c62d52faf3a" + "reference": "e7078767866d0a9e0f91d3f9d42a832df5e39002" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/Nevay/spi/zipball/506a79c98e1a51522e76ee921ccb6c62d52faf3a", - "reference": "506a79c98e1a51522e76ee921ccb6c62d52faf3a", + "url": "https://api.github.com/repos/Nevay/spi/zipball/e7078767866d0a9e0f91d3f9d42a832df5e39002", + "reference": "e7078767866d0a9e0f91d3f9d42a832df5e39002", "shasum": "" }, "require": { @@ -2064,7 +2067,7 @@ "extra": { "class": "Nevay\\SPI\\Composer\\Plugin", "branch-alias": { - "dev-main": "0.2.x-dev" + "dev-main": "1.0.x-dev" }, "plugin-optional": true }, @@ -2083,9 +2086,9 @@ ], "support": { "issues": "https://github.com/Nevay/spi/issues", - "source": "https://github.com/Nevay/spi/tree/v1.0.3" + "source": "https://github.com/Nevay/spi/tree/v1.0.5" }, - "time": "2025-04-02T19:38:14+00:00" + "time": "2025-06-29T15:42:06+00:00" }, { "name": "utopia-php/cli", @@ -2444,16 +2447,16 @@ }, { "name": "laravel/pint", - "version": "v0.2.4", + "version": "v1.23.0", "source": { "type": "git", "url": "https://github.com/laravel/pint.git", - "reference": "45c9fe899abfeeb7794c5a8c4074c140250a84c2" + "reference": "9ab851dba4faa51a3c3223dd3d07044129021024" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/laravel/pint/zipball/45c9fe899abfeeb7794c5a8c4074c140250a84c2", - "reference": "45c9fe899abfeeb7794c5a8c4074c140250a84c2", + "url": "https://api.github.com/repos/laravel/pint/zipball/9ab851dba4faa51a3c3223dd3d07044129021024", + "reference": "9ab851dba4faa51a3c3223dd3d07044129021024", "shasum": "" }, "require": { @@ -2461,22 +2464,25 @@ "ext-mbstring": "*", "ext-tokenizer": "*", "ext-xml": "*", - "php": "^8.0" + "php": "^8.2.0" }, "require-dev": { - "friendsofphp/php-cs-fixer": "^3.8.0", - "illuminate/view": "^9.17.0", - "laravel-zero/framework": "^9.1.1", - "mockery/mockery": "^1.5.0", - "nunomaduro/larastan": "^2.1.11", - "nunomaduro/termwind": "^1.10.1", - "pestphp/pest": "^1.21.3" + "friendsofphp/php-cs-fixer": "^3.76.0", + "illuminate/view": "^11.45.1", + "larastan/larastan": "^3.5.0", + "laravel-zero/framework": "^11.45.0", + "mockery/mockery": "^1.6.12", + "nunomaduro/termwind": "^2.3.1", + "pestphp/pest": "^2.36.0" }, "bin": [ "builds/pint" ], "type": "project", "autoload": { + "files": [ + "overrides/Runner/Parallel/ProcessFactory.php" + ], "psr-4": { "App\\": "app/", "Database\\Seeders\\": "database/seeders/", @@ -2506,20 +2512,20 @@ "issues": "https://github.com/laravel/pint/issues", "source": "https://github.com/laravel/pint" }, - "time": "2022-07-13T17:57:52+00:00" + "time": "2025-07-03T10:37:47+00:00" }, { "name": "myclabs/deep-copy", - "version": "1.13.1", + "version": "1.13.2", "source": { "type": "git", "url": "https://github.com/myclabs/DeepCopy.git", - "reference": "1720ddd719e16cf0db4eb1c6eca108031636d46c" + "reference": "d25e62e636b0a9b01e3bdebb7823b474876dd829" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/1720ddd719e16cf0db4eb1c6eca108031636d46c", - "reference": "1720ddd719e16cf0db4eb1c6eca108031636d46c", + "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/d25e62e636b0a9b01e3bdebb7823b474876dd829", + "reference": "d25e62e636b0a9b01e3bdebb7823b474876dd829", "shasum": "" }, "require": { @@ -2558,7 +2564,7 @@ ], "support": { "issues": "https://github.com/myclabs/DeepCopy/issues", - "source": "https://github.com/myclabs/DeepCopy/tree/1.13.1" + "source": "https://github.com/myclabs/DeepCopy/tree/1.13.2" }, "funding": [ { @@ -2566,7 +2572,7 @@ "type": "tidelift" } ], - "time": "2025-04-29T12:36:36+00:00" + "time": "2025-07-04T14:07:32+00:00" }, { "name": "nikic/php-parser", @@ -4197,16 +4203,16 @@ }, { "name": "swoole/ide-helper", - "version": "4.8.8", + "version": "5.1.7", "source": { "type": "git", "url": "https://github.com/swoole/ide-helper.git", - "reference": "dd87843a5040831f9ad40b68fb57879b7342ef61" + "reference": "c6f9cd0aa1a1e3691ed736253f0cdce381d96cae" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/swoole/ide-helper/zipball/dd87843a5040831f9ad40b68fb57879b7342ef61", - "reference": "dd87843a5040831f9ad40b68fb57879b7342ef61", + "url": "https://api.github.com/repos/swoole/ide-helper/zipball/c6f9cd0aa1a1e3691ed736253f0cdce381d96cae", + "reference": "c6f9cd0aa1a1e3691ed736253f0cdce381d96cae", "shasum": "" }, "type": "library", @@ -4223,19 +4229,9 @@ "description": "IDE help files for Swoole.", "support": { "issues": "https://github.com/swoole/ide-helper/issues", - "source": "https://github.com/swoole/ide-helper/tree/4.8.8" + "source": "https://github.com/swoole/ide-helper/tree/5.1.7" }, - "funding": [ - { - "url": "https://gitee.com/swoole/swoole?donate=true", - "type": "custom" - }, - { - "url": "https://github.com/swoole", - "type": "github" - } - ], - "time": "2022-03-17T18:24:39+00:00" + "time": "2025-03-22T23:53:02+00:00" }, { "name": "theseer/tokenizer", @@ -4359,8 +4355,6 @@ "platform": { "php": ">=8.3" }, - "platform-dev": { - "ext-redis": "*" - }, + "platform-dev": {}, "plugin-api-version": "2.6.0" } diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index f547038..607c784 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -5,21 +5,29 @@ use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Exception\AMQPChannelClosedException; +use PhpAmqpLib\Exception\AMQPConnectionBlockedException; +use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use PhpAmqpLib\Exception\AMQPTimeoutException; use Utopia\Fetch\Client; use Utopia\Queue\Consumer; use Utopia\Queue\Error\Retryable; use Utopia\Queue\Message; use Utopia\Queue\Publisher; use Utopia\Queue\Queue; -use Utopia\Queue\Result\Commit; use Utopia\Queue\Result\NoCommit; class AMQP implements Publisher, Consumer { - protected ?AMQPChannel $channel = null; + /** + * One channel per coroutine (CID => AMQPChannel). + * Non-coroutine contexts use CID = 0. + */ + protected array $channels = []; + private array $exchangeArguments = []; private array $queueArguments = []; private array $consumerArguments = []; @@ -43,10 +51,42 @@ public function __construct( protected readonly string $vhost = '/', protected readonly int $heartbeat = 0, protected readonly float $connectTimeout = 3.0, - protected readonly float $readWriteTimeout = 3.0 + protected readonly float $readWriteTimeout = 3.0, + protected float $ackTimeout = 5.0, + protected int $maxEnqueueAttempts = 3, + protected bool $requireAck = false, ) { } + public function getConnectionType(): string + { + return AMQPStreamConnection::class; + } + + /** + * Enable or disable waiting for publisher confirms. + */ + public function setRequireAck(bool $require): void + { + $this->requireAck = $require; + } + + public function setAckTimeout(float $timeout): void + { + if ($timeout <= 0) { + throw new \InvalidArgumentException('Ack timeout must be positive'); + } + $this->ackTimeout = $timeout; + } + + public function setMaxEnqueueAttempts(int $maxEnqueueAttempts): void + { + if ($maxEnqueueAttempts < 1) { + throw new \InvalidArgumentException('Max enqueue attempts must be at least 1'); + } + $this->maxEnqueueAttempts = $maxEnqueueAttempts; + } + public function setExchangeArgument(string $key, string $value): void { $this->exchangeArguments[$key] = $value; @@ -86,11 +126,12 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $message = new Message($nextMessage); $result = $messageCallback($message); + match (true) { - $result instanceof Commit => $amqpMessage->ack(true), $result instanceof NoCommit => null, default => $amqpMessage->ack() }; + $successCallback($message); } catch (Retryable $e) { $amqpMessage->nack(requeue: true); @@ -131,9 +172,19 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe public function close(): void { - $this->channel?->getConnection()?->close(); + foreach ($this->channels as $cid => $ch) { + try { + $ch->getConnection()?->close(); + } catch (\Throwable) { + // ignore – connection might already be closed + } + unset($this->channels[$cid]); + } } + /** + * @throws \Exception + */ public function enqueue(Queue $queue, array $payload): bool { $payload = [ @@ -142,10 +193,53 @@ public function enqueue(Queue $queue, array $payload): bool 'timestamp' => time(), 'payload' => $payload ]; - $message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); + + $message = new AMQPMessage( + \json_encode($payload), + [ + 'content_type' => 'application/json', + 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + ] + ); + $this->withChannel(function (AMQPChannel $channel) use ($message, $queue) { - $channel->basic_publish($message, $queue->namespace, routing_key: $queue->name); + for ($attempts = 0; $attempts < $this->maxEnqueueAttempts; $attempts++) { + try { + // Redeclare topology, because the queue might not exist yet + $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); + $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); + $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"]))); + $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); + + $channel->basic_publish( + $message, + exchange: $queue->namespace, + routing_key: $queue->name, + mandatory: $this->requireAck + ); + + // No need to wait for ack if not required + if (!$this->requireAck) { + return; + } + + // Wait for the message to be acknowledged by the broker + $channel->wait_for_pending_acks($this->ackTimeout); + } catch ( + AMQPTimeoutException | + AMQPConnectionClosedException | + AMQPChannelClosedException | + AMQPConnectionBlockedException $e + ) { + // Retry sending the message if ack is not received or connection has issues + continue; + } + + // Exit the loop if ack is received + break; + } }); + return true; } @@ -187,8 +281,12 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int */ protected function withChannel(callable $callback): void { + $cid = \class_exists('\\Swoole\\Coroutine') + ? \Swoole\Coroutine::getCid() + : 0; + $createChannel = function (): AMQPChannel { - $connection = new AMQPStreamConnection( + $connection = new ($this->getConnectionType())( $this->host, $this->port, $this->user, @@ -198,28 +296,35 @@ protected function withChannel(callable $callback): void read_write_timeout: $this->readWriteTimeout, heartbeat: $this->heartbeat, ); - if (is_callable($this->connectionConfigHook)) { - call_user_func($this->connectionConfigHook, $connection); + if (\is_callable($this->connectionConfigHook)) { + ($this->connectionConfigHook)($connection); } + $channel = $connection->channel(); - if (is_callable($this->channelConfigHook)) { - call_user_func($this->channelConfigHook, $channel); + + if (\is_callable($this->channelConfigHook)) { + ($this->channelConfigHook)($channel); + } + + // Enable publisher confirms if required + if ($this->requireAck) { + $channel->confirm_select(); } return $channel; }; - if (!$this->channel) { - $this->channel = $createChannel(); + if (!isset($this->channels[$cid])) { + $this->channels[$cid] = $createChannel(); } try { - $callback($this->channel); + $callback($this->channels[$cid]); } catch (\Throwable) { - // createChannel() might throw, in that case set the channel to `null` first. - $this->channel = null; - // try creating a new connection once, if this still fails, throw the error - $this->channel = $createChannel(); - $callback($this->channel); + // discard broken channel for this coroutine + unset($this->channels[$cid]); + // create a new channel once; rethrow on second failure + $this->channels[$cid] = $createChannel(); + $callback($this->channels[$cid]); } } } diff --git a/src/Queue/Broker/AMQPSwoole.php b/src/Queue/Broker/AMQPSwoole.php index 0803e57..c636292 100644 --- a/src/Queue/Broker/AMQPSwoole.php +++ b/src/Queue/Broker/AMQPSwoole.php @@ -2,62 +2,13 @@ namespace Utopia\Queue\Broker; -use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPSwooleConnection; class AMQPSwoole extends AMQP { - /** - * Override the withChannel method to use AMQPSwooleConnection instead of AMQPStreamConnection - * - * @param callable(AMQPChannel $channel): void $callback - * @throws \Exception - */ - protected function withChannel(callable $callback): void + #[\Override] + public function getConnectionType(): string { - $createChannel = function (): AMQPChannel { - $connection = new AMQPSwooleConnection( - $this->host, - $this->port, - $this->user, - $this->password, - $this->vhost, - false, // insist - 'AMQPLAIN', // login_method - 'en_US', // locale - $this->connectTimeout, // connection_timeout - $this->readWriteTimeout, // read_write_timeout - null, // context - false, // keepalive - $this->heartbeat, // heartbeat - 0.0 // channel_rpc_timeout - ); - - if (is_callable($this->connectionConfigHook)) { - call_user_func($this->connectionConfigHook, $connection); - } - - $channel = $connection->channel(); - - if (is_callable($this->channelConfigHook)) { - call_user_func($this->channelConfigHook, $channel); - } - - return $channel; - }; - - if (!$this->channel) { - $this->channel = $createChannel(); - } - - try { - $callback($this->channel); - } catch (\Throwable) { - // createChannel() might throw, in that case set the channel to `null` first. - $this->channel = null; - // try creating a new connection once, if this still fails, throw the error - $this->channel = $createChannel(); - $callback($this->channel); - } + return AMQPSwooleConnection::class; } } diff --git a/tests/Queue/E2E/Adapter/AMQPAckTest.php b/tests/Queue/E2E/Adapter/AMQPAckTest.php new file mode 100644 index 0000000..bb2029d --- /dev/null +++ b/tests/Queue/E2E/Adapter/AMQPAckTest.php @@ -0,0 +1,27 @@ +getPublisher(); + go(function () use ($publisher) { + foreach ($this->payloads as $payload) { + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); + } + + sleep(1); + /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ + $publisher->close(); + }); + }); + } + + public function testConcurrency(): void + { + run(function () { + $publisher = $this->getPublisher(); + go(function () use ($publisher) { + foreach ($this->payloads as $payload) { + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); + } + + sleep(1); + /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ + $publisher->close(); + }); + }); + } + + /** + * Override testRetry to run within Swoole coroutines + * @depends testEvents + */ + public function testRetry(): void + { + run(function () { + $publisher = $this->getPublisher(); + go(function () use ($publisher) { + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 1 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 2 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 3 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 4 + ]); + + $this->assertTrue($published); + + sleep(1); + $publisher->retry($this->getQueue()); + sleep(1); + $publisher->retry($this->getQueue(), 2); + sleep(1); + /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ + $publisher->close(); + }); + }); + } +} diff --git a/tests/Queue/E2E/Adapter/AMQPSwooleTest.php b/tests/Queue/E2E/Adapter/AMQPSwooleTest.php index 54d1752..5b76886 100644 --- a/tests/Queue/E2E/Adapter/AMQPSwooleTest.php +++ b/tests/Queue/E2E/Adapter/AMQPSwooleTest.php @@ -12,7 +12,12 @@ class AMQPSwooleTest extends Base { protected function getPublisher(): Publisher { - return new AMQPSwoole(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp'); + return new AMQPSwoole( + host: 'amqp', + port: 5672, + user: 'amqp', + password: 'amqp' + ); } protected function getQueue(): Queue diff --git a/tests/Queue/E2E/Adapter/AMQPTest.php b/tests/Queue/E2E/Adapter/AMQPTest.php index e8557a3..a423214 100644 --- a/tests/Queue/E2E/Adapter/AMQPTest.php +++ b/tests/Queue/E2E/Adapter/AMQPTest.php @@ -10,7 +10,12 @@ class AMQPTest extends Base { protected function getPublisher(): Publisher { - return new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp'); + return new AMQP( + host: 'amqp', + port: 5672, + user: 'amqp', + password: 'amqp' + ); } protected function getQueue(): Queue