diff --git a/Telegram/SourceFiles/data/data_document.cpp b/Telegram/SourceFiles/data/data_document.cpp index 9d44f6b819..dfad51d697 100644 --- a/Telegram/SourceFiles/data/data_document.cpp +++ b/Telegram/SourceFiles/data/data_document.cpp @@ -28,6 +28,10 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "mainwindow.h" #include "core/application.h" +// #TODO streaming +#include "media/streaming/media_streaming_file.h" +#include "media/streaming/media_streaming_loader_mtproto.h" + namespace { constexpr auto kMemoryForCache = 32 * 1024 * 1024; @@ -303,6 +307,24 @@ void DocumentOpenClickHandler::Open( return; } } + if (playMusic || playVideo) { + AssertIsDebug(); + if (auto loader = data->createStreamingLoader(origin)) { + static auto file = std::unique_ptr(); + file = std::make_unique( + &data->owner(), + std::move(loader)); + data->session().lifetime().add([] { + file = nullptr; + }); + file->start( + (playMusic + ? Media::Streaming::Mode::Audio + : Media::Streaming::Mode::Video), + 0); + } + return; + } if (!location.isEmpty() || (!data->data().isEmpty() && (playVoice || playMusic || playVideo || playAnimation))) { using State = Media::Player::State; if (playVoice) { @@ -1286,6 +1308,21 @@ bool DocumentData::hasRemoteLocation() const { return (_dc != 0 && _access != 0); } +auto DocumentData::createStreamingLoader(Data::FileOrigin origin) const +-> std::unique_ptr { + return hasRemoteLocation() + ? std::make_unique( + &session().api(), + _dc, + MTP_inputDocumentFileLocation( + MTP_long(id), + MTP_long(_access), + MTP_bytes(_fileReference)), + size, + origin) + : nullptr; +} + bool DocumentData::hasWebLocation() const { return _urlLocation.dc() != 0 && _urlLocation.accessHash() != 0; } diff --git a/Telegram/SourceFiles/data/data_document.h b/Telegram/SourceFiles/data/data_document.h index 41ede516ec..215fa934d0 100644 --- a/Telegram/SourceFiles/data/data_document.h +++ b/Telegram/SourceFiles/data/data_document.h @@ -20,6 +20,12 @@ struct Key; } // namespace Cache } // namespace Storage +namespace Media { +namespace Streaming { +class Loader; +} // namespace Streaming +} // namespace Media + namespace Data { class Session; } // namespace Data @@ -217,6 +223,9 @@ public: const QString &songPerformer); [[nodiscard]] QString composeNameString() const; + [[nodiscard]] auto createStreamingLoader(Data::FileOrigin origin) const + -> std::unique_ptr; + ~DocumentData(); DocumentId id = 0; diff --git a/Telegram/SourceFiles/data/data_types.cpp b/Telegram/SourceFiles/data/data_types.cpp index 735ddd454f..a0c728283a 100644 --- a/Telegram/SourceFiles/data/data_types.cpp +++ b/Telegram/SourceFiles/data/data_types.cpp @@ -173,6 +173,13 @@ bool ReplyPreview::empty() const { } // namespace Data +AudioMsgId AudioMsgId::ForVideo() { + auto result = AudioMsgId(); + result._playId = rand_value(); + result._type = Type::Video; + return result; +} + void AudioMsgId::setTypeFromAudio() { if (_audio->isVoiceMessage() || _audio->isVideoMessage()) { _type = Type::Voice; diff --git a/Telegram/SourceFiles/data/data_types.h b/Telegram/SourceFiles/data/data_types.h index d1c2c312a1..dec5d5c651 100644 --- a/Telegram/SourceFiles/data/data_types.h +++ b/Telegram/SourceFiles/data/data_types.h @@ -369,6 +369,7 @@ public: , _playId(playId) { setTypeFromAudio(); } + [[nodiscard]] static AudioMsgId ForVideo(); Type type() const { return _type; diff --git a/Telegram/SourceFiles/media/audio/media_audio.cpp b/Telegram/SourceFiles/media/audio/media_audio.cpp index 613d4fe41d..78697e3a61 100644 --- a/Telegram/SourceFiles/media/audio/media_audio.cpp +++ b/Telegram/SourceFiles/media/audio/media_audio.cpp @@ -772,8 +772,8 @@ void Mixer::play( } } -void Mixer::feedFromVideo(VideoSoundPart &&part) { - _loader->feedFromVideo(std::move(part)); +void Mixer::feedFromVideo(const VideoSoundPart &part) { + _loader->feedFromVideo(part); } crl::time Mixer::getVideoCorrectedTime(const AudioMsgId &audio, crl::time frameMs, crl::time systemMs) { diff --git a/Telegram/SourceFiles/media/audio/media_audio.h b/Telegram/SourceFiles/media/audio/media_audio.h index b34460256e..cc71b68056 100644 --- a/Telegram/SourceFiles/media/audio/media_audio.h +++ b/Telegram/SourceFiles/media/audio/media_audio.h @@ -120,7 +120,7 @@ public: void stop(const AudioMsgId &audio, State state); // Video player audio stream interface. - void feedFromVideo(VideoSoundPart &&part); + void feedFromVideo(const VideoSoundPart &part); crl::time getVideoCorrectedTime( const AudioMsgId &id, crl::time frameMs, diff --git a/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp b/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp index 7fc5074933..922da09d54 100644 --- a/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp +++ b/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp @@ -21,7 +21,7 @@ Loaders::Loaders(QThread *thread) : _fromVideoNotify([this] { videoSoundAdded(); connect(thread, SIGNAL(finished()), this, SLOT(deleteLater())); } -void Loaders::feedFromVideo(VideoSoundPart &&part) { +void Loaders::feedFromVideo(const VideoSoundPart &part) { auto invoke = false; { QMutexLocker lock(&_fromVideoMutex); diff --git a/Telegram/SourceFiles/media/audio/media_audio_loaders.h b/Telegram/SourceFiles/media/audio/media_audio_loaders.h index 656e3af98d..e4aeff7f72 100644 --- a/Telegram/SourceFiles/media/audio/media_audio_loaders.h +++ b/Telegram/SourceFiles/media/audio/media_audio_loaders.h @@ -21,7 +21,7 @@ class Loaders : public QObject { public: Loaders(QThread *thread); - void feedFromVideo(VideoSoundPart &&part); + void feedFromVideo(const VideoSoundPart &part); ~Loaders(); signals: diff --git a/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h b/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h index a645a4a0e3..7858d78319 100644 --- a/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h +++ b/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h @@ -17,15 +17,14 @@ struct VideoSoundData { }; struct VideoSoundPart { - AVPacket *packet = nullptr; + const AVPacket *packet = nullptr; AudioMsgId audio; - uint32 playId = 0; }; namespace FFMpeg { // AVPacket has a deprecated field, so when you copy an AVPacket -// variable (e.g. inside QQueue), a compile warning is emited. +// variable (e.g. inside QQueue), a compile warning is emitted. // We wrap full AVPacket data in a new AVPacketDataWrap struct. // All other fields are copied from AVPacket without modifications. struct AVPacketDataWrap { diff --git a/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp b/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp index 54a03eff38..c81f09bb4b 100644 --- a/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp +++ b/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp @@ -47,9 +47,6 @@ bool isAlignedImage(const QImage &image) { FFMpegReaderImplementation::FFMpegReaderImplementation(FileLocation *location, QByteArray *data, const AudioMsgId &audio) : ReaderImplementation(location, data) , _audioMsgId(audio) { _frame = av_frame_alloc(); - av_init_packet(&_packetNull); - _packetNull.data = nullptr; - _packetNull.size = 0; } ReaderImplementation::ReadResult FFMpegReaderImplementation::readNextFrame() { @@ -317,7 +314,6 @@ bool FFMpegReaderImplementation::start(Mode mode, crl::time &positionMs) { LOG(("Gif Error: Unable to av_find_best_stream %1, error %2, %3").arg(logData()).arg(_streamId).arg(av_make_error_string(err, sizeof(err), _streamId))); return false; } - _packetNull.stream_index = _streamId; auto rotateTag = av_dict_get(_fmtContext->streams[_streamId]->metadata, "rotate", NULL, 0); if (rotateTag && *rotateTag->value) { @@ -341,7 +337,7 @@ bool FFMpegReaderImplementation::start(Mode mode, crl::time &positionMs) { av_codec_set_pkt_timebase(_codecContext, _fmtContext->streams[_streamId]->time_base); av_opt_set_int(_codecContext, "refcounted_frames", 1, 0); - _codec = avcodec_find_decoder(_codecContext->codec_id); + const auto codec = avcodec_find_decoder(_codecContext->codec_id); _audioStreamId = av_find_best_stream(_fmtContext, AVMEDIA_TYPE_AUDIO, -1, -1, 0, 0); if (_mode == Mode::Inspecting) { @@ -351,7 +347,7 @@ bool FFMpegReaderImplementation::start(Mode mode, crl::time &positionMs) { _audioStreamId = -1; } - if ((res = avcodec_open2(_codecContext, _codec, 0)) < 0) { + if ((res = avcodec_open2(_codecContext, codec, 0)) < 0) { LOG(("Gif Error: Unable to avcodec_open2 %1, error %2, %3").arg(logData()).arg(res).arg(av_make_error_string(err, sizeof(err), res))); return false; } @@ -370,7 +366,7 @@ bool FFMpegReaderImplementation::start(Mode mode, crl::time &positionMs) { av_codec_set_pkt_timebase(audioContext, _fmtContext->streams[_audioStreamId]->time_base); av_opt_set_int(audioContext, "refcounted_frames", 1, 0); - auto audioCodec = avcodec_find_decoder(audioContext->codec_id); + const auto audioCodec = avcodec_find_decoder(audioContext->codec_id); if ((res = avcodec_open2(audioContext, audioCodec, 0)) < 0) { avcodec_free_context(&audioContext); LOG(("Gif Error: Unable to avcodec_open2 %1, error %2, %3").arg(logData()).arg(res).arg(av_make_error_string(err, sizeof(err), res))); @@ -490,10 +486,11 @@ FFMpegReaderImplementation::PacketResult FFMpegReaderImplementation::readPacket( if (res == AVERROR_EOF) { if (_audioStreamId >= 0) { // queue terminating packet to audio player - VideoSoundPart part; - part.packet = &_packetNull; - part.audio = _audioMsgId; - Player::mixer()->feedFromVideo(std::move(part)); + auto drain = AVPacket(); + av_init_packet(&drain); + drain.data = nullptr; + drain.size = 0; + Player::mixer()->feedFromVideo({ &drain, _audioMsgId }); } return PacketResult::EndOfFile; } @@ -516,10 +513,7 @@ void FFMpegReaderImplementation::processPacket(AVPacket *packet) { _lastReadAudioMs = countPacketMs(packet); // queue packet to audio player - VideoSoundPart part; - part.packet = packet; - part.audio = _audioMsgId; - Player::mixer()->feedFromVideo(std::move(part)); + Player::mixer()->feedFromVideo({ packet, _audioMsgId }); } } else { av_packet_unref(packet); diff --git a/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.h b/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.h index d4fc0ecd7d..14add74c3d 100644 --- a/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.h +++ b/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.h @@ -84,7 +84,6 @@ private: uchar *_ioBuffer = nullptr; AVIOContext *_ioContext = nullptr; AVFormatContext *_fmtContext = nullptr; - AVCodec *_codec = nullptr; AVCodecContext *_codecContext = nullptr; int _streamId = 0; AVFrame *_frame = nullptr; @@ -100,7 +99,6 @@ private: crl::time _lastReadAudioMs = 0; QQueue _packetQueue; - AVPacket _packetNull; // for final decoding int _packetStartedSize = 0; uint8_t *_packetStartedData = nullptr; bool _packetStarted = false; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_common.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_common.cpp new file mode 100644 index 0000000000..9f0c292ca1 --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_common.cpp @@ -0,0 +1,175 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#include "media/streaming/media_streaming_common.h" + +extern "C" { +#include +} // extern "C" + +namespace Media { +namespace Streaming { +namespace { + +constexpr int kSkipInvalidDataPackets = 10; + +} // namespace + +void LogError(QLatin1String method) { + LOG(("Streaming Error: Error in %1.").arg(method)); +} + +void LogError(QLatin1String method, AvErrorWrap error) { + LOG(("Streaming Error: Error in %1 (code: %2, text: %3)." + ).arg(method + ).arg(error.code() + ).arg(error.text())); +} + +crl::time PtsToTime(int64_t pts, const AVRational &timeBase) { + return (pts == AV_NOPTS_VALUE) + ? Information::kDurationUnknown + : ((pts * 1000LL * timeBase.num) / timeBase.den); +} + +std::optional ReadNextFrame(Stream &stream) { + Expects(stream.frame != nullptr); + + auto error = AvErrorWrap(); + + if (stream.frame->data) { + av_frame_unref(stream.frame.get()); + } + do { + error = avcodec_receive_frame(stream.codec, stream.frame.get()); + if (!error) { + //processReadFrame(); // #TODO streaming + return std::nullopt; + } + + if (error.code() != AVERROR(EAGAIN) || stream.queue.empty()) { + return error; + } + + const auto packet = &stream.queue.front().fields(); + const auto guard = gsl::finally([ + &, + size = packet->size, + data = packet->data + ] { + packet->size = size; + packet->data = data; + stream.queue.pop_front(); + }); + + error = avcodec_send_packet( + stream.codec, + packet->data ? packet : nullptr); // Drain on eof. + if (!error) { + continue; + } + LogError(qstr("avcodec_send_packet"), error); + if (error.code() == AVERROR_INVALIDDATA + // There is a sample voice message where skipping such packet + // results in a crash (read_access to nullptr) in swr_convert(). + && stream.codec->codec_id != AV_CODEC_ID_OPUS) { + if (++stream.invalidDataPackets < kSkipInvalidDataPackets) { + continue; // Try to skip a bad packet. + } + } + return error; + } while (true); + + [[unreachable]]; +} + +CodecPointer::CodecPointer(std::nullptr_t) { +} + +CodecPointer::CodecPointer(CodecPointer &&other) +: _context(base::take(other._context)) { +} + +CodecPointer &CodecPointer::operator=(CodecPointer &&other) { + if (this != &other) { + destroy(); + _context = base::take(other._context); + } + return *this; +} + +CodecPointer &CodecPointer::operator=(std::nullptr_t) { + destroy(); + return *this; +} + +void CodecPointer::destroy() { + if (_context) { + avcodec_free_context(&_context); + } +} + +CodecPointer CodecPointer::FromStream(not_null stream) { + auto error = AvErrorWrap(); + + auto result = CodecPointer(); + const auto context = result._context = avcodec_alloc_context3(nullptr); + if (!context) { + LogError(qstr("avcodec_alloc_context3")); + return {}; + } + error = avcodec_parameters_to_context(context, stream->codecpar); + if (error) { + LogError(qstr("avcodec_parameters_to_context"), error); + return {}; + } + av_codec_set_pkt_timebase(context, stream->time_base); + av_opt_set_int(context, "refcounted_frames", 1, 0); + + const auto codec = avcodec_find_decoder(context->codec_id); + if (!codec) { + LogError(qstr("avcodec_find_decoder"), context->codec_id); + return {}; + } else if ((error = avcodec_open2(context, codec, nullptr))) { + LogError(qstr("avcodec_open2"), error); + return {}; + } + return result; +} + +AVCodecContext *CodecPointer::get() const { + return _context; +} + +AVCodecContext *CodecPointer::operator->() const { + Expects(_context != nullptr); + + return get(); +} + +CodecPointer::operator AVCodecContext*() const { + return get(); +} + +AVCodecContext* CodecPointer::release() { + return base::take(_context); +} + +CodecPointer::~CodecPointer() { + destroy(); +} + +FrameDeleter::pointer FrameDeleter::create() { + return av_frame_alloc(); +} + +void FrameDeleter::operator()(pointer value) { + av_frame_free(&value); +} + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_common.h b/Telegram/SourceFiles/media/streaming/media_streaming_common.h new file mode 100644 index 0000000000..d47fe7b07c --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_common.h @@ -0,0 +1,162 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +extern "C" { +#include +#include +} // extern "C" + +namespace Media { +namespace Streaming { + +[[nodiscard]] crl::time PtsToTime(int64_t pts, const AVRational &timeBase); + +enum class Mode { + Both, + Audio, + Video, + Inspection +}; + +struct Information { + static constexpr auto kDurationUnknown = crl::time(-1); + + QSize video; + bool audio = false; + crl::time duration = kDurationUnknown; + + crl::time started = 0; + QImage cover; +}; + +class AvErrorWrap { +public: + AvErrorWrap(int code = 0) : _code(code) { + } + + [[nodiscard]] explicit operator bool() const { + return (_code < 0); + } + + [[nodiscard]] int code() const { + return _code; + } + + [[nodiscard]] QString text() const { + char string[AV_ERROR_MAX_STRING_SIZE] = { 0 }; + return QString::fromUtf8(av_make_error_string( + string, + sizeof(string), + _code)); + } + +private: + int _code = 0; + +}; + +void LogError(QLatin1String method); +void LogError(QLatin1String method, AvErrorWrap error); + +class Packet { +public: + Packet() { + setEmpty(); + } + Packet(const AVPacket &data) { + bytes::copy(_data, bytes::object_as_span(&data)); + } + Packet(Packet &&other) { + bytes::copy(_data, other._data); + if (!other.empty()) { + other.release(); + } + } + Packet &operator=(Packet &&other) { + if (this != &other) { + av_packet_unref(&fields()); + bytes::copy(_data, other._data); + if (!other.empty()) { + other.release(); + } + } + return *this; + } + ~Packet() { + av_packet_unref(&fields()); + } + + [[nodiscard]] AVPacket &fields() { + return *reinterpret_cast(_data); + } + [[nodiscard]] const AVPacket &fields() const { + return *reinterpret_cast(_data); + } + + [[nodiscard]] bool empty() const { + return !fields().data; + } + void release() { + setEmpty(); + } + +private: + void setEmpty() { + auto &native = fields(); + av_init_packet(&native); + native.data = nullptr; + native.size = 0; + } + + alignas(alignof(AVPacket)) bytes::type _data[sizeof(AVPacket)]; + +}; + +class CodecPointer { +public: + CodecPointer(std::nullptr_t = nullptr); + CodecPointer(CodecPointer &&other); + CodecPointer &operator=(CodecPointer &&other); + CodecPointer &operator=(std::nullptr_t); + ~CodecPointer(); + + [[nodiscard]] static CodecPointer FromStream( + not_null stream); + + [[nodiscard]] AVCodecContext *get() const; + [[nodiscard]] AVCodecContext *operator->() const; + [[nodiscard]] operator AVCodecContext*() const; + [[nodiscard]] AVCodecContext* release(); + +private: + void destroy(); + + AVCodecContext *_context = nullptr; + +}; + +struct FrameDeleter { + using pointer = AVFrame*; + [[nodiscard]] static pointer create(); + void operator()(pointer value); +}; +using FramePointer = std::unique_ptr; + +struct Stream { + CodecPointer codec; + FramePointer frame; + std::deque queue; + crl::time lastReadPositionTime = 0; + int invalidDataPackets = 0; +}; + +[[nodiscard]] std::optional ReadNextFrame(Stream &stream); + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp new file mode 100644 index 0000000000..e211ec86be --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp @@ -0,0 +1,536 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#include "media/streaming/media_streaming_file.h" + +#include "media/streaming/media_streaming_loader.h" + +#include "media/audio/media_audio.h" // #TODO streaming +#include "media/audio/media_child_ffmpeg_loader.h" +#include "ui/toast/toast.h" + +namespace Media { +namespace Streaming { + +File::Context::Context(not_null reader) +: _reader(reader) +, _size(reader->size()) +, _audioMsgId(AudioMsgId::ForVideo()) { +} + +int File::Context::Read(void *opaque, uint8_t *buffer, int bufferSize) { + return static_cast(opaque)->read( + bytes::make_span(buffer, bufferSize)); +} + +int64_t File::Context::Seek(void *opaque, int64_t offset, int whence) { + return static_cast(opaque)->seek(offset, whence); +} + +int File::Context::read(bytes::span buffer) { + const auto amount = std::min(size_type(_size - _offset), buffer.size()); + if (unroll() || amount < 0) { + return -1; + } else if (!amount) { + return amount; + } + buffer = buffer.subspan(0, amount); + while (!_reader->fill(buffer, _offset, &_semaphore)) { + _semaphore.acquire(); + if (_interrupted) { + return -1; + } else if (_reader->failed()) { + _failed = true; + return -1; + } + } + _offset += amount; + return amount; +} + +int64_t File::Context::seek(int64_t offset, int whence) { + const auto checkedSeek = [&](int64_t offset) { + if (_failed || offset < 0 || offset > _size) { + return -1; + } + return (_offset = offset); + }; + switch (whence) { + case SEEK_SET: return checkedSeek(offset); + case SEEK_CUR: return checkedSeek(_offset + offset); + case SEEK_END: return checkedSeek(_size + offset); + case AVSEEK_SIZE: return _size; + } + return -1; +} + +void File::Context::logError(QLatin1String method) { + if (!unroll()) { + LogError(method); + } +} + +void File::Context::logError(QLatin1String method, AvErrorWrap error) { + if (!unroll()) { + LogError(method, error); + } +} + +void File::Context::logFatal(QLatin1String method) { + if (!unroll()) { + LogError(method); + _failed = true; + } +} + +void File::Context::logFatal(QLatin1String method, AvErrorWrap error) { + if (!unroll()) { + LogError(method, error); + _failed = true; + } +} + +void File::Context::initStream(StreamWrap &wrap, AVMediaType type) { + wrap.id = av_find_best_stream( + _formatContext, + type, + -1, + -1, + 0, + 0); + if (wrap.id < 0) { + return; + } + + wrap.info = _formatContext->streams[wrap.id]; + if (type == AVMEDIA_TYPE_VIDEO) { + const auto rotateTag = av_dict_get( + wrap.info->metadata, + "rotate", + nullptr, + 0); + if (rotateTag && *rotateTag->value) { + const auto stringRotateTag = QString::fromUtf8(rotateTag->value); + auto toIntSucceeded = false; + const auto rotateDegrees = stringRotateTag.toInt(&toIntSucceeded); + if (toIntSucceeded) { + //_rotation = rotationFromDegrees(rotateDegrees); // #TODO streaming + } + } + } + + wrap.stream.codec = CodecPointer::FromStream(wrap.info); + if (!wrap.stream.codec) { + ClearStream(wrap); + return; + } + wrap.stream.frame.reset(FrameDeleter::create()); + if (!wrap.stream.frame) { + ClearStream(wrap); + return; + } +} + +void File::Context::seekToPosition(crl::time positionTime) { + auto error = AvErrorWrap(); + + if (!positionTime) { + return; + } + const auto &main = mainStream(); + Assert(main.info != nullptr); + const auto timeBase = main.info->time_base; + const auto timeStamp = (positionTime * timeBase.den) + / (1000LL * timeBase.num); + error = av_seek_frame( + _formatContext, + main.id, + timeStamp, + 0); + if (!error) { + return; + } + error = av_seek_frame( + _formatContext, + main.id, + timeStamp, + AVSEEK_FLAG_BACKWARD); + if (!error) { + return; + } + return logFatal(qstr("av_seek_frame"), error); +} + +base::variant File::Context::readPacket() { + auto error = AvErrorWrap(); + + auto result = Packet(); + error = av_read_frame(_formatContext, &result.fields()); + if (unroll()) { + return AvErrorWrap(); + } else if (!error) { + return std::move(result); + } else if (error.code() != AVERROR_EOF) { + logFatal(qstr("av_read_frame"), error); + } + return error; +} + +void File::Context::start(Mode mode, crl::time positionTime) { + auto error = AvErrorWrap(); + + _mode = mode; + if (unroll()) { + return; + } + _ioBuffer = reinterpret_cast(av_malloc(AVBlockSize)); + _ioContext = avio_alloc_context( + _ioBuffer, + AVBlockSize, + 0, + static_cast(this), + &Context::Read, + nullptr, + &Context::Seek); + _formatContext = avformat_alloc_context(); + if (!_formatContext) { + return logFatal(qstr("avformat_alloc_context")); + } + _formatContext->pb = _ioContext; + + error = avformat_open_input(&_formatContext, nullptr, nullptr, nullptr); + if (error) { + _ioBuffer = nullptr; + return logFatal(qstr("avformat_open_input"), error); + } + _opened = true; + + if ((error = avformat_find_stream_info(_formatContext, nullptr))) { + return logFatal(qstr("avformat_find_stream_info"), error); + } + + initStream(_video, AVMEDIA_TYPE_VIDEO); + initStream(_audio, AVMEDIA_TYPE_AUDIO); + if (!mainStreamUnchecked().info) { + return logFatal(qstr("RequiredStreamAbsent")); + } + + readInformation(positionTime); + + if (_audio.info + && (_mode == Mode::Audio || _mode == Mode::Both)) { // #TODO streaming + Player::mixer()->resume(_audioMsgId, true); + } +} + +auto File::Context::mainStreamUnchecked() const -> const StreamWrap & { + return (_mode == Mode::Video || (_video.info && _mode != Mode::Audio)) + ? _video + : _audio; +} + +auto File::Context::mainStream() const -> const StreamWrap & { + const auto &result = mainStreamUnchecked(); + + Ensures(result.info != nullptr); + return result; +} + +auto File::Context::mainStream() -> StreamWrap & { + return const_cast(((const Context*)this)->mainStream()); +} + +void File::Context::readInformation(crl::time positionTime) { + const auto &main = mainStream(); + const auto info = main.info; + auto information = Information(); + information.duration = PtsToTime(info->duration, info->time_base); + + auto result = readPacket(); + const auto packet = base::get_if(&result); + if (unroll()) { + return; + } else if (packet) { + if (positionTime > 0) { + const auto time = CountPacketPositionTime( + _formatContext->streams[packet->fields().stream_index], + *packet); + information.started = (time == Information::kDurationUnknown) + ? positionTime + : time; + } + } else { + information.started = positionTime; + } + + if (_audio.info + && (_mode == Mode::Audio || _mode == Mode::Both)) { // #TODO streaming + auto soundData = std::make_unique(); + soundData->context = _audio.stream.codec.release(); + soundData->frequency = _audio.info->codecpar->sample_rate; + if (_audio.info->duration == AV_NOPTS_VALUE) { + soundData->length = (_formatContext->duration * soundData->frequency) / AV_TIME_BASE; + } else { + soundData->length = (_audio.info->duration * soundData->frequency * _audio.info->time_base.num) / _audio.info->time_base.den; + } + Player::mixer()->play(_audioMsgId, std::move(soundData), information.started); + } + + if (packet) { + processPacket(std::move(*packet)); + } else { + enqueueEofPackets(); + } + + information.cover = readFirstVideoFrame(); + if (unroll()) { + return; + } + + information.audio = (_audio.info != nullptr); + _information = std::move(information); +} + +QImage File::Context::readFirstVideoFrame() { + auto result = QImage(); + while (_video.info && result.isNull()) { + auto frame = tryReadFirstVideoFrame(); + if (unroll()) { + return QImage(); + } + frame.match([&](QImage &image) { + if (!image.isNull()) { + result = std::move(image); + } else { + ClearStream(_video); + } + }, [&](const AvErrorWrap &error) { + if (error.code() == AVERROR(EAGAIN)) { + readNextPacket(); + } else { + ClearStream(_video); + } + }); + } + if (!_video.info && _mode == Mode::Video) { + logFatal(qstr("RequiredStreamEmpty")); + return QImage(); + } + return result; +} + +base::variant File::Context::tryReadFirstVideoFrame() { + Expects(_video.info != nullptr); + + if (unroll()) { + return AvErrorWrap(); + } + const auto error = ReadNextFrame(_video.stream); + if (error) { + if (error->code() == AVERROR_EOF) { + // No valid video stream. + if (_mode == Mode::Video) { + logFatal(qstr("RequiredStreamEmpty")); + } + return QImage(); + } else if (error->code() != AVERROR(EAGAIN)) { + _failed = true; + } + return *error; + } + return QImage(); // #TODO streaming decode frame +} + +void File::Context::enqueueEofPackets() { + if (_audio.info) { + Enqueue(_audio, Packet()); + } + if (_video.info) { + Enqueue(_video, Packet()); + } + _readTillEnd = true; +} + +void File::Context::processPacket(Packet &&packet) { + const auto &native = packet.fields(); + const auto streamId = native.stream_index; + const auto check = [&](StreamWrap &wrap) { + if ((native.stream_index == wrap.id) && wrap.info) { + // #TODO streaming queue packet to audio player + if ((_mode == Mode::Audio || _mode == Mode::Both) + && (wrap.info == _audio.info)) { + Player::mixer()->feedFromVideo({ &native, _audioMsgId }); + packet.release(); + } else { + Enqueue(wrap, std::move(packet)); + } + return true; + } + return false; + }; + + check(_audio) && check(_video); +} + +void File::Context::readNextPacket() { + auto result = readPacket(); + const auto packet = base::get_if(&result); + if (unroll()) { + return; + } else if (packet) { + processPacket(std::move(*packet)); + } else { + // Still trying to read by drain. + Assert(result.is()); + Assert(result.get().code() == AVERROR_EOF); + enqueueEofPackets(); + } +} + +crl::time File::Context::CountPacketPositionTime( + not_null info, + const Packet &packet) { + const auto &native = packet.fields(); + const auto packetPts = (native.pts == AV_NOPTS_VALUE) + ? native.dts + : native.pts; + const auto &timeBase = info->time_base; + return PtsToTime(packetPts, info->time_base); +} + +void File::Context::ClearStream(StreamWrap &wrap) { + wrap.id = -1; + wrap.stream = Stream(); + wrap.info = nullptr; +} + +crl::time File::Context::CountPacketPositionTime( + const StreamWrap &wrap, + const Packet &packet) { + return CountPacketPositionTime(wrap.info, packet); +} + +void File::Context::Enqueue(StreamWrap &wrap, Packet &&packet) { + const auto time = CountPacketPositionTime(wrap, packet); + if (time != Information::kDurationUnknown) { + wrap.stream.lastReadPositionTime = time; + } + + QMutexLocker lock(&wrap.mutex); + wrap.stream.queue.push_back(std::move(packet)); +} + +void File::Context::interrupt() { + _interrupted = true; + _semaphore.release(); +} + +bool File::Context::interrupted() const { + return _interrupted; +} + +bool File::Context::failed() const { + return _failed; +} + +bool File::Context::unroll() const { + return failed() || interrupted(); +} + +File::Context::~Context() { + ClearStream(_audio); + ClearStream(_video); + + //if (_swsContext) { + // sws_freeContext(_swsContext); + //} + if (_opened) { + avformat_close_input(&_formatContext); + } + if (_ioContext) { + av_freep(&_ioContext->buffer); + av_freep(&_ioContext); + } else if (_ioBuffer) { + av_freep(&_ioBuffer); + } + if (_formatContext) { + avformat_free_context(_formatContext); + } +} + +bool File::Context::started() const { + return _information.has_value(); +} + +bool File::Context::finished() const { + return unroll() || _readTillEnd; +} + +const Media::Streaming::Information & File::Context::information() const { + Expects(_information.has_value()); + + return *_information; +} + +File::File( + not_null owner, + std::unique_ptr loader) +: _reader(owner, std::move(loader)) { +} + +void File::start(Mode mode, crl::time positionTime) { + finish(); + + _context = std::make_unique(&_reader); + _thread = std::thread([=, context = _context.get()] { + context->start(mode, positionTime); + if (context->interrupted()) { + return; + } else if (context->failed()) { + crl::on_main(context, [=] { + // #TODO streaming failed + }); + } else { + crl::on_main(context, [=, info = context->information()] { + // #TODO streaming started + }); + while (!context->finished()) { + context->readNextPacket(); + } + crl::on_main(context, [] { AssertIsDebug(); + Ui::Toast::Show("Finished loading."); + }); + } + }); +} + +//rpl::producer File::information() const { +// +//} +// +//rpl::producer File::video() const { +// +//} +// +//rpl::producer File::audio() const { +// +//} + +void File::finish() { + if (_thread.joinable()) { + _context->interrupt(); + _thread.join(); + } + _context = nullptr; +} + +File::~File() { + finish(); +} + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.h b/Telegram/SourceFiles/media/streaming/media_streaming_file.h new file mode 100644 index 0000000000..75a100ea5b --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.h @@ -0,0 +1,136 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +#include "media/streaming/media_streaming_common.h" +#include "media/streaming/media_streaming_reader.h" +#include "base/bytes.h" +#include "base/weak_ptr.h" + +#include + +namespace Data { +class Session; +} // namespace Data + +namespace Media { +namespace Streaming { + +class Loader; + +class File final { +public: + File(not_null owner, std::unique_ptr loader); + + File(const File &other) = delete; + File &operator=(const File &other) = delete; + + void start(Mode mode, crl::time positionTime); + + //rpl::producer information() const; + //rpl::producer video() const; + //rpl::producer audio() const; + + ~File(); + +private: + void finish(); + + class Context final : public base::has_weak_ptr { + public: + Context(not_null reader); + + void start(Mode mode, crl::time positionTime); + void readNextPacket(); + + void interrupt(); + [[nodiscard]] bool interrupted() const; + [[nodiscard]] bool failed() const; + [[nodiscard]] bool started() const; + [[nodiscard]] bool finished() const; + [[nodiscard]] const Information &information() const; + + ~Context(); + + private: + struct StreamWrap { + int id = -1; + AVStream *info = nullptr; + Stream stream; + QMutex mutex; + }; + + static int Read(void *opaque, uint8_t *buffer, int bufferSize); + static int64_t Seek(void *opaque, int64_t offset, int whence); + + [[nodiscard]] int read(bytes::span buffer); + [[nodiscard]] int64_t seek(int64_t offset, int whence); + + [[nodiscard]] bool unroll() const; + void logError(QLatin1String method); + void logError(QLatin1String method, AvErrorWrap error); + void logFatal(QLatin1String method); + void logFatal(QLatin1String method, AvErrorWrap error); + + void initStream(StreamWrap &wrap, AVMediaType type); + void seekToPosition(crl::time positionTime); + + // #TODO base::expected. + [[nodiscard]] base::variant readPacket(); + + void processPacket(Packet &&packet); + [[nodiscard]] const StreamWrap &mainStreamUnchecked() const; + [[nodiscard]] const StreamWrap &mainStream() const; + [[nodiscard]] StreamWrap &mainStream(); + void readInformation(crl::time positionTime); + + [[nodiscard]] QImage readFirstVideoFrame(); + [[nodiscard]] auto tryReadFirstVideoFrame() + -> base::variant; + + void enqueueEofPackets(); + + static void ClearStream(StreamWrap &wrap); + [[nodiscard]] static crl::time CountPacketPositionTime( + not_null info, + const Packet &packet); + [[nodiscard]] static crl::time CountPacketPositionTime( + const StreamWrap &wrap, + const Packet &packet); + static void Enqueue(StreamWrap &wrap, Packet &&packet); + + not_null _reader; + Mode _mode = Mode::Both; + int _offset = 0; + int _size = 0; + bool _failed = false; + bool _opened = false; + bool _readTillEnd = false; + crl::semaphore _semaphore; + std::atomic _interrupted = false; + std::optional _information; + + uchar *_ioBuffer = nullptr; + AVIOContext *_ioContext = nullptr; + AVFormatContext *_formatContext = nullptr; + + StreamWrap _video; + StreamWrap _audio; + + AudioMsgId _audioMsgId; + + }; + + std::thread _thread; + Reader _reader; + std::unique_ptr _context; + +}; + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp new file mode 100644 index 0000000000..2c5177f53f --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp @@ -0,0 +1,14 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#include "media/streaming/media_streaming_loader.h" + +namespace Media { +namespace Streaming { + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h new file mode 100644 index 0000000000..0b12342fab --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h @@ -0,0 +1,38 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +namespace Media { +namespace Streaming { + +struct LoadedPart { + int offset = 0; + QByteArray bytes; + + static constexpr auto kFailedOffset = -1; +}; + +class Loader { +public: + static constexpr auto kPartSize = 128 * 1024; + + //[[nodiscard]] virtual Storage::Cache::Key baseCacheKey() const = 0; + [[nodiscard]] virtual int size() const = 0; + + virtual void load(int offset, int till = -1) = 0; + virtual void stop() = 0; + + // Parts will be sent from the main thread. + [[nodiscard]] virtual rpl::producer parts() const = 0; + + virtual ~Loader() = default; + +}; + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp new file mode 100644 index 0000000000..bb3d23c3b6 --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp @@ -0,0 +1,144 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#include "media/streaming/media_streaming_loader_mtproto.h" + +#include "apiwrap.h" + +namespace Media { +namespace Streaming { +namespace { + +constexpr auto kMaxConcurrentRequests = 1; // #TODO streaming + +} // namespace + +LoaderMtproto::LoaderMtproto( + not_null api, + MTP::DcId dcId, + const MTPInputFileLocation &location, + int size, + Data::FileOrigin origin) +: _api(api) +, _dcId(dcId) +, _location(location) +, _size(size) +, _origin(origin) { +} + +int LoaderMtproto::size() const { + return _size; +} + +void LoaderMtproto::load(int offset, int till) { + crl::on_main(this, [=] { + cancelRequestsBefore(offset); + _till = till; + sendNext(offset); + }); +} + +void LoaderMtproto::sendNext(int possibleOffset) { + Expects((possibleOffset % kPartSize) == 0); + + const auto offset = _requests.empty() + ? possibleOffset + : _requests.back().first + kPartSize; + if ((_till >= 0 && offset >= _till) || (_size > 0 && offset >= _size)) { + return; + } else if (_requests.size() >= kMaxConcurrentRequests) { + return; + } + + static auto DcIndex = 0; + const auto id = _sender.request(MTPupload_GetFile( + _location, + MTP_int(offset), + MTP_int(kPartSize) + )).done([=](const MTPupload_File &result) { + requestDone(offset, result); + }).fail([=](const RPCError &error) { + requestFailed(offset, error); + }).toDC( + MTP::downloadDcId(_dcId, (++DcIndex) % MTP::kDownloadSessionsCount) + ).send(); + _requests.emplace(offset, id); + + sendNext(offset + kPartSize); +} + +void LoaderMtproto::requestDone(int offset, const MTPupload_File &result) { + result.match([&](const MTPDupload_file &data) { + _requests.erase(offset); + if (data.vbytes.v.size() == kPartSize) { + sendNext(offset + kPartSize); + } + _parts.fire({ offset, data.vbytes.v }); + }, [&](const MTPDupload_fileCdnRedirect &data) { + changeCdnParams( + offset, + data.vdc_id.v, + data.vfile_token.v, + data.vencryption_key.v, + data.vencryption_iv.v, + data.vfile_hashes.v); + }); +} + +void LoaderMtproto::changeCdnParams( + int offset, + MTP::DcId dcId, + const QByteArray &token, + const QByteArray &encryptionKey, + const QByteArray &encryptionIV, + const QVector &hashes) { + // #TODO streaming +} + +void LoaderMtproto::requestFailed(int offset, const RPCError &error) { + const auto &type = error.type(); + if (error.code() != 400 || !type.startsWith(qstr("FILE_REFERENCE_"))) { + _parts.fire({ LoadedPart::kFailedOffset }); + return; + } + const auto callback = [=](const Data::UpdatedFileReferences &updated) { + // #TODO streaming + }; + _api->refreshFileReference(_origin, crl::guard(this, callback)); +} + +void LoaderMtproto::stop() { + crl::on_main(this, [=] { + for (const auto [offset, requestId] : base::take(_requests)) { + _sender.request(requestId).cancel(); + } + }); +} + +rpl::producer LoaderMtproto::parts() const { + return _parts.events(); +} + +LoaderMtproto::~LoaderMtproto() = default; + +void LoaderMtproto::cancelRequestsBefore(int offset) { + const auto from = begin(_requests); + const auto till = ranges::lower_bound( + _requests, + offset, + ranges::less(), + [](auto pair) { return pair.first; }); + ranges::for_each( + from, + till, + _sender.requestCanceller(), + &base::flat_map::value_type::second); + _requests.erase(from, till); +} + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h new file mode 100644 index 0000000000..af95f8c753 --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h @@ -0,0 +1,68 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +#include "media/streaming/media_streaming_loader.h" +#include "mtproto/sender.h" +#include "data/data_file_origin.h" + +class ApiWrap; + +namespace Media { +namespace Streaming { + +class LoaderMtproto : public Loader, public base::has_weak_ptr { +public: + LoaderMtproto( + not_null api, + MTP::DcId dcId, + const MTPInputFileLocation &location, + int size, + Data::FileOrigin origin); + + //[[nodiscard]] Storage::Cache::Key baseCacheKey() const override; + [[nodiscard]] int size() const override; + + void load(int offset, int till = -1) override; + void stop() override; + + // Parts will be sent from the main thread. + [[nodiscard]] rpl::producer parts() const override; + + ~LoaderMtproto(); + +private: + void cancelRequestsBefore(int offset); + void sendNext(int possibleOffset); + + void requestDone(int offset, const MTPupload_File &result); + void requestFailed(int offset, const RPCError &error); + void changeCdnParams( + int offset, + MTP::DcId dcId, + const QByteArray &token, + const QByteArray &encryptionKey, + const QByteArray &encryptionIV, + const QVector &hashes); + + const not_null _api; + const MTP::DcId _dcId = 0; + const MTPInputFileLocation _location; + const int _size = 0; + const Data::FileOrigin _origin; + + int _till = -1; + MTP::Sender _sender; + + base::flat_map _requests; + rpl::event_stream _parts; + +}; + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp new file mode 100644 index 0000000000..0c92059e76 --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -0,0 +1,161 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#include "media/streaming/media_streaming_reader.h" + +#include "media/streaming/media_streaming_loader.h" +#include "storage/cache/storage_cache_database.h" +#include "data/data_session.h" + +namespace Media { +namespace Streaming { +namespace { + +template // Range::value_type is Pair +int FindNotLoadedStart(Range &&parts, int offset) { + auto result = offset; + for (const auto &part : parts) { + const auto partStart = part.first; + const auto partEnd = partStart + part.second.size(); + if (partStart <= result && partEnd >= result) { + result = partEnd; + } else { + break; + } + } + return result; +} + +template // Range::value_type is Pair +void CopyLoaded(bytes::span buffer, Range &&parts, int offset, int till) { + auto filled = offset; + for (const auto &part : parts) { + const auto bytes = bytes::make_span(part.second); + const auto partStart = part.first; + const auto partEnd = int(partStart + bytes.size()); + const auto copyTill = std::min(partEnd, till); + Assert(partStart <= filled && filled < copyTill); + + const auto from = filled - partStart; + const auto copy = copyTill - filled; + bytes::copy(buffer, bytes.subspan(from, copy)); + buffer = buffer.subspan(copy); + filled += copy; + } +} + +} // namespace + +Reader::Reader( + not_null owner, + std::unique_ptr loader) +: _owner(owner) +, _loader(std::move(loader)) { + _loader->parts( + ) | rpl::start_with_next([=](LoadedPart &&part) { + QMutexLocker lock(&_loadedPartsMutex); + _loadedParts.push_back(std::move(part)); + lock.unlock(); + + if (const auto waiting = _waiting.load()) { + _waiting = nullptr; + waiting->release(); + } + }, _lifetime); +} + +int Reader::size() const { + return _loader->size(); +} + +bool Reader::failed() const { + return _failed; +} + +bool Reader::fill( + bytes::span buffer, + int offset, + crl::semaphore *notify) { + Expects(offset + buffer.size() <= size()); + + const auto wait = [&](int offset) { + _waiting = notify; + loadFor(offset); + return false; + }; + const auto done = [&] { + _waiting = nullptr; + return true; + }; + const auto failed = [&] { + _waiting = nullptr; + if (notify) { + notify->release(); + } + return false; + }; + + processLoadedParts(); + if (_failed) { + return failed(); + } + + const auto after = ranges::upper_bound( + _data, + offset, + ranges::less(), + &base::flat_map::value_type::first); + if (after == begin(_data)) { + return wait(offset); + } + + const auto till = int(offset + buffer.size()); + const auto start = after - 1; + const auto finish = ranges::lower_bound( + start, + end(_data), + till, + ranges::less(), + &base::flat_map::value_type::first); + const auto parts = ranges::make_iterator_range(start, finish); + + const auto haveTill = FindNotLoadedStart(parts, offset); + if (haveTill < till) { + return wait(haveTill); + } + CopyLoaded(buffer, parts, offset, till); + return done(); +} + +void Reader::processLoadedParts() { + QMutexLocker lock(&_loadedPartsMutex); + auto loaded = std::move(_loadedParts); + lock.unlock(); + + if (_failed) { + return; + } + for (auto &part : loaded) { + if (part.offset == LoadedPart::kFailedOffset + || (part.bytes.size() != Loader::kPartSize + && part.offset + part.bytes.size() != size())) { + _failed = true; + return; + } + _data.emplace(part.offset, std::move(part.bytes)); + } +} + +void Reader::loadFor(int offset) { + const auto part = offset / Loader::kPartSize; + _loader->load(part * Loader::kPartSize); +} + +Reader::~Reader() = default; + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h new file mode 100644 index 0000000000..e54e6bf042 --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -0,0 +1,56 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +#include "base/bytes.h" + +namespace Data { +class Session; +} // namespace Data + +namespace Media { +namespace Streaming { + +class Loader; +struct LoadedPart; + +class Reader final { +public: + Reader(not_null owner, std::unique_ptr loader); + + static constexpr auto kPartSize = 128 * 1024; + + int size() const; + bool fill( + bytes::span buffer, + int offset, + crl::semaphore *notify = nullptr); + bool failed() const; + + ~Reader(); + +private: + void processLoadedParts(); + void loadFor(int offset); + + const not_null _owner; + const std::unique_ptr _loader; + + QMutex _loadedPartsMutex; + std::vector _loadedParts; + std::atomic _waiting = nullptr; + + // #TODO streaming optimize + base::flat_map _data; + bool _failed = false; + rpl::lifetime _lifetime; + +}; + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/ThirdParty/crl b/Telegram/ThirdParty/crl index 74ddc4d1ac..84072fba75 160000 --- a/Telegram/ThirdParty/crl +++ b/Telegram/ThirdParty/crl @@ -1 +1 @@ -Subproject commit 74ddc4d1ac3a6a2cfc82aa963f7779010c8b8a78 +Subproject commit 84072fba75f14620935e5e91788ce603daeb1988 diff --git a/Telegram/gyp/settings_linux.gypi b/Telegram/gyp/settings_linux.gypi index 17219fa2c2..22160515cf 100644 --- a/Telegram/gyp/settings_linux.gypi +++ b/Telegram/gyp/settings_linux.gypi @@ -22,6 +22,7 @@ '-Wno-unused-but-set-variable', '-Wno-missing-field-initializers', '-Wno-sign-compare', + '-Wno-attributes', ], }, 'conditions': [ diff --git a/Telegram/gyp/settings_mac.gypi b/Telegram/gyp/settings_mac.gypi index 5319191aac..659527825d 100644 --- a/Telegram/gyp/settings_mac.gypi +++ b/Telegram/gyp/settings_mac.gypi @@ -43,6 +43,7 @@ '-Wno-comment', '-Wno-missing-field-initializers', '-Wno-sign-compare', + '-Wno-unknown-attributes', ], }, 'xcode_settings': { diff --git a/Telegram/gyp/telegram_sources.txt b/Telegram/gyp/telegram_sources.txt index 390c685a94..7e4181f967 100644 --- a/Telegram/gyp/telegram_sources.txt +++ b/Telegram/gyp/telegram_sources.txt @@ -454,6 +454,16 @@ <(src_loc)/media/player/media_player_volume_controller.h <(src_loc)/media/player/media_player_widget.cpp <(src_loc)/media/player/media_player_widget.h +<(src_loc)/media/streaming/media_streaming_common.cpp +<(src_loc)/media/streaming/media_streaming_common.h +<(src_loc)/media/streaming/media_streaming_file.cpp +<(src_loc)/media/streaming/media_streaming_file.h +<(src_loc)/media/streaming/media_streaming_loader.cpp +<(src_loc)/media/streaming/media_streaming_loader.h +<(src_loc)/media/streaming/media_streaming_loader_mtproto.cpp +<(src_loc)/media/streaming/media_streaming_loader_mtproto.h +<(src_loc)/media/streaming/media_streaming_reader.cpp +<(src_loc)/media/streaming/media_streaming_reader.h <(src_loc)/media/view/media_clip_controller.cpp <(src_loc)/media/view/media_clip_controller.h <(src_loc)/media/view/media_clip_playback.cpp