tdesktop/Telegram/SourceFiles/mtproto/mtpFileLoader.cpp

442 lines
12 KiB
C++
Raw Normal View History

/*
This file is part of Telegram Desktop,
an unofficial desktop messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014 John Preston, https://tdesktop.com
*/
#include "stdafx.h"
#include "mainwidget.h"
#include "window.h"
#include "application.h"
namespace {
int32 _priority = 1;
struct DataRequested {
DataRequested() {
memset(v, 0, sizeof(v));
}
int64 v[MTPDownloadSessionsCount];
};
QMap<int32, DataRequested> _dataRequested;
}
struct mtpFileLoaderQueue {
mtpFileLoaderQueue() : queries(0), start(0), end(0) {
}
int32 queries;
mtpFileLoader *start, *end;
};
namespace {
typedef QMap<int32, mtpFileLoaderQueue> LoaderQueues;
LoaderQueues queues;
}
mtpFileLoader::mtpFileLoader(int32 dc, const int64 &volume, int32 local, const int64 &secret, int32 size) : prev(0), next(0),
priority(0), inQueue(false), complete(false), skippedBytes(0), nextRequestOffset(0), lastComplete(false),
dc(dc), locationType(0), volume(volume), local(local), secret(secret),
id(0), access(0), fileIsOpen(false), size(size), type(MTP_storage_fileUnknown()) {
LoaderQueues::iterator i = queues.find(dc);
if (i == queues.cend()) {
i = queues.insert(dc, mtpFileLoaderQueue());
}
queue = &i.value();
}
mtpFileLoader::mtpFileLoader(int32 dc, const uint64 &id, const uint64 &access, mtpTypeId locType, const QString &to, int32 size) : prev(0), next(0),
priority(0), inQueue(false), complete(false), skippedBytes(0), nextRequestOffset(0), lastComplete(false),
2014-09-04 07:33:44 +00:00
dc(dc), locationType(locType),
id(id), access(access), file(to), fname(to), fileIsOpen(false), duplicateInData(false), size(size), type(MTP_storage_fileUnknown()) {
LoaderQueues::iterator i = queues.find(MTP::dld[0] + dc);
2014-09-04 07:33:44 +00:00
if (i == queues.cend()) {
i = queues.insert(MTP::dld[0] + dc, mtpFileLoaderQueue());
2014-09-04 07:33:44 +00:00
}
queue = &i.value();
}
mtpFileLoader::mtpFileLoader(int32 dc, const uint64 &id, const uint64 &access, mtpTypeId locType, const QString &to, int32 size, bool todata) : prev(0), next(0),
priority(0), inQueue(false), complete(false), skippedBytes(0), nextRequestOffset(0), lastComplete(false),
2014-09-04 07:33:44 +00:00
dc(dc), locationType(locType),
id(id), access(access), file(to), fname(to), fileIsOpen(false), duplicateInData(todata), size(size), type(MTP_storage_fileUnknown()) {
LoaderQueues::iterator i = queues.find(MTP::dld[0] + dc);
if (i == queues.cend()) {
i = queues.insert(MTP::dld[0] + dc, mtpFileLoaderQueue());
}
queue = &i.value();
}
QString mtpFileLoader::fileName() const {
return fname;
}
bool mtpFileLoader::done() const {
return complete;
}
mtpTypeId mtpFileLoader::fileType() const {
return type.type();
}
const QByteArray &mtpFileLoader::bytes() const {
return data;
}
float64 mtpFileLoader::currentProgress() const {
if (complete) return 1;
if (!fullSize()) return 0;
return float64(currentOffset()) / fullSize();
}
int32 mtpFileLoader::currentOffset(bool includeSkipped) const {
return (fileIsOpen ? file.size() : data.size()) - (includeSkipped ? 0 : skippedBytes);
}
int32 mtpFileLoader::fullSize() const {
return size;
}
void mtpFileLoader::setFileName(const QString &fileName) {
if (duplicateInData && fname.isEmpty()) {
file.setFileName(fname = fileName);
2014-09-04 07:33:44 +00:00
}
}
uint64 mtpFileLoader::objId() const {
return id;
}
void mtpFileLoader::loadNext() {
if (queue->queries >= MaxFileQueries) return;
for (mtpFileLoader *i = queue->start; i;) {
if (i->loadPart()) {
if (queue->queries >= MaxFileQueries) return;
} else {
i = i->next;
}
}
}
void mtpFileLoader::finishFail() {
bool started = currentOffset(true) > 0;
cancelRequests();
type = MTP_storage_fileUnknown();
complete = true;
if (fileIsOpen) {
file.close();
fileIsOpen = false;
file.remove();
}
data = QByteArray();
emit failed(this, started);
file.setFileName(fname = QString());
loadNext();
}
bool mtpFileLoader::loadPart() {
2014-10-31 12:57:32 +00:00
if (complete || lastComplete || (!requests.isEmpty() && !size)) return false;
if (size && nextRequestOffset >= size) return false;
int32 limit = DocumentDownloadPartSize;
MTPInputFileLocation loc;
switch (locationType) {
case 0: loc = MTP_inputFileLocation(MTP_long(volume), MTP_int(local), MTP_long(secret)); limit = DownloadPartSize; break;
case mtpc_inputVideoFileLocation: loc = MTP_inputVideoFileLocation(MTP_long(id), MTP_long(access)); break;
case mtpc_inputAudioFileLocation: loc = MTP_inputAudioFileLocation(MTP_long(id), MTP_long(access)); break;
case mtpc_inputDocumentFileLocation: loc = MTP_inputDocumentFileLocation(MTP_long(id), MTP_long(access)); break;
default:
finishFail();
return false;
break;
}
int32 offset = nextRequestOffset, dcIndex = 0;
DataRequested &dr(_dataRequested[dc]);
if (size) {
for (int32 i = 1; i < MTPDownloadSessionsCount; ++i) {
if (dr.v[i] < dr.v[dcIndex]) {
dcIndex = i;
}
}
}
if (dcIndex) {
App::app()->killDownloadSessionsStop(dc);
}
MTPupload_GetFile request(MTPupload_getFile(loc, MTP_int(offset), MTP_int(limit)));
mtpRequestId reqId = MTP::send(request, rpcDone(&mtpFileLoader::partLoaded, offset), rpcFail(&mtpFileLoader::partFailed), MTP::dld[dcIndex] + dc, 50);
++queue->queries;
dr.v[dcIndex] += limit;
requests.insert(reqId, dcIndex);
nextRequestOffset += limit;
return true;
}
void mtpFileLoader::partLoaded(int32 offset, const MTPupload_File &result, mtpRequestId req) {
Requests::iterator i = requests.find(req);
if (i == requests.cend()) return;
int32 limit = locationType ? DocumentDownloadPartSize : DownloadPartSize;
int32 dcIndex = i.value();
_dataRequested[dc].v[dcIndex] -= limit;
--queue->queries;
requests.erase(i);
const MTPDupload_file &d(result.c_upload_file());
const string &bytes(d.vbytes.c_string().v);
if (bytes.size()) {
if (fileIsOpen) {
int64 fsize = file.size();
if (offset < fsize) {
skippedBytes -= bytes.size();
} else if (offset > fsize) {
skippedBytes += offset - fsize;
}
file.seek(offset);
if (file.write(bytes.data(), bytes.size()) != qint64(bytes.size())) {
return finishFail();
}
} else {
data.reserve(offset + bytes.size());
if (offset > data.size()) {
skippedBytes += offset - data.size();
data.resize(offset);
}
if (offset == data.size()) {
data.append(bytes.data(), bytes.size());
} else {
skippedBytes -= bytes.size();
2014-10-31 12:57:32 +00:00
if (int64(offset + bytes.size()) > data.size()) {
data.resize(offset + bytes.size());
2014-09-04 07:33:44 +00:00
}
memcpy(data.data() + offset, bytes.data(), bytes.size());
2014-09-04 07:33:44 +00:00
}
}
}
if (!bytes.size() || (bytes.size() % 1024)) { // bad next offset
lastComplete = true;
}
if (requests.isEmpty() && (lastComplete || (size && nextRequestOffset >= size))) {
if (!fname.isEmpty() && duplicateInData) {
if (!fileIsOpen) fileIsOpen = file.open(QIODevice::WriteOnly);
if (!fileIsOpen) {
return finishFail();
}
if (file.write(data) != qint64(data.size())) {
return finishFail();
}
}
type = d.vtype;
complete = true;
if (fileIsOpen) {
file.close();
fileIsOpen = false;
psPostprocessFile(QFileInfo(file).absoluteFilePath());
}
removeFromQueue();
App::wnd()->update();
App::wnd()->notifyUpdateAllPhotos();
if (!queue->queries && dcIndex) {
App::app()->killDownloadSessionsStart(dc);
}
}
emit progress(this);
loadNext();
}
bool mtpFileLoader::partFailed(const RPCError &error) {
finishFail();
return true;
}
void mtpFileLoader::removeFromQueue() {
if (!inQueue) return;
if (next) {
next->prev = prev;
}
if (prev) {
prev->next = next;
}
if (queue->end == this) {
queue->end = prev;
}
if (queue->start == this) {
queue->start = next;
}
next = prev = 0;
inQueue = false;
}
void mtpFileLoader::pause() {
removeFromQueue();
}
void mtpFileLoader::start(bool loadFirst, bool prior) {
if (complete) return;
if (!fname.isEmpty() && !duplicateInData && !fileIsOpen) {
fileIsOpen = file.open(QIODevice::WriteOnly);
if (!fileIsOpen) {
finishFail();
return;
}
}
mtpFileLoader *before = 0, *after = 0;
if (prior) {
if (inQueue && priority == _priority) {
if (loadFirst) {
if (!prev) return started(loadFirst, prior);
before = queue->start;
} else {
if (!next || next->priority < _priority) return started(loadFirst, prior);
after = next;
while (after->next && after->next->priority == _priority) {
after = after->next;
}
}
} else {
priority = _priority;
if (loadFirst) {
if (inQueue && !prev) return started(loadFirst, prior);
before = queue->start;
} else {
if (inQueue) {
if (next && next->priority == _priority) {
after = next;
} else if (prev && prev->priority < _priority) {
before = prev;
while (before->prev && before->prev->priority < _priority) {
before = before->prev;
}
} else {
return started(loadFirst, prior);
}
} else {
if (queue->start && queue->start->priority == _priority) {
after = queue->start;
} else {
before = queue->start;
}
}
if (after) {
while (after->next && after->next->priority == _priority) {
after = after->next;
}
}
}
}
} else {
if (loadFirst) {
if (inQueue && (!prev || prev->priority == _priority)) return started(loadFirst, prior);
before = prev;
while (before->prev && before->prev->priority != _priority) {
before = before->prev;
}
} else {
if (inQueue && !next) return started(loadFirst, prior);
after = queue->end;
}
}
removeFromQueue();
inQueue = true;
if (!queue->start) {
queue->start = queue->end = this;
} else if (before) {
if (before != next) {
prev = before->prev;
next = before;
next->prev = this;
if (prev) {
prev->next = this;
}
if (queue->start->prev) queue->start = queue->start->prev;
}
} else if (after) {
if (after != prev) {
next = after->next;
prev = after;
after->next = this;
if (next) {
next->prev = this;
}
if (queue->end->next) queue->end = queue->end->next;
}
} else {
LOG(("Queue Error: _start && !before && !after"));
}
return started(loadFirst, prior);
}
void mtpFileLoader::cancel() {
cancelRequests();
type = MTP_storage_fileUnknown();
complete = true;
if (fileIsOpen) {
file.close();
fileIsOpen = false;
file.remove();
}
data = QByteArray();
file.setFileName(QString());
emit progress(this);
loadNext();
}
void mtpFileLoader::cancelRequests() {
if (requests.isEmpty()) return;
int32 limit = locationType ? DocumentDownloadPartSize : DownloadPartSize;
bool wasIndex = false;
DataRequested &dr(_dataRequested[dc]);
for (Requests::const_iterator i = requests.cbegin(), e = requests.cend(); i != e; ++i) {
MTP::cancel(i.key());
int32 dcIndex = i.value();
dr.v[dcIndex] -= limit;
if (dcIndex) wasIndex = true;
}
queue->queries -= requests.size();
requests.clear();
if (!queue->queries && wasIndex) {
App::app()->killDownloadSessionsStart(dc);
}
}
bool mtpFileLoader::loading() const {
return inQueue;
}
void mtpFileLoader::started(bool loadFirst, bool prior) {
if ((queue->queries >= MaxFileQueries && (!loadFirst || !prior)) || complete) return;
loadPart();
}
mtpFileLoader::~mtpFileLoader() {
removeFromQueue();
cancelRequests();
}
namespace MTP {
void clearLoaderPriorities() {
++_priority;
}
}