tdesktop/Telegram/SourceFiles/rpl/combine.h

350 lines
9.5 KiB
C++

/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
#include "base/optional.h"
#include "base/variant.h"
#include <rpl/map.h>
#include <rpl/producer.h>
#include <rpl/details/type_list.h>
#include <rpl/details/callable.h>
#include <rpl/mappers.h>
#include <rpl/complete.h>
namespace rpl {
namespace details {
template <typename ...Values>
struct combine_state {
combine_state() : accumulated(std::tuple<std::optional<Values>...>()) {
}
std::optional<std::tuple<std::optional<Values>...>> accumulated;
std::optional<std::tuple<Values...>> latest;
int invalid = sizeof...(Values);
int working = sizeof...(Values);
};
template <typename ...Values, std::size_t ...I>
inline std::tuple<Values...> combine_make_first(
std::tuple<std::optional<Values>...> &&accumulated,
std::index_sequence<I...>) {
return std::make_tuple(std::move(*std::get<I>(accumulated))...);
}
template <size_t Index, typename consumer_type, typename ...Values>
class combine_subscribe_one {
public:
combine_subscribe_one(
const consumer_type &consumer,
combine_state<Values...> *state)
: _consumer(consumer)
, _state(state) {
}
template <typename Value, typename Error, typename Generator>
void subscribe(producer<Value, Error, Generator> &&producer) {
_consumer.add_lifetime(std::move(producer).start(
[consumer = _consumer, state = _state](Value &&value) {
if (!state->accumulated) {
std::get<Index>(*state->latest) = std::move(value);
consumer.put_next_copy(*state->latest);
} else {
auto &accumulated = std::get<Index>(
*state->accumulated);
if (accumulated) {
accumulated = std::move(value);
} else {
accumulated = std::move(value);
if (!--state->invalid) {
constexpr auto kArity = sizeof...(Values);
state->latest = combine_make_first(
std::move(*state->accumulated),
std::make_index_sequence<kArity>());
state->accumulated = std::nullopt;
consumer.put_next_copy(*state->latest);
}
}
}
}, [consumer = _consumer](auto &&error) {
consumer.put_error_forward(
std::forward<decltype(error)>(error));
}, [consumer = _consumer, state = _state] {
if (!--state->working) {
consumer.put_done();
}
}));
}
private:
const consumer_type &_consumer;
combine_state<Values...> *_state = nullptr;
};
template <
typename consumer_type,
typename ...Values,
typename ...Errors,
typename ...Generators,
std::size_t ...I>
inline void combine_subscribe(
const consumer_type &consumer,
combine_state<Values...> *state,
std::index_sequence<I...>,
std::tuple<producer<Values, Errors, Generators>...> &&saved) {
auto consume = { (
combine_subscribe_one<I, consumer_type, Values...>(
consumer,
state
).subscribe(std::get<I>(std::move(saved))), 0)... };
(void)consume;
}
template <typename ...Producers>
class combine_implementation_helper;
template <typename ...Producers>
combine_implementation_helper<std::decay_t<Producers>...>
make_combine_implementation_helper(Producers &&...producers) {
return combine_implementation_helper<std::decay_t<Producers>...>(
std::forward<Producers>(producers)...);
}
template <
typename ...Values,
typename ...Errors,
typename ...Generators>
class combine_implementation_helper<producer<Values, Errors, Generators>...> {
public:
using CombinedValue = std::tuple<Values...>;
using CombinedError = base::normalized_variant_t<Errors...>;
combine_implementation_helper(
producer<Values, Errors, Generators> &&...producers)
: _saved(std::make_tuple(std::move(producers)...)) {
}
template <typename Handlers>
lifetime operator()(const consumer<CombinedValue, CombinedError, Handlers> &consumer) {
auto state = consumer.template make_state<
combine_state<Values...>>();
constexpr auto kArity = sizeof...(Values);
combine_subscribe(
consumer,
state,
std::make_index_sequence<kArity>(),
std::move(_saved));
return lifetime();
}
private:
std::tuple<producer<Values, Errors, Generators>...> _saved;
};
template <
typename ...Values,
typename ...Errors,
typename ...Generators>
inline auto combine_implementation(
producer<Values, Errors, Generators> &&...producers) {
using CombinedValue = std::tuple<Values...>;
using CombinedError = base::normalized_variant_t<Errors...>;
return make_producer<CombinedValue, CombinedError>(
make_combine_implementation_helper(std::move(producers)...));
}
template <typename ...Args>
struct combine_just_producers : std::false_type {
};
template <typename ...Args>
constexpr bool combine_just_producers_v
= combine_just_producers<Args...>::value;
template <
typename ...Values,
typename ...Errors,
typename ...Generators>
struct combine_just_producers<
producer<Values, Errors, Generators>...>
: std::true_type {
};
template <typename ArgsList>
struct combine_just_producers_list
: type_list::extract_to_t<ArgsList, combine_just_producers> {
};
template <typename ...Args>
struct combine_result_type;
template <typename ...Args>
using combine_result_type_t
= typename combine_result_type<Args...>::type;
template <
typename ...Values,
typename ...Errors,
typename ...Generators>
struct combine_result_type<producer<Values, Errors, Generators>...> {
using type = std::tuple<Values...>;
};
template <typename ArgsList>
struct combine_result_type_list
: type_list::extract_to_t<ArgsList, combine_result_type> {
};
template <typename ArgsList>
using combine_result_type_list_t
= typename combine_result_type_list<ArgsList>::type;
template <typename ArgsList>
using combine_producers_no_mapper_t
= type_list::chop_last_t<ArgsList>;
template <typename ArgsList>
constexpr bool combine_is_good_mapper(std::true_type) {
return is_callable_v<
type_list::last_t<ArgsList>,
combine_result_type_list_t<
combine_producers_no_mapper_t<ArgsList>
>>;
}
template <typename ArgsList>
constexpr bool combine_is_good_mapper(std::false_type) {
return false;
}
template <typename ArgsList>
struct combine_producers_with_mapper_list : std::bool_constant<
combine_is_good_mapper<ArgsList>(
combine_just_producers_list<
combine_producers_no_mapper_t<ArgsList>
>())> {
};
template <typename ...Args>
struct combine_producers_with_mapper
: combine_producers_with_mapper_list<type_list::list<Args...>> {
};
template <typename ...Args>
constexpr bool combine_producers_with_mapper_v
= combine_producers_with_mapper<Args...>::value;
template <typename ...Producers, std::size_t ...I>
inline decltype(auto) combine_call(
std::index_sequence<I...>,
Producers &&...producers) {
return combine_implementation(
argument_mapper<I>::call(std::move(producers)...)...);
}
} // namespace details
template <
typename ...Args,
typename = std::enable_if_t<
details::combine_just_producers_v<Args...>
|| details::combine_producers_with_mapper_v<Args...>>>
inline decltype(auto) combine(Args &&...args) {
if constexpr (details::combine_just_producers_v<Args...>) {
return details::combine_implementation(std::move(args)...);
} else if constexpr (details::combine_producers_with_mapper_v<Args...>) {
constexpr auto kProducersCount = sizeof...(Args) - 1;
return details::combine_call(
std::make_index_sequence<kProducersCount>(),
std::forward<Args>(args)...)
| map(details::argument_mapper<kProducersCount>::call(
std::forward<Args>(args)...));
} else {
static_assert(false_(args...), "Bad combine() call.");
}
}
namespace details {
template <typename Value>
struct combine_vector_state {
std::vector<std::optional<Value>> accumulated;
std::vector<Value> latest;
int invalid = 0;
int working = 0;
};
} // namespace details
template <typename Value, typename Error, typename Generator>
inline auto combine(
std::vector<producer<Value, Error, Generator>> &&producers) {
using state_type = details::combine_vector_state<Value>;
return make_producer<std::vector<Value>, Error>([
producers = std::move(producers)
](const auto &consumer) mutable {
auto count = producers.size();
auto state = consumer.template make_state<state_type>();
state->accumulated.resize(count);
state->invalid = count;
state->working = count;
for (auto index = 0; index != count; ++index) {
auto &producer = producers[index];
consumer.add_lifetime(std::move(producer).start(
[consumer, state, index](Value &&value) {
if (state->accumulated.empty()) {
state->latest[index] = std::move(value);
consumer.put_next_copy(state->latest);
} else if (state->accumulated[index]) {
state->accumulated[index] = std::move(value);
} else {
state->accumulated[index] = std::move(value);
if (!--state->invalid) {
state->latest.reserve(
state->accumulated.size());
for (auto &&value : state->accumulated) {
state->latest.push_back(
std::move(*value));
}
details::take(state->accumulated);
consumer.put_next_copy(state->latest);
}
}
}, [consumer](auto &&error) {
consumer.put_error_forward(
std::forward<decltype(error)>(error));
}, [consumer, state] {
if (!--state->working) {
consumer.put_done();
}
}));
}
if (!count) {
consumer.put_done();
}
return lifetime();
});
}
template <
typename Value,
typename Error,
typename Generator,
typename Mapper>
inline auto combine(
std::vector<producer<Value, Error, Generator>> &&producers,
Mapper &&mapper) {
return combine(std::move(producers))
| map(std::forward<Mapper>(mapper));
}
} // namespace rpl