Buffer audio when waiting data in streaming.

This commit is contained in:
John Preston 2019-02-21 17:40:09 +04:00
parent e5cd7e6d40
commit 3b369fc98e
20 changed files with 188 additions and 49 deletions

View File

@ -48,7 +48,6 @@ enum {
AudioVoiceMsgMaxLength = 100 * 60, // 100 minutes
AudioVoiceMsgUpdateView = 100, // 100ms
AudioVoiceMsgChannels = 2, // stereo
AudioVoiceMsgBufferSize = 256 * 1024, // 256 Kb buffers (1.3 - 3.0 secs)
StickerMaxSize = 2048, // 2048x2048 is a max image size for sticker

View File

@ -326,7 +326,8 @@ void StartStreaming(
auto options = Media::Streaming::PlaybackOptions();
options.speed = 1.;
options.position = (document->duration() / 2) * crl::time(1000);
//options.syncVideoByAudio = false;
//options.position = (document->duration() / 2) * crl::time(1000);
player->init(options);
player->updates(
) | rpl::start_with_next_error_done([=](Update &&update) {

View File

@ -28,13 +28,14 @@ Q_DECLARE_METATYPE(VoiceWaveform);
namespace {
QMutex AudioMutex;
ALCdevice *AudioDevice = nullptr;
ALCcontext *AudioContext = nullptr;
constexpr auto kSuppressRatioAll = 0.2;
constexpr auto kSuppressRatioSong = 0.05;
constexpr auto kPlaybackSpeedMultiplier = 1.7;
constexpr auto kWaveformCounterBufferSize = 256 * 1024;
QMutex AudioMutex;
ALCdevice *AudioDevice = nullptr;
ALCcontext *AudioContext = nullptr;
auto VolumeMultiplierAll = 1.;
auto VolumeMultiplierSong = 1.;
@ -823,6 +824,10 @@ void Mixer::feedFromVideo(const VideoSoundPart &part) {
_loader->feedFromVideo(part);
}
void Mixer::forceToBufferVideo(const AudioMsgId &audioId) {
_loader->forceToBufferVideo(audioId);
}
Mixer::TimeCorrection Mixer::getVideoTimeCorrection(
const AudioMsgId &audio) const {
Expects(audio.type() == AudioMsgId::Type::Video);
@ -1683,7 +1688,7 @@ public:
}
QByteArray buffer;
buffer.reserve(AudioVoiceMsgBufferSize);
buffer.reserve(kWaveformCounterBufferSize);
int64 countbytes = sampleSize() * samplesCount();
int64 processed = 0;
int64 sumbytes = 0;

View File

@ -122,6 +122,7 @@ public:
// Video player audio stream interface.
void feedFromVideo(const VideoSoundPart &part);
void forceToBufferVideo(const AudioMsgId &audioId);
struct TimeCorrection {
crl::time audioPositionValue = kTimeUnknown;
crl::time audioPositionTime = kTimeUnknown;

View File

@ -22,6 +22,7 @@ namespace {
constexpr auto kCaptureFrequency = Player::kDefaultFrequency;
constexpr auto kCaptureSkipDuration = crl::time(400);
constexpr auto kCaptureFadeInDuration = crl::time(300);
constexpr auto kCaptureBufferSlice = 256 * 1024;
Instance *CaptureInstance = nullptr;
@ -323,7 +324,7 @@ void Instance::Inner::onStart() {
_timer.start(50);
_captured.clear();
_captured.reserve(AudioVoiceMsgBufferSize);
_captured.reserve(kCaptureBufferSlice);
DEBUG_LOG(("Audio Capture: started!"));
}
@ -489,8 +490,8 @@ void Instance::Inner::onTimeout() {
// Get samples from OpenAL
auto s = _captured.size();
auto news = s + static_cast<int>(samples * sizeof(short));
if (news / AudioVoiceMsgBufferSize > s / AudioVoiceMsgBufferSize) {
_captured.reserve(((news / AudioVoiceMsgBufferSize) + 1) * AudioVoiceMsgBufferSize);
if (news / kCaptureBufferSlice > s / kCaptureBufferSlice) {
_captured.reserve(((news / kCaptureBufferSlice) + 1) * kCaptureBufferSlice);
}
_captured.resize(news);
alcCaptureSamples(d->device, (ALCvoid *)(_captured.data() + s), samples);

View File

@ -36,9 +36,16 @@ public:
EndOfFile,
};
virtual ReadResult readMore(QByteArray &samples, int64 &samplesCount) = 0;
virtual void enqueuePackets(QQueue<FFMpeg::AVPacketDataWrap> &packets) {
virtual void enqueuePackets(
QQueue<FFMpeg::AVPacketDataWrap> &&packets) {
Unexpected("enqueuePackets() call on not ChildFFMpegLoader.");
}
virtual void setForceToBuffer(bool force) {
Unexpected("setForceToBuffer() call on not ChildFFMpegLoader.");
}
virtual bool forceToBuffer() const {
return false;
}
void saveDecodedSamples(QByteArray *samples, int64 *samplesCount);
void takeSavedDecodedSamples(QByteArray *samples, int64 *samplesCount);

View File

@ -13,8 +13,14 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
namespace Media {
namespace Player {
namespace {
Loaders::Loaders(QThread *thread) : _fromVideoNotify([this] { videoSoundAdded(); }) {
constexpr auto kPlaybackBufferSize = 256 * 1024;
} // namespace
Loaders::Loaders(QThread *thread)
: _fromVideoNotify([=] { videoSoundAdded(); }) {
moveToThread(thread);
_fromVideoNotify.moveToThread(thread);
connect(thread, SIGNAL(started()), this, SLOT(onInit()));
@ -25,8 +31,22 @@ void Loaders::feedFromVideo(const VideoSoundPart &part) {
auto invoke = false;
{
QMutexLocker lock(&_fromVideoMutex);
invoke = _fromVideoQueues.empty()
&& _fromVideoForceToBuffer.empty();
_fromVideoQueues[part.audio].enqueue(FFMpeg::dataWrapFromPacket(*part.packet));
invoke = true;
}
if (invoke) {
_fromVideoNotify.call();
}
}
void Loaders::forceToBufferVideo(const AudioMsgId &audioId) {
auto invoke = false;
{
QMutexLocker lock(&_fromVideoMutex);
invoke = _fromVideoQueues.empty()
&& _fromVideoForceToBuffer.empty();
_fromVideoForceToBuffer.emplace(audioId);
}
if (invoke) {
_fromVideoNotify.call();
@ -34,36 +54,53 @@ void Loaders::feedFromVideo(const VideoSoundPart &part) {
}
void Loaders::videoSoundAdded() {
auto waitingAndAdded = false;
auto queues = decltype(_fromVideoQueues)();
auto forces = decltype(_fromVideoForceToBuffer)();
{
QMutexLocker lock(&_fromVideoMutex);
queues = base::take(_fromVideoQueues);
forces = base::take(_fromVideoForceToBuffer);
}
auto tryLoader = [this](auto &audio, auto &loader, auto &it) {
if (audio == it.key() && loader) {
loader->enqueuePackets(it.value());
if (loader->holdsSavedDecodedSamples()) {
onLoad(audio);
for (const auto &audioId : forces) {
const auto tryLoader = [&](const auto &id, auto &loader) {
if (audioId == id && loader) {
loader->setForceToBuffer(true);
if (loader->holdsSavedDecodedSamples()
&& !queues.contains(audioId)) {
loadData(audioId);
}
return true;
}
return true;
}
return false;
};
for (auto i = queues.begin(), e = queues.end(); i != e; ++i) {
if (!tryLoader(_audio, _audioLoader, i)
&& !tryLoader(_song, _songLoader, i)
&& !tryLoader(_video, _videoLoader, i)) {
for (auto &packetData : i.value()) {
return false;
};
tryLoader(_audio, _audioLoader)
|| tryLoader(_song, _songLoader)
|| tryLoader(_video, _videoLoader);
}
for (auto &pair : queues) {
const auto audioId = pair.first;
auto &packets = pair.second;
const auto tryLoader = [&](const auto &id, auto &loader) {
if (id == audioId && loader) {
loader->enqueuePackets(std::move(packets));
if (loader->holdsSavedDecodedSamples()) {
loadData(audioId);
}
return true;
}
return false;
};
const auto used = tryLoader(_audio, _audioLoader)
|| tryLoader(_song, _songLoader)
|| tryLoader(_video, _videoLoader);
if (!used) {
for (auto &packetData : packets) {
AVPacket packet;
FFMpeg::packetFromDataWrap(packet, packetData);
FFMpeg::freePacket(&packet);
}
}
}
if (waitingAndAdded) {
onLoad(_video);
}
}
Loaders::~Loaders() {
@ -73,8 +110,10 @@ Loaders::~Loaders() {
void Loaders::clearFromVideoQueue() {
auto queues = base::take(_fromVideoQueues);
for (auto &queue : queues) {
for (auto &packetData : queue) {
for (auto &pair : queues) {
const auto audioId = pair.first;
auto &packets = pair.second;
for (auto &packetData : packets) {
AVPacket packet;
FFMpeg::packetFromDataWrap(packet, packetData);
FFMpeg::freePacket(&packet);
@ -120,7 +159,7 @@ void Loaders::emitError(AudioMsgId::Type type) {
}
void Loaders::onLoad(const AudioMsgId &audio) {
loadData(audio, crl::time(0));
loadData(audio);
}
void Loaders::loadData(AudioMsgId audio, crl::time positionMs) {
@ -144,7 +183,7 @@ void Loaders::loadData(AudioMsgId audio, crl::time positionMs) {
if (l->holdsSavedDecodedSamples()) {
l->takeSavedDecodedSamples(&samples, &samplesCount);
}
while (samples.size() < AudioVoiceMsgBufferSize) {
while (samples.size() < kPlaybackBufferSize) {
auto res = l->readMore(samples, samplesCount);
using Result = AudioPlayerLoader::ReadResult;
if (res == Result::Error) {
@ -166,7 +205,8 @@ void Loaders::loadData(AudioMsgId audio, crl::time positionMs) {
} else if (res == Result::Ok) {
errAtStart = false;
} else if (res == Result::Wait) {
waiting = (samples.size() < AudioVoiceMsgBufferSize);
waiting = (samples.size() < kPlaybackBufferSize)
&& !l->forceToBuffer();
if (waiting) {
l->saveDecodedSamples(&samples, &samplesCount);
}
@ -219,8 +259,16 @@ void Loaders::loadData(AudioMsgId audio, crl::time positionMs) {
if (bufferIndex < 0) { // No free buffers, wait.
l->saveDecodedSamples(&samples, &samplesCount);
return;
} else if (l->forceToBuffer()) {
l->setForceToBuffer(false);
}
//LOG(("[%4] PUSHING %1 SAMPLES (%2 BYTES) %3ms"
// ).arg(samplesCount
// ).arg(samples.size()
// ).arg((samplesCount * 1000LL) / track->frequency
// ).arg(crl::now() % 10000, 4, 10, QChar('0')));
track->bufferSamples[bufferIndex] = samples;
track->samplesCount[bufferIndex] = samplesCount;
track->bufferedLength += samplesCount;

View File

@ -22,6 +22,7 @@ class Loaders : public QObject {
public:
Loaders(QThread *thread);
void feedFromVideo(const VideoSoundPart &part);
void forceToBufferVideo(const AudioMsgId &audioId);
~Loaders();
signals:
@ -45,7 +46,8 @@ private:
std::unique_ptr<AudioPlayerLoader> _videoLoader;
QMutex _fromVideoMutex;
QMap<AudioMsgId, QQueue<FFMpeg::AVPacketDataWrap>> _fromVideoQueues;
base::flat_map<AudioMsgId, QQueue<FFMpeg::AVPacketDataWrap>> _fromVideoQueues;
base::flat_set<AudioMsgId> _fromVideoForceToBuffer;
SingleQueuedInvokation _fromVideoNotify;
void emitError(AudioMsgId::Type type);
@ -58,7 +60,7 @@ private:
SetupErrorLoadedFull = 2,
SetupNoErrorStarted = 3,
};
void loadData(AudioMsgId audio, crl::time positionMs);
void loadData(AudioMsgId audio, crl::time positionMs = 0);
AudioPlayerLoader *setupLoader(
const AudioMsgId &audio,
SetupError &err,

View File

@ -111,11 +111,24 @@ AudioPlayerLoader::ReadResult ChildFFMpegLoader::readMore(
return ReadResult::Ok;
}
void ChildFFMpegLoader::enqueuePackets(QQueue<FFMpeg::AVPacketDataWrap> &packets) {
_queue += std::move(packets);
void ChildFFMpegLoader::enqueuePackets(
QQueue<FFMpeg::AVPacketDataWrap> &&packets) {
if (_queue.empty()) {
_queue = std::move(packets);
} else {
_queue += std::move(packets);
}
packets.clear();
}
void ChildFFMpegLoader::setForceToBuffer(bool force) {
_forceToBuffer = force;
}
bool ChildFFMpegLoader::forceToBuffer() const {
return _forceToBuffer;
}
ChildFFMpegLoader::~ChildFFMpegLoader() {
for (auto &packetData : base::take(_queue)) {
AVPacket packet;

View File

@ -70,7 +70,10 @@ public:
}
ReadResult readMore(QByteArray &result, int64 &samplesAdded) override;
void enqueuePackets(QQueue<FFMpeg::AVPacketDataWrap> &packets) override;
void enqueuePackets(
QQueue<FFMpeg::AVPacketDataWrap> &&packets) override;
void setForceToBuffer(bool force) override;
bool forceToBuffer() const override;
bool eofReached() const {
return _eofReached;
@ -88,6 +91,7 @@ private:
std::unique_ptr<VideoSoundData> _parentData;
QQueue<FFMpeg::AVPacketDataWrap> _queue;
bool _forceToBuffer = false;
bool _eofReached = false;
};

View File

@ -50,6 +50,12 @@ void AudioTrack::process(Packet &&packet) {
}
}
void AudioTrack::waitForData() {
if (initialized()) {
mixerForceToBuffer();
}
}
bool AudioTrack::initialized() const {
return !_ready;
}
@ -115,6 +121,10 @@ void AudioTrack::mixerEnqueue(Packet &&packet) {
packet.release();
}
void AudioTrack::mixerForceToBuffer() {
Media::Player::mixer()->forceToBufferVideo(_audioId);
}
void AudioTrack::start(crl::time startTime) {
Expects(initialized());

View File

@ -39,6 +39,7 @@ public:
// Called from the same unspecified thread.
void process(Packet &&packet);
void waitForData();
// Called from the main thread.
~AudioTrack();
@ -50,6 +51,7 @@ private:
[[nodiscard]] bool fillStateFromFrame();
void mixerInit();
void mixerEnqueue(Packet &&packet);
void mixerForceToBuffer();
void callReady();
const PlaybackOptions _options;

View File

@ -27,6 +27,7 @@ struct PlaybackOptions {
Mode mode = Mode::Both;
crl::time position = 0;
float64 speed = 1.; // Valid values between 0.5 and 2.
bool syncVideoByAudio = true;
};
struct TrackState {

View File

@ -41,6 +41,7 @@ int File::Context::read(bytes::span buffer) {
}
buffer = buffer.subspan(0, amount);
while (!_reader->fill(buffer, _offset, &_semaphore)) {
_delegate->fileWaitingForData();
_semaphore.acquire();
if (_interrupted) {
return -1;

View File

@ -17,6 +17,7 @@ class FileDelegate {
public:
virtual void fileReady(Stream &&video, Stream &&audio) = 0;
virtual void fileError() = 0;
virtual void fileWaitingForData() = 0;
// Return true if reading and processing more packets is desired.
// Return false if sleeping until 'wake()' is called is desired.

View File

@ -13,7 +13,7 @@ namespace Media {
namespace Streaming {
namespace {
constexpr auto kMaxConcurrentRequests = 1; // #TODO streaming
constexpr auto kMaxConcurrentRequests = 2; // #TODO streaming
} // namespace

View File

@ -158,7 +158,6 @@ void Player::trackPlayedTill(
void Player::audioReceivedTill(crl::time position) {
Expects(_audio != nullptr);
//LOG(("AUDIO TILL: %1").arg(position));
trackReceivedTill(*_audio, _information.audio.state, position);
}
@ -171,7 +170,6 @@ void Player::audioPlayedTill(crl::time position) {
void Player::videoReceivedTill(crl::time position) {
Expects(_video != nullptr);
//LOG(("VIDEO TILL: %1").arg(position));
trackReceivedTill(*_video, _information.video.state, position);
}
@ -182,6 +180,8 @@ void Player::videoPlayedTill(crl::time position) {
}
void Player::fileReady(Stream &&video, Stream &&audio) {
_waitingForData = false;
const auto weak = base::make_weak(&_sessionGuard);
const auto ready = [=](const Information & data) {
crl::on_main(weak, [=, data = data]() mutable {
@ -226,12 +226,32 @@ void Player::fileReady(Stream &&video, Stream &&audio) {
}
void Player::fileError() {
_waitingForData = false;
crl::on_main(&_sessionGuard, [=] {
fail();
});
}
void Player::fileWaitingForData() {
if (_waitingForData) {
return;
}
_waitingForData = true;
crl::on_main(&_sessionGuard, [=] {
_updates.fire({ WaitingForData() });
});
if (_audio) {
_audio->waitForData();
}
if (_video) {
_video->waitForData();
}
}
bool Player::fileProcessPacket(Packet &&packet) {
_waitingForData = false;
const auto &native = packet.fields();
const auto index = native.stream_index;
if (packet.empty()) {
@ -250,12 +270,20 @@ bool Player::fileProcessPacket(Packet &&packet) {
}
} else if (_audio && _audio->streamIndex() == native.stream_index) {
const auto time = PacketPosition(packet, _audio->streamTimeBase());
//LOG(("[%2] AUDIO PACKET FOR %1ms"
// ).arg(time
// ).arg(crl::now() % 10000, 4, 10, QChar('0')));
crl::on_main(&_sessionGuard, [=] {
audioReceivedTill(time);
});
_audio->process(std::move(packet));
} else if (_video && _video->streamIndex() == native.stream_index) {
const auto time = PacketPosition(packet, _video->streamTimeBase());
//LOG(("[%2] VIDEO PACKET FOR %1ms"
// ).arg(time
// ).arg(crl::now() % 10000, 4, 10, QChar('0')));
crl::on_main(&_sessionGuard, [=] {
videoReceivedTill(time);
});
@ -368,7 +396,16 @@ rpl::lifetime &Player::lifetime() {
return _lifetime;
}
Player::~Player() = default;
Player::~Player() {
// The order of field destruction is important.
//
// We are forced to maintain the correct order in the stop() method,
// because it can be called even before the player destruction.
//
// So instead of maintaining it in the class definition as well we
// simply call stop() here, after that the destruction is trivial.
stop();
}
} // namespace Streaming
} // namespace Media

View File

@ -52,6 +52,9 @@ public:
~Player();
private:
static constexpr auto kReceivedTillEnd
= std::numeric_limits<crl::time>::max();
enum class Stage {
Uninitialized,
Initializing,
@ -66,6 +69,7 @@ private:
// FileDelegate methods are called only from the File thread.
void fileReady(Stream &&video, Stream &&audio) override;
void fileError() override;
void fileWaitingForData() override;
bool fileProcessPacket(Packet &&packet) override;
bool fileReadMore() override;
@ -95,9 +99,6 @@ private:
const std::unique_ptr<File> _file;
static constexpr auto kReceivedTillEnd
= std::numeric_limits<crl::time>::max();
// Immutable while File is active after it is ready.
AudioMsgId _audioId;
std::unique_ptr<AudioTrack> _audio;
@ -109,6 +110,7 @@ private:
// Belongs to the File thread while File is active.
bool _readTillEnd = false;
bool _waitingForData = false;
// Belongs to the main thread.
Information _information;

View File

@ -277,7 +277,7 @@ VideoTrackObject::TrackTime VideoTrackObject::trackTime() const {
return result;
}
const auto correction = _audioId.playId()
const auto correction = (_options.syncVideoByAudio && _audioId.playId())
? Media::Player::mixer()->getVideoTimeCorrection(_audioId)
: Media::Player::Mixer::TimeCorrection();
const auto knownValue = correction
@ -478,6 +478,9 @@ void VideoTrack::process(Packet &&packet) {
});
}
void VideoTrack::waitForData() {
}
void VideoTrack::start(crl::time startTime) {
_wrapped.with([=](Implementation &unwrapped) {
unwrapped.start(startTime);

View File

@ -33,6 +33,7 @@ public:
// Called from the same unspecified thread.
void process(Packet &&packet);
void waitForData();
// Called from the main thread.
void start(crl::time startTime);