package com.baidu.hugegraph.backend.store.raft;

import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.core.Replicator;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.util.LZ4Util;
import com.baidu.hugegraph.util.Log;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/backend/store/raft/RaftNode.class */
public class RaftNode {
    private static final Logger LOG;
    private final RaftSharedContext context;
    private final Node node;
    private final StoreStateMachine stateMachine;
    private final AtomicReference<LeaderInfo> leaderInfo;
    private final AtomicBoolean started;
    private final AtomicInteger busyCounter;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/baidu/hugegraph/backend/store/raft/RaftNode$LeaderInfo.class */
    public static class LeaderInfo {
        private static final LeaderInfo NO_LEADER = new LeaderInfo(null, false);
        private final PeerId leaderId;
        private final boolean selfIsLeader;

        public LeaderInfo(PeerId peerId, boolean z) {
            this.leaderId = peerId;
            this.selfIsLeader = z;
        }
    }

    /* loaded from: input_file:com/baidu/hugegraph/backend/store/raft/RaftNode$RaftNodeStateListener.class */
    private class RaftNodeStateListener implements Replicator.ReplicatorStateListener {
        private static final long ERROR_PRINT_INTERVAL = 60000;
        private volatile long lastPrintTime = 0;

        public RaftNodeStateListener() {
        }

        public void onCreated(PeerId peerId) {
            RaftNode.LOG.info("The node {} replicator has created", peerId);
        }

        public void onError(PeerId peerId, Status status) {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastPrintTime >= ERROR_PRINT_INTERVAL) {
                RaftNode.LOG.warn("Replicator meet error: {}", status);
                this.lastPrintTime = currentTimeMillis;
            }
            if (isWriteBufferOverflow(status)) {
                RaftNode.LOG.info("Increase busy counter: [{}]", Integer.valueOf(RaftNode.this.busyCounter.incrementAndGet()));
            }
        }

        private boolean isWriteBufferOverflow(Status status) {
            return RaftError.EINTERNAL == status.getRaftError() && status.getErrorMsg() != null && status.getErrorMsg().contains("maybe write overflow");
        }

        private boolean isRpcTimeout(Status status) {
            return RaftError.EINTERNAL == status.getRaftError() && status.getErrorMsg() != null && status.getErrorMsg().contains("Invoke timeout");
        }

        public void onDestroyed(PeerId peerId) {
            RaftNode.LOG.warn("Replicator {} prepare to offline", peerId);
        }
    }

    public RaftNode(RaftSharedContext raftSharedContext) {
        this.context = raftSharedContext;
        this.stateMachine = new StoreStateMachine(raftSharedContext);
        try {
            this.node = initRaftNode();
            this.node.addReplicatorStateListener(new RaftNodeStateListener());
            this.leaderInfo = new AtomicReference<>(LeaderInfo.NO_LEADER);
            this.started = new AtomicBoolean(false);
            this.busyCounter = new AtomicInteger();
        } catch (IOException e) {
            throw new BackendException("Failed to init raft node", e);
        }
    }

    public RaftSharedContext context() {
        return this.context;
    }

    public Node node() {
        return this.node;
    }

    public PeerId nodeId() {
        return this.node.getNodeId().getPeerId();
    }

    public PeerId leaderId() {
        return this.leaderInfo.get().leaderId;
    }

    public boolean selfIsLeader() {
        return this.leaderInfo.get().selfIsLeader;
    }

    public void onLeaderInfoChange(PeerId peerId, boolean z) {
        this.leaderInfo.set(new LeaderInfo(peerId != null ? peerId.copy() : null, z));
    }

    public void shutdown() {
        this.node.shutdown();
    }

    public void snapshot() {
        if (this.context.useSnapshot()) {
            RaftClosure raftClosure = new RaftClosure();
            try {
                node().snapshot(raftClosure);
                raftClosure.waitFinished();
            } catch (Throwable th) {
                throw new BackendException("Failed to generate snapshot", th);
            }
        }
    }

    private Node initRaftNode() throws IOException {
        NodeOptions nodeOptions = this.context.nodeOptions();
        nodeOptions.setFsm(this.stateMachine);
        return new RaftGroupService(this.context.group(), this.context.endpoint(), nodeOptions, this.context.rpcServer(), true).start(false);
    }

    private void submitCommand(StoreCommand storeCommand, StoreClosure storeClosure) {
        LeaderInfo waitLeaderElected = waitLeaderElected(-1);
        if (!waitLeaderElected.selfIsLeader) {
            this.context.rpcForwarder().forwardToLeader(waitLeaderElected.leaderId, storeCommand, storeClosure);
            return;
        }
        waitIfBusy();
        Task task = new Task();
        task.setDone(storeClosure);
        ByteBuffer asByteBuffer = LZ4Util.compress(storeCommand.data(), RaftSharedContext.BLOCK_SIZE).forReadWritten().asByteBuffer();
        LOG.debug("The bytes size of command(compressed) {} is {}", storeCommand.action(), Integer.valueOf(asByteBuffer.limit()));
        task.setData(asByteBuffer);
        LOG.debug("submit to raft node {}", this.node);
        this.node.apply(task);
    }

    public Object submitAndWait(StoreCommand storeCommand, StoreClosure storeClosure) {
        submitCommand(storeCommand, storeClosure);
        try {
            return storeClosure.waitFinished();
        } catch (Throwable th) {
            throw new BackendException("Failed to wait store command %s", th, storeCommand);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public LeaderInfo waitLeaderElected(int i) {
        String group = this.context.group();
        LeaderInfo leaderInfo = this.leaderInfo.get();
        if (leaderInfo.leaderId != null) {
            return leaderInfo;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (leaderInfo.leaderId == null) {
            try {
                Thread.sleep(3000L);
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                if (i > 0 && currentTimeMillis2 >= i) {
                    throw new BackendException("Waiting for raft group '%s' election timeout(%sms)", group, Long.valueOf(currentTimeMillis2));
                }
                LOG.warn("Waiting for raft group '{}' election cost {}s", group, Double.valueOf(currentTimeMillis2 / 1000.0d));
                leaderInfo = this.leaderInfo.get();
                if (!$assertionsDisabled && leaderInfo == null) {
                    throw new AssertionError();
                }
            } catch (InterruptedException e) {
                throw new BackendException("Waiting for raft group '%s' election is interrupted", e, group);
            }
        }
        return leaderInfo;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitStarted(int i) {
        String group = this.context.group();
        ReadIndexClosure readIndexClosure = new ReadIndexClosure() { // from class: com.baidu.hugegraph.backend.store.raft.RaftNode.1
            public void run(Status status, long j, byte[] bArr) {
                RaftNode.this.started.set(status.isOk());
            }
        };
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            this.node.readIndex(BytesUtil.EMPTY_BYTES, readIndexClosure);
            if (this.started.get()) {
                return;
            }
            try {
                Thread.sleep(3000L);
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                if (i > 0 && currentTimeMillis2 >= i) {
                    throw new BackendException("Waiting for raft group '%s' heartbeat timeout(%sms)", group, Long.valueOf(currentTimeMillis2));
                }
                LOG.warn("Waiting for raft group '{}' heartbeat cost {}s", group, Double.valueOf(currentTimeMillis2 / 1000.0d));
            } catch (InterruptedException e) {
                throw new BackendException("Try to sleep a while for waiting heartbeat is interrupted", e);
            }
        }
    }

    private void waitIfBusy() {
        int i = this.busyCounter.get();
        if (i <= 0) {
            return;
        }
        long j = i * 3000;
        LOG.info("The node {} will sleep {} ms", this.node, Long.valueOf(j));
        try {
            try {
                Thread.sleep(j);
                if (this.busyCounter.get() > 0) {
                    synchronized (this) {
                        if (this.busyCounter.get() > 0) {
                            LOG.info("Decrease busy counter: [{}]", Integer.valueOf(this.busyCounter.decrementAndGet()));
                        }
                    }
                }
            } catch (InterruptedException e) {
                throw new BackendException("The raft backend store is busy");
            }
        } catch (Throwable th) {
            if (this.busyCounter.get() > 0) {
                synchronized (this) {
                    if (this.busyCounter.get() > 0) {
                        LOG.info("Decrease busy counter: [{}]", Integer.valueOf(this.busyCounter.decrementAndGet()));
                    }
                }
            }
            throw th;
        }
    }

    public String toString() {
        return String.format("[%s-%s]", this.context.group(), nodeId());
    }

    static {
        $assertionsDisabled = !RaftNode.class.desiredAssertionStatus();
        LOG = Log.logger(RaftNode.class);
    }
}
