mirror of https://github.com/mpv-player/mpv
mruby: add a basic ruby implementation of obsevables
This commit is contained in:
parent
200f336376
commit
2d4add3fb7
|
@ -0,0 +1,73 @@
|
||||||
|
# inspired by toy-rx: https://github.com/staltz/toy-rx
|
||||||
|
class Subscriber
|
||||||
|
attr_writer :subscription
|
||||||
|
|
||||||
|
def initialize(observer)
|
||||||
|
@observer = observer
|
||||||
|
end
|
||||||
|
|
||||||
|
def next(value)
|
||||||
|
@observer.next(value)
|
||||||
|
end
|
||||||
|
|
||||||
|
def error(error)
|
||||||
|
@observer.error(error)
|
||||||
|
end
|
||||||
|
|
||||||
|
def complete
|
||||||
|
@observer.complete
|
||||||
|
end
|
||||||
|
|
||||||
|
def unsubscribe
|
||||||
|
@subscription.unsubscribe unless @subscription.nil?
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class Observable
|
||||||
|
def initialize(&subscribe)
|
||||||
|
@subscribe = subscribe
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.create(&subscribe)
|
||||||
|
fail ArgumentError, "`create` needs a subscribe block" unless block_given?
|
||||||
|
new do |observer|
|
||||||
|
subscriber = Subscriber.new(observer)
|
||||||
|
subscription = subscribe.call(subscriber);
|
||||||
|
subscriber.subscription = subscription
|
||||||
|
subscription
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# XXX probably nicer to define operators in another file
|
||||||
|
def map(&tx)
|
||||||
|
inobservable = self
|
||||||
|
Observable.create do |out|
|
||||||
|
observer = {
|
||||||
|
next: -> (x) {
|
||||||
|
begin
|
||||||
|
out.next(tx.call(x))
|
||||||
|
rescue => e
|
||||||
|
out.error(e)
|
||||||
|
end
|
||||||
|
},
|
||||||
|
error: -> (e) { out.error(e) },
|
||||||
|
complete: -> (e) { out.complete },
|
||||||
|
};
|
||||||
|
inobservable.subscribe(observer)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def subscribe(callbacks = {}, &nextcb)
|
||||||
|
callbacks[:next] = nextcb if block_given?
|
||||||
|
noop = -> () {}
|
||||||
|
|
||||||
|
klass = Class.new do
|
||||||
|
define_method(:next, callbacks.fetch(:next, noop))
|
||||||
|
define_method(:error, callbacks.fetch(:error, noop))
|
||||||
|
define_method(:complete, callbacks.fetch(:complete, noop))
|
||||||
|
end
|
||||||
|
|
||||||
|
observer = klass.new
|
||||||
|
@subscribe.call(observer)
|
||||||
|
end
|
||||||
|
end
|
|
@ -0,0 +1,27 @@
|
||||||
|
# require[_relative] doesn't like non .rb/.so/.a extensions in MRI
|
||||||
|
load File.expand_path('./observable.mrb', __dir__)
|
||||||
|
|
||||||
|
empty = Observable.create { |observer| observer.complete }
|
||||||
|
value = -> (v) { Observable.create { |observer| observer.next(v) } }
|
||||||
|
failure = Observable.create { |observer| observer.error("some error") }
|
||||||
|
|
||||||
|
subscriptions = {
|
||||||
|
next: -> (x) { puts "next: '#{x}'" },
|
||||||
|
error: -> (x) { puts "error: '#{x}'" },
|
||||||
|
complete: -> () { puts "complete" }
|
||||||
|
}
|
||||||
|
|
||||||
|
empty.subscribe(subscriptions)
|
||||||
|
value.("hello world").subscribe(subscriptions)
|
||||||
|
value.("hello world").subscribe { |x| puts "block based subscription: '#{x}'" }
|
||||||
|
value.(2).map { |x| x + 1 } .subscribe(subscriptions)
|
||||||
|
failure.subscribe(subscriptions)
|
||||||
|
failure # prints nothing, observables are lazy!
|
||||||
|
|
||||||
|
# output will be:
|
||||||
|
|
||||||
|
# complete
|
||||||
|
# next: 'hello world'
|
||||||
|
# block based sub hello world
|
||||||
|
# next: '3'
|
||||||
|
# error: 'some error'
|
Loading…
Reference in New Issue