/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcPayloadHeader;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.ThrowsException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestDFSClientRetries
extends TestCase {
    /** Bind address passed to the RPC server constructor by {@link TestServer}. */
    private static final String ADDRESS = "0.0.0.0";

    /** Not referenced by any code visible in this class. */
    private static final int PING_INTERVAL = 1000;

    /** Not referenced by any code visible in this class. */
    private static final int MIN_SLEEP_TIME = 1000;

    /** Logger used by the tests and by the reader threads they start. */
    public static final Log LOG = LogFactory.getLog(TestDFSClientRetries.class.getName());

    /**
     * Configuration shared by every test in this class. Tests change it with
     * {@code setInt}, so a value set by one test stays in effect for the tests
     * that run after it.
     */
    private static final Configuration conf = new HdfsConfiguration();

    /**
     * Writes {@code len} zero-valued bytes to {@code out}, at most 64 KiB per
     * {@code write} call. Nothing is written when {@code len} is zero or negative.
     *
     * @param out destination stream; it is neither flushed nor closed here
     * @param len total number of bytes to write
     * @throws IOException if the underlying stream rejects a write
     */
    private static void writeData(OutputStream out, int len) throws IOException {
        final byte[] chunk = new byte[65536];
        for (int remaining = len; remaining > 0; ) {
            int count = remaining < chunk.length ? remaining : chunk.length;
            out.write(chunk, 0, count);
            remaining -= count;
        }
    }

    /**
     * Writes a two-block file to a three-datanode cluster and then reads it back
     * slowly: one short read followed by ten 1 MiB reads, sleeping 200 ms before
     * each of them. The test passes when none of the reads throws.
     *
     * <p>{@code dfs.datanode.socket.write.timeout} is set to 100 beforehand,
     * presumably so that the datanode-side write timeout expires during each
     * pause and the client has to recover -- TODO confirm the unit of that key.
     *
     * @throws IOException if the cluster cannot be used or a read/write fails
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    public void testWriteTimeoutAtDataNode() throws IOException, InterruptedException {
        final int writeTimeout = 100;
        final int blockSize = 10 * 1024 * 1024;
        final int bufferSize = 4096;
        conf.setInt("dfs.datanode.socket.write.timeout", writeTimeout);
        conf.setInt("dfs.blocksize", blockSize);
        // Allow a single block-acquire failure only.
        conf.setInt("dfs.client.max.block.acquire.failures", 1);
        conf.setInt("io.file.buffer.size", bufferSize);

        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
        FSDataOutputStream out = null;
        FSDataInputStream in = null;
        try {
            cluster.waitActive();
            FileSystem fs = cluster.getFileSystem();
            Path filePath = new Path("/testWriteTimeoutAtDataNode");

            out = fs.create(filePath, true, bufferSize);
            writeData(out, 2 * blockSize);
            out.close();
            out = null; // closed normally; nothing left for the cleanup below

            byte[] buf = new byte[1024 * 1024];
            in = fs.open(filePath, bufferSize);
            // A first, small read before the paused reads start.
            IOUtils.readFully(in, buf, 0, bufferSize / 2);
            for (int i = 0; i < 10; ++i) {
                Thread.sleep(200L);
                IOUtils.readFully(in, buf, 0, buf.length);
            }
            in.close();
            in = null;
        } finally {
            // Only reached with a non-null stream when something above threw;
            // closeStream ignores null and swallows close failures.
            IOUtils.closeStream(out);
            IOUtils.closeStream(in);
            cluster.shutdown();
        }
    }

    /**
     * Checks that the client stops retrying {@code addBlock} once the limit set
     * in {@code dfs.client.block.write.locateFollowingBlock.retries} is reached.
     *
     * <p>A mocked namenode answers every {@code addBlock} call with a
     * {@link RemoteException} wrapping {@link NotReplicatedYetException}. Once it
     * has been called more than {@code maxRetries + 1} times it throws a different
     * {@link IOException} instead. The exception surfacing from {@code close()}
     * must therefore still carry the "not replicated yet" message.
     *
     * <p>NOTE(review): if {@code close()} returns normally nothing is asserted and
     * the test passes -- confirm whether that is intended.
     *
     * @throws IOException if creating or writing the file fails
     */
    public void testNotYetReplicatedErrors() throws IOException {
        final String exceptionMsg = "Nope, not replicated yet...";
        final int maxRetries = 1;
        conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", maxRetries);

        NamenodeProtocols mockNN = Mockito.mock(NamenodeProtocols.class);
        Answer<Object> answer = new ThrowsException(new IOException()) {
            int retryCount = 0;

            public Object answer(InvocationOnMock invocation) throws Throwable {
                ++this.retryCount;
                System.out.println("addBlock has been called " + this.retryCount + " times");
                // The first call is the initial attempt, not a retry.
                if (this.retryCount > maxRetries + 1) {
                    throw new IOException("Retried too many times: " + this.retryCount);
                }
                throw new RemoteException(NotReplicatedYetException.class.getName(), exceptionMsg);
            }
        };
        Mockito.when(mockNN.addBlock(
                Matchers.anyString(),
                Matchers.anyString(),
                Matchers.any(ExtendedBlock.class),
                Matchers.any(DatanodeInfo[].class))).thenAnswer(answer);

        DFSClient client = new DFSClient(null, mockNN, conf, null);
        OutputStream os = client.create("testfile", true);
        os.write(20); // one byte is enough to require a block allocation

        try {
            os.close();
        } catch (Exception e) {
            // Compare from the constant side so that a null message fails the
            // assertion with the diagnostic text instead of raising an NPE.
            assertTrue("Retries are not being stopped correctly: " + e.getMessage(),
                    exceptionMsg.equals(e.getMessage()));
        }
    }

    /**
     * Verifies that the client counts block-acquire failures per read operation
     * rather than over the lifetime of the client or stream.
     *
     * <p>The namenode is wrapped in a Mockito spy whose {@code getBlockLocations}
     * is stubbed with {@link FailNTimesAnswer}. The phases are:
     * <ol>
     *   <li>one failure more than {@code getMaxBlockAcquireFailures()}: the copy
     *       must throw an {@link IOException};</li>
     *   <li>exactly that many failures: the copy must succeed;</li>
     *   <li>exactly that many failures again: a 10-byte read must succeed; then,
     *       after re-stubbing, {@code openInfo()} and {@code seek(0)}, a second
     *       read on the same stream must also succeed.</li>
     * </ol>
     *
     * @throws Exception on any unexpected failure
     */
    public void testFailuresArePerOperation() throws Exception {
        final long fileSize = 4096L;
        final Path file = new Path("/testFile");
        conf.setInt("dfs.client.retry.window.base", 10);

        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
        try {
            cluster.waitActive();
            FileSystem fs = cluster.getFileSystem();
            NamenodeProtocols realNN = cluster.getNameNodeRpc();
            NamenodeProtocols spyNN = Mockito.spy(realNN);
            DFSClient client = new DFSClient(null, spyNN, conf, null);

            int allowedFailures = client.getMaxBlockAcquireFailures();
            assertTrue(allowedFailures > 0);
            DFSTestUtil.createFile(fs, file, fileSize, (short) 1, 12345L);

            // One failure too many: the read has to give up.
            stubBlockLocationFailures(spyNN, realNN, allowedFailures + 1);
            try {
                IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf, true);
                fail("Didn't get exception");
            } catch (IOException ioe) {
                DFSClient.LOG.info("Got expected exception", ioe);
            }

            // Exactly the permitted number of failures: the read goes through.
            stubBlockLocationFailures(spyNN, realNN, allowedFailures);
            IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf, true);

            DFSClient.LOG.info("Starting test case for failure reset");
            stubBlockLocationFailures(spyNN, realNN, allowedFailures);
            DFSInputStream stream = client.open(file.toString());
            byte[] scratch = new byte[10];
            IOUtils.readFully(stream, scratch, 0, scratch.length);
            DFSClient.LOG.info("First read successful after some failures.");

            // Same stream, a fresh batch of failures: must succeed once more.
            stubBlockLocationFailures(spyNN, realNN, allowedFailures);
            stream.openInfo();
            stream.seek(0L);
            IOUtils.readFully(stream, scratch, 0, scratch.length);
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Stubs {@code getBlockLocations} on {@code spyNN} with a new
     * {@link FailNTimesAnswer} that injects {@code timesToFail} bad answers and
     * otherwise returns what {@code realNN} reports.
     */
    private static void stubBlockLocationFailures(NamenodeProtocols spyNN, NamenodeProtocols realNN, int timesToFail) throws IOException {
        Mockito.doAnswer(new FailNTimesAnswer(realNN, timesToFail))
                .when(spyNN)
                .getBlockLocations(Matchers.anyString(), Matchers.anyLong(), Matchers.anyLong());
    }

    /**
     * Runs {@link #busyTest} four times with 2 transfer threads, 50 reader
     * threads and a 6 MiB file, varying the retry count and retry window:
     * <ol>
     *   <li>3 retries, window 300 -- a failure is only logged;</li>
     *   <li>50 retries, window 300 -- must pass;</li>
     *   <li>3 retries, window 1000 -- a failure is only logged;</li>
     *   <li>50 retries, window 1000 -- must pass.</li>
     * </ol>
     *
     * @throws IOException if a cluster cannot be set up or cleaned up
     */
    public void testDFSClientRetriesOnBusyBlocks() throws IOException {
        System.out.println("Testing DFSClient random waiting on busy blocks.");

        final int xcievers = 2;
        final int fileLen = 6 * 1024 * 1024;
        final int threads = 50;
        final int fewRetries = 3;
        final int manyRetries = 50;
        final int shortWindow = 300;
        final int longWindow = 1000;

        long begin;
        boolean ok;
        double seconds;

        // Test 1: few retries, short window -- allowed to fail.
        begin = System.currentTimeMillis();
        ok = busyTest(xcievers, threads, fileLen, shortWindow, fewRetries);
        seconds = (System.currentTimeMillis() - begin) / 1000.0;
        if (!ok) {
            LOG.warn("Test 1 failed, but relax. Time spent: " + seconds + " sec.");
        } else {
            LOG.info("Test 1 succeeded! Time spent: " + seconds + " sec.");
        }

        // Test 2: many retries, short window -- has to succeed.
        begin = System.currentTimeMillis();
        ok = busyTest(xcievers, threads, fileLen, shortWindow, manyRetries);
        seconds = (System.currentTimeMillis() - begin) / 1000.0;
        assertTrue("Something wrong! Test 2 got Exception with maxmum retries!", ok);
        LOG.info("Test 2 succeeded! Time spent: " + seconds + " sec.");

        // Test 3: few retries, long window -- allowed to fail.
        begin = System.currentTimeMillis();
        ok = busyTest(xcievers, threads, fileLen, longWindow, fewRetries);
        seconds = (System.currentTimeMillis() - begin) / 1000.0;
        if (!ok) {
            LOG.warn("Test 3 failed, but relax. Time spent: " + seconds + " sec.");
        } else {
            LOG.info("Test 3 succeeded! Time spent: " + seconds + " sec.");
        }

        // Test 4: many retries, long window -- has to succeed.
        begin = System.currentTimeMillis();
        ok = busyTest(xcievers, threads, fileLen, longWindow, manyRetries);
        seconds = (System.currentTimeMillis() - begin) / 1000.0;
        assertTrue("Something wrong! Test 4 got Exception with maxmum retries!", ok);
        LOG.info("Test 4 succeeded! Time spent: " + seconds + " sec.");
    }

    /**
     * Starts a single-datanode cluster, writes a file of random bytes, verifies
     * it by reading it back once, and then has {@code threads} concurrent
     * {@link DFSClientReader}s read it and compare its SHA digest.
     *
     * <p>The cluster is always shut down, including when start-up, the workload
     * or the cleanup delete fails.
     *
     * @param xcievers value for {@code dfs.datanode.max.transfer.threads}
     * @param threads  number of concurrent reader threads
     * @param fileLen  size of the test file in bytes
     * @param timeWin  value for {@code dfs.client.retry.window.base}
     * @param retries  value for {@code dfs.client.max.block.acquire.failures}
     * @return {@code true} when every reader thread verified the file;
     *         {@code false} when a reader failed or the workload threw an
     *         {@link Exception}
     * @throws IOException if the cluster cannot be brought up or the test file
     *         cannot be deleted afterwards
     */
    private boolean busyTest(int xcievers, int threads, int fileLen, int timeWin, int retries) throws IOException {
        boolean ret = true;
        short replicationFactor = 1;
        long blockSize = 128L * 1024 * 1024;
        int bufferSize = 4096;

        conf.setInt("dfs.datanode.max.transfer.threads", xcievers);
        conf.setInt("dfs.client.max.block.acquire.failures", retries);
        conf.setInt("dfs.client.retry.window.base", timeWin);
        conf.setInt("dfs.datanode.socket.reuse.keepalive", 0);

        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(replicationFactor).build();
        try {
            // Inside the outer try so a start-up failure cannot leak the cluster;
            // such failures still propagate to the caller as before.
            cluster.waitActive();
            FileSystem fs = cluster.getFileSystem();
            Path file1 = new Path("test_data.dat");
            file1 = file1.makeQualified(fs.getUri(), fs.getWorkingDirectory());
            try {
                FSDataOutputStream stm = fs.create(file1, true, bufferSize, replicationFactor, blockSize);
                assertTrue(file1 + " should be a file", fs.getFileStatus(file1).isFile());
                System.out.println("Path : \"" + file1 + "\"");
                LOG.info("Path : \"" + file1 + "\"");

                byte[] buffer = AppendTestUtil.randomBytes(System.currentTimeMillis(), fileLen);
                stm.write(buffer, 0, fileLen);
                stm.close();

                long len = fs.getFileStatus(file1).getLen();
                assertTrue(file1 + " should be of size " + fileLen + " but found to be of size " + len,
                        len == (long) fileLen);

                // Single-threaded sanity read before the concurrent phase.
                byte[] read_buf = new byte[fileLen];
                FSDataInputStream in = fs.open(file1, fileLen);
                IOUtils.readFully(in, read_buf, 0, fileLen);
                // A JUnit assertion rather than the assert keyword, so the check
                // also runs when the JVM was started without -ea.
                assertTrue("Data read back from " + file1 + " differs from the data written",
                        Arrays.equals(buffer, read_buf));
                in.close();
                read_buf = null;

                MessageDigest m = MessageDigest.getInstance("SHA");
                m.update(buffer, 0, fileLen);
                byte[] hash_sha = m.digest();

                Thread[] readers = new Thread[threads];
                Counter counter = new Counter(0);
                for (int i = 0; i < threads; ++i) {
                    DFSClientReader reader = new DFSClientReader(file1, cluster, hash_sha, fileLen, counter);
                    readers[i] = new Thread(reader);
                    readers[i].start();
                }
                for (int i = 0; i < threads; ++i) {
                    readers[i].join();
                }
                // Every reader that verified the digest bumped the counter once.
                ret = counter.get() == threads;
            } catch (InterruptedException e) {
                System.out.println("Thread got InterruptedException.");
                e.printStackTrace();
                ret = false;
            } catch (Exception e) {
                e.printStackTrace();
                ret = false;
            } finally {
                fs.delete(file1, false);
            }
        } finally {
            // Runs even when the delete above throws.
            cluster.shutdown();
        }
        return ret;
    }

    /**
     * Checks that a file checksum can still be computed, and is unchanged, after
     * one of the datanodes holding the file has been stopped.
     *
     * <p>On a three-datanode cluster the test creates {@code /testGetFileChecksum}
     * (size 0x100000 bytes), records its checksum, stops the datanode listed
     * first for the file's first block, and requests the checksum again.
     *
     * @throws Exception on any unexpected failure
     */
    public void testGetFileChecksum() throws Exception {
        final String f = "/testGetFileChecksum";
        final Path p = new Path(f);

        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
        try {
            cluster.waitActive();
            FileSystem fs = cluster.getFileSystem();
            DFSTestUtil.createFile(fs, p, 1024L * 1024L, (short) 3, 20100402L);

            FileChecksum before = fs.getFileChecksum(p);
            assertTrue(before != null);

            List<LocatedBlock> blocks =
                    DFSClient.callGetBlockLocations(cluster.getNameNodeRpc(), f, 0L, Long.MAX_VALUE).getLocatedBlocks();
            DatanodeInfo firstHolder = blocks.get(0).getLocations()[0];
            cluster.stopDataNode(firstHolder.getName());

            FileChecksum after = fs.getFileChecksum(p);
            assertEquals(before, after);
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Checks that a ClientDatanodeProtocol call gives up with a
     * {@link SocketTimeoutException} when the server does not answer in time.
     *
     * <p>A {@link TestServer} that sleeps for two seconds in every call plays the
     * datanode; the proxy is created with a timeout argument of 500.
     *
     * @throws IOException on any failure other than the expected timeout
     */
    public void testClientDNProtocolTimeout() throws IOException {
        final TestServer server = new TestServer(1, true);
        server.start();

        final InetSocketAddress serverAddr = NetUtils.getConnectAddress(server);
        final int port = serverAddr.getPort();
        DatanodeID fakeDnId = new DatanodeID("localhost:" + port, "fake-storage", 0, port);
        ExtendedBlock block = new ExtendedBlock("fake-pool", new Block(12345L));
        LocatedBlock fakeBlock = new LocatedBlock(block, new DatanodeInfo[0]);

        ClientDatanodeProtocol proxy = null;
        try {
            proxy = DFSUtil.createClientDatanodeProtocolProxy(fakeDnId, conf, 500, fakeBlock);
            proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1L));
            fail("Did not get expected exception: SocketTimeoutException");
        } catch (SocketTimeoutException e) {
            LOG.info("Got the expected Exception: SocketTimeoutException");
        } finally {
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
            server.stop();
        }
    }

    /**
     * Small thread-safe tally. Reader threads call {@link #inc()} once each after
     * verifying their data; the test thread reads the total with {@link #get()}.
     */
    class Counter {
        private int counter;

        /**
         * @param n initial value of the tally
         */
        Counter(int n) {
            this.counter = n;
        }

        /** Adds one to the tally. */
        public synchronized void inc() {
            ++this.counter;
        }

        /**
         * Returns the current tally. Synchronized on the same monitor as
         * {@link #inc()} so that a caller always observes completed increments,
         * even if it has not joined the incrementing threads.
         *
         * @return the current value
         */
        public synchronized int get() {
            return this.counter;
        }
    }

    /**
     * Reader task used by {@code busyTest}: opens its own FileSystem instance,
     * reads the whole file, and increments the shared {@link Counter} when the
     * SHA digest of what it read matches the expected one.
     *
     * <p>Failures are not rethrown. A reader that fails simply does not increment
     * the counter, which is how the caller detects it.
     */
    class DFSClientReader
    implements Runnable {
        DFSClient client;
        Configuration conf;
        byte[] expected_sha;
        FileSystem fs;
        Path filePath;
        MiniDFSCluster cluster;
        int len;
        Counter counter;

        /**
         * @param file     path of the file to read
         * @param cluster  cluster that supplies the FileSystem instance
         * @param hash_sha expected SHA digest of the file content
         * @param fileLen  number of bytes to read
         * @param cnt      counter to increment after a verified read
         */
        DFSClientReader(Path file, MiniDFSCluster cluster, byte[] hash_sha, int fileLen, Counter cnt) {
            this.filePath = file;
            this.cluster = cluster;
            this.counter = cnt;
            this.len = fileLen;
            this.conf = new HdfsConfiguration();
            this.expected_sha = hash_sha;
            try {
                cluster.waitActive();
            }
            catch (IOException e) {
                // Construction still succeeds; a broken cluster shows up in run().
                e.printStackTrace();
            }
        }

        /**
         * Reads the file and verifies its digest. The input stream and the
         * FileSystem instance are released on every path, so a failed read does
         * not leave either of them open.
         */
        @Override
        public void run() {
            FileSystem readerFs = null;
            FSDataInputStream in = null;
            try {
                readerFs = this.cluster.getNewFileSystemInstance(0);
                this.fs = readerFs;
                int bufferSize = this.len;
                byte[] buf = new byte[bufferSize];
                in = readerFs.open(this.filePath, bufferSize);
                IOUtils.readFully(in, buf, 0, bufferSize);

                MessageDigest m = MessageDigest.getInstance("SHA");
                m.update(buf, 0, bufferSize);
                byte[] hash_sha = m.digest();
                buf = null;

                // Regular close: a failure here is reported like any other error.
                in.close();
                in = null;
                readerFs.close();
                readerFs = null;

                Assert.assertTrue("hashed keys are not the same size", hash_sha.length == this.expected_sha.length);
                Assert.assertTrue("hashed keys are not equal", Arrays.equals(hash_sha, this.expected_sha));
                this.counter.inc();
                LOG.info("Thread correctly read the block.");
            }
            catch (BlockMissingException e) {
                LOG.info("Bad - BlockMissingException is caught.");
                e.printStackTrace();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            finally {
                // Non-null only when the regular close above was not reached or
                // itself failed; closeStream ignores null and close errors.
                IOUtils.closeStream(in);
                IOUtils.closeStream(readerFs);
            }
        }
    }

    /**
     * Mockito answer for {@code getBlockLocations}. It always forwards the call
     * to the real namenode. For the first {@code timesToFail} invocations it
     * replaces the result with a block list whose only location is an
     * unreachable datanode; afterwards it returns the real result untouched.
     */
    private static class FailNTimesAnswer
    implements Answer<LocatedBlocks> {
        private int failuresLeft;
        private NamenodeProtocols realNN;

        /**
         * @param preSpyNN    the un-spied namenode used to obtain real answers
         * @param timesToFail number of invocations that receive a bad answer
         */
        public FailNTimesAnswer(NamenodeProtocols preSpyNN, int timesToFail) {
            this.realNN = preSpyNN;
            this.failuresLeft = timesToFail;
        }

        /**
         * Queries the real namenode with the invocation's three arguments and
         * returns either the real or a sabotaged result, counting down one
         * failure per call.
         */
        public LocatedBlocks answer(InvocationOnMock invocation) throws IOException {
            Object[] callArgs = invocation.getArguments();
            String src = (String) callArgs[0];
            long offset = ((Long) callArgs[1]).longValue();
            long length = ((Long) callArgs[2]).longValue();
            LocatedBlocks genuine = this.realNN.getBlockLocations(src, offset, length);

            // The countdown is decremented on every call, failing or not.
            boolean injectFailure = this.failuresLeft > 0;
            this.failuresLeft--;
            if (!injectFailure) {
                NameNode.LOG.info("FailNTimesAnswer no longer failing.");
                return genuine;
            }
            NameNode.LOG.info("FailNTimesAnswer injecting failure.");
            return this.makeBadBlockList(genuine);
        }

        /**
         * Builds a one-block list that keeps the first real block's identity and
         * start offset but names 255.255.255.255:234 as its only location.
         */
        private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
            LocatedBlock firstGood = goodBlockList.get(0);
            DatanodeInfo unreachable = new DatanodeInfo(new DatanodeID("255.255.255.255:234"));
            LocatedBlock sabotaged = new LocatedBlock(
                    firstGood.getBlock(),
                    new DatanodeInfo[]{unreachable},
                    firstGood.getStartOffset(),
                    false);

            List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>();
            badBlocks.add(sabotaged);
            return new LocatedBlocks(goodBlockList.getFileLength(), false, badBlocks, null, true);
        }
    }

    /**
     * Minimal RPC server for timeout tests. Every call optionally sleeps for two
     * seconds and then returns either a fresh instance of the configured response
     * class or, when none is configured, the request parameter itself.
     */
    private static class TestServer
    extends Server {
        private final boolean sleep;
        private final Class<? extends Writable> responseClass;

        /**
         * Creates a server that takes {@link LongWritable} parameters and echoes
         * them back.
         *
         * @param handlerCount number of handlers passed to the base server
         * @param sleep        whether each call sleeps before answering
         * @throws IOException if the base server cannot be created
         */
        public TestServer(int handlerCount, boolean sleep) throws IOException {
            this(handlerCount, sleep, LongWritable.class, null);
        }

        /**
         * @param handlerCount  number of handlers passed to the base server
         * @param sleep         whether each call sleeps before answering
         * @param paramClass    parameter class passed to the base server
         * @param responseClass class instantiated for each response, or
         *                      {@code null} to echo the parameter
         * @throws IOException if the base server cannot be created
         */
        public TestServer(int handlerCount, boolean sleep, Class<? extends Writable> paramClass, Class<? extends Writable> responseClass) throws IOException {
            // Port 0 is passed through to the base server constructor.
            super(TestDFSClientRetries.ADDRESS, 0, paramClass, handlerCount, conf);
            this.sleep = sleep;
            this.responseClass = responseClass;
        }

        /**
         * Answers one call, after sleeping 2000 ms when {@code sleep} is set.
         *
         * @return a new {@code responseClass} instance, or {@code param} when no
         *         response class was configured
         * @throws RuntimeException if the response class cannot be instantiated
         */
        public Writable call(RpcPayloadHeader.RpcKind rpcKind, String protocol, Writable param, long receiveTime) throws IOException {
            if (this.sleep) {
                try {
                    Thread.sleep(2000L);
                }
                catch (InterruptedException e) {
                    // Still answer the call, but restore the interrupt status so
                    // the thread running this handler can see it was interrupted.
                    Thread.currentThread().interrupt();
                }
            }
            if (this.responseClass != null) {
                try {
                    return this.responseClass.newInstance();
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return param;
        }
    }
}

