Why and how we built our own job processing pipeline

Jay Raveendiran, Principal Architect

At Fieldwire, we use Ruby on Rails for our API servers. One of the backend team's goals is to keep the response time of our API calls low, which in turn keeps our client apps responsive and our customers happy. We achieve this by doing only what is absolutely necessary while serving an API call and moving expensive or complex pieces to background jobs. We use the wonderful Sidekiq gem for managing our background jobs.
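
A typical Sidekiq job is just a class with a perform method that runs outside the request cycle; for instance (ThumbnailWorker is a made-up example, not one of our actual jobs):

require 'sidekiq'

class ThumbnailWorker
  include Sidekiq::Worker
  sidekiq_options queue: :default

  def perform(photo_id)
    # Expensive work (e.g. generating thumbnails) happens here,
    # outside the API request/response cycle
  end
end

# Enqueued from a controller or model callback:
# ThumbnailWorker.perform_async(photo.id)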

As we started adding more features, the number of requirements for our background job processor kept increasing:

  • Enforce uniqueness based on selected job arguments
  • Enforce uniqueness until processing starts to not duplicate effort
  • Enforce uniqueness while processing to prevent race conditions
  • Allow extending uniqueness locks’ TTL if jobs run longer than expected
  • Allow both immediate and scheduled processing
  • Allow failing jobs to retry with exponential backoff and max retries
  • Allow happens-before or first-ready, first-served ordering

We looked around at the existing options (sidekiq-unique-jobs, the unique jobs add-on to Sidekiq) and used them successfully for some time, but unfortunately they could not be integrated in a straightforward way to match our requirements. This led us to build our own job execution pipeline.

Design goals

  1. Create an easy-to-understand interface for current and future developers
  2. Leverage production-tested and respected gems as much as possible
  3. Minimize the overhead added to jobs with such requirements

Deciding on the fundamental building blocks

Due to the scale at which Fieldwire operates, we have to ensure that background jobs are properly distributed over a number of servers. Fortunately, Sidekiq takes care of distribution quite well. The challenge then became implementing distributed locking to preserve uniqueness across multiple machines. We researched existing solutions and, after testing, decided to go forward with redlock-rb, an implementation of the distributed lock manager described by antirez (the author of Redis), built on top of Redis.
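
At its core, redlock-rb lets one process at a time hold a Redis-backed lock on a named resource for a limited time. A minimal sketch of its usage (the resource name and TTL are made up for illustration):

require 'redlock'

lock_manager = Redlock::Client.new(['redis://localhost:6379'])

# Try to acquire a lock on a resource for 2000 ms
lock_info = lock_manager.lock('some_resource_key', 2000)

if lock_info
  begin
    # Critical section: only one process holds this lock at a time
  ensure
    lock_manager.unlock(lock_info)
  end
else
  # Another process holds the lock; skip the work or retry later
end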

The next question to answer was: where do we store the job information for efficient retrieval? We considered a few options: Redis, a traditional RDBMS, and even NoSQL. It became clear that an RDBMS would be the best bet due to the size of the evolving data set, the querying capabilities we were looking for, and the familiarity our team already had with relational databases. We debated whether we should have a single table that stores the information for all kinds of jobs, but decided against it: having one table per job kind makes it easier to set up job-specific indices for fast lookups and to migrate existing jobs when a payload changes.

In the end, the job tables would be structured as follows:

CREATE TABLE blog_job_infos (
    id bigint NOT NULL,
    -- << fields for carrying job data
    attempts_made integer,                            -- number of attempts made so far
    created_at timestamp without time zone NOT NULL,
    updated_at timestamp without time zone NOT NULL,
    process_after timestamp without time zone,        -- null for immediate processing, else earliest run time
    failed_at timestamp without time zone,            -- set once max retries are exhausted
    processed_at timestamp without time zone          -- set once the job completes successfully
);
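
Each job table also gets its own job-specific indices. As a sketch, a migration adding one for the blog_id unique field (the migration name and column choice are illustrative) could look like:

class AddBlogIdIndexToBlogJobInfos < ActiveRecord::Migration[6.0]
  def change
    # Speeds up the "is an equivalent job already scheduled?" lookup
    add_index :blog_job_infos, [:blog_id, :processed_at]
  end
end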

Similarly, the corresponding Ruby class for the table defined above would be:

class BlogJobInfo < ActiveRecord::Base
  Lib::UniqueUntilExecutedJob.setup(self, unique_fields: [:blog_id])
  Lib::UniqueWhileExecutingJob.setup(
    self,
    unique_fields: [:blog_id],
    max_reattempts: 5,
    order_related_jobs_by_field: :process_after,
    sidekiq_queue: :blog_processor
  )

  def execute_job_lock_ttl
    # Job specific TTL for the distributed lock
  end

  def execute_unprocessed_job_on_lock(&heart_beat)
    # Work gets done here. This would be called only after acquiring the lock
  end
end

This succinct approach is possible because enforcing uniqueness before and during the execution of jobs is implemented in Lib::UniqueUntilExecutedJob and Lib::UniqueWhileExecutingJob, simple APIs that can be utilized independently of each other.

End to end flow

  1. Calling BlogJobInfo.create_if_no_scheduled_job_by_unique_fields creates a new job if an identical job isn’t already present (this could be triggered by an API call, webhook or cron job)
  2. Any new job creation triggers BlogJobInfo.execute_qualifying_jobs to find the top n jobs to be worked on (n is the degree of concurrency, which can differ per job kind)
  3. Once a job-specific queue lock (computed from the job’s unique_fields) is acquired, the job is added to one of the Sidekiq queues
  4. Once a Sidekiq worker picks up one such enqueued job, it calls blog_job_info.execute which will acquire the execution lock and give up the queue lock before letting the job do its work
  5. Once the job finishes it is marked as processed, the execution lock is released, and checks are done to see if there are more jobs to be executed
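
In code, the producer side of this flow looks roughly like the following (argument shapes are simplified for illustration; the real helpers are provided by the shared modules):

# 1. Create the job unless an equivalent unprocessed job already exists
blog_job_info = BlogJobInfo.create_if_no_scheduled_job_by_unique_fields(
  blog_id: 42,          # the unique field for this job kind (illustrative value)
  process_after: nil    # nil => eligible for immediate processing
)

# 2-3. Find the top n ready jobs, acquire their queue locks and enqueue them
BlogJobInfo.execute_qualifying_jobs

# 4-5. Inside the Sidekiq worker that picks the job up:
# blog_job_info.execute   # swaps the queue lock for the execution lock,
#                         # runs the job and marks it as processed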

How it works

As the flow above shows, we employ two locks (both acquired via redlock-rb) for each job: a queue lock and an execution lock. It would be possible to design the system around a single lock, but we chose not to, since we would then have to manage the state of that lock (required for the heart_beat mechanism we expand upon in the next paragraph) while the job waits in the queue to be picked up by one of the Sidekiq workers. Using two locks has the useful side effect of letting the next job be enqueued while a job from the same group is still executing. Even if that enqueued job gets picked up before the executing job finishes, it won’t be able to acquire the execution lock, so it won’t enter its critical section or be marked as processed.

Upon closer inspection, one can see that execute_unprocessed_job_on_lock takes a callable argument: heart_beat. By default, we assign a TTL to the locks, specified by the execute_job_lock_ttl method. However, jobs can sometimes take longer than this TTL. In such cases, the lifetime of the acquired lock can be extended by calling the passed-in heart_beat; lock extension is one of the options provided by redlock-rb. If the attempt to beat the heart succeeds, the job can carry on; if it fails, an error is raised to prevent further execution. For example, one of our jobs needs to traverse a tree and work on each node without knowing the tree’s height or how long each node operation will take. In that scenario, heart_beat is called before handling every node in order to keep the lock alive.
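
A minimal sketch of how such a heart_beat callable can be built on redlock-rb's lock extension (the lock key and error messages are illustrative; extend is the gem's option for renewing an existing lock):

lock_manager = Redlock::Client.new(['redis://localhost:6379'])
resource_key = "blog_job:execute:#{blog_job_info.id}"   # illustrative lock key
ttl_ms       = blog_job_info.execute_job_lock_ttl

lock_info = lock_manager.lock(resource_key, ttl_ms)
raise 'execution lock not acquired' unless lock_info

# heart_beat renews the lock for another TTL window; the job calls it
# whenever it needs more time (e.g. before handling each tree node)
heart_beat = lambda do
  lock_info = lock_manager.lock(resource_key, ttl_ms, extend: lock_info)
  raise 'execution lock lost, aborting job' unless lock_info
end

blog_job_info.execute_unprocessed_job_on_lock(&heart_beat)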

blog_job_info.execute, in addition to acquiring the lock, executing the critical section and marking the job as processed, takes on one more important role: updating attempts_made and process_after (with exponential backoff) whenever the job fails. Once the maximum number of attempts has been made, the job is marked as failed by setting failed_at.
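
The failure-handling part boils down to something like the following (a simplified helper with an illustrative backoff formula, not our exact implementation):

# Simplified sketch of the bookkeeping done when a job raises
def record_job_failure(job_info, max_reattempts)
  attempts = job_info.attempts_made.to_i + 1

  if attempts >= max_reattempts
    # Out of retries: mark the job as permanently failed
    job_info.update!(attempts_made: attempts, failed_at: Time.current)
  else
    # Exponential backoff: each failure pushes process_after further out
    job_info.update!(attempts_made: attempts,
                     process_after: (2**attempts).minutes.from_now)
  end
end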

Our requirement to keep jobs unique until processing starts is implemented by inserting a job only if there are no equivalent scheduled, unprocessed jobs. In a highly concurrent system like ours, such a simple check won’t prevent every race condition, but we don’t mind the occasional slippage: even if two equivalent jobs get enqueued, the uniqueness guaranteed during execution makes them run sequentially, so we can short-circuit the second job as needed.
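
Conceptually, the creation check is a guarded insert. A simplified sketch of create_if_no_scheduled_job_by_unique_fields (the real module handles more edge cases):

# Simplified sketch; defined on BlogJobInfo by the shared module
def self.create_if_no_scheduled_job_by_unique_fields(attrs)
  unique_attrs = attrs.slice(:blog_id)   # the configured unique_fields

  # Skip creation if an equivalent job is still waiting to be processed.
  # The check-then-insert is not atomic, but the execution lock catches
  # the rare duplicate that slips through.
  return if where(unique_attrs).where(processed_at: nil, failed_at: nil).exists?

  create!(attrs)
end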

The remaining requirements (processing order and immediate vs scheduled processing) can be achieved by using appropriate SQL constructs and queries. This is one place where choosing RDBMS as the backing storage solution is particularly advantageous.
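
For example, selecting the jobs that are ready to run is a single query over the job table; an illustrative ActiveRecord version (degree_of_concurrency stands in for the per-job setting):

# A job is ready when process_after is null (immediate) or already in the past
ready_jobs = BlogJobInfo
  .where(processed_at: nil, failed_at: nil)
  .where('process_after IS NULL OR process_after <= ?', Time.current)
  .order(:process_after)   # or :created_at for happens-before ordering
  .limit(degree_of_concurrency)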

Did we meet the requirements?

Enforce uniqueness based on selected job arguments

Yes! Using the unique_fields argument in SELECT statements.

Enforce uniqueness until processing starts to not duplicate effort

Yes! INSERTing only after confirming with a SELECT.

Enforce uniqueness while processing to prevent race conditions

Yes! Using queue and execution locks acquired using redlock-rb.

Allow extending uniqueness locks’ TTL while processing if jobs run longer than expected

Yes! Using heart_beat passed into execute_unprocessed_job_on_lock.

Allow both immediate and scheduled processing

Yes! By setting process_after to either null or to a required timestamp.

Allow failing jobs to retry with exponential backoff and max retries

Yes! By tracking the number of attempts in attempts_made.

Allow creation-time (happens-before) or processing-time (first-ready, first-served) ordering

Yes! By ordering based on what is passed in for order_related_jobs_by_field.

Did we satisfy the design goals?

Create an easy to understand interface for current and future developers

Yes! All it takes is creating a new table and setting up the two modules with simple APIs: Lib::UniqueUntilExecutedJob and Lib::UniqueWhileExecutingJob.

Leverage production tested and respected gems as much as possible

Yes! Sidekiq and redlock-rb form the fundamental building blocks.

Minimize overhead added to jobs with such requirements

Yes! We added just a few DB operations and round trips to Redis to build this on top of the existing pipeline without disturbing the jobs that don’t care about these uniqueness requirements. We also prune the finished jobs from the tables on a regular basis.
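
The pruning itself is a periodic cleanup along these lines (the retention window is made up):

# Run from a scheduled job
BlogJobInfo.where('processed_at < ?', 30.days.ago).delete_all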

Potential pipeline improvements

A sample of improvements that can be made to this pipeline:

  • Make the degree of concurrency dynamic, for example based on current job volume
  • Reduce round trips to Redis by attempting locks on multiple resources at once
  • Scale horizontally by sharding based on the unique_fields of jobs

Real-world results

At the time of writing, this pipeline has been in production use for almost two and a half years! During this time, it has helped to:

  1. Increase consistency by guaranteeing determinism
  2. Eliminate wasted effort due to duplicate jobs
  3. Reduce bugs by preventing race conditions
  4. Reduce time to market for features by providing a simple interface for a complex problem