A Thread Pool is an abstraction that you can give a unit of work to, and the work will be executed by one of possibly several threads in the pool. One motivation for using thread pools is the overhead of creating and destroying threads. Creating a pool of reusable worker threads then repeatedly re-using threads from the pool can have huge performance benefits for a long-running application like a service.
concurrent-ruby also offers some higher level abstractions than thread pools. For many problems, you will be better served by using one of these -- if you are thinking of using a thread pool, we especially recommend you look at and understand Futures before deciding to use thread pools directly instead. Futures are implemented using thread pools, but offer a higher level abstraction.
But there are some problems for which directly using a thread pool is an appropriate solution. Or, you may wish to make your own thread pool to run Futures on, to be separate or have different characteristics than the global thread pool that Futures run on by default.
Thread pools are considered 'executors' -- an object you can give a unit of work to, to have it executed. In fact, thread pools are the main kind of executor you will see - others are mainly for testing or odd edge cases. In some documentation or source code you'll see reference to an 'executor' -- this is commonly a thread pool, or else something similar that executes units of work (usually supplied as Ruby blocks).
A FixedThreadPool contains a fixed number of threads. When you give a unit of work to it, an available thread will be used to execute.
pool = Concurrent::FixedThreadPool.new(5) # 5 threads pool.post do # some parallel work end # As with all thread pools, execution resumes immediately here in the caller thread, # while work is concurrently being done in the thread pool, at some possibly future point.
What happens if you post new work when all (e.g.) 5 threads are currently busy? It will be added to a queue, and executed when a thread becomes available. In a
FixedThreadPool, if you post work to the pool much faster than the work can be completed, the queue may grow without bounds, as the work piles up in the holding queue, using up memory without bounds. To limit the queue and apply some form of 'back pressure' instead, you can use the more configurable
ThreadPoolExecutor (See below).
If you'd like to base the number of threads in the pool on the number of processors available, your code can consult Concurrent.processor_count.
FixedThreadPool is based on the semantics used in Java for java.util.concurrent.Executors.newFixedThreadPool(int nThreads)
A CachedThreadPool will create as many threads as necessary for work posted to it. If you post work to a
CachedThreadPool when all its existing threads are busy, it will create a new thread to execute that work, and then keep that thread cached for future work. Cached threads are reclaimed (destroyed) after they are idle for a while.
CachedThreadPools typically improve the performance of programs that execute many short-lived asynchronous tasks.
pool = Concurrent::CachedThreadPool.new pool.post do # some parallel work end
The behavior of
CachedThreadPool is based on Java's java.util.concurrent.Executors.newCachedThreadPool()
If you'd like to configure a maximum number of threads, you can use the more general configurable
A ThreadPoolExecutor is a general-purpose thread pool that can be configured to have various behaviors.
ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by
When a new task is submitted and fewer than
min-threads threads are running, a new thread is created to handle the request, even if other worker threads are idle.
If there are more than
min-threads but less than
max-threads threads running, a new thread will be created only if the queue is full.
FixedThreadPool are simply
ThreadPoolExecutors with certain configuration pre-determined. For instance, to create a
ThreadPoolExecutor that works just like a
FixedThreadPool.new 5, you could:
pool = Concurrent::ThreadPoolExecutor.new( min_threads: 5, max_threads: 5, max_queue: 0 # unbounded work queue )
If you want to provide a maximum queue size, you may also consider the
fallback_policy which defines what will happen if work is posted to a pool when the queue of waiting work has reached the maximum size and no new threads can be created. Available policies:
- abort: Raise a
Concurrent::RejectedExecutionErrorexception and discard the task. (default policy)
- discard: Silently discard the task and return nil as the task result.
- caller_runs: The work will be executed in the thread of the caller, instead of being given to another thread in the pool.
pool = Concurrent::ThreadPoolExecutor.new( min_threads: 5, max_threads: 5, max_queue: 100, fallback_policy: :caller_runs )
You can create something similar to a
CachedThreadPool, but with a maximum number of threads and a bounded queue.
A new thread will be created for the first 3 tasks submitted, and then, once the queue is full, up to an additional 7 threads (10 total) will be created.
If all 10 threads are busy and 100 tasks are already queued, additional tasks will be rejected.
pool = Concurrent::ThreadPoolExecutor.new( min_threads: 3, # create up to 3 threads before queueing tasks max_threads: 10, # create at most 10 threads max_queue: 100, # at most 100 jobs waiting in the queue )
max_threads set to different values will ordinarily reclaim idle threads. You can supply an
idletime argument, number of seconds that a thread may be idle before being reclaimed. The default is 60 seconds.
concurrent-ruby thread pools are based on designs from
java.util.concurrent -- a well-designed, stable, scalable, and battle-tested concurrency library. The
ThreadPoolExecutor is based on Java java.util.concurrent.ThreadPoolExecutor, and is in fact implemented with a Java ThreadPoolExecutor when running under JRuby. For more information on the design and concepts, you may find the Java documentation helpful:
Thread Pool Status and Shutdown
A running thread pool can be shutdown in an orderly or disruptive manner. Once a thread pool has been shutdown it cannot be started again.
shutdown method can be used to initiate an orderly shutdown of the thread pool. All new post calls will be handled according to the
fallback_policy (i.e. failing with a RejectedExecutionError by default). Threads in the pool will continue to process all in-progress work and will process all tasks still in the queue.
kill method can be used to immediately shutdown the pool. All new post calls will be handled according to the
Thread.kill will be called on all threads in the pool, aborting all in-progress work. Tasks in the queue will be discarded.
wait_for_termination can be used to block and wait for pool shutdown to complete. This is useful when shutting down an application and ensuring the app doesn't exit before pool processing is complete. The method wait_for_termination will block for a maximum of the given number of seconds then return true (if shutdown completed successfully) or false (if it was still ongoing). When the timeout value is
nil the call will block indefinitely. Calling
wait_for_termination on a stopped thread pool will immediately return true.
# tell the pool to shutdown in an orderly fashion, allowing in progress work to complete pool.shutdown # now wait for all work to complete, wait as long as it takes pool.wait_for_termination
You can check for current pool status:
pool.running? pool.shuttingdown? # in process of shutting down, can't take any more work pool.shutdown? # it's done
shutdown? method will return true for a stopped pool, regardless of whether the pool was stopped with
There are several other thread pools and executors in the
concurrent-ruby library. See the API documentation for more information:
Global Thread Pools
Concurrent Ruby provides several global thread pools. Higher-level abstractions use global thread pools, by default, for running asynchronous operations without creating new threads more often than necessary. These executors are lazy-loaded so they do not create overhead when not needed. The global executors may also be accessed directly if desired. For more information regarding the global thread pools and their configuration, refer to the API documentation.
When using a higher-level abstraction, which ordinarily uses a global thread pool, you may wish to instead supply your own thread pool, for separation of work, or to control the thread pool behavior with configuration.
pool = Concurrent::ThreadPoolExecutor.new( :min_threads => [2, Concurrent.processor_count].max, :max_threads => [2, Concurrent.processor_count].max, :max_queue => [2, Concurrent.processor_count].max * 5, :fallback_policy => :caller_runs ) future = Future.execute(:executor => pool) do #work end
Some Ruby versions allow the Ruby process to be forked. Generally, mixing threading and forking is an anti-pattern. Threading and forking are both concurrency techniques and mixing the two is rarely beneficial. Moreover, threads created before the fork become unusable ("dead") in the forked process. This aspect of forking is a significant issue for any application or library which spawns threads. It is strongly advised that applications using
ThreadPoolExecutor do not also fork. Since Concurrent Ruby is a foundational library often used by gems which are in turn used by other applications, it is impossible to predict or prevent upstream forking. Concurrent Ruby therefore makes a few guarantees about the behavior of
ThreadPoolExecutor after forking.
Concurrent Ruby guarantees that jobs post on the parent process will be handled on the parent process; the child process does not inherit any jobs at the time of the fork. Concurrent Ruby also guarantees that thread pools copied to the child process will continue to function normally.
When a fork occurs the
ThreadPoolExecutor in the forking process takes no special actions whatsoever. It has no way of knowing that a fork occurred. It proceeds to process its jobs as normal and makes no attempt whatsoever to distribute those jobs to the forked process(es).
ThreadPoolExecutor in the forked process detects that a fork has occurred it immediately takes the following actions:
- Clears all pending jobs from its queue (assuming they will be handled by the forking process).
- Deletes all worker threads (they will have died during the fork).
- Resets all job counters (these counts will be reflected in the forking process).
- Begins posting new jobs as normal.
These actions guarantee that all in-flight jobs are processed normally in the forking process and that thread pools, including the global thread pools, remain functional in the forked process(es).