Halide 14.0.0
Halide compiler and libraries
ThreadPool.h
Go to the documentation of this file.
1#ifndef HALIDE_THREAD_POOL_H
2#define HALIDE_THREAD_POOL_H
3
4#include <condition_variable>
5#include <future>
6#include <mutex>
7#include <queue>
8#include <thread>
9#include <utility>
10
11#ifdef _MSC_VER
12#else
13#include <unistd.h>
14#endif
15
16/** \file
17 * Define a simple thread pool utility that is modeled on the api of
18 * std::async(); since implementation details of std::async
19 * can vary considerably, with no control over thread spawning, this class
20 * allows us to use the same model but with precise control over thread usage.
21 *
22 * A ThreadPool is created with a specific number of threads, which will never
23 * vary over the life of the ThreadPool. (If created without a specific number
24 * of threads, it will attempt to use threads == number-of-cores.)
25 *
26 * Each async request will go into a queue, and will be serviced by the next
27 * available thread from the pool.
28 *
29 * The ThreadPool's dtor will block until all currently-executing tasks
30 * to finish (but won't schedule any more).
31 *
32 * Note that this is a fairly simpleminded ThreadPool, meant for tasks
33 * that are fairly coarse (e.g. different tasks in a test); it is specifically
34 * *not* intended to be the underlying implementation for Halide runtime threads
35 */
36namespace Halide {
37namespace Internal {
38
39template<typename T>
41 struct Job {
42 std::function<T()> func;
43 std::promise<T> result;
44
45 void run_unlocked(std::unique_lock<std::mutex> &unique_lock);
46 };
47
48 // all fields are protected by this mutex.
49 std::mutex mutex;
50
51 // Queue of Jobs.
52 std::queue<Job> jobs;
53
54 // Broadcast whenever items are added to the Job queue.
55 std::condition_variable wakeup_threads;
56
57 // Keep track of threads so they can be joined at shutdown
58 std::vector<std::thread> threads;
59
60 // True if the pool is shutting down.
61 bool shutting_down{false};
62
63 void worker_thread() {
64 std::unique_lock<std::mutex> unique_lock(mutex);
65 while (!shutting_down) {
66 if (jobs.empty()) {
67 // There are no jobs pending. Wait until more jobs are enqueued.
68 wakeup_threads.wait(unique_lock);
69 } else {
70 // Grab the next job.
71 Job cur_job = std::move(jobs.front());
72 jobs.pop();
73 cur_job.run_unlocked(unique_lock);
74 }
75 }
76 }
77
78public:
79 static size_t num_processors_online() {
80#ifdef _WIN32
81 char *num_cores = getenv("NUMBER_OF_PROCESSORS");
82 return num_cores ? atoi(num_cores) : 8;
83#else
84 return sysconf(_SC_NPROCESSORS_ONLN);
85#endif
86 }
87
88 // Default to number of available cores if not specified otherwise
89 ThreadPool(size_t desired_num_threads = num_processors_online()) {
90 // This file doesn't depend on anything else in libHalide, so
91 // we'll use assert, not internal_assert.
92 assert(desired_num_threads > 0);
93
94 std::lock_guard<std::mutex> lock(mutex);
95
96 // Create all the threads.
97 for (size_t i = 0; i < desired_num_threads; ++i) {
98 threads.emplace_back([this] { worker_thread(); });
99 }
100 }
101
103 // Wake everyone up and tell them the party's over and it's time to go home
104 {
105 std::lock_guard<std::mutex> lock(mutex);
106 shutting_down = true;
107 wakeup_threads.notify_all();
108 }
109
110 // Wait until they leave
111 for (auto &t : threads) {
112 t.join();
113 }
114 }
115
116 template<typename Func, typename... Args>
117 std::future<T> async(Func func, Args... args) {
118 std::lock_guard<std::mutex> lock(mutex);
119
120 Job job;
121 // Don't use std::forward here: we never want args passed by reference,
122 // since they will be accessed from an arbitrary thread.
123 //
124 // Some versions of GCC won't allow capturing variadic arguments in a lambda;
125 //
126 // job.func = [func, args...]() -> T { return func(args...); }; // Nope, sorry
127 //
128 // fortunately, we can use std::bind() to accomplish the same thing.
129 job.func = std::bind(func, args...);
130 jobs.emplace(std::move(job));
131 std::future<T> result = jobs.back().result.get_future();
132
133 // Wake up our threads.
134 wakeup_threads.notify_all();
135
136 return result;
137 }
138};
139
140template<typename T>
141inline void ThreadPool<T>::Job::run_unlocked(std::unique_lock<std::mutex> &unique_lock) {
142 unique_lock.unlock();
143 T r = func();
144 unique_lock.lock();
145 result.set_value(std::move(r));
146}
147
148template<>
149inline void ThreadPool<void>::Job::run_unlocked(std::unique_lock<std::mutex> &unique_lock) {
150 unique_lock.unlock();
151 func();
152 unique_lock.lock();
153 result.set_value();
154}
155
156} // namespace Internal
157} // namespace Halide
158
159#endif // HALIDE_THREAD_POOL_H
A halide function.
Definition: Func.h:703
ThreadPool(size_t desired_num_threads=num_processors_online())
Definition: ThreadPool.h:89
static size_t num_processors_online()
Definition: ThreadPool.h:79
std::future< T > async(Func func, Args... args)
Definition: ThreadPool.h:117
This file defines the class FunctionDAG, which is our representation of a Halide pipeline,...
@ Internal
Not visible externally, similar to 'static' linkage in C.
int atoi(const char *)
char * getenv(const char *)