Add info media preloading to both sides.

This commit is contained in:
John Preston 2017-10-05 18:32:34 +01:00
parent 65cc4d3fbc
commit 7f3c97fb01
9 changed files with 759 additions and 199 deletions

View File

@ -20,6 +20,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#include "history/history_shared_media.h"
#include <rpl/combine.h>
#include "auth_session.h"
#include "apiwrap.h"
#include "storage/storage_facade.h"
@ -114,25 +115,6 @@ private:
};
class SharedMediaMergedSliceBuilder {
public:
using Type = SharedMediaMergedSlice::Type;
using Key = SharedMediaMergedSlice::Key;
SharedMediaMergedSliceBuilder(Key key);
void applyPartUpdate(SharedMediaSlice &&update);
void applyMigratedUpdate(SharedMediaSlice &&update);
SharedMediaMergedSlice snapshot() const;
private:
Key _key;
SharedMediaSlice _part;
base::optional<SharedMediaSlice> _migrated;
};
class SharedMediaWithLastSliceBuilder {
public:
using Type = SharedMediaWithLastSlice::Type;
@ -204,6 +186,15 @@ base::optional<int> SharedMediaSlice::distance(const Key &a, const Key &b) const
return base::none;
}
base::optional<MsgId> SharedMediaSlice::nearest(MsgId msgId) const {
if (auto it = base::lower_bound(_ids, msgId); it != _ids.end()) {
return *it;
} else if (_ids.empty()) {
return base::none;
}
return _ids.back();
}
QString SharedMediaSlice::debug() const {
auto before = _skippedBefore
? (*_skippedBefore
@ -566,7 +557,9 @@ FullMsgId SharedMediaMergedSlice::operator[](int index) const {
return ComputeId(_part, index);
}
base::optional<int> SharedMediaMergedSlice::distance(const Key &a, const Key &b) const {
base::optional<int> SharedMediaMergedSlice::distance(
const Key &a,
const Key &b) const {
if (a.type != _key.type
|| b.type != _key.type
|| a.peerId != _key.peerId
@ -583,31 +576,35 @@ base::optional<int> SharedMediaMergedSlice::distance(const Key &a, const Key &b)
return base::none;
}
auto SharedMediaMergedSlice::nearest(
UniversalMsgId id) const -> base::optional<UniversalMsgId> {
auto convertFromMigratedNearest = [](MsgId result) {
return result - ServerMaxMsgId;
};
if (IsServerMsgId(id)) {
if (auto partNearestId = _part.nearest(id)) {
return partNearestId;
} else if (isolatedInPart()) {
return base::none;
}
return _migrated->nearest(ServerMaxMsgId - 1)
| convertFromMigratedNearest;
}
if (auto migratedNearestId = _migrated
? _migrated->nearest(id + ServerMaxMsgId)
: base::none) {
return migratedNearestId
| convertFromMigratedNearest;
} else if (isolatedInMigrated()) {
return base::none;
}
return _part.nearest(0);
}
QString SharedMediaMergedSlice::debug() const {
return (_migrated ? (_migrated->debug() + '|') : QString()) + _part.debug();
}
SharedMediaMergedSliceBuilder::SharedMediaMergedSliceBuilder(Key key)
: _key(key)
, _part(SharedMediaMergedSlice::PartKey(_key))
, _migrated(SharedMediaMergedSlice::MigratedSlice(_key)) {
}
void SharedMediaMergedSliceBuilder::applyPartUpdate(SharedMediaSlice &&update) {
_part = std::move(update);
}
void SharedMediaMergedSliceBuilder::applyMigratedUpdate(SharedMediaSlice &&update) {
_migrated = std::move(update);
}
SharedMediaMergedSlice SharedMediaMergedSliceBuilder::snapshot() const {
return SharedMediaMergedSlice(
_key,
_part,
_migrated);
}
rpl::producer<SharedMediaMergedSlice> SharedMediaMergedViewer(
SharedMediaMergedSlice::Key key,
int limitBefore,
@ -618,30 +615,35 @@ rpl::producer<SharedMediaMergedSlice> SharedMediaMergedViewer(
Expects((key.universalId != 0) || (limitBefore == 0 && limitAfter == 0));
return [=](auto consumer) {
auto lifetime = rpl::lifetime();
auto builder = lifetime.make_state<SharedMediaMergedSliceBuilder>(key);
SharedMediaViewer(
if (key.migratedPeerId) {
return rpl::combine(
SharedMediaViewer(
SharedMediaMergedSlice::PartKey(key),
limitBefore,
limitAfter),
SharedMediaViewer(
SharedMediaMergedSlice::MigratedKey(key),
limitBefore,
limitAfter))
| rpl::start_with_next([=](
SharedMediaSlice &&part,
SharedMediaSlice &&migrated) {
consumer.put_next(SharedMediaMergedSlice(
key,
std::move(part),
std::move(migrated)));
});
}
return SharedMediaViewer(
SharedMediaMergedSlice::PartKey(key),
limitBefore,
limitAfter
) | rpl::start_with_next([=](SharedMediaSlice &&update) {
builder->applyPartUpdate(std::move(update));
consumer.put_next(builder->snapshot());
}, lifetime);
if (key.migratedPeerId) {
SharedMediaViewer(
SharedMediaMergedSlice::MigratedKey(key),
limitBefore,
limitAfter
) | rpl::start_with_next([=](SharedMediaSlice &&update) {
builder->applyMigratedUpdate(std::move(update));
consumer.put_next(builder->snapshot());
}, lifetime);
}
return lifetime;
limitAfter)
| rpl::start_with_next([=](SharedMediaSlice &&part) {
consumer.put_next(SharedMediaMergedSlice(
key,
std::move(part),
base::none));
});
};
}

View File

@ -52,6 +52,7 @@ public:
int size() const { return _ids.size(); }
MsgId operator[](int index) const;
base::optional<int> distance(const Key &a, const Key &b) const;
base::optional<MsgId> nearest(MsgId msgId) const;
QString debug() const;
@ -82,10 +83,10 @@ public:
PeerId migratedPeerId,
Type type,
UniversalMsgId universalId)
: peerId(peerId)
, migratedPeerId(migratedPeerId)
, type(type)
, universalId(universalId) {
: peerId(peerId)
, migratedPeerId(migratedPeerId)
, type(type)
, universalId(universalId) {
}
bool operator==(const Key &other) const {
@ -117,6 +118,7 @@ public:
int size() const;
FullMsgId operator[](int index) const;
base::optional<int> distance(const Key &a, const Key &b) const;
base::optional<UniversalMsgId> nearest(UniversalMsgId id) const;
QString debug() const;

View File

@ -238,6 +238,14 @@ object_ptr<ListWidget> InnerWidget::setupList(
| rpl::start_with_next(
[this] { refreshHeight(); },
result->lifetime());
using namespace rpl::mappers;
result->scrollToRequests()
| rpl::map([widget = result.data()](int to) {
return widget->y() + to;
})
| rpl::start_to_stream(
_scrollToRequests,
result->lifetime());
return result;
}

View File

@ -52,6 +52,10 @@ public:
void saveState(not_null<Memento*> memento);
void restoreState(not_null<Memento*> memento);
rpl::producer<int> scrollToRequests() const {
return _scrollToRequests.events();
}
protected:
int resizeGetHeight(int newWidth) override;
void visibleTopBottomUpdated(
@ -81,6 +85,8 @@ private:
object_ptr<Ui::PlainShadow> _otherTabsShadow = { nullptr };
object_ptr<ListWidget> _list = { nullptr };
rpl::event_stream<int> _scrollToRequests;
};
} // namespace Media

View File

@ -35,7 +35,10 @@ namespace Info {
namespace Media {
namespace {
constexpr auto kIdsLimit = 256;
constexpr auto kPreloadedScreensCount = 4;
constexpr auto kPreloadIfLessThanScreens = 2;
constexpr auto kPreloadedScreensCountFull
= kPreloadedScreensCount + 1 + kPreloadedScreensCount;
using ItemBase = Layout::ItemBase;
using UniversalMsgId = int32;
@ -93,8 +96,8 @@ public:
}
bool removeItem(UniversalMsgId universalId);
base::optional<QRect> findItemRect(
UniversalMsgId universalId) const;
FoundItem findItemNearId(UniversalMsgId universalId) const;
FoundItem findItemByPoint(QPoint point) const;
void paint(
Painter &p,
@ -102,6 +105,8 @@ public:
int outerWidth,
TimeMs ms) const;
static int MinItemHeight(Type type, int width);
private:
using Items = base::flat_map<
UniversalMsgId,
@ -117,6 +122,9 @@ private:
Items::const_iterator from,
int bottom) const;
QRect findItemRect(not_null<ItemBase*> item) const;
FoundItem completeResult(
not_null<ItemBase*> item,
bool exact) const;
int recountHeight() const;
void refreshHeight();
@ -204,14 +212,6 @@ bool ListWidget::Section::removeItem(UniversalMsgId universalId) {
return false;
}
base::optional<QRect> ListWidget::Section::findItemRect(
UniversalMsgId universalId) const {
if (auto it = _items.find(universalId); it != _items.end()) {
return findItemRect(it->second);
}
return base::none;
}
QRect ListWidget::Section::findItemRect(
not_null<ItemBase*> item) const {
auto position = item->position();
@ -222,6 +222,41 @@ QRect ListWidget::Section::findItemRect(
return QRect(left, top, _itemWidth, item->height());
}
auto ListWidget::Section::completeResult(
not_null<ItemBase*> item,
bool exact) const -> FoundItem {
return { item, findItemRect(item), exact };
}
auto ListWidget::Section::findItemByPoint(
QPoint point) const -> FoundItem {
Expects(!_items.empty());
auto itemIt = findItemAfterTop(point.y());
if (itemIt == _items.end()) {
--itemIt;
}
auto item = itemIt->second;
auto rect = findItemRect(item);
return { item, rect, rect.contains(point) };
}
auto ListWidget::Section::findItemNearId(
UniversalMsgId universalId) const -> FoundItem {
Expects(!_items.empty());
auto itemIt = base::lower_bound(
_items,
universalId,
[this](const auto &item, UniversalMsgId universalId) {
return (item.first > universalId);
});
if (itemIt == _items.end()) {
--itemIt;
}
auto item = itemIt->second;
auto exact = (GetUniversalId(item) == universalId);
return { item, findItemRect(item), exact };
}
auto ListWidget::Section::findItemAfterTop(
int top) -> Items::iterator {
return base::lower_bound(
@ -350,6 +385,30 @@ void ListWidget::Section::resizeToWidth(int newWidth) {
refreshHeight();
}
int ListWidget::Section::MinItemHeight(Type type, int width) {
auto &songSt = st::overviewFileLayout;
switch (type) {
case Type::Photo:
case Type::Video:
case Type::RoundFile: {
auto itemsLeft = st::infoMediaSkip;
auto itemsInRow = (width - itemsLeft)
/ (st::infoMediaMinGridSize + st::infoMediaSkip);
return (st::infoMediaMinGridSize + st::infoMediaSkip) / itemsInRow;
} break;
case Type::VoiceFile:
return songSt.songPadding.top() + songSt.songThumbSize + songSt.songPadding.bottom() + st::lineWidth;
case Type::File:
return songSt.filePadding.top() + songSt.fileThumbSize + songSt.filePadding.bottom() + st::lineWidth;
case Type::MusicFile:
return songSt.songPadding.top() + songSt.songThumbSize + songSt.songPadding.bottom();
case Type::Link:
return st::linksPhotoSize + st::linksMargin.top() + st::linksMargin.bottom() + st::linksBorder;
}
Unexpected("Type in ListWidget::Section::MinItemHeight()");
}
int ListWidget::Section::recountHeight() const {
auto result = headerHeight();
@ -403,7 +462,7 @@ ListWidget::ListWidget(
, _controller(controller)
, _peer(peer)
, _type(type)
, _slice(sliceKey()) {
, _slice(sliceKey(_universalAroundId)) {
start();
refreshViewer();
}
@ -460,9 +519,10 @@ void ListWidget::repaintItem(not_null<const HistoryItem*> item) {
void ListWidget::repaintItem(UniversalMsgId universalId) {
auto sectionIt = findSectionByItem(universalId);
if (sectionIt != _sections.end()) {
if (auto rect = sectionIt->findItemRect(universalId)) {
auto top = padding().top() + sectionIt->top();
rtlupdate(rect->translated(0, top));
auto item = sectionIt->findItemNearId(universalId);
if (item.exact) {
auto top = sectionIt->top();
rtlupdate(item.geometry.translated(0, top));
}
}
}
@ -478,30 +538,35 @@ void ListWidget::invalidatePaletteCache() {
}
}
SharedMediaMergedSlice::Key ListWidget::sliceKey() const {
auto universalId = _universalAroundId;
SharedMediaMergedSlice::Key ListWidget::sliceKey(
UniversalMsgId universalId) const {
using Key = SharedMediaMergedSlice::Key;
if (auto migrateFrom = _peer->migrateFrom()) {
return Key(_peer->id, migrateFrom->id, _type, universalId);
}
if (universalId < 0) {
// Convert back to plain id for non-migrated histories.
universalId += ServerMaxMsgId;
}
return Key(_peer->id, 0, _type, universalId);
}
void ListWidget::refreshViewer() {
_viewerLifetime.destroy();
SharedMediaMergedViewer(
sliceKey(),
countIdsLimit(),
countIdsLimit())
| rpl::start_with_next([this](SharedMediaMergedSlice &&slice) {
sliceKey(_universalAroundId),
_idsLimit,
_idsLimit)
| rpl::start_with_next([this](
SharedMediaMergedSlice &&slice) {
_slice = std::move(slice);
if (auto nearest = _slice.nearest(_universalAroundId)) {
_universalAroundId = *nearest;
}
refreshRows();
}, _viewerLifetime);
}
int ListWidget::countIdsLimit() const {
return kIdsLimit;
}
ItemBase *ListWidget::getLayout(const FullMsgId &itemId) {
auto universalId = GetUniversalId(itemId);
auto it = _layouts.find(universalId);
@ -541,7 +606,7 @@ std::unique_ptr<ItemBase> ListWidget::createLayout(
return nullptr;
};
auto &fileSt = st::overviewFileLayout;
auto &songSt = st::overviewFileLayout;
using namespace Layout;
switch (type) {
case Type::Photo:
@ -556,17 +621,17 @@ std::unique_ptr<ItemBase> ListWidget::createLayout(
return nullptr;
case Type::File:
if (auto file = getFile()) {
return std::make_unique<Document>(item, file, fileSt);
return std::make_unique<Document>(item, file, songSt);
}
return nullptr;
case Type::MusicFile:
if (auto file = getFile()) {
return std::make_unique<Document>(item, file, fileSt);
return std::make_unique<Document>(item, file, songSt);
}
return nullptr;
case Type::VoiceFile:
if (auto file = getFile()) {
return std::make_unique<Voice>(item, file, fileSt);
return std::make_unique<Voice>(item, file, songSt);
}
return nullptr;
case Type::Link:
@ -578,6 +643,8 @@ std::unique_ptr<ItemBase> ListWidget::createLayout(
}
void ListWidget::refreshRows() {
saveScrollState();
markLayoutsStale();
_sections.clear();
@ -600,6 +667,8 @@ void ListWidget::refreshRows() {
clearStaleLayouts();
resizeToWidth(width());
restoreScrollState();
}
void ListWidget::markLayoutsStale() {
@ -617,12 +686,114 @@ int ListWidget::resizeGetHeight(int newWidth) {
return recountHeight();
}
auto ListWidget::findItemByPoint(QPoint point) -> FoundItem {
Expects(!_sections.empty());
auto sectionIt = findSectionAfterTop(point.y());
if (sectionIt == _sections.end()) {
--sectionIt;
}
auto shift = QPoint(0, sectionIt->top());
return foundItemInSection(
sectionIt->findItemByPoint(point - shift),
*sectionIt);
}
auto ListWidget::foundItemInSection(
const FoundItem &item,
const Section &section) -> FoundItem {
return {
item.layout,
item.geometry.translated(0, section.top()),
item.exact };
}
void ListWidget::visibleTopBottomUpdated(
int visibleTop,
int visibleBottom) {
if (width() <= 0) {
auto visibleHeight = (visibleBottom - visibleTop);
if (width() <= 0 || visibleHeight <= 0 || _sections.empty() || _scrollTopId) {
return;
}
_visibleTop = visibleTop;
auto topItem = findItemByPoint({ 0, visibleTop });
auto bottomItem = findItemByPoint({ 0, visibleBottom });
auto preloadedHeight = kPreloadedScreensCountFull * visibleHeight;
auto minItemHeight = Section::MinItemHeight(_type, width());
auto preloadedCount = preloadedHeight / minItemHeight;
auto preloadIdsLimitMin = (preloadedCount / 2) + 1;
auto preloadIdsLimit = preloadIdsLimitMin
+ (visibleHeight / minItemHeight);
auto preloadBefore = kPreloadIfLessThanScreens * visibleHeight;
auto after = _slice.skippedAfter();
auto preloadTop = (visibleTop < preloadBefore);
auto topLoaded = after && (*after == 0);
auto before = _slice.skippedBefore();
auto preloadBottom = (height() - visibleBottom < preloadBefore);
auto bottomLoaded = before && (*before == 0);
auto minScreenDelta = kPreloadedScreensCount
- kPreloadIfLessThanScreens;
auto minUniversalIdDelta = (minScreenDelta * visibleHeight)
/ minItemHeight;
auto preloadAroundItem = [&](const FoundItem &item) {
auto preloadRequired = false;
auto universalId = GetUniversalId(item.layout);
if (!preloadRequired) {
preloadRequired = (_idsLimit < preloadIdsLimitMin);
}
if (!preloadRequired) {
auto delta = _slice.distance(
sliceKey(_universalAroundId),
sliceKey(universalId));
Assert(delta != base::none);
preloadRequired = (qAbs(*delta) >= minUniversalIdDelta);
}
if (preloadRequired) {
_idsLimit = preloadIdsLimit;
_universalAroundId = universalId;
refreshViewer();
}
};
if (preloadTop && !topLoaded) {
preloadAroundItem(topItem);
} else if (preloadBottom && !bottomLoaded) {
preloadAroundItem(bottomItem);
}
}
void ListWidget::saveScrollState() {
if (_sections.empty()) {
_scrollTopId = 0;
_scrollTopShift = 0;
return;
}
auto topItem = findItemByPoint({ 0, _visibleTop });
_scrollTopId = GetUniversalId(topItem.layout);
_scrollTopShift = _visibleTop - topItem.geometry.y();
}
void ListWidget::restoreScrollState() {
auto scrollTopId = base::take(_scrollTopId);
auto scrollTopShift = base::take(_scrollTopShift);
if (_sections.empty() || !scrollTopId) {
return;
}
auto sectionIt = findSectionByItem(scrollTopId);
if (sectionIt == _sections.end()) {
--sectionIt;
}
auto item = foundItemInSection(
sectionIt->findItemNearId(scrollTopId),
*sectionIt);
auto newVisibleTop = item.geometry.y() + scrollTopShift;
if (_visibleTop != newVisibleTop) {
_scrollToRequests.fire_copy(newVisibleTop);
}
}
QMargins ListWidget::padding() const {

View File

@ -56,6 +56,10 @@ public:
return _type;
}
rpl::producer<int> scrollToRequests() const {
return _scrollToRequests.events();
}
~ListWidget();
protected:
@ -77,6 +81,11 @@ private:
bool stale = false;
};
class Section;
struct FoundItem {
not_null<ItemBase*> layout;
QRect geometry;
bool exact = false;
};
void start();
int recountHeight();
@ -92,8 +101,8 @@ private:
void refreshViewer();
void invalidatePaletteCache();
void refreshRows();
int countIdsLimit() const;
SharedMediaMergedSlice::Key sliceKey() const;
SharedMediaMergedSlice::Key sliceKey(
UniversalMsgId universalId) const;
ItemBase *getLayout(const FullMsgId &itemId);
std::unique_ptr<ItemBase> createLayout(
const FullMsgId &itemId,
@ -109,17 +118,31 @@ private:
std::vector<Section>::const_iterator findSectionAfterBottom(
std::vector<Section>::const_iterator from,
int bottom) const;
FoundItem findItemByPoint(QPoint point);
FoundItem foundItemInSection(
const FoundItem &item,
const Section &section);
void saveScrollState();
void restoreScrollState();
not_null<Window::Controller*> _controller;
not_null<PeerData*> _peer;
Type _type = Type::Photo;
UniversalMsgId _universalAroundId = ServerMaxMsgId - 1;
static constexpr auto kMinimalIdsLimit = 16;
int _idsLimit = kMinimalIdsLimit;
SharedMediaMergedSlice _slice;
std::map<UniversalMsgId, CachedItem> _layouts;
std::vector<Section> _sections;
int _visibleTop = 0;
UniversalMsgId _scrollTopId = 0;
int _scrollTopShift = 0;
rpl::event_stream<int> _scrollToRequests;
rpl::lifetime _viewerLifetime;
};

View File

@ -54,6 +54,10 @@ Widget::Widget(
controller,
peer,
type));
_inner->scrollToRequests()
| rpl::start_with_next([this](int skip) {
scrollTo({ skip, -1 });
}, _inner->lifetime());
}
Section Widget::section() const {

View File

@ -188,7 +188,21 @@ public:
is_callable_v<OnNext, Value>
&& is_callable_v<OnError, Error>
&& is_callable_v<OnDone>>>
lifetime start(
void start(
OnNext &&next,
OnError &&error,
OnDone &&done,
lifetime &alive_while) &&;
template <
typename OnNext,
typename OnError,
typename OnDone,
typename = std::enable_if_t<
is_callable_v<OnNext, Value>
&& is_callable_v<OnError, Error>
&& is_callable_v<OnDone>>>
[[nodiscard]] lifetime start(
OnNext &&next,
OnError &&error,
OnDone &&done) &&;
@ -201,13 +215,32 @@ public:
is_callable_v<OnNext, Value>
&& is_callable_v<OnError, Error>
&& is_callable_v<OnDone>>>
lifetime start_copy(
void start_copy(
OnNext &&next,
OnError &&error,
OnDone &&done,
lifetime &alive_while) const &;
template <
typename OnNext,
typename OnError,
typename OnDone,
typename = std::enable_if_t<
is_callable_v<OnNext, Value>
&& is_callable_v<OnError, Error>
&& is_callable_v<OnDone>>>
[[nodiscard]] lifetime start_copy(
OnNext &&next,
OnError &&error,
OnDone &&done) const &;
template <typename Handlers>
lifetime start_existing(
void start_existing(
const consumer_type<Handlers> &consumer,
lifetime &alive_while) &&;
template <typename Handlers>
[[nodiscard]] lifetime start_existing(
const consumer_type<Handlers> &consumer) &&;
private:
@ -234,14 +267,17 @@ template <
typename OnError,
typename OnDone,
typename>
inline lifetime producer_base<Value, Error, Generator>::start(
inline void producer_base<Value, Error, Generator>::start(
OnNext &&next,
OnError &&error,
OnDone &&done) && {
return std::move(*this).start_existing(make_consumer<Value, Error>(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done)));
OnDone &&done,
lifetime &alive_while) && {
return std::move(*this).start_existing(
make_consumer<Value, Error>(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done)),
alive_while);
}
template <typename Value, typename Error, typename Generator>
@ -250,25 +286,77 @@ template <
typename OnError,
typename OnDone,
typename>
inline lifetime producer_base<Value, Error, Generator>::start_copy(
[[nodiscard]] inline lifetime producer_base<Value, Error, Generator>::start(
OnNext &&next,
OnError &&error,
OnDone &&done) && {
auto result = lifetime();
std::move(*this).start_existing(
make_consumer<Value, Error>(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done)),
result);
return result;
}
template <typename Value, typename Error, typename Generator>
template <
typename OnNext,
typename OnError,
typename OnDone,
typename>
inline void producer_base<Value, Error, Generator>::start_copy(
OnNext &&next,
OnError &&error,
OnDone &&done,
lifetime &alive_while) const & {
auto copy = *this;
return std::move(copy).start_existing(
make_consumer<Value, Error>(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done)),
alive_while);
}
template <typename Value, typename Error, typename Generator>
template <
typename OnNext,
typename OnError,
typename OnDone,
typename>
[[nodiscard]] inline lifetime producer_base<Value, Error, Generator>::start_copy(
OnNext &&next,
OnError &&error,
OnDone &&done) const & {
auto result = lifetime();
auto copy = *this;
return std::move(copy).start(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done));
std::move(copy).start_existing(
make_consumer<Value, Error>(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done)),
result);
return result;
}
template <typename Value, typename Error, typename Generator>
template <typename Handlers>
inline lifetime producer_base<Value, Error, Generator>::start_existing(
inline void producer_base<Value, Error, Generator>::start_existing(
const consumer_type<Handlers> &consumer,
lifetime &alive_while) && {
alive_while.add([consumer] { consumer.terminate(); });
consumer.add_lifetime(std::move(_generator)(consumer));
}
template <typename Value, typename Error, typename Generator>
template <typename Handlers>
[[nodiscard]] inline lifetime producer_base<Value, Error, Generator>::start_existing(
const consumer_type<Handlers> &consumer) && {
if (consumer.add_lifetime(std::move(_generator)(consumer))) {
return [consumer] { consumer.terminate(); };
}
return lifetime();
auto result = lifetime();
std::move(*this).start_existing(consumer, result);
return result;
}
template <typename Value, typename Error>
@ -387,28 +475,52 @@ inline auto operator|(
namespace details {
struct with_none {
};
struct lifetime_with_none {
lifetime &alive_while;
};
template <typename OnNext>
struct with_next {
OnNext next;
};
template <typename OnNext>
struct lifetime_with_next {
lifetime &alive_while;
OnNext next;
};
template <typename OnError>
struct with_error {
OnError error;
};
template <typename OnError>
struct lifetime_with_error {
lifetime &alive_while;
OnError error;
};
template <typename OnDone>
struct with_done {
OnDone done;
};
template <typename OnDone>
struct lifetime_with_done {
lifetime &alive_while;
OnDone done;
};
template <typename OnNext, typename OnError>
struct with_next_error {
OnNext next;
OnError error;
};
template <typename OnNext, typename OnError>
struct lifetime_with_next_error {
lifetime &alive_while;
@ -416,6 +528,12 @@ struct lifetime_with_next_error {
OnError error;
};
template <typename OnError, typename OnDone>
struct with_error_done {
OnError error;
OnDone done;
};
template <typename OnError, typename OnDone>
struct lifetime_with_error_done {
lifetime &alive_while;
@ -423,6 +541,12 @@ struct lifetime_with_error_done {
OnDone done;
};
template <typename OnNext, typename OnDone>
struct with_next_done {
OnNext next;
OnDone done;
};
template <typename OnNext, typename OnDone>
struct lifetime_with_next_done {
lifetime &alive_while;
@ -430,6 +554,13 @@ struct lifetime_with_next_done {
OnDone done;
};
template <typename OnNext, typename OnError, typename OnDone>
struct with_next_error_done {
OnNext next;
OnError error;
OnDone done;
};
template <typename OnNext, typename OnError, typename OnDone>
struct lifetime_with_next_error_done {
lifetime &alive_while;
@ -440,29 +571,65 @@ struct lifetime_with_next_error_done {
} // namespace details
inline auto start()
-> details::with_none {
return {};
}
inline auto start(lifetime &alive_while)
-> details::lifetime_with_none {
return { alive_while };
}
template <typename OnNext>
inline auto start_with_next(OnNext &&next)
-> details::with_next<std::decay_t<OnNext>> {
return { std::forward<OnNext>(next) };
}
template <typename OnNext>
inline auto start_with_next(OnNext &&next, lifetime &alive_while)
-> details::lifetime_with_next<std::decay_t<OnNext>> {
return { alive_while, std::forward<OnNext>(next) };
}
template <typename OnError>
inline auto start_with_error(OnError &&error)
-> details::with_error<std::decay_t<OnError>> {
return { std::forward<OnError>(error) };
}
template <typename OnError>
inline auto start_with_error(OnError &&error, lifetime &alive_while)
-> details::lifetime_with_error<std::decay_t<OnError>> {
return { alive_while, std::forward<OnError>(error) };
}
template <typename OnDone>
inline auto start_with_done(OnDone &&done)
-> details::with_done<std::decay_t<OnDone>> {
return { std::forward<OnDone>(done) };
}
template <typename OnDone>
inline auto start_with_done(OnDone &&done, lifetime &alive_while)
-> details::lifetime_with_done<std::decay_t<OnDone>> {
return { alive_while, std::forward<OnDone>(done) };
}
template <typename OnNext, typename OnError>
inline auto start_with_next_error(
OnNext &&next,
OnError &&error)
-> details::with_next_error<
std::decay_t<OnNext>,
std::decay_t<OnError>> {
return {
std::forward<OnNext>(next),
std::forward<OnError>(error)
};
}
template <typename OnNext, typename OnError>
inline auto start_with_next_error(
OnNext &&next,
@ -478,6 +645,19 @@ inline auto start_with_next_error(
};
}
template <typename OnError, typename OnDone>
inline auto start_with_error_done(
OnError &&error,
OnDone &&done)
-> details::with_error_done<
std::decay_t<OnError>,
std::decay_t<OnDone>> {
return {
std::forward<OnError>(error),
std::forward<OnDone>(done)
};
}
template <typename OnError, typename OnDone>
inline auto start_with_error_done(
OnError &&error,
@ -493,6 +673,19 @@ inline auto start_with_error_done(
};
}
template <typename OnNext, typename OnDone>
inline auto start_with_next_done(
OnNext &&next,
OnDone &&done)
-> details::with_next_done<
std::decay_t<OnNext>,
std::decay_t<OnDone>> {
return {
std::forward<OnNext>(next),
std::forward<OnDone>(done)
};
}
template <typename OnNext, typename OnDone>
inline auto start_with_next_done(
OnNext &&next,
@ -508,6 +701,22 @@ inline auto start_with_next_done(
};
}
template <typename OnNext, typename OnError, typename OnDone>
inline auto start_with_next_error_done(
OnNext &&next,
OnError &&error,
OnDone &&done)
-> details::with_next_error_done<
std::decay_t<OnNext>,
std::decay_t<OnError>,
std::decay_t<OnDone>> {
return {
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done)
};
}
template <typename OnNext, typename OnError, typename OnDone>
inline auto start_with_next_error_done(
OnNext &&next,
@ -528,15 +737,40 @@ inline auto start_with_next_error_done(
namespace details {
template <typename Value, typename Error, typename Generator>
[[nodiscard]] inline lifetime operator|(
producer<Value, Error, Generator> &&value,
with_none &&handlers) {
return std::move(value).start(
[] {},
[] {},
[] {});
}
template <typename Value, typename Error, typename Generator>
inline void operator|(
producer<Value, Error, Generator> &&value,
lifetime_with_none &&lifetime) {
lifetime.alive_while.add(
std::move(value).start(
[] {},
[] {},
[] {}));
lifetime_with_none &&handlers) {
std::move(value).start(
[] {},
[] {},
[] {},
handlers.alive_while);
}
template <
typename Value,
typename Error,
typename Generator,
typename OnNext,
typename = std::enable_if_t<is_callable_v<OnNext, Value>>>
[[nodiscard]] inline lifetime operator|(
producer<Value, Error, Generator> &&value,
with_next<OnNext> &&handlers) {
return std::move(value).start(
std::move(handlers.next),
[] {},
[] {});
}
template <
@ -547,12 +781,27 @@ template <
typename = std::enable_if_t<is_callable_v<OnNext, Value>>>
inline void operator|(
producer<Value, Error, Generator> &&value,
lifetime_with_next<OnNext> &&lifetime) {
lifetime.alive_while.add(
std::move(value).start(
std::move(lifetime.next),
[] {},
[] {}));
lifetime_with_next<OnNext> &&handlers) {
std::move(value).start(
std::move(handlers.next),
[] {},
[] {},
handlers.alive_while);
}
template <
typename Value,
typename Error,
typename Generator,
typename OnError,
typename = std::enable_if_t<is_callable_v<OnError, Error>>>
[[nodiscard]] inline lifetime operator|(
producer<Value, Error, Generator> &&value,
with_error<OnError> &&handlers) {
return std::move(value).start(
[] {},
std::move(handlers.error),
[] {});
}
template <
@ -563,12 +812,27 @@ template <
typename = std::enable_if_t<is_callable_v<OnError, Error>>>
inline void operator|(
producer<Value, Error, Generator> &&value,
lifetime_with_error<OnError> &&lifetime) {
lifetime.alive_while.add(
std::move(value).start(
[] {},
std::move(lifetime.error),
[] {}));
lifetime_with_error<OnError> &&handlers) {
std::move(value).start(
[] {},
std::move(handlers.error),
[] {},
handlers.alive_while);
}
template <
typename Value,
typename Error,
typename Generator,
typename OnDone,
typename = std::enable_if_t<is_callable_v<OnDone>>>
[[nodiscard]] inline lifetime operator|(
producer<Value, Error, Generator> &&value,
with_done<OnDone> &&handlers) {
return std::move(value).start(
[] {},
[] {},
std::move(handlers.done));
}
template <
@ -579,12 +843,30 @@ template <
typename = std::enable_if_t<is_callable_v<OnDone>>>
inline void operator|(
producer<Value, Error, Generator> &&value,
lifetime_with_done<OnDone> &&lifetime) {
lifetime.alive_while.add(
std::move(value).start(
[] {},
[] {},
std::move(lifetime.done)));
lifetime_with_done<OnDone> &&handlers) {
std::move(value).start(
[] {},
[] {},
std::move(handlers.done),
handlers.alive_while);
}
template <
typename Value,
typename Error,
typename Generator,
typename OnNext,
typename OnError,
typename = std::enable_if_t<
is_callable_v<OnNext, Value> &&
is_callable_v<OnError, Error>>>
[[nodiscard]] inline lifetime operator|(
producer<Value, Error, Generator> &&value,
with_next_error<OnNext, OnError> &&handlers) {
return std::move(value).start(
std::move(handlers.next),
std::move(handlers.error),
[] {});
}
template <
@ -598,12 +880,30 @@ template <
is_callable_v<OnError, Error>>>
inline void operator|(
producer<Value, Error, Generator> &&value,
lifetime_with_next_error<OnNext, OnError> &&lifetime) {
lifetime.alive_while.add(
std::move(value).start(
std::move(lifetime.next),
std::move(lifetime.error),
[] {}));
lifetime_with_next_error<OnNext, OnError> &&handlers) {
std::move(value).start(
std::move(handlers.next),
std::move(handlers.error),
[] {},
handlers.alive_while);
}
template <
typename Value,
typename Error,
typename Generator,
typename OnError,
typename OnDone,
typename = std::enable_if_t<
is_callable_v<OnError, Error> &&
is_callable_v<OnDone>>>
[[nodiscard]] inline lifetime operator|(
producer<Value, Error, Generator> &&value,
with_error_done<OnError, OnDone> &&handlers) {
return std::move(value).start(
[] {},
std::move(handlers.error),
std::move(handlers.done));
}
template <
@ -617,12 +917,30 @@ template <
is_callable_v<OnDone>>>
inline void operator|(
producer<Value, Error, Generator> &&value,
lifetime_with_error_done<OnError, OnDone> &&lifetime) {
lifetime.alive_while.add(
std::move(value).start(
[] {},
std::move(lifetime.error),
std::move(lifetime.done)));
lifetime_with_error_done<OnError, OnDone> &&handlers) {
std::move(value).start(
[] {},
std::move(handlers.error),
std::move(handlers.done),
handlers.alive_while);
}
template <
typename Value,
typename Error,
typename Generator,
typename OnNext,
typename OnDone,
typename = std::enable_if_t<
is_callable_v<OnNext, Value> &&
is_callable_v<OnDone>>>
[[nodiscard]] inline lifetime operator|(
producer<Value, Error, Generator> &&value,
with_next_done<OnNext, OnDone> &&handlers) {
return std::move(value).start(
std::move(handlers.next),
[] {},
std::move(handlers.done));
}
template <
@ -636,12 +954,35 @@ template <
is_callable_v<OnDone>>>
inline void operator|(
producer<Value, Error, Generator> &&value,
lifetime_with_next_done<OnNext, OnDone> &&lifetime) {
lifetime.alive_while.add(
std::move(value).start(
std::move(lifetime.next),
[] {},
std::move(lifetime.done)));
lifetime_with_next_done<OnNext, OnDone> &&handlers) {
std::move(value).start(
std::move(handlers.next),
[] {},
std::move(handlers.done),
handlers.alive_while);
}
template <
typename Value,
typename Error,
typename Generator,
typename OnNext,
typename OnError,
typename OnDone,
typename = std::enable_if_t<
is_callable_v<OnNext, Value> &&
is_callable_v<OnError, Error> &&
is_callable_v<OnDone>>>
[[nodiscard]] inline lifetime operator|(
producer<Value, Error, Generator> &&value,
with_next_error_done<
OnNext,
OnError,
OnDone> &&handlers) {
return std::move(value).start(
std::move(handlers.next),
std::move(handlers.error),
std::move(handlers.done));
}
template <
@ -660,12 +1001,12 @@ inline void operator|(
lifetime_with_next_error_done<
OnNext,
OnError,
OnDone> &&lifetime) {
lifetime.alive_while.add(
std::move(value).start(
std::move(lifetime.next),
std::move(lifetime.error),
std::move(lifetime.done)));
OnDone> &&handlers) {
std::move(value).start(
std::move(handlers.next),
std::move(handlers.error),
std::move(handlers.done),
handlers.alive_while);
}
} // namespace details

View File

@ -52,7 +52,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
*destroyed = true;
});
{
make_producer<int>([=](auto &&consumer) {
auto alive = make_producer<int>([=](auto &&consumer) {
(void)destroyCaller;
consumer.put_next(1);
consumer.put_next(2);
@ -82,7 +82,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
SECTION("producer error test") {
auto errorGenerated = std::make_shared<bool>(false);
{
make_producer<no_value, bool>([=](auto &&consumer) {
auto alive = make_producer<no_value, bool>([=](auto &&consumer) {
consumer.put_error(true);
return lifetime();
}).start([=](no_value) {
@ -104,14 +104,14 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
++*lifetimeEndCount;
};
});
lifetimes.add(testProducer.start_copy([=](no_value) {
testProducer.start_copy([=](no_value) {
}, [=](no_error) {
}, [=] {
}));
lifetimes.add(std::move(testProducer).start([=](no_value) {
}, lifetimes);
std::move(testProducer).start([=](no_value) {
}, [=](no_error) {
}, [=] {
}));
}, lifetimes);
}
REQUIRE(*lifetimeEndCount == 0);
}
@ -123,7 +123,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
auto lifetimeEndCount = std::make_shared<int>(0);
auto saved = lifetime();
{
saved = make_producer<int>([=](auto &&consumer) {
make_producer<int>([=](auto &&consumer) {
auto inner = make_producer<int>([=](auto &&consumer) {
consumer.put_next(1);
consumer.put_next(2);
@ -135,22 +135,22 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
auto result = lifetime([=] {
++*lifetimeEndCount;
});
result.add(inner.start_copy([=](int value) {
inner.start_copy([=](int value) {
consumer.put_next_copy(value);
}, [=](no_error) {
}, [=] {
}));
result.add(std::move(inner).start([=](int value) {
}, result);
std::move(inner).start([=](int value) {
consumer.put_next_copy(value);
}, [=](no_error) {
}, [=] {
}));
}, result);
return result;
}).start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
});
}, saved);
}
REQUIRE(*sum == 1 + 2 + 3 + 1 + 2 + 3);
REQUIRE(*lifetimeEndCount == 0);
@ -161,7 +161,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
SECTION("tuple producer test") {
auto result = std::make_shared<int>(0);
{
make_producer<std::tuple<int, double>>([=](
auto alive = make_producer<std::tuple<int, double>>([=](
auto &&consumer) {
consumer.put_next(std::make_tuple(1, 2.));
return lifetime();
@ -183,11 +183,12 @@ TEST_CASE("basic event_streams tests", "[rpl::event_stream]") {
stream.fire(2);
stream.fire(3);
{
auto lifetime = stream.events().start([=, &stream](int value) {
auto saved = lifetime();
stream.events().start([=, &stream](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
});
}, saved);
stream.fire(11);
stream.fire(12);
stream.fire(13);
@ -205,29 +206,29 @@ TEST_CASE("basic event_streams tests", "[rpl::event_stream]") {
{
auto composite = lifetime();
composite = stream.events().start([=, &stream, &composite](int value) {
stream.events().start([=, &stream, &composite](int value) {
*sum += value;
composite.add(stream.events().start([=](int value) {
stream.events().start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
}));
}, composite);
}, [=](no_error) {
}, [=] {
});
}, composite);
{
auto inner = lifetime();
inner = stream.events().start([=, &stream, &inner](int value) {
stream.events().start([=, &stream, &inner](int value) {
*sum += value;
inner.add(stream.events().start([=](int value) {
stream.events().start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
}));
}, inner);
}, [=](no_error) {
}, [=] {
});
}, inner);
stream.fire(1);
stream.fire(2);
@ -256,29 +257,31 @@ TEST_CASE("basic event_streams tests", "[rpl::event_stream]") {
{
auto composite = lifetime();
composite = stream.events().start([=, &stream, &composite](int value) {
stream.events().start([=, &stream, &composite](int value) {
*sum += value;
composite = stream.events().start([=](int value) {
composite.destroy();
stream.events().start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
});
}, composite);
}, [=](no_error) {
}, [=] {
});
}, composite);
{
auto inner = lifetime();
inner = stream.events().start([=, &stream, &inner](int value) {
stream.events().start([=, &stream, &inner](int value) {
*sum += value;
inner = stream.events().start([=](int value) {
inner.destroy();
stream.events().start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
});
}, inner);
}, [=](no_error) {
}, [=] {
});
}, inner);
stream.fire(1);
stream.fire(2);
@ -306,11 +309,11 @@ TEST_CASE("basic event_streams tests", "[rpl::event_stream]") {
lifetime extended;
{
event_stream<int> stream;
extended = stream.events().start([=](int value) {
stream.events().start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
});
}, extended);
stream.fire(1);
stream.fire(2);
stream.fire(3);