diff --git a/Telegram/SourceFiles/base/algorithm.h b/Telegram/SourceFiles/base/algorithm.h index 44188b05eb..aea8ecd1a5 100644 --- a/Telegram/SourceFiles/base/algorithm.h +++ b/Telegram/SourceFiles/base/algorithm.h @@ -22,6 +22,38 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org namespace base { +// This version of remove_if allows predicate to push_back() items. +// The added items won't be tested for predicate but just left in the container. +template +void push_back_safe_remove_if( + Container &&container, + Predicate &&predicate) { + auto first = size_t(0); + auto count = container.size(); + auto moveFrom = first; + for (; moveFrom != count; ++moveFrom) { + if (predicate(container[moveFrom])) { + break; + } + } + if (moveFrom != count) { + auto moveTo = moveFrom; + for (++moveFrom; moveFrom != count; ++moveFrom) { + if (!predicate(container[moveFrom])) { + container[moveTo++] = std::move(container[moveFrom]); + } + } + + // Move items that we've added while checking the initial items. + count = container.size(); + for (; moveFrom != count; ++moveFrom) { + container[moveTo++] = std::move(container[moveFrom]); + } + + container.erase(container.begin() + moveTo, container.end()); + } +} + template decltype(auto) for_each(Range &&range, Method &&method) { return std::for_each( @@ -37,4 +69,20 @@ decltype(auto) for_each_apply(Method &&method) { }; } +template +decltype(auto) find(Range &&range, Value &&value) { + return std::find( + std::begin(std::forward(range)), + std::end(std::forward(range)), + std::forward(value)); +} + +template +decltype(auto) find_if(Range &&range, Predicate &&predicate) { + return std::find_if( + std::begin(std::forward(range)), + std::end(std::forward(range)), + std::forward(predicate)); +} + } // namespace base diff --git a/Telegram/SourceFiles/base/algorithm_tests.cpp b/Telegram/SourceFiles/base/algorithm_tests.cpp new file mode 100644 index 0000000000..f5eaeea621 --- /dev/null +++ b/Telegram/SourceFiles/base/algorithm_tests.cpp @@ -0,0 +1,54 @@ +/* +This file is part of Telegram Desktop, +the official desktop version of Telegram messaging app, see https://telegram.org + +Telegram Desktop is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +It is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +In addition, as a special exception, the copyright holders give permission +to link the code of portions of this program with the OpenSSL library. + +Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE +Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org +*/ +#include "catch.hpp" + +#include "base/algorithm.h" + +TEST_CASE("push_back_safe_remove_if tests", "[base::algorithm]") { + auto v = std::vector(); + + SECTION("doesn't change an empty vector") { + base::push_back_safe_remove_if(v, [](int) { return true; }); + REQUIRE(v.empty()); + } + + v.insert(v.end(), { 1, 2, 3, 4, 5, 4, 3, 2, 1 }); + + SECTION("allows to push_back from predicate") { + base::push_back_safe_remove_if(v, [&v](int value) { + v.push_back(value); + return (value % 2) == 1; + }); + auto expected = std::vector { 2, 4, 4, 2, 1, 2, 3, 4, 5, 4, 3, 2, 1 }; + REQUIRE(v == expected); + } + + SECTION("allows to push_back while removing all") { + base::push_back_safe_remove_if(v, [&v](int value) { + if (value == 5) { + v.push_back(value); + } + return true; + }); + auto expected = std::vector { 5 }; + REQUIRE(v == expected); + } +} \ No newline at end of file diff --git a/Telegram/SourceFiles/rpl/consumer.h b/Telegram/SourceFiles/rpl/consumer.h index 01529b416a..75601cbfe7 100644 --- a/Telegram/SourceFiles/rpl/consumer.h +++ b/Telegram/SourceFiles/rpl/consumer.h @@ -54,6 +54,25 @@ public: void setLifetime(lifetime &&lifetime) const; void terminate() const; + bool operator==(const consumer &other) const { + return _instance == other._instance; + } + bool operator!=(const consumer &other) const { + return !(*this == other); + } + bool operator<(const consumer &other) const { + return _instance < other._instance; + } + bool operator>(const consumer &other) const { + return other < *this; + } + bool operator<=(const consumer &other) const { + return !(other < *this); + } + bool operator>=(const consumer &other) const { + return !(*this < other); + } + private: class abstract_consumer_instance; @@ -66,7 +85,7 @@ private: OnError &&error, OnDone &&done); - std::shared_ptr _instance; + mutable std::shared_ptr _instance; }; @@ -149,27 +168,43 @@ consumer::consumer( template bool consumer::putNext(Value value) const { - return _instance->putNext(std::move(value)); + if (_instance) { + if (_instance->putNext(std::move(value))) { + return true; + } + _instance = nullptr; + } + return false; } template void consumer::putError(Error error) const { - return _instance->putError(std::move(error)); + if (_instance) { + std::exchange(_instance, nullptr)->putError(std::move(error)); + } } template void consumer::putDone() const { - return _instance->putDone(); + if (_instance) { + std::exchange(_instance, nullptr)->putDone(); + } } template void consumer::setLifetime(lifetime &&lifetime) const { - return _instance->setLifetime(std::move(lifetime)); + if (_instance) { + _instance->setLifetime(std::move(lifetime)); + } else { + lifetime.destroy(); + } } template void consumer::terminate() const { - return _instance->terminate(); + if (_instance) { + std::exchange(_instance, nullptr)->terminate(); + } } template diff --git a/Telegram/SourceFiles/rpl/event_stream.h b/Telegram/SourceFiles/rpl/event_stream.h new file mode 100644 index 0000000000..ef4f1c419e --- /dev/null +++ b/Telegram/SourceFiles/rpl/event_stream.h @@ -0,0 +1,92 @@ +/* +This file is part of Telegram Desktop, +the official desktop version of Telegram messaging app, see https://telegram.org + +Telegram Desktop is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +It is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +In addition, as a special exception, the copyright holders give permission +to link the code of portions of this program with the OpenSSL library. + +Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE +Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org +*/ +#pragma once + +#include "producer.h" +#include "base/algorithm.h" + +namespace rpl { + +template +class event_stream { +public: + event_stream(); + + void fire(Value value); + producer events(); + + ~event_stream(); + +private: + std::weak_ptr weak() const { + return _strong; + } + void addConsumer(consumer &&consumer) { + _consumers.push_back(std::move(consumer)); + } + void removeConsumer(const consumer &consumer) { + auto it = base::find(_consumers, consumer); + if (it != _consumers.end()) { + it->terminate(); + } + } + + std::shared_ptr _strong; + std::vector> _consumers; + +}; + +template +event_stream::event_stream() + : _strong(std::make_shared(this)) { +} + +template +void event_stream::fire(Value value) { + base::push_back_safe_remove_if(_consumers, [&](auto &consumer) { + return !consumer.putNext(value); + }); +} + +template +producer event_stream::events() { + return producer([weak = weak()](consumer consumer) { + if (auto strong = weak.lock()) { + auto result = [weak, consumer] { + if (auto strong = weak.lock()) { + (*strong)->removeConsumer(consumer); + } + }; + (*strong)->addConsumer(std::move(consumer)); + return lifetime(std::move(result)); + } + return lifetime(); + }); +} + +template +event_stream::~event_stream() { + for (auto &consumer : _consumers) { + consumer.putDone(); + } +} + +} // namespace rpl diff --git a/Telegram/SourceFiles/rpl/lifetime.h b/Telegram/SourceFiles/rpl/lifetime.h index 171e1621b1..59f2c39b57 100644 --- a/Telegram/SourceFiles/rpl/lifetime.h +++ b/Telegram/SourceFiles/rpl/lifetime.h @@ -68,8 +68,9 @@ lifetime::lifetime(lifetime &&other) } lifetime &lifetime::operator=(lifetime &&other) { - _destroy = std::exchange(other._destroy, base::lambda_once()); - _nested = std::exchange(other._nested, std::vector()); + std::swap(_destroy, other._destroy); + std::swap(_nested, other._nested); + other.destroy(); return *this; } diff --git a/Telegram/SourceFiles/rpl/producer_tests.cpp b/Telegram/SourceFiles/rpl/producer_tests.cpp index e7861c8a1b..c1d7f38052 100644 --- a/Telegram/SourceFiles/rpl/producer_tests.cpp +++ b/Telegram/SourceFiles/rpl/producer_tests.cpp @@ -21,6 +21,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "catch.hpp" #include "rpl/producer.h" +#include "rpl/event_stream.h" using namespace rpl; @@ -155,5 +156,147 @@ TEST_CASE("basic producer tests", "[rpl::producer]") { saved.destroy(); REQUIRE(*lifetimeEndCount == 3); } + + SECTION("event_stream basic test") { + auto sum = std::make_shared(0); + rpl::event_stream stream; + stream.fire(1); + stream.fire(2); + stream.fire(3); + { + auto lifetime = stream.events().start([=, &stream](int value) { + *sum += value; + }, [=](no_error) { + }, [=] { + }); + stream.fire(11); + stream.fire(12); + stream.fire(13); + } + stream.fire(21); + stream.fire(22); + stream.fire(23); + + REQUIRE(11 + 12 + 13); + } + + SECTION("event_stream add in handler test") { + auto sum = std::make_shared(0); + rpl::event_stream stream; + + { + auto composite = lifetime(); + composite = stream.events().start([=, &stream, &composite](int value) { + *sum += value; + composite.add(stream.events().start([=](int value) { + *sum += value; + }, [=](no_error) { + }, [=] { + })); + }, [=](no_error) { + }, [=] { + }); + + { + auto inner = lifetime(); + inner = stream.events().start([=, &stream, &inner](int value) { + *sum += value; + inner.add(stream.events().start([=](int value) { + *sum += value; + }, [=](no_error) { + }, [=] { + })); + }, [=](no_error) { + }, [=] { + }); + + stream.fire(1); + stream.fire(2); + stream.fire(3); + } + stream.fire(11); + stream.fire(12); + stream.fire(13); + } + stream.fire(21); + stream.fire(22); + stream.fire(23); + + REQUIRE(*sum == + (1 + 1) + + ((2 + 2) + (2 + 2)) + + ((3 + 3 + 3) + (3 + 3 + 3)) + + (11 + 11 + 11 + 11) + + (12 + 12 + 12 + 12 + 12) + + (13 + 13 + 13 + 13 + 13 + 13)); + } + + SECTION("event_stream add and remove in handler test") { + auto sum = std::make_shared(0); + rpl::event_stream stream; + + { + auto composite = lifetime(); + composite = stream.events().start([=, &stream, &composite](int value) { + *sum += value; + composite = stream.events().start([=](int value) { + *sum += value; + }, [=](no_error) { + }, [=] { + }); + }, [=](no_error) { + }, [=] { + }); + + { + auto inner = lifetime(); + inner = stream.events().start([=, &stream, &inner](int value) { + *sum += value; + inner = stream.events().start([=](int value) { + *sum += value; + }, [=](no_error) { + }, [=] { + }); + }, [=](no_error) { + }, [=] { + }); + + stream.fire(1); + stream.fire(2); + stream.fire(3); + } + stream.fire(11); + stream.fire(12); + stream.fire(13); + } + stream.fire(21); + stream.fire(22); + stream.fire(23); + + REQUIRE(*sum == + (1 + 1) + + (2 + 2) + + (3 + 3) + + (11) + + (12) + + (13)); + } + + SECTION("event_stream ends before handler lifetime") { + auto sum = std::make_shared(0); + lifetime extended; + { + rpl::event_stream stream; + extended = stream.events().start([=](int value) { + *sum += value; + }, [=](no_error) { + }, [=] { + }); + stream.fire(1); + stream.fire(2); + stream.fire(3); + } + REQUIRE(*sum == 1 + 2 + 3); + } } diff --git a/Telegram/gyp/telegram_sources.txt b/Telegram/gyp/telegram_sources.txt index a4d28535dd..34fa5a3083 100644 --- a/Telegram/gyp/telegram_sources.txt +++ b/Telegram/gyp/telegram_sources.txt @@ -394,6 +394,7 @@ <(src_loc)/profile/profile_widget.cpp <(src_loc)/profile/profile_widget.h <(src_loc)/rpl/consumer.h +<(src_loc)/rpl/event_stream.h <(src_loc)/rpl/lifetime.h <(src_loc)/rpl/producer.h <(src_loc)/settings/settings_advanced_widget.cpp diff --git a/Telegram/gyp/tests/tests.gyp b/Telegram/gyp/tests/tests.gyp index 9cef4e41a0..b428e29a25 100644 --- a/Telegram/gyp/tests/tests.gyp +++ b/Telegram/gyp/tests/tests.gyp @@ -55,6 +55,24 @@ ], 'message': 'Running <(RULE_INPUT_ROOT)..', }] + }, { + 'target_name': 'tests_algorithm', + 'includes': [ + 'common_test.gypi', + ], + 'sources': [ + '<(src_loc)/base/algorithm.h', + '<(src_loc)/base/algorithm_tests.cpp', + ], + }, { + 'target_name': 'tests_flags', + 'includes': [ + 'common_test.gypi', + ], + 'sources': [ + '<(src_loc)/base/flags.h', + '<(src_loc)/base/flags_tests.cpp', + ], }, { 'target_name': 'tests_flat_map', 'includes': [ @@ -73,15 +91,6 @@ '<(src_loc)/base/flat_set.h', '<(src_loc)/base/flat_set_tests.cpp', ], - }, { - 'target_name': 'tests_flags', - 'includes': [ - 'common_test.gypi', - ], - 'sources': [ - '<(src_loc)/base/flags.h', - '<(src_loc)/base/flags_tests.cpp', - ], }, { 'target_name': 'tests_producer', 'includes': [ @@ -89,6 +98,7 @@ ], 'sources': [ '<(src_loc)/rpl/consumer.h', + '<(src_loc)/rpl/event_stream.h', '<(src_loc)/rpl/lifetime.h', '<(src_loc)/rpl/producer.h', '<(src_loc)/rpl/producer_tests.cpp', diff --git a/Telegram/gyp/tests/tests_list.txt b/Telegram/gyp/tests/tests_list.txt index a901682b3d..d2220189f9 100644 --- a/Telegram/gyp/tests/tests_list.txt +++ b/Telegram/gyp/tests/tests_list.txt @@ -1,4 +1,5 @@ +tests_algorithm +tests_flags tests_flat_map tests_flat_set -tests_flags tests_producer