prepared ping_delay_disconnect, moved http_wait to any request through http transport

This commit is contained in:
John Preston 2015-03-12 13:28:10 +03:00
parent e1f5c79b97
commit 4f705c3712
6 changed files with 111 additions and 54 deletions

View File

@ -52,6 +52,10 @@ enum {
MTPDebugBufferSize = 1024 * 1024, // 1 mb start size
MTPPingDelayDisconnect = 60, // 1 min
MTPPingSendAfterAuto = 30, // send new ping starting from 30 seconds (add to existing container)
MTPPingSendAfter = 45, // send new ping after 45 seconds without ping
MaxSelectedItems = 100,
MaxPhoneTailLength = 18, // rest of the phone number, without country code (seen 12 at least)

View File

@ -563,7 +563,7 @@ public:
if (chIsBad(ch) || ch.isLowSurrogate()) {
skip = true;
} else if (isDiac) {
if (lastSkipped || lastSpace || emoji || ++diacs > chMaxDiacAfterSymbol()) {
if (lastSkipped || emoji || ++diacs > chMaxDiacAfterSymbol()) {
skip = true;
}
} else if (isSpace && lastSpace && !isNewLine) {

View File

@ -622,8 +622,6 @@ tcpNonce(MTP::nonce<MTPint128>()), httpNonce(MTP::nonce<MTPint128>()), _tcpTimeo
sock.moveToThread(thread);
sock.setProxy(QNetworkProxy(QNetworkProxy::NoProxy));
connect(&sock, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(socketError(QAbstractSocket::SocketError)));
// connect(&sock, SIGNAL(connected()), this, SIGNAL(connected()));
// connect(&sock, SIGNAL(disconnected()), this, SIGNAL(disconnected()));
connect(&sock, SIGNAL(connected()), this, SLOT(onSocketConnected()));
connect(&sock, SIGNAL(disconnected()), this, SLOT(onSocketDisconnected()));
}
@ -874,6 +872,10 @@ void MTPautoConnection::socketPacket(mtpPrime *packet, uint32 size) {
}
}
bool MTPautoConnection::usingHttpWait() {
return (status == UsingHttp);
}
bool MTPautoConnection::needHttpWait() {
return (status == UsingHttp) ? requests.isEmpty() : false;
}
@ -1044,6 +1046,10 @@ void MTPhttpConnection::requestFinished(QNetworkReply *reply) {
}
}
bool MTPhttpConnection::usingHttpWait() {
return true;
}
bool MTPhttpConnection::needHttpWait() {
return requests.isEmpty();
}
@ -1088,9 +1094,10 @@ MTProtoConnectionPrivate::MTProtoConnectionPrivate(QThread *thread, MTProtoConne
, oldConnection(true)
, receiveDelay(MTPMinReceiveDelay)
, firstSentAt(-1)
, pingId(0)
, toSendPingId(0)
, pingMsgId(0)
, _pingId(0)
, _pingIdToSend(0)
, _pingSent(0)
, _pingMsgId(0)
, restarted(false)
, keyId(0)
, sessionData(data)
@ -1100,6 +1107,7 @@ MTProtoConnectionPrivate::MTProtoConnectionPrivate(QThread *thread, MTProtoConne
oldConnectionTimer.moveToThread(thread);
connCheckTimer.moveToThread(thread);
_pingSender.moveToThread(thread);
retryTimer.moveToThread(thread);
moveToThread(thread);
@ -1122,6 +1130,7 @@ MTProtoConnectionPrivate::MTProtoConnectionPrivate(QThread *thread, MTProtoConne
connect(&retryTimer, SIGNAL(timeout()), this, SLOT(retryByTimer()));
connect(&connCheckTimer, SIGNAL(timeout()), this, SLOT(onBadConnection()));
connect(&oldConnectionTimer, SIGNAL(timeout()), this, SLOT(onOldConnection()));
connect(&_pingSender, SIGNAL(timeout()), this, SLOT(onPingSender()));
connect(sessionData->owner(), SIGNAL(authKeyCreated()), this, SLOT(updateAuthKey()), Qt::QueuedConnection);
connect(sessionData->owner(), SIGNAL(needToRestart()), this, SLOT(restartNow()), Qt::QueuedConnection);
@ -1138,7 +1147,7 @@ MTProtoConnectionPrivate::MTProtoConnectionPrivate(QThread *thread, MTProtoConne
connect(this, SIGNAL(needToSendAsync()), sessionData->owner(), SLOT(needToResumeAndSend()), Qt::QueuedConnection);
connect(this, SIGNAL(sendAnythingAsync(quint64)), sessionData->owner(), SLOT(sendAnything(quint64)), Qt::QueuedConnection);
connect(this, SIGNAL(sendHttpWaitAsync()), sessionData->owner(), SLOT(sendHttpWait()), Qt::QueuedConnection);
connect(this, SIGNAL(sendHttpWaitAsync()), sessionData->owner(), SLOT(sendAnything()), Qt::QueuedConnection);
connect(this, SIGNAL(sendPongAsync(quint64,quint64)), sessionData->owner(), SLOT(sendPong(quint64,quint64)), Qt::QueuedConnection);
connect(this, SIGNAL(sendMsgsStateInfoAsync(quint64, QByteArray)), sessionData->owner(), SLOT(sendMsgsStateInfo(quint64,QByteArray)), Qt::QueuedConnection);
connect(this, SIGNAL(resendAsync(quint64,quint64,bool,bool)), sessionData->owner(), SLOT(resend(quint64,quint64,bool,bool)), Qt::QueuedConnection);
@ -1412,32 +1421,48 @@ void MTProtoConnectionPrivate::tryToSend() {
}
bool needsLayer = !sessionData->layerWasInited();
bool prependOnly = false;
int32 state = getState();
bool prependOnly = (state != MTProtoConnection::Connected);
mtpRequest pingRequest;
if (toSendPingId) {
MTPPing ping(MTPping(MTP_long(toSendPingId)));
prependOnly = (getState() != MTProtoConnection::Connected);
DEBUG_LOG(("MTP Info: sending ping, ping_id: %1, prepend_only: %2").arg(ping.vping_id.v).arg(prependOnly ? "[TRUE]" : "[FALSE]"));
if (dc < _mtp_internal::dcShift) { // main session
if (!prependOnly && !_pingIdToSend && !_pingId && _pingSent + (MTPPingSendAfterAuto * 1000ULL) <= getms(true)) {
//_pingIdToSend = MTP::nonce<mtpPingId>(); // temp disable ping_delay_disconnect, needed only for main dc session
}
}
if (_pingIdToSend) {
if (prependOnly || true) {
MTPPing ping(MTPping(MTP_long(_pingIdToSend)));
uint32 pingSize = ping.innerLength() >> 2; // copy from MTProtoSession::send
pingRequest = mtpRequestData::prepare(pingSize);
ping.write(*pingRequest);
DEBUG_LOG(("MTP Info: sending ping, ping_id: %1").arg(_pingIdToSend));
} else {
MTPPing_delay_disconnect ping(MTP_long(_pingIdToSend), MTP_int(MTPPingDelayDisconnect));
uint32 pingSize = ping.innerLength() >> 2; // copy from MTProtoSession::send
pingRequest = mtpRequestData::prepare(pingSize);
ping.write(*pingRequest);
DEBUG_LOG(("MTP Info: sending ping_delay_disconnect, ping_id: %1").arg(_pingIdToSend));
}
pingRequest->msDate = getms(true); // > 0 - can send without container
_pingSent = pingRequest->msDate = getms(true); // > 0 - can send without container
pingRequest->requestId = 0; // dont add to haveSent / wereAcked maps
pingId = toSendPingId;
toSendPingId = 0;
if (dc < _mtp_internal::dcShift && !prependOnly) { // main session
// _pingSender.start(MTPPingSendAfter * 1000);
}
_pingId = _pingIdToSend;
_pingIdToSend = 0;
} else {
int32 st = getState();
DEBUG_LOG(("MTP Info: dc %1 trying to send after ping, state: %2").arg(dc).arg(st));
if (st != MTProtoConnection::Connected) {
if (prependOnly) {
DEBUG_LOG(("MTP Info: dc %1 not sending, waiting for Connected state, state: %2").arg(dc).arg(state));
return; // just do nothing, if is not connected yet
} else {
DEBUG_LOG(("MTP Info: dc %1 trying to send after ping, state: %2").arg(dc).arg(state));
}
}
mtpRequest ackRequest, resendRequest, stateRequest;
mtpRequest ackRequest, resendRequest, stateRequest, httpWaitRequest;
if (!prependOnly && !ackRequestData.isEmpty()) {
MTPMsgsAck ack(MTP_msgs_ack(MTP_vector<MTPlong>(ackRequestData)));
@ -1482,6 +1507,15 @@ void MTProtoConnectionPrivate::tryToSend() {
stateRequest->msDate = getms(true); // > 0 - can send without container
stateRequest->requestId = reqid();// add to haveSent / wereAcked maps, but don't add to requestMap
}
if (conn->usingHttpWait()) {
MTPHttpWait req(MTP_http_wait(MTP_int(100), MTP_int(30), MTP_int(25000)));
httpWaitRequest = mtpRequestData::prepare(req.innerLength() >> 2);
req.write(*httpWaitRequest);
httpWaitRequest->msDate = getms(true); // > 0 - can send without container
httpWaitRequest->requestId = 0; // dont add to haveSent / wereAcked maps
}
}
MTPInitConnection<mtpRequest> initWrapperImpl, *initWrapper = &initWrapperImpl;
@ -1505,10 +1539,11 @@ void MTProtoConnectionPrivate::tryToSend() {
if (ackRequest) ++toSendCount;
if (resendRequest) ++toSendCount;
if (stateRequest) ++toSendCount;
if (httpWaitRequest) ++toSendCount;
if (!toSendCount) return; // nothing to send
mtpRequest first = pingRequest ? pingRequest : (ackRequest ? ackRequest : (resendRequest ? resendRequest : (stateRequest ? stateRequest : toSend.cbegin().value())));
mtpRequest first = pingRequest ? pingRequest : (ackRequest ? ackRequest : (resendRequest ? resendRequest : (stateRequest ? stateRequest : (httpWaitRequest ? httpWaitRequest : toSend.cbegin().value()))));
if (toSendCount == 1 && first->msDate > 0) { // if can send without container
toSendRequest = first;
if (!prependOnly) {
@ -1518,7 +1553,7 @@ void MTProtoConnectionPrivate::tryToSend() {
mtpMsgId msgId = prepareToSend(toSendRequest, msgid());
if (pingRequest) {
pingMsgId = msgId;
_pingMsgId = msgId;
needAnyResponse = true;
} else if (resendRequest || stateRequest) {
needAnyResponse = true;
@ -1566,6 +1601,7 @@ void MTProtoConnectionPrivate::tryToSend() {
if (ackRequest) containerSize += mtpRequestData::messageSize(ackRequest);
if (resendRequest) containerSize += mtpRequestData::messageSize(resendRequest);
if (stateRequest) containerSize += mtpRequestData::messageSize(stateRequest);
if (httpWaitRequest) containerSize += mtpRequestData::messageSize(httpWaitRequest);
for (mtpPreRequestMap::iterator i = toSend.begin(), e = toSend.end(); i != e; ++i) {
containerSize += mtpRequestData::messageSize(i.value());
if (needsLayer && i.value()->needsLayer) {
@ -1598,7 +1634,7 @@ void MTProtoConnectionPrivate::tryToSend() {
mtpMsgId *haveSentArr = (mtpMsgId*)(haveSentIdsWrap->data() + 8);
if (pingRequest) {
pingMsgId = placeToContainer(toSendRequest, bigMsgId, haveSentArr, pingRequest);
_pingMsgId = placeToContainer(toSendRequest, bigMsgId, haveSentArr, pingRequest);
needAnyResponse = true;
} else if (resendRequest || stateRequest) {
needAnyResponse = true;
@ -1649,6 +1685,7 @@ void MTProtoConnectionPrivate::tryToSend() {
}
if (resendRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, resendRequest);
if (ackRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, ackRequest);
if (httpWaitRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, httpWaitRequest);
mtpMsgId contMsgId = prepareToSend(toSendRequest, bigMsgId);
*(mtpMsgId*)(haveSentIdsWrap->data() + 4) = contMsgId;
@ -1695,9 +1732,9 @@ void MTProtoConnectionPrivate::socketStart(bool afterConfig) {
onConnected();
return;
}
setState(MTProtoConnection::Connecting);
pingId = pingMsgId = toSendPingId = 0;
_pingId = _pingMsgId = _pingIdToSend = _pingSent = 0;
_pingSender.stop();
const mtpDcOption *dcOption = 0;
const mtpDcOptions &options(cDcOptions());
@ -1801,6 +1838,19 @@ void MTProtoConnectionPrivate::onOldConnection() {
DEBUG_LOG(("This connection marked as old! delay now %1ms").arg(receiveDelay));
}
void MTProtoConnectionPrivate::onPingSender() {
if (_pingId) {
if (_pingSent + (MTPPingSendAfter - 1) * 1000 < getms(true)) {
LOG(("Could not send ping for MTPPingSendAfter seconds, restarting.."));
return restart();
} else {
_pingSender.start(_pingSent + (MTPPingSendAfter * 1000) - getms(true));
}
} else {
emit needToSendAsync();
}
}
void MTProtoConnectionPrivate::onBadConnection() {
if (cConnectionType() != dbictAuto && cConnectionType() != dbictTcpProxy) {
return;
@ -2102,6 +2152,9 @@ int32 MTProtoConnectionPrivate::handleOneReceived(const mtpPrime *from, const mt
LOG(("Message Info: bad message notification received (error_code %3) for msg_id = %1, seq_no = %2").arg(data.vbad_msg_id.v).arg(data.vbad_msg_seqno.v).arg(data.verror_code.v));
mtpMsgId resendId = data.vbad_msg_id.v;
if (resendId == _pingMsgId) {
_pingId = 0;
}
int32 errorCode = data.verror_code.v;
if (errorCode == 16 || errorCode == 17 || errorCode == 32 || errorCode == 33 || errorCode == 64) { // can handle
bool needResend = (errorCode == 16 || errorCode == 17); // bad msg_id
@ -2175,7 +2228,9 @@ int32 MTProtoConnectionPrivate::handleOneReceived(const mtpPrime *from, const mt
DEBUG_LOG(("Message Info: bad server salt received (error_code %4) for msg_id = %1, seq_no = %2, new salt: %3").arg(data.vbad_msg_id.v).arg(data.vbad_msg_seqno.v).arg(data.vnew_server_salt.v).arg(data.verror_code.v));
mtpMsgId resendId = data.vbad_msg_id.v;
if (!wasSent(resendId)) {
if (resendId == _pingMsgId) {
_pingId = 0;
} else if (!wasSent(resendId)) {
DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(resendId));
return (badTime ? 0 : 1);
}
@ -2483,8 +2538,8 @@ int32 MTProtoConnectionPrivate::handleOneReceived(const mtpPrime *from, const mt
DEBUG_LOG(("Message Error: such msg_id %1 ping_id %2 was not sent recently").arg(data.vmsg_id.v).arg(data.vping_id.v));
return 0;
}
if (data.vping_id.v == pingId) {
pingId = 0;
if (data.vping_id.v == _pingId) {
_pingId = 0;
} else {
DEBUG_LOG(("Message Info: just pong.."));
}
@ -2732,13 +2787,13 @@ void MTProtoConnectionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, con
}
void MTProtoConnectionPrivate::resend(quint64 msgId, quint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) {
if (msgId == pingMsgId) return;
if (msgId == _pingMsgId) return;
emit resendAsync(msgId, msCanWait, forceContainer, sendMsgStateInfo);
}
void MTProtoConnectionPrivate::resendMany(QVector<quint64> msgIds, quint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) {
for (int32 i = 0, l = msgIds.size(); i < l; ++i) {
if (msgIds.at(i) == pingMsgId) {
if (msgIds.at(i) == _pingMsgId) {
msgIds.remove(i);
--l;
}
@ -3218,14 +3273,14 @@ void MTProtoConnectionPrivate::authKeyCreated() {
}
}
toSendPingId = MTP::nonce<uint64>(); // get server_salt
_pingIdToSend = MTP::nonce<uint64>(); // get server_salt
emit needToSendAsync();
}
void MTProtoConnectionPrivate::clearAuthKeyData() {
if (authKeyData) {
#ifdef Q_OS_WIN // TODO
#ifdef Q_OS_WIN
SecureZeroMemory(authKeyData, sizeof(AuthKeyCreateData));
if (!authKeyStrings->dh_prime.isEmpty()) SecureZeroMemory(authKeyStrings->dh_prime.data(), authKeyStrings->dh_prime.size());
if (!authKeyStrings->g_a.isEmpty()) SecureZeroMemory(authKeyStrings->g_a.data(), authKeyStrings->g_a.size());
@ -3371,7 +3426,7 @@ bool MTProtoConnectionPrivate::sendRequest(mtpRequest &request, bool needAnyResp
}
mtpRequestId MTProtoConnectionPrivate::wasSent(mtpMsgId msgId) const {
if (msgId == pingMsgId) return mtpRequestId(0xFFFFFFFF);
if (msgId == _pingMsgId) return mtpRequestId(0xFFFFFFFF);
{
QReadLocker locker(sessionData->haveSentMutex());
const mtpRequestMap &haveSent(sessionData->haveSentMap());

View File

@ -116,6 +116,9 @@ public:
virtual void disconnectFromServer() = 0;
virtual void connectToServer(const QString &addr, int32 port) = 0;
virtual bool isConnected() = 0;
virtual bool usingHttpWait() {
return false;
}
virtual bool needHttpWait() {
return false;
}
@ -181,6 +184,7 @@ public:
void disconnectFromServer();
void connectToServer(const QString &addr, int32 port);
bool isConnected();
bool usingHttpWait();
bool needHttpWait();
int32 debugState() const;
@ -268,6 +272,7 @@ public:
void disconnectFromServer();
void connectToServer(const QString &addr, int32 port);
bool isConnected();
bool usingHttpWait();
bool needHttpWait();
int32 debugState() const;
@ -325,6 +330,7 @@ public slots:
void restartNow();
void restart(bool maybeBadKey = false);
void onPingSender();
void onBadConnection();
void onOldConnection();
void onSentSome(uint64 size);
@ -399,8 +405,10 @@ private:
// remove msgs with such ids from sessionData->haveSent, add to sessionData->wereAcked
void requestsAcked(const QVector<MTPlong> &ids, bool byResponse = false);
mtpPingId pingId, toSendPingId;
mtpMsgId pingMsgId;
mtpPingId _pingId, _pingIdToSend;
uint64 _pingSent;
mtpMsgId _pingMsgId;
SingleTimer _pingSender;
void resend(quint64 msgId, quint64 msCanWait = 0, bool forceContainer = false, bool sendMsgStateInfo = false);
void resendMany(QVector<quint64> msgIds, quint64 msCanWait = 0, bool forceContainer = false, bool sendMsgStateInfo = false);

View File

@ -66,16 +66,11 @@ void MTPSessionData::clear() {
MTProtoSession::MTProtoSession() : data(this), dcId(0), dc(0), msSendCall(0), msWait(0) {
}
void MTProtoSession::start(int32 dcenter, uint32 connects) {
void MTProtoSession::start(int32 dcenter) {
if (dcId) {
DEBUG_LOG(("Session Info: MTProtoSession::start called on already started session"));
return;
}
if (connects < 1) {
connects = cConnectionsInSession();
} else if (connects > 4) {
connects = 4;
}
msSendCall = msWait = 0;
@ -86,8 +81,8 @@ void MTProtoSession::start(int32 dcenter, uint32 connects) {
MTProtoDCMap &dcs(mtpDCMap());
connections.reserve(connects);
for (uint32 i = 0; i < connects; ++i) {
connections.reserve(cConnectionsInSession());
for (uint32 i = 0; i < cConnectionsInSession(); ++i) {
connections.push_back(new MTProtoConnection());
dcId = connections.back()->start(&data, dcenter);
if (!dcId) {
@ -179,10 +174,6 @@ void MTProtoSession::needToResumeAndSend() {
emit needToSend();
}
void MTProtoSession::sendHttpWait() {
send(MTPHttpWait(MTP_http_wait(MTP_int(100), MTP_int(30), MTP_int(25000))), RPCResponseHandler(), 50);
}
void MTProtoSession::sendPong(quint64 msgId, quint64 pingId) {
send(MTP_pong(MTP_long(msgId), MTP_long(pingId)));
}

View File

@ -221,7 +221,7 @@ public:
MTProtoSession();
void start(int32 dcenter = 0, uint32 connects = 0);
void start(int32 dcenter = 0);
void restart();
void stop();
@ -265,8 +265,7 @@ public slots:
void onConnectionStateChange(qint32 newState);
void onResetDone();
void sendAnything(quint64 msCanWait);
void sendHttpWait();
void sendAnything(quint64 msCanWait = 0);
void sendPong(quint64 msgId, quint64 pingId);
void sendMsgsStateInfo(quint64 msgId, QByteArray data);