rx java - How do you unsubscribe from a live stream -


i trying understand how unsubscribe observable created live feed. here more or less code:

somefeed feed = new somefeed(); observable<pricetick> observable = observable.create(s ->   feed.register(new somelistener() {     @override     public void pricetick(pricetick event) {       s.onnext(event);     }      @override     public void error(throwable throwable) {       s.onerror(throwable);     }   }) ); subscription subscription = observable.subscribe(system.out::println); subscription.unsubscribe(); system.out.println("is unsubscribed:" + subscription.isunsubscribed()); // prints true 

i finding after subscription unsubscribed, subscriber still outputting event stream.

how can unsubscribe remove subscriber receiving further notifications?

the observable creating doesn't obey observable contract. not handling unsubscribtion , backpreasure. observable.create makes easy create such observables.

instead can use other techniques create observables. can create observables defer operator or observable.fromcallable.

for code can use:

observable<pricetick> observable = observable.create(s ->   feed.register(new somelistener() {     @override     public void pricetick(pricetick event) {       if(!s.isunsbscribed()) {         s.onnext(event);       }     }      @override     public void error(throwable throwable) {       if(!s.isunsbscribed()) {         s.onerror(throwable);       }     }   }) ); 

this still doesn't handle backpreasure.

more creating observables here.


Comments

Popular posts from this blog

mysql - Dreamhost PyCharm Django Python 3 Launching a Site -

java - Sending SMS with SMSLib and Web Services -

java - How to resolve The method toString() in the type Object is not applicable for the arguments (InputStream) -