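Problem description
I'm not used to posting questions on the internet, so please tell me if I'm doing something wrong.
In short
Context
While working on a very efficient implementation of a Single Consumer Single Producer Queue, I ran into seemingly illogical behavior from the GCC compiler while benchmarking my code.
Full story
I hope someone has the knowledge needed to explain what is going on.
I'm currently using GCC 10.2.0 and its C++20 implementation on Arch Linux. My laptop is a Lenovo T470S with an i7-7500U processor.
Let me start with the data structure: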
class SPSCQueue
{
public:
    ...
private:
    alignas(64) std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    char _pad0[64 - sizeof(Buffer) - sizeof(std::size_t)]; // 64 bytes alignment padding
    alignas(64) std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
    std::size_t _tailCache { 0 }; // Tail cache for the consumer
    char _pad1[64 - sizeof(Buffer) - sizeof(std::size_t)]; // 64 bytes alignment padding
};
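The data structure above gets a fast and stable 20 ns per push / pop on my system.
However, changing only the alignment by using the following members makes the benchmark unstable, with values between 20 and 30 ns.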
alignas(64) std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
struct alignas(64) {
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
};
alignas(64) std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
struct alignas(64) {
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
    std::size_t _tailCache { 0 }; // Tail cache for the consumer
};
Finally, I got even more lost when I tried this configuration, which gave me results between 40 and 55 ns.
std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
char _pad0[64 - sizeof(std::atomic<size_t>)];
Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
std::size_t _headCache { 0 }; // Head cache for the producer
char _pad1[64 - sizeof(Buffer) - sizeof(std::size_t)];
std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
char _pad2[64 - sizeof(std::atomic<size_t>)];
Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
std::size_t _tailCache { 0 }; // Tail cache for the consumer
char _pad3[64 - sizeof(Buffer) - sizeof(std::size_t)];
This time, my queue push / pop oscillates between 40 and 55 ns.
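To make the difference between these layouts concrete, here is a minimal sketch that rebuilds the first and the third variant as standalone structs and checks their offsets at compile time. The names `AlignasLayout` and `PaddingOnlyLayout` and the stand-in `Buffer` are hypothetical, and the numbers assume a 64-bit target where pointers, `std::size_t` and `std::atomic<std::size_t>` are all 8 bytes. It only describes the layout; it does not by itself explain the timings.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical stand-in for SPSCQueue<Type>::Buffer: a pointer plus a capacity.
struct Buffer {
    void *data { nullptr };
    std::size_t capacity { 0 };
};

// First variant: alignas(64) on the atomics, padding after the cached members.
struct AlignasLayout {
    alignas(64) std::atomic<std::size_t> tail { 0 };
    Buffer buffer {};
    std::size_t headCache { 0 };
    char pad0[64 - sizeof(Buffer) - sizeof(std::size_t)];
    alignas(64) std::atomic<std::size_t> head { 0 };
    Buffer buffer2 {};
    std::size_t tailCache { 0 };
    char pad1[64 - sizeof(Buffer) - sizeof(std::size_t)];
};

// Third variant: manual padding only, no alignas anywhere.
struct PaddingOnlyLayout {
    std::atomic<std::size_t> tail { 0 };
    char pad0[64 - sizeof(std::atomic<std::size_t>)];
    Buffer buffer {};
    std::size_t headCache { 0 };
    char pad1[64 - sizeof(Buffer) - sizeof(std::size_t)];
    std::atomic<std::size_t> head { 0 };
    char pad2[64 - sizeof(std::atomic<std::size_t>)];
    Buffer buffer2 {};
    std::size_t tailCache { 0 };
    char pad3[64 - sizeof(Buffer) - sizeof(std::size_t)];
};

// Both variants occupy four 64-byte lines worth of storage.
static_assert(sizeof(AlignasLayout) == 256 && sizeof(PaddingOnlyLayout) == 256);

// In the first variant the producer's cached members directly follow tail,
// so they live on the same 64-byte line as the shared atomic.
static_assert(offsetof(AlignasLayout, buffer) == 8);

// In the third variant the member offsets are multiples of 64...
static_assert(offsetof(PaddingOnlyLayout, buffer) == 64);
static_assert(offsetof(PaddingOnlyLayout, head) == 128);

// ...but the type itself only requires the alignment of its atomics, so an
// instance is not guaranteed to start on a cache line boundary.
static_assert(alignof(AlignasLayout) == 64);
static_assert(alignof(PaddingOnlyLayout) == alignof(std::atomic<std::size_t>));
```

In other words, padding arrays fix the distances between members, while only `alignas` constrains where the whole object may be placed.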
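I'm very lost at this point because I don't know where to look for answers. Until now, C++ memory layout has felt very intuitive to me, but I realize I'm still missing some very important knowledge to get better at high-frequency multithreading.
Minimal code sample
SPSCQueue.hpp: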
#pragma once
#include <atomic>
#include <cinttypes>
#include <cstddef>
#include <cstdlib>
#include <new>
#include <stdexcept>
#include <type_traits>
#include <utility>
#define KF_ALIGN_CACHELINE alignas(kF::Core::Utils::CacheLinesize)
namespace kF::Core
{
    template<typename Type>
    class SPSCQueue;
    namespace Utils
    {
        /** @brief Helper used to perfect forward move / copy constructor */
        template<typename Type, bool Forcecopy = false>
        void ForwardConstruct(Type *dest, Type *source) {
            // Construction needs a move constructor, not a move assignment operator
            if constexpr (!Forcecopy && std::is_move_constructible_v<Type>)
                new (dest) Type(std::move(*source));
            else
                new (dest) Type(*source);
        }
        /** @brief Helper used to perfect forward move / copy assignment */
        template<typename Type, bool Forcecopy = false>
        void ForwardAssign(Type *dest, Type *source) {
            if constexpr (!Forcecopy && std::is_move_assignable_v<Type>)
                *dest = std::move(*source);
            else
                *dest = *source;
        }
        /** @brief Theoretical cache line size */
        constexpr std::size_t CacheLinesize = 64ul;
    }
}
/**
 * @brief The SPSC queue is a lock-free queue that only supports a Single Producer and a Single Consumer
 * The queue is really fast compared to other more flexible implementations: because only two threads
 * can read / write simultaneously, less synchronization is needed for each operation.
 * The queue supports ranged push / pop to insert multiple elements without performance impact
 *
 * @tparam Type Type of the elements to be inserted
 */
template<typename Type>
class kF::Core::SPSCQueue
{
public:
    /** @brief Buffer structure containing all cells */
    struct Buffer
    {
        Type *data { nullptr };
        std::size_t capacity { 0 };
    };
    /** @brief Local thread cache */
    struct Cache
    {
        Buffer buffer {};
        std::size_t value { 0 };
    };
    /** @brief Construct the queue and allocate storage for 'capacity' cells */
    SPSCQueue(const std::size_t capacity);
    /** @brief Destruct and release all memory (unsafe) */
    ~SPSCQueue(void) { clear(); std::free(_buffer.data); }
    /** @brief Push a single element into the queue
     * @return true if the element has been inserted */
    template<typename ...Args>
    [[nodiscard]] inline bool push(Args &&...args);
    /** @brief Pop a single element from the queue
     * @return true if an element has been extracted */
    [[nodiscard]] inline bool pop(Type &value);
    /** @brief Clear all elements of the queue (unsafe) */
    void clear(void);
private:
    KF_ALIGN_CACHELINE std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    struct {
        Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
        std::size_t _headCache { 0 }; // Head cache for the producer
        char _pad0[Utils::CacheLinesize - sizeof(Buffer) - sizeof(std::size_t)];
    };
    KF_ALIGN_CACHELINE std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    struct {
        Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
        std::size_t _tailCache { 0 }; // Tail cache for the consumer
        char _pad1[Utils::CacheLinesize - sizeof(Buffer) - sizeof(std::size_t)];
    };
    /** @brief Copy and move constructors disabled */
    SPSCQueue(const SPSCQueue &other) = delete;
    SPSCQueue(SPSCQueue &&other) = delete;
};
static_assert(sizeof(kF::Core::SPSCQueue<int>) == 4 * kF::Core::Utils::CacheLinesize);
template<typename Type>
kF::Core::SPSCQueue<Type>::SPSCQueue(const std::size_t capacity)
{
    _buffer.capacity = capacity;
    if (_buffer.data = reinterpret_cast<Type *>(std::malloc(sizeof(Type) * capacity)); !_buffer.data)
        throw std::runtime_error("Core::SPSCQueue: Malloc Failed");
    _buffer2 = _buffer;
}
template<typename Type>
template<typename ...Args>
bool kF::Core::SPSCQueue<Type>::push(Args &&...args)
{
    static_assert(std::is_constructible<Type, Args...>::value, "Type must be constructible from Args...");
    const auto tail = _tail.load(std::memory_order_relaxed);
    auto next = tail + 1;
    if (next == _buffer.capacity) [[unlikely]]
        next = 0;
    if (auto head = _headCache; next == head) [[unlikely]] {
        head = _headCache = _head.load(std::memory_order_acquire);
        if (next == head) [[unlikely]]
            return false;
    }
    new (_buffer.data + tail) Type{ std::forward<Args>(args)... };
    _tail.store(next, std::memory_order_release);
    return true;
}
template<typename Type>
bool kF::Core::SPSCQueue<Type>::pop(Type &value)
{
    const auto head = _head.load(std::memory_order_relaxed);
    if (auto tail = _tailCache; head == tail) [[unlikely]] {
        tail = _tailCache = _tail.load(std::memory_order_acquire);
        if (head == tail) [[unlikely]]
            return false;
    }
    auto *elem = reinterpret_cast<Type *>(_buffer2.data + head);
    auto next = head + 1;
    if (next == _buffer2.capacity) [[unlikely]]
        next = 0;
    value = std::move(*elem);
    elem->~Type();
    _head.store(next, std::memory_order_release);
    return true;
}
template<typename Type>
void kF::Core::SPSCQueue<Type>::clear(void)
{
    for (Type type; pop(type););
}
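One detail of the index scheme above is easy to miss: `push` reports the queue as full when the slot after `tail` is `head`, so one cell always stays empty. The following is a minimal single-threaded sketch of just that index arithmetic, with no storage and no atomics. `RingIndices` and `countPushesUntilFull` are hypothetical names introduced for illustration, and the model assumes a capacity of at least 1.

```cpp
#include <cstddef>

// Hypothetical index-only model of the queue: it mirrors the wrap-around and
// full / empty checks of push and pop, but stores no elements.
struct RingIndices {
    std::size_t capacity;
    std::size_t head { 0 };
    std::size_t tail { 0 };

    // Same full check as push: the slot after tail must not be head.
    constexpr bool push() {
        std::size_t next = tail + 1;
        if (next == capacity)
            next = 0;
        if (next == head)
            return false;
        tail = next;
        return true;
    }

    // Same empty check as pop: head == tail means there is nothing to read.
    constexpr bool pop() {
        if (head == tail)
            return false;
        std::size_t next = head + 1;
        if (next == capacity)
            next = 0;
        head = next;
        return true;
    }
};

// Counts how many pushes succeed on an empty ring before it reports full.
constexpr std::size_t countPushesUntilFull(std::size_t capacity) {
    RingIndices ring { capacity };
    std::size_t count = 0;
    while (ring.push())
        ++count;
    return count;
}

// A ring with N cells accepts N - 1 elements before push fails.
static_assert(countPushesUntilFull(4) == 3);
```

With the `Capacity = 4096` used in the benchmark, this scheme means the queue can hold at most 4095 elements at a time.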
Benchmark using Google Benchmark. bench_SPSCQueue.cpp:
#include <atomic>
#include <chrono>
#include <thread>
#include <benchmark/benchmark.h>
#include "SPSCQueue.hpp"
using namespace kF;
using Queue = Core::SPSCQueue<std::size_t>;
constexpr std::size_t Capacity = 4096;
static void SPSCQueue_NoisyPush(benchmark::State &state)
{
    Queue queue(Capacity);
    std::atomic<bool> running = true;
    // Consumer thread: keeps popping so that the producer is measured under contention
    std::thread thd([&queue, &running] {
        for (std::size_t tmp; running; benchmark::DoNotOptimize(queue.pop(tmp)));
    });
    for (auto _ : state) {
        decltype(std::chrono::high_resolution_clock::now()) start;
        // Restart the timer on each failed attempt so only the successful push is measured
        do {
            start = std::chrono::high_resolution_clock::now();
        } while (!queue.push(42ul));
        auto end = std::chrono::high_resolution_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
        auto iterationTime = elapsed.count();
        state.SetIterationTime(iterationTime);
    }
    running = false;
    if (thd.joinable())
        thd.join();
}
BENCHMARK(SPSCQueue_NoisyPush)->UseManualTime();
static void SPSCQueue_NoisyPop(benchmark::State &state)
{
    Queue queue(Capacity);
    std::atomic<bool> running = true;
    // Producer thread: keeps pushing so that the consumer is measured under contention
    std::thread thd([&queue, &running] {
        while (running)
            benchmark::DoNotOptimize(queue.push(42ul));
    });
    for (auto _ : state) {
        std::size_t tmp;
        decltype(std::chrono::high_resolution_clock::now()) start;
        // Restart the timer on each failed attempt so only the successful pop is measured
        do {
            start = std::chrono::high_resolution_clock::now();
        } while (!queue.pop(tmp));
        auto end = std::chrono::high_resolution_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
        auto iterationTime = elapsed.count();
        state.SetIterationTime(iterationTime);
    }
    running = false;
    if (thd.joinable())
        thd.join();
}
BENCHMARK(SPSCQueue_NoisyPop)->UseManualTime();
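Solution
Thanks to your helpful comments (and mostly thanks to Peter Cordes), it seems the issue comes from the L2 data prefetcher.
Because of my SPSC queue design, each thread must access two consecutive cache lines to push / pop the queue. If the structure itself is not aligned to 128 bytes, its address will not be aligned to 128 bytes, and the two cache lines a thread touches may not form the 128-byte-aligned pair that the L2 prefetcher handles together.
So the simple fix is: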
template<typename Type>
class alignas(128) SPSCQueue { ... };
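As a rough way to confirm that the fix takes effect, the sketch below applies the same `alignas(128)` to a reduced stand-in type and checks the resulting alignment. `PrefetchPairSize`, `AlignedQueueFields` and `isAlignedTo` are hypothetical names used only here; the 128-byte figure is the one from the fix above, not something this code measures.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Hypothetical constant: two adjacent 64-byte cache lines, as in the fix above.
constexpr std::size_t PrefetchPairSize = 128;

// Reduced stand-in for the queue: only the two shared atomics are kept.
struct alignas(PrefetchPairSize) AlignedQueueFields {
    alignas(64) std::atomic<std::size_t> tail { 0 };
    alignas(64) std::atomic<std::size_t> head { 0 };
};

// The class-level alignas raises the alignment of the whole type, so every
// instance must start on a 128-byte boundary.
static_assert(alignof(AlignedQueueFields) == PrefetchPairSize);
static_assert(sizeof(AlignedQueueFields) % PrefetchPairSize == 0);

// Runtime helper to check where a given object actually ended up.
inline bool isAlignedTo(const void *ptr, std::size_t alignment) {
    return reinterpret_cast<std::uintptr_t>(ptr) % alignment == 0;
}
```

For an instance with automatic or static storage, `isAlignedTo(&instance, PrefetchPairSize)` should hold. Since C++17, a plain `new` expression also honors this over-alignment, whereas storage obtained from `std::malloc` carries no such guarantee.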
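Intel has published an interesting paper that explains the optimizations of its architectures and how prefetching is done at the various cache levels.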