TOMASZ KOWALCZEWSKI
REACTIVE JAVA
STUFF I WILL TALK ABOUT
• Reactive view of the world
• Designing an interface for reactive interactions (the cat experiment)
• RxJava as an implementation of that interface
• Lessons learned
REACTIVE
“readily responsive to a stimulus”
Merriam-Webster dictionary
SYNCHRONOUS PULL COMMUNICATION
[Sequence diagram: the Client sends a Request to the Server and receives a Response; the time in between is labelled "Network latency" and "Server processing".]
OBSERVABLE ->
OBSERVER ->
SERVICE RETURNING OBSERVABLE
public interface ShrödingersCat {
boolean alive();
}
public interface ShrödingersCat {
Future<Boolean> alive();
}
public interface ShrödingersCat {
Iterator<Boolean> alive();
}
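SUBSCRIPTIONS AND EVENTS
[Timeline over t: subscribe, then onNext*, then onCompleted | onError]
That is: zero or more onNext events, followed by one terminal event, either onCompleted or onError.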
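The event grammar on this slide can be sketched in plain Java. This is a minimal illustration, not RxJava code: `MiniObserver`, `EventGrammar`, `emitAll` and `record` are hypothetical names introduced here only to show the ordering of callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an observer; not an RxJava type.
interface MiniObserver<T> {
    void onNext(T value);
    void onCompleted();
    void onError(Throwable e);
}

class EventGrammar {
    // Pushes every item, then exactly one terminal event:
    // onCompleted on success, onError if iterating or delivering an item failed.
    static <T> void emitAll(Iterable<T> items, MiniObserver<T> observer) {
        try {
            for (T item : items) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);   // terminal: nothing is emitted after this
            return;
        }
        observer.onCompleted();    // terminal: nothing is emitted after this
    }

    // Records the events it receives so their order can be inspected.
    static List<String> record(Iterable<Boolean> items) {
        List<String> log = new ArrayList<>();
        emitAll(items, new MiniObserver<Boolean>() {
            @Override public void onNext(Boolean value) { log.add("onNext(" + value + ")"); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable e) { log.add("onError"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [onNext(true), onNext(true), onNext(false), onCompleted]
        System.out.println(record(List.of(true, true, false)));
    }
}
```

The producer, not the consumer, decides when each callback fires, which is the push model the following slides build on.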
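PULL VS PUSH
[Sequence diagram: the Observer sends Subscribe to the Observable, which answers with a series of onNext calls; delays are labelled "Server processing" and "Network latency". Callout: "Maybe this one needs all the data..."]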
RX JAVA BY NETFLIX
• Open source project under the Apache License
• Java implementation of Microsoft's Rx Observables
• The Netflix API uses it to make the entire service layer asynchronous
• Provides a DSL for building computation flows out of asynchronous sources, using a collection of operators for filtering, selecting, transforming and combining those flows lazily
• These flows are called Observables: collections of events with push semantics (as opposed to the pull semantics of Iterator)
• Targets the JVM, not a single language. Currently supports Java, Groovy, Clojure and Scala
OBSERVABLE
public interface ShrödingersCat {
Observable<Boolean> alive();
}
SERVICE RETURNING OBSERVABLE
cat
    .alive()
    .subscribe(status -> System.out.println(status));

cat
    .alive()
    .throttleWithTimeout(250, TimeUnit.MILLISECONDS) // pass a value on only after 250 ms without a newer one
    .distinctUntilChanged()                          // drop consecutive duplicates
    .filter(isAlive -> isAlive)                      // keep only "alive" readings
    .map(Boolean::toString)
    .subscribe(status -> display.display(status));
HOW IS THE OBSERVABLE IMPLEMENTED?
• Maybe it executes its logic on the subscriber's thread?
• Maybe it delegates part of the work to other threads?
• Does it use NIO?
• Maybe it's an actor?
• Does it return cached data?
• The Observer does not care!
MARBLE DIAGRAMS
MAP(FUNC1)
MERGE(OBSERVABLE...)
CONCAT(OBSERVABLE...)
FLATMAP(FUNC)
Observable<ShrödingersCat> cats = listAllCats();
cats
    .flatMap(cat ->
        Observable
            .from(catService.getPicturesFor(cat))
            // keep only pictures smaller than 100 * 1000
            .filter(image -> image.size() < 100 * 1000))
    .subscribe();
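The "map each item to a sequence, then flatten" idea behind flatMap can be tried out with the JDK's pull-based `java.util.stream.Stream`, which has an operator of the same name. This is only an analogue under assumptions: the `PICTURE_SIZES` data and the `smallPictures` helper are made up for illustration and stand in for `catService.getPicturesFor(cat)`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FlatMapSketch {
    // Hypothetical data: picture sizes in bytes, keyed by cat name.
    static final Map<String, List<Integer>> PICTURE_SIZES = Map.of(
            "Tom", List.of(50_000, 250_000),
            "Felix", List.of(99_999, 100_000));

    // Maps each cat to its own stream of pictures, keeps the small ones,
    // and flattens the per-cat streams into a single stream.
    static List<Integer> smallPictures(List<String> cats) {
        return cats.stream()
                .flatMap(cat -> PICTURE_SIZES.getOrDefault(cat, List.of()).stream()
                        .filter(size -> size < 100 * 1000))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [50000, 99999]
        System.out.println(smallPictures(List.of("Tom", "Felix")));
    }
}
```

One difference worth keeping in mind: a sequential Stream flattens the inner streams in order, while Rx flatMap merges the inner Observables, so their items may interleave.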
CACHE
Random random = new Random();
Observable<Integer> observable = Observable
    .range(1, 100)
    .map(random::nextInt)
    .cache();
observable.subscribe(System.out::println);
observable.subscribe(System.out::println);
...
• Both subscriptions print the same values: cache() records the first run and replays it to later subscribers
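The record-once, replay-later behaviour can be imitated in plain Java with a memoizing wrapper. This is a sketch of the idea only, assuming a finite source modelled as a `Supplier<List<T>>`; `CacheSketch`, `cached` and `randomValues` are hypothetical names, and nothing here reflects how RxJava implements cache().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

class CacheSketch {
    // Wraps a source so it runs at most once; later callers get the recorded values.
    static <T> Supplier<List<T>> cached(Supplier<List<T>> source) {
        return new Supplier<List<T>>() {
            private List<T> recorded;   // null until the first call

            @Override
            public synchronized List<T> get() {
                if (recorded == null) {
                    recorded = new ArrayList<>(source.get());
                }
                return recorded;
            }
        };
    }

    // A "cold" source: every call draws fresh random numbers.
    static List<Integer> randomValues(Random random) {
        List<Integer> values = new ArrayList<>();
        for (int bound = 1; bound <= 100; bound++) {
            values.add(random.nextInt(bound));
        }
        return values;
    }

    public static void main(String[] args) {
        Random random = new Random();
        Supplier<List<Integer>> source = cached(() -> randomValues(random));
        // prints true: the second call replays the values recorded by the first
        System.out.println(source.get().equals(source.get()));
    }
}
```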
INJECTING CUSTOM OPERATORS USING LIFT
class InternStrings implements Observable.Operator<String, String> {
    @Override
    public Subscriber<? super String> call(Subscriber<? super String> subscriber) {
        // Wrap the downstream subscriber and intern every string on its way through.
        return new Subscriber<String>() {
            @Override public void onCompleted() { subscriber.onCompleted(); }
            @Override public void onError(Throwable e) { subscriber.onError(e); }
            @Override public void onNext(String s) { subscriber.onNext(s.intern()); }
        };
    }
}

Observable.from("AB", "CD", "AB", "DE")
    .lift(new InternStrings())
    .subscribe();
• Valuable for instrumentation
• Inject debug code – see rxjava-contrib/rxjava-debug
• Inject performance counters
ERROR HANDLING
• A correctly implemented observable produces no events after an error notification
• Operators are available to fix observables that do not adhere to this rule
• Pass a custom error handling function to subscribe
• Transparently substitute a failing observable with another one (onErrorResumeNext)
• Convert an error into a regular event (onErrorReturn)
• Retry the subscription in the hope that it works this time (retry)
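The last three strategies (substitute, convert, retry) have simple synchronous counterparts, sketched below with `java.util.function.Supplier` standing in for a source that may fail. The class and method names (`ErrorHandlingSketch`, `resumeWith`, `errorToValue`, `retry`) are hypothetical and are not RxJava operators.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class ErrorHandlingSketch {
    // Substitute a failing source with another one.
    static <T> T resumeWith(Supplier<T> primary, Supplier<T> fallback) {
        try {
            return primary.get();
        } catch (RuntimeException e) {
            return fallback.get();
        }
    }

    // Convert the error into a regular value.
    static <T> T errorToValue(Supplier<T> source, Function<RuntimeException, T> mapper) {
        try {
            return source.get();
        } catch (RuntimeException e) {
            return mapper.apply(e);
        }
    }

    // Re-run the source up to maxAttempts times; rethrow the last failure.
    static <T> T retry(Supplier<T> source, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return source.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two calls, succeeds on the third.
        Supplier<String> flaky = () -> {
            if (++calls[0] < 3) throw new IllegalStateException("cat unavailable");
            return "alive";
        };
        Supplier<String> failing = () -> { throw new IllegalStateException("boom"); };

        System.out.println(retry(flaky, 5));                      // prints alive
        System.out.println(errorToValue(failing, e -> "unknown")); // prints unknown
        System.out.println(resumeWith(failing, () -> "cached"));   // prints cached
    }
}
```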
ESCAPING THE MONAD
Iterable<String> strings = Observable.from(1, 2, 3, 4)
    .map(i -> Integer.toString(i))
    .toBlockingObservable()
    .toIterable();
// or (and many more)
T firstOrDefault(T defaultValue, Func1 predicate)
Iterator<T> getIterator()
Iterable<T> next()
• Inverts the dependency: the caller blocks until the next item arrives, then continues
• Usually used to interact with other, synchronous APIs
• Useful while migrating to the reactive approach in small increments
• Handy for triggering early evaluation while debugging
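What "blocking until the next item arrives" means can be shown with a plain `BlockingQueue` bridging a push-style producer to a pulling caller. This is a rough sketch under assumptions, not the mechanism behind toBlockingObservable(): `BlockingBridge` and `toList` are hypothetical names, the producer is modelled as a `Consumer<Consumer<T>>`, and producer errors are not propagated to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class BlockingBridge {
    // Runs a push-style producer on its own thread and hands its items to the
    // calling thread, which blocks on take() until the next item arrives.
    static <T> List<T> toList(Consumer<Consumer<T>> producer) {
        BlockingQueue<Optional<T>> queue = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                producer.accept(item -> queue.add(Optional.of(item)));
            } finally {
                queue.add(Optional.empty());   // end-of-stream marker
            }
        });
        worker.start();

        List<T> result = new ArrayList<>();
        try {
            for (Optional<T> next = queue.take(); next.isPresent(); next = queue.take()) {
                result.add(next.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // give up and return what arrived so far
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> strings = toList(emit -> {
            for (int i = 1; i <= 4; i++) {
                emit.accept(Integer.toString(i));
            }
        });
        System.out.println(strings);   // prints [1, 2, 3, 4]
    }
}
```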
OBSERVER
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T args);
}
CREATING OBSERVABLES
Observable<Boolean> watchTheCat =
    Observable.create(observer -> {
        observer.onNext(cat.isAlive());
        observer.onCompleted();
    });
• create accepts an OnSubscribe function
• The function is executed for every subscriber upon subscription
• This example is not asynchronous
CREATING OBSERVABLES
Observable.create(subscriber -> {
    Future<?> brighterFuture = executorService.submit(() -> {
        subscriber.onNext(cat.isAlive());
        subscriber.onCompleted();
    });
    // Cancel the task if the client unsubscribes.
    subscriber.add(Subscriptions.from(brighterFuture));
});
• Executes the code on a separate thread (from the executorService thread pool)
• The stream of events is delivered by the executor thread
• The thread calling onNext() runs all the operations defined on the observable
• The Future is cancelled if the client unsubscribes
CREATING OBSERVABLES
Observable<Boolean> watchTheCat =
    // The explicit <Boolean> is needed because the chained call hides the target type.
    Observable.<Boolean>create(observer -> {
        observer.onNext(cat.isAlive());
        observer.onCompleted();
    })
    .subscribeOn(scheduler);
• The subscribe function is executed on the supplied scheduler (a thin wrapper over java.util.concurrent.Executor)
SUBSCRIPTION
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
UNSUBSCRIBING
Observable.create(subscriber -> {
    for (long i = 0; !subscriber.isUnsubscribed(); i++) {
        subscriber.onNext(i);
        System.out.println("Emitted: " + i);
    }
    subscriber.onCompleted();
})
.take(10)
.subscribe(aLong -> {
    System.out.println("Got: " + aLong);
});
• The take operator unsubscribes from the observable after 10 items
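The cooperation between an endless producer and a consumer that stops it can be reproduced without RxJava. In this minimal sketch `MiniSubscriber`, `naturalNumbers` and `take` are hypothetical stand-ins: the producer polls an "unsubscribed" flag exactly as the loop on the slide does, and the consumer flips that flag once it has enough items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

class TakeSketch {
    // Hypothetical stand-in for a subscriber: a callback plus an "unsubscribed" flag.
    static final class MiniSubscriber {
        private final AtomicBoolean unsubscribed = new AtomicBoolean();
        private final LongConsumer downstream;

        MiniSubscriber(LongConsumer downstream) { this.downstream = downstream; }

        void onNext(long value) { downstream.accept(value); }
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    // Endless producer: keeps emitting until the subscriber unsubscribes.
    static void naturalNumbers(MiniSubscriber subscriber) {
        for (long i = 0; !subscriber.isUnsubscribed(); i++) {
            subscriber.onNext(i);
        }
    }

    // Collects the first `limit` values, then unsubscribes, which ends the producer loop.
    static List<Long> take(int limit) {
        List<Long> received = new ArrayList<>();
        if (limit <= 0) {
            return received;   // nothing requested, so never start the producer
        }
        MiniSubscriber[] self = new MiniSubscriber[1];
        self[0] = new MiniSubscriber(value -> {
            received.add(value);
            if (received.size() == limit) {
                self[0].unsubscribe();
            }
        });
        naturalNumbers(self[0]);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(take(3));   // prints [0, 1, 2]
    }
}
```

Without the flag check in the producer loop, the consumer would have no way to stop the emission.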
CONCURRENCY
• Synchronous vs. asynchronous, single-threaded or multi-threaded: this is an implementation detail of the service provider (the Observable)
• That holds as long as onNext calls are not executed concurrently
• So the framework does not have to synchronize everything
• Operators that combine many Observables ensure serialized access
• In the face of a misbehaving observable, the serialize() operator forces correct behaviour
• Passing pure functions to Rx operators is always the best bet
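Why serialized onNext calls matter can be demonstrated with a small plain-Java experiment. The wrapper below simply takes a lock around every call; it is a deliberately simplified illustration and not a claim about how RxJava implements serialize(). `SerializeSketch`, `serialized` and `sumFromFourThreads` are hypothetical names.

```java
import java.util.function.IntConsumer;

class SerializeSketch {
    // Wraps a consumer so that calls never overlap, whichever thread they come from.
    static IntConsumer serialized(IntConsumer downstream) {
        Object lock = new Object();
        return value -> {
            synchronized (lock) {
                downstream.accept(value);
            }
        };
    }

    // Four threads push into the same non-thread-safe accumulator through the wrapper.
    static long sumFromFourThreads(int perThread) {
        long[] total = {0};   // deliberately not thread-safe on its own
        IntConsumer observer = serialized(value -> total[0] += value);
        Thread[] producers = new Thread[4];
        for (int t = 0; t < producers.length; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    observer.accept(1);
                }
            });
            producers[t].start();
        }
        for (Thread producer : producers) {
            try {
                producer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(sumFromFourThreads(10_000));   // prints 40000
    }
}
```

Because the downstream callback is only ever entered by one thread at a time, it can stay free of its own synchronization, which is the point the slide makes about operators.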
LESSONS LEARNED
• In our use cases the performance profile is dominated by other system components
• Performance depends on the implementation of the operators used and may vary
• Contention points arise on operators that merge streams
• Carelessly creating thousands of threads (one for each task) when NewThreadScheduler is used: we hit the `ulimit -u` limit and the system almost froze :)
• The current version (0.19) has very sane defaults though
• Debugging and reasoning about subscriptions is not always easy
• Insert doOnEach or doOnNext calls for debugging
• IDE support is not satisfactory: there are problems placing breakpoints inside closures. IntelliJ IDEA 13 has smart step into closures, which may help
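The thread explosion described above comes from giving every task its own thread. A plain-JDK way to see the alternative is a fixed-size pool, sketched below; `BoundedPoolSketch` and `distinctThreadsUsed` are hypothetical names, and how such a pool would be handed to RxJava as a scheduler is left out because it depends on the library version.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {
    // Runs `tasks` small jobs on a pool of `poolSize` threads and reports
    // how many distinct threads actually did the work.
    static int distinctThreadsUsed(int tasks, int poolSize) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();   // accept no new tasks, let the queued ones finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // 1000 tasks, but never more than 4 worker threads
        System.out.println(distinctThreadsUsed(1000, 4) <= 4);   // prints true
    }
}
```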
MORE INFORMATION
• https://github.com/Netflix/RxJava
• https://github.com/Netflix/RxJava/wiki
• http://www.infoq.com/author/Erik-Meijer
• React conference: http://www.youtube.com/playlist?list=PLSD48HvrE7-Z1stQ1vIIBumB0wK0s8llY
• Cat picture taken from http://www.teckler.com/en/Rapunzel
Reactive Java (33rd Degree)
REMEMBER: DON’T ITERATE - FLATMAP
source: flatmapthatshit.com


Editor's Notes

  • #19: Transform the items emitted by an Observable into Observables, then flatten this into a single Observable