/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server.quorum;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ZooKeeperCriticalThread;
import org.apache.zookeeper.server.quorum.LeaderBean;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import org.apache.zookeeper.server.quorum.LearnerHandler;
import org.apache.zookeeper.server.quorum.LearnerSnapshotThrottler;
import org.apache.zookeeper.server.quorum.LearnerSyncRequest;
import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.StateSummary;
import org.apache.zookeeper.server.quorum.SyncedLearnerTracker;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Leader {
    private static final Logger LOG = LoggerFactory.getLogger(Leader.class);
    // True unless the "leader.nodelay" system property is set to something other than "true".
    private static final boolean nodelay = System.getProperty("leader.nodelay", "true").equals("true");
    // System property names for the learner snapshot throttler. The two static final values
    // below are not initialised in this part of the file (presumably by a static initialiser
    // further down - confirm).
    private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots";
    private static final int maxConcurrentSnapshots;
    private static final String MAX_CONCURRENT_SNAPSHOT_TIMEOUT = "zookeeper.leader.maxConcurrentSnapshotTimeout";
    private static final long maxConcurrentSnapshotTimeout;
    private final LearnerSnapshotThrottler learnerSnapshotThrottler = new LearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
    final LeaderZooKeeperServer zk;
    final QuorumPeer self;
    // Set once the NEWLEADER proposal has been acked by all required quorums (see waitForNewLeaderAck).
    private boolean quorumFormed = false;
    // Created and started in lead(); halted in shutdown().
    volatile LearnerCnxAcceptor cnxAcceptor = null;
    // Handler sets. Each set is guarded by its own monitor (synchronized on the set itself).
    private final HashSet<LearnerHandler> learners = new HashSet();
    private final HashSet<LearnerHandler> forwardingFollowers = new HashSet();
    private final HashSet<LearnerHandler> observingLearners = new HashSet();
    // Sync requests parked until the proposal with the given zxid is committed (see processSync/tryToCommit).
    private final HashMap<Long, List<LearnerSyncRequest>> pendingSyncs = new HashMap();
    final AtomicLong followerCounter = new AtomicLong(-1L);
    // Listening socket opened in the constructor and closed in shutdown().
    private final ServerSocket ss;
    // Quorum packet type codes; the names match the strings returned by getPacketType(int).
    static final int DIFF = 13;
    static final int TRUNC = 14;
    static final int SNAP = 15;
    static final int OBSERVERINFO = 16;
    static final int NEWLEADER = 10;
    static final int FOLLOWERINFO = 11;
    static final int UPTODATE = 12;
    public static final int LEADERINFO = 17;
    public static final int ACKEPOCH = 18;
    static final int REQUEST = 1;
    public static final int PROPOSAL = 2;
    static final int ACK = 3;
    static final int COMMIT = 4;
    static final int PING = 5;
    static final int REVALIDATE = 6;
    static final int SYNC = 7;
    static final int INFORM = 8;
    static final int COMMITANDACTIVATE = 9;
    static final int INFORMANDACTIVATE = 19;
    // Proposals that have been sent out but not yet committed, keyed by zxid.
    final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
    // Committed proposals queued by tryToCommit; replayed to learners in startForwarding.
    private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue();
    // Tracks acks of the NEWLEADER packet built in lead().
    private final Proposal newLeaderProposal = new Proposal();
    StateSummary leaderStateSummary;
    // Epoch negotiated in getEpochToPropose; -1 until a proposer reports a higher accepted epoch.
    long epoch = -1L;
    boolean waitingForNewEpoch = true;
    // Cleared when a reconfiguration designates a different server as leader; processAck then ignores acks.
    boolean allowedToCommit = true;
    boolean isShutdown;
    long lastCommitted = -1L;
    long lastProposed;
    // Server ids collected by getEpochToPropose / waitForEpochAck; each set is also the monitor waited on.
    private final HashSet<Long> connectingFollowers = new HashSet();
    private final HashSet<Long> electingFollowers = new HashSet();
    private boolean electionFinished = false;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a copy of the registered learner handlers, taken while holding
     * the monitor of the {@code learners} set.
     *
     * @return a new list containing every handler currently in {@code learners}
     */
    public List<LearnerHandler> getLearners() {
        final List<LearnerHandler> snapshot;
        synchronized (this.learners) {
            snapshot = new ArrayList<LearnerHandler>(this.learners);
        }
        return snapshot;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a copy of the forwarding followers, taken while holding the
     * monitor of the {@code forwardingFollowers} set.
     *
     * @return a new list containing every handler currently in {@code forwardingFollowers}
     */
    public List<LearnerHandler> getForwardingFollowers() {
        final List<LearnerHandler> snapshot;
        synchronized (this.forwardingFollowers) {
            snapshot = new ArrayList<LearnerHandler>(this.forwardingFollowers);
        }
        return snapshot;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds a handler to the set of forwarding followers under that set's monitor.
     *
     * @param lh the handler to add
     */
    private void addForwardingFollower(LearnerHandler lh) {
        synchronized (this.forwardingFollowers) {
            this.forwardingFollowers.add(lh);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a copy of the observing learners, taken while holding the
     * monitor of the {@code observingLearners} set.
     *
     * @return a new list containing every handler currently in {@code observingLearners}
     */
    public List<LearnerHandler> getObservingLearners() {
        final List<LearnerHandler> snapshot;
        synchronized (this.observingLearners) {
            snapshot = new ArrayList<LearnerHandler>(this.observingLearners);
        }
        return snapshot;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds a handler to the set of observing learners under that set's monitor.
     *
     * @param lh the handler to add
     */
    private void addObserverLearnerHandler(LearnerHandler lh) {
        synchronized (this.observingLearners) {
            this.observingLearners.add(lh);
        }
    }

    /**
     * Reports how many zxid keys currently have parked sync requests.
     *
     * @return the number of entries in {@code pendingSyncs} (one entry per zxid,
     *         not one per request)
     */
    public synchronized int getNumPendingSyncs() {
        final int pendingKeys = this.pendingSyncs.size();
        return pendingKeys;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a learner handler in the {@code learners} set under that set's monitor.
     *
     * @param learner the handler to register
     */
    void addLearnerHandler(LearnerHandler learner) {
        synchronized (this.learners) {
            this.learners.add(learner);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a handler from all three handler sets. Each set is locked
     * separately and in turn: forwarding followers, then learners, then
     * observing learners.
     *
     * @param peer the handler to remove
     */
    void removeLearnerHandler(LearnerHandler peer) {
        synchronized (this.forwardingFollowers) {
            this.forwardingFollowers.remove(peer);
        }
        synchronized (this.learners) {
            this.learners.remove(peer);
        }
        synchronized (this.observingLearners) {
            this.observingLearners.remove(peer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Tells whether the given handler is currently a forwarding follower.
     *
     * @param peer the handler to look up
     * @return {@code true} if {@code forwardingFollowers} contains {@code peer}
     */
    boolean isLearnerSynced(LearnerHandler peer) {
        final boolean forwarding;
        synchronized (this.forwardingFollowers) {
            forwarding = this.forwardingFollowers.contains(peer);
        }
        return forwarding;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks whether the synced voting members form a quorum under the given verifier.
     * <p>
     * The candidate set holds this server (if it is a voting member of
     * {@code qv}) plus every forwarding follower that reports
     * {@code synced()} and is a voting member of {@code qv}.
     *
     * @param qv the quorum verifier to evaluate against
     * @return the result of {@code qv.containsQuorum} on the collected ids
     */
    public boolean isQuorumSynced(QuorumVerifier qv) {
        HashSet<Long> syncedVoters = new HashSet<Long>();
        if (qv.getVotingMembers().containsKey(this.self.getId())) {
            syncedVoters.add(this.self.getId());
        }
        synchronized (this.forwardingFollowers) {
            for (LearnerHandler follower : this.forwardingFollowers) {
                if (follower.synced() && qv.getVotingMembers().containsKey(follower.getSid())) {
                    syncedVoters.add(follower.getSid());
                }
            }
        }
        return qv.containsQuorum(syncedVoters);
    }

    Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
        this.self = self;
        try {
            this.ss = self.getQuorumListenOnAllIPs() ? new ServerSocket(self.getQuorumAddress().getPort()) : new ServerSocket();
            this.ss.setReuseAddress(true);
            if (!self.getQuorumListenOnAllIPs()) {
                this.ss.bind(self.getQuorumAddress());
            }
        }
        catch (BindException e) {
            if (self.getQuorumListenOnAllIPs()) {
                LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), (Throwable)e);
            } else {
                LOG.error("Couldn't bind to " + self.getQuorumAddress(), (Throwable)e);
            }
            throw e;
        }
        this.zk = zk;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the leader's main routine until the leader is shut down.
     * <p>
     * Steps, in order:
     * <ol>
     * <li>log the election duration, register JMX and load the database;</li>
     * <li>start the learner connection acceptor and negotiate the new epoch
     *     ({@link #getEpochToPropose});</li>
     * <li>build the NEWLEADER packet and attach the current (and, if newer,
     *     last seen) quorum verifier to it;</li>
     * <li>wait for the epoch ack and the NEWLEADER ack from a quorum;</li>
     * <li>start the ZooKeeper server and enter the ping loop, which runs twice
     *     per tick and shuts the leader down when, on a full tick, the synced
     *     learners no longer satisfy all quorums.</li>
     * </ol>
     * JMX is always unregistered on exit via the {@code finally} block.
     *
     * @throws IOException          if the configuration cannot be re-parsed or a
     *                              callee reports an I/O failure
     * @throws InterruptedException if interrupted, or if a callee signals a
     *                              timeout this way (see {@link #getEpochToPropose})
     */
    void lead() throws IOException, InterruptedException {
        this.self.end_fle = Time.currentElapsedTime();
        LOG.info("LEADING - LEADER ELECTION TOOK - " + (this.self.end_fle - this.self.start_fle) + " " + "MS");
        this.self.start_fle = 0L;
        this.self.end_fle = 0L;
        this.zk.registerJMX(new LeaderBean(this, this.zk), this.self.jmxLocalPeerBean);
        try {
            this.self.tick.set(0);
            this.zk.loadData();
            this.leaderStateSummary = new StateSummary(this.self.getCurrentEpoch(), this.zk.getLastProcessedZxid());
            this.cnxAcceptor = new LearnerCnxAcceptor();
            this.cnxAcceptor.start();
            long epoch = this.getEpochToPropose(this.self.getId(), this.self.getAcceptedEpoch());
            this.zk.setZxid(ZxidUtils.makeZxid(epoch, 0L));
            synchronized (this) {
                this.lastProposed = this.zk.getZxid();
            }
            this.newLeaderProposal.packet = new QuorumPacket(NEWLEADER, this.zk.getZxid(), null, null);
            if ((this.newLeaderProposal.packet.getZxid() & 0xFFFFFFFFL) != 0L) {
                LOG.info("NEWLEADER proposal has Zxid of " + Long.toHexString(this.newLeaderProposal.packet.getZxid()));
            }
            QuorumVerifier lastSeenQV = this.self.getLastSeenQuorumVerifier();
            QuorumVerifier curQV = this.self.getQuorumVerifier();
            if (curQV.getVersion() == 0L && curQV.getVersion() == lastSeenQV.getVersion()) {
                // Both verifiers are still at version 0: stamp a copy of the current
                // configuration with the new zxid and record it as the last seen one.
                try {
                    QuorumVerifier newQV = this.self.configFromString(curQV.toString());
                    newQV.setVersion(this.zk.getZxid());
                    this.self.setLastSeenQuorumVerifier(newQV, true);
                }
                catch (Exception e) {
                    throw new IOException(e);
                }
            }
            this.newLeaderProposal.addQuorumVerifier(this.self.getQuorumVerifier());
            if (this.self.getLastSeenQuorumVerifier().getVersion() > this.self.getQuorumVerifier().getVersion()) {
                this.newLeaderProposal.addQuorumVerifier(this.self.getLastSeenQuorumVerifier());
            }
            this.waitForEpochAck(this.self.getId(), this.leaderStateSummary);
            this.self.setCurrentEpoch(epoch);
            try {
                this.waitForNewLeaderAck(this.self.getId(), this.zk.getZxid(), QuorumPeer.LearnerType.PARTICIPANT);
            }
            catch (InterruptedException e) {
                this.shutdown("Waiting for a quorum of followers, only synced with sids: [ " + this.newLeaderProposal.ackSetsToString() + " ]");
                HashSet<Long> followerSet = new HashSet<Long>();
                for (LearnerHandler f : this.getLearners()) {
                    if (this.self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
                        followerSet.add(f.getSid());
                    }
                }
                boolean initTicksShouldBeIncreased = true;
                for (SyncedLearnerTracker.QuorumVerifierAcksetPair qvAckset : this.newLeaderProposal.qvAcksetPairs) {
                    if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                        initTicksShouldBeIncreased = false;
                        break;
                    }
                }
                if (initTicksShouldBeIncreased) {
                    LOG.warn("Enough followers present. Perhaps the initTicks need to be increased.");
                }
                // JMX is unregistered by the finally block below.
                return;
            }
            this.startZkServer();
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                // Testing hook: overwrite the counter half of the zxid, keeping the epoch half.
                long zxid = Long.parseLong(initialZxid);
                this.zk.setZxid(this.zk.getZxid() & 0xFFFFFFFF00000000L | zxid);
            }
            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                this.self.setZooKeeperServer(this.zk);
            }
            this.self.adminServer.setZooKeeperServer(this.zk);
            // The loop body runs every half tick; tickSkip makes the tick counter
            // advance and the quorum check fire only on every second pass.
            boolean tickSkip = true;
            String shutdownMessage = null;
            while (true) {
                synchronized (this) {
                    long start = Time.currentElapsedTime();
                    long cur = start;
                    long end = start + (long)(this.self.tickTime / 2);
                    while (cur < end) {
                        this.wait(end - cur);
                        cur = Time.currentElapsedTime();
                    }
                    if (!tickSkip) {
                        this.self.tick.incrementAndGet();
                    }
                    SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                    syncedAckSet.addQuorumVerifier(this.self.getQuorumVerifier());
                    if (this.self.getLastSeenQuorumVerifier() != null && this.self.getLastSeenQuorumVerifier().getVersion() > this.self.getQuorumVerifier().getVersion()) {
                        syncedAckSet.addQuorumVerifier(this.self.getLastSeenQuorumVerifier());
                    }
                    syncedAckSet.addAck(this.self.getId());
                    for (LearnerHandler f : this.getLearners()) {
                        if (f.synced()) {
                            syncedAckSet.addAck(f.getSid());
                        }
                    }
                    if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
                        shutdownMessage = "Not sufficient followers synced, only synced with sids: [ " + syncedAckSet.ackSetsToString() + " ]";
                        break;
                    }
                    tickSkip = !tickSkip;
                }
                // Ping outside the monitor, on a snapshot of the learners.
                for (LearnerHandler f : this.getLearners()) {
                    f.ping();
                }
            }
            if (shutdownMessage != null) {
                this.shutdown(shutdownMessage);
            }
        }
        finally {
            this.zk.unregisterJMX(this);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shuts the leader down; later calls return right after the first log line.
     * <p>
     * Halts the connection acceptor, detaches the ZooKeeper server from the
     * peer and the admin server, closes the listening socket, closes all peer
     * connections, shuts down the ZooKeeper server and finally removes and
     * shuts down every registered learner handler.
     *
     * @param reason text embedded in the logged "shutdown Leader!" exception
     */
    void shutdown(String reason) {
        LOG.info("Shutting down");
        if (this.isShutdown) {
            return;
        }
        LOG.info("Shutdown called", (Throwable)new Exception("shutdown Leader! reason: " + reason));
        if (this.cnxAcceptor != null) {
            this.cnxAcceptor.halt();
        }
        this.self.setZooKeeperServer(null);
        this.self.adminServer.setZooKeeperServer(null);
        try {
            this.ss.close();
        }
        catch (IOException closeFailure) {
            LOG.warn("Ignoring unexpected exception during close", (Throwable)closeFailure);
        }
        this.self.closeAllConnections();
        if (this.zk != null) {
            this.zk.shutdown();
        }
        synchronized (this.learners) {
            // Each handler is removed from the set before it is shut down.
            for (Iterator<LearnerHandler> handlers = this.learners.iterator(); handlers.hasNext(); ) {
                LearnerHandler handler = handlers.next();
                handlers.remove();
                handler.shutdown();
            }
        }
        this.isShutdown = true;
    }

    /**
     * Chooses the server that should lead under the newest quorum verifier of a proposal.
     * <p>
     * This server is returned when it is a voting member of the proposal's last
     * quorum verifier and the address registered for it there equals its
     * current quorum address. Otherwise a candidate is taken from that
     * verifier's ack set (excluding this server), and the candidate set is
     * narrowed by intersecting it with the ack sets of the outstanding
     * proposals at {@code zxid + 1}, {@code zxid + 2}, ... until a proposal is
     * missing or the set shrinks to at most one element.
     *
     * @param reconfigProposal proposal whose last quorum verifier/ack-set pair is examined
     * @param zxid             zxid of that proposal; later proposals are looked up from {@code zxid + 1}
     * @return the id of the designated leader
     */
    private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
        // The last pair holds the most recently added quorum verifier.
        SyncedLearnerTracker.QuorumVerifierAcksetPair newQVAcksetPair = (SyncedLearnerTracker.QuorumVerifierAcksetPair)reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size() - 1);
        if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(this.self.getId()) && newQVAcksetPair.getQuorumVerifier().getVotingMembers().get((Object)Long.valueOf((long)this.self.getId())).addr.equals(this.self.getQuorumAddress())) {
            return this.self.getId();
        }
        // Work on a copy so the proposal's own ack set is left untouched.
        HashSet<Long> candidates = new HashSet<Long>(newQVAcksetPair.getAckset());
        candidates.remove(this.self.getId());
        // NOTE(review): if no server other than this one has acked, the set is
        // empty here and iterator().next() throws NoSuchElementException.
        long curCandidate = candidates.iterator().next();
        long curZxid = zxid + 1L;
        Proposal p = (Proposal)this.outstandingProposals.get(curZxid);
        while (p != null && !candidates.isEmpty()) {
            for (SyncedLearnerTracker.QuorumVerifierAcksetPair qvAckset : p.qvAcksetPairs) {
                candidates.retainAll(qvAckset.getAckset());
                if (candidates.isEmpty()) {
                    // Nobody acked this proposal too; keep the previous candidate.
                    return curCandidate;
                }
                curCandidate = candidates.iterator().next();
                if (candidates.size() != 1) continue;
                return curCandidate;
            }
            p = (Proposal)this.outstandingProposals.get(++curZxid);
        }
        return curCandidate;
    }

    /**
     * Commits a proposal if it is the next in order and has been acked by all required quorums.
     * <p>
     * On commit the proposal is removed from {@code outstandingProposals},
     * queued in {@code toBeApplied} (when it carries a request), announced to
     * followers and observers, handed to the commit processor, and any sync
     * requests parked on its zxid are answered.
     *
     * @param p            the proposal to commit
     * @param zxid         the proposal's zxid
     * @param followerAddr address of the acking follower, used only in a warning message; may be {@code null}
     * @return {@code true} if the proposal was committed, {@code false} if the
     *         preceding zxid is still outstanding or the quorums are not yet satisfied
     */
    public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
        // Commit strictly in order: the previous zxid must no longer be outstanding.
        if (this.outstandingProposals.containsKey(zxid - 1L)) {
            return false;
        }
        if (!p.hasAllQuorums()) {
            return false;
        }
        if (zxid != this.lastCommitted + 1L) {
            LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid) + " from " + followerAddr + " not first!");
            LOG.warn("First is " + (this.lastCommitted + 1L));
        }
        this.outstandingProposals.remove(zxid);
        if (p.request != null) {
            this.toBeApplied.add(p);
        }
        if (p.request == null) {
            LOG.warn("Going to commmit null: " + p);
        } else if (p.request.getHdr().getType() == 16) {
            // Header type 16 is handled as a reconfiguration (see the log message below).
            LOG.debug("Committing a reconfiguration! " + this.outstandingProposals.size());
            Long designatedLeader = this.getDesignatedLeader(p, zxid);
            QuorumVerifier newQV = ((SyncedLearnerTracker.QuorumVerifierAcksetPair)p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1)).getQuorumVerifier();
            this.self.processReconfig(newQV, designatedLeader, this.zk.getZxid(), true);
            // A different designated leader means this server stops processing acks (see processAck).
            if (designatedLeader.longValue() != this.self.getId()) {
                this.allowedToCommit = false;
            }
            this.commitAndActivate(zxid, designatedLeader);
            this.informAndActivate(p, designatedLeader);
        } else {
            this.commit(zxid);
            this.inform(p);
        }
        // NOTE(review): p.request may be null here; confirm the commit processor tolerates it.
        this.zk.commitProcessor.commit(p.request);
        // Answer the sync requests that were parked on this zxid by processSync.
        if (this.pendingSyncs.containsKey(zxid)) {
            for (LearnerSyncRequest r : this.pendingSyncs.remove(zxid)) {
                this.sendSync(r);
            }
        }
        return true;
    }

    /**
     * Records an ack for an outstanding proposal and commits whatever becomes committable.
     * <p>
     * The ack is ignored when committing is disabled, when the zxid's low 32
     * bits are zero, when nothing is outstanding, when the zxid has already
     * been committed, or when no proposal with that zxid is outstanding. After
     * a reconfiguration (header type 16) is committed, the following
     * outstanding proposals are tried in zxid order until one cannot be
     * committed or is missing.
     *
     * @param sid          id of the acking server
     * @param zxid         zxid being acknowledged
     * @param followerAddr address of the acking follower, used for logging
     */
    public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
        if (!this.allowedToCommit) {
            return;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Ack zxid: 0x{}", (Object)Long.toHexString(zxid));
            for (Proposal outstanding : this.outstandingProposals.values()) {
                LOG.trace("outstanding proposal: 0x{}", (Object)Long.toHexString(outstanding.packet.getZxid()));
            }
            LOG.trace("outstanding proposals all");
        }
        if ((zxid & 0xFFFFFFFFL) == 0L) {
            return;
        }
        if (this.outstandingProposals.size() == 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("outstanding is 0");
            }
            return;
        }
        if (this.lastCommitted >= zxid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", (Object)Long.toHexString(this.lastCommitted), (Object)Long.toHexString(zxid));
            }
            return;
        }
        Proposal acked = (Proposal)this.outstandingProposals.get(zxid);
        if (acked == null) {
            LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", (Object)Long.toHexString(zxid), (Object)followerAddr);
            return;
        }
        acked.addAck(sid);
        boolean committed = this.tryToCommit(acked, zxid, followerAddr);
        boolean reconfigCommitted = committed && acked.request != null && acked.request.getHdr().getType() == 16;
        if (!reconfigCommitted) {
            return;
        }
        // Walk the following zxids and commit each one that is ready.
        long nextZxid = zxid;
        Proposal next = acked;
        while (this.allowedToCommit && committed && next != null) {
            nextZxid++;
            next = (Proposal)this.outstandingProposals.get(nextZxid);
            if (next != null) {
                committed = this.tryToCommit(next, nextZxid, null);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues a packet on every forwarding follower while holding the monitor
     * of the {@code forwardingFollowers} set.
     *
     * @param qp the packet to queue
     */
    void sendPacket(QuorumPacket qp) {
        synchronized (this.forwardingFollowers) {
            Iterator<LearnerHandler> followers = this.forwardingFollowers.iterator();
            while (followers.hasNext()) {
                followers.next().queuePacket(qp);
            }
        }
    }

    /**
     * Queues a packet on every observing learner, iterating over the copy
     * returned by {@link #getObservingLearners()}.
     *
     * @param qp the packet to queue
     */
    void sendObserverPacket(QuorumPacket qp) {
        List<LearnerHandler> observers = this.getObservingLearners();
        for (LearnerHandler observer : observers) {
            observer.queuePacket(qp);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records {@code zxid} as the last committed zxid and sends a COMMIT
     * packet for it to the forwarding followers.
     *
     * @param zxid the zxid being committed
     */
    public void commit(long zxid) {
        synchronized (this) {
            this.lastCommitted = zxid;
        }
        this.sendPacket(new QuorumPacket(COMMIT, zxid, null, null));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records {@code zxid} as the last committed zxid and sends a
     * COMMITANDACTIVATE packet to the forwarding followers. The packet data is
     * the designated leader id written as 8 bytes.
     *
     * @param zxid             the zxid being committed
     * @param designatedLeader id of the designated leader
     */
    public void commitAndActivate(long zxid, long designatedLeader) {
        synchronized (this) {
            this.lastCommitted = zxid;
        }
        byte[] payload = new byte[8];
        ByteBuffer.wrap(payload).putLong(designatedLeader);
        this.sendPacket(new QuorumPacket(COMMITANDACTIVATE, zxid, payload, null));
    }

    /**
     * Sends an INFORM packet to the observing learners, carrying the
     * proposal's request zxid and the proposal packet's data.
     *
     * @param proposal the committed proposal; its {@code request} must be non-null
     */
    public void inform(Proposal proposal) {
        long zxid = proposal.request.zxid;
        byte[] payload = proposal.packet.getData();
        this.sendObserverPacket(new QuorumPacket(INFORM, zxid, payload, null));
    }

    /**
     * Sends an INFORMANDACTIVATE packet to the observing learners. The packet
     * data is the designated leader id (8 bytes) followed by the proposal
     * packet's data.
     *
     * @param proposal         the committed proposal; its {@code request} must be non-null
     * @param designatedLeader id of the designated leader
     */
    public void informAndActivate(Proposal proposal, long designatedLeader) {
        byte[] proposalData = proposal.packet.getData();
        byte[] payload = new byte[proposalData.length + 8];
        ByteBuffer.wrap(payload).putLong(designatedLeader).put(proposalData);
        this.sendObserverPacket(new QuorumPacket(INFORMANDACTIVATE, proposal.request.zxid, payload, null));
    }

    /**
     * Returns the epoch extracted from the last proposed zxid.
     *
     * @return {@code ZxidUtils.getEpochFromZxid(lastProposed)}
     */
    public long getEpoch() {
        final long zxid = this.lastProposed;
        return ZxidUtils.getEpochFromZxid(zxid);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Wraps a request in a PROPOSAL packet, registers it as outstanding and
     * sends it to the forwarding followers.
     * <p>
     * If the low 32 bits of the request zxid are all ones, the leader is shut
     * down and the call fails instead. For a request whose header type is 16
     * the request's quorum verifier is first stored as the peer's last seen
     * verifier.
     *
     * @param request the request to propose
     * @return the newly created, now outstanding proposal
     * @throws XidRolloverException if the zxid counter has rolled over
     */
    public Proposal propose(Request request) throws XidRolloverException {
        boolean counterRolledOver = (request.zxid & 0xFFFFFFFFL) == 0xFFFFFFFFL;
        if (counterRolledOver) {
            String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
            this.shutdown(msg);
            throw new XidRolloverException(msg);
        }
        // Serialise the header and, when present, the transaction body.
        ByteArrayOutputStream serialized = new ByteArrayOutputStream();
        BinaryOutputArchive archive = BinaryOutputArchive.getArchive(serialized);
        try {
            request.getHdr().serialize(archive, "hdr");
            if (request.getTxn() != null) {
                request.getTxn().serialize(archive, "txn");
            }
            serialized.close();
        }
        catch (IOException e) {
            LOG.warn("This really should be impossible", (Throwable)e);
        }
        QuorumPacket proposalPacket = new QuorumPacket(PROPOSAL, request.zxid, serialized.toByteArray(), null);
        Proposal proposal = new Proposal();
        proposal.packet = proposalPacket;
        proposal.request = request;
        synchronized (this) {
            proposal.addQuorumVerifier(this.self.getQuorumVerifier());
            if (request.getHdr().getType() == 16) {
                this.self.setLastSeenQuorumVerifier(request.qv, true);
            }
            // A newer last-seen verifier must acknowledge the proposal as well.
            if (this.self.getQuorumVerifier().getVersion() < this.self.getLastSeenQuorumVerifier().getVersion()) {
                proposal.addQuorumVerifier(this.self.getLastSeenQuorumVerifier());
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Proposing:: " + request);
            }
            this.lastProposed = proposal.packet.getZxid();
            this.outstandingProposals.put(this.lastProposed, proposal);
            this.sendPacket(proposalPacket);
        }
        return proposal;
    }

    /**
     * Returns the snapshot throttler created together with this leader.
     *
     * @return the {@code learnerSnapshotThrottler} field
     */
    public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
        final LearnerSnapshotThrottler throttler = this.learnerSnapshotThrottler;
        return throttler;
    }

    /**
     * Handles a learner's sync request. With nothing outstanding the sync is
     * answered immediately; otherwise the request is parked under the last
     * proposed zxid until that proposal is committed.
     *
     * @param r the sync request
     */
    public synchronized void processSync(LearnerSyncRequest r) {
        if (this.outstandingProposals.isEmpty()) {
            this.sendSync(r);
            return;
        }
        List<LearnerSyncRequest> parked = this.pendingSyncs.get(this.lastProposed);
        if (parked == null) {
            parked = new ArrayList<LearnerSyncRequest>();
            this.pendingSyncs.put(this.lastProposed, parked);
        }
        parked.add(r);
    }

    /**
     * Answers a sync request by queueing a SYNC packet (zxid 0, no data) on
     * the handler referenced by the request.
     *
     * @param r the sync request to answer
     */
    public void sendSync(LearnerSyncRequest r) {
        r.fh.queuePacket(new QuorumPacket(SYNC, 0L, null, null));
    }

    /**
     * Brings a learner up to date and registers it for further traffic.
     * <p>
     * If the leader has proposed beyond {@code lastSeenZxid}, every proposal
     * in {@code toBeApplied} with a higher zxid is queued together with a
     * matching COMMIT packet. Participants additionally receive, in ascending
     * zxid order, the still outstanding proposals above {@code lastSeenZxid}.
     * The handler is then added to the forwarding followers (participants) or
     * to the observing learners (everything else).
     *
     * @param handler      the learner's handler
     * @param lastSeenZxid the last zxid the learner has seen
     * @return the leader's last proposed zxid
     */
    public synchronized long startForwarding(LearnerHandler handler, long lastSeenZxid) {
        if (this.lastProposed > lastSeenZxid) {
            for (Proposal p : this.toBeApplied) {
                if (p.packet.getZxid() <= lastSeenZxid) {
                    continue;
                }
                handler.queuePacket(p.packet);
                handler.queuePacket(new QuorumPacket(COMMIT, p.packet.getZxid(), null, null));
            }
            if (handler.getLearnerType() == QuorumPeer.LearnerType.PARTICIPANT) {
                // Typed copy of the key set so it can be sorted and iterated as Long.
                List<Long> zxids = new ArrayList<Long>(this.outstandingProposals.keySet());
                Collections.sort(zxids);
                for (Long zxid : zxids) {
                    if (zxid <= lastSeenZxid) {
                        continue;
                    }
                    handler.queuePacket(this.outstandingProposals.get(zxid).packet);
                }
            }
        }
        if (handler.getLearnerType() == QuorumPeer.LearnerType.PARTICIPANT) {
            this.addForwardingFollower(handler);
        } else {
            this.addObserverLearnerHandler(handler);
        }
        return this.lastProposed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a server's last accepted epoch and returns the epoch to propose.
     * <p>
     * While the epoch is still being negotiated, the proposed epoch is raised
     * to {@code lastAcceptedEpoch + 1} if needed and {@code sid} is recorded.
     * Once this server itself and a quorum of servers have registered, the
     * epoch is fixed, stored as the peer's accepted epoch and all waiters are
     * woken. Otherwise the caller waits up to {@code initLimit * tickTime}
     * milliseconds for that to happen.
     *
     * @param sid               id of the registering server
     * @param lastAcceptedEpoch the last epoch that server accepted
     * @return the negotiated epoch
     * @throws InterruptedException if interrupted while waiting, or if the
     *                              wait times out before a quorum registered
     * @throws IOException          declared for callers; not thrown by the visible code
     */
    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
        synchronized (this.connectingFollowers) {
            if (!this.waitingForNewEpoch) {
                return this.epoch;
            }
            if (lastAcceptedEpoch >= this.epoch) {
                this.epoch = lastAcceptedEpoch + 1L;
            }
            this.connectingFollowers.add(sid);
            QuorumVerifier verifier = this.self.getQuorumVerifier();
            if (this.connectingFollowers.contains(this.self.getId()) && verifier.containsQuorum(this.connectingFollowers)) {
                this.waitingForNewEpoch = false;
                this.self.setAcceptedEpoch(this.epoch);
                this.connectingFollowers.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                long cur = start;
                // Multiply as long so a large initLimit * tickTime cannot overflow int.
                long end = start + (long)this.self.getInitLimit() * (long)this.self.getTickTime();
                while (this.waitingForNewEpoch && cur < end) {
                    this.connectingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (this.waitingForNewEpoch) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
                }
            }
            return this.epoch;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a server's acknowledgement of the new epoch and waits until a quorum has acked.
     * <p>
     * A summary with current epoch {@code -1} is not counted as an ack. Once
     * this server itself and a quorum have acked, the election is marked
     * finished and all waiters are woken; otherwise the caller waits up to
     * {@code initLimit * tickTime} milliseconds.
     *
     * @param id id of the acknowledging server
     * @param ss that server's state summary
     * @throws IOException          if {@code ss} is more recent than the leader's own summary
     * @throws InterruptedException if interrupted while waiting, or if the
     *                              wait times out before a quorum acked
     */
    public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
        synchronized (this.electingFollowers) {
            if (this.electionFinished) {
                return;
            }
            if (ss.getCurrentEpoch() != -1L) {
                if (ss.isMoreRecentThan(this.leaderStateSummary)) {
                    throw new IOException("Follower is ahead of the leader, leader summary: " + this.leaderStateSummary.getCurrentEpoch() + " (current epoch), " + this.leaderStateSummary.getLastZxid() + " (last zxid)");
                }
                this.electingFollowers.add(id);
            }
            QuorumVerifier verifier = this.self.getQuorumVerifier();
            if (this.electingFollowers.contains(this.self.getId()) && verifier.containsQuorum(this.electingFollowers)) {
                this.electionFinished = true;
                this.electingFollowers.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                long cur = start;
                // Multiply as long so a large initLimit * tickTime cannot overflow int.
                long end = start + (long)this.self.getInitLimit() * (long)this.self.getTickTime();
                while (!this.electionFinished && cur < end) {
                    this.electingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (!this.electionFinished) {
                    throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
                }
            }
        }
    }

    /**
     * Joins the given server ids with commas, in the set's iteration order.
     *
     * @param sidSet the ids to render
     * @return the ids separated by {@code ","}; the empty string for an empty set
     */
    private String getSidSetString(Set<Long> sidSet) {
        StringBuilder joined = new StringBuilder();
        String separator = "";
        for (Long sid : sidSet) {
            joined.append(separator).append(sid);
            separator = ",";
        }
        return joined.toString();
    }

    /**
     * Starts the leader's ZooKeeper server once the NEWLEADER proposal has its quorum.
     * <p>
     * Sets {@code lastCommitted} to the server's current zxid, applies the
     * last seen quorum verifier through {@code processReconfig} with the
     * designated leader computed from the NEWLEADER proposal, starts the
     * server, updates the election vote with the current epoch and sets the
     * database's last processed zxid.
     */
    private synchronized void startZkServer() {
        this.lastCommitted = this.zk.getZxid();
        LOG.info("Have quorum of supporters, sids: [ " + this.newLeaderProposal.ackSetsToString() + " ]; starting up and setting last processed zxid: 0x{}", (Object)Long.toHexString(this.zk.getZxid()));
        QuorumVerifier newQV = this.self.getLastSeenQuorumVerifier();
        Long designatedLeader = this.getDesignatedLeader(this.newLeaderProposal, this.zk.getZxid());
        this.self.processReconfig(newQV, designatedLeader, this.zk.getZxid(), true);
        // If another server is designated as leader, acks are no longer processed (see processAck).
        if (designatedLeader.longValue() != this.self.getId()) {
            this.allowedToCommit = false;
        }
        this.zk.startup();
        this.self.updateElectionVote(this.getEpoch());
        this.zk.getZKDatabase().setlastProcessedZxid(this.zk.getZxid());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a server's ack of the NEWLEADER packet and waits until all quorums have acked.
     * <p>
     * An ack whose zxid differs from the NEWLEADER packet's zxid is logged and
     * ignored. Once the NEWLEADER proposal has all quorums, the quorum is
     * marked as formed and all waiters are woken; otherwise the caller waits
     * up to {@code initLimit * tickTime} milliseconds.
     *
     * @param sid         id of the acknowledging server
     * @param zxid        zxid carried by the ack
     * @param learnerType type of the acknowledging learner; not used by this method
     * @throws InterruptedException if interrupted while waiting, or if the
     *                              wait times out before the quorum formed
     */
    public void waitForNewLeaderAck(long sid, long zxid, QuorumPeer.LearnerType learnerType) throws InterruptedException {
        synchronized (this.newLeaderProposal.qvAcksetPairs) {
            if (this.quorumFormed) {
                return;
            }
            long currentZxid = this.newLeaderProposal.packet.getZxid();
            if (zxid != currentZxid) {
                LOG.error("NEWLEADER ACK from sid: " + sid + " is from a different epoch - current 0x" + Long.toHexString(currentZxid) + " receieved 0x" + Long.toHexString(zxid));
                return;
            }
            this.newLeaderProposal.addAck(sid);
            if (this.newLeaderProposal.hasAllQuorums()) {
                this.quorumFormed = true;
                this.newLeaderProposal.qvAcksetPairs.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                long cur = start;
                // Multiply as long so a large initLimit * tickTime cannot overflow int.
                long end = start + (long)this.self.getInitLimit() * (long)this.self.getTickTime();
                while (!this.quorumFormed && cur < end) {
                    this.newLeaderProposal.qvAcksetPairs.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (!this.quorumFormed) {
                    throw new InterruptedException("Timeout while waiting for NEWLEADER to be acked by quorum");
                }
            }
        }
    }

    /**
     * Translates a numeric quorum packet type into its symbolic name.
     *
     * @param packetType numeric packet type code
     * @return the symbolic name for codes 1 through 19, or {@code "UNKNOWN"}
     *         for any other value
     */
    public static String getPacketType(int packetType) {
        String name;
        switch (packetType) {
            case 1:  name = "REQUEST";           break;
            case 2:  name = "PROPOSAL";          break;
            case 3:  name = "ACK";               break;
            case 4:  name = "COMMIT";            break;
            case 5:  name = "PING";              break;
            case 6:  name = "REVALIDATE";        break;
            case 7:  name = "SYNC";              break;
            case 8:  name = "INFORM";            break;
            case 9:  name = "COMMITANDACTIVATE"; break;
            case 10: name = "NEWLEADER";         break;
            case 11: name = "FOLLOWERINFO";      break;
            case 12: name = "UPTODATE";          break;
            case 13: name = "DIFF";              break;
            case 14: name = "TRUNC";             break;
            case 15: name = "SNAP";              break;
            case 16: name = "OBSERVERINFO";      break;
            case 17: name = "LEADERINFO";        break;
            case 18: name = "ACKEPOCH";          break;
            case 19: name = "INFORMANDACTIVATE"; break;
            default: name = "UNKNOWN";           break;
        }
        return name;
    }

    static {
        LOG.info("TCP NoDelay set to: " + nodelay);
        maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10);
        LOG.info("zookeeper.leader.maxConcurrentSnapshots = " + maxConcurrentSnapshots);
        maxConcurrentSnapshotTimeout = Long.getLong(MAX_CONCURRENT_SNAPSHOT_TIMEOUT, 5L);
        LOG.info("zookeeper.leader.maxConcurrentSnapshotTimeout = " + maxConcurrentSnapshotTimeout);
    }

    /**
     * Checked exception carrying only a detail message.
     *
     * <p>NOTE(review): the name suggests it signals a transaction-id (xid)
     * counter rollover; confirm against the throw sites, which are outside
     * this part of the file.
     */
    public static class XidRolloverException extends Exception {
        /**
         * Creates the exception with the given detail message.
         *
         * @param message detail message, later available via {@link #getMessage()}
         */
        public XidRolloverException(final String message) {
            super(message);
        }
    }

    /**
     * Request processor that forwards each request to the next processor and
     * then removes the corresponding proposal from the leader's
     * {@code toBeApplied} collection.
     *
     * <p>The next processor must be a {@link FinalRequestProcessor}; the
     * constructor enforces this.
     */
    static class ToBeAppliedRequestProcessor
    implements RequestProcessor {
        private final RequestProcessor next;
        private final Leader leader;

        /**
         * @param next downstream processor; must be a {@link FinalRequestProcessor}
         * @param leader leader whose {@code toBeApplied} collection is maintained
         * @throws RuntimeException if {@code next} is null or is not a
         *         {@link FinalRequestProcessor}
         */
        ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
            if (!(next instanceof FinalRequestProcessor)) {
                // instanceof is false for null, so guard getClass() to report
                // the misconfiguration instead of failing with a bare NPE.
                String actualType = next == null ? "null" : next.getClass().getName();
                throw new RuntimeException(ToBeAppliedRequestProcessor.class.getName() + " must be connected to " + FinalRequestProcessor.class.getName() + " not " + actualType);
            }
            this.leader = leader;
            this.next = next;
        }

        /**
         * Passes the request downstream, then, for requests that carry a
         * header, removes the first {@code toBeApplied} proposal if its
         * request has the same zxid. Only the first proposal is inspected; if
         * it does not match (or the collection is empty) an error is logged.
         *
         * @param request request to process
         * @throws RequestProcessor.RequestProcessorException propagated from
         *         the next processor
         */
        @Override
        public void processRequest(Request request) throws RequestProcessor.RequestProcessorException {
            this.next.processRequest(request);
            if (request.getHdr() == null) {
                // Requests without a header have no zxid to match against.
                return;
            }
            long zxid = request.getHdr().getZxid();
            Iterator<Proposal> iter = this.leader.toBeApplied.iterator();
            if (iter.hasNext()) {
                Proposal p = iter.next();
                if (p.request != null && p.request.zxid == zxid) {
                    iter.remove();
                    return;
                }
            }
            LOG.error("Committed request not found on toBeApplied: " + request);
        }

        /**
         * Logs the shutdown and shuts down the next processor.
         */
        @Override
        public void shutdown() {
            LOG.info("Shutting down");
            this.next.shutdown();
        }
    }

    class LearnerCnxAcceptor
    extends ZooKeeperCriticalThread {
        private volatile boolean stop;

        public LearnerCnxAcceptor() {
            super("LearnerCnxAcceptor-" + Leader.this.ss.getLocalSocketAddress(), Leader.this.zk.getZooKeeperServerListener());
            this.stop = false;
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        @Override
        public void run() {
            try {
                while (!this.stop) {
                    try {
                        Socket s = Leader.this.ss.accept();
                        s.setSoTimeout(Leader.this.self.tickTime * Leader.this.self.initLimit);
                        s.setTcpNoDelay(nodelay);
                        LearnerHandler fh = new LearnerHandler(s, Leader.this);
                        fh.start();
                    }
                    catch (SocketException e) {
                        if (!this.stop) throw e;
                        LOG.info("exception while shutting down acceptor: " + e);
                        this.stop = true;
                        continue;
                        return;
                    }
                }
            }
            catch (Exception e) {
                LOG.warn("Exception while accepting follower", (Object)e.getMessage());
                this.handleException(this.getName(), e);
            }
        }

        public void halt() {
            this.stop = true;
        }
    }

    public static class Proposal
    extends SyncedLearnerTracker {
        public QuorumPacket packet;
        public Request request;

        public String toString() {
            return this.packet.getType() + ", " + this.packet.getZxid() + ", " + this.request;
        }
    }
}

