LLVM 20.0.0git
Parallel.cpp
Go to the documentation of this file.
1//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
10#include "llvm/Config/llvm-config.h"
13
14#include <atomic>
15#include <future>
16#include <thread>
17#include <vector>
18
20
21namespace llvm {
22namespace parallel {
23#if LLVM_ENABLE_THREADS
24
#if defined(_WIN32)
// Index of the current thread within the thread pool. It stays UINT_MAX until
// ThreadPoolExecutor::work() assigns a value on a worker thread. On Windows
// the variable is private to this translation unit and is read through
// getThreadIndex().
static thread_local unsigned threadIndex{UINT_MAX};

// NOTE(review): the body is supplied by the GET_THREAD_INDEX_IMPL macro, which
// is not defined in this file -- confirm its definition in the header.
unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// Index of the current thread within the thread pool. It stays UINT_MAX until
// ThreadPoolExecutor::work() assigns a value on a worker thread.
thread_local unsigned threadIndex{UINT_MAX};
#endif
32
33namespace detail {
34
35namespace {
36
/// Abstract interface for objects that accept closures and run them
/// asynchronously.
class Executor {
public:
  virtual ~Executor() = default;

  /// Hands \p func over to the executor to be run.
  virtual void add(std::function<void()> func) = 0;

  /// Returns the number of threads this executor works with.
  virtual size_t getThreadCount() const = 0;

  /// Returns the executor instance shared by the parallel algorithms in this
  /// file.
  static Executor *getDefaultExecutor();
};
46
47/// An implementation of an Executor that runs closures on a thread pool
48/// in filo order.
49class ThreadPoolExecutor : public Executor {
50public:
51 explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
53 // Spawn all but one of the threads in another thread as spawning threads
54 // can take a while.
55 Threads.reserve(ThreadCount);
56 Threads.resize(1);
57 std::lock_guard<std::mutex> Lock(Mutex);
58 // Use operator[] before creating the thread to avoid data race in .size()
59 // in 'safe libc++' mode.
60 auto &Thread0 = Threads[0];
61 Thread0 = std::thread([this, S] {
62 for (unsigned I = 1; I < ThreadCount; ++I) {
63 Threads.emplace_back([=] { work(S, I); });
64 if (Stop)
65 break;
66 }
67 ThreadsCreated.set_value();
68 work(S, 0);
69 });
70 }
71
72 void stop() {
73 {
74 std::lock_guard<std::mutex> Lock(Mutex);
75 if (Stop)
76 return;
77 Stop = true;
78 }
79 Cond.notify_all();
80 ThreadsCreated.get_future().wait();
81 }
82
83 ~ThreadPoolExecutor() override {
84 stop();
85 std::thread::id CurrentThreadId = std::this_thread::get_id();
86 for (std::thread &T : Threads)
87 if (T.get_id() == CurrentThreadId)
88 T.detach();
89 else
90 T.join();
91 }
92
93 struct Creator {
94 static void *call() { return new ThreadPoolExecutor(strategy); }
95 };
96 struct Deleter {
97 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
98 };
99
100 void add(std::function<void()> F) override {
101 {
102 std::lock_guard<std::mutex> Lock(Mutex);
103 WorkStack.push_back(std::move(F));
104 }
105 Cond.notify_one();
106 }
107
108 size_t getThreadCount() const override { return ThreadCount; }
109
110private:
111 void work(ThreadPoolStrategy S, unsigned ThreadID) {
112 threadIndex = ThreadID;
113 S.apply_thread_strategy(ThreadID);
114 while (true) {
115 std::unique_lock<std::mutex> Lock(Mutex);
116 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
117 if (Stop)
118 break;
119 auto Task = std::move(WorkStack.back());
120 WorkStack.pop_back();
121 Lock.unlock();
122 Task();
123 }
124 }
125
126 std::atomic<bool> Stop{false};
127 std::vector<std::function<void()>> WorkStack;
128 std::mutex Mutex;
129 std::condition_variable Cond;
130 std::promise<void> ThreadsCreated;
131 std::vector<std::thread> Threads;
132 unsigned ThreadCount;
133};
134
135Executor *Executor::getDefaultExecutor() {
136#ifdef _WIN32
137 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
138 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
139 // stops the thread pool and waits for any worker thread creation to complete
140 // but does not wait for the threads to finish. The wait for worker thread
141 // creation to complete is important as it prevents intermittent crashes on
142 // Windows due to a race condition between thread creation and process exit.
143 //
144 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
145 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
146 // destructor ensures it has been stopped and waits for worker threads to
147 // finish. The wait is important as it prevents intermittent crashes on
148 // Windows when the process is doing a full exit.
149 //
150 // The Windows crashes appear to only occur with the MSVC static runtimes and
151 // are more frequent with the debug static runtime.
152 //
153 // This also prevents intermittent deadlocks on exit with the MinGW runtime.
154
155 static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
156 ThreadPoolExecutor::Deleter>
157 ManagedExec;
158 static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
159 return Exec.get();
160#else
161 // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
162 // by llvm_shutdown(), worker threads will clean up and invoke TLS
163 // destructors. This can lead to race conditions if other threads attempt to
164 // access TLS objects that have already been destroyed.
165 static ThreadPoolExecutor Exec(strategy);
166 return &Exec;
167#endif
168}
169} // namespace
170} // namespace detail
171
172size_t getThreadCount() {
173 return detail::Executor::getDefaultExecutor()->getThreadCount();
174}
175#endif
176
177// Latch::sync() called by the dtor may cause one thread to block. If is a dead
178// lock if all threads in the default executor are blocked. To prevent the dead
179// lock, only allow the root TaskGroup to run tasks parallelly. In the scenario
180// of nested parallel_for_each(), only the outermost one runs parallelly.
181TaskGroup::TaskGroup()
182#if LLVM_ENABLE_THREADS
183 : Parallel((parallel::strategy.ThreadsRequested != 1) &&
184 (threadIndex == UINT_MAX)) {}
185#else
186 : Parallel(false) {}
187#endif
188TaskGroup::~TaskGroup() {
189 // We must ensure that all the workloads have finished before decrementing the
190 // instances count.
191 L.sync();
192}
193
194void TaskGroup::spawn(std::function<void()> F) {
195#if LLVM_ENABLE_THREADS
196 if (Parallel) {
197 L.inc();
198 detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
199 F();
200 L.dec();
201 });
202 return;
203 }
204#endif
205 F();
206}
207
208} // namespace parallel
209} // namespace llvm
210
211void llvm::parallelFor(size_t Begin, size_t End,
212 llvm::function_ref<void(size_t)> Fn) {
213#if LLVM_ENABLE_THREADS
214 if (parallel::strategy.ThreadsRequested != 1) {
215 auto NumItems = End - Begin;
216 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
217 // overhead on large inputs.
218 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
219 if (TaskSize == 0)
220 TaskSize = 1;
221
223 for (; Begin + TaskSize < End; Begin += TaskSize) {
224 TG.spawn([=, &Fn] {
225 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
226 Fn(I);
227 });
228 }
229 if (Begin != End) {
230 TG.spawn([=, &Fn] {
231 for (size_t I = Begin; I != End; ++I)
232 Fn(I);
233 });
234 }
235 return;
236 }
237#endif
238
239 for (; Begin != End; ++Begin)
240 Fn(Begin);
241}
static GCRegistry::Add< CoreCLRGC > E("coreclr", "CoreCLR-compatible GC")
bool End
Definition: ELF_riscv.cpp:480
global merge func
#define F(x, y, z)
Definition: MD5.cpp:55
#define I(x, y, z)
Definition: MD5.cpp:58
const SmallVectorImpl< MachineOperand > & Cond
static cl::opt< int > ThreadCount("threads", cl::init(0))
ManagedStatic - This transparently changes the behavior of global statics to be lazily constructed on...
Definition: ManagedStatic.h:83
This tells how a thread pool will be used.
Definition: Threading.h:116
void apply_thread_strategy(unsigned ThreadPoolNum) const
Assign the current thread to an ideal hardware CPU or NUMA node.
unsigned compute_thread_count() const
Retrieves the max available threads for the current strategy.
Definition: Threading.cpp:41
An efficient, type-erasing, non-owning reference to a callable.
void spawn(std::function< void()> f)
Definition: Parallel.cpp:194
ThreadPoolStrategy strategy
Definition: Parallel.cpp:19
unsigned getThreadIndex()
Definition: Parallel.h:54
size_t getThreadCount()
Definition: Parallel.h:55
This is an optimization pass for GlobalISel generic memory operations.
Definition: AddressRanges.h:18
void parallelFor(size_t Begin, size_t End, function_ref< void(size_t)> Fn)
Definition: Parallel.cpp:211