/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer.async;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import kafka.producer.SyncProducer;
import kafka.producer.async.CallbackHandler;
import kafka.producer.async.EventHandler;
import kafka.producer.async.IllegalQueueStateException;
import kafka.producer.async.ProducerSendThread$;
import kafka.producer.async.QueueItem;
import kafka.serializer.Encoder;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.Logging$class;
import kafka.utils.SystemTime$;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.ScalaObject;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.mutable.ListBuffer;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 * Duplicate member names - consider using --renamedupmembers true
 */
@ScalaSignature(bytes="\u0006\u0001\u0005ed!B\u0001\u0003\u0001\tA!A\u0005)s_\u0012,8-\u001a:TK:$G\u000b\u001b:fC\u0012T!a\u0001\u0003\u0002\u000b\u0005\u001c\u0018P\\2\u000b\u0005\u00151\u0011\u0001\u00039s_\u0012,8-\u001a:\u000b\u0003\u001d\tQa[1gW\u0006,\"!\u0003\u001f\u0014\t\u0001Q!\u0003\u0007\t\u0003\u0017Ai\u0011\u0001\u0004\u0006\u0003\u001b9\tA\u0001\\1oO*\tq\"\u0001\u0003kCZ\f\u0017BA\t\r\u0005\u0019!\u0006N]3bIB\u00111CF\u0007\u0002))\u0011QCB\u0001\u0006kRLGn]\u0005\u0003/Q\u0011q\u0001T8hO&tw\r\u0005\u0002\u001a95\t!DC\u0001\u001c\u0003\u0015\u00198-\u00197b\u0013\ti\"DA\u0006TG\u0006d\u0017m\u00142kK\u000e$\b\u0002C\u0010\u0001\u0005\u000b\u0007I\u0011A\u0011\u0002\u0015QD'/Z1e\u001d\u0006lWm\u0001\u0001\u0016\u0003\t\u0002\"a\t\u0014\u000f\u0005e!\u0013BA\u0013\u001b\u0003\u0019\u0001&/\u001a3fM&\u0011q\u0005\u000b\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005\u0015R\u0002\u0002\u0003\u0016\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0012\u0002\u0017QD'/Z1e\u001d\u0006lW\r\t\u0005\tY\u0001\u0011)\u0019!C\u0001[\u0005)\u0011/^3vKV\ta\u0006E\u00020iYj\u0011\u0001\r\u0006\u0003cI\n!bY8oGV\u0014(/\u001a8u\u0015\t\u0019d\"\u0001\u0003vi&d\u0017BA\u001b1\u00055\u0011En\\2lS:<\u0017+^3vKB\u0019q\u0007\u000f\u001e\u000e\u0003\tI!!\u000f\u0002\u0003\u0013E+X-^3Ji\u0016l\u0007CA\u001e=\u0019\u0001!\u0001\"\u0010\u0001\u0005\u0002\u0003\u0015\rA\u0010\u0002\u0002)F\u0011qH\u0011\t\u00033\u0001K!!\u0011\u000e\u0003\u000f9{G\u000f[5oOB\u0011\u0011dQ\u0005\u0003\tj\u00111!\u00118z\u0011!1\u0005A!A!\u0002\u0013q\u0013AB9vKV,\u0007\u0005\u0003\u0005I\u0001\t\u0015\r\u0011\"\u0001J\u0003)\u0019XM]5bY&TXM]\u000b\u0002\u0015B\u00191*\u0014\u001e\u000e\u00031S!\u0001\u0013\u0004\n\u00059c%aB#oG>$WM\u001d\u0005\t!\u0002\u0011\t\u0011)A\u0005\u0015\u0006Y1/\u001a:jC2L'0\u001a:!\u0011!\u0011\u0006A!b\u0001\n\u0003\u0019\u0016AE;oI\u0016\u0014H._5oOB\u0013x\u000eZ;dKJ,\u0012\u0001\u0016\t\u0003+Zk\u0011\u0001B\u0005\u0003/\u0012\u0011AbU=oGB\u0013x\u000eZ;dKJD\u0001\"\u0017\u0001\u0003\u0002\u0003\u0006I\u0001V\u0001\u0014k:$WM\u001d7zS:<\u0007K]8ek\u000e,'\u000f\t\u0005\t7\u0002\u0011)\u0019!C\u00019\u00069\u0001.\u00198eY\u0016\u0014X#A/\u0011\u0007]r&(\u0003\u0002`\u0005\taQI^3oi\"\u000bg\u000e\u001a7fe\"A\u0011\r\u0001B\u0001B\u0003%Q,\u0001\u0005iC:$G.\u001a:!\u0011!\u0019\u0007A!b\u0001\n\u0003!\u0017AC2cW\"\u000bg\u000e\u001a7feV\tQ\rE\u00028MjJ!a\u001a\u0002\u0003\u001f\r\u000bG\u000e\u001c2bG.D\u0015M\u001c3mKJD\u0001\"\u001b\u0001\u0003\u0002\u0003\u0006I!Z\u0001\fG\n\\\u0007*\u00198eY\u0016\u0014\b\u0005\u0003\u0005l\u0001\t\u0015\r\u0011\"\u0001m\u0003%\tX/Z;f)&lW-F\u0001n!\tIb.\u0003\u0002p5\t!Aj\u001c8h\u0011!\t\bA!A!\u0002\u0013i\u0017AC9vKV,G+[7fA!A1\u000f\u0001BC\u0002\u0013\u0005A/A\u0005cCR\u001c\u0007nU5{KV\tQ\u000f\u0005\u0002\u001am&\u0011qO\u0007\u0002\u0004\u0013:$\b\u0002C=\u0001\u0005\u0003\u0005\u000b\u0011B;\u0002\u0015\t\fGo\u00195TSj,\u0007\u0005\u0003\u0005|\u0001\t\u0015\r\u0011\"\u0001}\u0003=\u0019\b.\u001e;e_^t7i\\7nC:$W#\u0001\"\t\u0011y\u0004!\u0011!Q\u0001\n\t\u000b\u0001c\u001d5vi\u0012|wO\\\"p[6\fg\u000e\u001a\u0011\t\u000f\u0005\u0005\u0001\u0001\"\u0001\u0002\u0004\u00051A(\u001b8jiz\"B#!\u0002\u0002\b\u0005%\u00111BA\u0007\u0003\u001f\t\t\"a\u0005\u0002\u0016\u0005]\u0001cA\u001c\u0001u!)qd a\u0001E!)Af a\u0001]!)\u0001j a\u0001\u0015\")!k a\u0001)\")1l a\u0001;\")1m a\u0001K\")1n a\u0001[\")1o a\u0001k\")1p a\u0001\u0005\"I\u00111\u0004\u0001C\u0002\u0013%\u0011QD\u0001\u000eg\",H\u000fZ8x]2\u000bGo\u00195\u0016\u0005\u0005}\u0001cA\u0018\u0002\"%\u0019\u00111\u0005\u0019\u0003\u001d\r{WO\u001c;E_^tG*\u0019;dQ\"A\u0011q\u0005\u0001!\u0002\u0013\ty\"\u0001\btQV$Hm\\<o\u0019\u0006$8\r\u001b\u0011\t\u000f\u0005-\u0002\u0001\"\u0011\u0002.\u0005\u0019!/\u001e8\u0015\u0005\u0005=\u0002cA\r\u00022%\u0019\u00111\u0007\u000e\u0003\tUs\u0017\u000e\u001e\u0005\b\u0003o\u0001A\u0011AA\u001d\u00035\tw/Y5u'\",H\u000fZ8x]V\u0011\u0011q\u0006\u0005\b\u0003{\u0001A\u0011AA\u001d\u0003!\u0019\b.\u001e;e_^t\u0007bBA!\u0001\u0011%\u00111I\u0001\u000eaJ|7-Z:t\u000bZ,g\u000e^:\u0015\u0005\u0005\u0015\u0003#BA$\u0003/2d\u0002BA%\u0003'rA!a\u0013\u0002R5\u0011\u0011Q\n\u0006\u0004\u0003\u001f\u0002\u0013A\u0002\u001fs_>$h(C\u0001\u001c\u0013\r\t)FG\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\tI&a\u0017\u0003\u0007M+\u0017OC\u0002\u0002ViAq!a\u0018\u0001\t\u0003\t\t'A\u0006uef$v\u000eS1oI2,G\u0003BA\u0018\u0003GB\u0001\"!\u001a\u0002^\u0001\u0007\u0011QI\u0001\u0007KZ,g\u000e^:\t\u000f\u0005%\u0004\u0001\"\u0003\u0002l\u0005IAn\\4Fm\u0016tGo\u001d\u000b\u0007\u0003_\ti'!\u001d\t\u000f\u0005=\u0014q\ra\u0001E\u0005\u0019A/Y4\t\u0011\u0005\u0015\u0014q\ra\u0001\u0003g\u0002R!a\u0012\u0002vYJA!a\u001e\u0002\\\tA\u0011\n^3sC\ndW\r")
public class ProducerSendThread<T>
extends Thread
implements Logging,
ScalaObject {
    private final String threadName;
    private final BlockingQueue<QueueItem<T>> queue;
    private final Encoder<T> serializer;
    private final SyncProducer underlyingProducer;
    private final EventHandler<T> handler;
    private final CallbackHandler<T> cbkHandler;
    private final long queueTime;
    private final int batchSize;
    private final Object shutdownCommand;
    private final CountDownLatch shutdownLatch;
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    public volatile int bitmap$0;

    @Override
    public String loggerName() {
        return this.loggerName;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public Logger logger() {
        if ((this.bitmap$0 & 1) != 0) return this.logger;
        ProducerSendThread producerSendThread = this;
        synchronized (producerSendThread) {
            if ((this.bitmap$0 & 1) == 0) {
                this.logger = Logging$class.logger(this);
                this.bitmap$0 |= 1;
            }
            return this.logger;
        }
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String string) {
        this.logIdent = string;
    }

    @Override
    public final Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    @Override
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String string) {
        this.loggerName = string;
    }

    @Override
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ log4jController$) {
        this.kafka$utils$Logging$$log4jController = log4jController$;
    }

    public void trace(Function0 msg) {
        Logging$class.trace(this, msg);
    }

    public Object trace(Function0 e) {
        return Logging$class.trace(this, e);
    }

    public void trace(Function0 msg, Function0 e) {
        Logging$class.trace(this, msg, e);
    }

    public void debug(Function0 msg) {
        Logging$class.debug(this, msg);
    }

    public Object debug(Function0 e) {
        return Logging$class.debug(this, e);
    }

    public void debug(Function0 msg, Function0 e) {
        Logging$class.debug(this, msg, e);
    }

    public void info(Function0 msg) {
        Logging$class.info(this, msg);
    }

    public Object info(Function0 e) {
        return Logging$class.info(this, e);
    }

    public void info(Function0 msg, Function0 e) {
        Logging$class.info(this, msg, e);
    }

    public void warn(Function0 msg) {
        Logging$class.warn(this, msg);
    }

    public Object warn(Function0 e) {
        return Logging$class.warn(this, e);
    }

    public void warn(Function0 msg, Function0 e) {
        Logging$class.warn(this, msg, e);
    }

    public void error(Function0 msg) {
        Logging$class.error(this, msg);
    }

    public Object error(Function0 e) {
        return Logging$class.error(this, e);
    }

    public void error(Function0 msg, Function0 e) {
        Logging$class.error(this, msg, e);
    }

    public void fatal(Function0 msg) {
        Logging$class.fatal(this, msg);
    }

    public Object fatal(Function0 e) {
        return Logging$class.fatal(this, e);
    }

    public void fatal(Function0 msg, Function0 e) {
        Logging$class.fatal(this, msg, e);
    }

    public String threadName() {
        return this.threadName;
    }

    public BlockingQueue<QueueItem<T>> queue() {
        return this.queue;
    }

    public Encoder<T> serializer() {
        return this.serializer;
    }

    public SyncProducer underlyingProducer() {
        return this.underlyingProducer;
    }

    public EventHandler<T> handler() {
        return this.handler;
    }

    public CallbackHandler<T> cbkHandler() {
        return this.cbkHandler;
    }

    public long queueTime() {
        return this.queueTime;
    }

    public int batchSize() {
        return this.batchSize;
    }

    public Object shutdownCommand() {
        return this.shutdownCommand;
    }

    private CountDownLatch shutdownLatch() {
        return this.shutdownLatch;
    }

    @Override
    public void run() {
        try {
            Seq<QueueItem<T>> remainingEvents$1 = this.processEvents();
            this.debug((Function0<String>)new $anonfun$run$1(this, remainingEvents$1));
            if (remainingEvents$1.size() > 0) {
                this.debug((Function0<String>)new $anonfun$run$2(this, remainingEvents$1));
                this.tryToHandle(remainingEvents$1);
            }
            this.shutdownLatch().countDown();
        }
        catch (Throwable throwable) {
            this.error((Function0<String>)new $anonfun$run$3(this), (Function0<Throwable>)new $anonfun$run$4(this, throwable));
        }
        return;
        {
            finally {
                this.shutdownLatch().countDown();
            }
        }
    }

    public void awaitShutdown() {
        this.shutdownLatch().await();
    }

    public void shutdown() {
        this.handler().close();
        this.info((Function0<String>)new $anonfun$shutdown$1(this));
    }

    private Seq<QueueItem<T>> processEvents() {
        LongRef lastSend$1 = new LongRef(SystemTime$.MODULE$.milliseconds());
        ObjectRef events$2 = new ObjectRef((Object)new ListBuffer());
        BooleanRef full$1 = new BooleanRef(false);
        package$.MODULE$.Stream().continually((Function0)new $anonfun$processEvents$1(this, lastSend$1)).takeWhile((Function1)new $anonfun$processEvents$2(this)).foreach((Function1)new $anonfun$processEvents$3(this, lastSend$1, events$2, full$1));
        if (this.queue().size() > 0) {
            throw new IllegalQueueStateException(Predef$.MODULE$.augmentString("Invalid queue state! After queue shutdown, %d remaining items in the queue").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.queue().size())})));
        }
        if (this.cbkHandler() != null) {
            this.info((Function0<String>)new $anonfun$processEvents$4(this, events$2));
            scala.collection.mutable.Seq<QueueItem<T>> addedEvents = this.cbkHandler().lastBatchBeforeClose();
            this.logEvents("last batch before close", (Iterable<QueueItem<T>>)addedEvents);
            events$2.elem = (ListBuffer)((ListBuffer)events$2.elem).$plus$plus(addedEvents);
        }
        return (ListBuffer)events$2.elem;
    }

    public void tryToHandle(Seq<QueueItem<T>> events$1) {
        try {
            this.debug((Function0<String>)new $anonfun$tryToHandle$1(this, events$1));
            if (events$1.size() > 0) {
                this.handler().handle(events$1, this.underlyingProducer(), this.serializer());
            }
        }
        catch (Throwable throwable) {
            this.error((Function0<String>)new $anonfun$tryToHandle$2(this, events$1), (Function0<Throwable>)new $anonfun$tryToHandle$3(this, throwable));
        }
    }

    private void logEvents(String tag$1, Iterable<QueueItem<T>> events) {
        if (this.logger().isTraceEnabled()) {
            this.trace((Function0<String>)new $anonfun$logEvents$1(this, tag$1));
            events.foreach((Function1)new $anonfun$logEvents$2(this));
        }
    }

    public ProducerSendThread(String threadName, BlockingQueue<QueueItem<T>> queue, Encoder<T> serializer, SyncProducer underlyingProducer, EventHandler<T> handler, CallbackHandler<T> cbkHandler, long queueTime, int batchSize, Object shutdownCommand) {
        this.threadName = threadName;
        this.queue = queue;
        this.serializer = serializer;
        this.underlyingProducer = underlyingProducer;
        this.handler = handler;
        this.cbkHandler = cbkHandler;
        this.queueTime = queueTime;
        this.batchSize = batchSize;
        this.shutdownCommand = shutdownCommand;
        super(threadName);
        Logging$class.$init$(this);
        this.shutdownLatch = new CountDownLatch(1);
    }
}

