RxJava - How do you unsubscribe from a live stream?
I am trying to understand how to unsubscribe from an Observable created from a live feed. Here is more or less the code:
    SomeFeed feed = new SomeFeed();

    Observable<PriceTick> observable = Observable.create(s ->
        feed.register(new SomeListener() {
            @Override
            public void priceTick(PriceTick event) {
                s.onNext(event);
            }

            @Override
            public void error(Throwable throwable) {
                s.onError(throwable);
            }
        })
    );

    Subscription subscription = observable.subscribe(System.out::println);
    subscription.unsubscribe();
    System.out.println("is unsubscribed:" + subscription.isUnsubscribed()); // prints true
I am finding that after the subscription is unsubscribed, the subscriber is still outputting the event stream.
How can I unsubscribe so that the subscriber stops receiving further notifications?
The Observable you are creating doesn't obey the Observable contract: it handles neither unsubscription nor backpressure. Observable.create makes it easy to create such observables.
Instead, you can use other techniques to create observables, for example the defer operator or Observable.fromCallable.
For your code you can use:

    Observable<PriceTick> observable = Observable.create(s ->
        feed.register(new SomeListener() {
            @Override
            public void priceTick(PriceTick event) {
                // Forward events only while the subscriber is still subscribed.
                if (!s.isUnsubscribed()) {
                    s.onNext(event);
                }
            }

            @Override
            public void error(Throwable throwable) {
                if (!s.isUnsubscribed()) {
                    s.onError(throwable);
                }
            }
        })
    );
This still doesn't handle backpressure.
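Note also that with the guard alone the listener stays registered with the feed after unsubscribing; its events are simply ignored. If the feed offers a way to remove a listener (the question doesn't show one), it is usually cleaner to detach it on unsubscribe. Below is a minimal, dependency-free sketch of that idea. It deliberately does not use RxJava: `DemoFeed`, `DemoSubscription`, `LiveFeedDemo` and the `unregister` method are hypothetical names invented for illustration. In RxJava 1.x the analogous hook would be registering the cleanup via `s.add(Subscriptions.create(...))` inside `Observable.create`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for the live feed (not the SomeFeed from the question).
class DemoFeed {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void register(Consumer<String> listener) { listeners.add(listener); }

    // Assumption: the real feed has some equivalent way to remove a listener.
    void unregister(Consumer<String> listener) { listeners.remove(listener); }

    int listenerCount() { return listeners.size(); }

    // Push one event to every listener that is currently registered.
    void publish(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }
}

// Hypothetical handle playing the role of a Subscription.
class DemoSubscription {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    private volatile Runnable cleanup = () -> { };

    // Set the action to run when unsubscribe() is first called.
    void onUnsubscribe(Runnable action) { cleanup = action; }

    void unsubscribe() {
        // Run the cleanup at most once, even if unsubscribe() is called repeatedly.
        if (unsubscribed.compareAndSet(false, true)) {
            cleanup.run();
        }
    }

    boolean isUnsubscribed() { return unsubscribed.get(); }
}

class LiveFeedDemo {
    static DemoSubscription subscribe(DemoFeed feed, Consumer<String> downstream) {
        DemoSubscription subscription = new DemoSubscription();
        Consumer<String> listener = event -> {
            // Same guard as in the answer: drop events after unsubscription.
            if (!subscription.isUnsubscribed()) {
                downstream.accept(event);
            }
        };
        feed.register(listener);
        // Detach the listener from the feed when the consumer unsubscribes.
        subscription.onUnsubscribe(() -> feed.unregister(listener));
        return subscription;
    }

    public static void main(String[] args) {
        DemoFeed feed = new DemoFeed();
        List<String> received = new ArrayList<>();
        DemoSubscription subscription = subscribe(feed, received::add);

        feed.publish("tick-1");
        subscription.unsubscribe();
        feed.publish("tick-2"); // no listener is registered any more

        System.out.println(received + " listeners=" + feed.listenerCount());
        // prints: [tick-1] listeners=0
    }
}
```

The guard and the cleanup do different jobs: the guard protects against events that are already in flight when unsubscription happens, while the cleanup stops the feed from holding on to a listener nobody is interested in.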
More on creating observables here.