/**
 * Fibonacci helper based on the textbook doubly-recursive definition.
 *
 * <p>The recursion is deliberately left un-memoized: its cost grows exponentially with
 * {@code n}, which makes it usable as a CPU-bound unit of work (for example when comparing
 * sequential and parallel pipelines). Do not use it where a fast result is required.
 */
public class Fibonacci {

    /**
     * Returns the {@code n}-th Fibonacci number, where {@code F(0) = 0} and {@code F(1) = 1}.
     *
     * @param n zero-based index into the Fibonacci sequence; must not be negative
     * @return the {@code n}-th Fibonacci number
     * @throws IllegalArgumentException if {@code n} is negative
     */
    public static long fibonacci(int n) {
        // Validate once here rather than in every recursive call.
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative, got " + n);
        }
        return compute(n);
    }

    /** Plain doubly-recursive evaluation; callers guarantee {@code n >= 0}. */
    private static long compute(int n) {
        if (n <= 1) {
            return n;
        }
        return compute(n - 1) + compute(n - 2);
    }
}
parallelFluxFibonacci.sequential().collectList().block(); + } + + @Benchmark + public List benchMarkFluxSequential() { + Flux fluxFibonacci = Flux.just(43, 44, 45, 47, 48) + .map(Fibonacci::fibonacci); + + return fluxFibonacci.collectList().block(); + } +} diff --git a/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/FluxManualTest.java b/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/FluxManualTest.java new file mode 100644 index 000000000000..fc4a84717af4 --- /dev/null +++ b/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/FluxManualTest.java @@ -0,0 +1,37 @@ +package com.baeldung.reactor.flux.parallelflux; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.openjdk.jmh.Main; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + +import java.io.IOException; + +@Slf4j +public class FluxManualTest { + + @Test + public void givenFibonacciIndices_whenComputingWithFlux_thenRunBenchMarks() throws IOException { + Main.main(new String[] { + "com.baeldung.reactor.flux.parallelflux.FibonacciFluxParallelFluxBenchmark.benchMarkFluxSequential", + "-i", "3", + "-wi", "2", + "-f", "1" + }); + } + + @Test + public void givenFibonacciIndices_whenComputingWithFlux_thenCorrectResults() { + Flux fluxFibonacci = Flux.just(43, 44, 45, 47, 48) + .publishOn(Schedulers.boundedElastic()) + .map(Fibonacci::fibonacci); + + StepVerifier.create(fluxFibonacci) + .expectNext(433494437L, 701408733L, 1134903170L, 2971215073L, 4807526976L) + .verifyComplete(); + + } + +} diff --git a/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/ParallelFluxManualTest.java b/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/ParallelFluxManualTest.java new file mode 100644 index 000000000000..bcff266916c5 --- /dev/null +++ b/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/ParallelFluxManualTest.java @@ 
-0,0 +1,67 @@ +package com.baeldung.reactor.flux.parallelflux; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; +import org.openjdk.jmh.Main; +import reactor.core.publisher.Flux; +import reactor.core.publisher.ParallelFlux; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; + +@Slf4j +public class ParallelFluxManualTest { + + @Test + public void givenFibonacciIndices_whenComputingWithParallelFlux_thenRunBenchMarks() throws IOException { + Main.main(new String[] { + "com.baeldung.reactor.flux.parallelflux.FibonacciFluxParallelFluxBenchmark.benchMarkParallelFluxSequential", + "-i", "3", + "-wi", "2", + "-f", "1" + }); + } + + @Test + public void givenFibonacciIndices_whenComputingWithParallelFlux_thenCorrectResults() { + ParallelFlux parallelFluxFibonacci = Flux.just(43, 44, 45, 47, 48) + .parallel(3) + .runOn(Schedulers.parallel()) + .map(Fibonacci::fibonacci); + + Flux sequencialParallelFlux = parallelFluxFibonacci.sequential(); + + Set expectedSet = new HashSet<>(Set.of(433494437L, 701408733L, 1134903170L, 2971215073L, 4807526976L)); + + StepVerifier.create(sequencialParallelFlux) + .expectNextMatches(expectedSet::remove) + .expectNextMatches(expectedSet::remove) + .expectNextMatches(expectedSet::remove) + .expectNextMatches(expectedSet::remove) + .expectNextMatches(expectedSet::remove) + .verifyComplete(); + + } + + @RepeatedTest(5) + public void givenListOfIds_whenComputingWithParallelFlux_thenOrderChanges() { + ParallelFlux parallelFlux = Flux.just("id1", "id2", "id3") + .parallel(2) + .runOn(Schedulers.parallel()) + .map(String::toUpperCase); + + List emitted = new CopyOnWriteArrayList<>(); + + StepVerifier.create(parallelFlux.sequential().doOnNext(emitted::add)) + .expectNextCount(3) + .verifyComplete(); + + 
log.info("ParallelFlux emitted order: {}", emitted); + } +}