Tag: Sidekiq

Sidekiq Unique Jobs: don’t waste your time waiting – reschedule if busy

Half a year ago, I wrote a post about enforcing unique jobs in Karafka and Sidekiq for single resources. That approach works well; however, there is a particular case in which Sidekiq Unique Jobs can block all of your Sidekiq workers except one. This can significantly limit your computing power without you being aware of it.

When using Sidekiq Unique Jobs with the WhileExecuting strategy, only a single worker at a time can process a Sidekiq job with a given unique key. This is really helpful when you work with resources on which you cannot perform parallel operations (for example, a database that has no atomic operations but in which you need to increment counters), as doing so could overwrite results from another worker. The WhileExecuting strategy, with a properly defined unique key, can help you prevent that from happening. However...

Problem definition

Sidekiq jobs are consumed from a FIFO queue. Without any additional modifications, the situation is pretty clear: having 4 single-threaded processes allows you to process 4 jobs at the same time.

Everything changes when you decide to add Sidekiq Unique Jobs to your stack. When there are multiple jobs in sequence for a given unique key, Sidekiq will get seriously clogged. In the worst case, it won't matter how many workers and threads you have in your Sidekiq infrastructure: you will get the performance of a single Sidekiq thread. This is because FIFO queue processing gives Sidekiq no way to skip certain jobs. To work around this limitation, the authors of Sidekiq Unique Jobs introduced a #sleep that runs until the resource is free to be processed again or until a timeout occurs. This approach means that if you have more jobs in the queue than processors, the remaining jobs will have to wait until all the jobs with a given unique key are processed.

All the workers will actively wait (meaning that in the Sidekiq console you will see them marked as busy) until the lock is released.

Solution: reschedule instead of waiting

Warning: if a similar case occurs in your business logic quite often, you will probably be better off with an engine other than Sidekiq. I would recommend this solution only for infrequent edge cases.

Bypassing that behavior is pretty easy: if there's a lock, put the current job at the end of the queue. This way, your jobs will be checked for the possibility of execution and rescheduled instead of waiting. This will skew your queue counters a bit (as you will have more jobs enqueued and processed than there really were), but on the other hand it means that Sidekiq will "actively" look for resources on which it can work at a given moment.

To do so, we can create a new Unique Jobs strategy that we can apply later on. Apart from rescheduling, our strategy won't differ from WhileExecuting, so we can use it as a base.

module SidekiqUniqueJobs
  module Lock
    class WhileExecutingReschedule < WhileExecuting
      MUTEX = Mutex.new

      def synchronize
        MUTEX.lock

        if (@locked = locked?)
          yield
        else
          # Sleep briefly to avoid pointless, extremely fast looping
          # when all the jobs have the same unique key
          sleep 0.1
          @item['class'].constantize.perform_async(*@item['args'])
        end
      rescue Sidekiq::Shutdown
        logger.fatal { "the unique_key: #{@unique_digest} needs to be unlocked manually" }
        raise
      ensure
        # If we were able to obtain lock, we need to release it after processing
        if @locked
          SidekiqUniqueJobs.connection(@redis_pool) { |conn| conn.del @unique_digest }
        end

        MUTEX.unlock
      end
    end
  end
end

Applying this strategy is super easy. We just need to replace the while_executing strategy with while_executing_reschedule:

class ApplicationWorker
  include Sidekiq::Worker

  # sidekiq_options unique: :while_executing
  sidekiq_options unique: :while_executing_reschedule
end
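To get a feel for the difference between waiting and rescheduling, here is a toy, tick-based simulation in plain Ruby. It is only a sketch under simplifying assumptions: it does not use Sidekiq or Sidekiq Unique Jobs at all, the `simulate` method and its parameters are made up for illustration, every job takes the same number of ticks, and workers are visited in a fixed order within each tick.

```ruby
# Toy model of workers draining one FIFO queue (no Sidekiq involved).
# Jobs are represented only by their unique key; jobs sharing a key
# must never run at the same time.
def simulate(keys, workers:, strategy:, duration: 2)
  queue    = keys.dup
  locked   = {}                 # key => true while a job with that key runs
  slots    = Array.new(workers) # nil (idle) or { key:, left:, running: }
  finished = []                 # [key, tick] pairs in completion order
  tick     = 0

  until queue.empty? && slots.all?(&:nil?)
    tick += 1

    slots.each_index do |i|
      # An idle worker picks the next job from the front of the queue.
      if slots[i].nil? && !queue.empty?
        slots[i] = { key: queue.shift, left: duration, running: false }
      end

      slot = slots[i]
      next if slot.nil?

      unless slot[:running]
        if locked[slot[:key]]
          if strategy == :reschedule
            queue.push(slot[:key]) # back to the end of the queue
            slots[i] = nil         # the worker is free again on the next tick
          end
          next # with :wait the worker stays blocked on this job
        end

        locked[slot[:key]] = true
        slot[:running] = true
      end

      slot[:left] -= 1
      next unless slot[:left].zero?

      locked.delete(slot[:key])
      finished << [slot[:key], tick]
      slots[i] = nil
    end
  end

  finished
end

keys = [:a, :a, :a, :a, :b, :c]

%i[wait reschedule].each do |strategy|
  done = simulate(keys, workers: 2, strategy: strategy)
  unrelated_done_at = done.reject { |key, _| key == :a }.map(&:last).max
  puts "#{strategy}: unrelated jobs finished by tick #{unrelated_done_at}, " \
       "all jobs finished by tick #{done.last.last}"
end
```

In this model the total time to drain the queue is the same for both strategies, because the jobs sharing a key still have to run one after another. What changes is that the unrelated jobs (:b and :c) finish earlier with rescheduling, since they no longer sit behind workers that are blocked on the lock.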

Cover photo by: Alexandre Duret-Lutz on Creative Commons 2.0 license. Changes made: added an overlay layer with an article title on top of the original picture.

Enforcing unique jobs in Karafka and Sidekiq for single resources

Note: For the sake of simplicity, I skipped some of the corner cases to make this article less complicated and easier to understand.

When working with multi-threaded and multi-process systems that consume messages in parallel, you need to be able to enforce some limitations on the processing order.

Most of the time, in well-designed systems, things should be based on atomic jobs (atomic in the sense of not depending on any other worker job) that can run whenever they need to. However, in some cases you need to make sure that jobs related to a given resource are not executed at the same time. This can be achieved with Karafka and Sidekiq in 3 ways:

  • Standalone Karafka mode - a single Karafka process will consume messages one by one and process them inline (without using Sidekiq at all). Since Kafka gives us per-partition ordering, the same applies to standalone Karafka mode.
  • Karafka default mode + a single Sidekiq worker with a single thread. Since a single-threaded Sidekiq process consumes its queue in FIFO order, this combination preserves the order as well.
  • Karafka + Sidekiq + SidekiqUniqueJobs gem. This combination allows us to build complex multi-threaded and multi-process workers that will ensure that a single resource is modified by at most one worker at a time (while executing uniqueness).

In this article I will focus on the last option, as it is the best in terms of performance and flexibility.

A bit about state machines

Having a single resource on which many actions can be performed is always a problem. To ensure (on an object level) that we won't process things in parallel, we can use state machines (aasm is a great gem for that) to achieve locking. However, if we only do that, we will have to implement a bit more logic to handle the possibility of many jobs doing the same thing at the same time:

  • Detecting and waiting if someone else is already using (whatever that means) a given resource.
  • Retrying in case of state machine failure (invalid transitions).

But hey! We use Sidekiq! It means that retrying is already built in and it will restart the job later on. It also means that at some point the state machine lock will be removed and we will be able to finish all the jobs.

That is true; however, we should not rely on it, because a case like that shouldn't be treated as an exception. Instead, we should prevent such situations from happening at all.

Problem definition

Let's say we have a RemoteResource representation that needs to be refreshed periodically (every 5 seconds) and we need to make sure that we never overwrite its content with older data.

class RemoteResource < ActiveRecord::Base
  include AASM

  aasm do
    state :initial, initial: true
    state :running

    event :run do
      transitions :from => :initial, :to => :running
    end

    event :finish do
      transitions :from => :running, :to => :initial
    end
  end

  def refresh!
    # Some external API calls and other business stuff
    # @note I know that this should be in a service but
    # again, let's keep things simple
    update!(content: RemoteData.fetch(id))
  end
end

Happy, single threaded execution example:

resource = RemoteResource.find(params[:id])
resource.run!
resource.refresh!
resource.finish!

This code will run without problems as long as:

  • Our processing (including the external API call) does not take more than 5 seconds
  • Sidekiq workers are up and running (at least matching the speed of enqueuing)

But let's kill all the workers for 1 minute and run them again. We end up with a queue full of messages that will be processed at the same time. We might then have jobs for the same object enqueued one after another, which means that they will be executed at the same time.

In a case like that, both workers will overlap while processing the same resource. It means that we might end up with outdated data (network delay for an older resource) or a state machine exception. In more complex cases it could mean corrupted data.
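The state machine exception mentioned above can be sketched without Rails or the aasm gem. The `FakeResource` class and the `InvalidTransition` error below are hand-rolled, hypothetical stand-ins for the AASM-backed model, and a single in-memory object plays the role of the shared database row; treat it as an illustration of the failure mode rather than of how aasm works internally.

```ruby
# Hand-rolled stand-in for the AASM-backed RemoteResource (no gems needed).
class InvalidTransition < StandardError; end

class FakeResource
  attr_reader :state

  def initialize
    @state = :initial
  end

  def run!
    transition(from: :initial, to: :running)
  end

  def finish!
    transition(from: :running, to: :initial)
  end

  private

  # Only allow the transition when the resource is in the expected state.
  def transition(from:, to:)
    unless @state == from
      raise InvalidTransition, "cannot go from #{@state} to #{to}"
    end

    @state = to
  end
end

resource = FakeResource.new
resource.run! # job 1 starts working on the resource

begin
  resource.run! # job 2 starts before job 1 has finished
rescue InvalidTransition => e
  puts "second job failed: #{e.message}"
end

resource.finish! # job 1 completes, the resource is free again
puts resource.state
```

The second job fails only because it arrived while the first one was still running. With Sidekiq retries it would eventually succeed, but it has already consumed a worker and produced an error along the way.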

Since Karafka passes messages into Sidekiq, this problem can occur there as well. Worse, the problem grows when we perform many types of tasks on a single resource and they should never overlap.

Solution: SidekiqUniqueJobs

SidekiqUniqueJobs is a great gem that solves exactly this issue. It makes sure (among other options) that for a given set of arguments, only a single job can run at a time. In our case the unique arguments would be:

  • Worker name
  • RemoteResource#id

This is enough to complete our task! Jobs for different resources will be processed at the same time, but there will be no more parallel processing on the same RemoteResource.
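The idea of locking on the pair (worker name, resource id) can be illustrated inside a single Ruby process. This is a minimal sketch, not how SidekiqUniqueJobs is implemented: the gem keeps its locks in Redis so that they work across processes, while the hypothetical `KeyedLock` class below uses one in-memory Mutex per key, and `'RefreshWorker'` is a made-up worker name.

```ruby
# In-process stand-in for a "while executing" lock: one Mutex per unique key.
class KeyedLock
  def initialize
    @guard = Mutex.new
    @locks = Hash.new { |hash, key| hash[key] = Mutex.new }
  end

  # Runs the block while holding the lock that belongs to unique_args.
  def synchronize(*unique_args, &block)
    mutex = @guard.synchronize { @locks[unique_args] }
    mutex.synchronize(&block)
  end
end

lock    = KeyedLock.new
stats   = Mutex.new
running = Hash.new(0) # resource id => jobs currently inside the lock
peak    = Hash.new(0) # resource id => highest concurrency observed

jobs = [1, 1, 1, 2, 2, 3].map do |resource_id|
  Thread.new do
    lock.synchronize('RefreshWorker', resource_id) do
      stats.synchronize do
        running[resource_id] += 1
        peak[resource_id] = [peak[resource_id], running[resource_id]].max
      end

      sleep 0.01 # pretend to call the remote API

      stats.synchronize { running[resource_id] -= 1 }
    end
  end
end

jobs.each(&:join)

# Every resource id maps to a peak of 1: jobs for the same resource never
# overlapped, while jobs for different resources were free to run together.
puts peak.inspect
```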

Implementation in Karafka application

To use this gem with a Karafka application, you need to build a uniqueness scope before a job is enqueued. This can be done inside the #before_enqueue block:

class KarafkaController < Karafka::BaseController
  before_enqueue do
    params[:uniqueness_scope] = params.dig(:remote_resource, :id)
  end
end

You also need to configure your Karafka base worker to set the uniqueness key based on this param value:

class ApplicationWorker < Karafka::BaseWorker
  sidekiq_options unique: :while_executing

  def self.unique_args(args)
    [
      args.first, # The worker name
      args.last['uniqueness_scope'] || SecureRandom.uuid
    ]
  end
end

Note that we use SecureRandom.uuid as a fallback for jobs for which we don't want to enforce unique execution. A random UUID ensures that all the jobs without a uniqueness_scope can run at the same time. If we left it as nil, nil would be treated like any other value, so all of those jobs would share a single unique key and none of them would run in parallel.
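The effect of the fallback is easy to check in isolation. The sketch below copies the body of unique_args into a standalone method so that it runs without Karafka or Sidekiq; the `'RefreshWorker'` name and the shape of the sample arguments (worker name first, params hash last) are assumptions based on the comments in the worker above.

```ruby
require 'securerandom'

# Same logic as ApplicationWorker.unique_args, as a plain method.
def unique_args(args)
  [
    args.first,
    args.last['uniqueness_scope'] || SecureRandom.uuid
  ]
end

scoped   = ['RefreshWorker', { 'uniqueness_scope' => 42 }]
unscoped = ['RefreshWorker', { 'uniqueness_scope' => nil }]

# Same scope, same key: these jobs are serialized.
puts unique_args(scoped) == unique_args(scoped)

# No scope: a fresh UUID on every call, so the keys never match
# and the jobs are free to run in parallel.
puts unique_args(unscoped) == unique_args(unscoped)

# Without the fallback, every unscoped job collapses onto one shared key.
nil_key = ->(args) { [args.first, args.last['uniqueness_scope']] }
puts nil_key.call(unscoped) == nil_key.call(unscoped)
```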

Copyright © 2024 Closer to Code
