From 20114650867efba2186a3001534871dde7dc625c Mon Sep 17 00:00:00 2001 From: Thibaud Girard Date: Wed, 28 Feb 2024 14:18:24 +0100 Subject: [PATCH] feat(amqp): make amqp v2 compatible --- .github/workflows/ci.yml | 3 ++- src/AmqpBundle/Amqp/Consumer.php | 24 ++++++++++++++++--- src/AmqpBundle/Amqp/Producer.php | 4 ++-- src/AmqpBundle/Tests/Units/Amqp/Consumer.php | 4 ++-- .../Units/Factory/Mock/MockAMQPChannel.php | 10 ++++---- .../Units/Factory/Mock/MockAMQPExchange.php | 2 +- .../Units/Factory/Mock/MockAMQPQueue.php | 5 ++-- 7 files changed, 36 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 29307d8..54bbfa4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,6 +9,7 @@ jobs: matrix: php-version: ['7.4', '8.0', '8.1' ] symfony-version: ['^4.4', '^5.0'] + amqp-version: ['1.11.0', '2.1.2'] fail-fast: false steps: - uses: actions/checkout@master @@ -16,7 +17,7 @@ jobs: with: php-version: ${{ matrix.php-version }} coverage: xdebug2 - extensions: amqp + extensions: amqp-${{ matrix.amqp-version }} - name: Install symfony version from matrix env: SYMFONY_VERSION: ${{ matrix.symfony-version }} diff --git a/src/AmqpBundle/Amqp/Consumer.php b/src/AmqpBundle/Amqp/Consumer.php index 766e7ad..4e67372 100644 --- a/src/AmqpBundle/Amqp/Consumer.php +++ b/src/AmqpBundle/Amqp/Consumer.php @@ -63,7 +63,13 @@ public function ackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): bool $this->eventDispatcher->dispatch($ackEvent,AckEvent::NAME); } - return $this->call($this->queue, 'ack', [$deliveryTag, $flags]); + $isAcked = $this->call($this->queue, 'ack', [$deliveryTag, $flags]); + + if ($isAcked === null) { + return true; + } + + return $isAcked; } /** @@ -83,7 +89,13 @@ public function nackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): boo $this->eventDispatcher->dispatch($nackEvent, NackEvent::NAME); } - return $this->call($this->queue, 'nack', [$deliveryTag, $flags]); + $isNacked = $this->call($this->queue, 'nack', [$deliveryTag, $flags]); + + if ($isNacked === null) { + return true; + } + + return $isNacked; } /** @@ -100,7 +112,13 @@ public function purge(): bool $this->eventDispatcher->dispatch($purgeEvent, PurgeEvent::NAME); } - return $this->call($this->queue, 'purge'); + $isPurged = $this->call($this->queue, 'purge'); + + if ($isPurged === null) { + return true; + } + + return $isPurged; } /** diff --git a/src/AmqpBundle/Amqp/Producer.php b/src/AmqpBundle/Amqp/Producer.php index 3af083f..3fe843a 100644 --- a/src/AmqpBundle/Amqp/Producer.php +++ b/src/AmqpBundle/Amqp/Producer.php @@ -63,13 +63,13 @@ public function publishMessage(string $message, int $flags = AMQP_NOPARAM, array } if (!$routingKeys) { - return $this->call($this->exchange, 'publish', [$message, null, $flags, $attributes]); + return $this->call($this->exchange, 'publish', [$message, null, $flags, $attributes]) === null; } // Publish the message for each routing keys $success = true; foreach ($routingKeys as $routingKey) { - $success &= $this->call($this->exchange, 'publish', [$message, $routingKey, $flags, $attributes]); + $success &= $this->call($this->exchange, 'publish', [$message, $routingKey, $flags, $attributes]) === null; } return (bool) $success; diff --git a/src/AmqpBundle/Tests/Units/Amqp/Consumer.php b/src/AmqpBundle/Tests/Units/Amqp/Consumer.php index 9360834..be7cb52 100644 --- a/src/AmqpBundle/Tests/Units/Amqp/Consumer.php +++ b/src/AmqpBundle/Tests/Units/Amqp/Consumer.php @@ -221,8 +221,8 @@ public function testPurge() ->and($queue = $this->getQueue($msgList)) ->and($consumer = new Base($queue, [])) // Purge the queue - ->boolean($consumer->purge()) - ->isTrue() + ->integer($consumer->purge()) + ->isEqualTo(1) ->mock($queue) ->call('purge') ->once() diff --git a/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPChannel.php b/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPChannel.php index 935d1ec..759cd4a 100644 --- a/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPChannel.php +++ b/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPChannel.php @@ -11,15 +11,15 @@ public function __construct(\AMQPConnection $amqp_connection) { } - public function qos($prefetchSize, $prefetchCount, $global = NULL) + public function qos(int $size, int $count, bool $global = NULL): void { } - public function setPrefetchSize($count){ - + public function setPrefetchSize(int $size): void + { } - public function setPrefetchCount($count){ - + public function setPrefetchCount(int $count): void + { } } diff --git a/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPExchange.php b/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPExchange.php index 04040b7..dc5917f 100644 --- a/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPExchange.php +++ b/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPExchange.php @@ -11,7 +11,7 @@ public function __construct(\AMQPChannel $amqp_channel) { } - public function declareExchange() + public function declareExchange(): void { } } diff --git a/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPQueue.php b/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPQueue.php index 9beb77f..a1d481a 100644 --- a/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPQueue.php +++ b/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPQueue.php @@ -11,11 +11,12 @@ public function __construct(\AMQPChannel $amqp_channel) { } - public function bind($exchange_name, $routing_key = null, $arguments = array()) + public function bind(string $exchange_name, string $routing_key = null, array $arguments = array()): void { } - public function declareQueue() + public function declareQueue(): int { + return 1; } }