/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.hugegraph.backend.store.raft;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.StateMachine;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.core.Replicator;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.store.raft.RaftClosure;
import com.baidu.hugegraph.backend.store.raft.RaftSharedContext;
import com.baidu.hugegraph.backend.store.raft.StoreClosure;
import com.baidu.hugegraph.backend.store.raft.StoreCommand;
import com.baidu.hugegraph.backend.store.raft.StoreStateMachine;
import com.baidu.hugegraph.util.LZ4Util;
import com.baidu.hugegraph.util.Log;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

public class RaftNode {
    /** Logger shared by this class and its nested listener. */
    private static final Logger LOG = Log.logger(RaftNode.class);
    /** Shared raft context; source of node options, group, endpoint, RPC server and forwarder. */
    private final RaftSharedContext context;
    /** Underlying raft node, obtained by starting a RaftGroupService in initRaftNode(). */
    private final Node node;
    /** State machine installed into the node options as the FSM. */
    private final StoreStateMachine stateMachine;
    /** Latest known leader information; replaced as a whole by onLeaderInfoChange(). */
    private final AtomicReference<LeaderInfo> leaderInfo;
    /** Written by the read-index callback in waitStarted(): true when the reported status is OK. */
    private final AtomicBoolean started;
    /** Incremented by the replicator listener on write-overflow errors, decremented by waitIfBusy(). */
    private final AtomicInteger busyCounter;

    /**
     * Creates the raft node for the given shared context and starts it.
     *
     * <p>The mutable state (leader info, started flag, busy counter) is
     * assigned before the node is started and before the replicator listener
     * is registered. The listener is an inner class whose {@code onError}
     * increments {@code busyCounter}, and {@code onLeaderInfoChange} writes
     * {@code leaderInfo}; once the node is running these may be invoked from
     * other threads, so they must never observe the fields as {@code null}.
     *
     * @param context shared raft context providing the node configuration
     * @throws BackendException if starting the underlying raft node fails
     *                          with an {@link IOException}
     */
    public RaftNode(RaftSharedContext context) {
        this.context = context;
        this.stateMachine = new StoreStateMachine(context);
        // Initialise all callback-visible state before anything can call back
        this.leaderInfo = new AtomicReference<LeaderInfo>(LeaderInfo.NO_LEADER);
        this.started = new AtomicBoolean(false);
        this.busyCounter = new AtomicInteger();
        try {
            this.node = this.initRaftNode();
        }
        catch (IOException e) {
            throw new BackendException("Failed to init raft node", e);
        }
        this.node.addReplicatorStateListener(new RaftNodeStateListener());
    }

    /**
     * @return the shared raft context this node was constructed with
     */
    public RaftSharedContext context() {
        return context;
    }

    /**
     * @return the underlying raft node
     */
    public Node node() {
        return node;
    }

    /**
     * @return the peer id taken from the underlying raft node's node id
     */
    public PeerId nodeId() {
        return node.getNodeId().getPeerId();
    }

    /**
     * @return the leader's peer id from the latest leader information,
     *         or {@code null} while no leader has been recorded
     */
    public PeerId leaderId() {
        LeaderInfo current = this.leaderInfo.get();
        return current.leaderId;
    }

    /**
     * @return the self-is-leader flag from the latest leader information
     */
    public boolean selfIsLeader() {
        LeaderInfo current = this.leaderInfo.get();
        return current.selfIsLeader;
    }

    /**
     * Records new leader information, replacing the previous one as a whole.
     *
     * @param leaderId     peer id of the new leader, may be {@code null};
     *                     a non-null id is copied before being stored
     * @param selfIsLeader whether this node is the leader
     */
    public void onLeaderInfoChange(PeerId leaderId, boolean selfIsLeader) {
        PeerId stored = null;
        if (leaderId != null) {
            // Keep a private copy so later changes to the caller's object are not seen here
            stored = leaderId.copy();
        }
        LeaderInfo updated = new LeaderInfo(stored, selfIsLeader);
        this.leaderInfo.set(updated);
    }

    /**
     * Shuts down the underlying raft node.
     */
    public void shutdown() {
        Node raftNode = this.node;
        raftNode.shutdown();
    }

    /**
     * Asks the raft node to take a snapshot and waits for the closure to
     * finish. Does nothing when the context reports that snapshots are not
     * used.
     *
     * @throws BackendException wrapping anything thrown while requesting
     *                          the snapshot or waiting for it
     */
    public void snapshot() {
        if (this.context.useSnapshot()) {
            RaftClosure done = new RaftClosure();
            try {
                Node raftNode = this.node();
                raftNode.snapshot(done);
                done.waitFinished();
            }
            catch (Throwable cause) {
                throw new BackendException("Failed to generate snapshot", cause);
            }
        }
    }

    /**
     * Builds a {@link RaftGroupService} from the shared context, with this
     * object's state machine installed as the FSM, and starts it.
     *
     * @return the node returned by {@code RaftGroupService.start(false)}
     * @throws IOException declared by this method; propagated to the constructor
     */
    private Node initRaftNode() throws IOException {
        NodeOptions options = this.context.nodeOptions();
        options.setFsm((StateMachine)this.stateMachine);
        RaftGroupService service = new RaftGroupService(this.context.group(),
                                                        this.context.endpoint(),
                                                        options,
                                                        this.context.rpcServer(),
                                                        true);
        return service.start(false);
    }

    /**
     * Submits a store command to the raft group.
     *
     * <p>Waits (without timeout) until a leader is known. If this node is
     * not the leader the command is forwarded to the leader through the
     * context's RPC forwarder; otherwise the command data is LZ4-compressed
     * and applied to the local raft node as a task whose completion
     * callback is {@code closure}.
     *
     * @param command the command to submit
     * @param closure callback attached to the raft task or the forwarded request
     */
    private void submitCommand(StoreCommand command, StoreClosure closure) {
        // A non-positive timeout makes waitLeaderElected() wait indefinitely
        LeaderInfo leader = this.waitLeaderElected(-1);
        if (leader.selfIsLeader) {
            // Back off first if replicators reported write overflow
            this.waitIfBusy();
            Task task = new Task();
            task.setDone((Closure)closure);
            ByteBuffer payload = LZ4Util.compress(command.data(), 4096)
                                        .forReadWritten()
                                        .asByteBuffer();
            LOG.debug("The bytes size of command(compressed) {} is {}", command.action(), payload.limit());
            task.setData(payload);
            LOG.debug("submit to raft node {}", this.node);
            this.node.apply(task);
        } else {
            this.context.rpcForwarder().forwardToLeader(leader.leaderId, command, closure);
        }
    }

    /**
     * Submits a store command and waits for its closure to finish.
     *
     * @param command the command to submit
     * @param future  closure used both as the raft callback and to wait on
     * @return whatever {@code future.waitFinished()} returns
     * @throws BackendException wrapping anything thrown while waiting
     */
    public Object submitAndWait(StoreCommand command, StoreClosure future) {
        this.submitCommand(command, future);
        Object result;
        try {
            result = future.waitFinished();
        }
        catch (Throwable cause) {
            throw new BackendException("Failed to wait store command %s", cause, command);
        }
        return result;
    }

    /**
     * Waits until leader information with a non-null leader id is available.
     *
     * <p>The leader information is polled every 3 seconds. After each sleep
     * it is re-read <em>before</em> the timeout is evaluated, so a leader
     * elected during the final sleep is returned instead of being reported
     * as a timeout.
     *
     * @param timeout maximum time to wait in milliseconds; a value
     *                {@code <= 0} waits indefinitely
     * @return the leader information, with a non-null leader id
     * @throws BackendException if the wait is interrupted (the thread's
     *                          interrupt flag is restored first) or the
     *                          timeout elapses without a leader
     */
    protected LeaderInfo waitLeaderElected(int timeout) {
        String group = this.context.group();
        LeaderInfo leaderInfo = this.leaderInfo.get();
        long beginTime = System.currentTimeMillis();
        while (leaderInfo.leaderId == null) {
            try {
                Thread.sleep(3000L);
            }
            catch (InterruptedException e) {
                // Restore the flag so callers up the stack can still see the interruption
                Thread.currentThread().interrupt();
                throw new BackendException("Waiting for raft group '%s' election is interrupted", (Throwable)e, group);
            }
            leaderInfo = this.leaderInfo.get();
            assert (leaderInfo != null);
            if (leaderInfo.leaderId != null) {
                break;
            }
            long consumedTime = System.currentTimeMillis() - beginTime;
            if (timeout > 0 && consumedTime >= (long)timeout) {
                throw new BackendException("Waiting for raft group '%s' election timeout(%sms)", group, consumedTime);
            }
            LOG.warn("Waiting for raft group '{}' election cost {}s", group, consumedTime / 1000.0);
        }
        return leaderInfo;
    }

    /**
     * Waits until a read-index request on the raft node reports an OK status.
     *
     * <p>Each iteration issues a read-index request whose callback stores
     * {@code status.isOk()} into {@code started}, then checks that flag and
     * sleeps 3 seconds if it is not yet set. Because the callback is
     * asynchronous, the flag checked in one iteration may have been set by
     * a request issued in an earlier one.
     *
     * <p>A new closure is created for every request instead of sharing one
     * instance across requests.
     * NOTE(review): some SOFAJRaft versions appear to keep per-request
     * completion/timeout state inside ReadIndexClosure, which would make a
     * reused instance ignore later responses; confirm against the version
     * in use.
     *
     * @param timeout maximum time to wait in milliseconds; a value
     *                {@code <= 0} waits indefinitely
     * @throws BackendException if the wait is interrupted (the thread's
     *                          interrupt flag is restored first) or the
     *                          timeout elapses
     */
    protected void waitStarted(int timeout) {
        String group = this.context.group();
        long beginTime = System.currentTimeMillis();
        while (true) {
            this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure(){

                @Override
                public void run(Status status, long index, byte[] reqCtx) {
                    RaftNode.this.started.set(status.isOk());
                }
            });
            if (this.started.get()) {
                break;
            }
            try {
                Thread.sleep(3000L);
            }
            catch (InterruptedException e) {
                // Restore the flag so callers up the stack can still see the interruption
                Thread.currentThread().interrupt();
                throw new BackendException("Try to sleep a while for waiting heartbeat is interrupted", e);
            }
            long consumedTime = System.currentTimeMillis() - beginTime;
            if (timeout > 0 && consumedTime >= (long)timeout) {
                throw new BackendException("Waiting for raft group '%s' heartbeat timeout(%sms)", group, consumedTime);
            }
            LOG.warn("Waiting for raft group '{}' heartbeat cost {}s", group, consumedTime / 1000.0);
        }
    }

    /**
     * Backs off when replicators have reported write-buffer overflow.
     *
     * <p>If the busy counter is positive, sleeps for {@code counter * 3000}
     * milliseconds and afterwards decrements the counter by one, never
     * below zero. The decrement also happens when the sleep is interrupted.
     *
     * @throws BackendException if the sleep is interrupted; the thread's
     *                          interrupt flag is restored and the
     *                          {@link InterruptedException} is kept as cause
     */
    private void waitIfBusy() {
        int counter = this.busyCounter.get();
        if (counter <= 0) {
            return;
        }
        // Multiply as long: an int product would overflow for large counters
        long time = counter * 3000L;
        LOG.info("The node {} will sleep {} ms", this.node, time);
        try {
            Thread.sleep(time);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BackendException("The raft backend store is busy", e);
        }
        finally {
            // Cheap unlocked check first; re-check under the lock so that
            // concurrent callers cannot push the counter below zero
            if (this.busyCounter.get() > 0) {
                synchronized (this) {
                    if (this.busyCounter.get() > 0) {
                        counter = this.busyCounter.decrementAndGet();
                        LOG.info("Decrease busy counter: [{}]", counter);
                    }
                }
            }
        }
    }

    /**
     * @return {@code "[<group>-<nodeId>]"} built from the context's group
     *         and this node's peer id
     */
    @Override
    public String toString() {
        String group = this.context.group();
        PeerId self = this.nodeId();
        return String.format("[%s-%s]", group, self);
    }

    /**
     * Immutable pair of the current leader's peer id and whether the
     * enclosing node is that leader.
     */
    private static class LeaderInfo {
        /** Leader's peer id; {@code null} means no leader is known. */
        private final PeerId leaderId;
        /** Whether the owning node is the leader. */
        private final boolean selfIsLeader;

        /** Placeholder used while no leader is known. */
        private static final LeaderInfo NO_LEADER = new LeaderInfo(null, false);

        public LeaderInfo(PeerId peer, boolean isSelf) {
            leaderId = peer;
            selfIsLeader = isSelf;
        }
    }

    private class RaftNodeStateListener
    implements Replicator.ReplicatorStateListener {
        private static final long ERROR_PRINT_INTERVAL = 60000L;
        private volatile long lastPrintTime = 0L;

        public void onCreated(PeerId peer) {
            LOG.info("The node {} replicator has created", (Object)peer);
        }

        public void onError(PeerId peer, Status status) {
            long now = System.currentTimeMillis();
            if (now - this.lastPrintTime >= 60000L) {
                LOG.warn("Replicator meet error: {}", (Object)status);
                this.lastPrintTime = now;
            }
            if (this.isWriteBufferOverflow(status)) {
                int count = RaftNode.this.busyCounter.incrementAndGet();
                LOG.info("Increase busy counter: [{}]", (Object)count);
            }
        }

        private boolean isWriteBufferOverflow(Status status) {
            String expectMsg = "maybe write overflow";
            return RaftError.EINTERNAL == status.getRaftError() && status.getErrorMsg() != null && status.getErrorMsg().contains(expectMsg);
        }

        private boolean isRpcTimeout(Status status) {
            String expectMsg = "Invoke timeout";
            return RaftError.EINTERNAL == status.getRaftError() && status.getErrorMsg() != null && status.getErrorMsg().contains(expectMsg);
        }

        public void onDestroyed(PeerId peer) {
            LOG.warn("Replicator {} prepare to offline", (Object)peer);
        }
    }
}

