Add event_stream for events with many consumers.

This commit is contained in:
John Preston 2017-09-03 18:57:51 +03:00
parent ebe4bbbf0f
commit 101fdb1fba
9 changed files with 403 additions and 18 deletions

View File

@ -22,6 +22,38 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
namespace base {
// This version of remove_if allows predicate to push_back() items.
// The added items won't be tested for predicate but just left in the container.
template <typename Container, typename Predicate>
void push_back_safe_remove_if(
Container &&container,
Predicate &&predicate) {
auto first = size_t(0);
auto count = container.size();
auto moveFrom = first;
for (; moveFrom != count; ++moveFrom) {
if (predicate(container[moveFrom])) {
break;
}
}
if (moveFrom != count) {
auto moveTo = moveFrom;
for (++moveFrom; moveFrom != count; ++moveFrom) {
if (!predicate(container[moveFrom])) {
container[moveTo++] = std::move(container[moveFrom]);
}
}
// Move items that we've added while checking the initial items.
count = container.size();
for (; moveFrom != count; ++moveFrom) {
container[moveTo++] = std::move(container[moveFrom]);
}
container.erase(container.begin() + moveTo, container.end());
}
}
template <typename Range, typename Method>
decltype(auto) for_each(Range &&range, Method &&method) {
return std::for_each(
@ -37,4 +69,20 @@ decltype(auto) for_each_apply(Method &&method) {
};
}
template <typename Range, typename Value>
decltype(auto) find(Range &&range, Value &&value) {
return std::find(
std::begin(std::forward<Range>(range)),
std::end(std::forward<Range>(range)),
std::forward<Value>(value));
}
template <typename Range, typename Predicate>
decltype(auto) find_if(Range &&range, Predicate &&predicate) {
return std::find_if(
std::begin(std::forward<Range>(range)),
std::end(std::forward<Range>(range)),
std::forward<Predicate>(predicate));
}
} // namespace base

View File

@ -0,0 +1,54 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#include "catch.hpp"
#include "base/algorithm.h"
TEST_CASE("push_back_safe_remove_if tests", "[base::algorithm]") {
auto v = std::vector<int>();
SECTION("doesn't change an empty vector") {
base::push_back_safe_remove_if(v, [](int) { return true; });
REQUIRE(v.empty());
}
v.insert(v.end(), { 1, 2, 3, 4, 5, 4, 3, 2, 1 });
SECTION("allows to push_back from predicate") {
base::push_back_safe_remove_if(v, [&v](int value) {
v.push_back(value);
return (value % 2) == 1;
});
auto expected = std::vector<int> { 2, 4, 4, 2, 1, 2, 3, 4, 5, 4, 3, 2, 1 };
REQUIRE(v == expected);
}
SECTION("allows to push_back while removing all") {
base::push_back_safe_remove_if(v, [&v](int value) {
if (value == 5) {
v.push_back(value);
}
return true;
});
auto expected = std::vector<int> { 5 };
REQUIRE(v == expected);
}
}

View File

@ -54,6 +54,25 @@ public:
void setLifetime(lifetime &&lifetime) const;
void terminate() const;
bool operator==(const consumer &other) const {
return _instance == other._instance;
}
bool operator!=(const consumer &other) const {
return !(*this == other);
}
bool operator<(const consumer &other) const {
return _instance < other._instance;
}
bool operator>(const consumer &other) const {
return other < *this;
}
bool operator<=(const consumer &other) const {
return !(other < *this);
}
bool operator>=(const consumer &other) const {
return !(*this < other);
}
private:
class abstract_consumer_instance;
@ -66,7 +85,7 @@ private:
OnError &&error,
OnDone &&done);
std::shared_ptr<abstract_consumer_instance> _instance;
mutable std::shared_ptr<abstract_consumer_instance> _instance;
};
@ -149,27 +168,43 @@ consumer<Value, Error>::consumer(
template <typename Value, typename Error>
bool consumer<Value, Error>::putNext(Value value) const {
return _instance->putNext(std::move(value));
if (_instance) {
if (_instance->putNext(std::move(value))) {
return true;
}
_instance = nullptr;
}
return false;
}
template <typename Value, typename Error>
void consumer<Value, Error>::putError(Error error) const {
return _instance->putError(std::move(error));
if (_instance) {
std::exchange(_instance, nullptr)->putError(std::move(error));
}
}
template <typename Value, typename Error>
void consumer<Value, Error>::putDone() const {
return _instance->putDone();
if (_instance) {
std::exchange(_instance, nullptr)->putDone();
}
}
template <typename Value, typename Error>
void consumer<Value, Error>::setLifetime(lifetime &&lifetime) const {
return _instance->setLifetime(std::move(lifetime));
if (_instance) {
_instance->setLifetime(std::move(lifetime));
} else {
lifetime.destroy();
}
}
template <typename Value, typename Error>
void consumer<Value, Error>::terminate() const {
return _instance->terminate();
if (_instance) {
std::exchange(_instance, nullptr)->terminate();
}
}
template <typename Value, typename Error>

View File

@ -0,0 +1,92 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include "producer.h"
#include "base/algorithm.h"
namespace rpl {
template <typename Value>
class event_stream {
public:
event_stream();
void fire(Value value);
producer<Value, no_error> events();
~event_stream();
private:
std::weak_ptr<event_stream*> weak() const {
return _strong;
}
void addConsumer(consumer<Value, no_error> &&consumer) {
_consumers.push_back(std::move(consumer));
}
void removeConsumer(const consumer<Value, no_error> &consumer) {
auto it = base::find(_consumers, consumer);
if (it != _consumers.end()) {
it->terminate();
}
}
std::shared_ptr<event_stream*> _strong;
std::vector<consumer<Value, no_error>> _consumers;
};
template <typename Value>
event_stream<Value>::event_stream()
: _strong(std::make_shared<event_stream*>(this)) {
}
template <typename Value>
void event_stream<Value>::fire(Value value) {
base::push_back_safe_remove_if(_consumers, [&](auto &consumer) {
return !consumer.putNext(value);
});
}
template <typename Value>
producer<Value, no_error> event_stream<Value>::events() {
return producer<Value, no_error>([weak = weak()](consumer<Value, no_error> consumer) {
if (auto strong = weak.lock()) {
auto result = [weak, consumer] {
if (auto strong = weak.lock()) {
(*strong)->removeConsumer(consumer);
}
};
(*strong)->addConsumer(std::move(consumer));
return lifetime(std::move(result));
}
return lifetime();
});
}
template <typename Value>
event_stream<Value>::~event_stream() {
for (auto &consumer : _consumers) {
consumer.putDone();
}
}
} // namespace rpl

View File

@ -68,8 +68,9 @@ lifetime::lifetime(lifetime &&other)
}
lifetime &lifetime::operator=(lifetime &&other) {
_destroy = std::exchange(other._destroy, base::lambda_once<void()>());
_nested = std::exchange(other._nested, std::vector<lifetime>());
std::swap(_destroy, other._destroy);
std::swap(_nested, other._nested);
other.destroy();
return *this;
}

View File

@ -21,6 +21,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include "catch.hpp"
#include "rpl/producer.h"
#include "rpl/event_stream.h"
using namespace rpl;
@ -155,5 +156,147 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
saved.destroy();
REQUIRE(*lifetimeEndCount == 3);
}
SECTION("event_stream basic test") {
auto sum = std::make_shared<int>(0);
rpl::event_stream<int> stream;
stream.fire(1);
stream.fire(2);
stream.fire(3);
{
auto lifetime = stream.events().start([=, &stream](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
});
stream.fire(11);
stream.fire(12);
stream.fire(13);
}
stream.fire(21);
stream.fire(22);
stream.fire(23);
REQUIRE(11 + 12 + 13);
}
SECTION("event_stream add in handler test") {
auto sum = std::make_shared<int>(0);
rpl::event_stream<int> stream;
{
auto composite = lifetime();
composite = stream.events().start([=, &stream, &composite](int value) {
*sum += value;
composite.add(stream.events().start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
}));
}, [=](no_error) {
}, [=] {
});
{
auto inner = lifetime();
inner = stream.events().start([=, &stream, &inner](int value) {
*sum += value;
inner.add(stream.events().start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
}));
}, [=](no_error) {
}, [=] {
});
stream.fire(1);
stream.fire(2);
stream.fire(3);
}
stream.fire(11);
stream.fire(12);
stream.fire(13);
}
stream.fire(21);
stream.fire(22);
stream.fire(23);
REQUIRE(*sum ==
(1 + 1) +
((2 + 2) + (2 + 2)) +
((3 + 3 + 3) + (3 + 3 + 3)) +
(11 + 11 + 11 + 11) +
(12 + 12 + 12 + 12 + 12) +
(13 + 13 + 13 + 13 + 13 + 13));
}
SECTION("event_stream add and remove in handler test") {
auto sum = std::make_shared<int>(0);
rpl::event_stream<int> stream;
{
auto composite = lifetime();
composite = stream.events().start([=, &stream, &composite](int value) {
*sum += value;
composite = stream.events().start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
});
}, [=](no_error) {
}, [=] {
});
{
auto inner = lifetime();
inner = stream.events().start([=, &stream, &inner](int value) {
*sum += value;
inner = stream.events().start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
});
}, [=](no_error) {
}, [=] {
});
stream.fire(1);
stream.fire(2);
stream.fire(3);
}
stream.fire(11);
stream.fire(12);
stream.fire(13);
}
stream.fire(21);
stream.fire(22);
stream.fire(23);
REQUIRE(*sum ==
(1 + 1) +
(2 + 2) +
(3 + 3) +
(11) +
(12) +
(13));
}
SECTION("event_stream ends before handler lifetime") {
auto sum = std::make_shared<int>(0);
lifetime extended;
{
rpl::event_stream<int> stream;
extended = stream.events().start([=](int value) {
*sum += value;
}, [=](no_error) {
}, [=] {
});
stream.fire(1);
stream.fire(2);
stream.fire(3);
}
REQUIRE(*sum == 1 + 2 + 3);
}
}

View File

@ -394,6 +394,7 @@
<(src_loc)/profile/profile_widget.cpp
<(src_loc)/profile/profile_widget.h
<(src_loc)/rpl/consumer.h
<(src_loc)/rpl/event_stream.h
<(src_loc)/rpl/lifetime.h
<(src_loc)/rpl/producer.h
<(src_loc)/settings/settings_advanced_widget.cpp

View File

@ -55,6 +55,24 @@
],
'message': 'Running <(RULE_INPUT_ROOT)..',
}]
}, {
'target_name': 'tests_algorithm',
'includes': [
'common_test.gypi',
],
'sources': [
'<(src_loc)/base/algorithm.h',
'<(src_loc)/base/algorithm_tests.cpp',
],
}, {
'target_name': 'tests_flags',
'includes': [
'common_test.gypi',
],
'sources': [
'<(src_loc)/base/flags.h',
'<(src_loc)/base/flags_tests.cpp',
],
}, {
'target_name': 'tests_flat_map',
'includes': [
@ -73,15 +91,6 @@
'<(src_loc)/base/flat_set.h',
'<(src_loc)/base/flat_set_tests.cpp',
],
}, {
'target_name': 'tests_flags',
'includes': [
'common_test.gypi',
],
'sources': [
'<(src_loc)/base/flags.h',
'<(src_loc)/base/flags_tests.cpp',
],
}, {
'target_name': 'tests_producer',
'includes': [
@ -89,6 +98,7 @@
],
'sources': [
'<(src_loc)/rpl/consumer.h',
'<(src_loc)/rpl/event_stream.h',
'<(src_loc)/rpl/lifetime.h',
'<(src_loc)/rpl/producer.h',
'<(src_loc)/rpl/producer_tests.cpp',

View File

@ -1,4 +1,5 @@
tests_algorithm
tests_flags
tests_flat_map
tests_flat_set
tests_flags
tests_producer