LLVM 22.0.0git
ThreadPool.h
Go to the documentation of this file.
1//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// This file defines a crude C++11 based thread pool.
10//
11//===----------------------------------------------------------------------===//
12
13#ifndef LLVM_SUPPORT_THREADPOOL_H
14#define LLVM_SUPPORT_THREADPOOL_H
15
16#include "llvm/ADT/DenseMap.h"
17#include "llvm/Config/llvm-config.h"
22#include "llvm/Support/thread.h"
23
24#include <future>
25
26#include <condition_variable>
27#include <deque>
28#include <functional>
29#include <memory>
30#include <mutex>
31#include <utility>
32
33namespace llvm {
34
36
37/// This defines the abstract base interface for a ThreadPool allowing
38/// asynchronous parallel execution on a defined number of threads.
39///
40/// It is possible to reuse one thread pool for different groups of tasks
41/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
42/// the same queue, but it is possible to wait only for a specific group of
43/// tasks to finish.
44///
45/// It is also possible for worker threads to submit new tasks and wait for
46/// them. Note that this may result in a deadlock in cases such as when a task
47/// (directly or indirectly) tries to wait for its own completion, or when all
48/// available threads are used up by tasks waiting for a task that has no thread
49/// left to run on (this includes waiting on the returned future). It should be
50/// generally safe to wait() for a group as long as groups do not form a cycle.
52 /// Enqueue \p Task for execution; the concrete implementation defines how.
53 /// \p Group is the task's group, or nullptr for a task outside any group.
54 virtual void asyncEnqueue(std::function<void()> Task,
55 ThreadPoolTaskGroup *Group) = 0;
56
57public:
58 /// Destroying the pool will drain the pending tasks and wait. The current
59 /// thread may participate in the execution of the pending tasks.
61
62 /// Blocking wait for all the threads to complete and the queue to be empty.
63 /// It is an error to try to add new tasks while blocking on this call.
64 /// Calling wait() from a task would deadlock waiting for itself.
65 virtual void wait() = 0;
66
67 /// Blocking wait for only the tasks in the given group to complete.
68 /// It is possible to wait even inside a task, but waiting (directly or
69 /// indirectly) on itself will deadlock. If called from a task running on a
70 /// worker thread, the call may process pending tasks while waiting in order
71 /// not to waste the thread.
72 virtual void wait(ThreadPoolTaskGroup &Group) = 0;
73
74 /// Returns the maximum number of workers this pool can eventually grow to.
75 virtual unsigned getMaxConcurrency() const = 0;
76
77 /// Asynchronous submission of a task to the pool. The returned future can be
78 /// used to wait for the task to finish and is *non-blocking* on destruction.
/// \p F is bound to \p ArgList with std::bind before being submitted.
79 template <typename Function, typename... Args>
80 auto async(Function &&F, Args &&...ArgList) {
81 auto Task =
82 std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
// With one callable argument, overload resolution picks the non-variadic
// async(Func &&F) rather than this template again.
83 return async(std::move(Task));
84 }
85
86 /// Overload: binds \p F to \p ArgList and submits it as a task of \p Group.
87 template <typename Function, typename... Args>
88 auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
89 auto Task =
90 std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
91 return async(Group, std::move(Task));
92 }
93
94 /// Asynchronous submission of a task to the pool. The returned future can be
95 /// used to wait for the task to finish and is *non-blocking* on destruction.
/// \p F is wrapped in a std::function and submitted without a task group.
96 template <typename Func>
97 auto async(Func &&F) -> std::shared_future<decltype(F())> {
98 return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
99 nullptr);
100 }
101
/// Overload: \p F is wrapped in a std::function and submitted as a task of
/// \p Group. Returns the shared future produced by asyncImpl().
102 template <typename Func>
103 auto async(ThreadPoolTaskGroup &Group, Func &&F)
104 -> std::shared_future<decltype(F())> {
105 return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
106 &Group);
107 }
108
109private:
110 /// Asynchronous submission of a task to the pool. The returned future can be
111 /// used to wait for the task to finish and is *non-blocking* on destruction.
/// \p Task is wrapped in a deferred std::async; the closure handed to
/// asyncEnqueue() only waits on a copy of the resulting shared future.
112 template <typename ResTy>
113 std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
114 ThreadPoolTaskGroup *Group) {
// Deferred launch: nothing runs here; the task is invoked by the first wait
// on the shared state.
115 auto Future = std::async(std::launch::deferred, std::move(Task)).share();
116 asyncEnqueue([Future]() { Future.wait(); }, Group);
117 return Future;
118 }
119};
120
121#if LLVM_ENABLE_THREADS
122/// A ThreadPool implementation using std::threads.
123///
124/// The pool keeps a vector of threads alive, waiting on a condition variable
125/// for some work to become available.
126class LLVM_ABI StdThreadPool : public ThreadPoolInterface {
127public:
128 /// Construct a pool using the hardware strategy \p S for mapping hardware
129 /// execution resources (threads, cores, CPUs).
130 /// Defaults to using the maximum execution resources in the system, but
131 /// accounting for the affinity mask.
132 StdThreadPool(ThreadPoolStrategy S = hardware_concurrency());
133
134 /// Blocking destructor: the pool will wait for all the threads to complete.
135 ~StdThreadPool() override;
136
137 /// Blocking wait for all the threads to complete and the queue to be empty.
138 /// It is an error to try to add new tasks while blocking on this call.
139 /// Calling wait() from a task would deadlock waiting for itself.
140 void wait() override;
141
142 /// Blocking wait for only the tasks in the given group to complete.
143 /// It is possible to wait even inside a task, but waiting (directly or
144 /// indirectly) on itself will deadlock. If called from a task running on a
145 /// worker thread, the call may process pending tasks while waiting in order
146 /// not to waste the thread.
147 void wait(ThreadPoolTaskGroup &Group) override;
148
149 /// Returns the maximum number of worker threads in the pool, not the current
150 /// number of threads!
151 unsigned getMaxConcurrency() const override { return MaxThreadCount; }
152
153 /// Returns true if the current thread is a worker thread of this thread pool.
154 bool isWorkerThread() const;
155
156private:
157 /// Returns true if all tasks in the given group have finished (nullptr means
158 /// all tasks regardless of their group). QueueLock must be locked.
159 bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;
160
161 /// Pushes \p Task, paired with \p Group, onto the Tasks queue under
162 /// QueueLock, then notifies one waiter on QueueCondition and calls grow().
163 void asyncEnqueue(std::function<void()> Task,
164 ThreadPoolTaskGroup *Group) override {
165 int requestedThreads;
166 {
167 // Lock the queue and push the new task.
168 std::unique_lock<std::mutex> LockGuard(QueueLock);
169
170 // Don't allow enqueueing after disabling the pool.
171 assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
172 Tasks.emplace_back(std::make_pair(std::move(Task), Group));
// Threads wanted: those currently busy plus one per queued task.
173 requestedThreads = ActiveThreads + Tasks.size();
174 }
// LockGuard went out of scope above, so QueueLock is no longer held here.
175 QueueCondition.notify_one();
176 grow(requestedThreads);
177 }
178
179 /// Grow to ensure that we have at least `requested` Threads, but do not go
180 /// over MaxThreadCount.
181 void grow(int requested);
182
183 void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
184 void processTasksWithJobserver();
185
186 /// Threads in flight.
187 std::vector<llvm::thread> Threads;
188 /// Lock protecting access to the Threads vector.
189 mutable llvm::sys::RWMutex ThreadsLock;
190
191 /// Tasks waiting for execution in the pool.
192 std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
193
194 /// Locking and signaling for accessing the Tasks queue.
195 std::mutex QueueLock;
196 std::condition_variable QueueCondition;
197
198 /// Signaling for job completion (all tasks or all tasks in a group).
199 std::condition_variable CompletionCondition;
200
201 /// Keep track of the number of threads actually busy.
202 unsigned ActiveThreads = 0;
203 /// Number of threads active for tasks in the given group (only non-zero).
204 DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;
205
206 /// Signal for the destruction of the pool, asking threads to exit.
207 bool EnableFlag = true;
208
209 const ThreadPoolStrategy Strategy;
210
211 /// Maximum number of threads to potentially grow this pool to.
212 const unsigned MaxThreadCount;
213
214 JobserverClient *TheJobserver = nullptr;
215};
216#endif // LLVM_ENABLE_THREADS
217
218/// A non-threaded implementation.
220public:
221 /// Construct a non-threaded pool, ignoring the hardware strategy.
223
224 /// Blocking destructor: the pool will first execute the pending tasks.
225 ~SingleThreadExecutor() override;
226
227 /// Blocking wait for all the tasks to execute first
228 void wait() override;
229
230 /// Blocking wait for only all the tasks in the given group to complete.
231 void wait(ThreadPoolTaskGroup &Group) override;
232
233 /// Always returns 1: there is no concurrency.
234 unsigned getMaxConcurrency() const override { return 1; }
235
236 /// Returns true if the current thread is a worker thread of this thread pool.
237 bool isWorkerThread() const;
238
239private:
240 /// Appends \p Task, paired with \p Group, to the Tasks queue. Nothing is
241 /// executed here and nothing is returned.
242 void asyncEnqueue(std::function<void()> Task,
243 ThreadPoolTaskGroup *Group) override {
244 Tasks.emplace_back(std::make_pair(std::move(Task), Group));
245 }
246
247 /// Tasks waiting for execution in the pool.
248 std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
249};
250
251#if LLVM_ENABLE_THREADS
252using DefaultThreadPool = StdThreadPool;
253#else
255#endif
256
257/// A group of tasks to be run on a thread pool. Thread pool tasks in different
258/// groups can run on the same thread pool but can be waited for separately.
259/// It is even possible for tasks of one group to submit and wait for tasks
260/// of another group, as long as this does not form a loop.
262public:
263 /// The ThreadPool argument is the thread pool to forward calls to.
265
266 /// Blocking destructor: will wait for all the tasks in the group to complete
267 /// by calling ThreadPool::wait().
269
270 /// Calls Pool.async() with this group, forwarding \p F and \p ArgList.
271 template <typename Function, typename... Args>
272 inline auto async(Function &&F, Args &&...ArgList) {
273 return Pool.async(*this, std::forward<Function>(F),
274 std::forward<Args>(ArgList)...);
275 }
276
277 /// Calls Pool.wait() for this group.
278 void wait() { Pool.wait(*this); }
279
280private:
282};
283
284} // namespace llvm
285
286#endif // LLVM_SUPPORT_THREADPOOL_H
assert(UImm &&(UImm !=~static_cast< T >(0)) &&"Invalid immediate!")
#define LLVM_ABI
Definition Compiler.h:213
This file defines the DenseMap class.
#define F(x, y, z)
Definition MD5.cpp:55
A non-threaded implementation.
Definition ThreadPool.h:219
SingleThreadExecutor(ThreadPoolStrategy ignored={})
Construct a non-threaded pool, ignoring using the hardware strategy.
void wait() override
Blocking wait for all the tasks to execute first.
unsigned getMaxConcurrency() const override
Returns always 1: there is no concurrency.
Definition ThreadPool.h:234
This defines the abstract base interface for a ThreadPool allowing asynchronous parallel execution on...
Definition ThreadPool.h:51
auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList)
Overload, task will be in the given task group.
Definition ThreadPool.h:88
virtual void wait()=0
Blocking wait for all the threads to complete and the queue to be empty.
auto async(ThreadPoolTaskGroup &Group, Func &&F) -> std::shared_future< decltype(F())>
Definition ThreadPool.h:103
virtual unsigned getMaxConcurrency() const =0
Returns the maximum number of worker this pool can eventually grow to.
auto async(Func &&F) -> std::shared_future< decltype(F())>
Asynchronous submission of a task to the pool.
Definition ThreadPool.h:97
virtual ~ThreadPoolInterface()
Destroying the pool will drain the pending tasks and wait.
auto async(Function &&F, Args &&...ArgList)
Asynchronous submission of a task to the pool.
Definition ThreadPool.h:80
virtual void wait(ThreadPoolTaskGroup &Group)=0
Blocking wait for only all the threads in the given group to complete.
This tells how a thread pool will be used.
Definition Threading.h:115
A group of tasks to be run on a thread pool.
Definition ThreadPool.h:261
auto async(Function &&F, Args &&...ArgList)
Calls ThreadPool::async() for this group.
Definition ThreadPool.h:272
void wait()
Calls ThreadPool::wait() for this group.
Definition ThreadPool.h:278
~ThreadPoolTaskGroup()
Blocking destructor: will wait for all the tasks in the group to complete by calling ThreadPool::wait...
Definition ThreadPool.h:268
ThreadPoolTaskGroup(ThreadPoolInterface &Pool)
The ThreadPool argument is the thread pool to forward calls to.
Definition ThreadPool.h:264
SmartRWMutex< false > RWMutex
Definition RWMutex.h:165
This is an optimization pass for GlobalISel generic memory operations.
ThreadPoolStrategy hardware_concurrency(unsigned ThreadCount=0)
Returns a default thread strategy where all available hardware resources are to be used,...
Definition Threading.h:190
SingleThreadExecutor DefaultThreadPool
Definition ThreadPool.h:254