Support and use not type-erased consumers.

This commit is contained in:
John Preston 2017-09-27 18:15:26 +03:00
parent fddcdf359b
commit 80d9938e96
18 changed files with 651 additions and 346 deletions

View File

@ -466,7 +466,7 @@ template <
typename = std::enable_if_t<!std::is_same_v<Type, void>>> typename = std::enable_if_t<!std::is_same_v<Type, void>>>
inline auto ObservableViewer(base::Observable<Type> &observable) { inline auto ObservableViewer(base::Observable<Type> &observable) {
return rpl::make_producer<Type>([&observable]( return rpl::make_producer<Type>([&observable](
const rpl::consumer<Type> &consumer) { const auto &consumer) {
auto lifetime = rpl::lifetime(); auto lifetime = rpl::lifetime();
lifetime.make_state<base::Subscription>( lifetime.make_state<base::Subscription>(
observable.add_subscription([consumer](auto &&update) { observable.add_subscription([consumer](auto &&update) {
@ -479,7 +479,7 @@ inline auto ObservableViewer(base::Observable<Type> &observable) {
inline auto ObservableViewer(base::Observable<void> &observable) { inline auto ObservableViewer(base::Observable<void> &observable) {
return rpl::make_producer<>([&observable]( return rpl::make_producer<>([&observable](
const rpl::consumer<> &consumer) { const auto &consumer) {
auto lifetime = rpl::lifetime(); auto lifetime = rpl::lifetime();
lifetime.make_state<base::Subscription>( lifetime.make_state<base::Subscription>(
observable.add_subscription([consumer]() { observable.add_subscription([consumer]() {

View File

@ -34,7 +34,7 @@ namespace Profile {
rpl::producer<Notify::PeerUpdate> PeerUpdateViewer( rpl::producer<Notify::PeerUpdate> PeerUpdateViewer(
Notify::PeerUpdate::Flags flags) { Notify::PeerUpdate::Flags flags) {
return [=](const rpl::consumer<Notify::PeerUpdate> &consumer) { return [=](const auto &consumer) {
auto lifetime = rpl::lifetime(); auto lifetime = rpl::lifetime();
lifetime.make_state<base::Subscription>( lifetime.make_state<base::Subscription>(
Notify::PeerUpdated().add_subscription({ flags, [=]( Notify::PeerUpdated().add_subscription({ flags, [=](

View File

@ -38,7 +38,7 @@ public:
return make_producer<Value, Error>([ return make_producer<Value, Error>([
initial = std::move(initial), initial = std::move(initial),
method = std::move(_method) method = std::move(_method)
](const consumer<Value, Error> &consumer) mutable { ](const auto &consumer) mutable {
return std::move(initial).start( return std::move(initial).start(
[method = std::move(method), consumer](auto &&value) { [method = std::move(method), consumer](auto &&value) {
auto copy = method; auto copy = method;

View File

@ -107,7 +107,7 @@ inline void combine_subscribe(
const consumer_type &consumer, const consumer_type &consumer,
combine_state<Values...> *state, combine_state<Values...> *state,
std::index_sequence<I...>, std::index_sequence<I...>,
producer<Values, Errors, Generators> &&...producers) { std::tuple<producer<Values, Errors, Generators>...> &&saved) {
auto consume = { ( auto consume = { (
details::combine_subscribe_one< details::combine_subscribe_one<
I, I,
@ -116,10 +116,53 @@ inline void combine_subscribe(
>( >(
consumer, consumer,
state state
).subscribe(std::move(producers)), 0)... }; ).subscribe(std::get<I>(std::move(saved))), 0)... };
(void)consume; (void)consume;
} }
template <typename ...Producers>
class combine_implementation_helper;
template <typename ...Producers>
combine_implementation_helper<std::decay_t<Producers>...>
make_combine_implementation_helper(Producers &&...producers) {
return combine_implementation_helper<std::decay_t<Producers>...>(
std::forward<Producers>(producers)...);
}
template <
typename ...Values,
typename ...Errors,
typename ...Generators>
class combine_implementation_helper<producer<Values, Errors, Generators>...> {
public:
using CombinedValue = std::tuple<Values...>;
using CombinedError = details::normalized_variant_t<Errors...>;
combine_implementation_helper(
producer<Values, Errors, Generators> &&...producers)
: _saved(std::make_tuple(std::move(producers)...)) {
}
template <typename Handlers>
lifetime operator()(const consumer<CombinedValue, CombinedError, Handlers> &consumer) {
auto state = consumer.template make_state<
details::combine_state<Values...>>();
constexpr auto kArity = sizeof...(Values);
details::combine_subscribe(
consumer,
state,
std::make_index_sequence<kArity>(),
std::move(_saved));
return lifetime();
}
private:
std::tuple<producer<Values, Errors, Generators>...> _saved;
};
template < template <
typename ...Values, typename ...Values,
typename ...Errors, typename ...Errors,
@ -128,26 +171,9 @@ inline auto combine_implementation(
producer<Values, Errors, Generators> &&...producers) { producer<Values, Errors, Generators> &&...producers) {
using CombinedValue = std::tuple<Values...>; using CombinedValue = std::tuple<Values...>;
using CombinedError = details::normalized_variant_t<Errors...>; using CombinedError = details::normalized_variant_t<Errors...>;
using consumer_type = consumer<CombinedValue, CombinedError>;
auto result = [](
const consumer_type &consumer,
producer<Values, Errors, Generators> &...producers) {
auto state = consumer.template make_state<
details::combine_state<Values...>>();
constexpr auto kArity = sizeof...(Values); return make_producer<CombinedValue, CombinedError>(
details::combine_subscribe( make_combine_implementation_helper(std::move(producers)...));
consumer,
state,
std::make_index_sequence<kArity>(),
std::move(producers)...);
return lifetime();
};
return make_producer<CombinedValue, CombinedError>(std::bind(
result,
std::placeholders::_1,
std::move(producers)...));
} }
template <typename ...Args> template <typename ...Args>
@ -290,10 +316,9 @@ template <typename Value, typename Error, typename Generator>
inline auto combine( inline auto combine(
std::vector<producer<Value, Error, Generator>> &&producers) { std::vector<producer<Value, Error, Generator>> &&producers) {
using state_type = details::combine_vector_state<Value>; using state_type = details::combine_vector_state<Value>;
using consumer_type = consumer<std::vector<Value>, Error>;
return make_producer<std::vector<Value>, Error>([ return make_producer<std::vector<Value>, Error>([
producers = std::move(producers) producers = std::move(producers)
](const consumer_type &consumer) mutable { ](const auto &consumer) mutable {
auto count = producers.size(); auto count = producers.size();
auto state = consumer.template make_state<state_type>(); auto state = consumer.template make_state<state_type>();
state->accumulated.resize(count); state->accumulated.resize(count);

View File

@ -31,12 +31,9 @@ public:
template <typename Value, typename Error, typename Generator> template <typename Value, typename Error, typename Generator>
auto operator()( auto operator()(
producer<Value, Error, Generator> &&initial) const { producer<Value, Error, Generator> &&initial) const {
using consumer_type = consumer<
std::tuple<Value, Value>,
Error>;
return make_producer<std::tuple<Value, Value>, Error>([ return make_producer<std::tuple<Value, Value>, Error>([
initial = std::move(initial) initial = std::move(initial)
](const consumer_type &consumer) mutable { ](const auto &consumer) mutable {
auto previous = consumer.template make_state< auto previous = consumer.template make_state<
base::optional<Value> base::optional<Value>
>(); >();
@ -74,13 +71,10 @@ public:
template <typename Value, typename Error, typename Generator> template <typename Value, typename Error, typename Generator>
auto operator()(producer<Value, Error, Generator> &&initial) { auto operator()(producer<Value, Error, Generator> &&initial) {
using consumer_type = consumer<
std::tuple<Value, Value>,
Error>;
return make_producer<std::tuple<Value, Value>, Error>([ return make_producer<std::tuple<Value, Value>, Error>([
initial = std::move(initial), initial = std::move(initial),
value = Value(std::move(_value)) value = Value(std::move(_value))
](const consumer_type &consumer) mutable { ](const auto &consumer) mutable {
auto previous = consumer.template make_state<Value>( auto previous = consumer.template make_state<Value>(
std::move(value)); std::move(value));
return std::move(initial).start( return std::move(initial).start(

View File

@ -26,8 +26,7 @@ namespace rpl {
template <typename Value = empty_value, typename Error = no_error> template <typename Value = empty_value, typename Error = no_error>
inline auto complete() { inline auto complete() {
return make_producer<Value, Error>([]( return make_producer<Value, Error>([](const auto &consumer) {
const consumer<Value, Error> &consumer) {
consumer.put_done(); consumer.put_done();
return lifetime(); return lifetime();
}); });

View File

@ -25,6 +25,241 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include <rpl/details/callable.h> #include <rpl/details/callable.h>
namespace rpl { namespace rpl {
namespace details {
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone>
class consumer_handlers;
template <typename Value, typename Error>
class type_erased_handlers {
public:
template <
typename OnNext,
typename OnError,
typename OnDone>
static std::shared_ptr<type_erased_handlers> create(
OnNext &&next,
OnError &&error,
OnDone &&done) {
return std::make_shared<consumer_handlers<
Value,
Error,
std::decay_t<OnNext>,
std::decay_t<OnError>,
std::decay_t<OnDone>>>(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done));
}
virtual bool put_next(Value &&value) = 0;
virtual bool put_next_copy(const Value &value) = 0;
virtual void put_error(Error &&error) = 0;
virtual void put_error_copy(const Error &error) = 0;
virtual void put_done() = 0;
void add_lifetime(lifetime &&lifetime);
template <typename Type, typename... Args>
Type *make_state(Args&& ...args);
void terminate();
protected:
lifetime _lifetime;
bool _terminated = false;
};
template <typename Handlers>
struct is_type_erased_handlers
: std::false_type {
};
template <typename Value, typename Error>
struct is_type_erased_handlers<type_erased_handlers<Value, Error>>
: std::true_type {
};
template <typename Value, typename Error, typename OnNext, typename OnError, typename OnDone>
class consumer_handlers final
: public type_erased_handlers<Value, Error> {
public:
template <
typename OnNextOther,
typename OnErrorOther,
typename OnDoneOther>
consumer_handlers(
OnNextOther &&next,
OnErrorOther &&error,
OnDoneOther &&done)
: _next(std::forward<OnNextOther>(next))
, _error(std::forward<OnErrorOther>(error))
, _done(std::forward<OnDoneOther>(done)) {
}
template <
typename OnNextOther,
typename OnErrorOther,
typename OnDoneOther>
static std::shared_ptr<consumer_handlers> create(
OnNextOther &&next,
OnErrorOther &&error,
OnDoneOther &&done) {
return std::make_shared<consumer_handlers>(
std::forward<OnNextOther>(next),
std::forward<OnErrorOther>(error),
std::forward<OnDoneOther>(done));
}
bool put_next(Value &&value) final override;
bool put_next_copy(const Value &value) final override;
void put_error(Error &&error) final override;
void put_error_copy(const Error &error) final override;
void put_done() final override;
private:
OnNext _next;
OnError _error;
OnDone _done;
};
template <typename Value, typename Error>
inline void type_erased_handlers<Value, Error>::add_lifetime(
lifetime &&lifetime) {
if (_terminated) {
lifetime.destroy();
} else {
_lifetime.add(std::move(lifetime));
}
}
template <typename Value, typename Error>
template <typename Type, typename... Args>
inline Type *type_erased_handlers<Value, Error>::make_state(
Args&& ...args) {
Expects(!_terminated);
return _lifetime.make_state<Type>(std::forward<Args>(args)...);
}
template <typename Value, typename Error>
inline void type_erased_handlers<Value, Error>::terminate() {
if (!_terminated) {
_terminated = true;
base::take(_lifetime).destroy();
}
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone>
bool consumer_handlers<
Value,
Error,
OnNext,
OnError,
OnDone
>::put_next(Value &&value) {
if (this->_terminated) {
return false;
}
auto handler = this->_next;
details::callable_invoke(std::move(handler), std::move(value));
return true;
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone>
bool consumer_handlers<
Value,
Error,
OnNext,
OnError,
OnDone
>::put_next_copy(const Value &value) {
if (this->_terminated) {
return false;
}
auto handler = this->_next;
details::const_ref_call_invoke(std::move(handler), value);
return true;
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone>
void consumer_handlers<
Value,
Error,
OnNext,
OnError,
OnDone
>::put_error(Error &&error) {
if (!this->_terminated) {
details::callable_invoke(
std::move(this->_error),
std::move(error));
this->terminate();
}
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone>
void consumer_handlers<
Value,
Error,
OnNext,
OnError,
OnDone
>::put_error_copy(const Error &error) {
if (!this->_terminated) {
details::const_ref_call_invoke(
std::move(this->_error),
error);
this->terminate();
}
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone>
void consumer_handlers<
Value,
Error,
OnNext,
OnError,
OnDone
>::put_done() {
if (!this->_terminated) {
std::move(this->_done)();
this->terminate();
}
}
} // namespace details
struct no_value { struct no_value {
no_value() = delete; no_value() = delete;
@ -40,19 +275,24 @@ struct empty_value {
struct empty_error { struct empty_error {
}; };
template <typename Value = empty_value, typename Error = no_error> template <
class consumer { typename Value = empty_value,
typename Error = no_error,
typename Handlers = details::type_erased_handlers<Value, Error>>
class consumer;
namespace details {
template <typename Value, typename Error, typename Handlers>
class consumer_base {
using is_type_erased = is_type_erased_handlers<Handlers>;
public: public:
template < template <
typename OnNext, typename OnNext,
typename OnError, typename OnError,
typename OnDone, typename OnDone>
typename = std::enable_if_t< consumer_base(
details::is_callable_v<OnNext, Value>>,
typename = decltype(std::declval<OnError>()(
std::declval<Error>())),
typename = decltype(std::declval<OnDone>()())>
consumer(
OnNext &&next, OnNext &&next,
OnError &&error, OnError &&error,
OnDone &&done); OnDone &&done);
@ -82,104 +322,322 @@ public:
void terminate() const; void terminate() const;
bool operator==(const consumer &other) const { const details::type_erased_handlers<Value, Error> *comparable() const {
return _instance == other._instance; return _handlers.get();
}
bool operator!=(const consumer &other) const {
return !(*this == other);
}
bool operator<(const consumer &other) const {
return _instance < other._instance;
}
bool operator>(const consumer &other) const {
return other < *this;
}
bool operator<=(const consumer &other) const {
return !(other < *this);
}
bool operator>=(const consumer &other) const {
return !(*this < other);
} }
private: private:
class abstract_instance;
template <typename OnNext, typename OnError, typename OnDone>
class instance;
template <typename OnNext, typename OnError, typename OnDone>
std::shared_ptr<abstract_instance> ConstructInstance(
OnNext &&next,
OnError &&error,
OnDone &&done);
mutable std::shared_ptr<abstract_instance> _instance;
};
template <typename Value, typename Error>
class consumer<Value, Error>::abstract_instance {
public:
virtual bool put_next(Value &&value) = 0;
virtual bool put_next_copy(const Value &value) = 0;
virtual void put_error(Error &&error) = 0;
virtual void put_error_copy(const Error &error) = 0;
virtual void put_done() = 0;
void add_lifetime(lifetime &&lifetime);
template <typename Type, typename... Args>
Type *make_state(Args&& ...args);
void terminate();
protected:
lifetime _lifetime;
bool _terminated = false;
};
template <typename Value, typename Error>
template <typename OnNext, typename OnError, typename OnDone>
class consumer<Value, Error>::instance
: public consumer<Value, Error>::abstract_instance {
public:
template < template <
typename OnNextImpl, typename OtherHandlers,
typename OnErrorImpl, typename = std::enable_if_t<
typename OnDoneImpl> std::is_base_of_v<Handlers, OtherHandlers>>>
instance( consumer_base(const std::shared_ptr<OtherHandlers> &handlers)
OnNextImpl &&next, : _handlers(handlers) {
OnErrorImpl &&error,
OnDoneImpl &&done)
: _next(std::forward<OnNextImpl>(next))
, _error(std::forward<OnErrorImpl>(error))
, _done(std::forward<OnDoneImpl>(done)) {
} }
bool put_next(Value &&value) override; template <
bool put_next_copy(const Value &value) override; typename OtherHandlers,
void put_error(Error &&error) override; typename = std::enable_if_t<
void put_error_copy(const Error &error) override; std::is_base_of_v<Handlers, OtherHandlers>>>
void put_done() override; consumer_base(std::shared_ptr<OtherHandlers> &&handlers)
: _handlers(std::move(handlers)) {
}
private: mutable std::shared_ptr<Handlers> _handlers;
OnNext _next;
OnError _error; bool handlers_put_next(Value &&value, std::true_type) const {
OnDone _done; return _handlers->put_next(std::move(value));
}
bool handlers_put_next_copy(
const Value &value,
std::true_type) const {
return _handlers->put_next_copy(value);
}
void handlers_put_error(Error &&error, std::true_type) const {
return std::exchange(_handlers, nullptr)->put_error(std::move(error));
}
void handlers_put_error_copy(
const Error &error,
std::true_type) const {
return std::exchange(_handlers, nullptr)->put_error_copy(error);
}
void handlers_put_done(std::true_type) const {
return std::exchange(_handlers, nullptr)->put_done();
}
bool handlers_put_next(Value &&value, std::false_type) const {
return _handlers->Handlers::put_next(std::move(value));
}
bool handlers_put_next_copy(
const Value &value,
std::false_type) const {
return _handlers->Handlers::put_next_copy(value);
}
void handlers_put_error(Error &&error, std::false_type) const {
return std::exchange(_handlers, nullptr)->Handlers::put_error(std::move(error));
}
void handlers_put_error_copy(
const Error &error,
std::false_type) const {
return std::exchange(_handlers, nullptr)->Handlers::put_error_copy(error);
}
void handlers_put_done(std::false_type) const {
return std::exchange(_handlers, nullptr)->Handlers::put_done();
}
template <
typename OtherValue,
typename OtherError,
typename OtherHandlers>
friend class ::rpl::consumer;
};
template <typename Value, typename Error, typename Handlers>
template <typename OnNext, typename OnError, typename OnDone>
inline consumer_base<Value, Error, Handlers>::consumer_base(
OnNext &&next,
OnError &&error,
OnDone &&done) : _handlers(Handlers::create(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done))) {
}
template <typename Value, typename Error, typename Handlers>
inline bool consumer_base<Value, Error, Handlers>::put_next(
Value &&value) const {
if (_handlers) {
if (handlers_put_next(std::move(value), is_type_erased())) {
return true;
}
_handlers = nullptr;
}
return false;
}
template <typename Value, typename Error, typename Handlers>
inline bool consumer_base<Value, Error, Handlers>::put_next_copy(
const Value &value) const {
if (_handlers) {
if (handlers_put_next_copy(value, is_type_erased())) {
return true;
}
_handlers = nullptr;
}
return false;
}
template <typename Value, typename Error, typename Handlers>
inline void consumer_base<Value, Error, Handlers>::put_error(
Error &&error) const {
if (_handlers) {
handlers_put_error(std::move(error), is_type_erased());
}
}
template <typename Value, typename Error, typename Handlers>
inline void consumer_base<Value, Error, Handlers>::put_error_copy(
const Error &error) const {
if (_handlers) {
handlers_put_error_copy(error, is_type_erased());
}
}
template <typename Value, typename Error, typename Handlers>
inline void consumer_base<Value, Error, Handlers>::put_done() const {
if (_handlers) {
handlers_put_done(is_type_erased());
}
}
template <typename Value, typename Error, typename Handlers>
inline void consumer_base<Value, Error, Handlers>::add_lifetime(
lifetime &&lifetime) const {
if (_handlers) {
_handlers->add_lifetime(std::move(lifetime));
} else {
lifetime.destroy();
}
}
template <typename Value, typename Error, typename Handlers>
template <typename Type, typename... Args>
inline Type *consumer_base<Value, Error, Handlers>::make_state(
Args&& ...args) const {
Expects(_handlers != nullptr);
return _handlers->template make_state<Type>(
std::forward<Args>(args)...);
}
template <typename Value, typename Error, typename Handlers>
inline void consumer_base<Value, Error, Handlers>::terminate() const {
if (_handlers) {
std::exchange(_handlers, nullptr)->terminate();
}
}
template <typename Value, typename Error>
using consumer_base_type_erased = consumer_base<
Value,
Error,
details::type_erased_handlers<Value, Error>>;
template <typename Value, typename Error, typename Handlers>
constexpr bool is_specific_handlers_v = !std::is_same_v<
details::type_erased_handlers<Value, Error>,
Handlers
> && std::is_base_of_v<
details::type_erased_handlers<Value, Error>,
Handlers
>;
} // namespace details
template <typename Value, typename Error, typename Handlers>
class consumer final
: public details::consumer_base<Value, Error, Handlers> {
using parent_type = details::consumer_base<
Value,
Error,
Handlers>;
public:
using parent_type::parent_type;
}; };
template <typename Value, typename Error> template <typename Value, typename Error>
template <typename OnNext, typename OnError, typename OnDone> class consumer<Value, Error, details::type_erased_handlers<Value, Error>> final
inline std::shared_ptr< : public details::consumer_base_type_erased<Value, Error> {
typename consumer<Value, Error>::abstract_instance> using parent_type = details::consumer_base_type_erased<
consumer<Value, Error>::ConstructInstance( Value,
Error>;
public:
using parent_type::parent_type;
template <
typename Handlers,
typename = std::enable_if_t<
details::is_specific_handlers_v<Value, Error, Handlers>>>
consumer(const details::consumer_base<Value, Error, Handlers> &other)
: parent_type(other._handlers) {
}
template <
typename Handlers,
typename = std::enable_if_t<
details::is_specific_handlers_v<Value, Error, Handlers>>>
consumer(details::consumer_base<Value, Error, Handlers> &&other)
: parent_type(std::move(other._handlers)) {
}
template <
typename Handlers,
typename = std::enable_if_t<
details::is_specific_handlers_v<Value, Error, Handlers>>>
consumer &operator=(
const details::consumer_base<Value, Error, Handlers> &other) {
_handlers = other._handlers;
return *this;
}
template <
typename Handlers,
typename = std::enable_if_t<
details::is_specific_handlers_v<Value, Error, Handlers>>>
consumer &operator=(
details::consumer_base<Value, Error, Handlers> &&other) {
_handlers = std::move(other._handlers);
return *this;
}
};
template <
typename Value,
typename Error,
typename Handlers1,
typename Handlers2>
inline bool operator==(
const consumer<Value, Error, Handlers1> &a,
const consumer<Value, Error, Handlers2> &b) {
return a.comparable() == b.comparable();
}
template <
typename Value,
typename Error,
typename Handlers1,
typename Handlers2>
inline bool operator<(
const consumer<Value, Error, Handlers1> &a,
const consumer<Value, Error, Handlers2> &b) {
return a.comparable() < b.comparable();
}
template <
typename Value,
typename Error,
typename Handlers1,
typename Handlers2>
inline bool operator!=(
const consumer<Value, Error, Handlers1> &a,
const consumer<Value, Error, Handlers2> &b) {
return !(a == b);
}
template <
typename Value,
typename Error,
typename Handlers1,
typename Handlers2>
inline bool operator>(
const consumer<Value, Error, Handlers1> &a,
const consumer<Value, Error, Handlers2> &b) {
return b < a;
}
template <
typename Value,
typename Error,
typename Handlers1,
typename Handlers2>
inline bool operator<=(
const consumer<Value, Error, Handlers1> &a,
const consumer<Value, Error, Handlers2> &b) {
return !(b < a);
}
template <
typename Value,
typename Error,
typename Handlers1,
typename Handlers2>
inline bool operator>=(
const consumer<Value, Error, Handlers1> &a,
const consumer<Value, Error, Handlers2> &b) {
return !(a < b);
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone,
typename = std::enable_if_t<
details::is_callable_v<OnNext, Value> &&
details::is_callable_v<OnError, Error> &&
details::is_callable_v<OnDone>>>
inline auto make_consumer(
OnNext &&next, OnNext &&next,
OnError &&error, OnError &&error,
OnDone &&done) { OnDone &&done) {
return std::make_shared<instance< return consumer<Value, Error, details::consumer_handlers<
Value,
Error,
std::decay_t<OnNext>, std::decay_t<OnNext>,
std::decay_t<OnError>, std::decay_t<OnError>,
std::decay_t<OnDone>>>( std::decay_t<OnDone>>>(
@ -188,173 +646,4 @@ consumer<Value, Error>::ConstructInstance(
std::forward<OnDone>(done)); std::forward<OnDone>(done));
} }
template <typename Value, typename Error>
template <
typename OnNext,
typename OnError,
typename OnDone,
typename,
typename,
typename>
inline consumer<Value, Error>::consumer(
OnNext &&next,
OnError &&error,
OnDone &&done) : _instance(ConstructInstance(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done))) {
}
template <typename Value, typename Error>
inline bool consumer<Value, Error>::put_next(Value &&value) const {
if (_instance) {
if (_instance->put_next(std::move(value))) {
return true;
}
_instance = nullptr;
}
return false;
}
template <typename Value, typename Error>
inline bool consumer<Value, Error>::put_next_copy(
const Value &value) const {
if (_instance) {
if (_instance->put_next_copy(value)) {
return true;
}
_instance = nullptr;
}
return false;
}
template <typename Value, typename Error>
inline void consumer<Value, Error>::put_error(Error &&error) const {
if (_instance) {
std::exchange(_instance, nullptr)->put_error(
std::move(error));
}
}
template <typename Value, typename Error>
inline void consumer<Value, Error>::put_error_copy(
const Error &error) const {
if (_instance) {
std::exchange(_instance, nullptr)->put_error_copy(error);
}
}
template <typename Value, typename Error>
inline void consumer<Value, Error>::put_done() const {
if (_instance) {
std::exchange(_instance, nullptr)->put_done();
}
}
template <typename Value, typename Error>
inline void consumer<Value, Error>::add_lifetime(lifetime &&lifetime) const {
if (_instance) {
_instance->add_lifetime(std::move(lifetime));
} else {
lifetime.destroy();
}
}
template <typename Value, typename Error>
template <typename Type, typename... Args>
inline Type *consumer<Value, Error>::make_state(Args&& ...args) const {
Expects(_instance != nullptr);
return _instance->template make_state<Type>(std::forward<Args>(args)...);
}
template <typename Value, typename Error>
inline void consumer<Value, Error>::terminate() const {
if (_instance) {
std::exchange(_instance, nullptr)->terminate();
}
}
template <typename Value, typename Error>
inline void consumer<Value, Error>::abstract_instance::add_lifetime(
lifetime &&lifetime) {
if (_terminated) {
lifetime.destroy();
} else {
_lifetime.add(std::move(lifetime));
}
}
template <typename Value, typename Error>
template <typename Type, typename... Args>
inline Type *consumer<Value, Error>::abstract_instance::make_state(
Args&& ...args) {
Expects(!_terminated);
return _lifetime.make_state<Type>(std::forward<Args>(args)...);
}
template <typename Value, typename Error>
inline void consumer<Value, Error>::abstract_instance::terminate() {
if (!_terminated) {
_terminated = true;
base::take(_lifetime).destroy();
}
}
template <typename Value, typename Error>
template <typename OnNext, typename OnError, typename OnDone>
bool consumer<Value, Error>::instance<OnNext, OnError, OnDone>::put_next(
Value &&value) {
if (this->_terminated) {
return false;
}
auto handler = this->_next;
details::callable_invoke(std::move(handler), std::move(value));
return true;
}
template <typename Value, typename Error>
template <typename OnNext, typename OnError, typename OnDone>
bool consumer<Value, Error>::instance<OnNext, OnError, OnDone>::put_next_copy(
const Value &value) {
if (this->_terminated) {
return false;
}
auto handler = this->_next;
details::const_ref_call_invoke(std::move(handler), value);
return true;
}
template <typename Value, typename Error>
template <typename OnNext, typename OnError, typename OnDone>
void consumer<Value, Error>::instance<OnNext, OnError, OnDone>::put_error(
Error &&error) {
if (!this->_terminated) {
details::callable_invoke(
std::move(this->_error),
std::move(error));
this->terminate();
}
}
template <typename Value, typename Error>
template <typename OnNext, typename OnError, typename OnDone>
void consumer<Value, Error>::instance<OnNext, OnError, OnDone>::put_error_copy(
const Error &error) {
if (!this->_terminated) {
details::const_ref_call_invoke(
std::move(this->_error),
error);
this->terminate();
}
}
template <typename Value, typename Error>
template <typename OnNext, typename OnError, typename OnDone>
void consumer<Value, Error>::instance<OnNext, OnError, OnDone>::put_done() {
if (!this->_terminated) {
std::move(this->_done)();
this->terminate();
}
}
} // namespace rpl } // namespace rpl

View File

@ -31,7 +31,7 @@ template <
inline auto deferred(Creator &&creator) { inline auto deferred(Creator &&creator) {
return make_producer<Value, Error>([ return make_producer<Value, Error>([
creator = std::forward<Creator>(creator) creator = std::forward<Creator>(creator)
](const consumer<Value, Error> &consumer) mutable { ](const auto &consumer) mutable {
return std::move(creator)().start_existing(consumer); return std::move(creator)().start_existing(consumer);
}); });
} }

View File

@ -33,7 +33,7 @@ public:
producer<Value, Error, Generator> &&initial) const { producer<Value, Error, Generator> &&initial) const {
return make_producer<Value, Error>([ return make_producer<Value, Error>([
initial = std::move(initial) initial = std::move(initial)
](const consumer<Value, Error> &consumer) mutable { ](const auto &consumer) mutable {
auto previous = consumer.template make_state< auto previous = consumer.template make_state<
base::optional<Value> base::optional<Value>
>(); >();

View File

@ -47,9 +47,8 @@ public:
return fire_forward(value); return fire_forward(value);
} }
auto events() const { auto events() const {
using consumer_type = consumer<Value, no_error>;
return make_producer<Value>([weak = weak()]( return make_producer<Value>([weak = weak()](
const consumer_type &consumer) { const auto &consumer) {
if (auto strong = weak.lock()) { if (auto strong = weak.lock()) {
auto result = [weak, consumer] { auto result = [weak, consumer] {
if (auto strong = weak.lock()) { if (auto strong = weak.lock()) {

View File

@ -26,10 +26,9 @@ namespace rpl {
template <typename Value, typename Error> template <typename Value, typename Error>
inline auto fail(Error &&error) { inline auto fail(Error &&error) {
using consumer_t = consumer<Value, std::decay_t<Error>>;
return make_producer<Value, std::decay_t<Error>>([ return make_producer<Value, std::decay_t<Error>>([
error = std::forward<Error>(error) error = std::forward<Error>(error)
](const consumer_t &consumer) mutable { ](const auto &consumer) mutable {
consumer.put_error(std::move(error)); consumer.put_error(std::move(error));
return lifetime(); return lifetime();
}); });

View File

@ -43,11 +43,10 @@ public:
typename = std::enable_if_t< typename = std::enable_if_t<
details::is_callable_v<Predicate, Value>>> details::is_callable_v<Predicate, Value>>>
auto operator()(producer<Value, Error, Generator> &&initial) { auto operator()(producer<Value, Error, Generator> &&initial) {
using consumer_type = consumer<Value, Error>;
return make_producer<Value, Error>([ return make_producer<Value, Error>([
initial = std::move(initial), initial = std::move(initial),
predicate = std::move(_predicate) predicate = std::move(_predicate)
](const consumer_type &consumer) mutable { ](const auto &consumer) mutable {
return std::move(initial).start( return std::move(initial).start(
[ [
consumer, consumer,
@ -126,10 +125,9 @@ public:
base::optional<Value>, base::optional<Value>,
Error, Error,
Generator> &&initial) const { Generator> &&initial) const {
using consumer_type = consumer<Value, Error>;
return make_producer<Value, Error>([ return make_producer<Value, Error>([
initial = std::move(initial) initial = std::move(initial)
](const consumer_type &consumer) mutable { ](const auto &consumer) mutable {
return std::move(initial).start( return std::move(initial).start(
[consumer](auto &&value) { [consumer](auto &&value) {
if (value) { if (value) {

View File

@ -36,10 +36,9 @@ public:
producer<Value, Error, Generator>, producer<Value, Error, Generator>,
Error, Error,
MetaGenerator> &&initial) const { MetaGenerator> &&initial) const {
using consumer_type = consumer<Value, Error>;
return make_producer<Value, Error>([ return make_producer<Value, Error>([
initial = std::move(initial) initial = std::move(initial)
](const consumer_type &consumer) mutable { ](const auto &consumer) mutable {
auto state = std::make_shared<State>(); auto state = std::make_shared<State>();
return std::move(initial).start( return std::move(initial).start(
[consumer, state](producer<Value, Error> &&inner) { [consumer, state](producer<Value, Error> &&inner) {

View File

@ -28,12 +28,13 @@ namespace details {
template < template <
typename Transform, typename Transform,
typename NewValue, typename NewValue,
typename Error> typename Error,
typename Handlers>
class map_transform_helper { class map_transform_helper {
public: public:
map_transform_helper( map_transform_helper(
Transform &&transform, Transform &&transform,
const consumer<NewValue, Error> &consumer) const consumer<NewValue, Error, Handlers> &consumer)
: _consumer(consumer) : _consumer(consumer)
, _transform(std::move(transform)) { , _transform(std::move(transform)) {
} }
@ -55,7 +56,7 @@ public:
} }
private: private:
consumer<NewValue, Error> _consumer; consumer<NewValue, Error, Handlers> _consumer;
Transform _transform; Transform _transform;
}; };
@ -64,12 +65,13 @@ template <
typename Transform, typename Transform,
typename NewValue, typename NewValue,
typename Error, typename Error,
typename Handlers,
typename = std::enable_if_t< typename = std::enable_if_t<
std::is_rvalue_reference_v<Transform&&>>> std::is_rvalue_reference_v<Transform&&>>>
inline map_transform_helper<Transform, NewValue, Error> inline map_transform_helper<Transform, NewValue, Error, Handlers>
map_transform( map_transform(
Transform &&transform, Transform &&transform,
const consumer<NewValue, Error> &consumer) { const consumer<NewValue, Error, Handlers> &consumer) {
return { std::move(transform), consumer }; return { std::move(transform), consumer };
} }
@ -92,7 +94,7 @@ public:
return make_producer<NewValue, Error>([ return make_producer<NewValue, Error>([
initial = std::move(initial), initial = std::move(initial),
transform = std::move(_transform) transform = std::move(_transform)
](const consumer<NewValue, Error> &consumer) mutable { ](const auto &consumer) mutable {
return std::move(initial).start( return std::move(initial).start(
map_transform( map_transform(
std::move(transform), std::move(transform),
@ -125,12 +127,13 @@ namespace details {
template < template <
typename Transform, typename Transform,
typename Value, typename Value,
typename NewError> typename NewError,
typename Handlers>
class map_error_transform_helper { class map_error_transform_helper {
public: public:
map_error_transform_helper( map_error_transform_helper(
Transform &&transform, Transform &&transform,
const consumer<Value, NewError> &consumer) const consumer<Value, NewError, Handlers> &consumer)
: _transform(std::move(transform)) : _transform(std::move(transform))
, _consumer(consumer) { , _consumer(consumer) {
} }
@ -152,7 +155,7 @@ public:
} }
private: private:
consumer<Value, NewError> _consumer; consumer<Value, NewError, Handlers> _consumer;
Transform _transform; Transform _transform;
}; };
@ -161,12 +164,13 @@ template <
typename Transform, typename Transform,
typename Value, typename Value,
typename NewError, typename NewError,
typename Handlers,
typename = std::enable_if_t< typename = std::enable_if_t<
std::is_rvalue_reference_v<Transform&&>>> std::is_rvalue_reference_v<Transform&&>>>
inline map_error_transform_helper<Transform, Value, NewError> inline map_error_transform_helper<Transform, Value, NewError, Handlers>
map_error_transform( map_error_transform(
Transform &&transform, Transform &&transform,
const consumer<Value, NewError> &consumer) { const consumer<Value, NewError, Handlers> &consumer) {
return { std::move(transform), consumer }; return { std::move(transform), consumer };
} }
@ -189,7 +193,7 @@ public:
return make_producer<Value, NewError>([ return make_producer<Value, NewError>([
initial = std::move(initial), initial = std::move(initial),
transform = std::move(_transform) transform = std::move(_transform)
](const consumer<Value, NewError> &consumer) mutable { ](const auto &consumer) mutable {
return std::move(initial).start( return std::move(initial).start(
[consumer](auto &&value) { [consumer](auto &&value) {
consumer.put_next_forward( consumer.put_next_forward(

View File

@ -26,9 +26,7 @@ namespace rpl {
template <typename Value = empty_value, typename Error = no_error> template <typename Value = empty_value, typename Error = no_error>
inline auto never() { inline auto never() {
using consumer_type = consumer<Value, Error>; return make_producer<Value, Error>([](const auto &consumer) {
return make_producer<Value, Error>([](
const consumer_type &consumer) {
return lifetime(); return lifetime();
}); });
} }

View File

@ -60,9 +60,10 @@ private:
template <typename Value, typename Error> template <typename Value, typename Error>
class type_erased_generator final { class type_erased_generator final {
public: public:
template <typename Handlers>
using consumer_type = consumer<Value, Error, Handlers>;
using value_type = Value; using value_type = Value;
using error_type = Error; using error_type = Error;
using consumer_type = consumer<Value, Error>;
type_erased_generator( type_erased_generator(
const type_erased_generator &other) = default; const type_erased_generator &other) = default;
@ -102,12 +103,13 @@ public:
return *this; return *this;
} }
lifetime operator()(const consumer_type &consumer) { template <typename Handlers>
lifetime operator()(const consumer_type<Handlers> &consumer) {
return _implementation(consumer); return _implementation(consumer);
} }
private: private:
base::lambda<lifetime(const consumer_type &)> _implementation; base::lambda<lifetime(const consumer_type<type_erased_handlers<Value, Error>> &)> _implementation;
}; };
@ -162,9 +164,10 @@ namespace details {
template <typename Value, typename Error, typename Generator> template <typename Value, typename Error, typename Generator>
class producer_base { class producer_base {
public: public:
template <typename Handlers>
using consumer_type = consumer<Value, Error, Handlers>;
using value_type = Value; using value_type = Value;
using error_type = Error; using error_type = Error;
using consumer_type = consumer<Value, Error>;
template < template <
typename OtherGenerator, typename OtherGenerator,
@ -203,7 +206,9 @@ public:
OnError &&error, OnError &&error,
OnDone &&done) const &; OnDone &&done) const &;
lifetime start_existing(const consumer_type &consumer) &&; template <typename Handlers>
lifetime start_existing(
const consumer_type<Handlers> &consumer) &&;
private: private:
Generator _generator; Generator _generator;
@ -233,7 +238,7 @@ inline lifetime producer_base<Value, Error, Generator>::start(
OnNext &&next, OnNext &&next,
OnError &&error, OnError &&error,
OnDone &&done) && { OnDone &&done) && {
return std::move(*this).start_existing(consumer<Value, Error>( return std::move(*this).start_existing(make_consumer<Value, Error>(
std::forward<OnNext>(next), std::forward<OnNext>(next),
std::forward<OnError>(error), std::forward<OnError>(error),
std::forward<OnDone>(done))); std::forward<OnDone>(done)));
@ -257,8 +262,9 @@ inline lifetime producer_base<Value, Error, Generator>::start_copy(
} }
template <typename Value, typename Error, typename Generator> template <typename Value, typename Error, typename Generator>
template <typename Handlers>
inline lifetime producer_base<Value, Error, Generator>::start_existing( inline lifetime producer_base<Value, Error, Generator>::start_existing(
const consumer_type &consumer) && { const consumer_type<Handlers> &consumer) && {
consumer.add_lifetime(std::move(_generator)(consumer)); consumer.add_lifetime(std::move(_generator)(consumer));
return [consumer] { consumer.terminate(); }; return [consumer] { consumer.terminate(); };
} }
@ -278,6 +284,7 @@ class producer final
Value, Value,
Error, Error,
Generator>; Generator>;
public: public:
using parent_type::parent_type; using parent_type::parent_type;
@ -292,6 +299,7 @@ class producer<
using parent_type = details::producer_base_type_erased< using parent_type = details::producer_base_type_erased<
Value, Value,
Error>; Error>;
public: public:
using parent_type::parent_type;; using parent_type::parent_type;;

View File

@ -26,10 +26,9 @@ namespace rpl {
template <typename Value> template <typename Value>
inline auto single(Value &&value) { inline auto single(Value &&value) {
using consumer_type = consumer<std::decay_t<Value>, no_error>;
return make_producer<std::decay_t<Value>>([ return make_producer<std::decay_t<Value>>([
value = std::forward<Value>(value) value = std::forward<Value>(value)
](const consumer_type &consumer) mutable { ](const auto &consumer) mutable {
consumer.put_next(std::move(value)); consumer.put_next(std::move(value));
consumer.put_done(); consumer.put_done();
return lifetime(); return lifetime();
@ -37,9 +36,7 @@ inline auto single(Value &&value) {
} }
inline auto single() { inline auto single() {
using consumer_type = consumer<empty_value, no_error>; return make_producer<>([](const auto &consumer) {
return make_producer<>([](
const consumer_type &consumer) {
consumer.put_next({}); consumer.put_next({});
consumer.put_done(); consumer.put_done();
return lifetime(); return lifetime();
@ -48,10 +45,9 @@ inline auto single() {
template <typename Value> template <typename Value>
inline auto vector(std::vector<Value> &&values) { inline auto vector(std::vector<Value> &&values) {
using consumer_type = consumer<Value, no_error>;
return make_producer<Value>([ return make_producer<Value>([
values = std::move(values) values = std::move(values)
](const consumer_type &consumer) mutable { ](const auto &consumer) mutable {
for (auto &value : values) { for (auto &value : values) {
consumer.put_next(std::move(value)); consumer.put_next(std::move(value));
} }
@ -61,10 +57,9 @@ inline auto vector(std::vector<Value> &&values) {
} }
inline auto vector(std::vector<bool> &&values) { inline auto vector(std::vector<bool> &&values) {
using consumer_type = consumer<bool, no_error>;
return make_producer<bool>([ return make_producer<bool>([
values = std::move(values) values = std::move(values)
](const consumer_type &consumer) { ](const auto &consumer) {
for (auto value : values) { for (auto value : values) {
consumer.put_next_copy(value); consumer.put_next_copy(value);
} }
@ -85,8 +80,7 @@ inline auto range(Range &&range) {
inline auto ints(int from, int till) { inline auto ints(int from, int till) {
Expects(from <= till); Expects(from <= till);
return make_producer<int>([from, till]( return make_producer<int>([from, till](const auto &consumer) {
const consumer<int> &consumer) {
for (auto i = from; i != till; ++i) { for (auto i = from; i != till; ++i) {
consumer.put_next_copy(i); consumer.put_next_copy(i);
} }

View File

@ -41,11 +41,10 @@ public:
auto operator()( auto operator()(
producer<OtherValue, OtherError, OtherGenerator> &&initial producer<OtherValue, OtherError, OtherGenerator> &&initial
) { ) {
using consumer_type = consumer<NewValue, NewError>;
return make_producer<NewValue, NewError>([ return make_producer<NewValue, NewError>([
initial = std::move(initial), initial = std::move(initial),
following = std::move(_following) following = std::move(_following)
](const consumer_type &consumer) mutable { ](const auto &consumer) mutable {
return std::move(initial).start( return std::move(initial).start(
[consumer](auto &&value) { [consumer](auto &&value) {
consumer.put_next_forward( consumer.put_next_forward(