Posted by: dkam

Converting a queue of parallel API requests into serial requests

One of the systems a site like Booko needs is a queuing system. As requests to view books and their prices arrive, we queue up every book with stale prices by pushing its ISBN into Beanstalkd. These price refresh requests are picked up by one of many multithreaded price lookup processes. Beanstalkd and the daemons have been working together nicely since 2010.

Recently, however, I became aware that one of the many APIs we use to get prices had introduced a throttle. It limits the rate of incoming API requests, and requests over the limit get 503-type errors.

Too many people wanted to know how much books cost.

The pricing daemon processes wait on the queue for requests. When one picks up a request, it spawns a thread per shop. The threads look up prices, and their results are then collected, collated and saved to the database. The existing code expects to handle the entire lifecycle of each price:

  • Creating the new price
  • Handing the price over to the shop model to get the data
  • Receiving the result back and performing some final checks
  • Setting the price state to ready
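To make the existing fan-out concrete, here is a minimal Ruby sketch of the thread-per-shop lookup. The shop names and lambdas are hypothetical stand-ins for the real shop models; only the shape (one thread per shop, then join and collate) follows the description above.

```ruby
# Hypothetical shop lookups: each lambda stands in for a shop model's
# price lookup and returns a hash of price data for one ISBN.
SHOPS = {
  'shop_a' => ->(isbn) { { 'isbn' => isbn, 'price' => 12.5 } },
  'shop_b' => ->(isbn) { { 'isbn' => isbn, 'price' => 14.0 } }
}.freeze

# Fan out one thread per shop, then join them and collate the results
# into a hash keyed by shop name.
def lookup_prices(isbn, shops = SHOPS)
  threads = shops.map do |name, lookup|
    Thread.new { [name, lookup.call(isbn)] }
  end
  threads.map(&:value).to_h # Thread#value joins the thread and returns its result
end

prices = lookup_prices('9781743535875')
p prices.keys
```

Every shop gets its own thread, so all shops are queried in parallel. That is exactly what a throttled API objects to.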

Here's the current arrangement:

The API we're talking to requires us to serialise and throttle all the requests that go to that shop. This specific API has a very handy feature I'll make use of: it allows 10 lookups to be submitted in a single query.
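The throttled shop needs the opposite shape: one serial stream of batched queries. Below is a minimal sketch of that idea. It assumes a fixed minimum gap between queries. `MIN_INTERVAL` is a made-up value, because the post doesn't state the API's actual limit. The batch size of 10 comes from the API's multi-lookup feature. `each_throttled_batch` is a hypothetical helper.

```ruby
# Serialise lookups for one throttled shop: group ISBNs into batches of up
# to 10 and leave at least min_interval seconds between query starts.
BATCH_SIZE   = 10
MIN_INTERVAL = 1.0 # assumed value; the real API's limit isn't given in the post

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def each_throttled_batch(isbns, batch_size: BATCH_SIZE, min_interval: MIN_INTERVAL)
  last_query = nil
  isbns.each_slice(batch_size) do |batch|
    if last_query
      wait = min_interval - (monotonic_now - last_query)
      sleep(wait) if wait > 0
    end
    last_query = monotonic_now
    yield batch # one API query per batch, made serially in this thread
  end
end

isbns   = (1..25).map { |n| format('978000000%04d', n) }
batches = []
each_throttled_batch(isbns, min_interval: 0.01) { |batch| batches << batch }
p batches.map(&:size) # => [10, 10, 5]
```

Running this loop in a single process is what keeps the requests serial. The batching means 25 ISBNs cost three queries instead of 25.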

The solution I implemented uses Redis and its Pub/Sub system. First, the price grabbing daemon pushes the ISBN onto a Redis list.

redis.rpush('shop_queue', '9781743535875')

Once that's in the queue, we subscribe to the channel named after the ISBN ('9781743535875') and wait for the response from Redis.

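  # Give up after 15 seconds if no reply arrives on the ISBN's channel
  redis.subscribe_with_timeout(15, '9781743535875') do |on|
    on.message do |channel, msg|
      shop_data = JSON.parse(msg)
      price.update_attributes( shop_data )  unless shop_data.nil?
      redis.unsubscribe(channel) # one reply per request, so stop listening
    end
  end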

When the message arrives, we parse the JSON data and update the price model with the data, then we unsubscribe from Redis.
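The request/reply pattern itself can be modelled without Redis. The sketch below is an in-process analogue, not the Redis implementation. A Ruby `Queue` stands in for the `shop_queue` list. A hypothetical `ReplyBoard` class stands in for the per-ISBN channels, and the wait has a timeout.

```ruby
# In-process analogue (plain Ruby threads, not Redis) of
# "push a request, then wait on a per-ISBN reply with a timeout".
class ReplyBoard
  def initialize
    @mutex   = Mutex.new
    @cond    = ConditionVariable.new
    @replies = {}
  end

  # Worker side: stands in for PUBLISH on the ISBN-named channel.
  def publish(isbn, data)
    @mutex.synchronize do
      @replies[isbn] = data
      @cond.broadcast
    end
  end

  # Requester side: stands in for SUBSCRIBE with a timeout.
  # Returns the reply, or nil if nothing arrives within timeout seconds.
  def wait_for(isbn, timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      until @replies.key?(isbn)
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @replies.delete(isbn)
    end
  end
end

requests = Queue.new # stands in for the 'shop_queue' list
board    = ReplyBoard.new

worker = Thread.new do
  isbn = requests.pop
  board.publish(isbn, { 'isbn' => isbn, 'price' => 12.5 })
end

requests << '9781743535875'
reply = board.wait_for('9781743535875', 15)
worker.join
p reply
```

There is one difference to keep in mind. `ReplyBoard` keeps a reply that arrives before the requester starts waiting. Redis Pub/Sub, by contrast, discards a message published to a channel that has no subscribers at that moment.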

The new daemon reads up to 10 ISBNs from the list and removes them. It wraps the two commands in a MULTI transaction so that they run atomically.

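query_size = 10

# MULTI/EXEC: read the first query_size ISBNs and trim them off the list atomically.
# Inside the block commands only return futures, so LTRIM uses query_size
# rather than the length of the LRANGE result.
isbns, _ = redis.multi do |tx|
  tx.lrange('shop_queue', 0, query_size - 1)
  tx.ltrim('shop_queue', query_size, -1)
end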
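Commands inside a MULTI block only return their results after EXEC, so the trim can't depend on how many ISBNs LRANGE found. It doesn't need to. LTRIM with a start index past the end of the list leaves the list empty. The plain-array model below involves no Redis, and `lrange_then_ltrim!` is a hypothetical helper. It shows the behaviour with fewer than 10 queued ISBNs and with more than 10.

```ruby
# Plain-array model of the MULTI block's two commands (no Redis involved).
def lrange_then_ltrim!(list, query_size)
  batch = list[0, query_size]                  # LRANGE shop_queue 0 (query_size - 1)
  list.replace(list[query_size..-1] || [])     # LTRIM shop_queue query_size -1
  batch
end

queue = %w[9781743535875 9780000000001 9780000000002]
p lrange_then_ltrim!(queue, 10) # all three ISBNs
p queue                         # => []

longer = (1..12).map { |n| format('978000000%04d', n) }
first  = lrange_then_ltrim!(longer, 10)
p [first.size, longer.size]     # => [10, 2]
```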

The daemon then makes the API request, looking up the whole batch in a single query. After the lookup, it publishes each ISBN's price data to the channel named after that ISBN.

redis.publish('9781743535875', price_data.to_json)
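Because one API query answers up to 10 ISBNs, the daemon has to fan the response back out, with one PUBLISH per ISBN channel. The minimal sketch below assumes that ISBNs missing from the API response should still get a reply, so their requesters aren't left waiting for the timeout. That reply is a JSON `null`, which the subscriber's `shop_data.nil?` check already skips. `publish_results` and `RecordingRedis` are hypothetical names. `RecordingRedis` records calls instead of talking to a Redis server, and a real redis-rb client could be passed in its place.

```ruby
require 'json'

# Publish one reply per ISBN in the batch. ISBNs absent from the API
# response get JSON null so their subscribers still receive a message.
def publish_results(redis, batch, results)
  batch.each do |isbn|
    redis.publish(isbn, results[isbn].to_json)
  end
end

# Hypothetical stand-in for a Redis client that records PUBLISH calls.
class RecordingRedis
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(channel, message)
    @published << [channel, message]
    0 # Redis replies with the number of clients that received the message
  end
end

redis = RecordingRedis.new
publish_results(redis, %w[9781743535875 9780000000001],
                { '9781743535875' => { 'price' => 12.5 } })
p redis.published
```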

Here's a diagram:

The complete setup looks like this: