tdesktop/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp

457 lines
11 KiB
C++

/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "media/streaming/media_streaming_file.h"
#include "media/streaming/media_streaming_loader.h"
#include "media/streaming/media_streaming_file_delegate.h"
#include "ffmpeg/ffmpeg_utility.h"
namespace Media {
namespace Streaming {
namespace {
constexpr auto kMaxSingleReadAmount = 8 * 1024 * 1024;
constexpr auto kMaxQueuedPackets = 1024;
} // namespace
File::Context::Context(
not_null<FileDelegate*> delegate,
not_null<Reader*> reader)
: _delegate(delegate)
, _reader(reader)
, _size(reader->size()) {
}
File::Context::~Context() = default;
int File::Context::Read(void *opaque, uint8_t *buffer, int bufferSize) {
return static_cast<Context*>(opaque)->read(
bytes::make_span(buffer, bufferSize));
}
int64_t File::Context::Seek(void *opaque, int64_t offset, int whence) {
return static_cast<Context*>(opaque)->seek(offset, whence);
}
int File::Context::read(bytes::span buffer) {
Assert(_size >= _offset);
const auto amount = std::min(std::size_t(_size - _offset), buffer.size());
if (unroll()) {
return -1;
} else if (amount > kMaxSingleReadAmount) {
LOG(("Streaming Error: Read callback asked for too much data: %1"
).arg(amount));
return -1;
} else if (!amount) {
return amount;
}
buffer = buffer.subspan(0, amount);
while (true) {
const auto result = _reader->fill(_offset, buffer, &_semaphore);
if (result == Reader::FillState::Success) {
break;
} else if (result == Reader::FillState::WaitingRemote) {
// Perhaps for the correct sleeping in case of enough packets
// being read already we require SleepPolicy::Allowed here.
// Otherwise if we wait for the remote frequently and
// _queuedPackets never get to kMaxQueuedPackets and we don't call
// processQueuedPackets(SleepPolicy::Allowed) ever.
//
// But right now we can't simply pass SleepPolicy::Allowed here,
// it freezes because of two _semaphore.acquire one after another.
processQueuedPackets(SleepPolicy::Disallowed);
_delegate->fileWaitingForData();
}
_semaphore.acquire();
if (_interrupted) {
return -1;
} else if (const auto error = _reader->streamingError()) {
fail(*error);
return -1;
}
}
sendFullInCache();
_offset += amount;
return amount;
}
int64_t File::Context::seek(int64_t offset, int whence) {
const auto checkedSeek = [&](int64_t offset) {
if (_failed || offset < 0 || offset > _size) {
return -1;
}
return (_offset = offset);
};
switch (whence) {
case SEEK_SET: return checkedSeek(offset);
case SEEK_CUR: return checkedSeek(_offset + offset);
case SEEK_END: return checkedSeek(_size + offset);
case AVSEEK_SIZE: return _size;
}
return -1;
}
void File::Context::logError(QLatin1String method) {
if (!unroll()) {
FFmpeg::LogError(method);
}
}
void File::Context::logError(
QLatin1String method,
FFmpeg::AvErrorWrap error) {
if (!unroll()) {
FFmpeg::LogError(method, error);
}
}
void File::Context::logFatal(QLatin1String method) {
if (!unroll()) {
FFmpeg::LogError(method);
fail(_format ? Error::InvalidData : Error::OpenFailed);
}
}
void File::Context::logFatal(
QLatin1String method,
FFmpeg::AvErrorWrap error) {
if (!unroll()) {
FFmpeg::LogError(method, error);
fail(_format ? Error::InvalidData : Error::OpenFailed);
}
}
Stream File::Context::initStream(
not_null<AVFormatContext*> format,
AVMediaType type) {
auto result = Stream();
const auto index = result.index = av_find_best_stream(
format,
type,
-1,
-1,
nullptr,
0);
if (index < 0) {
return result;
}
const auto info = format->streams[index];
if (type == AVMEDIA_TYPE_VIDEO) {
if (info->disposition & AV_DISPOSITION_ATTACHED_PIC) {
// ignore cover streams
return Stream();
}
result.rotation = FFmpeg::ReadRotationFromMetadata(info);
result.aspect = FFmpeg::ValidateAspectRatio(info->sample_aspect_ratio);
} else if (type == AVMEDIA_TYPE_AUDIO) {
result.frequency = info->codecpar->sample_rate;
if (!result.frequency) {
return result;
}
}
result.codec = FFmpeg::MakeCodecPointer(info);
if (!result.codec) {
return result;
}
result.frame = FFmpeg::MakeFramePointer();
if (!result.frame) {
result.codec = nullptr;
return result;
}
result.timeBase = info->time_base;
result.duration = (info->duration != AV_NOPTS_VALUE)
? FFmpeg::PtsToTime(info->duration, result.timeBase)
: FFmpeg::PtsToTime(format->duration, FFmpeg::kUniversalTimeBase);
if (result.duration == kTimeUnknown) {
result.duration = kDurationUnavailable;
} else if (result.duration <= 0) {
result.codec = nullptr;
} else {
++result.duration;
if (result.duration > kDurationMax) {
result.duration = 0;
result.codec = nullptr;
}
}
return result;
}
void File::Context::seekToPosition(
not_null<AVFormatContext*> format,
const Stream &stream,
crl::time position) {
auto error = FFmpeg::AvErrorWrap();
if (!position) {
return;
} else if (stream.duration == kDurationUnavailable) {
// Seek in files with unknown duration is not supported.
return;
}
//
// Non backward search reads the whole file if the position is after
// the last keyframe inside the index. So we search only backward.
//
//const auto seekFlags = 0;
//error = av_seek_frame(
// format,
// streamIndex,
// TimeToPts(position, kUniversalTimeBase),
// seekFlags);
//if (!error) {
// return;
//}
//
error = av_seek_frame(
format,
stream.index,
FFmpeg::TimeToPts(
std::clamp(position, crl::time(0), stream.duration - 1),
stream.timeBase),
AVSEEK_FLAG_BACKWARD);
if (!error) {
return;
}
return logFatal(qstr("av_seek_frame"), error);
}
std::variant<FFmpeg::Packet, FFmpeg::AvErrorWrap> File::Context::readPacket() {
auto error = FFmpeg::AvErrorWrap();
auto result = FFmpeg::Packet();
error = av_read_frame(_format.get(), &result.fields());
if (unroll()) {
return FFmpeg::AvErrorWrap();
} else if (!error) {
return result;
} else if (error.code() != AVERROR_EOF) {
logFatal(qstr("av_read_frame"), error);
}
return error;
}
void File::Context::start(crl::time position) {
auto error = FFmpeg::AvErrorWrap();
if (unroll()) {
return;
}
auto format = FFmpeg::MakeFormatPointer(
static_cast<void *>(this),
&Context::Read,
nullptr,
&Context::Seek);
if (!format) {
return fail(Error::OpenFailed);
}
if ((error = avformat_find_stream_info(format.get(), nullptr))) {
return logFatal(qstr("avformat_find_stream_info"), error);
}
auto video = initStream(format.get(), AVMEDIA_TYPE_VIDEO);
if (unroll()) {
return;
}
auto audio = initStream(format.get(), AVMEDIA_TYPE_AUDIO);
if (unroll()) {
return;
}
_reader->headerDone();
if (_reader->isRemoteLoader()) {
sendFullInCache(true);
}
if (video.codec || audio.codec) {
seekToPosition(format.get(), video.codec ? video : audio, position);
}
if (unroll()) {
return;
}
if (video.codec) {
_queuedPackets[video.index].reserve(kMaxQueuedPackets);
}
if (audio.codec) {
_queuedPackets[audio.index].reserve(kMaxQueuedPackets);
}
const auto header = _reader->headerSize();
if (!_delegate->fileReady(header, std::move(video), std::move(audio))) {
return fail(Error::OpenFailed);
}
_format = std::move(format);
}
void File::Context::sendFullInCache(bool force) {
const auto started = _fullInCache.has_value();
if (force || started) {
const auto nowFullInCache = _reader->fullInCache();
if (!started || *_fullInCache != nowFullInCache) {
_fullInCache = nowFullInCache;
_delegate->fileFullInCache(nowFullInCache);
}
}
}
void File::Context::readNextPacket() {
auto result = readPacket();
if (unroll()) {
return;
} else if (const auto packet = std::get_if<FFmpeg::Packet>(&result)) {
const auto index = packet->fields().stream_index;
const auto i = _queuedPackets.find(index);
if (i == end(_queuedPackets)) {
return;
}
i->second.push_back(std::move(*packet));
if (i->second.size() == kMaxQueuedPackets) {
processQueuedPackets(SleepPolicy::Allowed);
}
Assert(i->second.size() < kMaxQueuedPackets);
} else {
// Still trying to read by drain.
Assert(v::is<FFmpeg::AvErrorWrap>(result));
Assert(v::get<FFmpeg::AvErrorWrap>(result).code() == AVERROR_EOF);
processQueuedPackets(SleepPolicy::Allowed);
if (!finished()) {
handleEndOfFile();
}
}
}
void File::Context::handleEndOfFile() {
_delegate->fileProcessEndOfFile();
if (_delegate->fileReadMore()) {
_readTillEnd = false;
auto error = FFmpeg::AvErrorWrap(av_seek_frame(
_format.get(),
-1, // stream_index
0, // timestamp
AVSEEK_FLAG_BACKWARD));
if (error) {
logFatal(qstr("av_seek_frame"));
}
// If we loaded a file till the end then we think it is fully cached,
// assume we finished loading and don't want to keep all other
// download tasks throttled because of an active streaming.
_reader->tryRemoveLoaderAsync();
} else {
_readTillEnd = true;
}
}
void File::Context::processQueuedPackets(SleepPolicy policy) {
const auto more = _delegate->fileProcessPackets(_queuedPackets);
if (!more && policy == SleepPolicy::Allowed) {
do {
_reader->startSleep(&_semaphore);
_semaphore.acquire();
_reader->stopSleep();
} while (!unroll() && !_delegate->fileReadMore());
}
}
void File::Context::interrupt() {
_interrupted = true;
_semaphore.release();
}
void File::Context::wake() {
_semaphore.release();
}
bool File::Context::interrupted() const {
return _interrupted;
}
bool File::Context::failed() const {
return _failed;
}
bool File::Context::unroll() const {
return failed() || interrupted();
}
void File::Context::fail(Error error) {
_failed = true;
_delegate->fileError(error);
}
bool File::Context::finished() const {
return unroll() || _readTillEnd;
}
void File::Context::stopStreamingAsync() {
// If we finished loading we don't want to keep all other
// download tasks throttled because of an active streaming.
_reader->stopStreamingAsync();
}
File::File(std::shared_ptr<Reader> reader)
: _reader(std::move(reader)) {
}
void File::start(not_null<FileDelegate*> delegate, crl::time position) {
stop(true);
_reader->startStreaming();
_context.emplace(delegate, _reader.get());
_thread = std::thread([=, context = &*_context] {
crl::toggle_fp_exceptions(true);
context->start(position);
while (!context->finished()) {
context->readNextPacket();
}
if (!context->interrupted()) {
context->stopStreamingAsync();
}
});
}
void File::wake() {
Expects(_context.has_value());
_context->wake();
}
void File::stop(bool stillActive) {
if (_thread.joinable()) {
_context->interrupt();
_thread.join();
}
_reader->stopStreaming(stillActive);
_context.reset();
}
bool File::isRemoteLoader() const {
return _reader->isRemoteLoader();
}
void File::setLoaderPriority(int priority) {
_reader->setLoaderPriority(priority);
}
File::~File() {
stop();
}
} // namespace Streaming
} // namespace Media