Fibering Rails’ ConnectionPool aka EventMachine+Fibers is still complicated

One of the arguments I frequently hear against threads is that the synchronization problems are a pain in the ass to deal with. They are hard to reproduce, hard to debug, blah blah, etc etc.

Well, it turns out I am kinda tired of dealing with broken threads in Ruby, and also with thread-safety issues in gems that I don’t have control over.

So I’ve been playing around with EventMachine+Fibers and came across the em_postgresql gem by Mike Perham (incidentally, an old coworker of mine). Sweet! Mike has done a lot of the initial legwork for making the EM+Fibers paradigm usable in real-world Ruby/Rails applications. Thanks, Mike!

We’ll be using his em_postgresql gem as a case study to show how some of the threading synchronization complexities are still present when using EventMachine+Fibers. Let’s get started.

So Rails uses a database connection pool that assumes you’re running in a threaded environment. Part of the em_postgresql gem is to hack ActiveRecord::ConnectionAdapters::ConnectionPool to use fibers instead of threads (to be “fiber aware” as is sometimes said). Well, unfortunately it doesn’t work, as this code snippet demonstrates:

gem "postgres-pr"
gem "em_postgresql"
require "eventmachine"
require "fiber"
require "active_record"
require "benchmark"

ActiveRecord::Base.logger = Logger.new(STDOUT)
ActiveRecord::Base.establish_connection :adapter  => "em_postgresql",
                                        :port     => 5432,
                                        :pool     => 2,
                                        :username => "cjbottaro",
                                        :host     => "localhost",
                                        :database => "test",
                                        :timeout  => 3

EM.run do
  Fiber.new do
    fibers = []
    time = Benchmark.realtime do
      # Start three fibers; each asks the pool for a connection and runs a 1 second query.
      3.times do
        fibers << Fiber.new{ ActiveRecord::Base.connection.execute "select pg_sleep(1)" }.tap{ |fiber| fiber.resume }
      end
      # Poll until every fiber has finished, handing control back to the reactor between checks.
      fibers.each do |fiber|
        while fiber.alive?
          current_fiber = Fiber.current
          EM.next_tick{ current_fiber.resume }
          Fiber.yield
        end
      end
    end
    puts time
    EM.stop
  end.resume
end

We specify our pool size to be 2 and we spawn 3 fibers. We expect that only two connections to the database are made. We also expect that the first two SQL statements run in parallel, then the third runs after they are done, giving us a total running time of ~2 seconds.

This is what we get instead:

$ ruby em_postgres.rb 
Connecting to localhost:5432
Connecting to localhost:5432
Connecting to localhost:5432
  SQL (0.6ms)   SET client_min_messages TO 'panic'
  SQL (1.0ms)   SET client_min_messages TO 'panic'
  SQL (1.1ms)   SET standard_conforming_strings = on
  SQL (0.4ms)   SET client_min_messages TO 'notice'
  SQL (1.3ms)   SET standard_conforming_strings = on
  SQL (0.8ms)   SET client_min_messages TO 'panic'
  SQL (1.0ms)   SET client_min_messages TO 'notice'
  SQL (1.0ms)   SET standard_conforming_strings = on
  SQL (0.5ms)   SET client_min_messages TO 'notice'
  SQL (1001.1ms)   select pg_sleep(1)
  SQL (1001.0ms)   select pg_sleep(1)
  SQL (1000.7ms)   select pg_sleep(1)
1.0179660320281982

Three connections are made and all 3 statements run in parallel giving us a running time of ~1 second. What went wrong?

First let’s look at ConnectionPool#checkout.

      def checkout
        # Checkout an available connection
        @connection_mutex.synchronize do
          loop do
            conn = if @checked_out.size < @connections.size
                     checkout_existing_connection
                   elsif @connections.size < @size
                     checkout_new_connection
                   end
            return conn if conn
            # No connections available; wait for one
            if @queue.wait(@timeout)
              next
            else
              # try looting dead threads
              clear_stale_cached_connections!
              if @size == @checked_out.size
                raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout}.  The max pool size is currently #{@size}; consider increasing it."
              end
            end
          end
        end
      end

em_postgresql hacks @connection_mutex.synchronize to do nothing. I think the rationale is that we don’t have to worry about synchronization problems (like when using threads) because we won’t be preempted unless we explicitly say so in the code (by calling Fiber.yield).

The problem is that we are unknowingly calling methods that deep down eventually call Fiber.yield.

Considering our example case, when we call checkout, the condition elsif @connections.size < @size is met and we call checkout_new_connection which is defined as follows:

      def checkout_new_connection
        c = new_connection
        @connections << c
        checkout_and_verify(c)
      end

Fiber.yield is called not once but twice in that innocent-looking bit of code. Remember that the whole point of em_postgresql is to allow network IO to occur in parallel. Creating a new connection to the database falls under that category, so new_connection calls Fiber.yield. The key thing to notice is that the fiber yields before @connections is modified. Thus our next fiber will call checkout, hit the same condition as before (elsif @connections.size < @size), and end up in new_connection, where the same thing will happen again with the 3rd fiber. And thus our connection pool’s purpose is defeated.
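To make the race concrete without needing a database, here is a minimal sketch using plain fibers. ToyPool and its new_connection are made-up stand-ins, not ActiveRecord's real classes; the bare Fiber.yield is an assumption that simulates the point where the real adapter would wait on the network.

```ruby
# Toy stand-in for the pool (hypothetical; not ActiveRecord's ConnectionPool).
class ToyPool
  attr_reader :connections

  def initialize(size)
    @size = size
    @connections = []
  end

  # Same shape as checkout_new_connection: the size check and the append
  # are separated by a call that yields the fiber.
  def checkout
    return nil unless @connections.size < @size
    conn = new_connection   # yields here, before @connections changes
    @connections << conn
    conn
  end

  private

  def new_connection
    Fiber.yield             # simulates waiting on the connection handshake
    Object.new
  end
end

pool = ToyPool.new(2)
fibers = Array.new(3) { Fiber.new { pool.checkout } }
fibers.each(&:resume)       # every fiber passes the size check, then yields
fibers.each(&:resume)       # every fiber resumes and appends its connection
puts pool.connections.size  # prints 3, even though the pool size is 2
```

No fiber was preempted here. Each one gave up control voluntarily, just at a moment when the pool's bookkeeping was out of date.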

The second time Fiber.yield is called in that code is in checkout_and_verify(c), which eventually calls #verify! on the connection. em_postgresql defines #verify! to issue a SELECT 1 statement to the database, which is network IO, thus Fiber.yield is called. em_postgresql caught the issue with this and overrode checkout_and_verify(c) as follows:

      def checkout_and_verify(c)
        @checked_out << c
        c.run_callbacks :checkout
        c.verify!
        c
      end

It differs from the original in that @checked_out is modified before calling c.verify! (which yields the fiber). Thus any fibers that enter checkout will see the new size of @checked_out when evaluating the if statements.
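Here is a rough sketch of why that ordering matters, again with plain fibers. ToyCheckout, its mark_first flag, and run_checkout are all hypothetical names invented for this illustration; the "verify first" branch follows the original order as described above, and Fiber.yield stands in for the SELECT 1 round trip.

```ruby
# Toy model of checking out an existing connection (hypothetical names).
class ToyCheckout
  def initialize(connections, mark_first)
    @connections = connections
    @checked_out = []
    @mark_first  = mark_first
  end

  def checkout
    conn = (@connections - @checked_out).first
    return nil unless conn
    if @mark_first
      @checked_out << conn  # bookkeeping first, then the yielding call
      verify(conn)
    else
      verify(conn)          # yields while conn still looks available
      @checked_out << conn
    end
    conn
  end

  def verify(_conn)
    Fiber.yield             # simulates the SELECT 1 round trip
  end
end

# Two fibers compete for a single connection named :a.
def run_checkout(mark_first)
  pool = ToyCheckout.new([:a], mark_first)
  results = []
  fibers = Array.new(2) { Fiber.new { results << pool.checkout; :done } }
  states = fibers.map(&:resume)  # nil if the fiber yielded, :done if it finished
  fibers.zip(states).each { |fiber, state| fiber.resume unless state == :done }
  results
end

p run_checkout(false)  # prints [:a, :a]  (both fibers got the same connection)
p run_checkout(true)   # prints [nil, :a] (the second fiber was turned away)
```

In the real pool the fiber that is turned away would wait on the queue instead of getting nil; the toy just returns nil to keep things short.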

This same strategy needs to be applied to checkout_new_connection and @connections. The problem lies in that @connections is a list of connections and the act of simply creating a connection yields the fiber, so it’s impossible to modify @connections before yielding the fiber.

I’ll leave it as an exercise for the reader to come up with a working solution (it’s not too hard).
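If you want a hint, one possible shape (certainly not the only one, and not what em_postgresql does) is to reserve a slot before connecting, so that half-built connections count against the pool size. ReservingPool and @pending are hypothetical names; the sketch returns nil when the pool is full, where a real pool would wait for a connection to be checked in.

```ruby
# Sketch of "reserve before you yield" (hypothetical; not em_postgresql's code).
class ReservingPool
  attr_reader :connections

  def initialize(size)
    @size = size
    @connections = []
    @pending = 0   # connections currently being created
  end

  def checkout
    # Count half-built connections against the limit too.
    return nil if @connections.size + @pending >= @size
    @pending += 1            # reserve the slot BEFORE anything can yield
    begin
      conn = new_connection  # yields; other fibers now see the reservation
      @connections << conn
      conn
    ensure
      @pending -= 1          # release the reservation whether or not we connected
    end
  end

  private

  def new_connection
    Fiber.yield              # simulates waiting on the connection handshake
    Object.new
  end
end

pool = ReservingPool.new(2)
results = []
fibers = Array.new(3) { Fiber.new { results << pool.checkout } }
fibers.each(&:resume)           # fibers 1 and 2 reserve slots and yield; fiber 3 is refused
fibers.first(2).each(&:resume)  # the two suspended fibers finish connecting
puts pool.connections.size      # prints 2
```

The counter does the job that @checked_out << c does in checkout_and_verify: the shared state is updated before the fiber gives up control, so later fibers make their decision on accurate numbers.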

In conclusion, even without random preempting, as is the case with cooperative concurrency, it is still somewhat difficult to manage synchronization issues… at least I think so.

1 Comment

  1. Hello. My fiber aware implementation of synchronize primitives — https://github.com/prepor/em-synchrony/blob/master/lib/em-synchrony/thread.rb
