Support receiving RTMP streams in group calls.

This commit is contained in:
John Preston 2022-02-25 14:14:15 +03:00
parent 9d200017c3
commit 1de35cf8ef
17 changed files with 189 additions and 57 deletions

View File

@ -214,6 +214,9 @@ void Instance::startOrJoinGroupCall(
}, [=](Group::JoinInfo info) {
const auto call = info.peer->groupCall();
info.joinHash = joinHash;
if (call) {
info.rtmp = call->rtmp();
}
createGroupCall(
std::move(info),
call ? call->input() : MTP_inputGroupCall(MTPlong(), MTPlong()));

View File

@ -100,41 +100,6 @@ using JoinClientFields = std::variant<
JoinVideoEndpoint,
JoinBroadcastStream>;
class RequestCurrentTimeTask final : public tgcalls::BroadcastPartTask {
public:
RequestCurrentTimeTask(
base::weak_ptr<GroupCall> call,
Fn<void(int64)> done);
void done(int64 value);
void cancel() override;
private:
const base::weak_ptr<GroupCall> _call;
Fn<void(int64)> _done;
QMutex _mutex;
};
RequestCurrentTimeTask::RequestCurrentTimeTask(
base::weak_ptr<GroupCall> call,
Fn<void(int64)> done)
: _call(call)
, _done(std::move(done)) {
}
void RequestCurrentTimeTask::done(int64 value) {
QMutexLocker lock(&_mutex);
if (_done) {
base::take(_done)(value);
}
}
void RequestCurrentTimeTask::cancel() {
QMutexLocker lock(&_mutex);
_done = nullptr;
}
[[nodiscard]] JoinClientFields ParseJoinResponse(const QByteArray &json) {
auto error = QJsonParseError{ 0, QJsonParseError::NoError };
const auto document = QJsonDocument::fromJson(json, &error);
@ -235,6 +200,23 @@ private:
};
class GroupCall::RequestCurrentTimeTask final
: public tgcalls::BroadcastPartTask {
public:
RequestCurrentTimeTask(
base::weak_ptr<GroupCall> call,
Fn<void(int64)> done);
void done(int64 value);
void cancel() override;
private:
const base::weak_ptr<GroupCall> _call;
Fn<void(int64)> _done;
QMutex _mutex;
};
struct GroupCall::SinkPointer {
std::weak_ptr<Webrtc::SinkInterface> data;
};
@ -286,6 +268,10 @@ GroupCall::VideoTrack::VideoTrack(
return false;
}
bool VideoEndpoint::rtmp() const noexcept {
return (id == Data::RtmpEndpointId());
}
struct VideoParams {
std::string endpointId;
std::vector<tgcalls::MediaSsrcGroup> ssrcGroups;
@ -544,6 +530,25 @@ void GroupCall::MediaChannelDescriptionsTask::cancel() {
}
}
GroupCall::RequestCurrentTimeTask::RequestCurrentTimeTask(
base::weak_ptr<GroupCall> call,
Fn<void(int64)> done)
: _call(call)
, _done(std::move(done)) {
}
void GroupCall::RequestCurrentTimeTask::done(int64 value) {
QMutexLocker lock(&_mutex);
if (_done) {
base::take(_done)(value);
}
}
void GroupCall::RequestCurrentTimeTask::cancel() {
QMutexLocker lock(&_mutex);
_done = nullptr;
}
not_null<PeerData*> GroupCall::TrackPeer(
const std::unique_ptr<VideoTrack> &track) {
return track->peer;
@ -577,6 +582,7 @@ GroupCall::GroupCall(
, _checkJoinedTimer([=] { checkJoined(); })
, _pushToTalkCancelTimer([=] { pushToTalkCancel(); })
, _connectingSoundTimer([=] { playConnectingSoundOnce(); })
, _rtmp(info.rtmp)
, _mediaDevices(CreateMediaDevices()) {
_muted.value(
) | rpl::combine_previous(
@ -991,6 +997,10 @@ bool GroupCall::scheduleStartSubscribed() const {
return false;
}
bool GroupCall::rtmp() const {
return _rtmp;
}
Data::GroupCall *GroupCall::lookupReal() const {
const auto real = _peer->groupCall();
return (real && real->id() == _id) ? real : nullptr;
@ -1779,11 +1789,13 @@ void GroupCall::handlePossibleCreateOrJoinResponse(
data.vid(),
data.vaccess_hash());
const auto scheduleDate = data.vschedule_date().value_or_empty();
const auto rtmp = data.is_rtmp_stream();
_rtmp = rtmp;
setScheduledDate(scheduleDate);
if (const auto chat = _peer->asChat()) {
chat->setGroupCall(input, scheduleDate);
chat->setGroupCall(input, scheduleDate, rtmp);
} else if (const auto group = _peer->asChannel()) {
group->setGroupCall(input, scheduleDate);
group->setGroupCall(input, scheduleDate, rtmp);
} else {
Unexpected("Peer type in GroupCall::join.");
}
@ -2303,7 +2315,7 @@ bool GroupCall::tryCreateController() {
call,
std::move(done));
crl::on_main(weak, [=] {
result->done(approximateServerTimeInMs());
requestCurrentTimeStart(std::move(result));
});
return result;
},
@ -2557,6 +2569,48 @@ void GroupCall::mediaChannelDescriptionsCancel(
}
}
void GroupCall::requestCurrentTimeStart(
std::shared_ptr<RequestCurrentTimeTask> task) {
if (!_rtmp) {
task->done(approximateServerTimeInMs());
return;
}
_requestCurrentTimes.emplace(std::move(task));
if (_requestCurrentTimeRequestId) {
return;
}
const auto finish = [=](int64 value) {
for (const auto &task : base::take(_requestCurrentTimes)) {
task->done(value);
}
};
_requestCurrentTimeRequestId = _api.request(
MTPphone_GetGroupCallStreamChannels(inputCall())
).done([=](const MTPphone_GroupCallStreamChannels &result) {
result.match([&](const MTPDphone_groupCallStreamChannels &data) {
const auto &list = data.vchannels().v;
if (!list.isEmpty()) {
const auto &first = list.front();
first.match([&](const MTPDgroupCallStreamChannel &data) {
finish(data.vlast_timestamp_ms().v);
});
} else {
finish(0);
}
});
}).fail([=] {
finish(0);
}).handleAllErrors().send();
}
void GroupCall::requestCurrentTimeCancel(
not_null<RequestCurrentTimeTask*> task) {
const auto i = _requestCurrentTimes.find(task.get());
if (i != end(_requestCurrentTimes)) {
_requestCurrentTimes.erase(i);
}
}
int64 GroupCall::approximateServerTimeInMs() const {
Expects(_serverTimeMs != 0);
@ -2581,6 +2635,15 @@ void GroupCall::updateRequestedVideoChannels() {
const auto &endpointId = endpoint.id;
if (endpointId == camera || endpointId == screen) {
continue;
} else if (endpointId == Data::RtmpEndpointId()) {
channels.push_back({
.endpointId = endpointId,
.minQuality = (video->quality == Group::VideoQuality::Full
? Quality::Full
: Quality::Thumbnail),
.maxQuality = Quality::Full,
});
continue;
}
const auto participant = real->participantByEndpoint(endpointId);
const auto params = (participant && participant->ssrc)
@ -2676,6 +2739,17 @@ void GroupCall::fillActiveVideoEndpoints() {
const auto real = lookupReal();
Assert(real != nullptr);
if (_rtmp) {
_videoIsWorking = true;
markEndpointActive({
VideoEndpointType::Screen,
_peer,
Data::RtmpEndpointId(),
}, true, false);
updateRequestedVideoChannels();
return;
}
const auto me = real->participantByPeer(joinAs());
if (me && me->videoJoined) {
_videoIsWorking = true;
@ -3004,7 +3078,7 @@ void GroupCall::setInstanceMode(InstanceMode mode) {
case InstanceMode::Stream: return Mode::GroupConnectionModeBroadcast;
}
Unexpected("Mode in GroupCall::setInstanceMode.");
}(), true, false); // #TODO streams
}(), true, _rtmp);
}
void GroupCall::setScreenInstanceMode(InstanceMode mode) {

View File

@ -97,6 +97,7 @@ struct VideoEndpoint {
PeerData *peer = nullptr;
std::string id;
[[nodiscard]] bool rtmp() const noexcept;
[[nodiscard]] bool empty() const noexcept {
Expects(id.empty() || peer != nullptr);
@ -230,6 +231,7 @@ public:
return _scheduleDate;
}
[[nodiscard]] bool scheduleStartSubscribed() const;
[[nodiscard]] bool rtmp() const;
[[nodiscard]] Data::GroupCall *lookupReal() const;
[[nodiscard]] rpl::producer<not_null<Data::GroupCall*>> real() const;
@ -406,6 +408,7 @@ public:
private:
class LoadPartTask;
class MediaChannelDescriptionsTask;
class RequestCurrentTimeTask;
using GlobalShortcutValue = base::GlobalShortcutValue;
using Error = Group::Error;
struct SinkPointer;
@ -460,6 +463,10 @@ private:
std::shared_ptr<MediaChannelDescriptionsTask> task);
void mediaChannelDescriptionsCancel(
not_null<MediaChannelDescriptionsTask*> task);
void requestCurrentTimeStart(
std::shared_ptr<RequestCurrentTimeTask> task);
void requestCurrentTimeCancel(
not_null<RequestCurrentTimeTask*> task);
[[nodiscard]] int64 approximateServerTimeInMs() const;
[[nodiscard]] bool mediaChannelDescriptionsFill(
@ -567,10 +574,14 @@ private:
MTP::DcId _broadcastDcId = 0;
base::flat_map<not_null<LoadPartTask*>, LoadingPart> _broadcastParts;
base::flat_set<
std::shared_ptr<
MediaChannelDescriptionsTask>,
std::shared_ptr<MediaChannelDescriptionsTask>,
base::pointer_comparator<
MediaChannelDescriptionsTask>> _mediaChannelDescriptionses;
base::flat_set<
std::shared_ptr<RequestCurrentTimeTask>,
base::pointer_comparator<
RequestCurrentTimeTask>> _requestCurrentTimes;
mtpRequestId _requestCurrentTimeRequestId = 0;
rpl::variable<not_null<PeerData*>> _joinAs;
std::vector<not_null<PeerData*>> _possibleJoinAs;
@ -648,6 +659,7 @@ private:
base::Timer _pushToTalkCancelTimer;
base::Timer _connectingSoundTimer;
bool _hadJoinedState = false;
bool _rtmp = false;
std::unique_ptr<Webrtc::MediaDevices> _mediaDevices;
QString _audioInputId;

View File

@ -52,6 +52,7 @@ struct JoinInfo {
std::vector<not_null<PeerData*>> possibleJoinAs;
QString joinHash;
TimeId scheduleDate = 0;
bool rtmp = false;
};
enum class PanelMode {

View File

@ -1704,6 +1704,13 @@ Row *Members::lookupRow(not_null<PeerData*> peer) const {
return _listController->findRow(peer);
}
not_null<MembersRow*> Members::rtmpFakeRow(not_null<PeerData*> peer) const {
if (!_rtmpFakeRow) {
_rtmpFakeRow = std::make_unique<Row>(_listController.get(), peer);
}
return _rtmpFakeRow.get();
}
void Members::setMode(PanelMode mode) {
if (_mode.current() == mode) {
return;

View File

@ -61,6 +61,8 @@ public:
}
[[nodiscard]] MembersRow *lookupRow(not_null<PeerData*> peer) const;
[[nodiscard]] not_null<MembersRow*> rtmpFakeRow(
not_null<PeerData*> peer) const;
void setMode(PanelMode mode);
[[nodiscard]] QRect getInnerGeometry() const;
@ -108,6 +110,8 @@ private:
ListWidget *_list = nullptr;
rpl::event_stream<> _addMemberRequests;
mutable std::unique_ptr<MembersRow> _rtmpFakeRow;
rpl::variable<bool> _canInviteByLink;
rpl::variable<bool> _canAddMembers;

View File

@ -905,8 +905,11 @@ void Panel::setupVideo(not_null<Viewport*> viewport) {
const VideoEndpoint &endpoint,
const std::unique_ptr<GroupCall::VideoTrack> &track) {
using namespace rpl::mappers;
const auto row = _members->lookupRow(GroupCall::TrackPeer(track));
const auto row = endpoint.rtmp()
? _members->rtmpFakeRow(GroupCall::TrackPeer(track)).get()
: _members->lookupRow(GroupCall::TrackPeer(track));
Assert(row != nullptr);
auto pinned = rpl::combine(
_call->videoEndpointLargeValue(),
_call->videoEndpointPinnedValue()

View File

@ -722,7 +722,7 @@ void Viewport::updateTilesGeometryColumn(int outerWidth) {
};
const auto topPeer = _large ? _large->row()->peer().get() : nullptr;
const auto reorderNeeded = [&] {
if (!_large) {
if (!topPeer) {
return false;
}
for (const auto &tile : _tiles) {

View File

@ -30,10 +30,11 @@ Viewport::VideoTile::VideoTile(
Fn<void()> update)
: _endpoint(endpoint)
, _update(std::move(update))
, _track(track)
, _trackSize(std::move(trackSize)) {
Expects(track.track != nullptr);
Expects(track.row != nullptr);
, _track(std::move(track))
, _trackSize(std::move(trackSize))
, _rtmp(endpoint.rtmp()) {
Expects(_track.track != nullptr);
Expects(_track.row != nullptr);
using namespace rpl::mappers;
_track.track->stateValue(

View File

@ -36,6 +36,9 @@ public:
[[nodiscard]] not_null<MembersRow*> row() const {
return _track.row;
}
[[nodiscard]] bool rtmp() const {
return _rtmp;
}
[[nodiscard]] QRect geometry() const {
return _geometry;
}
@ -119,6 +122,7 @@ private:
bool _topControlsShown = false;
bool _pinned = false;
bool _hidden = true;
bool _rtmp = false;
std::optional<VideoQuality> _quality;
rpl::lifetime _lifetime;

View File

@ -718,7 +718,8 @@ void ChannelData::migrateCall(std::unique_ptr<Data::GroupCall> call) {
void ChannelData::setGroupCall(
const MTPInputGroupCall &call,
TimeId scheduleDate) {
TimeId scheduleDate,
bool rtmp) {
call.match([&](const MTPDinputGroupCall &data) {
if (_call && _call->id() == data.vid().v) {
return;
@ -736,7 +737,8 @@ void ChannelData::setGroupCall(
this,
data.vid().v,
data.vaccess_hash().v,
scheduleDate);
scheduleDate,
rtmp);
owner().registerGroupCall(_call.get());
session().changes().peerUpdated(this, UpdateFlag::GroupCall);
addFlags(Flag::CallActive);

View File

@ -405,7 +405,8 @@ public:
void migrateCall(std::unique_ptr<Data::GroupCall> call);
void setGroupCall(
const MTPInputGroupCall &call,
TimeId scheduleDate = 0);
TimeId scheduleDate = 0,
bool rtmp = false);
void clearGroupCall();
void setGroupCallDefaultJoinAs(PeerId peerId);
[[nodiscard]] PeerId groupCallDefaultJoinAs() const;

View File

@ -203,7 +203,8 @@ void ChatData::setMigrateToChannel(ChannelData *channel) {
void ChatData::setGroupCall(
const MTPInputGroupCall &call,
TimeId scheduleDate) {
TimeId scheduleDate,
bool rtmp) {
if (migrateTo()) {
return;
}
@ -224,7 +225,8 @@ void ChatData::setGroupCall(
this,
data.vid().v,
data.vaccess_hash().v,
scheduleDate);
scheduleDate,
rtmp);
owner().registerGroupCall(_call.get());
session().changes().peerUpdated(this, UpdateFlag::GroupCall);
addFlags(Flag::CallActive);

View File

@ -148,7 +148,8 @@ public:
}
void setGroupCall(
const MTPInputGroupCall &call,
TimeId scheduleDate = 0);
TimeId scheduleDate = 0,
bool rtmp = false);
void clearGroupCall();
void setGroupCallDefaultJoinAs(PeerId peerId);
[[nodiscard]] PeerId groupCallDefaultJoinAs() const;

View File

@ -35,6 +35,11 @@ constexpr auto kWaitForUpdatesTimeout = 3 * crl::time(1000);
} // namespace
const std::string &RtmpEndpointId() {
static const auto result = std::string("unified");
return result;
}
const std::string &GroupCallParticipant::cameraEndpoint() const {
return GetCameraEndpoint(videoParams);
}
@ -55,13 +60,15 @@ GroupCall::GroupCall(
not_null<PeerData*> peer,
CallId id,
CallId accessHash,
TimeId scheduleDate)
TimeId scheduleDate,
bool rtmp)
: _id(id)
, _accessHash(accessHash)
, _peer(peer)
, _reloadByQueuedUpdatesTimer([=] { reload(); })
, _speakingByActiveFinishTimer([=] { checkFinishSpeakingByActive(); })
, _scheduleDate(scheduleDate) {
, _scheduleDate(scheduleDate)
, _rtmp(rtmp) {
}
GroupCall::~GroupCall() {
@ -78,6 +85,10 @@ bool GroupCall::loaded() const {
return _version > 0;
}
bool GroupCall::rtmp() const {
return _rtmp;
}
not_null<PeerData*> GroupCall::peer() const {
return _peer;
}
@ -383,6 +394,7 @@ void GroupCall::applyCallFields(const MTPDgroupCall &data) {
LOG(("API Error: Got zero version in groupCall."));
_version = 1;
}
_rtmp = data.is_rtmp_stream();
_joinMuted = data.is_join_muted();
_canChangeJoinMuted = data.is_can_change_join_muted();
_joinedToTop = !data.is_join_date_asc();

View File

@ -19,6 +19,8 @@ struct ParticipantVideoParams;
namespace Data {
[[nodiscard]] const std::string &RtmpEndpointId();
struct LastSpokeTimes {
crl::time anything = 0;
crl::time voice = 0;
@ -55,11 +57,13 @@ public:
not_null<PeerData*> peer,
CallId id,
CallId accessHash,
TimeId scheduleDate);
TimeId scheduleDate,
bool rtmp);
~GroupCall();
[[nodiscard]] CallId id() const;
[[nodiscard]] bool loaded() const;
[[nodiscard]] bool rtmp() const;
[[nodiscard]] not_null<PeerData*> peer() const;
[[nodiscard]] MTPInputGroupCall input() const;
[[nodiscard]] QString title() const {
@ -239,6 +243,7 @@ private:
bool _allParticipantsLoaded = false;
bool _joinedToTop = false;
bool _applyingQueuedUpdates = false;
bool _rtmp = false;
};

View File

@ -35,7 +35,7 @@ constexpr ShiftedDcId updaterDcId(DcId dcId) {
return ShiftDcId(dcId, kUpdaterDcShift);
}
// send(MTPupload_GetFile(), MTP::groupCallStreamDcId(dc)) - for gorup call stream
// send(MTPupload_GetFile(), MTP::groupCallStreamDcId(dc)) - for group call stream
constexpr ShiftedDcId groupCallStreamDcId(DcId dcId) {
return ShiftDcId(dcId, kGroupCallStreamDcShift);
}