La séptima vida

...or so the cat hopes/fears

Playing with MQTT and Server-Sent Events

Let's imagine an industrial machine, like a press, a welder, a rectifier, or an entire assembly line. This imaginary machine is able to send a continuous stream of messages with process measurements, times, parameters, and notifications. Imagine now that we have a web page where we can see a summary of this information in real time. The nice-looking graphs move and flow in front of us, live.

Well, this is not news, of course. You can spend a few thousand dollars on a nice SCADA system, then some time developing your application, and you'll eventually get there. The number of technologies available to do this is just as large as the number of vendors that implement them.

So, let's try to do it with Perl. At least a little bit. Just enough to see that it is indeed possible. It can be done using MODBUS to extract data from the machine, the MQTT protocol to publish the data to the factory network, and finally Server-Sent Events to send the data to the browser.

MQTT is a lightweight publish/subscribe protocol that works a bit like Twitter. Clients publish messages to a topic through a broker, and the broker then distributes these messages to the subscribers of that topic. An MQTT client can both publish and subscribe as the need arises.
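To make the broker idea concrete, here is a toy model in plain Perl. It only mimics the publish/subscribe pattern in memory and has nothing to do with the MQTT wire protocol; the names %subscribers, subscribe and publish are made up for this illustration.

```perl
use strict;
use warnings;

# Toy in-memory "broker": maps a topic name to the callbacks subscribed to it.
# This models the publish/subscribe idea only; it is not real MQTT.
my %subscribers;

sub subscribe {
    my ($topic, $callback) = @_;
    push @{ $subscribers{$topic} }, $callback;
    return;
}

sub publish {
    my ($topic, $message) = @_;
    my @callbacks = @{ $subscribers{$topic} || [] };
    $_->($topic, $message) for @callbacks;
    return scalar @callbacks;    # number of subscribers reached
}

my @received;
subscribe('essai/rand', sub { my ($topic, $msg) = @_; push @received, "$topic: $msg" });

publish('essai/rand', 49);     # delivered to the one subscriber
publish('essai/other', 7);     # nobody is subscribed, so nothing happens

print "$_\n" for @received;
```

The publisher never learns who is listening, and the subscriber never learns who published. That decoupling is what makes the pattern convenient on a factory network.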

On the other end, Server-Sent Events is a technology that allows servers to send data to clients as it becomes available. Wikipedia defines them as follows:

Server-sent events is a standard describing how servers can initiate data transmission towards clients once an initial client connection has been established. They are commonly used to send message updates or continuous data streams to a browser client and designed to enhance native, cross-browser streaming through a JavaScript API called EventSource, through which a client requests a particular URL in order to receive an event stream.
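On the wire, an event stream is just text: a few "field: value" lines per event, followed by a blank line. The sketch below formats one event by hand to show what that looks like. The sse_event helper is hypothetical and simplified (it assumes data is always given); it is not how any particular module does it.

```perl
use strict;
use warnings;

# Hypothetical helper: format one Server-Sent Event by hand.
# Assumes the data argument is always provided.
sub sse_event {
    my (%args) = @_;
    my $out = '';
    $out .= "event: $args{event}\n" if defined $args{event};
    # Multi-line data is sent as one "data:" field per line.
    $out .= "data: $_\n" for split /\n/, $args{data}, -1;
    $out .= "id: $args{id}\n" if defined $args{id};
    return "$out\n";    # a blank line terminates the event
}

print sse_event( id => 0, event => 'data', data => 49 );
```

The browser reassembles the data lines, reads the event name to pick the listener, and remembers the id for a possible reconnection.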

And of course, all of this will be glued together by Perl. This article will just cover an MQTT client that will receive external messages and push them to a web browser asynchronously.

The script starts like this:

use EV;
use AnyEvent::MQTT;
use AnyEvent;
use Plack::Request;
use HTTP::ServerEvent;

use strict;
use warnings;
use feature ':5.10';

Starting with the easy part, we use strict; use warnings; use feature ':5.10', which is standard. I added the 5.10 features just to sprinkle say STDERR debug messages here and there.

Then, there are several modules at work. EV and AnyEvent implement the asynchronous loop in which our application will run. But this is a web application, so there is a server that will run it. The script uses Plack::Request to read the HTTP requests handed over by the server and HTTP::ServerEvent to format our responses according to the Server-Sent Events specification. Finally, AnyEvent::MQTT will catch the MQTT messages we subscribe to.

Just after the preamble, we create the MQTT client (look ma! no validation and just with the defaults!) and a web application. A web application is just a code reference according to the PSGI specification. This web application will use the MQTT client to subscribe to a bogus topic and then send all the received messages to the browser.

my $mqtt = AnyEvent::MQTT->new;
my $w;

my $app = sub {
    my $env = shift;
    my $req = Plack::Request->new( $env );
    ...

Our web application, the $app anonymous subroutine, will be executed by the server with some environment data. Plack::Request will make sense of this data and present us with a nice interface to work with.

my $app = sub {
    my $env = shift;
    my $req = Plack::Request->new( $env );

    return sub {
        my $responder = shift;

        my $i = 0;

        my $last_event_id = $req->header('last-event-id');
        # Test for definedness: an id of 0 is a valid last event id too
        $i = $last_event_id + 1 if defined $last_event_id;

        my $writer = $responder->(
            [ 200,
              [
                'Content-Type', 'text/event-stream',
                'Access-Control-Allow-Origin', '*',
              ]
            ]
        );        
    ...

Every event sent to the client carries an identifier. If the connection is interrupted, the browser will reconnect automatically and send the identifier of the last event it received in the Last-Event-ID request header. The variable $i will be used later to number each message sent to the browser, and its initial value is derived from that header.
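The resume logic can be isolated in a small function, which makes the edge cases easier to see. This is only a sketch: next_event_id is a hypothetical helper, and ignoring non-numeric ids is an assumption of mine. Note that a plain truth test would treat an id of 0 as absent, so the sketch checks for definedness instead.

```perl
use strict;
use warnings;

# Hypothetical helper: decide which id the next event should carry, given
# the raw Last-Event-ID request header (undef on a first connection).
sub next_event_id {
    my ($last_event_id) = @_;
    return 0 unless defined $last_event_id;
    return 0 unless $last_event_id =~ /\A[0-9]+\z/;    # ignore ids we did not issue
    return $last_event_id + 1;
}

print next_event_id(undef), "\n";    # first connection
print next_event_id('41'),  "\n";    # reconnection after event 41
print next_event_id('0'),   "\n";    # reconnection after the very first event
```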

In order to stream data with Plack, your application must return an anonymous subroutine, which will be called with a responder code reference. This responder must be called with an array reference containing two elements: the status code and an array reference of response headers. The responder will return a writer object. The writer implements two methods, write and close. The former is used to send data to the client, asynchronously, until the connection is closed using the latter method.
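The responder/writer handshake is easier to follow when you can run it without a server. In the sketch below, the FakeWriter package and the hand-made $responder stand in for what a server such as Twiggy would provide; both are hypothetical and exist only for this illustration.

```perl
use strict;
use warnings;

# A streaming PSGI app: it returns a code reference instead of a response.
my $app = sub {
    my $env = shift;
    return sub {
        my $responder = shift;
        # Passing only status and headers (no body) asks for a writer object.
        my $writer = $responder->( [ 200, [ 'Content-Type', 'text/event-stream' ] ] );
        $writer->write("data: $_\n\n") for 1 .. 3;
        $writer->close;
    };
};

# Stand-in for the server side, for illustration only.
{
    package FakeWriter;
    sub new   { my $class = shift; return bless { chunks => [], closed => 0 }, $class }
    sub write { my ($self, $chunk) = @_; push @{ $self->{chunks} }, $chunk; return }
    sub close { my $self = shift; $self->{closed} = 1; return }
}

my $fake_writer = FakeWriter->new;
my $status;
my $responder = sub { my $response = shift; $status = $response->[0]; return $fake_writer };

# The "server" calls the app, then calls what the app returned.
$app->( {} )->($responder);

print "status $status, ", scalar @{ $fake_writer->{chunks} },
      " chunks, closed: $fake_writer->{closed}\n";
```

In the real script the writes do not happen in a loop, of course. The writer is kept alive inside a callback and used whenever a message arrives.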

Now that we have our writer, we need to subscribe to the MQTT topic we are interested in. For the sake of the example, this topic is essai/rand. Every time a message is published to that topic, our callback will be executed. This callback simply packs the message in a string per the Server-Sent Events specification and forwards it to the response via the writer object. In our script, if the value of the message is 0, we will send a 'closed' event so that the client terminates the connection, and then we unsubscribe from the topic.

        $w = $mqtt->subscribe(
            topic    => 'essai/rand',
            callback => sub {
                my ($topic, $msg) = @_;
                
                my $hse = HTTP::ServerEvent->as_string(
                    id    => $i++,
                    event => 'data',
                    data  => $msg
                );
                
                $writer->write($hse);

                if ( $msg == 0 ) {
                    $hse = HTTP::ServerEvent->as_string(
                        event => 'closed',
                        data  => "Adiós"
                    );
                    $writer->write($hse);
                    $mqtt->unsubscribe(topic => 'essai/rand');
                    undef $w;
                }
            }
        );
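One detail of the callback deserves a note: it compares the payload numerically with $msg == 0. In Perl, a non-numeric string such as 'hello' also compares equal to 0 (with a warning), so any text message would end the stream. If the topic may carry something other than numbers, a stricter test could look like the sketch below. The is_stop_message helper is hypothetical and is not part of the script.

```perl
use strict;
use warnings;
use Scalar::Util qw(looks_like_number);

# Hypothetical helper: true only when the payload is a number equal to zero.
sub is_stop_message {
    my ($msg) = @_;
    return ( defined($msg) && looks_like_number($msg) && $msg == 0 ) ? 1 : 0;
}

printf "%-7s => %d\n", "'$_'", is_stop_message($_) for '0', '49', 'hello';
```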

The whole script looks like this:

use EV;
use AnyEvent::MQTT;
use AnyEvent;
use Plack::Request;
use HTTP::ServerEvent;

use strict;
use warnings;
use feature ':5.10';

my $mqtt = AnyEvent::MQTT->new;
my $w;

my $app = sub {
    my $env = shift;
    my $req = Plack::Request->new( $env );
    
    return sub {
        my $responder = shift;
        my $i = 0;

        my $last_event_id = $req->header('last-event-id');
        # Test for definedness: an id of 0 is a valid last event id too
        $i = $last_event_id + 1 if defined $last_event_id;

        my $writer = $responder->(
            [ 200,
              [
                'Content-Type', 'text/event-stream',
                'Access-Control-Allow-Origin', '*',
              ]
            ]
        );        

        $w = $mqtt->subscribe(
            topic    => 'essai/rand',
            callback => sub {
                my ($topic, $msg) = @_;
                
                my $hse = HTTP::ServerEvent->as_string(
                    id    => $i++,
                    event => 'data',
                    data  => $msg
                );
                
                $writer->write($hse);

                if ( $msg == 0 ) {
                    $hse = HTTP::ServerEvent->as_string(
                        event => 'closed',
                        data  => "Adiós"
                    );
                    $writer->write($hse);
                    $mqtt->unsubscribe(topic => 'essai/rand');
                    undef $w;
                }
            }
        );
    };
};

This script should be saved with a .psgi extension and launched using an asynchronous PSGI server, such as Twiggy. Once it is running, you can load the following JavaScript code in an HTML file:

<script>
var es = new EventSource('http://localhost:5000');
es.addEventListener('data', function(event) {
    console.log(event.data);
});
es.addEventListener('closed', function(event) {
    console.log(event.data);
    es.close();
});
</script>
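If you wonder what EventSource does with the stream before your listeners are called, the following Perl sketch gives a rough idea: split the text into events, collect the fields, and use the event name to pick the listener. It is deliberately simplified (no retry field, no \r\n handling), and parse_event_stream is a hypothetical name of my own.

```perl
use strict;
use warnings;

# Simplified sketch of what EventSource does with the stream:
# split it into events and collect each event's fields.
sub parse_event_stream {
    my ($stream) = @_;
    my @events;
    for my $block ( grep { length } split /\n\n+/, $stream ) {
        my %event = ( event => 'message' );    # default event name
        my @data;
        for my $line ( split /\n/, $block ) {
            next if $line =~ /\A:/;                       # comment line
            next unless $line =~ /\A([^:]+): ?(.*)\z/;    # "field: value"
            if    ( $1 eq 'data' )                 { push @data, $2 }
            elsif ( $1 eq 'event' || $1 eq 'id' )  { $event{$1} = $2 }
        }
        next unless @data;    # events without data are not dispatched
        $event{data} = join "\n", @data;
        push @events, \%event;
    }
    return @events;
}

my $stream = "id: 0\nevent: data\ndata: 49\n\nevent: closed\ndata: bye\n\n";
printf "%s => %s\n", $_->{event}, $_->{data} for parse_event_stream($stream);
```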

And this is how we launch our web app:

julio@julio-lap:$ plackup -r -s Twiggy mqtt.psgi 
Watching ./lib mqtt.psgi for file updates.
Twiggy: Accepting connections at http://0.0.0.0:5000/
127.0.0.1 - - [13/Nov/2016:00:10:49 +0100] "GET / HTTP/1.1" 200 - "-" "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:49.0) Gecko/20100101 Firefox/49.0"

One last bit is missing: sending MQTT messages so that they appear in the console of our beloved browser:

julio@julio-lap:$ mosquitto_pub -t essai/rand -m 49
julio@julio-lap:$ mosquitto_pub -t essai/rand -m 62

When everything works as expected, this is what you get:

(Browser screenshot)


References

I found the following resources very useful:

  • The script presented here is based on a presentation by Fumiaki Yoshimatsu which is available from GitHub.
  • The PSGI specification, in particular the part describing delayed responses and streaming, will certainly help in understanding both the script in this article and the one in the presentation above.
  • The documentation for HTTP::ServerEvent contains links to some interesting resources.

And of course, the great Plack, Twiggy, AnyEvent, and AnyEvent::MQTT hide the enormous complexity behind this simple script. My gratitude lies with the authors of these modules.