/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.LinkedHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.BPOfferService;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestBPOfferService {
    private static final String FAKE_BPID = "fake bpid";
    private static final String FAKE_CLUSTERID = "fake cluster";
    protected static final Log LOG = LogFactory.getLog(TestBPOfferService.class);
    private static final ExtendedBlock FAKE_BLOCK = new ExtendedBlock("fake bpid", 12345L);
    private DatanodeProtocolClientSideTranslatorPB mockNN1;
    private DatanodeProtocolClientSideTranslatorPB mockNN2;
    private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
    private int[] heartbeatCounts = new int[2];
    private DataNode mockDn;
    private FSDatasetInterface mockFSDataset;

    @Before
    public void setupMocks() throws Exception {
        this.mockNN1 = this.setupNNMock(0);
        this.mockNN2 = this.setupNNMock(1);
        this.mockDn = (DataNode)Mockito.mock(DataNode.class);
        ((DataNode)Mockito.doReturn((Object)true).when((Object)this.mockDn)).shouldRun();
        Configuration conf = new Configuration();
        ((DataNode)Mockito.doReturn((Object)conf).when((Object)this.mockDn)).getConf();
        ((DataNode)Mockito.doReturn((Object)new DNConf(conf)).when((Object)this.mockDn)).getDnConf();
        ((DataNode)Mockito.doReturn((Object)DataNodeMetrics.create((Configuration)conf, (String)"fake dn")).when((Object)this.mockDn)).getMetrics();
        this.mockFSDataset = (FSDatasetInterface)Mockito.spy((Object)new SimulatedFSDataset(conf));
        this.mockFSDataset.addBlockPool(FAKE_BPID, conf);
        ((DataNode)Mockito.doReturn((Object)this.mockFSDataset).when((Object)this.mockDn)).getFSDataset();
    }

    private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx) throws Exception {
        DatanodeProtocolClientSideTranslatorPB mock = (DatanodeProtocolClientSideTranslatorPB)Mockito.mock(DatanodeProtocolClientSideTranslatorPB.class);
        ((DatanodeProtocolClientSideTranslatorPB)Mockito.doReturn((Object)new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0L, HdfsConstants.LAYOUT_VERSION)).when((Object)mock)).versionRequest();
        ((DatanodeProtocolClientSideTranslatorPB)Mockito.doReturn((Object)new DatanodeRegistration("fake-node")).when((Object)mock)).registerDatanode((DatanodeRegistration)Mockito.any(DatanodeRegistration.class));
        ((DatanodeProtocolClientSideTranslatorPB)Mockito.doAnswer((Answer)new HeartbeatAnswer(nnIdx)).when((Object)mock)).sendHeartbeat((DatanodeRegistration)Mockito.any(DatanodeRegistration.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt());
        this.mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.STANDBY, 0L);
        return mock;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBasicFunctionality() throws Exception {
        BPOfferService bpos = this.setupBPOSForNNs(this.mockNN1, this.mockNN2);
        bpos.start();
        try {
            this.waitForInitialization(bpos);
            ((DatanodeProtocolClientSideTranslatorPB)Mockito.verify((Object)this.mockNN1)).registerDatanode((DatanodeRegistration)Mockito.anyObject());
            ((DatanodeProtocolClientSideTranslatorPB)Mockito.verify((Object)this.mockNN2)).registerDatanode((DatanodeRegistration)Mockito.anyObject());
            this.waitForBlockReport(this.mockNN1);
            this.waitForBlockReport(this.mockNN2);
            bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "");
            ReceivedDeletedBlockInfo[] ret = this.waitForBlockReceived(FAKE_BLOCK, this.mockNN1);
            Assert.assertEquals((long)1L, (long)ret.length);
            Assert.assertEquals((Object)FAKE_BLOCK.getLocalBlock(), (Object)ret[0].getBlock());
            ret = this.waitForBlockReceived(FAKE_BLOCK, this.mockNN2);
            Assert.assertEquals((long)1L, (long)ret.length);
            Assert.assertEquals((Object)FAKE_BLOCK.getLocalBlock(), (Object)ret[0].getBlock());
        }
        finally {
            bpos.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testIgnoreDeletionsFromNonActive() throws Exception {
        BPOfferService bpos = this.setupBPOSForNNs(this.mockNN1, this.mockNN2);
        ((DatanodeProtocolClientSideTranslatorPB)Mockito.doReturn((Object)new BlockCommand(2, FAKE_BPID, new Block[]{FAKE_BLOCK.getLocalBlock()})).when((Object)this.mockNN2)).blockReport((DatanodeRegistration)Mockito.anyObject(), (String)Mockito.eq((Object)FAKE_BPID), (long[])Mockito.anyObject());
        bpos.start();
        try {
            this.waitForInitialization(bpos);
            this.waitForBlockReport(this.mockNN1);
            this.waitForBlockReport(this.mockNN2);
        }
        finally {
            bpos.stop();
        }
        ((FSDatasetInterface)Mockito.verify((Object)this.mockFSDataset, (VerificationMode)Mockito.never())).invalidate((String)Mockito.eq((Object)FAKE_BPID), (Block[])Mockito.anyObject());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNNsFromDifferentClusters() throws Exception {
        ((DatanodeProtocolClientSideTranslatorPB)Mockito.doReturn((Object)new NamespaceInfo(1, "fake foreign cluster", FAKE_BPID, 0L, HdfsConstants.LAYOUT_VERSION)).when((Object)this.mockNN1)).versionRequest();
        BPOfferService bpos = this.setupBPOSForNNs(this.mockNN1, this.mockNN2);
        bpos.start();
        try {
            this.waitForOneToFail(bpos);
        }
        finally {
            bpos.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPickActiveNameNode() throws Exception {
        BPOfferService bpos = this.setupBPOSForNNs(this.mockNN1, this.mockNN2);
        bpos.start();
        try {
            this.waitForInitialization(bpos);
            Assert.assertNull((Object)bpos.getActiveNN());
            this.mockHaStatuses[0] = new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.ACTIVE, 1L);
            this.waitForHeartbeats(bpos);
            Assert.assertSame((Object)this.mockNN1, (Object)bpos.getActiveNN());
            this.mockHaStatuses[1] = new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.ACTIVE, 2L);
            this.waitForHeartbeats(bpos);
            Assert.assertSame((Object)this.mockNN2, (Object)bpos.getActiveNN());
            this.waitForHeartbeats(bpos);
            Assert.assertSame((Object)this.mockNN2, (Object)bpos.getActiveNN());
            this.mockHaStatuses[1] = new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.STANDBY, 2L);
            this.waitForHeartbeats(bpos);
            Assert.assertNull((Object)bpos.getActiveNN());
            this.mockHaStatuses[0] = new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.ACTIVE, 3L);
            this.waitForHeartbeats(bpos);
            Assert.assertSame((Object)this.mockNN1, (Object)bpos.getActiveNN());
        }
        finally {
            bpos.stop();
        }
    }

    private void waitForOneToFail(final BPOfferService bpos) throws Exception {
        GenericTestUtils.waitFor(new Supplier<Boolean>(){

            public Boolean get() {
                return bpos.countNameNodes() == 1;
            }
        }, 100, 10000);
    }

    private BPOfferService setupBPOSForNNs(DatanodeProtocolClientSideTranslatorPB ... nns) {
        final LinkedHashMap nnMap = Maps.newLinkedHashMap();
        for (int port = 0; port < nns.length; ++port) {
            nnMap.put(new InetSocketAddress(port), nns[port]);
        }
        return new BPOfferService(Lists.newArrayList(nnMap.keySet()), this.mockDn){

            DatanodeProtocolClientSideTranslatorPB connectToNN(InetSocketAddress nnAddr) throws IOException {
                DatanodeProtocolClientSideTranslatorPB nn = (DatanodeProtocolClientSideTranslatorPB)nnMap.get(nnAddr);
                if (nn == null) {
                    throw new AssertionError((Object)("bad NN addr: " + nnAddr));
                }
                return nn;
            }
        };
    }

    private void waitForInitialization(final BPOfferService bpos) throws Exception {
        GenericTestUtils.waitFor(new Supplier<Boolean>(){

            public Boolean get() {
                return bpos.isAlive() && bpos.isInitialized();
            }
        }, 100, 10000);
    }

    private void waitForBlockReport(final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
        GenericTestUtils.waitFor(new Supplier<Boolean>(){

            public Boolean get() {
                try {
                    ((DatanodeProtocolClientSideTranslatorPB)Mockito.verify((Object)mockNN)).blockReport((DatanodeRegistration)Mockito.anyObject(), (String)Mockito.eq((Object)TestBPOfferService.FAKE_BPID), (long[])Mockito.anyObject());
                    return true;
                }
                catch (Throwable t) {
                    LOG.info((Object)("waiting on block report: " + t.getMessage()));
                    return false;
                }
            }
        }, 500, 10000);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitForHeartbeats(BPOfferService bpos) throws Exception {
        int[] nArray = this.heartbeatCounts;
        synchronized (this.heartbeatCounts) {
            final int[] countAtStart = Arrays.copyOf(this.heartbeatCounts, this.heartbeatCounts.length);
            // ** MonitorExit[var3_2] (shouldn't be in output)
            bpos.triggerHeartbeatForTests();
            GenericTestUtils.waitFor(new Supplier<Boolean>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public Boolean get() {
                    int[] nArray = TestBPOfferService.this.heartbeatCounts;
                    synchronized (nArray) {
                        for (int i = 0; i < countAtStart.length; ++i) {
                            if (TestBPOfferService.this.heartbeatCounts[i] > countAtStart[i]) continue;
                            return false;
                        }
                        return true;
                    }
                }
            }, 200, 10000);
            return;
        }
    }

    private ReceivedDeletedBlockInfo[] waitForBlockReceived(ExtendedBlock fakeBlock, DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
        final ArgumentCaptor captor = ArgumentCaptor.forClass(ReceivedDeletedBlockInfo[].class);
        GenericTestUtils.waitFor(new Supplier<Boolean>(){

            public Boolean get() {
                try {
                    ((DatanodeProtocolClientSideTranslatorPB)Mockito.verify((Object)TestBPOfferService.this.mockNN1)).blockReceivedAndDeleted((DatanodeRegistration)Mockito.anyObject(), (String)Mockito.eq((Object)TestBPOfferService.FAKE_BPID), (ReceivedDeletedBlockInfo[])captor.capture());
                    return true;
                }
                catch (Throwable t) {
                    return false;
                }
            }
        }, 100, 10000);
        return (ReceivedDeletedBlockInfo[])captor.getValue();
    }

    static {
        ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    }

    private class HeartbeatAnswer
    implements Answer<HeartbeatResponse> {
        private final int nnIdx;

        public HeartbeatAnswer(int nnIdx) {
            this.nnIdx = nnIdx;
        }

        public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
            int[] nArray = TestBPOfferService.this.heartbeatCounts;
            int n = this.nnIdx;
            nArray[n] = nArray[n] + 1;
            return new HeartbeatResponse(new DatanodeCommand[0], TestBPOfferService.this.mockHaStatuses[this.nnIdx]);
        }
    }
}

