/* This file is part of Telegram Desktop, the official desktop application for the Telegram messaging service. For license and copyright information please follow this link: https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL */ #pragma once #include "base/optional.h" #include "base/variant.h" #include #include #include #include #include #include namespace rpl { namespace details { template struct combine_state { combine_state() : accumulated(std::tuple...>()) { } std::optional...>> accumulated; std::optional> latest; int invalid = sizeof...(Values); int working = sizeof...(Values); }; template inline std::tuple combine_make_first( std::tuple...> &&accumulated, std::index_sequence) { return std::make_tuple(std::move(*std::get(accumulated))...); } template class combine_subscribe_one { public: combine_subscribe_one( const consumer_type &consumer, combine_state *state) : _consumer(consumer) , _state(state) { } template void subscribe(producer &&producer) { _consumer.add_lifetime(std::move(producer).start( [consumer = _consumer, state = _state](Value &&value) { if (!state->accumulated) { std::get(*state->latest) = std::move(value); consumer.put_next_copy(*state->latest); } else { auto &accumulated = std::get( *state->accumulated); if (accumulated) { accumulated = std::move(value); } else { accumulated = std::move(value); if (!--state->invalid) { constexpr auto kArity = sizeof...(Values); state->latest = combine_make_first( std::move(*state->accumulated), std::make_index_sequence()); state->accumulated = std::nullopt; consumer.put_next_copy(*state->latest); } } } }, [consumer = _consumer](auto &&error) { consumer.put_error_forward( std::forward(error)); }, [consumer = _consumer, state = _state] { if (!--state->working) { consumer.put_done(); } })); } private: const consumer_type &_consumer; combine_state *_state = nullptr; }; template < typename consumer_type, typename ...Values, typename ...Errors, typename ...Generators, std::size_t ...I> inline void combine_subscribe( const consumer_type &consumer, combine_state *state, std::index_sequence, std::tuple...> &&saved) { auto consume = { ( combine_subscribe_one( consumer, state ).subscribe(std::get(std::move(saved))), 0)... }; (void)consume; } template class combine_implementation_helper; template combine_implementation_helper...> make_combine_implementation_helper(Producers &&...producers) { return combine_implementation_helper...>( std::forward(producers)...); } template < typename ...Values, typename ...Errors, typename ...Generators> class combine_implementation_helper...> { public: using CombinedValue = std::tuple; using CombinedError = base::normalized_variant_t; combine_implementation_helper( producer &&...producers) : _saved(std::make_tuple(std::move(producers)...)) { } template lifetime operator()(const consumer &consumer) { auto state = consumer.template make_state< combine_state>(); constexpr auto kArity = sizeof...(Values); combine_subscribe( consumer, state, std::make_index_sequence(), std::move(_saved)); return lifetime(); } private: std::tuple...> _saved; }; template < typename ...Values, typename ...Errors, typename ...Generators> inline auto combine_implementation( producer &&...producers) { using CombinedValue = std::tuple; using CombinedError = base::normalized_variant_t; return make_producer( make_combine_implementation_helper(std::move(producers)...)); } template struct combine_just_producers : std::false_type { }; template constexpr bool combine_just_producers_v = combine_just_producers::value; template < typename ...Values, typename ...Errors, typename ...Generators> struct combine_just_producers< producer...> : std::true_type { }; template struct combine_just_producers_list : type_list::extract_to_t { }; template struct combine_result_type; template using combine_result_type_t = typename combine_result_type::type; template < typename ...Values, typename ...Errors, typename ...Generators> struct combine_result_type...> { using type = std::tuple; }; template struct combine_result_type_list : type_list::extract_to_t { }; template using combine_result_type_list_t = typename combine_result_type_list::type; template using combine_producers_no_mapper_t = type_list::chop_last_t; template constexpr bool combine_is_good_mapper(std::true_type) { return is_callable_v< type_list::last_t, combine_result_type_list_t< combine_producers_no_mapper_t >>; } template constexpr bool combine_is_good_mapper(std::false_type) { return false; } template struct combine_producers_with_mapper_list : std::bool_constant< combine_is_good_mapper( combine_just_producers_list< combine_producers_no_mapper_t >())> { }; template struct combine_producers_with_mapper : combine_producers_with_mapper_list> { }; template constexpr bool combine_producers_with_mapper_v = combine_producers_with_mapper::value; template inline decltype(auto) combine_call( std::index_sequence, Producers &&...producers) { return combine_implementation( argument_mapper::call(std::move(producers)...)...); } } // namespace details template < typename ...Args, typename = std::enable_if_t< details::combine_just_producers_v || details::combine_producers_with_mapper_v>> inline decltype(auto) combine(Args &&...args) { if constexpr (details::combine_just_producers_v) { return details::combine_implementation(std::move(args)...); } else if constexpr (details::combine_producers_with_mapper_v) { constexpr auto kProducersCount = sizeof...(Args) - 1; return details::combine_call( std::make_index_sequence(), std::forward(args)...) | map(details::argument_mapper::call( std::forward(args)...)); } else { static_assert(false_(args...), "Bad combine() call."); } } namespace details { template struct combine_vector_state { std::vector> accumulated; std::vector latest; int invalid = 0; int working = 0; }; } // namespace details template inline auto combine( std::vector> &&producers) { using state_type = details::combine_vector_state; return make_producer, Error>([ producers = std::move(producers) ](const auto &consumer) mutable { auto count = producers.size(); auto state = consumer.template make_state(); state->accumulated.resize(count); state->invalid = count; state->working = count; for (auto index = 0; index != count; ++index) { auto &producer = producers[index]; consumer.add_lifetime(std::move(producer).start( [consumer, state, index](Value &&value) { if (state->accumulated.empty()) { state->latest[index] = std::move(value); consumer.put_next_copy(state->latest); } else if (state->accumulated[index]) { state->accumulated[index] = std::move(value); } else { state->accumulated[index] = std::move(value); if (!--state->invalid) { state->latest.reserve( state->accumulated.size()); for (auto &&value : state->accumulated) { state->latest.push_back( std::move(*value)); } details::take(state->accumulated); consumer.put_next_copy(state->latest); } } }, [consumer](auto &&error) { consumer.put_error_forward( std::forward(error)); }, [consumer, state] { if (!--state->working) { consumer.put_done(); } })); } if (!count) { consumer.put_done(); } return lifetime(); }); } template < typename Value, typename Error, typename Generator, typename Mapper> inline auto combine( std::vector> &&producers, Mapper &&mapper) { return combine(std::move(producers)) | map(std::forward(mapper)); } } // namespace rpl