diff --git a/cyclops-futurestream/src/main/java/com/oath/cyclops/internal/react/stream/Runner.java b/cyclops-futurestream/src/main/java/com/oath/cyclops/internal/react/stream/Runner.java index 932d2e7f9a..5ef73e9b00 100644 --- a/cyclops-futurestream/src/main/java/com/oath/cyclops/internal/react/stream/Runner.java +++ b/cyclops-futurestream/src/main/java/com/oath/cyclops/internal/react/stream/Runner.java @@ -39,7 +39,9 @@ public boolean run(final LazyStreamWrapper lastActive, final EmptyCollector it = lastActive.injectFutures() .iterator(); @@ -48,12 +50,23 @@ public Continuation runContinuations(final LazyStreamWrapper lastActive, final E final Continuation finish = new Continuation( () -> { + collector.afterResults(()->{ + runnable.run(); + throw new ClosedQueueException(); + }); + return Continuation.empty(); - collector.getResults(); - runnable.run(); - throw new ClosedQueueException(); }); + final Continuation blockingFinish = new Continuation( + () -> { + + collector.getResults(); + runnable.run(); + throw new ClosedQueueException(); + + + }); final Continuation finishNoCollect = new Continuation( () -> { runnable.run(); @@ -70,7 +83,7 @@ public Continuation runContinuations(final LazyStreamWrapper lastActive, final E final FastFuture f = it.next(); - handleFilter(cont, f);//if completableFuture has been filtered out, we need to move to the next one instead + handleFilter(cont, f);//if FastFuture has been filtered out, we need to move to the next one instead collector.accept(f); } @@ -78,7 +91,7 @@ public Continuation runContinuations(final LazyStreamWrapper lastActive, final E if (it.hasNext()) return cont[0]; else { - return finish.proceed(); + return blocking ? blockingFinish.proceed() : finish.proceed(); } } catch (final SimpleReactProcessingException e) { diff --git a/cyclops-futurestream/src/main/java/com/oath/cyclops/react/collectors/lazy/EmptyCollector.java b/cyclops-futurestream/src/main/java/com/oath/cyclops/react/collectors/lazy/EmptyCollector.java index feb086105f..0fd2cf00bf 100644 --- a/cyclops-futurestream/src/main/java/com/oath/cyclops/react/collectors/lazy/EmptyCollector.java +++ b/cyclops-futurestream/src/main/java/com/oath/cyclops/react/collectors/lazy/EmptyCollector.java @@ -96,19 +96,36 @@ public void block(final Function, T> safeJoin) { } - /* - * @return zero list - * @see com.oath.cyclops.react.collectors.lazy.LazyResultConsumer#getResults() - */ - @Override - public Collection> getResults() { - active.stream() - .forEach(cf -> safeJoin.apply(cf)); - active.clear(); - return new ArrayList<>(); + + + public void afterResults(Runnable r){ + + if (active.size() > 0) { + FastFuture.allOf(()->{ + active.stream() + .filter(cf -> cf.isDone()) + .peek(this::handleExceptions) + .collect(Collectors.toList()); + r.run(); + }, active.toArray(new FastFuture[0])); + }else{ + active.stream() + .filter(cf -> cf.isDone()) + .peek(this::handleExceptions) + .collect(Collectors.toList()); + r.run(); + } } + @Override + public Collection> getResults() { + active.stream() + .forEach(cf -> safeJoin.apply(cf)); + active.clear(); + return new ArrayList<>(); + } - /* + + /* * @return zero list * @see com.oath.cyclops.react.collectors.lazy.LazyResultConsumer#getAllResults() */ diff --git a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/BlockingStreamHelper.java b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/BlockingStreamHelper.java index 17ce9d8f4a..283a76ee89 100644 --- a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/BlockingStreamHelper.java +++ b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/BlockingStreamHelper.java @@ -85,6 +85,20 @@ static Void captureGeneral(final Throwable t, final Optional errorHandler.ifPresent((handler) -> handler.accept(t)); return null; } + @SuppressWarnings("rawtypes") + public static Object throwSafe(final FastFuture next, final Optional> errorHandler) { + try { + return next.join(); + } catch (final SimpleReactCompletionException e) { + capture(e.getCause(), errorHandler); + } catch (final RuntimeException e) { + throw e; + } catch (final Exception e) { + capture(e, errorHandler); + } + + return MissingValue.MISSING_VALUE; + } @SuppressWarnings("rawtypes") public static Object getSafe(final FastFuture next, final Optional> errorHandler) { diff --git a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/Continuation.java b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/Continuation.java deleted file mode 100644 index dbc720890a..0000000000 --- a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/Continuation.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.oath.cyclops.types.futurestream; - -import java.util.function.Supplier; - -import lombok.AllArgsConstructor; - -@AllArgsConstructor -public class Continuation { - - private final Supplier remainderOfWorkToBeDone; - - public Continuation proceed() { - return remainderOfWorkToBeDone.get(); - } - - public static Continuation empty() { - - return new Continuation( - () -> empty()); - } -} diff --git a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyStream.java b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyStream.java index 05b502c45b..4ca03db32b 100644 --- a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyStream.java +++ b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyStream.java @@ -70,7 +70,15 @@ default Continuation runContinuation(final Runnable r) { return new Runner( r).runContinuations(getLastActive(), new EmptyCollector( - getMaxActive(), safeJoin)); + getMaxActive(), safeJoin),false); + + } + default Continuation blockingContinuation(final Runnable r) { + final Function safeJoin = (final FastFuture cf) -> (U) BlockingStreamHelper.getSafe(cf, getErrorHandler()); + return new Runner( + r).runContinuations(getLastActive(), + new EmptyCollector( + getMaxActive(), safeJoin),true); } @@ -150,6 +158,7 @@ default void forEach(final Consumer c) { if (getLastActive().isSequential()) { //if single threaded we can simply push from each Future into the toX to be returned try { + this.getLastActive() .operation(f -> f.peek(c)) .injectFutures() diff --git a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyToQueue.java b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyToQueue.java index 5bdbe8ba85..d7e85c242a 100644 --- a/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyToQueue.java +++ b/cyclops-futurestream/src/main/java/com/oath/cyclops/types/futurestream/LazyToQueue.java @@ -73,15 +73,19 @@ default Queue toQueue(final Function fn) { @Override default void addToQueue(final Queue queue) { - - final Continuation continuation = thenSync(queue::add).self(s -> { + FutureStream str = thenSync(queue::add).self(s -> { if (this.getPopulator() - .isPoolingActive()) + .isPoolingActive()) s.peekSync(v -> { throw new CompletedException( - v); + v); }); - }).runContinuation(() -> {throw new ClosedQueueException(); + }); + + + final Continuation continuation = queue.getContinuationStrategy().isBlocking() ? str.blockingContinuation(() -> { + throw new ClosedQueueException(); + }) : str.runContinuation(() -> {throw new ClosedQueueException(); } ); queue.addContinuation(continuation); diff --git a/cyclops-futurestream/src/main/java/cyclops/futurestream/FutureStream.java b/cyclops-futurestream/src/main/java/cyclops/futurestream/FutureStream.java index cbf4d6ef20..33810d17eb 100644 --- a/cyclops-futurestream/src/main/java/cyclops/futurestream/FutureStream.java +++ b/cyclops-futurestream/src/main/java/cyclops/futurestream/FutureStream.java @@ -2265,12 +2265,12 @@ default ReactiveSeq dropUntilInclusive(final Predicate p){ @Override default FutureStream takeWhile(final Predicate predicate) { return fromStream(ReactiveSeq.oneShotStream(stream()) - .takeWhile(predicate)); + .takeWhile(predicate)); } @Override default FutureStream takeWhileInclusive(final Predicate predicate) { return fromStream(ReactiveSeq.oneShotStream(stream()) - .takeWhileInclusive(predicate)); + .takeWhileInclusive(predicate)); } /** @@ -2283,7 +2283,7 @@ default FutureStream takeWhileInclusive(final Predicate predicate) */ @Override default FutureStream takeUntil(final Predicate predicate) { - return fromStream(LazyFutureStreamFunctions.takeUntil(this, predicate)); + return fromStream(LazyFutureStreamFunctions.takeUntil(this,predicate)); } @Override default FutureStream takeUntilInclusive(final Predicate p) { diff --git a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/LazySeqAutoOptimizeTest.java b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/LazySeqAutoOptimizeTest.java index 52883bf1a7..15d0587cd9 100644 --- a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/LazySeqAutoOptimizeTest.java +++ b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/LazySeqAutoOptimizeTest.java @@ -6,6 +6,7 @@ import static org.junit.Assert.assertTrue; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import cyclops.futurestream.FutureStream; @@ -55,12 +56,14 @@ public void longRunForEach(){ } @Test public void longRun(){ + AtomicInteger count = new AtomicInteger(0); new LazyReact().autoOptimizeOn().range(0, 1_000_000) .map(i->i+2) .map(i->Thread.currentThread().getId()) - // .peek(System.out::println) + .peek(i->count.incrementAndGet()) + //.peek(System.out::println) .runOnCurrent(); - System.out.println("Finished!"); + System.out.println("Finished! " + count.get()); } @Override public void testSkipUntilWithNullsInclusive() { diff --git a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/LazySeqTest.java b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/LazySeqTest.java index 6de8f6ee13..b14a5881f3 100644 --- a/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/LazySeqTest.java +++ b/cyclops-futurestream/src/test/java/cyclops/futurestream/react/lazy/LazySeqTest.java @@ -14,6 +14,7 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -44,6 +45,24 @@ public void testCycleLong() { assertEquals(asList(1, 2, 1, 2, 1, 2).size(),of(1, 2).cycle(3).to(ReactiveConvertableSequence::converter).listX().size()); assertEquals(asList(1, 2, 3, 1, 2, 3).size(), of(1, 2, 3).cycle(2).to(ReactiveConvertableSequence::converter).listX().size()); } + + @Test + public void takeWhileWithSleep(){ + new LazyReact().fromIterableAsync(Arrays.asList(()->1,()->{ + System.out.println("Sleep one! " + Thread.currentThread().getId()); + sleep(1000000000); + return 200; + })).map(i->{ + sleep(100); + System.out.println("Map " + Thread.currentThread().getId()); + return i+1; + }).takeWhile(i->{ + System.out.println("Take while " + Thread.currentThread().getId()); + return false; + }).forEach(System.out::println); + + // sleep(10000); + } @Test public void copy(){ of(1,2,3,4,5,6) @@ -67,9 +86,13 @@ public void lazyCollection(){ } @Test public void switchOnNextMultiple(){ - for(int i=0;i<500;i++) { - assertThat(react(() -> 1, () -> 2).mergeLatest(react(() -> 'a', () -> 'b'), - react(() -> 100, () -> 200)).toList().size(), equalTo(6)); + + for(int i=0;i<5000;i++) { + List list = react(() -> 1, () -> 2).mergeLatest(react(() -> 'a', () -> 'b'), + react(() -> 100, () -> 200)).toList(); + System.out.println(list); + + assertThat(list.size(), equalTo(6)); } } diff --git a/cyclops/src/main/java/com/oath/cyclops/async/adapters/AdaptersModule.java b/cyclops/src/main/java/com/oath/cyclops/async/adapters/AdaptersModule.java index 88763b0128..d9e8331f2d 100644 --- a/cyclops/src/main/java/com/oath/cyclops/async/adapters/AdaptersModule.java +++ b/cyclops/src/main/java/com/oath/cyclops/async/adapters/AdaptersModule.java @@ -19,6 +19,7 @@ import java.util.stream.Stream; import com.oath.cyclops.util.ExceptionSoftener; +import cyclops.control.Option; import cyclops.reactive.ReactiveSeq; @@ -26,6 +27,7 @@ import com.oath.cyclops.types.futurestream.Continuation; import lombok.AllArgsConstructor; +import sun.invoke.empty.Empty; public interface AdaptersModule { @@ -43,21 +45,27 @@ public void addContinuation(final Continuation c) { } + public boolean isBlocking(){ + return true; + } @Override public void handleContinuation() { continuation = ReactiveSeq.fromIterable(continuation) - .> map(c -> { + .concatMap(c -> { try { - return Optional.of(c.proceed()); - } catch (final Queue.ClosedQueueException e) { + Continuation next = c.proceed(); + if(next instanceof Continuation.EmptyRunnableContinuation) { + ((Continuation.EmptyRunnableContinuation)next).run(); + return Option.some(next); + } - return Optional.empty(); + return Option.some(next); + } catch (final Queue.ClosedQueueException e) { + return Option.none(); } }) - .filter(Optional::isPresent) - .map(Optional::get) .toList(); if (continuation.size() == 0) { diff --git a/cyclops/src/main/java/com/oath/cyclops/async/adapters/ContinuationStrategy.java b/cyclops/src/main/java/com/oath/cyclops/async/adapters/ContinuationStrategy.java index bb32df6ceb..3a7296ccd3 100644 --- a/cyclops/src/main/java/com/oath/cyclops/async/adapters/ContinuationStrategy.java +++ b/cyclops/src/main/java/com/oath/cyclops/async/adapters/ContinuationStrategy.java @@ -7,4 +7,8 @@ public interface ContinuationStrategy { public void addContinuation(Continuation c); public void handleContinuation(); + + default boolean isBlocking(){ + return false; + } } diff --git a/cyclops/src/main/java/com/oath/cyclops/async/adapters/Queue.java b/cyclops/src/main/java/com/oath/cyclops/async/adapters/Queue.java index 57c8404170..6284334a86 100644 --- a/cyclops/src/main/java/com/oath/cyclops/async/adapters/Queue.java +++ b/cyclops/src/main/java/com/oath/cyclops/async/adapters/Queue.java @@ -104,6 +104,7 @@ public class Queue implements Adapter { private volatile Signal sizeSignal; private volatile Continueable sub; + @Getter private ContinuationStrategy continuationStrategy; private volatile boolean shuttingDown = false; @@ -472,6 +473,7 @@ public ClosedQueueException(List currentData) { public ClosedQueueException() { currentData = null; + } public boolean isDataPresent() { @@ -482,7 +484,6 @@ public boolean isDataPresent() { public Throwable fillInStackTrace() { return this; } - } /** diff --git a/cyclops/src/main/java/com/oath/cyclops/types/futurestream/Continuation.java b/cyclops/src/main/java/com/oath/cyclops/types/futurestream/Continuation.java index 41a542fe55..98193c0e7e 100644 --- a/cyclops/src/main/java/com/oath/cyclops/types/futurestream/Continuation.java +++ b/cyclops/src/main/java/com/oath/cyclops/types/futurestream/Continuation.java @@ -13,9 +13,33 @@ public Continuation proceed() { return remainderOfWorkToBeDone.get(); } - public static Continuation empty() { + public static Empty empty() { - return new Continuation( - () -> empty()); + return new Empty(); + } + public static EmptyRunnableContinuation emptyRunnable(Runnable r) { + + return new EmptyRunnableContinuation(r); + } + + public static class Empty extends Continuation { + + public Empty() { + super(() -> empty()); + } + } + + public static class EmptyRunnableContinuation extends Continuation implements Runnable { + final Runnable r; + public EmptyRunnableContinuation(Runnable r) { + super(() ->empty()); + this.r = r; + } + + @Override + public void run() { + + r.run(); + } } } diff --git a/gradle.properties b/gradle.properties index a2a89bb017..4205f38be0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version = 10.3.2 +version = 10.3.3 agronaVersion=0.9.33 reactiveStreamsVersion=1.0.2 reactorVersion=3.2.6.RELEASE