Skip to content

Commit ab8a81a

Browse files
Radovan Kepákkucix
Radovan Kepák
andauthored
Support for bulk consumer (#45)
* Support for bulk consumer * Updated README.md * Added test for base consumer * Fixed styles in README.md Co-authored-by: Petr Kučera <[email protected]>
1 parent ae30026 commit ab8a81a

11 files changed

+787
-44
lines changed

.docs/README.md

+44
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,50 @@ final class TestConsumer implements IConsumer
237237
}
238238
```
239239

240+
### Consuming messages in bulk
241+
242+
Sometimes, you want to consume more messages at once, for this purpose, there is BulkConsumer.
243+
244+
TestBulkConsumer.php
245+
246+
```php
247+
<?php
248+
249+
declare(strict_types=1);
250+
251+
use Bunny\Message;
252+
use Contributte\RabbitMQ\Consumer\IConsumer;
253+
254+
final class TestConsumer
255+
{
256+
257+
/**
258+
* @param Message[] $messages
259+
* @return array(delivery_tag => MESSAGE_STATUS)
260+
*/
261+
public function consume(array $messages): array
262+
{
263+
$return = [];
264+
$data = [];
265+
foreach($messages as $message) {
266+
$data[$message->deliveryTag] = json_decode($message->content);
267+
}
268+
269+
/**
270+
* @todo bulk message action
271+
*/
272+
273+
foreach(array_keys($data) as $tag) {
274+
$return[$tag] = IConsumer::MESSAGE_ACK; // Or ::MESSAGE_NACK || ::MESSAGE_REJECT
275+
}
276+
277+
return $return;
278+
}
279+
280+
}
281+
```
282+
283+
240284
### Running a consumer trough CLI
241285

242286
There are two consumer commands prepared. `rabbitmq:consumer` wiil consume messages for specified amount of time (in

src/Console/Command/AbstractConsumerCommand.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ protected function validateConsumerName(string $consumerName): void
3737
throw new \InvalidArgumentException(
3838
sprintf(
3939
"Consumer [$consumerName] does not exist. \n\n Available consumers: %s",
40-
implode('', array_map(function($s): string {
40+
implode('', array_map(static function($s): string {
4141
return "\n\t- [{$s}]";
4242
}, $this->consumersDataBag->getDataKeys()))
4343
)

src/Consumer/BulkConsumer.php

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Contributte\RabbitMQ\Consumer;
6+
7+
use Bunny\AbstractClient;
8+
use Bunny\Channel;
9+
use Bunny\Client;
10+
use Bunny\Message;
11+
use Contributte\RabbitMQ\Consumer\Exception\UnexpectedConsumerResultTypeException;
12+
use Contributte\RabbitMQ\Queue\IQueue;
13+
14+
class BulkConsumer extends Consumer
15+
{
16+
/**
17+
* @var BulkMessage[]
18+
*/
19+
protected array $buffer = [];
20+
protected int $bulkSize;
21+
protected int $bulkTime;
22+
protected ?int $stopTime = null;
23+
24+
public function __construct(
25+
string $name,
26+
IQueue $queue,
27+
callable $callback,
28+
?int $prefetchSize,
29+
?int $prefetchCount,
30+
int $bulkSize,
31+
int $bulkTime
32+
) {
33+
parent::__construct($name, $queue, $callback, $prefetchSize, $prefetchCount);
34+
35+
if ($bulkSize > 0 && $bulkTime > 0) {
36+
$this->bulkSize = $bulkSize;
37+
$this->bulkTime = $bulkTime;
38+
} else {
39+
throw new \InvalidArgumentException("Configuration values bulkSize and bulkTime must have value greater than zero");
40+
}
41+
}
42+
43+
public function consume(?int $maxSeconds = null, ?int $maxMessages = null): void
44+
{
45+
$this->maxMessages = $maxMessages;
46+
if ($maxSeconds > 0) {
47+
$this->stopTime = time() + $maxSeconds;
48+
}
49+
50+
$channel = $this->queue->getConnection()->getChannel();
51+
52+
if ($this->prefetchSize !== null || $this->prefetchCount !== null) {
53+
$channel->qos($this->prefetchSize ?? 0, $this->prefetchCount ?? 0);
54+
}
55+
56+
$this->setupConsume($channel);
57+
$this->startConsumeLoop($channel);
58+
59+
//process rest of items
60+
$this->processBuffer($channel->getClient());
61+
}
62+
63+
private function setupConsume(Channel $channel): void
64+
{
65+
$channel->consume(
66+
function (Message $message, Channel $channel, Client $client): void {
67+
$this->messages++;
68+
$bulkCount = $this->addToBuffer(new BulkMessage($message, $channel));
69+
if ($bulkCount >= $this->bulkSize || $this->isMaxMessages() || $this->isStopTime()) {
70+
$client->stop();
71+
}
72+
},
73+
$this->queue->getName()
74+
);
75+
}
76+
77+
private function startConsumeLoop(Channel $channel): void
78+
{
79+
do {
80+
$channel->getClient()->run($this->getTtl());
81+
$this->processBuffer($channel->getClient());
82+
} while (!$this->isStopTime() && !$this->isMaxMessages());
83+
}
84+
85+
private function addToBuffer(BulkMessage $message): int
86+
{
87+
$this->buffer[] = $message;
88+
89+
return count($this->buffer);
90+
}
91+
92+
private function processBuffer(AbstractClient $client): void
93+
{
94+
if (count($this->buffer) === 0) {
95+
return;
96+
}
97+
98+
$messages = [];
99+
foreach ($this->buffer as $bulkMessage) {
100+
$message = $bulkMessage->getMessage();
101+
$messages[$message->deliveryTag] = $message;
102+
}
103+
104+
try {
105+
$result = call_user_func($this->callback, $messages);
106+
} catch (\Throwable $e) {
107+
$result = array_map(static fn () => IConsumer::MESSAGE_NACK, $messages);
108+
}
109+
110+
if (!is_array($result)) {
111+
$result = array_map(static fn () => IConsumer::MESSAGE_NACK, $messages);
112+
$this->sendResultsBack($client, $result);
113+
114+
throw new UnexpectedConsumerResultTypeException(
115+
'Unexpected result from consumer. Expected array(delivery_tag => MESSAGE_STATUS [constant from IConsumer]) but get ' . gettype($result)
116+
);
117+
}
118+
$result = array_map('intval', $result);
119+
120+
$this->sendResultsBack($client, $result);
121+
122+
$this->buffer = [];
123+
}
124+
125+
private function sendResultsBack(AbstractClient $client, array $result): void
126+
{
127+
if ($client instanceof Client) {
128+
foreach ($this->buffer as $bulkMessage) {
129+
$this->sendResponse(
130+
$bulkMessage->getMessage(),
131+
$bulkMessage->getChannel(),
132+
$result[$bulkMessage->getMessage()->deliveryTag] ?? IConsumer::MESSAGE_NACK,
133+
$client
134+
);
135+
}
136+
}
137+
}
138+
139+
private function isStopTime(): bool
140+
{
141+
return $this->stopTime !== null && $this->stopTime < time();
142+
}
143+
144+
private function getTtl(): int
145+
{
146+
if ($this->stopTime > 0) {
147+
return min($this->bulkTime, $this->stopTime - time());
148+
}
149+
150+
return $this->bulkTime;
151+
}
152+
}

src/Consumer/BulkMessage.php

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Contributte\RabbitMQ\Consumer;
6+
7+
use Bunny\Channel;
8+
use Bunny\Message;
9+
10+
class BulkMessage
11+
{
12+
private Message $message;
13+
private Channel $channel;
14+
15+
public function __construct(
16+
Message $message,
17+
Channel $channel
18+
) {
19+
$this->message = $message;
20+
$this->channel = $channel;
21+
}
22+
23+
public function getMessage(): Message {
24+
return $this->message;
25+
}
26+
27+
public function getChannel(): Channel {
28+
return $this->channel;
29+
}
30+
}

src/Consumer/Consumer.php

+57-43
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,20 @@
99
use Bunny\Message;
1010
use Contributte\RabbitMQ\Queue\IQueue;
1111

12-
final class Consumer
12+
class Consumer
1313
{
1414

15-
private string $name;
16-
private IQueue $queue;
15+
protected string $name;
16+
protected IQueue $queue;
1717

1818
/**
1919
* @var callable
2020
*/
21-
private $callback;
22-
private int $messages = 0;
23-
private ?int $prefetchSize = null;
24-
private ?int $prefetchCount = null;
21+
protected $callback;
22+
protected int $messages = 0;
23+
protected ?int $prefetchSize = null;
24+
protected ?int $prefetchCount = null;
25+
protected ?int $maxMessages = null;
2526

2627

2728
public function __construct(
@@ -52,62 +53,75 @@ public function getCallback(): callable
5253

5354
public function consume(?int $maxSeconds = null, ?int $maxMessages = null): void
5455
{
56+
$this->maxMessages = $maxMessages;
5557
$channel = $this->queue->getConnection()->getChannel();
5658

5759
if ($this->prefetchSize !== null || $this->prefetchCount !== null) {
5860
$channel->qos($this->prefetchSize ?? 0, $this->prefetchCount ?? 0);
5961
}
6062

6163
$channel->consume(
62-
function (Message $message, Channel $channel, Client $client) use ($maxMessages): void {
64+
function (Message $message, Channel $channel, Client $client): void {
65+
$this->messages++;
6366
$result = call_user_func($this->callback, $message);
6467

65-
switch ($result) {
66-
case IConsumer::MESSAGE_ACK:
67-
// Acknowledge message
68-
$channel->ack($message);
68+
$this->sendResponse($message, $channel, $result, $client);
6969

70-
break;
70+
if ($this->isMaxMessages()) {
71+
$client->stop();
72+
}
73+
},
74+
$this->queue->getName()
75+
);
7176

72-
case IConsumer::MESSAGE_NACK:
73-
// Message will be requeued
74-
$channel->nack($message);
77+
$channel->getClient()->run($maxSeconds);
78+
}
7579

76-
break;
80+
protected function sendResponse(Message $message, Channel $channel, int $result, Client $client): void
81+
{
82+
switch ($result) {
83+
case IConsumer::MESSAGE_ACK:
84+
// Acknowledge message
85+
$channel->ack($message);
7786

78-
case IConsumer::MESSAGE_REJECT:
79-
// Message will be discarded
80-
$channel->reject($message, false);
87+
break;
8188

82-
break;
89+
case IConsumer::MESSAGE_NACK:
90+
// Message will be requeued
91+
$channel->nack($message);
8392

84-
case IConsumer::MESSAGE_REJECT_AND_TERMINATE:
85-
// Message will be discarded
86-
$channel->reject($message, false);
87-
$client->stop();
93+
break;
8894

89-
break;
95+
case IConsumer::MESSAGE_REJECT:
96+
// Message will be discarded
97+
$channel->reject($message, false);
9098

91-
case IConsumer::MESSAGE_ACK_AND_TERMINATE:
92-
// Acknowledge message and terminate
93-
$channel->ack($message);
94-
$client->stop();
99+
break;
95100

96-
break;
101+
case IConsumer::MESSAGE_REJECT_AND_TERMINATE:
102+
// Message will be discarded
103+
$channel->reject($message, false);
104+
$client->stop();
97105

98-
default:
99-
throw new \InvalidArgumentException(
100-
"Unknown return value of consumer [{$this->name}] user callback"
101-
);
102-
}
106+
break;
103107

104-
if ($maxMessages !== null && ++$this->messages >= $maxMessages) {
105-
$client->stop();
106-
}
107-
},
108-
$this->queue->getName()
109-
);
108+
case IConsumer::MESSAGE_ACK_AND_TERMINATE:
109+
// Acknowledge message and terminate
110+
$channel->ack($message);
111+
$client->stop();
110112

111-
$channel->getClient()->run($maxSeconds);
113+
break;
114+
115+
default:
116+
throw new \InvalidArgumentException(
117+
"Unknown return value of consumer [{$this->name}] user callback"
118+
);
119+
}
112120
}
121+
122+
protected function isMaxMessages(): bool
123+
{
124+
return $this->maxMessages !== null && $this->messages >= $this->maxMessages;
125+
}
126+
113127
}

0 commit comments

Comments
 (0)