我有一个问题,也许您可以帮助我。
特定线程中有一个amqp客户端,该客户端与Rabbitmq服务器进行通信。
在客户端开始使用之前,我需要锁定(并检查它是否已经锁定)某些资源。
LockResource是各种功能的必要先决条件。
在第一个示例中,我断开了连接的最后一个lamba,但有一个奇怪的行为,有时调用do_stuff(),有时不调用...
在第二个示例中,所有权利都将被杀死,这将杀死多线程。
在第三种方法中,为避免内存泄漏,我需要更改AmqpClient以在出现错误时始终用``magic number''发出resourceLocked并在调用do_stuff()之前对其进行检查...这根本不好...
也许是我做错了或者我误解了。
如果您有更好的方法,我接受。
编辑16/09/2018
我用不恰当的解释误导了你:
do_stuff()不是唯一的方法,但是有各种说明,否则我将直接连接它。
最好写[…]而不是do_stuff()。
此外,只有一个唯一的客户端实例。
我不知道用户将首先执行什么。
他可以将资源锁定为readResourceContent,deleteResource,writeResourceProperty等,因此所有这些都必须执行差异指令。
好消息是,由于您的回答,我有了一个使用bool QMetaObject::invokeMethod(QObject *context, Functor function, Qt::ConnectionType type = Qt::AutoConnection, FunctorReturnType *ret = nullptr)
的有效解决方案
一般声明
using Callback = std::function<void()>;
Q_DECLARE_METATYPE(Callback)
qRegisterMetaType<Callback>("Callback");
AmqpClient.cpp
void AmqpClient::lockResource(Identifier identifier, QObject *context, const Callback &func)
{
if(lockedResources_.contains(identifier))
{
QMetaObject::invokeMethod(context,
func,
Qt::QueuedConnection);
return;
}
QString queueName(QString::number(identifier) + ".lock");
QAmqpQueue *lockQueue = client_->createQueue(queueName);
connect(lockQueue, qOverload<QAMQP::Error>(&QAmqpQueue::error), this, [this](QAMQP::Error error) {
if(error == QAMQP::ResourceLockedError) {
emit errorMessage("The expected resource is already locked by another user.");
sender()->deleteLater();
}
});
connect(lockQueue, &QAmqpQueue::declared, this, [=]() {
QAmqpQueue *lockQueue = qobject_cast<QAmqpQueue*>(sender());
lockQueue->consume(QAmqpQueue::coExclusive);
lockedResources_[identifier] = lockQueue;
QMetaObject::invokeMethod(context,
func,
Qt::QueuedConnection);
});
lockQueue->declare(QAmqpQueue::Exclusive | QAmqpQueue::AutoDelete);
}
Controller.cpp
void Controller::readResourceContent(int row)
{
[...]
QMetaObject::invokeMethod(amqp_,
"lockResource",
Qt::AutoConnection,
Q_ARG(Identifier, identifier),
Q_ARG(QObject*, this),
Q_ARG(Callback, [&](){ [...] }));
[...]
}
1个
// called not inside connect(...) because it may not to emit the signal
// (if resource is already locked)
disconnect(amqp_, &AmqpClient::resourceLocked, 0, 0);
connect(amqp_, &AmqpClient::resourceLocked, this, [&](){
do_stuff();
});
emit lockResource(identifier, QPrivateSignal());
2
// This is working like a charm, but I'm losing ui reactivity
QEventLoop loop;
connect(amqp_, &AmqpClient::resourceLocked, &loop, &QEventLoop::quit);
emit lockResource(identifier, QPrivateSignal());
loop.exec();
do_stuff();
3
// Using an intermediate object
class CallbackObject : public QObject
{
Q_OBJECT
std::function<void()> callback;
public:
CallbackObject(std::function<void()> callback) : QObject(), callback(callback) {}
public slots:
void execute() { callback(); deleteLater(); }
};
// Working but memory leak if signal is not emitted
// resource already locked for example
CallbackObject *helper = new CallbackObject([&](){
do_stuff() ;
});
connect(amqp_, &AmqpClient::resourceLocked, helper, &CallbackObject::execute);
emit lockResource(identifier, QPrivateSignal());
最佳答案
如果我对您的理解正确,则您正在尝试设置以下事件序列:
Client AMQPClient
| lockResource(id) |
|------------------------------>|
| |
| |--\
| | |
| resourceLocked | | try to acquire resource
|<------------------------------| |
/--| |<-/
do_stuff | | |
\->| |
| |
当
resourceLocked
无法获取资源时,不会发送消息AMQP
的消息,因此在这种情况下也不会调用do_stuff
。当您有多个客户端同时等待各自的资源被锁定时,使用实现1时未调用
do_stuff
的问题是竞争状况。以下是说明您描述的问题的序列(以及该方法以及实现3的其他问题):AmqpClient::resourceLocked
的连接,然后将其自身连接到该信号。 AmpqClient::resourceLocked
的连接,特别是客户端A刚刚建立的连接。然后将自身连接到该信号。 AmpqClient
处理客户端A的lockResource
请求。它获取了请求的资源并发出resourceLocked
信号。 do_stuff
,即使已获取的是客户端A请求的资源。 AmpqClient
处理客户端B的lockResource
请求。再次,它成功获取资源并发出resourceLocked
信号。 do_stuff
。 上面的序列显示,考虑到客户端B即使尚未获取其资源也可以工作,客户端A的
do_stuff
未被调用还算不错。要解决此问题,您必须确保仅调用刚处理了
do_stuff
请求的客户端的lockResource
。信号(根据设计始终会通知所有观察者)只是一种次优的方法,因为观察者随后需要检查信号是否旨在通知他们或其他人。第一个解决方法是修改
lockResource
信号,以同时发送发出该信号的客户端的this
指针。这样AmqpClient
可以使用该指针在客户端获取资源时回调该客户端。连接到信号的插槽的一种实现可能如下所示:
void AmqpClient::handleLockResourceRequest(int identifier,
QObject* requestingClient)
{
// try to acquire resource described by `identifier`
if (resource_acquired_successfully)
{
QMetaObject::invokeMethod(requestingClient, "do_stuff", Qt::AutoConnection);
}
}
请注意,为了使
invokeMethod
工作,do_stuff
必须是一个插槽或需要标记为Q_INVOKABLE
。但是我要走得更远,而不仅仅是通过
do_stuff
调用invokeMethod
:据我所知,AmqpClient
是连接到lockResource
的唯一侦听器,并且是同一线程中的两个对象,您可能会直接使用直接调用amqp_->tryToLockResource(identifier)
代替emit lockResource(identifier)
。即您使用信号的唯一原因是使调用能够通过事件循环。除了使用信号之外,您还可以使用
invokeMethod
通过事件循环请求获取资源。这样做的好处是,您的客户类别不再向该类别的用户公开其需要获取资源以完成其工作的事实。总而言之,生成的代码如下所示:
class ClientType : public QObject {
Q_OBJECT
public:
// ...
Q_INVOKABLE void do_stuff(); // definition as before
private:
void requestResource(); // was previously code block 1 in your question
private:
AmqpClient* amqp_;
// ...
};
inline void ClientType::requestResource()
{
auto identifier = ...; // create resource identifier
QMetaObject::invokeMethod(amqp_,
"requestResource",
Qt::AutoConnection,
Q_ARG(int, identifier),
Q_ARG(QObject*, this));
}
class AmqpClient : public QObject {
Q_OBJECT
public:
// ...
Q_INVOKABLE requestResource(int identifier, QObject* requestingClient);
};
inline void AmqpClient::requestResource(int identifier,
QObject* requestingClient)
{
// try to acquire resource described by `identifier`
if (resource_acquired_successfully)
{
QMetaObject::invokeMethod(requestingClient, "do_stuff", Qt::AutoConnection);
}
}
在上面的实现中,我假设资源标识符的类型为
int
,但是您当然可以使用在Qt元类型系统中注册的任何其他类型。同样,在将指针类型注册到Qts元类型系统之后,也可以使用QObject
(或抽象基类,以避免引入循环依赖性)来代替传递ClientType*
指针。
关于c++ - Qt的信号/插槽临时连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52338350/