首先是一个小的上下文:我正在学习C++11中的线程,为此,我尝试构建一个小的actor类,基本上(我忽略了异常处理和传播的内容),如下所示:
类参与者{
私人:标准::原子<;布尔>;停止;
专用:std::条件变量中断;
私有:std::thread actor\u thread;
私有:消息队列传入消息;
公众:演员()
:停止(错误),
actor_线程([&;]{run_actor();})
{}
public:virtual~actor(){
//如果参与者被销毁,我们必须确保线程也会死亡
停止=真;
//为此,我们必须中断actor线程,这很可能是
//正在等待传入的\u msgs队列:
中断。通知所有人();
actor_thread.join();
}
private:virtual void run_actor(){
试一试{
当(!停止)
//等待新消息并处理它
//但如果发出中断信号,则中断等待过程:
进程(传入消息、等待和弹出(中断));
}
捕获(中断的\u异常){
// ...
}
};
私有:虚拟无效进程(const message&;)=0;
// ...
};
每个参与者都在自己的actor\u线程中运行,在incoming\u msgs上等待新的传入消息,并在消息到达时对其进行处理
actor\u线程与actor一起创建,并且必须与之一起消亡,这就是为什么我需要在消息队列::wait\u和\u pop(std::condition\u variable interrupt)中使用某种中断机制的原因
本质上,我要求等待和\u pop块,直到
a) 新的消息到达或
b) 直到触发中断,在这种情况下——理想情况下——将抛出中断的异常
message\u队列中新消息的到达目前也由std::condition\u变量new\u msg\u notification建模:
/。。。
//类内消息队列:
消息等待和弹出(标准::条件变量和中断){
std::unique_lock<;std::mutex>;lock(互斥锁);
//当中断触发时,如何中断以下内容??
新建消息通知。等待(锁定,[&;]{
return!queue.empty();
});
自动消息(std::move(queue.front());
queue.pop();
返回味精;
}
简而言之,问题是这样的:new\u msg\u通知中如何中断等待新消息的过程。当触发中断时(不引入超时),等待(…)
或者,问题可以理解为:如何等待两个std::condition_变量s中的任何一个发出信号
一种简单的方法似乎是根本不使用std::condition_变量,而是使用原子标志std::atomic<;布尔>;中断,然后忙着等待新消息通知,超时时间很短,直到新消息到达或true==中断。不过,我很想避免忙碌的等待
编辑:
从皮尔克罗的评论和回答来看,基本上有两种可能的方法
- 按照Alan、mukunda和pilcrow的建议,将一条特殊的“终止”消息排队。我决定不使用这个选项,因为我不知道在我希望参与者终止时队列的大小。很可能(当我想快速终止某个消息时,大多数情况下都是这样)队列中还有数千条消息要处理,等待它们被处理,直到最终轮到终止消息为止,这似乎是不可接受的
- 实现条件变量的自定义版本,通过将第一个线程正在等待的通知转发给条件变量,可能会被另一个线程中断。我选择了这种方法
对于那些感兴趣的人,我的实现如下。在我的例子中,条件变量实际上是一个信号量(因为我更喜欢它们,也因为我喜欢这样做的练习)。我为这个信号量配备了一个相关的中断,可以通过semaphore::get_interrupt()从信号量获取该中断。如果现在一个线程阻塞了semaphore::wait(),那么另一个线程就有可能在信号量中断时调用semaphore::interrupt::trigger(),导致第一个线程取消阻塞并传播interrupt\u异常
结构
中断_异常{};
班
信号量{
公共:课堂中断;
私有:可变std::mutex mutex;
//由于施工顺序,必须在我们的互斥后声明!
私人:中断*通知者;
专用:标准::原子<;长>;计数器;
私有:std::条件变量cond;
公众:
信号量();
公众:
~semaphore()throw();
公众:无效
等待();
公众:中断及;
get_interrupt()const{return*通知_by;}
公众:无效
post(){
std::lock_guard<;std::mutex>;lock(mutex);
计数器++;
cond.notify_one();//从不抛出
}
公共:未签名长
加载()常数{
返回计数器load();
}
};
班
信号量::中断{
私有:信号量*转发到;
私有:标准::原子<;布尔>;触发;
公众:
中断(信号量*转发至):触发(错误),转发至(转发至){
断言(转发到);
std::lock\u guard<;std::mutex>;lock(转发到->;mutex);
转发帖子到->;通知者=此;
}
公众:无效
触发器(){
断言(转发到);
std::lock\u guard<;std::mutex>;(转发到->;mutex);
触发=真;
转发_posts_to->;cond.notify_one();//从不抛出
}
公众:布尔
是否触发了_()常量抛出(){
返回触发的。load();
}
公众:无效
重置()抛出(){
返回触发。存储(假);
}
};
信号量::信号量():计数器(0L),由(新中断(此)){}通知_
//必须在此处声明,否则信号量::中断是不完整的类型
信号量::~semaphore()throw(){
删除通知人;
}
无效的
信号量::wait(){
std::unique_lock<;std::mutex>;lock(互斥锁);
如果(0L==计数器){
条件等待(锁定,[&;]{
如果(通知者->;已触发())
抛出中断_异常();
返回计数器>;0;
});
}
计数器--;
}
使用这个信号量,我的消息队列实现现在看起来是这样的(使用信号量而不是std::condition_变量我可以去掉std::mutex:
类
消息队列{
专用:std::queue<;message>;queue;
private:信号量new_msg_通知;
公众:无效
推送(消息和消息){
push(std::move(msg));
new_msg_notification.post();
}
公共:const消息
等等{
新建消息通知。等待();
自动消息(std::move(queue.front());
queue.pop();
返回味精;
}
公共:信号量::中断&;
get_interrupt()const{returnnew_msg_notification.get_interrupt();}
};
Myactor现在可以中断其线程,并且线程中的延迟非常低。目前的实现如下所示:
类
演员{
专用:消息队列
输入的MSG;
///由于施工订单的原因,必须在进货后申报!
私有:信号量::中断&;
打断
私有:std::thread
我的线;
私有:std::异常\u ptr
例外
公众:
演员()
:中断(传入的\u msgs.get\u中断()),我的\u线程(
[&;]{
试一试{
run_actor();
}
捕获(…){
异常=标准::当前_异常();
}
})
{}
私有:虚拟无效
run_actor(){
而(!interrupt.is_triggered())
进程(传入的_msgs.wait_和_pop());
};
私有:虚拟无效
进程(const message&;)=0;
公众:无效
通知(信息和消息输入){
传入消息推送(std::forward<;message>;(消息输入));
}
公众:虚拟
~actor()抛出(中断\异常){
interrupt.trigger();
我的_线程。join();
如果(例外)
std::重新计时异常(异常);
}
};
你问
在C++11中等待多个条件变量的最佳方式是什么?
您不能,而且必须重新设计。一个线程一次只能等待一个条件变量(及其关联的互斥体)。在这方面,Windows的同步功能比“POSIX风格”的同步原语系列更丰富
线程安全队列的典型方法是将一条特殊的“all done!”消息排入队列,或设计一个“breakable”(或“shutdown-able”)队列。在后一种情况下,队列的内部条件变量会保护一个复杂的谓词:队列中的某个项可用或已断开
你在评论中注意到
如果没有人等待,则notify_all()将无效
这是正确的,但可能不相关。wait()ing条件变量还意味着检查谓词,并在实际阻止通知之前检查它。因此,忙于处理“未命中”一个notify_all()队列项的工作线程将看到,下一次它将