问题描述
为了防止服务器压力过大,我使用滑动窗口算法实现了请求频率限制器,可以根据参数判断当前请求是否允许通过。为了实现算法的线程安全,我使用原子类型来控制窗口的滑动步数,并使用unique_lock来实现当前窗口请求总数的正确总和。 但是我不确定我的实现是否是线程安全的,如果是安全的,是否会影响服务性能。有没有更好的方法来实现它?
class SlideWindowLimiter
{
public:
bool TryAcquire();
void SlideWindow(int64_t window_number);
private:
int32_t limit_; // maximum number of window requests
int32_t split_num_; // subwindow number
int32_t window_size_; // the big window
int32_t sub_window_size_; // size of subwindow = window_size_ / split_number
int16_t index_{0}; //the index of window vector
std::mutex mtx_;
std::vector<int32_t> sub_windows_; // window vector
std::atomic<int64_t> start_time_{0}; //start time of limiter
}
bool SlideWindowLimiter::TryAcquire() {
int64_t cur_time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::Now().time_since_epoch()).count();
auto time_stamp = start_time_.load();
int64_t window_num = std::max(cur_time - window_size_ - start_time_,int64_t(0)) / sub_window_size_;
std::unique_lock<std::mutex> guard(mtx_,std::defer_lock);
if (window_num > 0 && start_time_.compare_exchange_strong(time_stamp,start_time_.load() + window_num*sub_window_size_)) {
guard.lock();
SlideWindow(window_num);
guard.unlock();
}
monitor_->TotalRequestQps();
{
guard.lock();
int32_t total_req = 0;
std::cout<<" "<<std::endl;
for(auto &p : sub_windows_) {
std::cout<<p<<" "<<std::this_thread::get_id()<<std::endl;
total_req += p;
}
if(total_req >= limit_) {
monitor_->RejectedRequestQps();
return false;
} else {
monitor_->PassedRequestQps();
sub_windows_[index_] += 1;
return true;
}
guard.unlock();
}
}
void SlideWindowLimiter::SlideWindow(int64_t window_num) {
int64_t slide_num = std::min(window_num,int64_t(split_num_));
for(int i = 0; i < slide_num; i++){
index_ += 1;
index_ = index_ % split_num_;
sub_windows_[index_] = 0;
}
}
解决方法
首先,线程安全是一个相对属性。两个操作序列相互之间是线程安全的。单个代码本身不能是线程安全的。
我会回答“我处理线程的方式是否可以用其他合理的代码来保证合理的线程安全”。
答案是“不”。
我发现了一个具体问题;您对 atomic 和 compare_exchange_strong
的使用不在循环中,并且您可以在多个位置以原子方式访问 start_time_
而没有适当的照顾。如果 start_time_
在您从中读取和写入的 3 个位置发生变化,则返回 false,并且无法调用 SlideWindow
,然后...继续进行。
我想不出为什么这会是对争用的合理回应,所以这是“不,此代码不是为了在使用它的多个线程下合理运行而编写的”。
您的代码中有很多难闻的气味。您将并发代码与一大堆状态混合在一起,这意味着不清楚哪些互斥锁在保护哪些数据。
您的代码中有一个从未定义的指针。也许它应该是一个全局变量?
您在一行中使用多个 cout
写信给 <<
。在多线程环境中这是一个糟糕的计划;即使您的 cout
是并发强化的,您也会得到混乱的输出。构建一个缓冲区字符串并执行一个 <<
。
您通过后门在函数之间传递数据。 index_
例如。一个函数设置一个成员变量,另一个函数读取它。它有没有可能被另一个线程编辑?难以审计,但似乎很有可能;您将其设置为 .lock()
,然后是 .unlock()
,然后在稍后的 lock()
中阅读它,就好像它处于合理状态一样。更重要的是,您可以使用它来访问向量;如果向量或索引以计划外的方式更改,则可能会崩溃或导致内存损坏。
...
如果这段代码在生产环境中没有一堆竞争条件、崩溃等问题,我会感到震惊。我看不出有任何迹象表明该代码是并发安全的,或将其简化到易于草拟此类证明的程度。
在实际实践中,任何您尚未证明并发安全的代码在并发使用时都是不安全的。所以复杂的并发代码几乎可以保证并发使用是不安全的。
...
从一个非常非常简单的模型开始。如果您有一个互斥锁和一些数据,请将互斥锁和数据整合到一个结构体中,这样您就可以确切地知道该互斥锁在保护什么。
如果您正在使用原子,请不要在与其他变量混合的其他代码中间使用它。把它放在它自己的类中。给这个类起一个名字,代表一些具体的语义,最好是你在别处找到的。描述它应该做什么,以及之前和之后的方法保证什么。然后使用它。
在其他地方,避免任何类型的全局状态。这包括用于传递状态的类成员变量。将您的数据从一个函数显式传递到另一个函数。避免指向任何可变内容的指针。
如果您的数据都是自动存储中的所有值类型和指向不可变(在线程的生命周期中永远不会改变)数据的指针,则该数据不能直接参与竞争条件。
将剩余的数据捆绑起来,并在尽可能小的地方设置防火墙,您可以查看您如何与之交互并确定您是否搞砸了。
...
多线程编程很难,尤其是在数据可变的环境中。如果你不努力证明你的代码是正确的,你就会产生不正确的代码,而你不会知道。
嗯,根据我的经验,我知道;所有没有明显尝试以容易证明其正确的方式运行的代码都是不正确的。如果代码很旧并且有成堆的补丁超过十年以上,那么错误可能不太可能并且更难找到,但它可能仍然不正确。如果是新代码,可能更容易发现错误。