False sharing prevention with alignas is broken

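Problem description

I'm not used to posting questions on the internet, so please tell me if I'm doing something wrong.

In short

  1. How do I properly prevent false sharing on a 64-bit architecture with a 64-byte CPU cache line size?

  2. How does using the C++ 'alignas' keyword versus a simple byte array (e.g. char[64]) affect multithreading efficiency?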

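Context

While working on a very efficient implementation of a Single Producer Single Consumer queue, I ran into illogical behavior from the GCC compiler when benchmarking my code.

Full story

I hope someone has the knowledge needed to explain what is going on.

I am currently using GCC 10.2.0 and its C++20 implementation on Arch Linux. My laptop is a Lenovo T470S with an i7-7500U processor.

Let me begin with the data structure: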

class SPSCQueue
{
public:
    ...

private:
    alignas(64) std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    char _pad0[64 - sizeof(Buffer) - sizeof(std::size_t)]; // 64 bytes alignment padding

    alignas(64) std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
    std::size_t _tailCache { 0 }; // Tail cache for the consumer
    char _pad1[64 - sizeof(Buffer) - sizeof(std::size_t)]; // 64 bytes alignment padding
};

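The data structure above gives a fast and stable 20 ns per push/pop on my system.

However, changing the alignment to use the following members makes the benchmark unstable, giving between 20 and 30 ns.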

    alignas(64) std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    struct alignas(64) {
        Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
        std::size_t _headCache { 0 }; // Head cache for the producer
    };

    alignas(64) std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    struct alignas(64) {
        Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
        std::size_t _tailCache { 0 }; // Tail cache for the consumer
    };
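
To make the difference between the first two layouts concrete, here is a minimal sketch that reports which 64-byte line each member lands on. It assumes a typical 64-bit ABI (8-byte pointers and std::size_t). LayoutA, LayoutB, ProducerCache, lineIndex and checkLayouts are hypothetical names used only for illustration, Buffer is a simplified stand-in, and the anonymous structs are replaced by a named member to stay within standard C++.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Simplified stand-in for the queue's Buffer (pointer + capacity).
struct Buffer {
    void *data { nullptr };
    std::size_t capacity { 0 };
};

// Producer half of the first layout: only the atomics carry alignas(64).
struct LayoutA {
    alignas(64) std::atomic<std::size_t> tail { 0 };
    Buffer buffer {};
    std::size_t headCache { 0 };
    char pad0[64 - sizeof(Buffer) - sizeof(std::size_t)];
    alignas(64) std::atomic<std::size_t> head { 0 };
};

// Producer half of the second layout: the cache group has its own alignas(64).
struct LayoutB {
    alignas(64) std::atomic<std::size_t> tail { 0 };
    struct alignas(64) ProducerCache {
        Buffer buffer {};
        std::size_t headCache { 0 };
    } producer;
    alignas(64) std::atomic<std::size_t> head { 0 };
};

// Index of the 64-byte line a member occupies, counted from the object start.
template<typename Object, typename Member>
std::size_t lineIndex(const Object &object, const Member &member) {
    const auto *base = reinterpret_cast<const char *>(&object);
    const auto *address = reinterpret_cast<const char *>(&member);
    return static_cast<std::size_t>(address - base) / 64;
}

inline void checkLayouts() {
    LayoutA a;
    LayoutB b;
    // First layout: the buffer directly follows tail, so they share line 0.
    assert(lineIndex(a, a.buffer) == lineIndex(a, a.tail));
    // Second layout: the producer cache is pushed onto its own line.
    assert(lineIndex(b, b.producer) == 1);
    // In both layouts head starts on the third line.
    assert(lineIndex(a, a.head) == 2 && lineIndex(b, b.head) == 2);
}
```

Under these assumptions, the padding array in the first layout does not separate the producer cache from _tail. It only fills the space up to the next alignas(64) member.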

Finally, I got even more lost when I tried the following configuration, which gave me results between 40 and 55 ns.

    std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    char _pad0[64 - sizeof(std::atomic<size_t>)];
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    char _pad1[64 - sizeof(Buffer) - sizeof(std::size_t)];

    std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    char _pad2[64 - sizeof(std::atomic<size_t>)];
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
    std::size_t _tailCache { 0 }; // Tail cache for the consumer
    char _pad3[64 - sizeof(Buffer) - sizeof(std::size_t)];

This time, the queue's push/pop oscillates between 40 and 55 ns.
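
One difference between this last configuration and the alignas versions is that padding arrays only fix the distance between members, not the address at which the object starts. The sketch below illustrates that under the assumption of a typical 64-bit ABI. PaddedOnly, WithAlignas, startsOnCacheLine and checkAlignedInstance are hypothetical names, and Buffer is a simplified stand-in for the queue's own type.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Simplified stand-in for the queue's Buffer (pointer + capacity).
struct Buffer {
    void *data { nullptr };
    std::size_t capacity { 0 };
};

// Padding only: members are 64 bytes apart, but the type itself keeps the
// natural alignment of its members (8 bytes on the assumed ABI).
struct PaddedOnly {
    std::atomic<std::size_t> tail { 0 };
    char pad0[64 - sizeof(std::atomic<std::size_t>)];
    Buffer buffer {};
    std::size_t headCache { 0 };
    char pad1[64 - sizeof(Buffer) - sizeof(std::size_t)];
};

// alignas: the same spacing, and the type itself becomes 64-byte aligned.
struct WithAlignas {
    alignas(64) std::atomic<std::size_t> tail { 0 };
    alignas(64) Buffer buffer {};
    std::size_t headCache { 0 };
};

static_assert(sizeof(PaddedOnly) == sizeof(WithAlignas), "same footprint");
static_assert(alignof(WithAlignas) == 64, "objects start on a line boundary");
static_assert(alignof(PaddedOnly) < 64, "objects may start mid-line");

// True when the object begins exactly on a 64-byte boundary.
template<typename T>
bool startsOnCacheLine(const T &object) {
    return reinterpret_cast<std::uintptr_t>(&object) % 64 == 0;
}

inline void checkAlignedInstance() {
    WithAlignas queue;
    // Guaranteed by alignof(WithAlignas); PaddedOnly has no such guarantee.
    assert(startsOnCacheLine(queue));
}
```

A PaddedOnly instance may happen to start on a line boundary, but nothing requires it. When it does not, every padded group straddles two cache lines.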

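I am very lost at this point because I do not know where to look for answers. Until now, the C++ memory layout had been very intuitive to me, but I realize I am still missing some important knowledge needed to get better at high-frequency multithreading.

Minimal code sample

If you wish to compile the whole code to test it yourself, here are the few files you need: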

SPSCQueue.hpp:


#pragma once

#include <atomic>
#include <cinttypes>
#include <cstddef>
#include <cstdlib>
#include <new>
#include <stdexcept>
#include <type_traits>
#include <utility>

#define KF_ALIGN_CACHELINE alignas(kF::Core::Utils::CacheLinesize)

namespace kF::Core
{
    template<typename Type>
    class SPSCQueue;

    namespace Utils
    {
        /** @brief Helper used to perfect forward move / copy constructor */
        template<typename Type, bool Forcecopy = false>
        void ForwardConstruct(Type *dest, Type *source) {
            // Construction must be gated on move constructibility, not move assignability
            if constexpr (!Forcecopy && std::is_move_constructible_v<Type>)
                new (dest) Type(std::move(*source));
            else
                new (dest) Type(*source);
        }

        /** @brief Helper used to perfect forward move / copy assignment */
        template<typename Type, bool Forcecopy = false>
        void ForwardAssign(Type *dest, Type *source) {
            if constexpr (!Forcecopy && std::is_move_assignable_v<Type>)
                *dest = std::move(*source);
            else
                *dest = *source;
        }

        /** @brief Theoretical cacheline size */
        constexpr std::size_t CacheLinesize = 64ul;
    }
}

/**
 * @brief The SPSC queue is a lock-free queue that only supports a Single Producer and a Single Consumer
 * The queue is really fast compared to other more flexible implementations because the fact that only two thread can simultaneously read / write
 * means that less synchronization is needed for each operation.
 * The queue supports ranged push / pop to insert multiple elements without performance impact
 *
 * @tparam Type to be inserted
 */
template<typename Type>
class kF::Core::SPSCQueue
{
public:
    /** @brief Buffer structure containing all cells */
    struct Buffer
    {
        Type *data { nullptr };
        std::size_t capacity { 0 };
    };

    /** @brief Local thread cache */
    struct Cache
    {
        Buffer buffer {};
        std::size_t value { 0 };
    };

    /** @brief Default constructor initialize the queue */
    SPSCQueue(const std::size_t capacity);

    /** @brief Destruct and release all memory (unsafe) */
    ~SPSCQueue(void) { clear(); std::free(_buffer.data); }

    /** @brief Push a single element into the queue
     *  @return true if the element has been inserted */
    template<typename ...Args>
    [[nodiscard]] inline bool push(Args &&...args);

    /** @brief Pop a single element from the queue
     *  @return true if an element has been extracted */
    [[nodiscard]] inline bool pop(Type &value);

    /** @brief Clear all elements of the queue (unsafe) */
    void clear(void);

private:
    KF_ALIGN_CACHELINE std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    struct {
        Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
        std::size_t _headCache { 0 }; // Head cache for the producer
        char _pad0[Utils::CacheLinesize - sizeof(Buffer) - sizeof(std::size_t)];
    };

    KF_ALIGN_CACHELINE std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    struct {
        Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
        std::size_t _tailCache { 0 }; // Tail cache for the consumer
        char _pad1[Utils::CacheLinesize - sizeof(Buffer) - sizeof(std::size_t)];
    };

    /** @brief copy and move constructors disabled */
    SPSCQueue(const SPSCQueue &other) = delete;
    SPSCQueue(SPSCQueue &&other) = delete;
};

static_assert(sizeof(kF::Core::SPSCQueue<int>) == 4 * kF::Core::Utils::CacheLinesize);

template<typename Type>
kF::Core::SPSCQueue<Type>::SPSCQueue(const std::size_t capacity)
{
    _buffer.capacity = capacity;
    if (_buffer.data = reinterpret_cast<Type *>(std::malloc(sizeof(Type) * capacity)); !_buffer.data)
        throw std::runtime_error("Core::SPSCQueue: Malloc Failed");
    _buffer2 = _buffer;
}

template<typename Type>
template<typename ...Args>
bool kF::Core::SPSCQueue<Type>::push(Args &&...args)
{
    static_assert(std::is_constructible<Type, Args...>::value, "Type must be constructible from Args...");

    const auto tail = _tail.load(std::memory_order_relaxed);
    auto next = tail + 1;

    if (next == _buffer.capacity) [[unlikely]]
        next = 0;
    if (auto head = _headCache; next == head) [[unlikely]] {
        head = _headCache = _head.load(std::memory_order_acquire);
        if (next == head) [[unlikely]]
            return false;
    }
    new (_buffer.data + tail) Type{ std::forward<Args>(args)... };
    _tail.store(next,std::memory_order_release);
    return true;
}

template<typename Type>
bool kF::Core::SPSCQueue<Type>::pop(Type &value)
{
    const auto head = _head.load(std::memory_order_relaxed);

    if (auto tail = _tailCache; head == tail) [[unlikely]] {
        tail = _tailCache = _tail.load(std::memory_order_acquire);
        if (head == tail) [[unlikely]]
            return false;
    }
    auto *elem = reinterpret_cast<Type *>(_buffer2.data + head);
    auto next = head + 1;
    if (next == _buffer2.capacity) [[unlikely]]
        next = 0;
    value = std::move(*elem);
    elem->~Type();
    _head.store(next,std::memory_order_release);
    return true;
}

template<typename Type>
void kF::Core::SPSCQueue<Type>::clear(void)
{
    for (Type type; pop(type););
}

Benchmark using Google Benchmark, bench_SPSCQueue.cpp:

#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>

#include <benchmark/benchmark.h>

#include "SPSCQueue.hpp"

using namespace kF;

using Queue = Core::SPSCQueue<std::size_t>;

constexpr std::size_t Capacity = 4096;

static void SPSCQueue_NoisyPush(benchmark::State &state)
{
    Queue queue(Capacity);
    std::atomic<bool> running = true;
    std::size_t i = 0ul;
    std::thread thd([&queue, &running] { for (std::size_t tmp; running; benchmark::DoNotOptimize(queue.pop(tmp))); });
    for (auto _ : state) {
        decltype(std::chrono::high_resolution_clock::now()) start;
        do {
            start = std::chrono::high_resolution_clock::now();
        } while (!queue.push(42ul));
        auto end = std::chrono::high_resolution_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
        auto iterationTime = elapsed.count();
        state.SetIterationTime(iterationTime);
    }
    running = false;
    if (thd.joinable())
        thd.join();
}
BENCHMARK(SPSCQueue_NoisyPush)->UseManualTime();

static void SPSCQueue_NoisyPop(benchmark::State &state)
{
    Queue queue(Capacity);
    std::atomic<bool> running = true;
    std::size_t i = 0ul;
    std::thread thd([&queue, &running] { while (running) benchmark::DoNotOptimize(queue.push(42ul)); });
    for (auto _ : state) {
        std::size_t tmp;
        decltype(std::chrono::high_resolution_clock::now()) start;
        do {
            start = std::chrono::high_resolution_clock::now();
        } while (!queue.pop(tmp));
        auto end = std::chrono::high_resolution_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
        auto iterationTime = elapsed.count();
        state.SetIterationTime(iterationTime);
    }
    running = false;
    if (thd.joinable())
        thd.join();
}
BENCHMARK(SPSCQueue_NoisyPop)->UseManualTime();

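Solution

Thanks to your useful comments (and mostly thanks to Peter Cordes), it seems the issue came from the L2 data prefetcher.

Because of my SPSC queue design, each thread must access two consecutive cache lines to push/pop the queue. If the structure itself is not aligned to 128 bytes, its address is not guaranteed to fall on a 128-byte boundary, so those two lines may not form the aligned pair of cache lines that the prefetcher works on, and the access cannot be optimized.

So the simple fix is: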

template<typename Type>
class alignas(128) SPSCQueue { ... };
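
To illustrate what the 128-byte alignment buys, here is a minimal sketch that checks which 128-byte-aligned block each group of members falls into. It assumes 64-byte cache lines that the L2 prefetcher completes in 128-byte-aligned pairs. QueueLayout, linePairIndex and checkPairs are hypothetical names, and the members are a simplified stand-in for the real queue, not its actual implementation.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumption: 64-byte lines, prefetched in 128-byte-aligned pairs.
constexpr std::size_t CacheLineSize = 64;
constexpr std::size_t LinePairSize = 2 * CacheLineSize;

// Simplified stand-in for the queue's data members.
struct alignas(LinePairSize) QueueLayout {
    alignas(CacheLineSize) std::atomic<std::size_t> tail { 0 }; // written by the producer
    alignas(CacheLineSize) std::size_t headCache { 0 };         // producer-local
    alignas(CacheLineSize) std::atomic<std::size_t> head { 0 }; // written by the consumer
    alignas(CacheLineSize) std::size_t tailCache { 0 };         // consumer-local
};

static_assert(alignof(QueueLayout) == LinePairSize, "starts on a pair boundary");
static_assert(sizeof(QueueLayout) == 2 * LinePairSize, "exactly two line pairs");

// Index of the 128-byte-aligned block an address falls into.
inline std::uintptr_t linePairIndex(const void *address) {
    return reinterpret_cast<std::uintptr_t>(address) / LinePairSize;
}

inline void checkPairs() {
    QueueLayout queue;
    // The producer's two lines form one aligned pair...
    assert(linePairIndex(&queue.tail) == linePairIndex(&queue.headCache));
    // ...the consumer's two lines form the next one...
    assert(linePairIndex(&queue.head) == linePairIndex(&queue.tailCache));
    // ...and the two pairs never overlap.
    assert(linePairIndex(&queue.tail) != linePairIndex(&queue.head));
}
```

With only 64-byte alignment, the object could start in the second half of a 128-byte block, in which case the producer's cache line would be paired with the line holding head. Note that heap instances only get this alignment automatically when allocated with new under C++17 or later; memory obtained from std::malloc carries no such guarantee.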

Intel has published an interesting paper explaining the optimizations of their architecture and how prefetching is done at the various cache levels.