Line data Source code
1 : //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==//
2 : //
3 : // The LLVM Compiler Infrastructure
4 : //
5 : // This file is distributed under the University of Illinois Open Source
6 : // License. See LICENSE.TXT for details.
7 : //
8 : //===----------------------------------------------------------------------===//
9 : //
10 : // This file implements a crude C++11 based thread pool.
11 : //
12 : //===----------------------------------------------------------------------===//
13 :
14 : #include "llvm/Support/ThreadPool.h"
15 :
16 : #include "llvm/Config/llvm-config.h"
17 : #include "llvm/Support/Threading.h"
18 : #include "llvm/Support/raw_ostream.h"
19 :
20 : using namespace llvm;
21 :
22 : #if LLVM_ENABLE_THREADS
23 :
24 : // Default to hardware_concurrency
25 8 : ThreadPool::ThreadPool() : ThreadPool(hardware_concurrency()) {}
26 :
27 471 : ThreadPool::ThreadPool(unsigned ThreadCount)
28 471 : : ActiveThreads(0), EnableFlag(true) {
29 : // Create ThreadCount threads that will loop forever, wait on QueueCondition
30 : // for tasks to be queued or the Pool to be destroyed.
31 471 : Threads.reserve(ThreadCount);
32 2408 : for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
33 1937 : Threads.emplace_back([&] {
34 : while (true) {
35 : PackagedTaskTy Task;
36 : {
37 : std::unique_lock<std::mutex> LockGuard(QueueLock);
38 : // Wait for tasks to be pushed in the queue
39 : QueueCondition.wait(LockGuard,
40 : [&] { return !EnableFlag || !Tasks.empty(); });
41 : // Exit condition
42 : if (!EnableFlag && Tasks.empty())
43 : return;
44 : // Yeah, we have a task, grab it and release the lock on the queue
45 :
46 : // We first need to signal that we are active before popping the queue
47 : // in order for wait() to properly detect that even if the queue is
48 : // empty, there is still a task in flight.
49 : {
50 : std::unique_lock<std::mutex> LockGuard(CompletionLock);
51 : ++ActiveThreads;
52 : }
53 : Task = std::move(Tasks.front());
54 : Tasks.pop();
55 : }
56 : // Run the task we just grabbed
57 : Task();
58 :
59 : {
60 : // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
61 : std::unique_lock<std::mutex> LockGuard(CompletionLock);
62 : --ActiveThreads;
63 : }
64 :
65 : // Notify task completion, in case someone waits on ThreadPool::wait()
66 : CompletionCondition.notify_all();
67 : }
68 : });
69 : }
70 471 : }
71 :
72 448 : void ThreadPool::wait() {
73 : // Wait for all threads to complete and the queue to be empty
74 448 : std::unique_lock<std::mutex> LockGuard(CompletionLock);
75 : // The order of the checks for ActiveThreads and Tasks.empty() matters because
76 : // any active threads might be modifying the Tasks queue, and this would be a
77 : // race.
78 896 : CompletionCondition.wait(LockGuard,
79 : [&] { return !ActiveThreads && Tasks.empty(); });
80 448 : }
81 :
82 832 : std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
83 : /// Wrap the Task in a packaged_task to return a future object.
84 832 : PackagedTaskTy PackagedTask(std::move(Task));
85 832 : auto Future = PackagedTask.get_future();
86 : {
87 : // Lock the queue and push the new task
88 832 : std::unique_lock<std::mutex> LockGuard(QueueLock);
89 :
90 : // Don't allow enqueueing after disabling the pool
91 : assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
92 :
93 : Tasks.push(std::move(PackagedTask));
94 : }
95 832 : QueueCondition.notify_one();
96 832 : return Future.share();
97 : }
98 :
99 : // The destructor joins all threads, waiting for completion.
100 942 : ThreadPool::~ThreadPool() {
101 : {
102 471 : std::unique_lock<std::mutex> LockGuard(QueueLock);
103 471 : EnableFlag = false;
104 : }
105 471 : QueueCondition.notify_all();
106 2408 : for (auto &Worker : Threads)
107 1937 : Worker.join();
108 471 : }
109 :
110 : #else // LLVM_ENABLE_THREADS Disabled
111 :
112 : ThreadPool::ThreadPool() : ThreadPool(0) {}
113 :
114 : // No threads are launched, issue a warning if ThreadCount is not 0
115 : ThreadPool::ThreadPool(unsigned ThreadCount)
116 : : ActiveThreads(0) {
117 : if (ThreadCount) {
118 : errs() << "Warning: request a ThreadPool with " << ThreadCount
119 : << " threads, but LLVM_ENABLE_THREADS has been turned off\n";
120 : }
121 : }
122 :
123 : void ThreadPool::wait() {
124 : // Sequential implementation running the tasks
125 : while (!Tasks.empty()) {
126 : auto Task = std::move(Tasks.front());
127 : Tasks.pop();
128 : Task();
129 : }
130 : }
131 :
132 : std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
133 : // Get a Future with launch::deferred execution using std::async
134 : auto Future = std::async(std::launch::deferred, std::move(Task)).share();
135 : // Wrap the future so that both ThreadPool::wait() can operate and the
136 : // returned future can be sync'ed on.
137 : PackagedTaskTy PackagedTask([Future]() { Future.get(); });
138 : Tasks.push(std::move(PackagedTask));
139 : return Future;
140 : }
141 :
142 : ThreadPool::~ThreadPool() {
143 : wait();
144 : }
145 :
146 : #endif
|