
Reduce your method calls by 99.9% by replacing Thread#pass with Queue#pop

When doing multi-threaded work in Ruby, there are a couple of ways to control the execution flow within a given thread. In this article, I will be looking at Thread#pass and Queue#pop and how understanding each of them can help you drastically optimize your applications.

Thread#pass - what it is and how it works

One of the ways you can ask the scheduler to "do something else" is by using the Thread#pass method.

Where can you find it? Aside from Karafka, for example, in one of the most recent additions to ActiveRecord: #load_async (pull request).

Let's see how it works and why it may or may not be what you are looking for when building multi-threaded applications.

The Ruby docs are rather minimalistic with their description:

Give the thread scheduler a hint to pass execution to another thread. A running thread may or may not switch, it depends on OS and processor.

That means that when dealing with threads, you can tell Ruby that it would not be a bad idea to switch away from executing the current one and focus on the others.

By default, all the threads you create have the same priority and are treated the same way. An excellent illustration of this is the code below:

threads = []

threads = 10.times.map do |i|
  Thread.new do
    # Make threads wait for a bit so all threads are created
    sleep(0.001) until threads.size == 10

    start = Time.now.to_f

    10_000_000.times do
      start / rand
    end

    puts "Thread #{i},#{Time.now.to_f - start}"
  end
end

threads.each(&:join)

# for i in {1..1000}; do ruby threads.rb; done > results.txt

On average, the computation in each of them took a similar amount of time: the difference between the fastest and the slowest thread was less than 8%.

However, when one of the threads "passes," things change drastically:

threads = []

threads = 10.times.map do |i|
  Thread.new do
    sleep(0.001) until threads.size == 10

    start = Time.now.to_f

    10_000_000.times do
      Thread.pass if i.zero?

      start / rand
    end

    puts "Thread #{i},#{Time.now.to_f - start}"
  end
end

threads.each(&:join)

Now, thread zero takes twice as long as the other threads doing the same job.

What is worth pointing out is that this method does not stop the execution flow by itself; it just suggests to Ruby that there may be other, more important things to do.

This exact behaviour was used by Jean Boussier in ActiveRecord:

def schedule_query(future_result) # :nodoc:
  @async_executor.post { future_result.execute_or_skip }
  Thread.pass
end

This code schedules a background job and then hints to the scheduler that it may be worth switching away from the current thread so the job can start sooner.
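For context, here's roughly how #load_async is used (a sketch, assuming a Rails 7+ application with a hypothetical Post model):

# The query is scheduled on a background thread pool right away
posts = Post.where(published: true).load_async

# ... other work can happen here while the query runs ...

# Accessing the result blocks only if the query has not finished yet
posts.each { |post| puts post.title }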

It is worth mentioning that when all the threads use Thread#pass, it becomes a colossal burden for the Ruby VM. Ruby goes crazy, since none of the threads wants to do any work, and the execution time increases over 100 times.
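Here's a minimal sketch of that degenerate case - the benchmark from above, but with every thread passing on every iteration:

threads = 10.times.map do |i|
  Thread.new do
    start = Time.now.to_f

    10_000_000.times do
      # Every thread now yields on every iteration, so the scheduler
      # constantly switches between threads that all refuse to work
      Thread.pass

      start / rand
    end

    puts "Thread #{i},#{Time.now.to_f - start}"
  end
end

threads.each(&:join)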

Queue#pop - what it is and how it works

Queue is a well-known class, and #pop is one of the most important methods it provides.

Here is what the Ruby docs say about the Queue class and the #pop method:

The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.

#pop: If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn't suspended, and ThreadError is raised.
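In other words, #pop blocks by default and only raises when explicitly asked not to wait. A quick illustration:

queue = Queue.new
queue << :job

queue.pop # => :job (would suspend the calling thread if the queue were empty)

begin
  # With non_block set to true, an empty queue raises instead of blocking
  queue.pop(true)
rescue ThreadError
  puts 'nothing to pop'
end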

When asked about queues, most programmers think about workers consuming jobs from a queue:

numbers = Queue.new

threads = 10.times.map do |i|
  Thread.new do
    while number = numbers.pop
      result = Time.now.to_f / number

      # a bit of randomness
      sleep(rand / 1_000)

      puts "Thread #{i},#{result}"
    end
  end
end

10_000.times { numbers << rand }

# see what I did here? ;)
Thread.pass until numbers.empty?

numbers.close

threads.each(&:join)

What is worth keeping in mind about Queue#pop is that it will block the execution of a given thread until there is something to do. This means that a blocked thread becomes almost "invisible" from the performance perspective. Here's an example of running computations with 0, 4, 9 and 99 blocked threads:

queue = Queue.new

THREADS = 4

THREADS.times do
  Thread.new { queue.pop }
end

# Wait until all the threads are initialized
Thread.pass until queue.num_waiting == THREADS

start = Time.now.to_f

10_000_000.times do
  start / rand
end

puts Time.now.to_f - start

As you can see, inactive threads do not have a big impact on the overall performance of this code. Even with 99 extra threads, the end result is not far from the baseline.
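To reproduce this, one can run the same measurement for each blocked-thread count in a single script (a sketch following the snippet above):

[0, 4, 9, 99].each do |count|
  queue = Queue.new

  count.times { Thread.new { queue.pop } }

  # Wait until all the extra threads are blocked on the queue
  Thread.pass until queue.num_waiting == count

  start = Time.now.to_f

  10_000_000.times { start / rand }

  puts "#{count} blocked threads: #{Time.now.to_f - start}"

  # Closing the queue wakes the blocked threads up so they can exit
  queue.close
end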

Reducing method calls in a multi-threaded environment

Now that you know what Thread#pass and Queue#pop do, let's put them to work on a real use case. For that, we will be looking into the Karafka framework.

Karafka is a framework I built that simplifies the development of Apache Kafka-based Ruby applications. Version 2.0 supports work distribution across multiple threads. The way it works from a data processing perspective is quite simple:

1. Take some data from Kafka
2. Divide it into processing units (jobs)
3. Put all the jobs into a queue
4. Wait for all the workers to pick the jobs and finish all the work
5. Repeat endlessly

Assuming an endless stream of available data, this can pretty much be modelled as follows:

queue = Queue.new

THREADS = 10

THREADS.times do |i|
  Thread.new do
    loop do
      data, task = queue.pop
      task.call(data)
    end
  end
end

def wait_for_jobs_to_finish(queue)
  Thread.pass while queue.num_waiting < THREADS || !queue.empty?
end

def data
  Array.new(10) { rand }
end

task = ->(data) { data * 2 }

100_000.times do
  data.each { queue << [_1, task] }

  wait_for_jobs_to_finish(queue)
end

And this is, roughly, how I initially implemented the listener loop together with the job distribution.

When benchmarked with regard to the number of times Thread#pass was executed in a pass-through benchmark (where we measure max throughput), things looked solid.
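One way to take such a measurement is to count the passes in the wait loop itself (a sketch based on the model above; the passes counter is mine, not Karafka's):

def wait_for_jobs_to_finish(queue)
  passes = 0

  while queue.num_waiting < THREADS || !queue.empty?
    Thread.pass
    passes += 1
  end

  passes
end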

Despite the increased number of iterations, we would not wait more often per iteration. This means that our jobs were short enough to finish before Ruby returned to the wait loop.

Things become much more interesting if we assume that our jobs take more time than Ruby gives them before a thread's execution is interrupted. Then things start to look different:

# Same code as before but the job has a bit of sleep simulating IO
task = ->(data) { sleep(rand(9..11) / 10000.0) }

Assuming we burn around 1ms per job, the number of passes skyrockets:

That's over 1000 times more invocations of the same method!
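Even heavier, IO-bound jobs can be modelled the same way as the 1ms case (a sketch):

# Same code as before, but each job now simulates a ~100ms (+/- 10%) query
task = ->(data) { sleep(rand(90..110) / 1000.0) }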

In a case where we run heavy queries of around 100ms (+/- 10%) per job, we end up with the following results per iteration:

That means that Ruby had to run Thread#pass over 180 000 times on average, for nothing!

When optimizing any code, it is good to establish its primary use case. In the case of Karafka, while raw throughput is important, it is more about complex jobs being able to use the GVL release strategy to allow parallel work execution upon IO.

So, is there a better way to make Ruby wait patiently for all the jobs to be done? There is: Queue#pop. Since it is thread-safe, we can use it to notify the main thread that a given job has finished. It won't eliminate useless runs entirely, but it will reduce them so much that they will, in fact, become insignificant. Since we know how many jobs we've enqueued, we know how many times we need to #pop:

queue = Queue.new
lock = Queue.new

THREADS = 10

THREADS.times do |i|
  Thread.new do
    loop do
      data, task = queue.pop
      task.call(data)
      lock << true
    end
  end
end

def wait_for_jobs_to_finish(dispatched, lock)
  dispatched.times { lock.pop }
end

def data
  Array.new(10) { rand }
end

task = ->(data) { data * 2 }

100_000.times do
  data.each { queue << [_1, task] }

  wait_for_jobs_to_finish(data.size, lock)
end

The lock.pop will stop the execution of the main thread until all the dispatched jobs are done. This means that we increase the number of stops with an increased number of threads. However, this correlation is linear, and the end result is orders of magnitude smaller than when using Thread#pass.
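Distilled to its essence, this pattern is a completion latch. A minimal sketch:

lock = Queue.new

worker = Thread.new do
  sleep(0.1) # simulate a job
  lock << true # signal completion
end

lock.pop # blocks the main thread until the worker signals

puts 'worker finished'
worker.join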

Here's the same benchmark, with the number of Queue#pop calls that replaced Thread#pass for the non-sleep case:

The number of Queue#pop invocations equals the number of threads. It is independent of the job types or any other circumstances. So the longer the jobs are, the bigger the improvement:

This change not only reduced the number of calls by over 99.994%, but it also drastically lowered CPU utilization, which is especially visible for cases with extensive IO (here simulated with sleep):

Summary

So, is one better than the other? No. They should be used in different cases and to achieve different goals.

Thread#pass should not be used to defer work but rather to provide a hint to Ruby that there may be more important things it could focus on.

Queue#pop, on the other hand, can act not only as a component of a queue but also as a part of a multi-threaded application's flow control.

Concurrency is not easy. Thread management and the selection of proper methods are as crucial as understanding your primary use cases and building correct benchmarks. Sometimes minor tweaks can provide tremendous benefits.


Note: this post would not be possible without extensive help from Samuel Williams. Thank you!


Cover photo by Chris-Håvard Berge, licensed under Attribution-NonCommercial 2.0 Generic (CC BY-NC 2.0). Image has been cropped.

Benchmarking Karafka – how does it handle multiple TCP connections

Recently I've released a Ruby Apache Kafka microframework; however, I don't expect anyone to use it without at least a bit of information on what it can do. Here are some measurements that I took.

How Karafka handles multiple TCP connections

Since listening to multiple topics requires multiple TCP connections, it is pretty obvious that in order to obtain decent performance, we are using threads (a process clustering feature is in progress). In theory, each controller that you create could have a single thread and listen all the time. However, with a bigger application, this could slow it down. That's why we introduced topics clusterization. When you configure your Karafka application, you should specify the concurrency parameter:

class App < Karafka::App
  setup do |config|
    # Other config options
    config.max_concurrency = 10 # 10 threads max
  end
end

This is the maximum number of threads that will be used to listen for incoming messages. It is pretty simple when you have fewer controllers (topics) than threads - it will just use a single thread per topic. However, if you have more controllers than threads, a few connections will be packed into a single thread (wrapped with Karafka::Connection::ThreadCluster). And this is how it works when you have 2 threads and 4 controllers:

[Diagram: 4 controllers' connections distributed across 2 thread clusters]

In general, it will distribute TCP connections across threads evenly. So, if you have 20 controllers and 5 threads, each thread will be responsible for checking 4 sockets, one after another. Since it won't do this simultaneously, Karafka will slow down. How much? It depends - if there's something on each of the topics, you will get around 24% (per controller) of the base performance out of each connection.
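The distribution itself can be pictured with a few lines of Ruby (hypothetical names; this is not Karafka::Connection::ThreadCluster's actual implementation):

controllers = Array.new(20) { |i| "controller_#{i}" }
threads_count = 5

# Round-robin the controllers into per-thread groups
clusters = controllers.group_by.with_index { |_, i| i % threads_count }.values

clusters.each_with_index do |cluster, i|
  puts "Thread #{i}: #{cluster.join(', ')}"
end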

Other things that impact performance

When considering this framework's performance, you need to keep in mind that:

  • It is strongly dependent on what you do in your code
  • It also depends on Apache Kafka's performance
  • Connection between Karafka and Redis (for Sidekiq) is a factor as well
  • All the benchmarks show the performance without any business logic
  • All the benchmarks show the performance without enqueuing to Sidekiq
  • It also depends on what type of infrastructure you run the benchmarks on
  • Message size is a factor as well (since it gets deserialized to JSON by default)
  • Ruby version - I've been testing it on MRI (CRuby) 2.2.3 - Karafka does not yet work with other Ruby distributions (JRuby or Rubinius), but that should change when some of the dependencies stop using refinements

Benchmarking

Methodology

For each of the benchmarks, I was measuring the time taken to consume all the messages that were stored in Kafka. There was no business logic involved (just message processing by the framework). My local Kafka setup was a default one (no settings were changed) introduced with these Docker containers.

I've tested up to 5 topics - each with 1 000 000 messages loaded. Since Karafka has lazy loading for params, the benchmark does not include the time needed to unparse the messages. Unparsing performance strongly depends on the parser you pick (defaults to JSON) and the message size. These benchmarks measure the maximum throughput we can get during message receiving.

Note: all the benchmarking was performed on my 16GB, 4-core i7, Linux laptop. During the benchmarking, I was performing other tasks that might have a small impact on the overall results (although no heavy stuff).

1 thread

With a single thread, it is pretty straightforward - the more controllers we have, the less we can process per controller. There's also controller context-switching overhead that eats up some of the processing power, so each controller gets less and less. Switching between controllers seems to consume around 11% of a single controller's performance when we use more than 1 controller in a single-threaded application.

[Benchmark chart: 1 thread]
Context switching between controllers in a single thread will cost us around 1% of the general performance per one additional controller (if you're eager to know what we're planning to do about it, scroll down to the summary). On one hand it is a lot; on the other, with a bigger application you should probably run Karafka in multi-threaded mode anyway. That way, context switching won't be as painful.

2 threads

[Benchmark chart: 2 threads]
The general performance with 2 threads and 2 controllers shows that we're able to lower the switching impact on the overall performance, gaining around 1.5-2k requests per second (overall).

3 threads

[Benchmark chart: 3 threads]
5 controllers with 3 threads vs 5 controllers with 1 thread: 7% better performance.

4 threads

[Benchmark chart: 4 threads]

5 threads

[Benchmark chart: 5 threads]

Benchmark results

Summary

The overall performance of a single Karafka framework process is highly dependent on the way it is being used. Because of the GIL, when we receive data from sockets, we can only process incoming messages from a single socket at a time. So, in general, we're limited to around 30-33k requests per second per process. It means that the bigger the application gets, the slower it works (when we consider total performance per single controller). However, this is only valid when we assume that all the topics are always full of messages. Otherwise we don't process; we wait on the IO, and Ruby can handle incoming messages from multiple threads. That's why it is worth starting Karafka with a decent concurrency level.

How can we increase throughput for Karafka applications? Well, for now, we can create multiple partitions for a single topic and spin up multiple Karafka processes. They will then load-balance between the partitions automatically. This solution has one downside: if we have only a few topics with multiple partitions and the rest with a single one, then some of the threads in Karafka won't perform any work. This will be fixed soon (we're already working on it), when we introduce Karafka processes clustering. It will allow spinning up multiple Karafka processes (in a single cluster) that will listen only for a given part of the controllers. That way, the overall performance will increase significantly. But still, being able to perform 30k rq/s is not that bad, right? ;)
