Handle updates from MTP after RPC responses.

If some RPC responses and MTP updates are received together a fake
requestId in the negative range was used and that way updates were
processed before responses.

That could lead to an incorrect "out" message flag when sending
messages to supergroups, because a broadcast update about the new
message without "out" flag was handled before the request response.

Now a separate response map and updates list are used and responses
are handled always before the updates.
This commit is contained in:
John Preston 2017-04-30 17:23:57 +03:00
parent 413eafb240
commit 6418c9c718
4 changed files with 115 additions and 118 deletions

View File

@ -1479,9 +1479,9 @@ void ConnectionPrivate::handleReceived() {
bool emitSignal = false;
{
QReadLocker locker(sessionData->haveReceivedMutex());
emitSignal = !sessionData->haveReceivedMap().isEmpty();
emitSignal = !sessionData->haveReceivedResponses().isEmpty() || !sessionData->haveReceivedUpdates().isEmpty();
if (emitSignal) {
DEBUG_LOG(("MTP Info: emitting needToReceive() - need to parse in another thread, haveReceivedMap.size() = %1").arg(sessionData->haveReceivedMap().size()));
DEBUG_LOG(("MTP Info: emitting needToReceive() - need to parse in another thread, %1 responses, %2 updates.").arg(sessionData->haveReceivedResponses().size()).arg(sessionData->haveReceivedUpdates().size()));
}
}
@ -1908,7 +1908,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
case mtpc_rpc_result: {
if (from + 3 > end) throw mtpErrorInsufficient();
mtpResponse response;
auto response = SerializedMessage();
MTPlong reqMsgId;
reqMsgId.read(++from, end);
@ -1943,10 +1943,11 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
sessionData->owner()->notifyLayerInited(true);
}
mtpRequestId requestId = wasSent(reqMsgId.v);
auto requestId = wasSent(reqMsgId.v);
if (requestId && requestId != mtpRequestId(0xFFFFFFFF)) {
// Save rpc_result for processing in the main thread.
QWriteLocker locker(sessionData->haveReceivedMutex());
sessionData->haveReceivedMap().insert(requestId, response); // save rpc_result for processing in main mtp thread
sessionData->haveReceivedResponses().insert(requestId, response);
} else {
DEBUG_LOG(("RPC Info: requestId not found for msgId %1").arg(reqMsgId.v));
}
@ -1986,10 +1987,9 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
mtpBuffer update(from - start);
if (from > start) memcpy(update.data(), start, (from - start) * sizeof(mtpPrime));
// Notify main process about new session - need to get difference.
QWriteLocker locker(sessionData->haveReceivedMutex());
mtpResponseMap &haveReceived(sessionData->haveReceivedMap());
mtpRequestId fakeRequestId = sessionData->nextFakeRequestId();
haveReceived.insert(fakeRequestId, mtpResponse(update)); // notify main process about new session - need to get difference
sessionData->haveReceivedUpdates().push_back(SerializedMessage(update));
} return HandleResult::Success;
case mtpc_ping: {
@ -2044,10 +2044,9 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
mtpBuffer update(end - from);
if (end > from) memcpy(update.data(), from, (end - from) * sizeof(mtpPrime));
// Notify main process about the new updates.
QWriteLocker locker(sessionData->haveReceivedMutex());
mtpResponseMap &haveReceived(sessionData->haveReceivedMap());
mtpRequestId fakeRequestId = sessionData->nextFakeRequestId();
haveReceived.insert(fakeRequestId, mtpResponse(update)); // notify main process about new updates
sessionData->haveReceivedUpdates().push_back(SerializedMessage(update));
if (cons != mtpc_updatesTooLong && cons != mtpc_updateShortMessage && cons != mtpc_updateShortChatMessage && cons != mtpc_updateShortSentMessage && cons != mtpc_updateShort && cons != mtpc_updatesCombined && cons != mtpc_updates) {
LOG(("Message Error: unknown constructor %1").arg(cons)); // maybe new api?..

View File

@ -119,23 +119,6 @@ inline void mtpRequest::write(mtpBuffer &to) const {
memcpy(to.data() + was, value->constData() + 8, s * sizeof(mtpPrime));
}
class mtpResponse : public mtpBuffer {
public:
mtpResponse() {
}
mtpResponse(const mtpBuffer &v) : mtpBuffer(v) {
}
mtpResponse &operator=(const mtpBuffer &v) {
mtpBuffer::operator=(v);
return (*this);
}
bool needAck() const {
if (size() < 8) return false;
uint32 seqNo = *(uint32*)(constData() + 6);
return (seqNo & 0x01) ? true : false;
}
};
using mtpPreRequestMap = QMap<mtpRequestId, mtpRequest>;
using mtpRequestMap = QMap<mtpMsgId, mtpRequest>;
using mtpMsgIdsSet = QMap<mtpMsgId, bool>;
@ -154,8 +137,6 @@ public:
}
};
using mtpResponseMap = QMap<mtpRequestId, mtpResponse>;
class mtpErrorUnexpected : public Exception {
public:
mtpErrorUnexpected(mtpTypeId typeId, const QString &type) : Exception(QString("MTP Unexpected type id #%1 read in %2").arg(uint32(typeId), 0, 16).arg(type), false) { // maybe api changed?..

View File

@ -29,42 +29,42 @@ void SessionData::clear(Instance *instance) {
RPCCallbackClears clearCallbacks;
{
QReadLocker locker1(haveSentMutex()), locker2(toResendMutex()), locker3(haveReceivedMutex()), locker4(wereAckedMutex());
mtpResponseMap::const_iterator end = haveReceived.cend();
clearCallbacks.reserve(haveSent.size() + wereAcked.size());
for (mtpRequestMap::const_iterator i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) {
mtpRequestId requestId = i.value()->requestId;
if (haveReceived.find(requestId) == end) {
auto receivedResponsesEnd = _receivedResponses.cend();
clearCallbacks.reserve(_haveSent.size() + _wereAcked.size());
for (auto i = _haveSent.cbegin(), e = _haveSent.cend(); i != e; ++i) {
auto requestId = i.value()->requestId;
if (!_receivedResponses.contains(requestId)) {
clearCallbacks.push_back(requestId);
}
}
for (mtpRequestIdsMap::const_iterator i = toResend.cbegin(), e = toResend.cend(); i != e; ++i) {
mtpRequestId requestId = i.value();
if (haveReceived.find(requestId) == end) {
for (auto i = _toResend.cbegin(), e = _toResend.cend(); i != e; ++i) {
auto requestId = i.value();
if (!_receivedResponses.contains(requestId)) {
clearCallbacks.push_back(requestId);
}
}
for (mtpRequestIdsMap::const_iterator i = wereAcked.cbegin(), e = wereAcked.cend(); i != e; ++i) {
mtpRequestId requestId = i.value();
if (haveReceived.find(requestId) == end) {
for (auto i = _wereAcked.cbegin(), e = _wereAcked.cend(); i != e; ++i) {
auto requestId = i.value();
if (!_receivedResponses.contains(requestId)) {
clearCallbacks.push_back(requestId);
}
}
}
{
QWriteLocker locker(haveSentMutex());
haveSent.clear();
_haveSent.clear();
}
{
QWriteLocker locker(toResendMutex());
toResend.clear();
_toResend.clear();
}
{
QWriteLocker locker(wereAckedMutex());
wereAcked.clear();
_wereAcked.clear();
}
{
QWriteLocker locker(receivedIdsMutex());
receivedIds.clear();
_receivedIds.clear();
}
instance->clearCallbacksDelayed(clearCallbacks);
}
@ -494,28 +494,37 @@ void Session::tryToReceive() {
_needToReceive = true;
return;
}
int32 cnt = 0;
while (true) {
mtpRequestId requestId;
mtpResponse response;
auto requestId = mtpRequestId(0);
auto isUpdate = false;
auto message = SerializedMessage();
{
QWriteLocker locker(data.haveReceivedMutex());
mtpResponseMap &responses(data.haveReceivedMap());
mtpResponseMap::iterator i = responses.begin();
if (i == responses.end()) return;
requestId = i.key();
response = i.value();
responses.erase(i);
auto &responses = data.haveReceivedResponses();
auto response = responses.begin();
if (response == responses.cend()) {
auto &updates = data.haveReceivedUpdates();
auto update = updates.begin();
if (update == updates.cend()) {
return;
} else {
message = std::move(*update);
isUpdate = true;
updates.pop_front();
}
} else {
requestId = response.key();
message = std::move(response.value());
responses.erase(response);
}
}
if (requestId <= 0) {
if (isUpdate) {
if (dcWithShift == bareDcId(dcWithShift)) { // call globalCallback only in main session
_instance->globalCallback(response.constData(), response.constData() + response.size());
_instance->globalCallback(message.constData(), message.constData() + message.size());
}
} else {
_instance->execCallback(requestId, response.constData(), response.constData() + response.size());
_instance->execCallback(requestId, message.constData(), message.constData() + message.size());
}
++cnt;
}
}

View File

@ -85,6 +85,16 @@ private:
};
using SerializedMessage = mtpBuffer;
inline bool ResponseNeedsAck(const SerializedMessage &response) {
if (response.size() < 8) {
return false;
}
auto seqNo = *(uint32*)(response.constData() + 6);
return (seqNo & 0x01) ? true : false;
}
class Session;
class SessionData {
public:
@ -94,31 +104,31 @@ public:
void setSession(uint64 session) {
DEBUG_LOG(("MTP Info: setting server_session: %1").arg(session));
QWriteLocker locker(&lock);
QWriteLocker locker(&_lock);
if (_session != session) {
_session = session;
_messagesSent = 0;
}
}
uint64 getSession() const {
QReadLocker locker(&lock);
QReadLocker locker(&_lock);
return _session;
}
bool layerWasInited() const {
QReadLocker locker(&lock);
QReadLocker locker(&_lock);
return _layerInited;
}
void setLayerWasInited(bool was) {
QWriteLocker locker(&lock);
QWriteLocker locker(&_lock);
_layerInited = was;
}
void setSalt(uint64 salt) {
QWriteLocker locker(&lock);
QWriteLocker locker(&_lock);
_salt = salt;
}
uint64 getSalt() const {
QReadLocker locker(&lock);
QReadLocker locker(&_lock);
return _salt;
}
@ -131,7 +141,7 @@ public:
_authKey = key;
DEBUG_LOG(("MTP Info: new auth key set in SessionData, id %1, setting random server_session %2").arg(key ? key->keyId() : 0).arg(session));
QWriteLocker locker(&lock);
QWriteLocker locker(&_lock);
if (_session != session) {
_session = session;
_messagesSent = 0;
@ -141,88 +151,85 @@ public:
}
bool isCheckedKey() const {
QReadLocker locker(&lock);
QReadLocker locker(&_lock);
return _keyChecked;
}
void setCheckedKey(bool checked) {
QWriteLocker locker(&lock);
QWriteLocker locker(&_lock);
_keyChecked = checked;
}
QReadWriteLock *keyMutex() const;
QReadWriteLock *toSendMutex() const {
return &toSendLock;
return &_toSendLock;
}
QReadWriteLock *haveSentMutex() const {
return &haveSentLock;
return &_haveSentLock;
}
QReadWriteLock *toResendMutex() const {
return &toResendLock;
return &_toResendLock;
}
QReadWriteLock *wereAckedMutex() const {
return &wereAckedLock;
return &_wereAckedLock;
}
QReadWriteLock *receivedIdsMutex() const {
return &receivedIdsLock;
return &_receivedIdsLock;
}
QReadWriteLock *haveReceivedMutex() const {
return &haveReceivedLock;
return &_haveReceivedLock;
}
QReadWriteLock *stateRequestMutex() const {
return &stateRequestLock;
return &_stateRequestLock;
}
mtpPreRequestMap &toSendMap() {
return toSend;
return _toSend;
}
const mtpPreRequestMap &toSendMap() const {
return toSend;
return _toSend;
}
mtpRequestMap &haveSentMap() {
return haveSent;
return _haveSent;
}
const mtpRequestMap &haveSentMap() const {
return haveSent;
return _haveSent;
}
mtpRequestIdsMap &toResendMap() { // msgId -> requestId, on which toSend: requestId -> request for resended requests
return toResend;
return _toResend;
}
const mtpRequestIdsMap &toResendMap() const {
return toResend;
return _toResend;
}
ReceivedMsgIds &receivedIdsSet() {
return receivedIds;
return _receivedIds;
}
const ReceivedMsgIds &receivedIdsSet() const {
return receivedIds;
return _receivedIds;
}
mtpRequestIdsMap &wereAckedMap() {
return wereAcked;
return _wereAcked;
}
const mtpRequestIdsMap &wereAckedMap() const {
return wereAcked;
return _wereAcked;
}
mtpResponseMap &haveReceivedMap() {
return haveReceived;
QMap<mtpRequestId, SerializedMessage> &haveReceivedResponses() {
return _receivedResponses;
}
const mtpResponseMap &haveReceivedMap() const {
return haveReceived;
const QMap<mtpRequestId, SerializedMessage> &haveReceivedResponses() const {
return _receivedResponses;
}
QList<SerializedMessage> &haveReceivedUpdates() {
return _receivedUpdates;
}
const QList<SerializedMessage> &haveReceivedUpdates() const {
return _receivedUpdates;
}
mtpMsgIdsSet &stateRequestMap() {
return stateRequest;
return _stateRequest;
}
const mtpMsgIdsSet &stateRequestMap() const {
return stateRequest;
}
mtpRequestId nextFakeRequestId() { // must be locked by haveReceivedMutex()
if (haveReceived.isEmpty() || haveReceived.cbegin().key() > 0) {
_fakeRequestId = -2000000000;
} else {
++_fakeRequestId;
}
return _fakeRequestId;
return _stateRequest;
}
Session *owner() {
@ -233,8 +240,8 @@ public:
}
uint32 nextRequestSeqNumber(bool needAck = true) {
QWriteLocker locker(&lock);
uint32 result(_messagesSent);
QWriteLocker locker(&_lock);
auto result = _messagesSent;
_messagesSent += (needAck ? 1 : 0);
return result * 2 + (needAck ? 1 : 0);
}
@ -246,7 +253,6 @@ private:
uint64 _salt = 0;
uint32 _messagesSent = 0;
mtpRequestId _fakeRequestId = -2000000000;
Session *_owner = nullptr;
@ -254,23 +260,25 @@ private:
bool _keyChecked = false;
bool _layerInited = false;
mtpPreRequestMap toSend; // map of request_id -> request, that is waiting to be sent
mtpRequestMap haveSent; // map of msg_id -> request, that was sent, msDate = 0 for msgs_state_req (no resend / state req), msDate = 0, seqNo = 0 for containers
mtpRequestIdsMap toResend; // map of msg_id -> request_id, that request_id -> request lies in toSend and is waiting to be resent
ReceivedMsgIds receivedIds; // set of received msg_id's, for checking new msg_ids
mtpRequestIdsMap wereAcked; // map of msg_id -> request_id, this msg_ids already were acked or do not need ack
mtpResponseMap haveReceived; // map of request_id -> response, that should be processed in other thread
mtpMsgIdsSet stateRequest; // set of msg_id's, whose state should be requested
mtpPreRequestMap _toSend; // map of request_id -> request, that is waiting to be sent
mtpRequestMap _haveSent; // map of msg_id -> request, that was sent, msDate = 0 for msgs_state_req (no resend / state req), msDate = 0, seqNo = 0 for containers
mtpRequestIdsMap _toResend; // map of msg_id -> request_id, that request_id -> request lies in toSend and is waiting to be resent
ReceivedMsgIds _receivedIds; // set of received msg_id's, for checking new msg_ids
mtpRequestIdsMap _wereAcked; // map of msg_id -> request_id, this msg_ids already were acked or do not need ack
mtpMsgIdsSet _stateRequest; // set of msg_id's, whose state should be requested
QMap<mtpRequestId, SerializedMessage> _receivedResponses; // map of request_id -> response that should be processed in the main thread
QList<SerializedMessage> _receivedUpdates; // list of updates that should be processed in the main thread
// mutexes
mutable QReadWriteLock lock;
mutable QReadWriteLock toSendLock;
mutable QReadWriteLock haveSentLock;
mutable QReadWriteLock toResendLock;
mutable QReadWriteLock receivedIdsLock;
mutable QReadWriteLock wereAckedLock;
mutable QReadWriteLock haveReceivedLock;
mutable QReadWriteLock stateRequestLock;
mutable QReadWriteLock _lock;
mutable QReadWriteLock _toSendLock;
mutable QReadWriteLock _haveSentLock;
mutable QReadWriteLock _toResendLock;
mutable QReadWriteLock _receivedIdsLock;
mutable QReadWriteLock _wereAckedLock;
mutable QReadWriteLock _haveReceivedLock;
mutable QReadWriteLock _stateRequestLock;
};