8000 Feature: Add fifo middleware by hadeli · Pull Request #64 · brefphp/symfony-messenger · GitHub
[go: up one dir, main page]

Skip to content

Feature: Add fifo middleware #64

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,13 @@ resources:
```

[Symfony Amazon SQS Messenger](https://symfony.com/doc/current/messenger.html#amazon-sqs) will automatically calculate/set
the `MessageGroupId` and `MessageDeduplicationId` parameters required for FIFO queues, but you can set them explicitly:
the `MessageGroupId` and `MessageDeduplicationId` parameters required for FIFO queues, but you can set them explicitly if you implement interfaces:

[AWS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html) have limitations :
* Max length of the messageDeduplicationId is 128 characters.
* Can contain only alphanumeric characters and punctuation.

##### Method 1 : Set explicitly

```php
use Symfony\Component\Messenger\MessageBus;
Expand All @@ -217,6 +223,42 @@ use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;
/* @var MessageBus $messageBus */
$messageBus->dispatch(new MyAsyncMessage(), [new AmazonSqsFifoStamp('my-group-message-id', 'my-deduplication-id')]);
```

##### Method 2 : Use interfaces and middleware
###### Step 1: Implements the interfaces on your message class
You can implement the interfaces on your message class to automatically set the `MessageGroupId` and `MessageDeduplicationId` parameters.
These interfaces are independent each other.

```php
use Bref\Symfony\Messenger\Event\Sqs\WithMessageGroupId;
use Bref\Symfony\Messenger\Event\Sqs\WithMessageDeduplicationId;

class MyAsyncMessage implements WithMessageGroupId, WithMessageDeduplicationId
{
public function messageGroupId(): string
{
// Implement the interface
}

public function messageDeduplicationId(): string
{
// Implement the interface
}

}
```

###### Step 2: Use the middleware

```yaml
# messenger.yaml
framework:
messenger:
buses:
your_bus:
middleware:
- 'Bref\Symfony\Messenger\Service\Sqs\Middleware\AddFifoStampToEnveloppe'
```
Everything else is identical to the normal SQS queue.

### SNS
Expand Down
14 changes: 14 additions & 0 deletions src/Event/Sqs/WithMessageDeduplicationId.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Bref\Symfony\Messenger\Event\Sqs;

interface WithMessageDeduplicationId
{
/**
* Max length of the messageDeduplicationId is 128 characters.
* Can contain only alphanumeric characters and punctuation.
* @See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
*/
public function messageDeduplicationId(): string;

}
9 changes: 9 additions & 0 deletions src/Event/Sqs/WithMessageGroupId.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

namespace Bref\Symfony\Messenger\Event\Sqs;

interface WithMessageGroupId
{
public function messageGroupId(): string;

}
40 changes: 40 additions & 0 deletions src/Service/Sqs/Middleware/AddFifoStampToEnveloppe.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php declare(strict_types=1);

namespace Bref\Symfony\Messenger\Service\Sqs\Middleware;

use Bref\Symfony\Messenger\Event\Sqs\WithMessageDeduplicationId;
use Bref\Symfony\Messenger\Event\Sqs\WithMessageGroupId;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

class AddFifoStampToEnveloppe implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$message = $envelope->getMessage();
$messageGroupId = null;
$messageDeduplicationId = null;
if ($message instanceof WithMessageGroupId) {
$messageGroupId = $message->messageGroupId();
}
if ($message instanceof WithMessageDeduplicationId) {
$messageDeduplicationId = $message->messageDeduplicationId();
}

if ($messageGroupId || $messageDeduplicationId) {
$envelope = $envelope->with($this->fifoStamp($messageGroupId, $messageDeduplicationId));
}

return $stack->next()->handle($envelope, $stack);
}

private function fifoStamp(?string $messageGroupId, ?string $messageDeduplicationId): AmazonSqsFifoStamp
{
return new AmazonSqsFifoStamp(
$messageGroupId,
$messageDeduplicationId,
);
}
}
166 changes: 166 additions & 0 deletions tests/Unit/Service/Sqs/Middleware/AddFifoStampToEnveloppeTest.php
5359
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
<?php declare(strict_types=1);

namespace Bref\Symfony\Messenger\Test\Unit\Service\Sqs\Middleware;

use Bref\Symfony\Messenger\Event\Sqs\WithMessageDeduplicationId;
use Bref\Symfony\Messenger\Event\Sqs\WithMessageGroupId;
use Bref\Symfony\Messenger\Service\Sqs\Middleware\AddFifoStampToEnveloppe;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

class AddFifoStampToEnveloppeTest extends TestCase
{

public function testHandleWithGroupIdOnly(): void
{
$message = new WithMessageGroupIdMessage('groupId');
$stack = $this->getMockBuilder(StackInterface::class)
->disableOriginalConstructor()
->onlyMethods(['next'])
->getMock();
$stack->expects($this->once())
->method('next')
->willReturn(new EmptyMiddlewareInterface());
$envelope = new Envelope($message);
$middleware = new AddFifoStampToEnveloppe();
$envelope = $middleware->handle($envelope, $stack);
$stamp = $envelope->last(AmazonSqsFifoStamp::class);
$this->assertNotNull($stamp);
/** @var AmazonSqsFifoStamp $stamp */
$this->assertEquals('groupId', $stamp->getMessageGroupId());
$this->assertNull($stamp->getMessageDeduplicationId());
}

public function testHandleWithDeduplicationIdOnly(): void
{
$message = new WithMessageDeduplicationIdMessage('deduplicationId');
$stack = $this->getMockBuilder(StackInterface::class)
->disableOriginalConstructor()
->onlyMethods(['next'])
->getMock();
$stack->expects($this->once())
->method('next')
->willReturn(new EmptyMiddlewareInterface());
$envelope = new Envelope($message);
$middleware = new AddFifoStampToEnveloppe();
$envelope = $middleware->handle($envelope, $stack);
$stamp = $envelope->last(AmazonSqsFifoStamp::class);
$this->assertNotNull($stamp);
/** @var AmazonSqsFifoStamp $stamp */
$this->assertEquals('deduplicationId', $stamp->getMessageDeduplicationId());
$this->assertNull($stamp->getMessageGroupId());
}

public function testHandleWithGroupIdAndDeduplicationId(): void
{
$message = new WithMessageDeduplicationIdAndMessageGroupIdMessage('groupId', 'deduplicationId');
$stack = $this->getMockBuilder(StackInterface::class)
->disableOriginalConstructor()
->onlyMethods(['next'])
->getMock();
$stack->expects($this->once())
->method('next')
->willReturn(new EmptyMiddlewareInterface());
$envelope = new Envelope($message);
$middleware = new AddFifoStampToEnveloppe();
$envelope = $middleware->handle($envelope, $stack);
$stamp = $envelope->last(AmazonSqsFifoStamp::class);
$this->assertNotNull($stamp);
/** @var AmazonSqsFifoStamp $stamp */
$this->assertEquals('deduplicationId', $stamp->getMessageDeduplicationId());
$this->assertEquals('groupId', $stamp->getMessageGroupId());
}

public function testHandleWithoutId(): void
{
$message = new WithoutIdMessage();
$stack = $this->getMockBuilder(StackInterface::class)
->disableOriginalConstructor()
->onlyMethods(['next'])
->getMock();
$stack->expects($this->once())
->method('next')
->willReturn(new EmptyMiddlewareInterface());
$envelope = new Envelope($message);
$middleware = new AddFifoStampToEnveloppe();
$envelope = $middleware->handle($envelope, $stack);
$stamp = $envelope->last(AmazonSqsFifoStamp::class);
$this->assertNull($stamp);
}
}

class EmptyMiddlewareInterface implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
return $envelope;
}
}


class WithMessageDeduplicationIdAndMessageGroupIdMessage implements WithMessageDeduplicationId, WithMessageGroupId
{
private string $messageGroupId;
private string $messageDeduplicationId;

public function __construct(
string $messageGroupId,
string $messageDeduplicationId
)
{
$this->messageGroupId = $messageGroupId;
$this->messageDeduplicationId = $messageDeduplicationId;
}

public function messageDeduplicationId(): string
{
return $this->messageDeduplicationId;
}

public function messageGroupId(): string
{
return $this->messageGroupId;
}
}


class WithMessageDeduplicationIdMessage implements WithMessageDeduplicationId
{
private string $messageDeduplicationId;

public function __construct(
string $messageDeduplicationId
)
{
$this->messageDeduplicationId = $messageDeduplicationId;
}

public function messageDeduplicationId(): string
{
return $this->messageDeduplicationId;
}
}


class WithMessageGroupIdMessage implements WithMessageGroupId
{
private string $messageGroupId;

public function __construct(
string $messageGroupId
)
{
$this->messageGroupId = $messageGroupId;
}

public function messageGroupId(): string
{
return $this->messageGroupId;
}
}
class WithoutIdMessage
{
}
0