476 lines
11 KiB
C++
476 lines
11 KiB
C++
/*
|
|
This file is part of Telegram Desktop,
|
|
the official desktop application for the Telegram messaging service.
|
|
|
|
For license and copyright information please follow this link:
|
|
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
|
|
*/
|
|
#pragma once
|
|
|
|
#include <vector>
|
|
#include <deque>
|
|
#include <rpl/producer.h>
|
|
#include "base/type_traits.h"
|
|
|
|
namespace base {
|
|
namespace internal {
|
|
|
|
using ObservableCallHandlers = Fn<void()>;
|
|
void RegisterPendingObservable(ObservableCallHandlers *handlers);
|
|
void UnregisterActiveObservable(ObservableCallHandlers *handlers);
|
|
void UnregisterObservable(ObservableCallHandlers *handlers);
|
|
|
|
template <typename EventType>
|
|
struct SubscriptionHandlerHelper {
|
|
using type = Fn<void(parameter_type<EventType>)>;
|
|
};
|
|
|
|
template <>
|
|
struct SubscriptionHandlerHelper<void> {
|
|
using type = Fn<void()>;
|
|
};
|
|
|
|
template <typename EventType>
|
|
using SubscriptionHandler = typename SubscriptionHandlerHelper<EventType>::type;
|
|
|
|
class BaseObservableData {
|
|
};
|
|
|
|
template <typename EventType, typename Handler>
|
|
class CommonObservableData;
|
|
|
|
template <typename EventType, typename Handler>
|
|
class ObservableData;
|
|
|
|
} // namespace internal
|
|
|
|
class Subscription {
|
|
public:
|
|
Subscription() = default;
|
|
Subscription(const Subscription &) = delete;
|
|
Subscription &operator=(const Subscription &) = delete;
|
|
Subscription(Subscription &&other) : _node(base::take(other._node)), _removeAndDestroyMethod(other._removeAndDestroyMethod) {
|
|
}
|
|
Subscription &operator=(Subscription &&other) {
|
|
qSwap(_node, other._node);
|
|
qSwap(_removeAndDestroyMethod, other._removeAndDestroyMethod);
|
|
return *this;
|
|
}
|
|
explicit operator bool() const {
|
|
return (_node != nullptr);
|
|
}
|
|
void destroy() {
|
|
if (_node) {
|
|
(*_removeAndDestroyMethod)(base::take(_node));
|
|
}
|
|
}
|
|
~Subscription() {
|
|
destroy();
|
|
}
|
|
|
|
private:
|
|
struct Node {
|
|
Node(const std::shared_ptr<internal::BaseObservableData> &observable)
|
|
: observable(observable) {
|
|
}
|
|
Node *next = nullptr;
|
|
Node *prev = nullptr;
|
|
std::weak_ptr<internal::BaseObservableData> observable;
|
|
};
|
|
using RemoveAndDestroyMethod = void(*)(Node*);
|
|
Subscription(Node *node, RemoveAndDestroyMethod removeAndDestroyMethod)
|
|
: _node(node)
|
|
, _removeAndDestroyMethod(removeAndDestroyMethod) {
|
|
}
|
|
|
|
Node *_node = nullptr;
|
|
RemoveAndDestroyMethod _removeAndDestroyMethod;
|
|
|
|
template <typename EventType, typename Handler>
|
|
friend class internal::CommonObservableData;
|
|
|
|
template <typename EventType, typename Handler>
|
|
friend class internal::ObservableData;
|
|
|
|
};
|
|
|
|
namespace internal {
|
|
|
|
template <typename EventType, typename Handler, bool EventTypeIsSimple>
|
|
class BaseObservable;
|
|
|
|
template <typename EventType, typename Handler>
|
|
class CommonObservable {
|
|
public:
|
|
Subscription add_subscription(Handler &&handler) {
|
|
if (!_data) {
|
|
_data = std::make_shared<ObservableData<EventType, Handler>>(this);
|
|
}
|
|
return _data->append(std::move(handler));
|
|
}
|
|
|
|
private:
|
|
std::shared_ptr<ObservableData<EventType, Handler>> _data;
|
|
|
|
friend class CommonObservableData<EventType, Handler>;
|
|
friend class BaseObservable<EventType, Handler, base::type_traits<EventType>::is_fast_copy_type::value>;
|
|
|
|
};
|
|
|
|
template <typename EventType, typename Handler>
|
|
class BaseObservable<EventType, Handler, true> : public internal::CommonObservable<EventType, Handler> {
|
|
public:
|
|
void notify(EventType event, bool sync = false) {
|
|
if (this->_data) {
|
|
this->_data->notify(std::move(event), sync);
|
|
}
|
|
}
|
|
|
|
};
|
|
|
|
template <typename EventType, typename Handler>
|
|
class BaseObservable<EventType, Handler, false> : public internal::CommonObservable<EventType, Handler> {
|
|
public:
|
|
void notify(EventType &&event, bool sync = false) {
|
|
if (this->_data) {
|
|
this->_data->notify(std::move(event), sync);
|
|
}
|
|
}
|
|
void notify(const EventType &event, bool sync = false) {
|
|
if (this->_data) {
|
|
auto event_copy = event;
|
|
this->_data->notify(std::move(event_copy), sync);
|
|
}
|
|
}
|
|
|
|
};
|
|
|
|
} // namespace internal
|
|
|
|
namespace internal {
|
|
|
|
template <typename EventType, typename Handler>
|
|
class CommonObservableData : public BaseObservableData {
|
|
public:
|
|
CommonObservableData(CommonObservable<EventType, Handler> *observable) : _observable(observable) {
|
|
}
|
|
|
|
Subscription append(Handler &&handler) {
|
|
auto node = new Node(_observable->_data, std::move(handler));
|
|
if (_begin) {
|
|
_end->next = node;
|
|
node->prev = _end;
|
|
_end = node;
|
|
} else {
|
|
_begin = _end = node;
|
|
}
|
|
return { _end, &CommonObservableData::removeAndDestroyNode };
|
|
}
|
|
|
|
bool empty() const {
|
|
return !_begin;
|
|
}
|
|
|
|
private:
|
|
struct Node : public Subscription::Node {
|
|
Node(
|
|
const std::shared_ptr<BaseObservableData> &observer,
|
|
Handler &&handler)
|
|
: Subscription::Node(observer)
|
|
, handler(std::move(handler)) {
|
|
}
|
|
Handler handler;
|
|
};
|
|
|
|
void remove(Subscription::Node *node) {
|
|
if (node->prev) {
|
|
node->prev->next = node->next;
|
|
}
|
|
if (node->next) {
|
|
node->next->prev = node->prev;
|
|
}
|
|
if (_begin == node) {
|
|
_begin = static_cast<Node*>(node->next);
|
|
}
|
|
if (_end == node) {
|
|
_end = static_cast<Node*>(node->prev);
|
|
}
|
|
if (_current == node) {
|
|
_current = static_cast<Node*>(node->prev);
|
|
} else if (!_begin) {
|
|
_observable->_data.reset();
|
|
}
|
|
}
|
|
|
|
static void removeAndDestroyNode(Subscription::Node *node) {
|
|
if (const auto that = node->observable.lock()) {
|
|
static_cast<CommonObservableData*>(that.get())->remove(node);
|
|
}
|
|
delete static_cast<Node*>(node);
|
|
}
|
|
|
|
template <typename CallCurrent>
|
|
void notifyEnumerate(CallCurrent callCurrent) {
|
|
_current = _begin;
|
|
do {
|
|
callCurrent();
|
|
if (_current) {
|
|
_current = static_cast<Node*>(_current->next);
|
|
} else if (_begin) {
|
|
_current = _begin;
|
|
} else {
|
|
break;
|
|
}
|
|
} while (_current);
|
|
}
|
|
|
|
bool destroyMeIfEmpty() const {
|
|
if (empty()) {
|
|
_observable->_data.reset();
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
CommonObservable<EventType, Handler> *_observable = nullptr;
|
|
Node *_begin = nullptr;
|
|
Node *_current = nullptr;
|
|
Node *_end = nullptr;
|
|
ObservableCallHandlers _callHandlers;
|
|
|
|
friend class ObservableData<EventType, Handler>;
|
|
|
|
};
|
|
|
|
template <typename EventType, typename Handler>
|
|
class ObservableData : public CommonObservableData<EventType, Handler> {
|
|
public:
|
|
using CommonObservableData<EventType, Handler>::CommonObservableData;
|
|
|
|
void notify(EventType &&event, bool sync) {
|
|
if (_handling) {
|
|
sync = false;
|
|
}
|
|
if (sync) {
|
|
_events.push_back(std::move(event));
|
|
callHandlers();
|
|
} else {
|
|
if (!this->_callHandlers) {
|
|
this->_callHandlers = [this]() {
|
|
callHandlers();
|
|
};
|
|
}
|
|
if (_events.empty()) {
|
|
RegisterPendingObservable(&this->_callHandlers);
|
|
}
|
|
_events.push_back(std::move(event));
|
|
}
|
|
}
|
|
|
|
~ObservableData() {
|
|
UnregisterObservable(&this->_callHandlers);
|
|
}
|
|
|
|
private:
|
|
void callHandlers() {
|
|
_handling = true;
|
|
auto events = base::take(_events);
|
|
for (auto &event : events) {
|
|
this->notifyEnumerate([this, &event]() {
|
|
this->_current->handler(event);
|
|
});
|
|
if (this->destroyMeIfEmpty()) {
|
|
return;
|
|
}
|
|
}
|
|
_handling = false;
|
|
UnregisterActiveObservable(&this->_callHandlers);
|
|
}
|
|
|
|
std::deque<EventType> _events;
|
|
bool _handling = false;
|
|
|
|
};
|
|
|
|
template <class Handler>
|
|
class ObservableData<void, Handler> : public CommonObservableData<void, Handler> {
|
|
public:
|
|
using CommonObservableData<void, Handler>::CommonObservableData;
|
|
|
|
void notify(bool sync) {
|
|
if (_handling) {
|
|
sync = false;
|
|
}
|
|
if (sync) {
|
|
++_eventsCount;
|
|
callHandlers();
|
|
} else {
|
|
if (!this->_callHandlers) {
|
|
this->_callHandlers = [this]() {
|
|
callHandlers();
|
|
};
|
|
}
|
|
if (!_eventsCount) {
|
|
RegisterPendingObservable(&this->_callHandlers);
|
|
}
|
|
++_eventsCount;
|
|
}
|
|
}
|
|
|
|
~ObservableData() {
|
|
UnregisterObservable(&this->_callHandlers);
|
|
}
|
|
|
|
private:
|
|
void callHandlers() {
|
|
_handling = true;
|
|
auto eventsCount = base::take(_eventsCount);
|
|
for (int i = 0; i != eventsCount; ++i) {
|
|
this->notifyEnumerate([this]() {
|
|
this->_current->handler();
|
|
});
|
|
if (this->destroyMeIfEmpty()) {
|
|
return;
|
|
}
|
|
}
|
|
_handling = false;
|
|
UnregisterActiveObservable(&this->_callHandlers);
|
|
}
|
|
|
|
int _eventsCount = 0;
|
|
bool _handling = false;
|
|
|
|
};
|
|
|
|
template <typename Handler>
|
|
class BaseObservable<void, Handler, base::type_traits<void>::is_fast_copy_type::value> : public internal::CommonObservable<void, Handler> {
|
|
public:
|
|
void notify(bool sync = false) {
|
|
if (this->_data) {
|
|
this->_data->notify(sync);
|
|
}
|
|
}
|
|
|
|
};
|
|
|
|
} // namespace internal
|
|
|
|
template <typename EventType, typename Handler = internal::SubscriptionHandler<EventType>>
|
|
class Observable : public internal::BaseObservable<EventType, Handler, base::type_traits<EventType>::is_fast_copy_type::value> {
|
|
public:
|
|
Observable() = default;
|
|
Observable(const Observable &other) = delete;
|
|
Observable(Observable &&other) = default;
|
|
Observable &operator=(const Observable &other) = delete;
|
|
Observable &operator=(Observable &&other) = default;
|
|
|
|
};
|
|
|
|
template <typename Type>
|
|
class Variable {
|
|
public:
|
|
Variable(parameter_type<Type> startValue = Type()) : _value(startValue) {
|
|
}
|
|
Variable(Variable &&other) = default;
|
|
Variable &operator=(Variable &&other) = default;
|
|
|
|
parameter_type<Type> value() const {
|
|
return _value;
|
|
}
|
|
|
|
void setForced(parameter_type<Type> newValue, bool sync = false) {
|
|
_value = newValue;
|
|
changed().notify(_value, sync);
|
|
}
|
|
|
|
void set(parameter_type<Type> newValue, bool sync = false) {
|
|
if (_value != newValue) {
|
|
setForced(newValue, sync);
|
|
}
|
|
}
|
|
|
|
template <typename Callback>
|
|
void process(Callback callback, bool sync = false) {
|
|
callback(_value);
|
|
changed().notify(_value, sync);
|
|
}
|
|
|
|
Observable<Type> &changed() const {
|
|
return _changed;
|
|
}
|
|
|
|
private:
|
|
Type _value;
|
|
mutable Observable<Type> _changed;
|
|
|
|
};
|
|
|
|
class Subscriber {
|
|
protected:
|
|
template <typename EventType, typename Handler, typename Lambda>
|
|
int subscribe(base::Observable<EventType, Handler> &observable, Lambda &&handler) {
|
|
_subscriptions.push_back(observable.add_subscription(std::forward<Lambda>(handler)));
|
|
return _subscriptions.size();
|
|
}
|
|
|
|
template <typename EventType, typename Handler, typename Lambda>
|
|
int subscribe(base::Observable<EventType, Handler> *observable, Lambda &&handler) {
|
|
return subscribe(*observable, std::forward<Lambda>(handler));
|
|
}
|
|
|
|
template <typename Type, typename Lambda>
|
|
int subscribe(const base::Variable<Type> &variable, Lambda &&handler) {
|
|
return subscribe(variable.changed(), std::forward<Lambda>(handler));
|
|
}
|
|
|
|
template <typename Type, typename Lambda>
|
|
int subscribe(const base::Variable<Type> *variable, Lambda &&handler) {
|
|
return subscribe(variable->changed(), std::forward<Lambda>(handler));
|
|
}
|
|
|
|
void unsubscribe(int index) {
|
|
if (!index) return;
|
|
auto count = static_cast<int>(_subscriptions.size());
|
|
Assert(index > 0 && index <= count);
|
|
_subscriptions[index - 1].destroy();
|
|
if (index == count) {
|
|
while (index > 0 && !_subscriptions[--index]) {
|
|
_subscriptions.pop_back();
|
|
}
|
|
}
|
|
}
|
|
|
|
~Subscriber() {
|
|
auto subscriptions = base::take(_subscriptions);
|
|
for (auto &subscription : subscriptions) {
|
|
subscription.destroy();
|
|
}
|
|
}
|
|
|
|
private:
|
|
std::vector<base::Subscription> _subscriptions;
|
|
|
|
};
|
|
|
|
void HandleObservables();
|
|
|
|
template <
|
|
typename Type,
|
|
typename = std::enable_if_t<!std::is_same_v<Type, void>>>
|
|
inline auto ObservableViewer(base::Observable<Type> &observable) {
|
|
return rpl::make_producer<Type>([&observable](
|
|
const auto &consumer) {
|
|
auto lifetime = rpl::lifetime();
|
|
lifetime.make_state<base::Subscription>(
|
|
observable.add_subscription([consumer](auto &&update) {
|
|
consumer.put_next_forward(
|
|
std::forward<decltype(update)>(update));
|
|
}));
|
|
return lifetime;
|
|
});
|
|
}
|
|
|
|
rpl::producer<> ObservableViewer(base::Observable<void> &observable);
|
|
|
|
} // namespace base
|