amqp-php-consumer¶
This is the documentation for the amqp-php-consumer library.
This library allows you to define consumers for AMQP with Doctrine annotations. Messages are consumed using the AMQP library videlalvaro/php-amqplib.
Getting started¶
Installation¶
The amqp-php-consumer library is available on Packagist. You can install it using Composer:
$ composer require rebuy/amqp-php-consumer
Note
This library follows Semantic Versioning. Except for major versions, we aim not to introduce BC breaks in new releases. You should still test your application after upgrading, though. What is a bug fix for one user could break something for others who were (probably unknowingly) relying on that bug.
Configuration¶
There are two things you need to do to get started: create your consumers, then create and configure the ConsumerManager.
Creating Consumers¶
Let’s assume you have an AMQP message which is published when an order has been created. This message has the routing key order-created with the payload {"order_id": $SOME_ID}. In this example we create a consumer which sends an email to the customer when this message is published.
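For illustration, such a message could be published with php-amqplib roughly as follows. This is only a sketch and not part of this library: the functions `buildOrderCreatedPayload` and `publishOrderCreated` are hypothetical, and `$channel` and `$exchangeName` are assumed to be an open php-amqplib channel and an already declared exchange.

```php
<?php
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Builds the JSON payload of the example message (hypothetical helper).
 */
function buildOrderCreatedPayload(int $orderId): string
{
    return json_encode(['order_id' => $orderId]);
}

/**
 * Publishes the example message with the routing key "order-created".
 * $channel is assumed to be a PhpAmqpLib\Channel\AMQPChannel.
 */
function publishOrderCreated($channel, string $exchangeName, int $orderId): void
{
    $message = new AMQPMessage(
        buildOrderCreatedPayload($orderId),
        ['content_type' => 'application/json']
    );

    // Arguments: message, exchange, routing key
    $channel->basic_publish($message, $exchangeName, 'order-created');
}
```

Keeping the payload builder separate from the publishing call makes the message format easy to check without a running broker.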
First of all, you have to create a PHP class which represents this message:
namespace My\Consumer;

use JMS\Serializer\Annotation\Type;
use Rebuy\Amqp\Consumer\Message\MessageInterface;

class OrderCreatedMessage implements MessageInterface
{
    /**
     * @Type("integer")
     *
     * @var int
     */
    public $orderId;

    public static function getRoutingKey()
    {
        return 'order-created';
    }
}
Note
Since this library uses the jms/serializer component to deserialize the payload of all messages, we have to define a @Type for the property $orderId.
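Conceptually, deserialization turns the JSON payload into a typed message object. The plain-PHP sketch below mimics that step by hand so the effect of the type declaration becomes visible. It does not use jms/serializer; the class `OrderCreatedPayload`, the function `hydrateOrderCreated` and the mapping of `order_id` to `$orderId` are assumptions made for illustration.

```php
<?php
/** Stand-in for the message class, without the library interface. */
class OrderCreatedPayload
{
    /** @var int */
    public $orderId;
}

/**
 * Hand-written equivalent of deserializing the payload into a message.
 */
function hydrateOrderCreated(string $json): OrderCreatedPayload
{
    $data = json_decode($json, true);
    if (!is_array($data) || !isset($data['order_id'])) {
        throw new InvalidArgumentException('Payload must contain "order_id".');
    }

    $message = new OrderCreatedPayload();
    // The cast plays the role of @Type("integer").
    $message->orderId = (int) $data['order_id'];

    return $message;
}

$message = hydrateOrderCreated('{"order_id": "42"}');
var_dump($message->orderId); // int(42)
```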
With this message we are able to create our consumer which will send an email to the customer:
namespace My\Consumer;

// Assumption: the @Consumer annotation lives next to the Parser class.
use Rebuy\Amqp\Consumer\Annotation\Consumer;

class OrderConsumer
{
    private $orderService;
    private $emailService;

    public function __construct($orderService, $emailService)
    {
        $this->orderService = $orderService;
        $this->emailService = $emailService;
    }

    /**
     * @Consumer(name="order-created-send-email")
     */
    public function sendMail(OrderCreatedMessage $message)
    {
        // Load the order referenced by the message and notify the customer.
        $order = $this->orderService->loadOrder($message->orderId);
        $this->emailService->sendOrderCreatedEmail($order);
    }
}
Note
You can create multiple consumers which consume the same message, but they must use different names, otherwise a ConsumerException is thrown.
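As an illustration of this rule, the following sketch declares two consumers for the same message type, each with its own name. Only the `name` attribute of `@Consumer` is taken from this documentation. The namespace in the `use` statement is an assumption, `OrderCreatedMessage` is redeclared as a minimal stand-in so the snippet runs on its own, and the class `OrderNotifications`, its methods and the second consumer name are hypothetical.

```php
<?php
use Rebuy\Amqp\Consumer\Annotation\Consumer; // assumed namespace of @Consumer

/** Minimal stand-in for the message class defined earlier. */
class OrderCreatedMessage
{
    public $orderId;
}

class OrderNotifications
{
    /** @var string[] actions performed, recorded for demonstration only */
    public $log = [];

    /**
     * @Consumer(name="order-created-send-email")
     */
    public function sendMail(OrderCreatedMessage $message)
    {
        $this->log[] = 'email for order ' . $message->orderId;
    }

    /**
     * Same message type, but a different consumer name.
     *
     * @Consumer(name="order-created-update-statistics")
     */
    public function updateStatistics(OrderCreatedMessage $message)
    {
        $this->log[] = 'statistics for order ' . $message->orderId;
    }
}
```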
Now that you have created a consumer, you can go on to the next section and create the consumer manager.
Create and Configure the ConsumerManager¶
The consumer manager is responsible for registering a consumer and starting the consuming process.
Create AMQP Connection¶
You need to create an AMQP connection with an AMQP channel, which will then be used by the consumer manager:
$connection = new PhpAmqpLib\Connection\AMQPSocketConnection('localhost', 5672, 'username', 'password');
$channel = $connection->channel();
$passive = false;
$durable = true;
$autoDelete = false;
$type = 'topic';
$channel->exchange_declare('your-exchange-name', $type, $passive, $durable, $autoDelete);
If you need values other than the ones shown here, feel free to adjust them. The exchange must be declared before you continue, though.
Create a JMS Serializer¶
In order to deserialize the payload of an AMQP message, we have to create a serializer object. The easiest way to do so is by using the SerializerBuilder from the JMS library:
use Rebuy\Amqp\Consumer\Serializer\JMSSerializerAdapter;
use JMS\Serializer\SerializerBuilder;
$serializer = SerializerBuilder::create()->build();
$serializerAdapter = new JMSSerializerAdapter($serializer);
If you would rather use the Symfony serializer, do the following:
use Rebuy\Amqp\Consumer\Serializer\SymfonySerializerAdapter;
use Symfony\Component\Serializer\Serializer;
use Symfony\Component\Serializer\Encoder\XmlEncoder;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
$encoders = [new XmlEncoder(), new JsonEncoder()];
$normalizers = [new ObjectNormalizer()];
$serializer = new Serializer($normalizers, $encoders);
$serializerAdapter = new SymfonySerializerAdapter($serializer);
Create the Annotation Parser¶
The annotation parser is responsible for parsing all the consumer annotations and creating a ConsumerContainer. The container is an abstraction of the consumer method and holds all the information necessary to consume the message:
$reader = new Doctrine\Common\Annotations\AnnotationReader();
$parser = new Rebuy\Amqp\Consumer\Annotation\Parser($reader);
Tip
You can also use a FileCacheReader instead of the AnnotationReader. Example:
$reader = new FileCacheReader(new AnnotationReader(), '/path/to/cache');
Tying it all together¶
We now have everything we need to create the consumer manager, register consumers and start the consuming process:
$exchangeName = 'your-exchange-name'; // the exchange declared above
$manager = new Rebuy\Amqp\Consumer\ConsumerManager($channel, $exchangeName, $serializerAdapter, $parser);
$manager->registerConsumer(new MyConsumer());
$manager->wait();
Caution
The consuming process might stop under the following conditions:
- An exception is thrown in one of the consumers
- No message has been processed in the last 900 seconds (this value can be altered with the method ConsumerManager#setIdleTimeout)
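A minimal sketch of how a worker script might deal with both conditions: it sets the idle timeout via `setIdleTimeout` and turns an exception thrown by `wait()` into a non-zero exit code, so that a process supervisor can restart the worker. The function `runWorker` is hypothetical, and the timeout is assumed to be given in seconds. Whether `wait()` returns or throws when the idle timeout is reached is not stated here, so both cases are handled.

```php
<?php
/**
 * Runs the consumer until it stops and returns a process exit code.
 *
 * $manager is assumed to be a configured Rebuy\Amqp\Consumer\ConsumerManager.
 */
function runWorker($manager, int $idleTimeout): int
{
    $manager->setIdleTimeout($idleTimeout);

    try {
        $manager->wait(); // blocks until the consuming process stops
    } catch (\Exception $e) {
        error_log('Consumer stopped: ' . $e->getMessage());

        return 1;
    }

    return 0;
}
```

A worker script could then end with `exit(runWorker($manager, 300));`.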
Note
The wait method blocks. It waits for new messages and passes every message to its matching consumer (if one exists).
Events¶
There are currently two events dispatched when consuming a message:
- Rebuy\Amqp\Consumer\ConsumerEvents::PRE_CONSUME: Before the message is consumed
- Rebuy\Amqp\Consumer\ConsumerEvents::POST_CONSUME: After the message has been consumed
These events are dispatched by a Symfony2 event dispatcher. If you want to listen to one of these events, you have to create a subscriber or listener, add it to the event dispatcher and set the dispatcher on the manager:
$dispatcher = new Symfony\Component\EventDispatcher\EventDispatcher();
$dispatcher->addListener(Rebuy\Amqp\Consumer\ConsumerEvents::PRE_CONSUME, $myListener);
$dispatcher->addSubscriber(new MySubscriber());
$manager = new Rebuy\Amqp\Consumer\ConsumerManager(...);
$manager->setEventDispatcher($dispatcher);
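As a sketch, a listener can be any PHP callable. The hypothetical `ConsumeCounter` below counts started and finished messages, and `registerConsumeCounter` attaches it to a dispatcher. The event object is accepted without a type declaration because its class is not described in this documentation.

```php
<?php
use Rebuy\Amqp\Consumer\ConsumerEvents;

/** Hypothetical listener that counts consumed messages. */
class ConsumeCounter
{
    public $started = 0;
    public $finished = 0;

    public function onPreConsume($event)
    {
        $this->started++;
    }

    public function onPostConsume($event)
    {
        $this->finished++;
    }
}

/**
 * Registers the counter on a Symfony event dispatcher.
 */
function registerConsumeCounter($dispatcher, ConsumeCounter $counter): void
{
    $dispatcher->addListener(ConsumerEvents::PRE_CONSUME, [$counter, 'onPreConsume']);
    $dispatcher->addListener(ConsumerEvents::POST_CONSUME, [$counter, 'onPostConsume']);
}
```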
Implemented Subscribers¶
Some useful subscribers are already shipped with this library:
- TimingSubscriber: Uses symfony/stopwatch and league/statsd to write timing metrics to statsd
- LogSubscriber: Uses a LoggerInterface to log a debug message for every consumed message
Error Handlers¶
You can register several error handlers, which are called when an exception is thrown in the consuming process.
Every error handler must implement the interface Rebuy\Amqp\Consumer\Handler\ErrorHandlerInterface, which requires only one method: handle(ConsumerContainerException $ex).
An error handler can be registered in the following way:
$manager = new Rebuy\Amqp\Consumer\ConsumerManager(...);
$manager->registerErrorHandler(new MyErrorHandler());
Danger
As soon as one error handler is registered, consuming the message is considered successful even though an exception occurred. If you want to stop the consuming process, you must throw the passed exception (or your own exception) yourself.
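The following sketch shows one way a handler could stop the consuming process by rethrowing: it tolerates a few failures and rethrows the passed exception once a limit is reached. The policy itself, the class `FailureLimit` and the function `createLimitingErrorHandler` are hypothetical, and the namespace of `ConsumerContainerException` is an assumption. Only the interface name and the `handle` signature are taken from this documentation.

```php
<?php
use Rebuy\Amqp\Consumer\Handler\ErrorHandlerInterface;
use Rebuy\Amqp\Consumer\Exception\ConsumerContainerException; // assumed namespace

/** Hypothetical policy: tolerate a few failures, then stop the consumer. */
final class FailureLimit
{
    private $limit;
    private $failures = 0;

    public function __construct(int $limit)
    {
        $this->limit = $limit;
    }

    /** Counts one failure and rethrows once the limit is reached. */
    public function record(\Throwable $ex): void
    {
        $this->failures++;
        if ($this->failures >= $this->limit) {
            throw $ex;
        }
    }
}

/** Wraps the policy in an error handler for the ConsumerManager. */
function createLimitingErrorHandler(FailureLimit $limit): ErrorHandlerInterface
{
    return new class($limit) implements ErrorHandlerInterface {
        private $limit;

        public function __construct(FailureLimit $limit)
        {
            $this->limit = $limit;
        }

        public function handle(ConsumerContainerException $ex)
        {
            // Rethrowing from here is what stops the consuming process.
            $this->limit->record($ex);
        }
    };
}
```

The handler would then be registered with `$manager->registerErrorHandler(createLimitingErrorHandler(new FailureLimit(3)));`.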
Implemented error handlers¶
Currently there are two error handlers implemented in this library:
- RequeuerHandler: Requeues the message so it can be processed at a later time
- LoggerHandler: Uses a LoggerInterface to log a warning message (this handler is only useful in combination with the RequeuerHandler)
Contributing¶
We welcome contributions. Before you invest a lot of time, however, it is best to open an issue on GitHub to discuss your idea. That way we can coordinate efforts if somebody is already working on the same thing.
Testing the Library¶
This chapter describes how to run the tests that are included with this library.
First clone the repository, install the vendors, then run the tests:
$ git clone https://github.com/rebuy/amqp-php-consumer.git
$ cd amqp-php-consumer
$ composer install --dev
$ bin/phpunit