/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Test that hammers a small set of HDFS files with many concurrent reader
 * threads and verifies every byte read against the data originally written.
 *
 * <p>Each file is opened once and its single {@link DFSInputStream} is shared
 * by several {@link ReadWorker} threads, which mix stateful (seek + read) and
 * positional reads at random offsets.
 */
public class TestParallelRead {
    static final Log LOG = LogFactory.getLog(TestParallelRead.class);
    static BlockReaderTestUtil util = null;
    static DFSClient dfsClient = null;
    /** Size argument passed to {@code util.writeFile}; presumably in KB -- confirm against BlockReaderTestUtil. */
    static final int FILE_SIZE_K = 256;
    /** Random source shared by all worker threads. */
    static Random rand = null;

    static {
        // Only let WARN and above through on the DataNode "clienttrace" logger
        // (presumably to keep per-read trace lines out of the test log).
        LogManager.getLogger(DataNode.class.getName() + ".clienttrace").setLevel(Level.WARN);
    }

    /**
     * Creates the shared test utility and DFS client and seeds the random
     * source from the current time.
     *
     * @throws Exception if the test utility cannot be created
     */
    @BeforeClass
    public static void setupCluster() throws Exception {
        // Presumably the replication factor of the test cluster -- confirm
        // against the BlockReaderTestUtil constructor.
        final int REPLICATION_FACTOR = 2;
        util = new BlockReaderTestUtil(REPLICATION_FACTOR);
        dfsClient = util.getDFSClient();
        rand = new Random(System.currentTimeMillis());
    }

    /**
     * Runs three file-count / workers-per-file combinations and fails as soon
     * as one of them reports a worker error.
     *
     * @throws IOException if a test file cannot be written, opened or closed
     */
    @Test
    public void testParallelRead() throws IOException {
        Assert.assertTrue("Check log for errors", runParallelRead(1, 4));
        Assert.assertTrue("Check log for errors", runParallelRead(1, 16));
        Assert.assertTrue("Check log for errors", runParallelRead(2, 4));
    }

    /**
     * Writes {@code nFiles} files, opens each one once, and lets
     * {@code nWorkerEach} threads per file read and verify random ranges of it.
     *
     * @param nFiles      number of files to create and read
     * @param nWorkerEach number of reader threads sharing each file's stream
     * @return {@code true} if no worker recorded an error, {@code false} otherwise
     * @throws IOException if a file cannot be written, opened or closed
     */
    boolean runParallelRead(int nFiles, int nWorkerEach) throws IOException {
        ReadWorker[] workers = new ReadWorker[nFiles * nWorkerEach];
        TestFileInfo[] testInfoArr = new TestFileInfo[nFiles];
        int nWorkers = 0;
        long starttime;
        long endtime;
        try {
            for (int i = 0; i < nFiles; ++i) {
                TestFileInfo testInfo = new TestFileInfo();
                testInfoArr[i] = testInfo;
                testInfo.filepath = new Path("/TestParallelRead.dat." + i);
                testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
                testInfo.dis = dfsClient.open(testInfo.filepath.toString());
                for (int j = 0; j < nWorkerEach; ++j) {
                    // Worker ids are 1-based; array slots are 0-based.
                    workers[nWorkers] = new ReadWorker(testInfo, nWorkers + 1);
                    ++nWorkers;
                }
            }

            starttime = System.currentTimeMillis();
            for (ReadWorker worker : workers) {
                worker.start();
            }
            joinAll(workers);
            endtime = System.currentTimeMillis();
        } finally {
            // Release the streams even when setup or the run itself failed.
            closeStreams(testInfoArr);
        }

        boolean res = true;
        long totalRead = 0L;
        for (ReadWorker worker : workers) {
            long nread = worker.getBytesRead();
            LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; "
                + "average " + nread / ReadWorker.N_ITERATIONS + " B per read");
            totalRead += nread;
            if (worker.hasError()) {
                res = false;
            }
        }

        double timeTakenSec = (double) (endtime - starttime) / 1000.0;
        long totalReadKB = totalRead / 1024L;
        LOG.info("=== Report: " + nWorkers + " threads read " + totalReadKB
            + " KB (across " + nFiles + " file(s)) in " + timeTakenSec
            + "s; average " + (double) totalReadKB / timeTakenSec + " KB/s");
        return res;
    }

    /**
     * Waits until every worker has terminated. An interrupt received while
     * waiting does not cut the wait short (the workers' shared streams and
     * counters must not be touched while they still run); instead the
     * interrupt status is restored once all workers are done.
     */
    private static void joinAll(ReadWorker[] workers) {
        boolean interrupted = false;
        for (ReadWorker worker : workers) {
            boolean joined = false;
            while (!joined) {
                try {
                    worker.join();
                    joined = true;
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the stream of every populated entry. All streams are attempted
     * even if one fails; the first failure is rethrown afterwards.
     */
    private static void closeStreams(TestFileInfo[] infos) throws IOException {
        IOException firstFailure = null;
        for (TestFileInfo info : infos) {
            if (info == null || info.dis == null) {
                continue;
            }
            try {
                info.dis.close();
            } catch (IOException e) {
                LOG.error("Failed to close stream for " + info.filepath, e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Shuts the test utility down, if it was created.
     *
     * @throws Exception if the shutdown fails
     */
    @AfterClass
    public static void teardownCluster() throws Exception {
        // util stays null when setupCluster() failed before assigning it.
        if (util != null) {
            util.shutdown();
        }
    }

    /**
     * Thread that performs {@link #N_ITERATIONS} random-range reads on one
     * shared stream and checks each result against the authentic file content.
     *
     * <p>{@link #getBytesRead()} and {@link #hasError()} are plain fields; read
     * them only after the thread has been joined.
     */
    static class ReadWorker
    extends Thread {
        public static final int N_ITERATIONS = 1024;
        /** Fraction of iterations that use seek + read instead of a positional read. */
        private static final double PROPORTION_NON_POSITIONAL_READ = 0.1;
        private final TestFileInfo testInfo;
        private final long fileSize;
        private long bytesRead;
        private boolean error;

        /**
         * @param testInfo file (path, open stream, expected content) this worker reads
         * @param id       number used only to build the thread name
         */
        ReadWorker(TestFileInfo testInfo, int id) {
            super("ReadWorker-" + id + "-" + testInfo.filepath.toString());
            this.testInfo = testInfo;
            this.fileSize = testInfo.dis.getFileLength();
            Assert.assertEquals(this.fileSize, (long) testInfo.authenticData.length);
            this.bytesRead = 0L;
            this.error = false;
        }

        /**
         * Reads random ranges until all iterations are done or the first
         * failure occurs. A failure is logged with its stack trace, recorded
         * in the error flag, and ends the thread.
         */
        @Override
        public void run() {
            for (int i = 0; i < N_ITERATIONS; ++i) {
                int startOff = 0;
                int len = 0;
                try {
                    startOff = rand.nextInt((int) this.fileSize);
                    if (rand.nextDouble() < PROPORTION_NON_POSITIONAL_READ) {
                        len = Math.min(rand.nextInt(64), (int) this.fileSize - startOff);
                        read(startOff, len);
                    } else {
                        len = rand.nextInt((int) (this.fileSize - (long) startOff));
                        pRead(startOff, len);
                    }
                    this.bytesRead += (long) len;
                } catch (Throwable t) {
                    // Throwable, not Exception: JUnit assertion failures are Errors.
                    LOG.error(getName() + ": Error while testing read at " + startOff
                        + " length " + len, t);
                    this.error = true;
                    // An assertion thrown here could not fail the test anyway;
                    // the caller inspects hasError() after joining.
                    return;
                }
            }
        }

        /** @return total number of bytes successfully read and verified so far */
        public long getBytesRead() {
            return this.bytesRead;
        }

        /** @return {@code true} once any read or verification has failed */
        public boolean hasError() {
            return this.error;
        }

        /**
         * Stateful read: seeks to {@code start}, reads {@code len} bytes and
         * verifies them. Seek and read run under the stream's monitor so that
         * other workers' stateful reads on the same stream cannot interleave.
         *
         * @throws Exception on I/O failure, premature EOF or data mismatch
         */
        private void read(int start, int len) throws Exception {
            assertRangeInFile(start, len);
            DFSInputStream dis = this.testInfo.dis;
            byte[] buf = new byte[len];
            synchronized (dis) {
                dis.seek((long) start);
                int cnt = 0;
                while (cnt < len) {
                    int n = dis.read(buf, cnt, len - cnt);
                    if (n < 0) {
                        throw new IOException("Premature EOF after " + cnt + " of " + len
                            + " bytes read from offset " + start);
                    }
                    cnt += n;
                }
                verifyData("Read data corrupted", buf, start, start + len);
            }
        }

        /**
         * Positional read: reads {@code len} bytes starting at file offset
         * {@code start} without seeking, then verifies them.
         *
         * @throws Exception on I/O failure, premature EOF or data mismatch
         */
        private void pRead(int start, int len) throws Exception {
            assertRangeInFile(start, len);
            DFSInputStream dis = this.testInfo.dis;
            byte[] buf = new byte[len];
            int cnt = 0;
            while (cnt < len) {
                // After a short read, continue at the file offset that matches
                // the next free buffer slot rather than re-reading from start.
                int n = dis.read((long) start + cnt, buf, cnt, len - cnt);
                if (n < 0) {
                    throw new IOException("Premature EOF after " + cnt + " of " + len
                        + " bytes pread from offset " + start);
                }
                cnt += n;
            }
            verifyData("Pread data corrupted", buf, start, start + len);
        }

        /** Fails unless {@code [start, start + len)} lies within the file. */
        private void assertRangeInFile(int start, int len) {
            // Widen before adding so an int overflow cannot slip past the check.
            Assert.assertTrue("Bad args: " + start + " + " + len + " should be <= " + this.fileSize,
                (long) start + len <= this.fileSize);
        }

        /**
         * Compares {@code actual} with the authentic data in {@code [start, end)}.
         *
         * @param msg    prefix for the exception message
         * @param actual bytes that were read
         * @param start  file offset corresponding to {@code actual[0]}
         * @param end    file offset just past the compared range
         * @throws Exception if the range exceeds the authentic data or a byte differs
         */
        private void verifyData(String msg, byte[] actual, int start, int end) throws Exception {
            byte[] auth = this.testInfo.authenticData;
            if (end > auth.length) {
                throw new Exception(msg + ": Actual array (" + end + ") is past the end of authentic data (" + auth.length + ")");
            }
            for (int i = 0, j = start; i < actual.length; ++i, ++j) {
                if (auth[j] != actual[i]) {
                    throw new Exception(msg + ": Arrays byte " + i + " (at offset " + j + ") differs: expect " + auth[j] + " got " + actual[i]);
                }
            }
        }
    }

    /**
     * Plain holder tying together one test file's path, its open input stream
     * and the bytes that were written to it.
     */
    private static class TestFileInfo {
        public DFSInputStream dis;
        public Path filepath;
        public byte[] authenticData;

        private TestFileInfo() {
        }
    }
}

