php-rsmq

PHP implementation of Redis Simple Message Queue

Redis Simple Message Queue

A lightweight message queue for PHP that requires no dedicated queue server. Just a Redis server. See smrchy/rsmq for more information.

This is a fork of eislambey/php-rsmq with the following changes:

Installation

composer require andrewbreksa/rsmq

Methods

Construct

Creates a new instance of RSMQ.

Parameters:

Example:

<?php
use Predis\Client;
use AndrewBreksa\RSMQ\RSMQClient;

$predis = new Client(
    [
        'host' => '127.0.0.1',
        'port' => 6379
    ]
);
$rsmq = new RSMQClient($predis);

Queue

createQueue

Create a new queue.

Parameters:

Returns:

Throws:

Example:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$rsmq->createQueue('myqueue');

listQueues

Lists all queues.

Returns an array:

Example:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$queues = $rsmq->listQueues();

deleteQueue

Deletes a queue and all messages.

Parameters:

Returns:

Throws:

Example:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$rsmq->deleteQueue('myqueue');

getQueueAttributes

Gets queue attributes, counters, and stats.

Parameters:

Returns a \AndrewBreksa\RSMQ\QueueAttributes object with the following properties:

Example:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$attributes = $rsmq->getQueueAttributes('myqueue');
echo "visibility timeout: ", $attributes->getVt(), "\n";
echo "delay for new messages: ", $attributes->getDelay(), "\n";
echo "max size in bytes: ", $attributes->getMaxSize(), "\n";
echo "total received messages: ", $attributes->getTotalReceived(), "\n";
echo "total sent messages: ", $attributes->getTotalSent(), "\n";
echo "created: ", $attributes->getCreated(), "\n";
echo "last modified: ", $attributes->getModified(), "\n";
echo "current number of messages: ", $attributes->getMessageCount(), "\n";
echo "hidden messages: ", $attributes->getHiddenMessageCount(), "\n";

setQueueAttributes

Sets queue parameters.

Parameters:

Note: At least one attribute (vt, delay, maxsize) must be supplied. Only attributes that are supplied will be modified.

Returns a \AndrewBreksa\RSMQ\QueueAttributes object with the following properties:

Throws:

Example:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$queue = 'myqueue';
$vt = 50;
$delay = 10;
$maxsize = 2048;
$rsmq->setQueueAttributes($queue, $vt, $delay, $maxsize);

Messages

sendMessage

Sends a new message.

Parameters:

Returns:

Throws:

Example:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$id = $rsmq->sendMessage('myqueue', 'a message');
echo "Message Sent. ID: ", $id;

receiveMessage

Receive the next message from the queue.

Parameters:

Returns a \AndrewBreksa\RSMQ\Message object with the following properties:

Note: Will return an empty array if no message is there

Throws:

Example:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$message = $rsmq->receiveMessage('myqueue');
echo "Message ID: ", $message->getId();
echo "Message: ", $message->getMessage();

deleteMessage

Deletes a message from the queue.

Parameters:

Returns:

Throws:

Example:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$id = $rsmq->sendMessage('queue', 'a message');
$rsmq->deleteMessage('queue', $id);

popMessage

Receive the next message from the queue and delete it.

Important: This method deletes the message it receives right away. There is no way to receive the message again if something goes wrong while working on the message.

Parameters:

Returns a \AndrewBreksa\RSMQ\Message object with the following properties:

Note: Will return an empty object if no message is there

Throws:

Example:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$message = $rsmq->popMessage('myqueue');
echo "Message ID: ", $message->getId();
echo "Message: ", $message->getMessage();
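
When a message must not be lost if processing fails, an alternative to popMessage is to pair receiveMessage with deleteMessage, so the message is removed only after the work succeeds. The following is a minimal sketch, assuming the library is autoloaded. processNext and $handler are hypothetical names that are not part of the library, and the instanceof check is a defensive assumption about what an empty queue returns:

```php
<?php
use AndrewBreksa\RSMQ\Message;
use AndrewBreksa\RSMQ\RSMQClientInterface;

/**
 * Hypothetical helper (not part of the library): receive one message,
 * hand it to $handler, and delete it only if the handler returns true.
 *
 * @return bool true if a message was handled and deleted
 */
function processNext(RSMQClientInterface $rsmq, string $queue, callable $handler): bool
{
    $message = $rsmq->receiveMessage($queue);

    // Defensive check: treat anything that is not a Message as "queue empty".
    if (!($message instanceof Message)) {
        return false;
    }

    if ($handler($message) !== true) {
        // Not deleted: the message becomes visible again once its
        // visibility timeout expires, so it can be received again.
        return false;
    }

    $rsmq->deleteMessage($queue, $message->getId());
    return true;
}
```

If the handler throws, the delete call is never reached, so the message stays in the queue.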

changeMessageVisibility

Change the visibility timer of a single message. The time when the message will be visible again is calculated from the current time (now) + vt.

Parameters:

Returns:

Throws:

Example:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$queue = 'myqueue';
$id = $rsmq->sendMessage($queue, 'a message');
if ($rsmq->changeMessageVisibility($queue, $id, 60)) {
    echo "Message hidden for 60 secs";
}
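
The calculation described above is plain addition on timestamps. The following is a small worked sketch, assuming vt is given in seconds as in the example. visibleAgainAt is a hypothetical helper for illustration, not a library function:

```php
<?php
/**
 * Hypothetical helper: the Unix timestamp at which a message becomes
 * visible again, computed as now + vt (both in seconds).
 */
function visibleAgainAt(int $now, int $vt): int
{
    return $now + $vt;
}

$now = 1700000000;                       // example "current time"
echo visibleAgainAt($now, 60), "\n";     // prints 1700000060
```

The timer is counted from the moment of the call, so any time remaining from an earlier timeout is not added on top.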

Realtime

When creating an instance of AndrewBreksa\RSMQ\RSMQClient, you can enable realtime PUBLISH for new messages by passing true as the $realtime argument of \AndrewBreksa\RSMQ\RSMQClient::__construct. Each message sent via sendMessage then triggers a Redis PUBLISH to {rsmq.ns}:rt:{qname}.

Example for RSMQ with default settings:

The realtime option only sends a PUBLISH when a new message is sent to RSMQ; no further functionality is built on this feature. Your app could use the Redis SUBSCRIBE command to be notified of new messages and then poll the queue. However, because Redis pub/sub notifies every listener of each new message, this approach does not lend itself to driving message handling in environments with more than one subscribed process.
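
To make the channel naming and the subscribe-then-poll idea concrete, here is a minimal sketch. realtimeChannel and listenForNotifications are hypothetical helper names, and the rsmq namespace in the last line is an assumed default. The listener relies on Predis's pubSubLoop() and expects a client dedicated to subscribing, since a subscribed connection cannot issue regular commands:

```php
<?php
use Predis\Client;

/** Hypothetical helper: builds the channel name {ns}:rt:{qname}. */
function realtimeChannel(string $ns, string $queue): string
{
    return $ns . ':rt:' . $queue;
}

/**
 * Hypothetical helper: blocks on SUBSCRIBE and calls $onNotify for every
 * PUBLISH on the channel. $subscriber should be a separate Predis client
 * from the one the RSMQ client uses.
 */
function listenForNotifications(Client $subscriber, string $channel, callable $onNotify): void
{
    $pubsub = $subscriber->pubSubLoop();
    $pubsub->subscribe($channel);

    foreach ($pubsub as $event) {
        // Skip subscribe confirmations; react only to published messages.
        if ($event->kind === 'message') {
            // Every subscriber receives this event, so the callback should
            // try to receive from the queue and tolerate finding it empty.
            $onNotify($event->payload);
        }
    }
}

echo realtimeChannel('rsmq', 'myqueue'), "\n"; // prints rsmq:rt:myqueue
```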

QueueWorker

The QueueWorker class provides an easy way to consume RSMQ messages. To use it:

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

use AndrewBreksa\RSMQ\ExecutorInterface;
use AndrewBreksa\RSMQ\Message;
use AndrewBreksa\RSMQ\QueueWorker;
use AndrewBreksa\RSMQ\WorkerSleepProvider;

$executor = new class() implements ExecutorInterface{
    public function __invoke(Message $message) : bool {
        // @todo: do some work. Returning true acks/deletes the message;
        // returning false leaves it for the queue's config to "re-publish".
        return true;
    }
};

$sleepProvider = new class() implements WorkerSleepProvider{
    public function getSleep() : ?int {
        /**
         * This allows you to return null to stop the worker, which can be used with something like redis to mark.
         *
         * Note that this method is called _before_ we poll for a message, and therefore if it returns null we'll eject
         * before we process a message.
         */
        return 1;
    }
};

$worker = new QueueWorker($rsmq, $executor, $sleepProvider, 'test_queue');
$worker->work(); // here we can optionally pass true to only process one message
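
Because returning null from getSleep() stops the worker, a sleep provider can double as a shutdown condition. The following is a minimal sketch, assuming the library is autoloaded. DeadlineSleepProvider is a hypothetical class that is not part of the library; it stops the worker once a fixed run time has elapsed:

```php
<?php
use AndrewBreksa\RSMQ\WorkerSleepProvider;

/**
 * Hypothetical sleep provider: keeps the worker running for at most
 * $maxRuntime seconds, then returns null so the worker stops.
 */
final class DeadlineSleepProvider implements WorkerSleepProvider
{
    /** @var int Unix timestamp after which the worker should stop */
    private $deadline;

    /** @var int value returned by getSleep() while still running */
    private $sleep;

    public function __construct(int $maxRuntime, int $sleep = 1)
    {
        $this->deadline = time() + $maxRuntime;
        $this->sleep = $sleep;
    }

    public function getSleep(): ?int
    {
        // Called before each poll: null ends the loop, an int keeps it going.
        return time() >= $this->deadline ? null : $this->sleep;
    }
}
```

It can be passed in place of $sleepProvider in the example above, for instance new QueueWorker($rsmq, $executor, new DeadlineSleepProvider(300), 'test_queue').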

LICENSE

The MIT License. See LICENSE.