Parallel.cpp
//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/ADT/ScopeExit.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ExponentialBackoff.h"
#include "llvm/Support/Jobserver.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

llvm::ThreadPoolStrategy llvm::parallel::strategy;

namespace llvm {
namespace parallel {
#if LLVM_ENABLE_THREADS

#ifdef _WIN32
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
thread_local unsigned threadIndex = UINT_MAX;
#endif

namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func) = 0;
  virtual size_t getThreadCount() const = 0;

  static Executor *getDefaultExecutor();
};

/// An implementation of an Executor that runs closures on a thread pool
/// in filo order.
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
    if (S.UseJobserver)
      TheJobserver = JobserverClient::getInstance();

    ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in 'safe libc++' mode.
    auto &Thread0 = Threads[0];
    Thread0 = std::thread([this, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([this, S, I] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  // To make sure the thread pool executor can only be created with a parallel
  // strategy.
  ThreadPoolExecutor() = delete;

  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();

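    // stop() may be running on one of the worker threads (a thread cannot
    // join itself), so detach the calling thread and join all the others.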
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  ~ThreadPoolExecutor() override { stop(); }

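  // Creator and Deleter below adapt this class for use with ManagedStatic in
  // getDefaultExecutor(): llvm_shutdown() stops the pool via Deleter::call()
  // but does not delete it; ownership stays with the static unique_ptr
  // created there.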
  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  void add(std::function<void()> F) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push_back(std::move(F));
    }
    Cond.notify_one();
  }

  size_t getThreadCount() const override { return ThreadCount; }

private:
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    // Note on jobserver deadlock avoidance:
    // GNU Make grants each invoked process one implicit job slot. Our
    // JobserverClient models this by returning an implicit JobSlot on the
    // first successful tryAcquire() in a process. This guarantees forward
    // progress without requiring a dedicated "always-on" thread here.

    while (true) {
      if (TheJobserver) {
        // Jobserver-mode scheduling:
        // - Acquire one job slot (with exponential backoff to avoid busy-wait).
        // - While holding the slot, drain and run tasks from the local queue.
        // - Release the slot when the queue is empty or when shutting down.
        // Rationale: Holding a slot amortizes acquire/release overhead over
        // multiple tasks and avoids requeue/yield churn, while still enforcing
        // the jobserver's global concurrency limit. With K available slots,
        // up to K workers run tasks in parallel; within each worker tasks run
        // sequentially until the local queue is empty.
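        // The long (24h) backoff budget below effectively means "retry until
        // a slot is available or Stop is set"; the backoff only spaces out the
        // acquisition attempts.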
        ExponentialBackoff Backoff(std::chrono::hours(24));
        JobSlot Slot;
        do {
          if (Stop)
            return;
          Slot = TheJobserver->tryAcquire();
          if (Slot.isValid())
            break;
        } while (Backoff.waitForNextAttempt());

        llvm::scope_exit SlotReleaser(
            [&] { TheJobserver->release(std::move(Slot)); });

        while (true) {
          std::function<void()> Task;
          {
            std::unique_lock<std::mutex> Lock(Mutex);
            Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
            if (Stop && WorkStack.empty())
              return;
            if (WorkStack.empty())
              break;
            Task = std::move(WorkStack.back());
            WorkStack.pop_back();
          }
          Task();
        }
      } else {
        std::unique_lock<std::mutex> Lock(Mutex);
        Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
        if (Stop)
          break;
        auto Task = std::move(WorkStack.back());
        WorkStack.pop_back();
        Lock.unlock();
        Task();
      }
    }
  }

  std::atomic<bool> Stop{false};
  std::vector<std::function<void()>> WorkStack;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
  unsigned ThreadCount;

  JobserverClient *TheJobserver = nullptr;
};

Executor *Executor::getDefaultExecutor() {
#ifdef _WIN32
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() on Windows. This is important to avoid various race
  // conditions at process exit that can cause crashes or deadlocks.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
#else
  // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
  // by llvm_shutdown(), worker threads will clean up and invoke TLS
  // destructors. This can lead to race conditions if other threads attempt to
  // access TLS objects that have already been destroyed.
  static ThreadPoolExecutor Exec(strategy);
  return &Exec;
#endif
}
} // namespace
} // namespace detail

size_t getThreadCount() {
  return detail::Executor::getDefaultExecutor()->getThreadCount();
}
#endif

// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the root TaskGroup to run tasks in parallel. In the
// scenario of nested parallel_for_each(), only the outermost one runs in
// parallel.
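// For example, if a task spawned by the outer TaskGroup itself creates a
// TaskGroup, that inner group sees threadIndex != UINT_MAX on the worker
// thread and therefore runs its workloads serially.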
TaskGroup::TaskGroup()
#if LLVM_ENABLE_THREADS
    : Parallel((parallel::strategy.ThreadsRequested != 1) &&
               (threadIndex == UINT_MAX)) {}
#else
    : Parallel(false) {}
#endif
TaskGroup::~TaskGroup() {
  // We must ensure that all the workloads have finished before decrementing the
  // instances count.
  L.sync();
}

void TaskGroup::spawn(std::function<void()> F) {
#if LLVM_ENABLE_THREADS
  if (Parallel) {
    L.inc();
    detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
      F();
      L.dec();
    });
    return;
  }
#endif
  F();
}

} // namespace parallel
} // namespace llvm

void llvm::parallelFor(size_t Begin, size_t End,
                       llvm::function_ref<void(size_t)> Fn) {
#if LLVM_ENABLE_THREADS
  if (parallel::strategy.ThreadsRequested != 1) {
    auto NumItems = End - Begin;
    // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    // overhead on large inputs.
    auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
    if (TaskSize == 0)
      TaskSize = 1;
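    // For example, with NumItems equal to 4 * MaxTasksPerGroup, TaskSize is 4
    // and roughly MaxTasksPerGroup tasks of 4 items each are spawned below;
    // for small inputs TaskSize stays 1 and each item becomes its own task.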

    TaskGroup TG;
    for (; Begin + TaskSize < End; Begin += TaskSize) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
          Fn(I);
      });
    }
    if (Begin != End) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin; I != End; ++I)
          Fn(I);
      });
    }
    return;
  }
#endif

  for (; Begin != End; ++Begin)
    Fn(Begin);
}
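
For reference, a minimal, illustrative sketch of how client code typically drives the interfaces defined above (the function names and workloads here are hypothetical; only llvm::parallelFor, llvm::parallel::TaskGroup, and TaskGroup::spawn come from this file):

#include "llvm/Support/Parallel.h"

#include <vector>

// Runs Fn(I) for every index; the work is split into tasks when the global
// strategy requests more than one thread, and runs serially otherwise.
void scaleAll(std::vector<int> &V) {
  llvm::parallelFor(0, V.size(), [&](size_t I) { V[I] *= 2; });
}

// Spawns two independent tasks; the TaskGroup destructor blocks until both
// have finished (Latch::sync()).
void runTwoTasks() {
  llvm::parallel::TaskGroup TG;
  TG.spawn([] { /* task A */ });
  TG.spawn([] { /* task B */ });
}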