LLVM 20.0.0git
Parallel.cpp
Go to the documentation of this file.
1//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
10#include "llvm/Config/llvm-config.h"
13
14#include <atomic>
15#include <deque>
16#include <future>
17#include <thread>
18#include <vector>
19
21
22namespace llvm {
23namespace parallel {
24#if LLVM_ENABLE_THREADS
25
#ifdef _WIN32
// Index of the current pool worker thread; UINT_MAX marks a thread that is
// not a pool worker (work() assigns a real index, and TaskGroup's ctor tests
// for UINT_MAX to detect nesting). On Windows the variable is kept internal
// to this TU and exposed only through getThreadIndex().
static thread_local unsigned threadIndex = UINT_MAX;

// GET_THREAD_INDEX_IMPL is supplied by a header outside this view
// (presumably llvm/Support/Parallel.h) — TODO confirm.
unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// On non-Windows platforms the variable itself has external linkage;
// getThreadIndex() is presumably an inline accessor declared elsewhere.
thread_local unsigned threadIndex = UINT_MAX;
#endif
33
34namespace detail {
35
36namespace {
37
38/// An abstract class that takes closures and runs them asynchronously.
/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  /// Enqueue \p func for asynchronous execution. When \p Sequential is true
  /// the closure is placed on a separate queue whose tasks never run
  /// concurrently with one another (see ThreadPoolExecutor::work()).
  virtual void add(std::function<void()> func, bool Sequential = false) = 0;
  /// Returns the number of threads the executor runs closures on.
  virtual size_t getThreadCount() const = 0;

  /// Returns the lazily-constructed, process-wide executor shared by all
  /// callers in this TU.
  static Executor *getDefaultExecutor();
};
47
48/// An implementation of an Executor that runs closures on a thread pool
49/// in filo order.
/// An implementation of an Executor that runs closures on a thread pool
/// in filo order.
///
/// Two queues are maintained: a general queue serviced in FILO order
/// (emplace_back / pop from back) and a "sequential" queue serviced in FIFO
/// order (emplace_front / pop from back) with at most one of its tasks
/// in flight at a time.
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
    ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in 'safe libc++' mode.
    auto &Thread0 = Threads[0];
    // Thread0 spawns the remaining workers, signals ThreadsCreated once the
    // vector is fully populated, and then becomes worker 0 itself. The ctor's
    // Lock above makes Thread0 wait until construction has finished before it
    // can touch Threads.
    Thread0 = std::thread([this, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  // Ask all workers to exit; idempotent. Blocks until the spawner thread has
  // finished creating workers (via the ThreadsCreated promise) but does NOT
  // wait for the workers themselves to finish — the destructor does that.
  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    // A thread cannot join itself; if the destructor somehow runs on a pool
    // thread, detach that one and join the rest.
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  // Factory/teardown hooks for ManagedStatic (see getDefaultExecutor());
  // note Deleter only stops the pool, it does not delete the object.
  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  // Enqueue a closure under the lock, then wake one worker.
  void add(std::function<void()> F, bool Sequential = false) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Sequential)
        WorkQueueSequential.emplace_front(std::move(F));
      else
        WorkQueue.emplace_back(std::move(F));
    }
    Cond.notify_one();
  }

  size_t getThreadCount() const override { return ThreadCount; }

private:
  // A sequential task is runnable only when no other sequential task is
  // currently executing (SequentialQueueIsLocked acts as a single-slot gate).
  bool hasSequentialTasks() const {
    return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
  }

  bool hasGeneralTasks() const { return !WorkQueue.empty(); }

  // Worker main loop: record this thread's pool index, apply the affinity
  // strategy, then repeatedly pop and run tasks until stop() is called.
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] {
        return Stop || hasGeneralTasks() || hasSequentialTasks();
      });
      if (Stop)
        break;
      // Sequential tasks take priority; claiming one locks the sequential
      // queue so no other worker runs a sequential task concurrently.
      bool Sequential = hasSequentialTasks();
      if (Sequential)
        SequentialQueueIsLocked = true;
      else
        assert(hasGeneralTasks());

      // Both queues pop from the back: FILO for the general queue (pushed at
      // the back), FIFO for the sequential queue (pushed at the front).
      auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
      auto Task = std::move(Queue.back());
      Queue.pop_back();
      // Run the task outside the lock so other workers can make progress.
      Lock.unlock();
      Task();
      if (Sequential)
        SequentialQueueIsLocked = false;
    }
  }

  std::atomic<bool> Stop{false};                      // set once by stop()
  std::atomic<bool> SequentialQueueIsLocked{false};   // sequential-task gate
  std::deque<std::function<void()>> WorkQueue;            // FILO
  std::deque<std::function<void()>> WorkQueueSequential;  // FIFO
  std::mutex Mutex;               // guards both queues and Stop transitions
  std::condition_variable Cond;   // signaled on add() and stop()
  std::promise<void> ThreadsCreated;  // fulfilled when all workers exist
  std::vector<std::thread> Threads;
  unsigned ThreadCount;
};
157
// Returns the process-wide default executor, constructing it on first use.
// The lifetime strategy differs per platform; see the comments below.
Executor *Executor::getDefaultExecutor() {
#ifdef _WIN32
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
  // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  // The unique_ptr aliases the ManagedStatic's object; its destruction at
  // static-destruction time is what actually runs ~ThreadPoolExecutor().
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
#else
  // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
  // by llvm_shutdown(), worker threads will clean up and invoke TLS
  // destructors. This can lead to race conditions if other threads attempt to
  // access TLS objects that have already been destroyed.
  static ThreadPoolExecutor Exec(strategy);
  return &Exec;
#endif
}
192} // namespace
193} // namespace detail
194
195size_t getThreadCount() {
196 return detail::Executor::getDefaultExecutor()->getThreadCount();
197}
198#endif
199
// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the root TaskGroup to run tasks parallelly. In the
// scenario of nested parallel_for_each(), only the outermost one runs
// parallelly.
TaskGroup::TaskGroup()
#if LLVM_ENABLE_THREADS
    // Parallel iff more than one thread was requested AND we are not already
    // on a pool worker thread (threadIndex == UINT_MAX means "not a worker").
    : Parallel((parallel::strategy.ThreadsRequested != 1) &&
               (threadIndex == UINT_MAX)) {}
#else
    : Parallel(false) {}
#endif
// Blocks until every task spawned through this group has completed
// (presumably Latch::sync() waits for the inc/dec counter to reach its
// resting value — confirm against Latch's definition).
TaskGroup::~TaskGroup() {
  // We must ensure that all the workloads have finished before decrementing the
  // instances count.
  L.sync();
}
216
217void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
218#if LLVM_ENABLE_THREADS
219 if (Parallel) {
220 L.inc();
221 detail::Executor::getDefaultExecutor()->add(
222 [&, F = std::move(F)] {
223 F();
224 L.dec();
225 },
226 Sequential);
227 return;
228 }
229#endif
230 F();
231}
232
233} // namespace parallel
234} // namespace llvm
235
236void llvm::parallelFor(size_t Begin, size_t End,
237 llvm::function_ref<void(size_t)> Fn) {
238#if LLVM_ENABLE_THREADS
239 if (parallel::strategy.ThreadsRequested != 1) {
240 auto NumItems = End - Begin;
241 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
242 // overhead on large inputs.
243 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
244 if (TaskSize == 0)
245 TaskSize = 1;
246
248 for (; Begin + TaskSize < End; Begin += TaskSize) {
249 TG.spawn([=, &Fn] {
250 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
251 Fn(I);
252 });
253 }
254 if (Begin != End) {
255 TG.spawn([=, &Fn] {
256 for (size_t I = Begin; I != End; ++I)
257 Fn(I);
258 });
259 }
260 return;
261 }
262#endif
263
264 for (; Begin != End; ++Begin)
265 Fn(Begin);
266}
static GCRegistry::Add< CoreCLRGC > E("coreclr", "CoreCLR-compatible GC")
bool End
Definition: ELF_riscv.cpp:480
#define F(x, y, z)
Definition: MD5.cpp:55
#define I(x, y, z)
Definition: MD5.cpp:58
const SmallVectorImpl< MachineOperand > & Cond
assert(ImpDefSCC.getReg()==AMDGPU::SCC &&ImpDefSCC.isDef())
ManagedStatic - This transparently changes the behavior of global statics to be lazily constructed on...
Definition: ManagedStatic.h:83
This tells how a thread pool will be used.
Definition: Threading.h:116
void apply_thread_strategy(unsigned ThreadPoolNum) const
Assign the current thread to an ideal hardware CPU or NUMA node.
unsigned compute_thread_count() const
Retrieves the max available threads for the current strategy.
Definition: Threading.cpp:43
An efficient, type-erasing, non-owning reference to a callable.
void spawn(std::function< void()> f, bool Sequential=false)
Definition: Parallel.cpp:217
ThreadPoolStrategy strategy
Definition: Parallel.cpp:20
unsigned getThreadIndex()
Definition: Parallel.h:54
size_t getThreadCount()
Definition: Parallel.h:55
This is an optimization pass for GlobalISel generic memory operations.
Definition: AddressRanges.h:18
void parallelFor(size_t Begin, size_t End, function_ref< void(size_t)> Fn)
Definition: Parallel.cpp:236