/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.common.util.concurrent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.ThreadContext;

/**
 * Batches items submitted from multiple threads so that a single caller at a time
 * hands them to {@link #write(List)}.
 *
 * <p>A one-permit semaphore decides which caller is responsible for processing. A caller
 * that obtains the permit processes its own item together with everything currently
 * queued; a caller that does not obtain it enqueues its item into a bounded queue
 * (blocking while the queue is full) and returns once it has made one more attempt to
 * take over processing.
 *
 * <p>Every listener is invoked exactly once per item: with {@code null} when the batch
 * containing the item was written without an exception, or with the exception that
 * {@link #write(List)} threw for that batch.
 *
 * @param <Item> the type of the items handed to {@link #write(List)}
 */
public abstract class AsyncIOProcessor<Item> {
    private final Logger logger;
    private final ArrayBlockingQueue<Tuple<Item, Consumer<Exception>>> queue;
    private final ThreadContext threadContext;
    /** Single permit: whoever holds it is the one thread currently draining and writing. */
    private final Semaphore promiseSemaphore = new Semaphore(1);

    /**
     * @param logger        used to report write failures (debug) and listener failures (warn)
     * @param queueSize     capacity of the bounded queue holding items that wait to be processed
     * @param threadContext source of the context that is captured for queued listeners
     */
    protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) {
        this.logger = logger;
        // Diamond instead of a raw type so the element type is checked by the compiler.
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.threadContext = threadContext;
    }

    /**
     * Submits an item for processing and registers the listener to be notified about the outcome.
     *
     * <p>If the calling thread obtains the processing permit, the item is processed on the
     * calling thread together with all items queued so far. Otherwise the item is enqueued,
     * blocking while the queue is full. If the thread is interrupted while waiting for queue
     * space, its interrupt flag is restored and the listener is notified immediately with the
     * {@link InterruptedException}; the method then still attempts to take over processing
     * of whatever is queued.
     *
     * @param item     the item to process; must not be {@code null}
     * @param listener notified with {@code null} on success or with the failure otherwise;
     *                 must not be {@code null}
     * @throws NullPointerException if {@code item} or {@code listener} is {@code null}
     */
    public final void put(Item item, Consumer<Exception> listener) {
        Objects.requireNonNull(item, "item must not be null");
        Objects.requireNonNull(listener, "listener must not be null");
        // First try to become the thread responsible for processing.
        final boolean promised = this.promiseSemaphore.tryAcquire();
        if (!promised) {
            // Somebody else is processing: hand the item over via the queue. The listener may
            // later run on a different thread, hence it is wrapped by preserveContext.
            try {
                this.queue.put(new Tuple<>(item, this.preserveContext(listener)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                listener.accept(e);
            }
        }
        // Try for the permit again: the processing thread may have finished draining between
        // our failed tryAcquire and our enqueue, in which case nobody would pick the item up.
        if (promised || this.promiseSemaphore.tryAcquire()) {
            final List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>();
            try {
                if (promised) {
                    // The item is processed on this thread, so it bypasses the queue and the
                    // listener is used as is.
                    candidates.add(new Tuple<>(item, listener));
                }
                this.drainAndProcess(candidates);
            } finally {
                this.promiseSemaphore.release();
            }
            // Keep going while more items arrived and no other thread has taken the permit.
            while (!this.queue.isEmpty() && this.promiseSemaphore.tryAcquire()) {
                try {
                    this.drainAndProcess(candidates);
                } finally {
                    this.promiseSemaphore.release();
                }
            }
        }
    }

    /**
     * Moves everything currently queued into {@code candidates}, processes the resulting
     * batch and then empties the list so it can be reused for the next round.
     */
    private void drainAndProcess(List<Tuple<Item, Consumer<Exception>>> candidates) {
        this.queue.drainTo(candidates);
        this.processList(candidates);
        candidates.clear();
    }

    /**
     * Writes the batch (skipped when it is empty) and notifies every listener in it.
     *
     * <p>Any {@link Exception} thrown by {@link #write(List)} is logged at debug level and
     * passed to all listeners of the batch instead of being propagated. An exception thrown
     * by a listener is logged at warn level and does not prevent the remaining listeners
     * from being notified.
     */
    private void processList(List<Tuple<Item, Consumer<Exception>>> candidates) {
        Exception exception = null;
        if (!candidates.isEmpty()) {
            try {
                this.write(candidates);
            } catch (Exception ex) {
                this.logger.debug("failed to write candidates", ex);
                exception = ex;
            }
        }
        for (Tuple<Item, Consumer<Exception>> tuple : candidates) {
            Consumer<Exception> consumer = tuple.v2();
            try {
                consumer.accept(exception);
            } catch (Exception ex) {
                this.logger.warn("failed to notify callback", ex);
            }
        }
    }

    /**
     * Wraps the listener so that the context obtained from
     * {@code threadContext.newRestorableContext(false)} at the time of this call is opened
     * around the callback and closed again afterwards.
     *
     * <p>NOTE(review): presumably this restores the submitting thread's context when the
     * listener runs on the processing thread; the meaning of the {@code false} argument is
     * not visible here - confirm against {@code ThreadContext}.
     */
    private Consumer<Exception> preserveContext(Consumer<Exception> consumer) {
        Supplier<ThreadContext.StoredContext> restorableContext = this.threadContext.newRestorableContext(false);
        return e -> {
            try (ThreadContext.StoredContext ignore = restorableContext.get()) {
                consumer.accept(e);
            }
        };
    }

    /**
     * Writes the given batch of items.
     *
     * @param candidates the items to write, each paired with its listener; this class calls
     *                   the method only with a non-empty list and notifies the listeners
     *                   itself after the method returns or throws
     * @throws IOException if the batch could not be written; the exception is passed to
     *                     every listener of the batch
     */
    protected abstract void write(List<Tuple<Item, Consumer<Exception>>> candidates) throws IOException;
}

