-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathThreadPool.cpp
More file actions
112 lines (99 loc) · 2.86 KB
/
ThreadPool.cpp
File metadata and controls
112 lines (99 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
///////////////////////////////////////////////////////////////////////////////
// FILE: ThreadPool.cpp
// PROJECT: Micro-Manager
// SUBSYSTEM: MMCore
//-----------------------------------------------------------------------------
// DESCRIPTION: A class executing queued tasks on separate threads
// and scaling number of threads based on hardware.
//
// AUTHOR: Tomas Hanak, tomas.hanak@teledyne.com, 03/03/2021
// Andrej Bencur, andrej.bencur@teledyne.com, 03/03/2021
//
// COPYRIGHT: Teledyne Digital Imaging US, Inc., 2021
//
// LICENSE: This file is distributed under the "Lesser GPL" (LGPL) license.
// License text is included with the source distribution.
//
// This file is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES.
#include "ThreadPool.h"
#include "Task.h"
#include <algorithm>
#include <cassert>
#include <mutex>
#include <thread>
namespace mmcore {
namespace internal {
ThreadPool::ThreadPool()
{
const size_t hwThreadCount = std::max<size_t>(1, std::thread::hardware_concurrency());
for (size_t n = 0; n < hwThreadCount; ++n)
{
auto thread = std::make_unique<std::thread>(&ThreadPool::ThreadFunc, this);
threads_.push_back(std::move(thread));
}
}
ThreadPool::~ThreadPool()
{
{
std::lock_guard<std::mutex> lock(mx_);
abortFlag_ = true;
}
cv_.notify_all();
for (const auto& thread : threads_)
thread->join();
}
size_t ThreadPool::GetSize() const
{
return threads_.size();
}
void ThreadPool::Execute(Task* task)
{
assert(task);
{
std::lock_guard<std::mutex> lock(mx_);
if (abortFlag_)
return;
queue_.push_back(task);
}
cv_.notify_one();
}
void ThreadPool::Execute(const std::vector<Task*>& tasks)
{
assert(!tasks.empty());
{
std::lock_guard<std::mutex> lock(mx_);
if (abortFlag_)
return;
for (Task* task : tasks)
{
assert(task);
queue_.push_back(task);
}
}
cv_.notify_all();
}
void ThreadPool::ThreadFunc()
{
for (;;)
{
Task* task = nullptr;
{
std::unique_lock<std::mutex> lock(mx_);
cv_.wait(lock, [&]() { return abortFlag_ || !queue_.empty(); });
if (abortFlag_)
break;
task = queue_.front();
queue_.pop_front();
}
task->Execute();
task->Done();
}
}
} // namespace internal
} // namespace mmcore