Use make_state for flatten_latest().

This commit is contained in:
John Preston 2017-10-02 12:14:07 +03:00
parent c4d33f9986
commit 525cde3498
4 changed files with 53 additions and 28 deletions

View File

@ -63,7 +63,7 @@ public:
virtual void put_error_copy(const Error &error) = 0; virtual void put_error_copy(const Error &error) = 0;
virtual void put_done() = 0; virtual void put_done() = 0;
void add_lifetime(lifetime &&lifetime); bool add_lifetime(lifetime &&lifetime);
template <typename Type, typename... Args> template <typename Type, typename... Args>
Type *make_state(Args&& ...args); Type *make_state(Args&& ...args);
@ -135,20 +135,23 @@ private:
}; };
template <typename Value, typename Error> template <typename Value, typename Error>
inline void type_erased_handlers<Value, Error>::add_lifetime( inline bool type_erased_handlers<Value, Error>::add_lifetime(
lifetime &&lifetime) { lifetime &&lifetime) {
if (_terminated) { if (_terminated) {
lifetime.destroy(); lifetime.destroy();
} else { return false;
_lifetime.add(std::move(lifetime));
} }
_lifetime.add(std::move(lifetime));
return true;
} }
template <typename Value, typename Error> template <typename Value, typename Error>
template <typename Type, typename... Args> template <typename Type, typename... Args>
inline Type *type_erased_handlers<Value, Error>::make_state( inline Type *type_erased_handlers<Value, Error>::make_state(
Args&& ...args) { Args&& ...args) {
Expects(!_terminated); if (_terminated) {
return nullptr;
}
return _lifetime.make_state<Type>(std::forward<Args>(args)...); return _lifetime.make_state<Type>(std::forward<Args>(args)...);
} }
@ -320,7 +323,7 @@ public:
} }
void put_done() const; void put_done() const;
void add_lifetime(lifetime &&lifetime) const; bool add_lifetime(lifetime &&lifetime) const;
template <typename Type, typename... Args> template <typename Type, typename... Args>
Type *make_state(Args&& ...args) const; Type *make_state(Args&& ...args) const;
@ -447,22 +450,32 @@ inline void consumer_base<Value, Error, Handlers>::put_done() const {
} }
template <typename Value, typename Error, typename Handlers> template <typename Value, typename Error, typename Handlers>
inline void consumer_base<Value, Error, Handlers>::add_lifetime( inline bool consumer_base<Value, Error, Handlers>::add_lifetime(
lifetime &&lifetime) const { lifetime &&lifetime) const {
if (_handlers) { if (!_handlers) {
_handlers->add_lifetime(std::move(lifetime));
} else {
lifetime.destroy(); lifetime.destroy();
return false;
} }
if (_handlers->add_lifetime(std::move(lifetime))) {
return true;
}
_handlers = nullptr;
return false;
} }
template <typename Value, typename Error, typename Handlers> template <typename Value, typename Error, typename Handlers>
template <typename Type, typename... Args> template <typename Type, typename... Args>
inline Type *consumer_base<Value, Error, Handlers>::make_state( inline Type *consumer_base<Value, Error, Handlers>::make_state(
Args&& ...args) const { Args&& ...args) const {
Expects(_handlers != nullptr); if (!_handlers) {
return _handlers->template make_state<Type>( return nullptr;
std::forward<Args>(args)...); }
if (auto result = _handlers->template make_state<Type>(
std::forward<Args>(args)...)) {
return result;
}
_handlers = nullptr;
return nullptr;
} }
template <typename Value, typename Error, typename Handlers> template <typename Value, typename Error, typename Handlers>

View File

@ -39,11 +39,12 @@ public:
return make_producer<Value, Error>([ return make_producer<Value, Error>([
initial = std::move(initial) initial = std::move(initial)
](const auto &consumer) mutable { ](const auto &consumer) mutable {
auto state = std::make_shared<State>(); auto state = consumer.template make_state<State>();
return std::move(initial).start( return std::move(initial).start(
[consumer, state](producer<Value, Error> &&inner) { [consumer, state](producer<Value, Error> &&inner) {
state->finished = false; state->finished = false;
state->alive = std::move(inner).start( state->alive = lifetime();
auto started = std::move(inner).start(
[consumer](auto &&value) { [consumer](auto &&value) {
consumer.put_next_forward(std::forward<decltype(value)>(value)); consumer.put_next_forward(std::forward<decltype(value)>(value));
}, [consumer](auto &&error) { }, [consumer](auto &&error) {
@ -55,6 +56,9 @@ public:
state->finished = true; state->finished = true;
} }
}); });
if (started) {
state->alive = std::move(started);
}
}, [consumer](auto &&error) { }, [consumer](auto &&error) {
consumer.put_error_forward(std::forward<decltype(error)>(error)); consumer.put_error_forward(std::forward<decltype(error)>(error));
}, [consumer, state] { }, [consumer, state] {

View File

@ -238,16 +238,22 @@ TEST_CASE("basic operators tests", "[rpl::operators]") {
auto sum = std::make_shared<std::string>(""); auto sum = std::make_shared<std::string>("");
{ {
rpl::lifetime lifetime; rpl::lifetime lifetime;
{
rpl::event_stream<int> stream;
single(single(1) | then(single(2))) single(single(1) | then(single(2)))
| then(single(single(3) | then(single(4)))) | then(single(single(3) | then(single(4))))
| then(single(single(5) | then(single(6)))) | then(single(single(5) | then(stream.events())))
| flatten_latest() | flatten_latest()
| map([](int value) { | map([](int value) {
return std::to_string(value); return std::to_string(value);
}) })
| start_with_next([=](std::string &&value) { | start_with_next_done([=](std::string &&value) {
*sum += std::move(value) + ' '; *sum += std::move(value) + ' ';
}, [=] {
*sum += "done ";
}, lifetime); }, lifetime);
stream.fire(6);
}
single(single(1)) single(single(1))
| then(single(single(2) | then(single(3)))) | then(single(single(2) | then(single(3))))
| then(single(single(4) | then(single(5)) | then(single(6)))) | then(single(single(4) | then(single(5)) | then(single(6))))
@ -259,7 +265,7 @@ TEST_CASE("basic operators tests", "[rpl::operators]") {
*sum += std::move(value) + ' '; *sum += std::move(value) + ' ';
}, lifetime); }, lifetime);
} }
REQUIRE(*sum == "1 2 3 4 5 6 1 2 3 4 5 6 "); REQUIRE(*sum == "1 2 3 4 5 6 done 1 2 3 4 5 6 ");
} }
SECTION("combine vector test") { SECTION("combine vector test") {

View File

@ -265,9 +265,11 @@ template <typename Value, typename Error, typename Generator>
template <typename Handlers> template <typename Handlers>
inline lifetime producer_base<Value, Error, Generator>::start_existing( inline lifetime producer_base<Value, Error, Generator>::start_existing(
const consumer_type<Handlers> &consumer) && { const consumer_type<Handlers> &consumer) && {
consumer.add_lifetime(std::move(_generator)(consumer)); if (consumer.add_lifetime(std::move(_generator)(consumer))) {
return [consumer] { consumer.terminate(); }; return [consumer] { consumer.terminate(); };
} }
return lifetime();
}
template <typename Value, typename Error> template <typename Value, typename Error>
using producer_base_type_erased = producer_base< using producer_base_type_erased = producer_base<