Create and Configure the ConsumerManager

The consumer manager is responsible for registering a consumer and starting the consuming process.

Create AMQP Connection

You need to create an AMQP connection with an AMQP channel which will then be used by the comsuner manager:

$connection = new PhpAmqpLib\Connection\AMQPSocketConnection('localhost', 5672, 'username', 'password');
$channel = $connection->channel();

$passive = false;
$durable = true;
$autoDelete = false;
$type = 'topic';

$channel->exchange_declare('your-exchange-name', $type, $passive, $durable, $autoDelete);

If you need other values than the ones defined, feel free to adjust them, but it is necessary to declare the exchange before you can go on.

Create a JMS Serializer

In order to deserialize the payload of an AMQP message we have to create a Serializer object The easiest way to do so is by using the SerializerBuilder from the JMS library:

use Rebuy\Amqp\Consumer\Serializer\JMSSerializerAdapter;
use JMS\Serializer\SerializerBuilder;

$serializer = SerializerBuilder::create()->build();
$serializerAdapter = new JMSSerializerAdapter($serializer);

If you’d rather want to use the symfony serializer, do the following:

use Rebuy\Amqp\Consumer\Serializer\SymfonySerializerAdapter;
use Symfony\Component\Serializer\Serializer;
use Symfony\Component\Serializer\Encoder\XmlEncoder;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

$encoders = [new XmlEncoder(), new JsonEncoder()];
$normalizers = [new ObjectNormalizer()];

$serializer = new Serializer($normalizers, $encoders);
$serializerAdapter = new SymfonySerializerAdapter($serializer);

Create the Annotation Parser

The annotation parser is responsible for parsing all the consumer annotations and creating a ConsumerContainer. The container is an abstraction of the consumer method and holds all information which are necessary to consume the message:

$reader = new Doctrine\Common\Annotations\AnnotationReader();
$parser = new Rebuy\Amqp\Consumer\Annotation\Parser($reader);

Tip

You can also use a FileCacheReader instead of the AnnotationReader. Example: $reader = new FileCacheReader(new AnnotationReader(), '/path/to/cache');

Tying it all together

We have now everything we need to create the consumer manager, register consumers and start the consuming process:

$manager = new Rebuy\Amqp\Consumer\ConsumerManager($channel, $exchangeName, $serializerAdapter, $parser);
$manager->registerConsumer(new MyConsumer());

$manager->wait()

Caution

The consuming process might stop under the following conditions:

  • An exception in one of the consumers is thrown
  • No message has been processed in the last 900 seconds (this value can be altered with the method ConsumerManager#setIdleTimeout)

Note

The wait method is a blocking process. This method waits for new messages and passes every message to its desired consumer (if one exists).