问题描述
thread_local CustomAllocator* ts_alloc = nullptr;
struct AllocatorSetup
{
AllocatorSetup( int threadNum )
{
static std::vector<CustomAllocator> vec( (size_t)omp_get_max_threads() );
ts_alloc = &vec.at( threadNum );
}
~AllocatorSetup()
{
ts_alloc->resetArena();
ts_alloc = nullptr;
}
AllocatorSetup() = delete;
AllocatorSetup( const AllocatorSetup& ) = delete;
void operator=( const AllocatorSetup& ) = delete;
};
template<class E>
inline E* allocateBuffer( size_t count )
{
return (E*)ts_alloc->allocate( count * sizeof( E ),alignof( E ) );
}
void computeThings()
{
#pragma omp parallel for
for( int64_t i = 0; i < 100000; i++ )
{
AllocatorSetup allocSetup{ omp_get_thread_num() };
float* const buffer = allocateBuffer<float>( 1024 * 256 );
// ..some computations here
}
}
换句话说,在任何给定时间,omp_get_thread_num()
索引和本地线程之间的关系是一对一还是一对多?
解决方法
The documentation spells this out.
omp_get_thread_num 例程返回调用线程的当前团队中的线程号。
为 omp_get_thread_num 区域设置的绑定线程是当前团队。 omp_get_thread_num 区域的绑定区域是最内部的封闭并行区域。
omp_get_thread_num 例程返回调用线程的线程号,在执行例程区域绑定到的并行区域的组内。 线程号是一个0到1之间的整数,比omp_get_num_threads返回的值小一号。 团队主线程的线程号为0。如果从线程调用例程,则返回0程序的连续部分。
强调我的。
这意味着如果您有多个团队同时执行,则每个团队中的线程编号为 0 .. nbr_threads_in_team
。因此,如果多个线程在不同的并发运行团队中,它们将获得相同的线程编号。
虽然 OpenMP 肯定会重用线程,但如果它实际上同时运行两个并行部分,您将同时运行两个线程组。由于单个线程不能同时做两件事,因此这些线程必须不同,但每个团队中的线程都编号为 0 .. nbr_threads_in_team
。
这意味着您的 computeThings()
函数目前不是线程安全的。您可以改为构建一个 CustomAllocator
对象的线程安全池,线程可以从中“借出”一个对象,并在线程完成后返回它。
像这样:
void computeThings() {
#pragma omp parallel
{
//Since we're in a parallel block,each thread
//will lend its own CustomAllocator
CustomAllocator& theAllocator = allocatorPool.lend();
#pragma omp for
for (...) {
/* Do stuff with the allocator */
}
allocatorPool.unLend(theAllocator);
}
}
在生产代码中,lend()
和 unLend()
当然应该使用 RAII 实现,以避免出现异常时的泄漏。请注意,它仅在整个并行 for 循环之前获取一次新资源,而不是在循环的每次迭代中,因此每次调用只需为缓存未命中支付一次。 (除非您在循环中调用该函数本身,否则您的数据不太可能在调用 computeThings
之间保留在 L1 中。)如果您确实需要对 L1 友好,即使在多次调用该函数时,您也可以可以将每个资源与一个线程 ID 关联起来,如果“首选资源”仍然可用,则优先将资源返回给之前请求它们的同一个线程。
以下是这种缓存的示例,该缓存尝试为每个请求线程提供其首选资源实例:
//Warning,untested!
template<typename T>
class ThreadAffineCache {
std::unordered_map<std::thread::id,std::unique_ptr<T>> m_cache = {};
std::mutex m_mtx{};
public:
std::unique_ptr<T> lend() {
std::unique_lock lk{m_mtx};
auto tid = std::this_thread::get_id();
std::unique_ptr<T> result = std::move(m_cache[tid]);
m_cache.erase(tid);
if (!result) {
if (m_cache.empty()) {
result = std::make_unique<T>();
} else {
auto iter = m_cache.begin();
result = std::move(*iter);
m_cache.erase(iter);
}
}
assert(result);
return result;
}
void unLend(std::unique_ptr<T> obj) {
assert(obj);
std::unique_lock lk{m_mtx};
m_cache[std::this_thread::get_id()] = std::move(obj);
}
}
或者,您可以使用由 pthread_self
或 gettid
返回的值索引的地图。我不确定 std::thread::id
是否返回对 OpenMP 工作线程有意义的内容。在线程终止时进行清理也可能是一个问题。
如果您只是需要一个简单的方法,您还可以使用互斥锁保护 computeThings
。