ThreadPool.h
//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_SUPPORT_THREADPOOL_H
#define LLVM_SUPPORT_THREADPOOL_H

#include "llvm/ADT/DenseMap.h"
#include "llvm/ADT/FunctionExtras.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Compiler.h"
#include "llvm/Support/Jobserver.h"
#include "llvm/Support/RWMutex.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/thread.h"

#include <future>

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

namespace llvm {

class ThreadPoolTaskGroup;

/// This defines the abstract base interface for a ThreadPool allowing
/// asynchronous parallel execution on a defined number of threads.
///
/// It is possible to reuse one thread pool for different groups of tasks
/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
/// the same queue, but it is possible to wait only for a specific group of
/// tasks to finish.
///
/// It is also possible for worker threads to submit new tasks and wait for
/// them. Note that this may result in a deadlock in cases such as when a task
/// (directly or indirectly) tries to wait for its own completion, or when all
/// available threads are used up by tasks waiting for a task that has no thread
/// left to run on (this includes waiting on the returned future). It should be
/// generally safe to wait() for a group as long as groups do not form a cycle.
class ThreadPoolInterface {
  /// The actual method to enqueue a task to be defined by the concrete
  /// implementation.
  virtual void asyncEnqueue(llvm::unique_function<void()> Task,
                            ThreadPoolTaskGroup *Group) = 0;

public:
  /// Destroying the pool will drain the pending tasks and wait. The current
  /// thread may participate in the execution of the pending tasks.
  virtual ~ThreadPoolInterface();

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  /// Calling wait() from a task would deadlock waiting for itself.
  virtual void wait() = 0;

  /// Blocking wait for only all the threads in the given group to complete.
  /// It is possible to wait even inside a task, but waiting (directly or
  /// indirectly) on itself will deadlock. If called from a task running on a
  /// worker thread, the call may process pending tasks while waiting in order
  /// not to waste the thread.
  virtual void wait(ThreadPoolTaskGroup &Group) = 0;

  /// Returns the maximum number of workers this pool can eventually grow to.
  virtual unsigned getMaxConcurrency() const = 0;

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Function, typename... Args>
  auto async(Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(std::move(Task));
  }

  /// Overload, the task will be in the given task group.
  template <typename Function, typename... Args>
  auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(Group, std::move(Task));
  }

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Func>
  auto async(Func &&F) -> std::shared_future<decltype(F())> {
    return asyncImpl(
        llvm::unique_function<decltype(F())()>(std::forward<Func>(F)), nullptr);
  }

  /// Overload, the task will be in the given task group.
  template <typename Func>
  auto async(ThreadPoolTaskGroup &Group, Func &&F)
      -> std::shared_future<decltype(F())> {
    return asyncImpl(
        llvm::unique_function<decltype(F())()>(std::forward<Func>(F)), &Group);
  }

private:
  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename ResTy>
  std::shared_future<ResTy> asyncImpl(llvm::unique_function<ResTy()> Task,
                                      ThreadPoolTaskGroup *Group) {
    auto Future = std::async(std::launch::deferred, std::move(Task)).share();
    asyncEnqueue([Future]() { Future.wait(); }, Group);
    return Future;
  }
};
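// Editor's note: the snippet below is an illustrative usage sketch, not part
// of the upstream header. It shows how client code typically drives any
// ThreadPoolInterface implementation; the concrete pool type passed in (for
// example DefaultThreadPool, defined further below) is up to the caller.
//
//   void example(ThreadPoolInterface &Pool) {
//     std::shared_future<int> F = Pool.async([] { return 6 * 7; });
//     Pool.async([] { /* fire-and-forget work */ });
//     int Result = F.get(); // Blocks until this particular task has run.
//     (void)Result;
//     Pool.wait();          // Blocks until the whole queue is drained.
//   }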

#if LLVM_ENABLE_THREADS
/// A ThreadPool implementation using std::threads.
///
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
class LLVM_ABI StdThreadPool : public ThreadPoolInterface {
public:
  /// Construct a pool using the hardware strategy \p S for mapping hardware
  /// execution resources (threads, cores, CPUs).
  /// Defaults to using the maximum execution resources in the system, but
  /// accounting for the affinity mask.
  StdThreadPool(ThreadPoolStrategy S = hardware_concurrency());

  /// Blocking destructor: the pool will wait for all the threads to complete.
  ~StdThreadPool() override;

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  /// Calling wait() from a task would deadlock waiting for itself.
  void wait() override;

  /// Blocking wait for only all the threads in the given group to complete.
  /// It is possible to wait even inside a task, but waiting (directly or
  /// indirectly) on itself will deadlock. If called from a task running on a
  /// worker thread, the call may process pending tasks while waiting in order
  /// not to waste the thread.
  void wait(ThreadPoolTaskGroup &Group) override;

  /// Returns the maximum number of worker threads in the pool, not the current
  /// number of threads!
  unsigned getMaxConcurrency() const override { return MaxThreadCount; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Returns true if all tasks in the given group have finished (nullptr means
  /// all tasks regardless of their group). QueueLock must be locked.
  bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  void asyncEnqueue(llvm::unique_function<void()> Task,
                    ThreadPoolTaskGroup *Group) override {
    int requestedThreads;
    {
      // Lock the queue and push the new task
      std::unique_lock<std::mutex> LockGuard(QueueLock);

      // Don't allow enqueueing after disabling the pool
      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
      Tasks.emplace_back(std::make_pair(std::move(Task), Group));
      requestedThreads = ActiveThreads + Tasks.size();
    }
    QueueCondition.notify_one();
    grow(requestedThreads);
  }

  /// Grow to ensure that we have at least `requested` Threads, but do not go
  /// over MaxThreadCount.
  void grow(int requested);

  /// Main loop executed by each worker thread; also used to process pending
  /// tasks while waiting for the given group (may be null).
  void processTasks(ThreadPoolTaskGroup *WaitingForGroup);

  /// Variant of the worker loop used when an external jobserver limits the
  /// available parallelism.
  void processTasksWithJobserver();

  /// Threads in flight.
  std::vector<llvm::thread> Threads;
  /// Lock protecting access to the Threads vector.
  mutable llvm::sys::RWMutex ThreadsLock;

  /// Tasks waiting for execution in the pool.
  std::deque<std::pair<llvm::unique_function<void()>, ThreadPoolTaskGroup *>>
      Tasks;

  /// Locking and signaling for accessing the Tasks queue.
  std::mutex QueueLock;
  std::condition_variable QueueCondition;

  /// Signaling for job completion (all tasks or all tasks in a group).
  std::condition_variable CompletionCondition;

  /// Keep track of the number of threads actually busy.
  unsigned ActiveThreads = 0;
  /// Number of threads active for tasks in the given group (only groups with a
  /// non-zero count are present in the map).
  DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;

  /// Signal for the destruction of the pool, asking threads to exit.
  bool EnableFlag = true;

  const ThreadPoolStrategy Strategy;

  /// Maximum number of threads to potentially grow this pool to.
  const unsigned MaxThreadCount;

  /// Optional jobserver client; when non-null, worker scheduling is
  /// coordinated with it.
  JobserverClient *TheJobserver = nullptr;
};
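// Editor's note: illustrative sketch only, not part of the upstream header.
// It shows constructing an StdThreadPool with an explicit strategy;
// hardware_concurrency() is the helper from llvm/Support/Threading.h and the
// thread count of 4 is an arbitrary choice for the example.
//
//   StdThreadPool Pool(hardware_concurrency(4));
//   for (unsigned I = 0; I != 16; ++I)
//     Pool.async([I] { /* process work item I */ });
//   Pool.wait(); // All 16 tasks have finished; the worker threads stay
//                // parked on QueueCondition, ready for the next batch.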
#endif // LLVM_ENABLE_THREADS

/// A non-threaded implementation.
class LLVM_ABI SingleThreadExecutor : public ThreadPoolInterface {
public:
  /// Construct a non-threaded pool; the given hardware strategy is ignored.
  SingleThreadExecutor(ThreadPoolStrategy ignored = {});

  /// Blocking destructor: the pool will first execute the pending tasks.
  ~SingleThreadExecutor() override;

  /// Blocking wait for all the tasks to execute first.
  void wait() override;

  /// Blocking wait for only all the tasks in the given group to complete.
  void wait(ThreadPoolTaskGroup &Group) override;

  /// Always returns 1: there is no concurrency.
  unsigned getMaxConcurrency() const override { return 1; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  void asyncEnqueue(llvm::unique_function<void()> Task,
                    ThreadPoolTaskGroup *Group) override {
    Tasks.emplace_back(std::make_pair(std::move(Task), Group));
  }

  /// Tasks waiting for execution in the pool.
  std::deque<std::pair<llvm::unique_function<void()>, ThreadPoolTaskGroup *>>
      Tasks;
};
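// Editor's note: illustrative sketch only, not part of the upstream header.
// With SingleThreadExecutor nothing runs concurrently: asyncEnqueue() merely
// queues the task, and the queued work is executed on the calling thread once
// wait() (or the destructor) runs.
//
//   SingleThreadExecutor Pool;
//   std::shared_future<int> F = Pool.async([] { return 42; });
//   Pool.wait();          // Runs the queued task here, sequentially.
//   int Value = F.get();  // Ready immediately after wait().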

#if LLVM_ENABLE_THREADS
using DefaultThreadPool = StdThreadPool;
#else
using DefaultThreadPool = SingleThreadExecutor;
#endif
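// Editor's note: illustrative sketch only, not part of the upstream header.
// DefaultThreadPool lets the same client code work in both threaded and
// non-threaded (no LLVM_ENABLE_THREADS) builds of LLVM.
//
//   DefaultThreadPool Pool; // StdThreadPool or SingleThreadExecutor.
//   Pool.async([] { /* some work */ });
//   Pool.wait();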

/// A group of tasks to be run on a thread pool. Thread pool tasks in different
/// groups can run on the same thread pool but can be waited for separately.
/// It is even possible for tasks of one group to submit and wait for tasks
/// of another group, as long as this does not form a loop.
class ThreadPoolTaskGroup {
public:
  /// The ThreadPool argument is the thread pool to forward calls to.
  ThreadPoolTaskGroup(ThreadPoolInterface &Pool) : Pool(Pool) {}

  /// Blocking destructor: will wait for all the tasks in the group to complete
  /// by calling ThreadPool::wait().
  ~ThreadPoolTaskGroup() { wait(); }

  /// Calls ThreadPool::async() for this group.
  template <typename Function, typename... Args>
  inline auto async(Function &&F, Args &&...ArgList) {
    return Pool.async(*this, std::forward<Function>(F),
                      std::forward<Args>(ArgList)...);
  }

  /// Calls ThreadPool::wait() for this group.
  void wait() { Pool.wait(*this); }

private:
  ThreadPoolInterface &Pool;
};
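// Editor's note: illustrative sketch only, not part of the upstream header.
// Two task groups can share one pool and be waited for independently; waiting
// on GroupA does not wait for GroupB's tasks.
//
//   DefaultThreadPool Pool;
//   ThreadPoolTaskGroup GroupA(Pool);
//   ThreadPoolTaskGroup GroupB(Pool);
//   GroupA.async([] { /* task belonging to A */ });
//   GroupB.async([] { /* task belonging to B */ });
//   GroupA.wait(); // Blocks only until A's tasks are done.
//   GroupB.wait(); // (The destructors would also wait, in reverse order.)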

} // namespace llvm

#endif // LLVM_SUPPORT_THREADPOOL_H