Handle sent containers separately.

This commit is contained in:
John Preston 2019-12-02 13:32:09 +03:00
parent 9caac426ef
commit efaa3ba453
4 changed files with 154 additions and 154 deletions

View File

@ -56,8 +56,7 @@ constexpr auto kCheckSentRequestTimeout = 10 * crl::time(1000);
// when resending request or checking its state.
constexpr auto kSendStateRequestWaiting = crl::time(1000);
// Container lives 10 minutes in haveSent map.
constexpr auto kContainerLives = TimeId(600);
constexpr auto kSentContainerLives = 600 * crl::time(1000);
using namespace details;
@ -216,8 +215,8 @@ int16 Connection::getProtocolDcId() const {
}
void Connection::checkSentRequests() {
// Remove very old (10 minutes) containers and resend requests.
auto removingIds = std::vector<mtpMsgId>();
clearOldContainers();
auto restarting = false;
auto requesting = false;
{
@ -229,16 +228,6 @@ void Connection::checkSentRequests() {
for (const auto &[msgId, request] : haveSent) {
if (request.isStateRequest()) {
continue;
} else if (request.isSentContainer()) {
if (now > request->lastSentTime + kContainerLives) {
removingIds.push_back(msgId);
DEBUG_LOG(("MTP Info: Removing old container %1, "
"sent: %2, now: %3, current unixtime: %4"
).arg(msgId
).arg(request->lastSentTime
).arg(now
).arg(base::unixtime::now()));
}
} else if (request->lastSentTime + checkAfter < now) {
// Need to check state.
request->lastSentTime = now;
@ -250,13 +239,6 @@ void Connection::checkSentRequests() {
}
}
}
if (!removingIds.empty()) {
QWriteLocker locker(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
for (const auto msgId : removingIds) {
haveSent.remove(msgId);
}
}
if (restarting) {
DEBUG_LOG(("MTP Info: "
"Request state while key is not bound, restarting."));
@ -266,6 +248,23 @@ void Connection::checkSentRequests() {
}
}
void Connection::clearOldContainers() {
const auto now = crl::now();
for (auto i = _sentContainers.begin(); i != _sentContainers.end();) {
if (now > i->second.sent + kSentContainerLives) {
DEBUG_LOG(("MTP Info: Removing old container %1, "
"sent: %2, now: %3, current unixtime: %4"
).arg(i->first
).arg(i->second.sent
).arg(now
).arg(base::unixtime::now()));
i = _sentContainers.erase(i);
} else {
++i;
}
}
}
void Connection::destroyAllConnections() {
clearUnboundKeyCreator();
_waitForBetterTimer.cancel();
@ -452,34 +451,29 @@ mtpMsgId Connection::replaceMsgId(SerializedRequest &request, mtpMsgId newId) {
haveSent.erase(k);
haveSent.emplace(newId, request);
}
for (const auto &[requestId, sent] : haveSent) {
if (sent.isSentContainer()) {
const auto ids = (mtpMsgId *)(sent->data() + 8);
for (uint32 i = 0, l = (sent->size() - 8) >> 1; i < l; ++i) {
if (ids[i] == oldMsgId) {
ids[i] = newId;
}
for (auto &[msgId, container] : _sentContainers) {
for (auto &innerMsgId : container.messages) {
if (innerMsgId == oldMsgId) {
innerMsgId = newId;
}
}
}
request.setMsgId(newId);
request.setSeqNo(nextRequestSeqNumber(request.needAck()));
return newId;
}
mtpMsgId Connection::placeToContainer(
SentContainer &sentIdsWrap,
SerializedRequest &toSendRequest,
mtpMsgId &bigMsgId,
bool forceNewMsgId,
mtpMsgId *&haveSentArr,
SerializedRequest &req) {
const auto msgId = prepareToSend(req, bigMsgId, forceNewMsgId);
if (msgId >= bigMsgId) {
bigMsgId = base::unixtime::mtproto_msg_id();
}
*(haveSentArr++) = msgId;
sentIdsWrap.messages.push_back(msgId);
uint32 from = toSendRequest->size(), len = req.messageSize();
toSendRequest->resize(from + len);
@ -732,7 +726,7 @@ void Connection::tryToSend() {
}
} else { // send in container
bool willNeedInit = false;
uint32 containerSize = 1 + 1, idsWrapSize = (toSendCount << 1); // cons + vector size, idsWrapSize - size of "request-like" wrap for msgId vector
uint32 containerSize = 1 + 1; // cons + vector size
if (pingRequest) containerSize += pingRequest.messageSize();
if (ackRequest) containerSize += ackRequest.messageSize();
if (resendRequest) containerSize += resendRequest.messageSize();
@ -767,27 +761,26 @@ void Connection::tryToSend() {
QWriteLocker locker2(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
// prepare "request-like" wrap for msgId vector
auto haveSentIdsWrap = SerializedRequest::Prepare(idsWrapSize);
haveSentIdsWrap->isContainerIdsWrap = true;
haveSentIdsWrap->resize(haveSentIdsWrap->size() + idsWrapSize);
auto haveSentArr = (mtpMsgId*)(haveSentIdsWrap->data() + 8);
// prepare sent container
auto sentIdsWrap = SentContainer();
sentIdsWrap.sent = crl::now();
sentIdsWrap.messages.reserve(toSendCount);
if (bindDcKeyRequest) {
_bindMsgId = placeToContainer(
sentIdsWrap,
toSendRequest,
bigMsgId,
false,
haveSentArr,
bindDcKeyRequest);
needAnyResponse = true;
}
if (pingRequest) {
_pingMsgId = placeToContainer(
sentIdsWrap,
toSendRequest,
bigMsgId,
forceNewMsgId,
haveSentArr,
pingRequest);
needAnyResponse = true;
}
@ -799,10 +792,10 @@ void Connection::tryToSend() {
request,
bigMsgId,
forceNewMsgId);
sentIdsWrap.messages.push_back(msgId);
if (msgId >= bigMsgId) {
bigMsgId = base::unixtime::mtproto_msg_id();
}
*(haveSentArr++) = msgId;
bool added = false;
if (request->requestId) {
if (request.needAck()) {
@ -838,22 +831,46 @@ void Connection::tryToSend() {
}
}
if (stateRequest) {
const auto msgId = placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, stateRequest);
const auto msgId = placeToContainer(
sentIdsWrap,
toSendRequest,
bigMsgId,
forceNewMsgId,
stateRequest);
Assert(!haveSent.contains(msgId));
haveSent.emplace(msgId, stateRequest);
}
if (resendRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, resendRequest);
if (ackRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, ackRequest);
if (httpWaitRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, httpWaitRequest);
if (resendRequest) {
placeToContainer(
sentIdsWrap,
toSendRequest,
bigMsgId,
forceNewMsgId,
resendRequest);
}
if (ackRequest) {
placeToContainer(
sentIdsWrap,
toSendRequest,
bigMsgId,
forceNewMsgId,
ackRequest);
}
if (httpWaitRequest) {
placeToContainer(
sentIdsWrap,
toSendRequest,
bigMsgId,
forceNewMsgId,
httpWaitRequest);
}
toSend.clear();
const auto containerMsgId = prepareToSend(
toSendRequest,
bigMsgId,
forceNewMsgId);
*(mtpMsgId*)(haveSentIdsWrap->data() + 4) = containerMsgId;
Assert(!haveSent.contains(containerMsgId));
haveSent.emplace(containerMsgId, haveSentIdsWrap);
toSend.clear();
_sentContainers.emplace(containerMsgId, std::move(sentIdsWrap));
}
}
sendSecureRequest(std::move(toSendRequest), needAnyResponse);
@ -1452,29 +1469,15 @@ Connection::HandleResult Connection::handleOneReceived(
|| (errorCode == 64); // bad container
if (errorCode == 64) { // bad container!
if (Logs::DebugEnabled()) {
SerializedRequest request;
{
QWriteLocker locker(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
const auto i = haveSent.find(resendId);
if (i == haveSent.end()) {
LOG(("Message Error: Container not found!"));
} else {
request = i->second;
}
}
if (request) {
if (request.isSentContainer()) {
QStringList lst;
const auto ids = (const mtpMsgId*)(request->constData() + 8);
for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) {
lst.push_back(QString::number(ids[i]));
}
LOG(("Message Info: bad container received! messages: %1").arg(lst.join(',')));
} else {
LOG(("Message Error: bad container received, but request is not a container!"));
const auto i = _sentContainers.find(resendId);
if (i == _sentContainers.end()) {
LOG(("Message Error: Container not found!"));
} else {
auto idsList = QStringList();
for (const auto innerMsgId : i->second.messages) {
idsList.push_back(QString::number(innerMsgId));
}
LOG(("Message Info: bad container received! messages: %1").arg(idsList.join(',')));
}
}
}
@ -1613,7 +1616,7 @@ Connection::HandleResult Connection::handleOneReceived(
} else {
MTPMsgResendReq request;
if (!request.read(rFrom, rEnd)) {
LOG(("Message Error: could not parse sent msgs_state_req"));
LOG(("Message Error: could not parse sent msgs_resend_req"));
return HandleResult::ParseError;
}
handleMsgsStates(request.c_msg_resend_req().vmsg_ids().v, states, toAck);
@ -1962,64 +1965,58 @@ void Connection::requestsAcked(const QVector<MTPlong> &ids, bool byResponse) {
QWriteLocker locker2(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
for (uint32 i = 0; i < idsCount; ++i) {
mtpMsgId msgId = ids[i].v;
const auto req = haveSent.find(msgId);
if (req != haveSent.end()) {
if (req->second.isSentContainer()) {
DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(ids[i].v));
uint32 inContCount = (req->second->size() - 8) / 2;
const mtpMsgId *inContId = (const mtpMsgId *)(req->second->constData() + 8);
toAckMore.reserve(toAckMore.size() + inContCount);
for (uint32 j = 0; j < inContCount; ++j) {
toAckMore.push_back(MTP_long(*(inContId++)));
}
haveSent.erase(req);
} else {
const auto requestId = req->second->requestId;
bool moveToAcked = byResponse;
if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler)
moveToAcked = !_instance->hasCallbacks(requestId);
}
if (moveToAcked) {
_ackedIds.emplace(msgId, requestId);
haveSent.erase(req);
} else {
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId));
}
}
} else {
DEBUG_LOG(("Message Info: msgId %1 was not found in recent sent, while acking requests, searching in resend...").arg(msgId));
const auto reqIt = _resendingIds.find(msgId);
if (reqIt != _resendingIds.end()) {
const auto reqId = reqIt->second;
bool moveToAcked = byResponse;
if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler)
moveToAcked = !_instance->hasCallbacks(reqId);
}
if (moveToAcked) {
QWriteLocker locker4(_sessionData->toSendMutex());
auto &toSend = _sessionData->toSendMap();
const auto req = toSend.find(reqId);
if (req != toSend.cend()) {
_ackedIds.emplace(msgId, req->second->requestId);
if (req->second->requestId != reqId) {
DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(reqId).arg(req->second->requestId));
} else {
DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(reqId));
}
toSend.erase(req);
} else {
DEBUG_LOG(("Message Info: msgId %1 was found in recent resent, requestId %2 was not found in prepared to send").arg(msgId));
}
_resendingIds.erase(reqIt);
} else {
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId));
}
} else {
DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(msgId));
for (const auto &wrappedMsgId : ids) {
const auto msgId = wrappedMsgId.v;
if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) {
DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(msgId));
const auto &list = i->second.messages;
toAckMore.reserve(toAckMore.size() + list.size());
for (const auto msgId : list) {
toAckMore.push_back(MTP_long(msgId));
}
_sentContainers.erase(i);
continue;
}
if (const auto i = haveSent.find(msgId); i != end(haveSent)) {
const auto requestId = i->second->requestId;
if (!byResponse && _instance->hasCallbacks(requestId)) {
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId));
continue;
}
haveSent.erase(i);
_ackedIds.emplace(msgId, requestId);
continue;
}
DEBUG_LOG(("Message Info: msgId %1 was not found in recent sent, while acking requests, searching in resend...").arg(msgId));
if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) {
const auto requestId = i->second;
if (!byResponse && _instance->hasCallbacks(requestId)) {
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId));
continue;
}
_resendingIds.erase(i);
QWriteLocker locker4(_sessionData->toSendMutex());
auto &toSend = _sessionData->toSendMap();
const auto j = toSend.find(requestId);
if (j == end(toSend)) {
DEBUG_LOG(("Message Info: msgId %1 was found in recent resent, requestId %2 was not found in prepared to send").arg(msgId).arg(requestId));
continue;
}
if (j->second->requestId != requestId) {
DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(requestId).arg(j->second->requestId));
} else {
DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(requestId));
}
toSend.erase(j);
_ackedIds.emplace(msgId, j->second->requestId);
continue;
}
DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(msgId));
}
}
@ -2100,6 +2097,16 @@ void Connection::resend(
}
});
if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) {
DEBUG_LOG(("Message Info: resending container, msgId %1").arg(msgId));
const auto ids = std::move(i->second.messages);
_sentContainers.erase(i);
for (const auto innerMsgId : ids) {
resend(innerMsgId, -1, true);
}
return;
}
auto lock = QWriteLocker(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
auto i = haveSent.find(msgId);
@ -2110,14 +2117,7 @@ void Connection::resend(
haveSent.erase(i);
lock.unlock();
// For container just resend all messages we can.
if (request.isSentContainer()) {
DEBUG_LOG(("Message Info: resending container from haveSent, msgId %1").arg(msgId));
const mtpMsgId *ids = (const mtpMsgId *)(request->constData() + 8);
for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) {
resend(ids[i], -1, true);
}
} else if (!request.isStateRequest()) {
if (!request.isStateRequest()) {
request->lastSentTime = crl::now();
request->forceSendInContainer = forceContainer;
_resendingIds.emplace(msgId, request->requestId);
@ -2135,7 +2135,7 @@ void Connection::resendAll() {
const auto &haveSent = _sessionData->haveSentMap();
toResend.reserve(haveSent.size());
for (const auto &[msgId, request] : haveSent) {
if (!request.isSentContainer() && !request.isStateRequest()) {
if (!request.isStateRequest()) {
toResend.push_back(msgId);
}
}
@ -2549,6 +2549,16 @@ mtpRequestId Connection::wasSent(mtpMsgId msgId) const {
if (msgId == _pingMsgId || msgId == _bindMsgId) {
return mtpRequestId(0xFFFFFFFF);
}
if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) {
return i->second;
}
if (const auto i = _ackedIds.find(msgId); i != end(_ackedIds)) {
return i->second;
}
if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) {
return mtpRequestId(0xFFFFFFFF);
}
{
QReadLocker locker(_sessionData->haveSentMutex());
const auto &haveSent = _sessionData->haveSentMap();
@ -2559,12 +2569,6 @@ mtpRequestId Connection::wasSent(mtpMsgId msgId) const {
: mtpRequestId(0xFFFFFFFF);
}
}
if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) {
return i->second;
}
if (const auto i = _ackedIds.find(msgId); i != end(_ackedIds)) {
return i->second;
}
return 0;
}

View File

@ -61,7 +61,10 @@ private:
ConnectionPointer data;
int priority = 0;
};
struct SentContainer {
crl::time sent = 0;
std::vector<mtpMsgId> messages;
};
enum class HandleResult {
Success,
Ignored,
@ -100,12 +103,13 @@ private:
[[nodiscard]] int16 getProtocolDcId() const;
void checkSentRequests();
void clearOldContainers();
mtpMsgId placeToContainer(
SentContainer &sentIdsWrap,
details::SerializedRequest &toSendRequest,
mtpMsgId &bigMsgId,
bool forceNewMsgId,
mtpMsgId *&haveSentArr,
details::SerializedRequest &req);
mtpMsgId prepareToSend(
details::SerializedRequest &request,
@ -216,6 +220,7 @@ private:
details::ReceivedIdsManager _receivedMessageIds;
base::flat_map<mtpMsgId, mtpRequestId> _resendingIds;
base::flat_map<mtpMsgId, mtpRequestId> _ackedIds;
base::flat_map<mtpMsgId, SentContainer> _sentContainers;
std::unique_ptr<details::BoundKeyCreator> _keyCreator;
mtpMsgId _bindMsgId = 0;

View File

@ -124,12 +124,6 @@ uint32 SerializedRequest::messageSize() const {
return kMessageIdInts + kSeqNoInts + kMessageLengthInts + ints;
}
bool SerializedRequest::isSentContainer() const {
Expects(_data != nullptr);
return _data->isContainerIdsWrap;
}
bool SerializedRequest::isStateRequest() const {
Expects(_data != nullptr);
Expects(_data->size() > kMessageBodyPosition);

View File

@ -67,8 +67,6 @@ public:
void addPadding(bool extended, bool old);
[[nodiscard]] uint32 messageSize() const;
// "request-like" wrap for msgIds vector
[[nodiscard]] bool isSentContainer() const;
[[nodiscard]] bool isStateRequest() const;
[[nodiscard]] bool needAck() const;
@ -94,7 +92,6 @@ public:
mtpRequestId requestId = 0;
bool needsLayer = false;
bool forceSendInContainer = false;
bool isContainerIdsWrap = false;
};