ThreadPool.h
//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_SUPPORT_THREADPOOL_H
#define LLVM_SUPPORT_THREADPOOL_H

#include "llvm/ADT/DenseMap.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/RWMutex.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/thread.h"

#include <future>

#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

namespace llvm {

class ThreadPoolTaskGroup;

/// This defines the abstract base interface for a ThreadPool allowing
/// asynchronous parallel execution on a defined number of threads.
///
/// It is possible to reuse one thread pool for different groups of tasks
/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
/// the same queue, but it is possible to wait only for a specific group of
/// tasks to finish.
///
/// It is also possible for worker threads to submit new tasks and wait for
/// them. Note that this may result in a deadlock in cases such as when a task
/// (directly or indirectly) tries to wait for its own completion, or when all
/// available threads are used up by tasks waiting for a task that has no thread
/// left to run on (this includes waiting on the returned future). It should be
/// generally safe to wait() for a group as long as groups do not form a cycle.
class ThreadPoolInterface {
  /// The actual method to enqueue a task, to be defined by the concrete
  /// implementation.
  virtual void asyncEnqueue(std::function<void()> Task,
                            ThreadPoolTaskGroup *Group) = 0;

public:
  /// Destroying the pool will drain the pending tasks and wait. The current
  /// thread may participate in the execution of the pending tasks.
  virtual ~ThreadPoolInterface();

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  /// Calling wait() from a task would deadlock waiting for itself.
  virtual void wait() = 0;

  /// Blocking wait for only the tasks in the given group to complete.
  /// It is possible to wait even inside a task, but waiting (directly or
  /// indirectly) on itself will deadlock. If called from a task running on a
  /// worker thread, the call may process pending tasks while waiting in order
  /// not to waste the thread.
  virtual void wait(ThreadPoolTaskGroup &Group) = 0;

  /// Returns the maximum number of worker threads this pool can eventually
  /// grow to.
  virtual unsigned getMaxConcurrency() const = 0;

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Function, typename... Args>
  auto async(Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(std::move(Task));
  }

  /// Overload, task will be in the given task group.
  template <typename Function, typename... Args>
  auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(Group, std::move(Task));
  }

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Func>
  auto async(Func &&F) -> std::shared_future<decltype(F())> {
    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
                     nullptr);
  }

  template <typename Func>
  auto async(ThreadPoolTaskGroup &Group, Func &&F)
      -> std::shared_future<decltype(F())> {
    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
                     &Group);
  }

private:
  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename ResTy>
  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
                                      ThreadPoolTaskGroup *Group) {
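    // The task is wrapped in a deferred std::future: it runs the first time
    // some thread waits on it, i.e. on the worker that dequeues the wrapper
    // enqueued below, or on a caller already blocked on the returned future,
    // whichever waits first; either way it runs exactly once.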
    auto Future = std::async(std::launch::deferred, std::move(Task)).share();
    asyncEnqueue([Future]() { Future.wait(); }, Group);
    return Future;
  }
};
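
// Illustrative usage sketch (not part of the original header): submitting
// tasks through the abstract interface and collecting results via the
// returned std::shared_future. The function sumOfSquares and the task bodies
// are invented for this example; it additionally needs <vector>.
//
//   int sumOfSquares(llvm::ThreadPoolInterface &Pool, int N) {
//     std::vector<std::shared_future<int>> Futures;
//     for (int I = 0; I < N; ++I)
//       Futures.push_back(Pool.async([I] { return I * I; }));
//     int Sum = 0;
//     for (auto &F : Futures)
//       Sum += F.get(); // Safe here: the caller is not a pool worker.
//     return Sum;
//   }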

#if LLVM_ENABLE_THREADS
/// A ThreadPool implementation using std::threads.
///
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
class StdThreadPool : public ThreadPoolInterface {
public:
  /// Construct a pool using the hardware strategy \p S for mapping hardware
  /// execution resources (threads, cores, CPUs).
  /// Defaults to using the maximum execution resources in the system, but
  /// accounting for the affinity mask.
  StdThreadPool(ThreadPoolStrategy S = hardware_concurrency());

  /// Blocking destructor: the pool will wait for all the threads to complete.
  ~StdThreadPool() override;

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  /// Calling wait() from a task would deadlock waiting for itself.
  void wait() override;

  /// Blocking wait for only the tasks in the given group to complete.
  /// It is possible to wait even inside a task, but waiting (directly or
  /// indirectly) on itself will deadlock. If called from a task running on a
  /// worker thread, the call may process pending tasks while waiting in order
  /// not to waste the thread.
  void wait(ThreadPoolTaskGroup &Group) override;

  /// Returns the maximum number of worker threads in the pool, not the current
  /// number of threads!
  unsigned getMaxConcurrency() const override { return MaxThreadCount; }

  // TODO: Remove, misleading legacy name warning!
  LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
  unsigned getThreadCount() const { return MaxThreadCount; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Returns true if all tasks in the given group have finished (nullptr means
  /// all tasks regardless of their group). QueueLock must be locked.
  bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  void asyncEnqueue(std::function<void()> Task,
                    ThreadPoolTaskGroup *Group) override {
    int requestedThreads;
    {
      // Lock the queue and push the new task
      std::unique_lock<std::mutex> LockGuard(QueueLock);

      // Don't allow enqueueing after disabling the pool
      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
      Tasks.emplace_back(std::make_pair(std::move(Task), Group));
      requestedThreads = ActiveThreads + Tasks.size();
    }
    QueueCondition.notify_one();
    grow(requestedThreads);
  }

  /// Grow to ensure that we have at least `requested` Threads, but do not go
  /// over MaxThreadCount.
  void grow(int requested);

  /// Main loop executed by worker threads: processes tasks from the queue.
  /// If \p WaitingForGroup is non-null, returns once that group has no more
  /// pending tasks (used when wait(Group) is called from a worker thread).
  void processTasks(ThreadPoolTaskGroup *WaitingForGroup);

  /// Threads in flight
  std::vector<llvm::thread> Threads;
  /// Lock protecting access to the Threads vector.
  mutable llvm::sys::RWMutex ThreadsLock;

  /// Tasks waiting for execution in the pool.
  std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;

  /// Locking and signaling for accessing the Tasks queue.
  std::mutex QueueLock;
  std::condition_variable QueueCondition;

  /// Signaling for job completion (all tasks or all tasks in a group).
  std::condition_variable CompletionCondition;

  /// Keep track of the number of threads actually busy.
  unsigned ActiveThreads = 0;
  /// Number of threads active for tasks in each group (only groups with a
  /// non-zero count are tracked).
  DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;

  /// Signal for the destruction of the pool, asking threads to exit.
  bool EnableFlag = true;

  const ThreadPoolStrategy Strategy;

  /// Maximum number of threads to potentially grow this pool to.
  const unsigned MaxThreadCount;
};
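
// Illustrative sketch (assumes LLVM_ENABLE_THREADS is set): constructing a
// StdThreadPool with an explicit strategy. hardware_concurrency(4) caps the
// pool at four workers; processChunk is a hypothetical function standing in
// for real work.
//
//   llvm::StdThreadPool Pool(llvm::hardware_concurrency(4));
//   for (unsigned I = 0; I < Pool.getMaxConcurrency() * 4; ++I)
//     Pool.async([I] { processChunk(I); });
//   Pool.wait(); // Returns once the queue is empty and all workers are idle.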
#endif // LLVM_ENABLE_THREADS

/// A non-threaded implementation.
class SingleThreadExecutor : public ThreadPoolInterface {
public:
  /// Construct a non-threaded pool, ignoring the hardware strategy.
  SingleThreadExecutor(ThreadPoolStrategy ignored = {});

  /// Blocking destructor: the pool will first execute the pending tasks.
  ~SingleThreadExecutor() override;

  /// Blocking wait for all the tasks to execute first.
  void wait() override;

  /// Blocking wait for only the tasks in the given group to complete.
  void wait(ThreadPoolTaskGroup &Group) override;

  /// Always returns 1: there is no concurrency.
  unsigned getMaxConcurrency() const override { return 1; }

  // TODO: Remove, misleading legacy name warning!
  LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
  unsigned getThreadCount() const { return 1; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  void asyncEnqueue(std::function<void()> Task,
                    ThreadPoolTaskGroup *Group) override {
    Tasks.emplace_back(std::make_pair(std::move(Task), Group));
  }

  /// Tasks waiting for execution in the pool.
  std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
};
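
// Illustrative sketch: with the non-threaded implementation, async() only
// queues work; the queued tasks run on the calling thread once wait() is
// called (waiting on a returned future also runs that task, since the future
// is deferred). The task body is invented for this example.
//
//   llvm::SingleThreadExecutor Pool;
//   auto F = Pool.async([] { return 6 * 7; });
//   Pool.wait();            // Executes the queued task on this thread.
//   int FortyTwo = F.get(); // Result is already available.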

#if LLVM_ENABLE_THREADS
using DefaultThreadPool = StdThreadPool;
#else
using DefaultThreadPool = SingleThreadExecutor;
#endif
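
// Illustrative sketch: DefaultThreadPool selects whichever implementation the
// build provides, so client code compiles with or without LLVM_ENABLE_THREADS.
// Because destruction drains pending tasks, a scoped pool needs no explicit
// wait(). The task body is invented for this example.
//
//   {
//     llvm::DefaultThreadPool Pool;
//     Pool.async([] { /* independent work */ });
//   } // The destructor waits for the queued task here.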

/// A group of tasks to be run on a thread pool. Thread pool tasks in different
/// groups can run on the same thread pool but can be waited for separately.
/// It is even possible for tasks of one group to submit and wait for tasks
/// of another group, as long as this does not form a loop.
class ThreadPoolTaskGroup {
public:
  /// The ThreadPool argument is the thread pool to forward calls to.
  ThreadPoolTaskGroup(ThreadPoolInterface &Pool) : Pool(Pool) {}

  /// Blocking destructor: will wait for all the tasks in the group to complete
  /// by calling ThreadPool::wait().
  ~ThreadPoolTaskGroup() { wait(); }

  /// Calls ThreadPool::async() for this group.
  template <typename Function, typename... Args>
  inline auto async(Function &&F, Args &&...ArgList) {
    return Pool.async(*this, std::forward<Function>(F),
                      std::forward<Args>(ArgList)...);
  }

  /// Calls ThreadPool::wait() for this group.
  void wait() { Pool.wait(*this); }

private:
  ThreadPoolInterface &Pool;
};
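
// Illustrative sketch: two groups sharing one pool and waited on
// independently. The task bodies are invented for this example.
//
//   llvm::DefaultThreadPool Pool;
//   llvm::ThreadPoolTaskGroup GroupA(Pool);
//   llvm::ThreadPoolTaskGroup GroupB(Pool);
//   GroupA.async([] { /* work belonging to A */ });
//   GroupB.async([] { /* work belonging to B */ });
//   GroupA.wait(); // Waits only for A's tasks; B's may still be queued.
//   GroupB.wait(); // GroupB's destructor would otherwise wait here as well.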

} // namespace llvm

#endif // LLVM_SUPPORT_THREADPOOL_H