/* This file is part of Telegram Desktop, the official desktop application for the Telegram messaging service. For license and copyright information please follow this link: https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL */ #pragma once #include #include #include #include #include #include "base/assertion.h" #include "base/index_based_iterator.h" namespace rpl { // Currently not thread-safe :( template class event_stream { public: event_stream(); event_stream(event_stream &&other); event_stream &operator=(event_stream &&other); template void fire_forward(OtherValue &&value) const; void fire(Value &&value) const { return fire_forward(std::move(value)); } void fire_copy(const Value &value) const { return fire_forward(value); } auto events() const { return make_producer([weak = make_weak()]( const auto &consumer) { if (auto strong = weak.lock()) { auto result = [weak, consumer] { if (auto strong = weak.lock()) { auto it = std::find( strong->consumers.begin(), strong->consumers.end(), consumer); if (it != strong->consumers.end()) { it->terminate(); } } }; strong->consumers.push_back(std::move(consumer)); return lifetime(std::move(result)); } return lifetime(); }); } auto events_starting_with(Value &&value) const { return single(std::move(value)) | then(events()); } auto events_starting_with_copy(const Value &value) const { return single(value) | then(events()); } ~event_stream(); private: struct Data { std::vector> consumers; int depth = 0; }; std::weak_ptr make_weak() const; mutable std::shared_ptr _data; }; template inline event_stream::event_stream() { } template inline event_stream::event_stream(event_stream &&other) : _data(details::take(other._data)) { } template inline event_stream &event_stream::operator=( event_stream &&other) { _data = details::take(other._data); return *this; } template template inline void event_stream::fire_forward( OtherValue &&value) const { auto copy = _data; if (!copy) { return; } ++copy->depth; auto &consumers = copy->consumers; auto begin = base::index_based_begin(consumers); auto end = base::index_based_end(consumers); if (begin != end) { // Copy value for every consumer except the last. auto prev = end - 1; auto removeFrom = std::remove_if(begin, prev, [&](auto &consumer) { return !consumer.put_next_copy(value); }); // Perhaps move value for the last consumer. if (prev->put_next_forward(std::forward(value))) { if (removeFrom != prev) { *removeFrom++ = std::move(*prev); } else { ++removeFrom; } } if (removeFrom != end) { // Move new consumers. auto newEnd = base::index_based_end(consumers); if (newEnd != end) { Assert(newEnd > end); while (end != newEnd) { *removeFrom++ = *end++; } } // Erase stale consumers. if (copy->depth == 1) { consumers.erase(removeFrom.base(), consumers.end()); } } } --copy->depth; } template inline auto event_stream::make_weak() const -> std::weak_ptr { if (!_data) { _data = std::make_shared(); } return _data; } template inline event_stream::~event_stream() { if (auto data = details::take(_data)) { for (auto &consumer : data->consumers) { consumer.put_done(); } } } template inline auto start_to_stream( event_stream &stream, lifetime &alive_while) { return start_with_next([&stream](auto &&value) { stream.fire_forward(std::forward(value)); }, alive_while); } namespace details { class start_spawning_helper { public: start_spawning_helper(lifetime &alive_while) : _lifetime(alive_while) { } template auto operator()(producer &&initial) { auto stream = _lifetime.make_state>(); auto collected = std::vector(); { auto collecting = stream->events().start( [&collected](Value &&value) { collected.push_back(std::move(value)); }, [](const Error &error) {}, [] {}); std::move(initial) | start_to_stream(*stream, _lifetime); } return vector(std::move(collected)) | then(stream->events()); } private: lifetime &_lifetime; }; } // namespace details inline auto start_spawning(lifetime &alive_while) -> details::start_spawning_helper { return details::start_spawning_helper(alive_while); } } // namespace rpl