Create and Configure the ConsumerManager

The consumer manager is responsible for registering a consumer and starting the consuming process.

Create AMQP Connection

You need to create an AMQP connection with an AMQP channel which will then be used by the consumer manager:

$connection = new PhpAmqpLib\Connection\AMQPSocketConnection('localhost', 5672, 'username', 'password');
$channel = $connection->channel();

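// Keep the exchange name in a variable; it is needed again when creating the consumer manager
$exchangeName = 'your-exchange-name';
$passive = false;
$durable = true;
$autoDelete = false;
$type = 'topic';

$channel->exchange_declare($exchangeName, $type, $passive, $durable, $autoDelete);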

Adjust these values if your setup needs different ones, but the exchange must be declared before you continue.
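As an illustration of adjusting these values, the following sketch declares a topic exchange that is neither durable nor kept once it is unused, which might suit a test environment. The exchange name `your-test-exchange` and the Composer autoload path are placeholders, not values required by the library.

```php
<?php
require 'vendor/autoload.php'; // assumes php-amqplib was installed via Composer

use PhpAmqpLib\Connection\AMQPSocketConnection;

$connection = new AMQPSocketConnection('localhost', 5672, 'username', 'password');
$channel = $connection->channel();

// Hypothetical variation of the declaration above:
// passive = false, durable = false (not kept across a broker restart),
// auto_delete = true (the broker removes the exchange once nothing is bound to it anymore)
$channel->exchange_declare('your-test-exchange', 'topic', false, false, true);
```

Declaring an exchange that already exists with different settings makes the broker reject the declaration, so pick a new name when you experiment with other values.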

Create a JMS Serializer

To deserialize the payload of an AMQP message, we have to create a Serializer object. The easiest way to do so is by using the SerializerBuilder from the JMS library:

use Rebuy\Amqp\Consumer\Serializer\JMSSerializerAdapter;
use JMS\Serializer\SerializerBuilder;

$serializer = SerializerBuilder::create()->build();
$serializerAdapter = new JMSSerializerAdapter($serializer);

If you would rather use the Symfony serializer, do the following:

use Rebuy\Amqp\Consumer\Serializer\SymfonySerializerAdapter;
use Symfony\Component\Serializer\Serializer;
use Symfony\Component\Serializer\Encoder\XmlEncoder;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

$encoders = [new XmlEncoder(), new JsonEncoder()];
$normalizers = [new ObjectNormalizer()];

$serializer = new Serializer($normalizers, $encoders);
$serializerAdapter = new SymfonySerializerAdapter($serializer);

Tying it all together

We now have everything we need to create the consumer manager, register consumers and start the consuming process:

$manager = new Rebuy\Amqp\Consumer\ConsumerManager($channel, $exchangeName, $serializerAdapter, $parser);
$manager->registerConsumer(new MyConsumer());

$manager->wait();

Caution

The consuming process might stop under the following conditions:

  • One of the consumers throws an exception

  • No message has been processed in the last 900 seconds (this value can be altered with the method ConsumerManager#setIdleTimeout)
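Both conditions can be handled around the call to wait. The following is a minimal sketch, not the library's prescribed pattern: it assumes that setIdleTimeout accepts the timeout in seconds as an integer and that an exception thrown by a consumer propagates out of wait. It reuses the $manager, $channel and $connection variables created earlier.

```php
// Assumption: the timeout is given in seconds; 300 is an arbitrary example value
$manager->setIdleTimeout(300);

try {
    // Blocks until the consuming process stops
    $manager->wait();
} catch (\Exception $e) {
    // Assumption: consumer exceptions surface here; log them before shutting down
    error_log('Consuming stopped: ' . $e->getMessage());
} finally {
    // Release the AMQP resources in either case
    $channel->close();
    $connection->close();
}
```

Because the process can stop in both cases, a long-running consumer is usually started under a process supervisor that restarts it.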

Note

The wait method blocks: it waits for new messages and passes each message to its matching consumer (if one exists).