这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public boolean run(final LazyStreamWrapper<U> lastActive, final EmptyCollector<U

}

public Continuation runContinuations(final LazyStreamWrapper lastActive, final EmptyCollector collector) {
public Continuation

runContinuations(final LazyStreamWrapper lastActive, final EmptyCollector collector, boolean blocking) {

final Iterator<FastFuture> it = lastActive.injectFutures()
.iterator();
Expand All @@ -48,12 +50,23 @@ public Continuation runContinuations(final LazyStreamWrapper lastActive, final E

final Continuation finish = new Continuation(
() -> {
collector.afterResults(()->{
runnable.run();
throw new ClosedQueueException();
});
return Continuation.empty();

collector.getResults();
runnable.run();
throw new ClosedQueueException();

});
final Continuation blockingFinish = new Continuation(
() -> {

collector.getResults();
runnable.run();
throw new ClosedQueueException();


});
final Continuation finishNoCollect = new Continuation(
() -> {
runnable.run();
Expand All @@ -70,15 +83,15 @@ public Continuation runContinuations(final LazyStreamWrapper lastActive, final E

final FastFuture f = it.next();

handleFilter(cont, f);//if completableFuture has been filtered out, we need to move to the next one instead
handleFilter(cont, f);//if FastFuture has been filtered out, we need to move to the next one instead

collector.accept(f);
}

if (it.hasNext())
return cont[0];
else {
return finish.proceed();
return blocking ? blockingFinish.proceed() : finish.proceed();
}
} catch (final SimpleReactProcessingException e) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,36 @@ public void block(final Function<FastFuture<T>, T> safeJoin) {

}

/*
* @return zero list
* @see com.oath.cyclops.react.collectors.lazy.LazyResultConsumer#getResults()
*/
@Override
public Collection<FastFuture<T>> getResults() {
active.stream()
.forEach(cf -> safeJoin.apply(cf));
active.clear();
return new ArrayList<>();


/**
 * Runs {@code r} after exception handling has been applied to the completed
 * active futures.
 *
 * When there is at least one active future the work is handed to
 * {@code FastFuture.allOf} together with the active futures; when there are
 * none it is executed immediately on the calling thread.
 * NOTE(review): presumably {@code allOf} invokes the callback once every
 * supplied future has completed — confirm against FastFuture.
 *
 * @param r action to run after the completed futures have been inspected
 */
public void afterResults(Runnable r){
    // One shared completion step: pass every completed future through
    // handleExceptions, then run the caller's action. The collected list is
    // discarded; collect(...) is only there to drive the lazy pipeline.
    final Runnable inspectThenRun = () -> {
        active.stream()
              .filter(future -> future.isDone())
              .peek(this::handleExceptions)
              .collect(Collectors.toList());
        r.run();
    };

    if (active.size() == 0) {
        inspectThenRun.run();
        return;
    }

    FastFuture.allOf(inspectThenRun::run, active.toArray(new FastFuture[0]));
}
/**
 * Applies {@code safeJoin} to each active future in turn, then clears the
 * active collection.
 *
 * @return a newly created, empty list
 */
@Override
public Collection<FastFuture<T>> getResults() {
    active.stream()
          .forEach(future -> safeJoin.apply(future));
    active.clear();

    final Collection<FastFuture<T>> nothingCollected = new ArrayList<>();
    return nothingCollected;
}

/*

/*
* @return zero list
* @see com.oath.cyclops.react.collectors.lazy.LazyResultConsumer#getAllResults()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@ static Void captureGeneral(final Throwable t, final Optional<Consumer<Throwable>
errorHandler.ifPresent((handler) -> handler.accept(t));
return null;
}
/**
 * Joins {@code next} and returns its value, routing failures as follows:
 * a {@code SimpleReactCompletionException} has its cause passed to
 * {@code capture}; any other {@code RuntimeException} is rethrown unchanged;
 * any remaining {@code Exception} is itself passed to {@code capture}.
 *
 * @param next         future to join
 * @param errorHandler optional handler forwarded to {@code capture}
 * @return the joined value, or {@code MissingValue.MISSING_VALUE} when a
 *         failure was captured and {@code capture} returned normally
 */
@SuppressWarnings("rawtypes")
public static Object throwSafe(final FastFuture next, final Optional<Consumer<Throwable>> errorHandler) {
    // Start from the sentinel so every captured-failure path falls through to it.
    Object outcome = MissingValue.MISSING_VALUE;
    try {
        outcome = next.join();
    } catch (final SimpleReactCompletionException completionFailure) {
        capture(completionFailure.getCause(), errorHandler);
    } catch (final RuntimeException unchecked) {
        throw unchecked;
    } catch (final Exception other) {
        capture(other, errorHandler);
    }
    return outcome;
}

@SuppressWarnings("rawtypes")
public static Object getSafe(final FastFuture next, final Optional<Consumer<Throwable>> errorHandler) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,15 @@ default Continuation runContinuation(final Runnable r) {
return new Runner(
r).runContinuations(getLastActive(),
new EmptyCollector(
getMaxActive(), safeJoin));
getMaxActive(), safeJoin),false);

}
/**
 * Builds a continuation for this stream with the runner's {@code blocking}
 * flag set to {@code true}.
 *
 * Futures are joined through {@code BlockingStreamHelper.getSafe} using this
 * stream's error handler, and collected by an {@code EmptyCollector} sized
 * with {@code getMaxActive()}.
 *
 * @param r action handed to the {@code Runner}
 * @return the continuation produced by {@code Runner.runContinuations}
 */
default Continuation blockingContinuation(final Runnable r) {
    final Function<FastFuture, U> joiner =
            (final FastFuture future) -> (U) BlockingStreamHelper.getSafe(future, getErrorHandler());

    final Runner runner = new Runner(r);
    return runner.runContinuations(getLastActive(),
                                   new EmptyCollector(getMaxActive(), joiner),
                                   true);
}

Expand Down Expand Up @@ -150,6 +158,7 @@ default void forEach(final Consumer<? super U> c) {
if (getLastActive().isSequential()) {
//if single threaded we can simply push from each Future into the toX to be returned
try {

this.getLastActive()
.operation(f -> f.peek(c))
.injectFutures()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,19 @@ default Queue<U> toQueue(final Function<Queue, Queue> fn) {

@Override
default void addToQueue(final Queue queue) {

final Continuation continuation = thenSync(queue::add).self(s -> {
FutureStream str = thenSync(queue::add).self(s -> {
if (this.getPopulator()
.isPoolingActive())
.isPoolingActive())
s.peekSync(v -> {
throw new CompletedException(
v);
v);
});
}).runContinuation(() -> {throw new ClosedQueueException();
});


final Continuation continuation = queue.getContinuationStrategy().isBlocking() ? str.blockingContinuation(() -> {
throw new ClosedQueueException();
}) : str.runContinuation(() -> {throw new ClosedQueueException();
}
);
queue.addContinuation(continuation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2265,12 +2265,12 @@ default ReactiveSeq<U> dropUntilInclusive(final Predicate<? super U> p){
@Override
default FutureStream<U> takeWhile(final Predicate<? super U> predicate) {
return fromStream(ReactiveSeq.oneShotStream(stream())
.takeWhile(predicate));
.takeWhile(predicate));
}
@Override
default FutureStream<U> takeWhileInclusive(final Predicate<? super U> predicate) {
return fromStream(ReactiveSeq.oneShotStream(stream())
.takeWhileInclusive(predicate));
.takeWhileInclusive(predicate));
}

/**
Expand All @@ -2283,7 +2283,7 @@ default FutureStream<U> takeWhileInclusive(final Predicate<? super U> predicate)
*/
@Override
default FutureStream<U> takeUntil(final Predicate<? super U> predicate) {
return fromStream(LazyFutureStreamFunctions.takeUntil(this, predicate));
return fromStream(LazyFutureStreamFunctions.takeUntil(this,predicate));
}
@Override
default FutureStream<U> takeUntilInclusive(final Predicate<? super U> p) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static org.junit.Assert.assertTrue;

import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import cyclops.futurestream.FutureStream;
Expand Down Expand Up @@ -55,12 +56,14 @@ public void longRunForEach(){
}
@Test
public void longRun(){
AtomicInteger count = new AtomicInteger(0);
new LazyReact().autoOptimizeOn().range(0, 1_000_000)
.map(i->i+2)
.map(i->Thread.currentThread().getId())
// .peek(System.out::println)
.peek(i->count.incrementAndGet())
//.peek(System.out::println)
.runOnCurrent();
System.out.println("Finished!");
System.out.println("Finished! " + count.get());
}
@Override
public void testSkipUntilWithNullsInclusive() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -44,6 +45,24 @@ public void testCycleLong() {
assertEquals(asList(1, 2, 1, 2, 1, 2).size(),of(1, 2).cycle(3).to(ReactiveConvertableSequence::converter).listX().size());
assertEquals(asList(1, 2, 3, 1, 2, 3).size(), of(1, 2, 3).cycle(2).to(ReactiveConvertableSequence::converter).listX().size());
}

// Pushes a two-element async stream through map -> takeWhile -> forEach.
// The test contains no assertions; it only has to return.
// NOTE(review): presumably this guards against the terminal operation waiting on the
// long-sleeping second supplier once takeWhile has rejected an element — confirm intent.
@Test
public void takeWhileWithSleep(){
// First supplier yields 1 straight away; the second prints, calls sleep(1000000000), then yields 200.
new LazyReact().<Integer>fromIterableAsync(Arrays.asList(()->1,()->{
System.out.println("Sleep one! " + Thread.currentThread().getId());
sleep(1000000000);
return 200;
})).map(i->{
// Each mapped element is delayed by sleep(100) before being incremented.
sleep(100);
System.out.println("Map " + Thread.currentThread().getId());
return i+1;
}).takeWhile(i->{
// The predicate always returns false, so no element satisfies it.
System.out.println("Take while " + Thread.currentThread().getId());
return false;
}).forEach(System.out::println);

// sleep(10000);
}
@Test
public void copy(){
of(1,2,3,4,5,6)
Expand All @@ -67,9 +86,13 @@ public void lazyCollection(){
}
@Test
public void switchOnNextMultiple(){
for(int i=0;i<500;i++) {
assertThat(react(() -> 1, () -> 2).mergeLatest(react(() -> 'a', () -> 'b'),
react(() -> 100, () -> 200)).toList().size(), equalTo(6));

for(int i=0;i<5000;i++) {
List<Object> list = react(() -> 1, () -> 2).mergeLatest(react(() -> 'a', () -> 'b'),
react(() -> 100, () -> 200)).toList();
System.out.println(list);

assertThat(list.size(), equalTo(6));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import java.util.stream.Stream;

import com.oath.cyclops.util.ExceptionSoftener;
import cyclops.control.Option;
import cyclops.reactive.ReactiveSeq;


import com.oath.cyclops.react.async.subscription.Continueable;
import com.oath.cyclops.types.futurestream.Continuation;

import lombok.AllArgsConstructor;
import sun.invoke.empty.Empty;

public interface AdaptersModule {

Expand All @@ -43,21 +45,27 @@ public void addContinuation(final Continuation c) {

}

/**
 * Reports this strategy as blocking.
 * NOTE(review): this looks like an override of ContinuationStrategy#isBlocking (whose
 * default returns false); if so it should carry {@code @Override} — confirm the
 * enclosing class implements that interface.
 *
 * @return always {@code true}
 */
public boolean isBlocking(){
return true;
}
@Override
public void handleContinuation() {

continuation = ReactiveSeq.fromIterable(continuation)
.<Optional<Continuation>> map(c -> {
.concatMap(c -> {
try {
return Optional.of(c.proceed());
} catch (final Queue.ClosedQueueException e) {
Continuation next = c.proceed();
if(next instanceof Continuation.EmptyRunnableContinuation) {
((Continuation.EmptyRunnableContinuation)next).run();
return Option.some(next);
}

return Optional.empty();
return Option.some(next);
} catch (final Queue.ClosedQueueException e) {
return Option.none();
}

})
.filter(Optional::isPresent)
.map(Optional::get)
.toList();

if (continuation.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ public interface ContinuationStrategy {
public void addContinuation(Continuation c);

public void handleContinuation();

/**
 * Indicates whether this strategy is a blocking one. Implementations that
 * are blocking override this to return {@code true}.
 *
 * @return {@code false} by default
 */
default boolean isBlocking(){
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class Queue<T> implements Adapter<T> {
private volatile Signal<Integer> sizeSignal;

private volatile Continueable sub;
@Getter
private ContinuationStrategy continuationStrategy;
private volatile boolean shuttingDown = false;

Expand Down Expand Up @@ -472,6 +473,7 @@ public ClosedQueueException(List currentData) {

/**
 * Creates an exception that carries no data: {@code currentData} is set to {@code null}.
 */
public ClosedQueueException() {
currentData = null;

}

public boolean isDataPresent() {
Expand All @@ -482,7 +484,6 @@ public boolean isDataPresent() {
public Throwable fillInStackTrace() {
// Returns this instance as-is without delegating to the superclass, so no stack trace is recorded.
// NOTE(review): presumably done to keep this exception cheap when used as a signal — confirm.
return this;
}

}

/**
Expand Down
Loading