diff --git a/Telegram/SourceFiles/rpl/consumer.h b/Telegram/SourceFiles/rpl/consumer.h index 3766c9cd3c..8adfc500cf 100644 --- a/Telegram/SourceFiles/rpl/consumer.h +++ b/Telegram/SourceFiles/rpl/consumer.h @@ -63,7 +63,7 @@ public: virtual void put_error_copy(const Error &error) = 0; virtual void put_done() = 0; - void add_lifetime(lifetime &&lifetime); + bool add_lifetime(lifetime &&lifetime); template Type *make_state(Args&& ...args); @@ -135,20 +135,23 @@ private: }; template -inline void type_erased_handlers::add_lifetime( +inline bool type_erased_handlers::add_lifetime( lifetime &&lifetime) { if (_terminated) { lifetime.destroy(); - } else { - _lifetime.add(std::move(lifetime)); + return false; } + _lifetime.add(std::move(lifetime)); + return true; } template template inline Type *type_erased_handlers::make_state( Args&& ...args) { - Expects(!_terminated); + if (_terminated) { + return nullptr; + } return _lifetime.make_state(std::forward(args)...); } @@ -320,7 +323,7 @@ public: } void put_done() const; - void add_lifetime(lifetime &&lifetime) const; + bool add_lifetime(lifetime &&lifetime) const; template Type *make_state(Args&& ...args) const; @@ -447,22 +450,32 @@ inline void consumer_base::put_done() const { } template -inline void consumer_base::add_lifetime( +inline bool consumer_base::add_lifetime( lifetime &&lifetime) const { - if (_handlers) { - _handlers->add_lifetime(std::move(lifetime)); - } else { + if (!_handlers) { lifetime.destroy(); + return false; } + if (_handlers->add_lifetime(std::move(lifetime))) { + return true; + } + _handlers = nullptr; + return false; } template template inline Type *consumer_base::make_state( Args&& ...args) const { - Expects(_handlers != nullptr); - return _handlers->template make_state( - std::forward(args)...); + if (!_handlers) { + return nullptr; + } + if (auto result = _handlers->template make_state( + std::forward(args)...)) { + return result; + } + _handlers = nullptr; + return nullptr; } template diff --git a/Telegram/SourceFiles/rpl/flatten_latest.h b/Telegram/SourceFiles/rpl/flatten_latest.h index 921124f454..bce4a4e21b 100644 --- a/Telegram/SourceFiles/rpl/flatten_latest.h +++ b/Telegram/SourceFiles/rpl/flatten_latest.h @@ -39,11 +39,12 @@ public: return make_producer([ initial = std::move(initial) ](const auto &consumer) mutable { - auto state = std::make_shared(); + auto state = consumer.template make_state(); return std::move(initial).start( [consumer, state](producer &&inner) { state->finished = false; - state->alive = std::move(inner).start( + state->alive = lifetime(); + auto started = std::move(inner).start( [consumer](auto &&value) { consumer.put_next_forward(std::forward(value)); }, [consumer](auto &&error) { @@ -55,6 +56,9 @@ public: state->finished = true; } }); + if (started) { + state->alive = std::move(started); + } }, [consumer](auto &&error) { consumer.put_error_forward(std::forward(error)); }, [consumer, state] { diff --git a/Telegram/SourceFiles/rpl/operators_tests.cpp b/Telegram/SourceFiles/rpl/operators_tests.cpp index bd7320006b..b46b615830 100644 --- a/Telegram/SourceFiles/rpl/operators_tests.cpp +++ b/Telegram/SourceFiles/rpl/operators_tests.cpp @@ -238,16 +238,22 @@ TEST_CASE("basic operators tests", "[rpl::operators]") { auto sum = std::make_shared(""); { rpl::lifetime lifetime; - single(single(1) | then(single(2))) - | then(single(single(3) | then(single(4)))) - | then(single(single(5) | then(single(6)))) - | flatten_latest() - | map([](int value) { - return std::to_string(value); - }) - | start_with_next([=](std::string &&value) { - *sum += std::move(value) + ' '; - }, lifetime); + { + rpl::event_stream stream; + single(single(1) | then(single(2))) + | then(single(single(3) | then(single(4)))) + | then(single(single(5) | then(stream.events()))) + | flatten_latest() + | map([](int value) { + return std::to_string(value); + }) + | start_with_next_done([=](std::string &&value) { + *sum += std::move(value) + ' '; + }, [=] { + *sum += "done "; + }, lifetime); + stream.fire(6); + } single(single(1)) | then(single(single(2) | then(single(3)))) | then(single(single(4) | then(single(5)) | then(single(6)))) @@ -259,7 +265,7 @@ TEST_CASE("basic operators tests", "[rpl::operators]") { *sum += std::move(value) + ' '; }, lifetime); } - REQUIRE(*sum == "1 2 3 4 5 6 1 2 3 4 5 6 "); + REQUIRE(*sum == "1 2 3 4 5 6 done 1 2 3 4 5 6 "); } SECTION("combine vector test") { diff --git a/Telegram/SourceFiles/rpl/producer.h b/Telegram/SourceFiles/rpl/producer.h index a70c74eb25..7d7d99d1a6 100644 --- a/Telegram/SourceFiles/rpl/producer.h +++ b/Telegram/SourceFiles/rpl/producer.h @@ -265,8 +265,10 @@ template template inline lifetime producer_base::start_existing( const consumer_type &consumer) && { - consumer.add_lifetime(std::move(_generator)(consumer)); - return [consumer] { consumer.terminate(); }; + if (consumer.add_lifetime(std::move(_generator)(consumer))) { + return [consumer] { consumer.terminate(); }; + } + return lifetime(); } template