
Live streaming with a Rails application

02 Nov 2013

Data streaming is a very useful content delivery strategy for some web applications. While live streaming has been possible in Rails applications for quite a while, integrating such a strategy into an existing stack is in most cases a tricky task.

The project I'm working on right now has a live dashboard page, and a couple of days ago we decided to refine it, so I started thinking about possible solutions. The page has simple requirements: it only receives small updates pushed from the server (nothing needs to go in the other direction), it shouldn't tie up application server connections, and it has to respect our existing authorisation.

Currently this page uses Juggernaut in order to avoid blocking connections. Juggernaut is a Node.js WebSocket application that uses Redis for message queuing, and it provides a simple API for Rails applications. Unfortunately, it has been deprecated and it was hard to integrate with our authorisation system (which is based on Devise and CanCan).

Streaming options overview

As follows from the requirements, the dashboard page does not need a duplex channel. Using raw WebSockets would mean implementing a messaging protocol on top of them, which wasn't an option. We could use something like faye to get message channels, but it would bring unnecessary complexity to deployment and authentication. So I preferred to stick with plain HTTP transport if possible.

An AJAX-based polling strategy was a possible option, but Server-Sent Events (SSE) looked more interesting, since the browser handles many edge cases such as reconnecting after the connection drops. SSE satisfied all the requirements and looked like a lightweight solution.
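For reference, SSE is just a long-lived HTTP response with the text/event-stream content type. Each message is a block of field: value lines terminated by a blank line; the snippet below is an invented example of what goes over the wire:

event: channel_1
data: {"orders": 42}

data: a message without an explicit event name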

Long lasting connections in Ruby applications

While we could implement any streaming strategy inside the Rails application itself, it was obvious that it would mean long-running, blocking IO. Ruby offers a couple of ways to deal with blocking IO: run more processes or threads (which means a multi-process or multi-threaded application server), or switch to evented, non-blocking IO.

We are using the open source version of Passenger with MRI in production, so I knew that in order to get decent IO concurrency we would have to change this stack to Passenger Enterprise or Puma, potentially with a different interpreter. Because of that, and because we would only be sending small JSON documents to the client without any additional processing, I decided to take the non-blocking IO approach.

There aren't many options for non-blocking IO in a Ruby application, so the obvious choice was EventMachine.

EventMachine is an event-driven I/O and lightweight concurrency library for Ruby. It provides event-driven I/O using the Reactor pattern.

As I mentioned, this requires a different web server. I decided to use Thin, which is built on top of EventMachine, so I wouldn't have to start the run loop manually.

Implementing SSE with EventMachine

So far these architectural decisions don't require any changes to the existing Rails application. The EventMachine process will be a standalone application, and it needs some sort of messaging layer; a simple PubSub implementation should be enough. We already had Redis installed on our servers for Sidekiq and other tasks, so I decided to use the Redis PUBLISH and SUBSCRIBE commands.
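This also means the Rails side can stay on its current stack and simply publish an update to Redis whenever something on the dashboard changes. A minimal sketch using the plain (blocking) redis gem, which is fine inside a regular Rails process; the channel name and payload here are made up:

require 'redis'
require 'json'

redis = Redis.new

# Publish a small JSON document to the channel the streaming process
# subscribes to (e.g. from a model callback or a Sidekiq job).
redis.publish('channel_1', { orders: 42, visitors: 7 }.to_json)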

Implementing EventMachine entry point

I started with a simple EventMachine based application.

require 'thin'
require 'eventmachine'

EM.run do
  # Empty for now: this timer will be used to publish test messages later on.
  EM.add_periodic_timer(1) do
  end

  Signal.trap("INT")  { EM.stop }
  Signal.trap("TERM") { EM.stop }
end

Here I start the EventMachine run loop with EM.run. Inside the block I trap system signals with Signal.trap so that the run loop can be stopped, and I create a timer (EM.add_periodic_timer) with a 1 second interval, which will later be used to simulate messages.

Simple PubSub with Redis

After that I implemented a simple PubSub wrapper. Since I was using EventMachine, I decided to go with non-blocking IO for the Redis operations too. I did that with em-synchrony; the redis gem already ships with an em-synchrony driver.

require 'redis'
require 'em-synchrony'

class PubSub
  def initialize(channel)
    # The :synchrony driver makes Redis commands fiber-aware, so they
    # don't block the EventMachine reactor.
    @client  = Redis.new(driver: :synchrony)
    @channel = channel
  end

  def publish(message)
    EM.synchrony do
      @client.publish(@channel, message)
    end
  end

  def subscribe
    return unless block_given?
    EM.synchrony do
      @client.subscribe(@channel) do |on|
        on.message do |channel, message|
          yield(message)
        end
      end
    end
  end

  def unsubscribe
    EM.synchrony do
      @client.unsubscribe(@channel)
    end
  end
end

This is a simple class with three methods: publish, subscribe and unsubscribe. I also wrapped all commands in EM.synchrony blocks so that they are performed without blocking the reactor.
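One caveat: a Redis connection that has issued SUBSCRIBE can only send subscription related commands until it unsubscribes, so the publishing and the subscribing sides should use separate PubSub instances (and therefore separate connections). A quick sketch of the intended usage inside the reactor; the channel name and messages are made up:

require 'eventmachine'
require './pub_sub'

EM.run do
  # One connection listens for messages...
  listener = PubSub.new('channel_1')
  listener.subscribe { |message| puts "received: #{message}" }

  # ...and a separate connection publishes them.
  publisher = PubSub.new('channel_1')
  EM.add_periodic_timer(1) { publisher.publish("tick #{Time.now.to_i}") }

  Signal.trap("INT") { EM.stop }
end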

Integrating Sinatra into EventMachine

The only thing left is the actual SSE handling code. It should respond to connections on a specific path; once a client is connected it should subscribe to the specified Redis channel and write out received messages until the connection is closed. While I could do that with plain EventMachine, it would be quite tedious, and I wanted a higher level API for connection buffers and the request lifecycle. I knew that Sinatra had such an API and that it integrates well with Rails and EventMachine, so I decided to use it for the SSE part.

require 'sinatra/base'
require './pub_sub'

class SSE < Sinatra::Base
  disable :run
  # Load the inline templates defined after __END__ (needed for a
  # modular Sinatra::Base application).
  enable :inline_templates

  get '/stream/test' do
    erb :index
  end

  get '/stream/:channel' do
    channel = params[:channel]
    pub_sub = PubSub.new(channel)

    content_type 'text/event-stream'
    stream :keep_open do |out|
      pub_sub.subscribe do |message|
        # The client has disconnected: stop listening on the Redis channel.
        if out.closed?
          pub_sub.unsubscribe
          next
        end

        # One SSE message: an event name and its data, terminated by a blank line.
        out << "event: #{channel}\n"
        out << "data: #{message}\n\n"
      end
    end
  end
end

__END__
@@ index
<article id="log"></article>
<script>
  var source = new EventSource('/stream/channel_1');
  source.addEventListener('channel_1', function (event) {
    log.innerText += '\n' + event.data;
  }, false);
</script>

I created a new Sinatra::Base application and disabled auto run (disable :run), since the application will be started from EventMachine. The /stream/:channel route creates a new PubSub client for the requested channel and starts streaming (stream :keep_open); once the connection is closed it also closes the PubSub subscription. I also defined a /stream/test route with a small test page. The only part left is to start the application from EventMachine.

require 'thin'
require 'eventmachine'
require './pub_sub'
require './sse'

pub_sub = PubSub.new('channel_1')
EM.run do
  EM.add_periodic_timer(1) do
    pub_sub.publish("foo#{rand(10)}")
  end

  Thin::Server.start(SSE, '0.0.0.0', 4567)

  Signal.trap("INT")  { EM.stop }
  Signal.trap("TERM") { EM.stop }
end

Here I start the Thin server with the Sinatra application on port 4567 and add a simple random data feed (pub_sub.publish("foo#{rand(10)}")) on channel_1.

Integrating into the Rails application: the simple case

Everything I have done so far lives outside of the Rails application. There are two ways to integrate this solution into an existing application. Using Thin in production is one of them, and that case is really simple. I will demonstrate authorisation with a Devise example application, which is a simple Rails 4 application.

Since Sinatra is just a Rack compatible application, I can mount it in the router:

# config/routes.rb
require 'sse'

authenticate :user do
  mount SSE => '/stream'
end

This route will only be available to authenticated users; Warden or CanCan checks could be wired up in a similar way (there is a sketch of that after the initialiser below). I can also simulate data with PubSub and the non-blocking Redis client; I will demonstrate that with a custom initialiser:

# config/initializers/sse_demo.rb (file name is arbitrary)
require 'pub_sub'

pub_sub = PubSub.new('channel_1')
EM.next_tick do
  EM.add_periodic_timer(1) do
    pub_sub.publish("foo#{rand(10)}")
  end
end

I used EM.next_tick instead of EM.run, since Thin starts the EventMachine reactor itself; the block is simply queued up and runs once the reactor is up.
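As for the Warden or CanCan checks mentioned above, one option is a routing constraint around the mount. This is only a sketch, assuming the Ability class defines something like a :read permission on :dashboard (both names are made up):

# config/routes.rb
constraints lambda { |request|
  user = request.env['warden'].user
  user.present? && Ability.new(user).can?(:read, :dashboard)
} do
  mount SSE => '/stream'
end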

Unfortunately, using Thin in production is not always an option. Integrating an EventMachine based application into other stacks is not as easy, but there is still a way to do it. I will cover that in my next post.
