Line data Source code
1 : //===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
2 : //
3 : // The LLVM Compiler Infrastructure
4 : //
5 : // This file is distributed under the University of Illinois Open Source
6 : // License. See LICENSE.TXT for details.
7 : //
8 : //===----------------------------------------------------------------------===//
9 : //
10 : // This file defines a crude C++11 based task queue.
11 : //
12 : //===----------------------------------------------------------------------===//
13 :
14 : #ifndef LLVM_SUPPORT_TASK_QUEUE_H
15 : #define LLVM_SUPPORT_TASK_QUEUE_H
16 :
17 : #include "llvm/Config/llvm-config.h"
18 : #include "llvm/Support/ThreadPool.h"
19 : #include "llvm/Support/thread.h"
20 :
21 : #include <atomic>
22 : #include <cassert>
23 : #include <condition_variable>
24 : #include <deque>
25 : #include <functional>
26 : #include <future>
27 : #include <memory>
28 : #include <mutex>
29 : #include <utility>
30 :
31 : namespace llvm {
32 : /// TaskQueue executes serialized work on a user-defined Thread Pool. It
33 : /// guarantees that if task B is enqueued after task A, task B begins after
34 : /// task A completes and there is no overlap between the two.
35 : class TaskQueue {
36 : // Because we don't have init capture to use move-only local variables that
37 : // are captured into a lambda, we create the promise inside an explicit
38 : // callable struct. We want to do as much of the wrapping in the
39 : // type-specialized domain (before type erasure) and then erase this into a
40 : // std::function.
41 10 : template <typename Callable> struct Task {
42 : using ResultTy = typename std::result_of<Callable()>::type;
43 6 : explicit Task(Callable C, TaskQueue &Parent)
44 : : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
45 6 : Parent(&Parent) {}
46 0 :
47 : template<typename T>
48 0 : void invokeCallbackAndSetPromise(T*) {
49 0 : P->set_value(C());
50 : }
51 0 :
52 : void invokeCallbackAndSetPromise(void*) {
53 : C();
54 0 : P->set_value();
55 1 : }
56 0 :
57 0 : void operator()() noexcept {
58 0 : ResultTy *Dummy = nullptr;
59 0 : invokeCallbackAndSetPromise(Dummy);
60 0 : Parent->completeTask();
61 0 : }
62 0 :
63 : Callable C;
64 0 : std::shared_ptr<std::promise<ResultTy>> P;
65 4 : TaskQueue *Parent;
66 4 : };
67 0 :
68 0 : public:
69 0 : /// Construct a task queue with no work.
70 0 : TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }
71 0 :
72 0 : /// Blocking destructor: the queue will wait for all work to complete.
73 0 : ~TaskQueue() {
74 0 : Scheduler.wait();
75 0 : assert(Tasks.empty());
76 0 : }
77 0 :
78 0 : /// Asynchronous submission of a task to the queue. The returned future can be
79 0 : /// used to wait for the task (and all previous tasks that have not yet
80 0 : /// completed) to finish.
81 0 : template <typename Callable>
82 0 : std::future<typename std::result_of<Callable()>::type> async(Callable &&C) {
83 0 : #if !LLVM_ENABLE_THREADS
84 0 : static_assert(false,
85 0 : "TaskQueue requires building with LLVM_ENABLE_THREADS!");
86 0 : #endif
87 0 : Task<Callable> T{std::move(C), *this};
88 0 : using ResultTy = typename std::result_of<Callable()>::type;
89 0 : std::future<ResultTy> F = T.P->get_future();
90 0 : {
91 0 : std::lock_guard<std::mutex> Lock(QueueLock);
92 : // If there's already a task in flight, just queue this one up. If
93 5 : // there is not a task in flight, bypass the queue and schedule this
94 : // task immediately.
95 3 : if (IsTaskInFlight)
96 8 : Tasks.push_back(std::move(T));
97 5 : else {
98 1 : Scheduler.async(std::move(T));
99 : IsTaskInFlight = true;
100 : }
101 1 : }
102 1 : return std::move(F);
103 1 : }
104 :
105 : private:
106 1 : void completeTask() {
107 1 : // We just completed a task. If there are no more tasks in the queue,
108 1 : // update IsTaskInFlight to false and stop doing work. Otherwise
109 : // schedule the next task (while not holding the lock).
110 : std::function<void()> Continuation;
111 1 : {
112 1 : std::lock_guard<std::mutex> Lock(QueueLock);
113 1 : if (Tasks.empty()) {
114 : IsTaskInFlight = false;
115 : return;
116 1 : }
117 1 :
118 1 : Continuation = std::move(Tasks.front());
119 : Tasks.pop_front();
120 : }
121 1 : Scheduler.async(std::move(Continuation));
122 1 : }
123 :
124 : /// The thread pool on which to run the work.
125 : ThreadPool &Scheduler;
126 :
127 : /// State which indicates whether the queue currently is currently processing
128 : /// any work.
129 : bool IsTaskInFlight = false;
130 :
131 3 : /// Mutex for synchronizing access to the Tasks array.
132 : std::mutex QueueLock;
133 :
134 3 : /// Tasks waiting for execution in the queue.
135 3 : std::deque<std::function<void()>> Tasks;
136 : };
137 : } // namespace llvm
138 :
139 : #endif // LLVM_SUPPORT_TASK_QUEUE_H
|