diff --git a/player/mruby/observable.mrb b/player/mruby/observable.mrb new file mode 100644 index 0000000000..3b97461dd0 --- /dev/null +++ b/player/mruby/observable.mrb @@ -0,0 +1,73 @@ +# inspired by toy-rx: https://github.com/staltz/toy-rx +class Subscriber + attr_writer :subscription + + def initialize(observer) + @observer = observer + end + + def next(value) + @observer.next(value) + end + + def error(error) + @observer.error(error) + end + + def complete + @observer.complete + end + + def unsubscribe + @subscription.unsubscribe unless @subscription.nil? + end +end + +class Observable + def initialize(&subscribe) + @subscribe = subscribe + end + + def self.create(&subscribe) + fail ArgumentError, "`create` needs a subscribe block" unless block_given? + new do |observer| + subscriber = Subscriber.new(observer) + subscription = subscribe.call(subscriber); + subscriber.subscription = subscription + subscription + end + end + + # XXX probably nicer to define operators in another file + def map(&tx) + inobservable = self + Observable.create do |out| + observer = { + next: -> (x) { + begin + out.next(tx.call(x)) + rescue => e + out.error(e) + end + }, + error: -> (e) { out.error(e) }, + complete: -> (e) { out.complete }, + }; + inobservable.subscribe(observer) + end + end + + def subscribe(callbacks = {}, &nextcb) + callbacks[:next] = nextcb if block_given? + noop = -> () {} + + klass = Class.new do + define_method(:next, callbacks.fetch(:next, noop)) + define_method(:error, callbacks.fetch(:error, noop)) + define_method(:complete, callbacks.fetch(:complete, noop)) + end + + observer = klass.new + @subscribe.call(observer) + end +end diff --git a/player/mruby/observable_test.rb b/player/mruby/observable_test.rb new file mode 100644 index 0000000000..1a5cfcac6b --- /dev/null +++ b/player/mruby/observable_test.rb @@ -0,0 +1,27 @@ +# require[_relative] doesn't like non .rb/.so/.a extensions in MRI +load File.expand_path('./observable.mrb', __dir__) + +empty = Observable.create { |observer| observer.complete } +value = -> (v) { Observable.create { |observer| observer.next(v) } } +failure = Observable.create { |observer| observer.error("some error") } + +subscriptions = { + next: -> (x) { puts "next: '#{x}'" }, + error: -> (x) { puts "error: '#{x}'" }, + complete: -> () { puts "complete" } +} + +empty.subscribe(subscriptions) +value.("hello world").subscribe(subscriptions) +value.("hello world").subscribe { |x| puts "block based subscription: '#{x}'" } +value.(2).map { |x| x + 1 } .subscribe(subscriptions) +failure.subscribe(subscriptions) +failure # prints nothing, observables are lazy! + +# output will be: + +# complete +# next: 'hello world' +# block based sub hello world +# next: '3' +# error: 'some error'