diff --git a/reactor-core-2/pom.xml b/reactor-core-2/pom.xml
index 6ba94fc18aa7..95c0745e87ec 100644
--- a/reactor-core-2/pom.xml
+++ b/reactor-core-2/pom.xml
@@ -37,6 +37,17 @@
${lombok.version}
test
+
+ org.openjdk.jmh
+ jmh-core
+ ${jmh-core.version}
+
+
+ org.openjdk.jmh
+ jmh-generator-annprocess
+ ${jmh-generator.version}
+ provided
+
diff --git a/reactor-core-2/src/main/java/com/baeldung/reactor/flux/parallelflux/Fibonacci.java b/reactor-core-2/src/main/java/com/baeldung/reactor/flux/parallelflux/Fibonacci.java
new file mode 100644
index 000000000000..3bcddc1bacf5
--- /dev/null
+++ b/reactor-core-2/src/main/java/com/baeldung/reactor/flux/parallelflux/Fibonacci.java
@@ -0,0 +1,8 @@
+package com.baeldung.reactor.flux.parallelflux;
+
+public class Fibonacci {
+ public static long fibonacci(int n) {
+ if (n <= 1) return n;
+ return fibonacci(n - 1) + fibonacci(n - 2);
+ }
+}
diff --git a/reactor-core-2/src/main/java/com/baeldung/reactor/flux/parallelflux/FibonacciFluxParallelFluxBenchmark.java b/reactor-core-2/src/main/java/com/baeldung/reactor/flux/parallelflux/FibonacciFluxParallelFluxBenchmark.java
new file mode 100644
index 000000000000..e9ac5c3c304c
--- /dev/null
+++ b/reactor-core-2/src/main/java/com/baeldung/reactor/flux/parallelflux/FibonacciFluxParallelFluxBenchmark.java
@@ -0,0 +1,33 @@
+package com.baeldung.reactor.flux.parallelflux;
+
+import org.openjdk.jmh.annotations.*;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.ParallelFlux;
+import reactor.core.scheduler.Schedulers;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Thread)
+public class FibonacciFluxParallelFluxBenchmark {
+
+ @Benchmark
+ public List benchMarkParallelFluxSequential() {
+ ParallelFlux parallelFluxFibonacci = Flux.just(43, 44, 45, 47, 48)
+ .parallel(3)
+ .runOn(Schedulers.parallel())
+ .map(Fibonacci::fibonacci);
+
+ return parallelFluxFibonacci.sequential().collectList().block();
+ }
+
+ @Benchmark
+ public List benchMarkFluxSequential() {
+ Flux fluxFibonacci = Flux.just(43, 44, 45, 47, 48)
+ .map(Fibonacci::fibonacci);
+
+ return fluxFibonacci.collectList().block();
+ }
+}
diff --git a/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/FluxManualTest.java b/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/FluxManualTest.java
new file mode 100644
index 000000000000..fc4a84717af4
--- /dev/null
+++ b/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/FluxManualTest.java
@@ -0,0 +1,37 @@
+package com.baeldung.reactor.flux.parallelflux;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+import org.openjdk.jmh.Main;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+import reactor.test.StepVerifier;
+
+import java.io.IOException;
+
+@Slf4j
+public class FluxManualTest {
+
+ @Test
+ public void givenFibonacciIndices_whenComputingWithFlux_thenRunBenchMarks() throws IOException {
+ Main.main(new String[] {
+ "com.baeldung.reactor.flux.parallelflux.FibonacciFluxParallelFluxBenchmark.benchMarkFluxSequential",
+ "-i", "3",
+ "-wi", "2",
+ "-f", "1"
+ });
+ }
+
+ @Test
+ public void givenFibonacciIndices_whenComputingWithFlux_thenCorrectResults() {
+ Flux fluxFibonacci = Flux.just(43, 44, 45, 47, 48)
+ .publishOn(Schedulers.boundedElastic())
+ .map(Fibonacci::fibonacci);
+
+ StepVerifier.create(fluxFibonacci)
+ .expectNext(433494437L, 701408733L, 1134903170L, 2971215073L, 4807526976L)
+ .verifyComplete();
+
+ }
+
+}
diff --git a/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/ParallelFluxManualTest.java b/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/ParallelFluxManualTest.java
new file mode 100644
index 000000000000..bcff266916c5
--- /dev/null
+++ b/reactor-core-2/src/test/java/com/baeldung/reactor/flux/parallelflux/ParallelFluxManualTest.java
@@ -0,0 +1,67 @@
+package com.baeldung.reactor.flux.parallelflux;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.openjdk.jmh.Main;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.ParallelFlux;
+import reactor.core.scheduler.Schedulers;
+import reactor.test.StepVerifier;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+@Slf4j
+public class ParallelFluxManualTest {
+
+ @Test
+ public void givenFibonacciIndices_whenComputingWithParallelFlux_thenRunBenchMarks() throws IOException {
+ Main.main(new String[] {
+ "com.baeldung.reactor.flux.parallelflux.FibonacciFluxParallelFluxBenchmark.benchMarkParallelFluxSequential",
+ "-i", "3",
+ "-wi", "2",
+ "-f", "1"
+ });
+ }
+
+ @Test
+ public void givenFibonacciIndices_whenComputingWithParallelFlux_thenCorrectResults() {
+ ParallelFlux parallelFluxFibonacci = Flux.just(43, 44, 45, 47, 48)
+ .parallel(3)
+ .runOn(Schedulers.parallel())
+ .map(Fibonacci::fibonacci);
+
+ Flux sequencialParallelFlux = parallelFluxFibonacci.sequential();
+
+ Set expectedSet = new HashSet<>(Set.of(433494437L, 701408733L, 1134903170L, 2971215073L, 4807526976L));
+
+ StepVerifier.create(sequencialParallelFlux)
+ .expectNextMatches(expectedSet::remove)
+ .expectNextMatches(expectedSet::remove)
+ .expectNextMatches(expectedSet::remove)
+ .expectNextMatches(expectedSet::remove)
+ .expectNextMatches(expectedSet::remove)
+ .verifyComplete();
+
+ }
+
+ @RepeatedTest(5)
+ public void givenListOfIds_whenComputingWithParallelFlux_thenOrderChanges() {
+ ParallelFlux parallelFlux = Flux.just("id1", "id2", "id3")
+ .parallel(2)
+ .runOn(Schedulers.parallel())
+ .map(String::toUpperCase);
+
+ List emitted = new CopyOnWriteArrayList<>();
+
+ StepVerifier.create(parallelFlux.sequential().doOnNext(emitted::add))
+ .expectNextCount(3)
+ .verifyComplete();
+
+ log.info("ParallelFlux emitted order: {}", emitted);
+ }
+}