package com.baidu.hugegraph.backend.store.raft;

import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests;
import com.baidu.hugegraph.backend.store.raft.rpc.RpcForwarder;
import com.baidu.hugegraph.backend.store.raft.rpc.SetLeaderProcessor;
import com.baidu.hugegraph.backend.store.raft.rpc.StoreCommandProcessor;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.event.EventHub;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.GraphMode;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Events;
import com.baidu.hugegraph.util.Log;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/backend/store/raft/RaftSharedContext.class */
public final class RaftSharedContext {
    private static final Logger LOG;
    public static final int NO_TIMEOUT = -1;
    public static final int POLL_INTERVAL = 3000;
    public static final int WAIT_RAFT_LOG_TIMEOUT = 1800000;
    public static final int WAIT_LEADER_TIMEOUT = 300000;
    public static final int BUSY_SLEEP_FACTOR = 3000;
    public static final int WAIT_RPC_TIMEOUT = 1800000;
    public static final int BLOCK_SIZE = 4096;
    public static final String DEFAULT_GROUP = "default";
    private final HugeGraphParams params;
    private final String schemaStoreName;
    private final String graphStoreName;
    private final String systemStoreName;
    private final RaftBackendStore[] stores;
    private final RpcServer rpcServer;
    private final ExecutorService readIndexExecutor;
    private final ExecutorService snapshotExecutor;
    private final ExecutorService backendExecutor;
    private RaftNode raftNode;
    private RaftGroupManager raftGroupManager;
    private RpcForwarder rpcForwarder;
    static final /* synthetic */ boolean $assertionsDisabled;

    public RaftSharedContext(HugeGraphParams hugeGraphParams) {
        this.params = hugeGraphParams;
        HugeConfig configuration = hugeGraphParams.configuration();
        this.schemaStoreName = (String) configuration.get(CoreOptions.STORE_SCHEMA);
        this.graphStoreName = (String) configuration.get(CoreOptions.STORE_GRAPH);
        this.systemStoreName = (String) configuration.get(CoreOptions.STORE_SYSTEM);
        this.stores = new RaftBackendStore[RaftRequests.StoreType.ALL.getNumber()];
        this.rpcServer = initAndStartRpcServer();
        if (((Boolean) configuration.get(CoreOptions.RAFT_SAFE_READ)).booleanValue()) {
            this.readIndexExecutor = createReadIndexExecutor(((Integer) configuration.get(CoreOptions.RAFT_READ_INDEX_THREADS)).intValue());
        } else {
            this.readIndexExecutor = null;
        }
        if (((Boolean) configuration.get(CoreOptions.RAFT_USE_SNAPSHOT)).booleanValue()) {
            this.snapshotExecutor = createSnapshotExecutor(4);
        } else {
            this.snapshotExecutor = null;
        }
        this.backendExecutor = createBackendExecutor(((Integer) configuration.get(CoreOptions.RAFT_BACKEND_THREADS)).intValue());
        this.raftNode = null;
        this.raftGroupManager = null;
        this.rpcForwarder = null;
        registerRpcRequestProcessors();
    }

    private void registerRpcRequestProcessors() {
        this.rpcServer.registerProcessor(new StoreCommandProcessor(this));
        this.rpcServer.registerProcessor(new SetLeaderProcessor(this));
        this.rpcServer.registerProcessor(new ListPeersProcessor(this));
    }

    public void initRaftNode() {
        this.raftNode = new RaftNode(this);
        this.rpcForwarder = new RpcForwarder(this.raftNode);
        this.raftGroupManager = new RaftGroupManagerImpl(this);
    }

    public void waitRaftNodeStarted() {
        RaftNode node = node();
        node.waitLeaderElected(WAIT_LEADER_TIMEOUT);
        if (node.selfIsLeader()) {
            node.waitStarted(-1);
        }
    }

    public void close() {
        LOG.info("Stopping raft nodes");
        this.rpcServer.shutdown();
    }

    public RaftNode node() {
        return this.raftNode;
    }

    public RpcForwarder rpcForwarder() {
        return this.rpcForwarder;
    }

    public RaftGroupManager raftNodeManager(String str) {
        E.checkArgument(DEFAULT_GROUP.equals(str), "The group must be '%s' now, actual is '%s'", new Object[]{DEFAULT_GROUP, str});
        return this.raftGroupManager;
    }

    public RpcServer rpcServer() {
        return this.rpcServer;
    }

    public String group() {
        return DEFAULT_GROUP;
    }

    public void addStore(RaftRequests.StoreType storeType, RaftBackendStore raftBackendStore) {
        this.stores[storeType.getNumber()] = raftBackendStore;
    }

    public RaftRequests.StoreType storeType(String str) {
        if (this.schemaStoreName.equals(str)) {
            return RaftRequests.StoreType.SCHEMA;
        }
        if (this.graphStoreName.equals(str)) {
            return RaftRequests.StoreType.GRAPH;
        }
        if ($assertionsDisabled || this.systemStoreName.equals(str)) {
            return RaftRequests.StoreType.SYSTEM;
        }
        throw new AssertionError();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RaftBackendStore[] stores() {
        return this.stores;
    }

    public BackendStore originStore(RaftRequests.StoreType storeType) {
        RaftBackendStore raftBackendStore = this.stores[storeType.getNumber()];
        E.checkState(raftBackendStore != null, "The raft store of type %s shouldn't be null", new Object[]{storeType});
        return raftBackendStore.originStore();
    }

    public NodeOptions nodeOptions() throws IOException {
        HugeConfig config = config();
        new PeerId().parse((String) config.get(CoreOptions.RAFT_ENDPOINT));
        NodeOptions nodeOptions = new NodeOptions();
        nodeOptions.setEnableMetrics(false);
        nodeOptions.setRpcProcessorThreadPoolSize(((Integer) config.get(CoreOptions.RAFT_RPC_THREADS)).intValue());
        nodeOptions.setRpcConnectTimeoutMs(((Integer) config.get(CoreOptions.RAFT_RPC_CONNECT_TIMEOUT)).intValue());
        nodeOptions.setRpcDefaultTimeout(((Integer) config.get(CoreOptions.RAFT_RPC_TIMEOUT)).intValue());
        nodeOptions.setElectionTimeoutMs(((Integer) config.get(CoreOptions.RAFT_ELECTION_TIMEOUT)).intValue());
        nodeOptions.setDisableCli(false);
        nodeOptions.setSnapshotIntervalSecs(((Integer) config.get(CoreOptions.RAFT_SNAPSHOT_INTERVAL)).intValue());
        Configuration configuration = new Configuration();
        String str = (String) config.get(CoreOptions.RAFT_GROUP_PEERS);
        if (!configuration.parse(str)) {
            throw new HugeException("Failed to parse group peers %s", str);
        }
        nodeOptions.setInitialConf(configuration);
        String str2 = (String) config.get(CoreOptions.RAFT_PATH);
        String path = Paths.get(str2, "log").toString();
        FileUtils.forceMkdir(new File(path));
        nodeOptions.setLogUri(path);
        String path2 = Paths.get(str2, "meta").toString();
        FileUtils.forceMkdir(new File(path2));
        nodeOptions.setRaftMetaUri(path2);
        if (((Boolean) config.get(CoreOptions.RAFT_USE_SNAPSHOT)).booleanValue()) {
            String path3 = Paths.get(str2, "snapshot").toString();
            FileUtils.forceMkdir(new File(path3));
            nodeOptions.setSnapshotUri(path3);
        }
        RaftOptions raftOptions = nodeOptions.getRaftOptions();
        raftOptions.setApplyBatch(((Integer) config.get(CoreOptions.RAFT_APPLY_BATCH)).intValue());
        raftOptions.setDisruptorBufferSize(((Integer) config.get(CoreOptions.RAFT_QUEUE_SIZE)).intValue());
        raftOptions.setDisruptorPublishEventWaitTimeoutSecs(((Integer) config.get(CoreOptions.RAFT_QUEUE_PUBLISH_TIMEOUT)).intValue());
        raftOptions.setReplicatorPipeline(((Boolean) config.get(CoreOptions.RAFT_REPLICATOR_PIPELINE)).booleanValue());
        raftOptions.setOpenStatistics(false);
        return nodeOptions;
    }

    public void clearCache() {
        notifyCache("clear", HugeType.VERTEX_LABEL, null);
        notifyCache("clear", HugeType.VERTEX, null);
    }

    public void notifyCache(String str, HugeType hugeType, Id id) {
        EventHub schemaEventHub;
        if (hugeType.isGraph()) {
            schemaEventHub = this.params.graphEventHub();
        } else if (!hugeType.isSchema()) {
            return;
        } else {
            schemaEventHub = this.params.schemaEventHub();
        }
        try {
            schemaEventHub.notify(Events.CACHE, new Object[]{str, hugeType, id});
        } catch (RejectedExecutionException e) {
            LOG.warn("Can't update cache due to EventHub is too busy");
        }
    }

    public PeerId endpoint() {
        PeerId peerId = new PeerId();
        String str = (String) config().get(CoreOptions.RAFT_ENDPOINT);
        if (peerId.parse(str)) {
            return peerId;
        }
        throw new HugeException("Failed to parse endpoint %s", str);
    }

    public boolean isSafeRead() {
        return ((Boolean) config().get(CoreOptions.RAFT_SAFE_READ)).booleanValue();
    }

    public boolean useSnapshot() {
        return ((Boolean) config().get(CoreOptions.RAFT_USE_SNAPSHOT)).booleanValue();
    }

    public ExecutorService snapshotExecutor() {
        return this.snapshotExecutor;
    }

    public ExecutorService backendExecutor() {
        return this.backendExecutor;
    }

    public GraphMode graphMode() {
        return this.params.mode();
    }

    private HugeConfig config() {
        return this.params.configuration();
    }

    private RpcServer initAndStartRpcServer() {
        Whitebox.setInternalState(BoltRaftRpcFactory.class, "CHANNEL_WRITE_BUF_LOW_WATER_MARK", config().get(CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK));
        Whitebox.setInternalState(BoltRaftRpcFactory.class, "CHANNEL_WRITE_BUF_HIGH_WATER_MARK", config().get(CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK));
        PeerId peerId = new PeerId();
        peerId.parse((String) config().get(CoreOptions.RAFT_ENDPOINT));
        RpcServer createAndStartRaftRpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer(peerId.getEndpoint());
        LOG.info("RPC server is started successfully");
        return createAndStartRaftRpcServer;
    }

    private ExecutorService createReadIndexExecutor(int i) {
        return newPool(i, i << 2, "store-read-index-callback", new ThreadPoolExecutor.AbortPolicy());
    }

    private ExecutorService createSnapshotExecutor(int i) {
        return newPool(i, i << 2, "store-snapshot-executor", new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private ExecutorService createBackendExecutor(int i) {
        return newPool(i, i, "store-backend-executor", new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private static ExecutorService newPool(int i, int i2, String str, RejectedExecutionHandler rejectedExecutionHandler) {
        return ThreadPoolUtil.newBuilder().poolName(str).enableMetric(false).coreThreads(Integer.valueOf(i)).maximumThreads(Integer.valueOf(i2)).keepAliveSeconds(300L).workQueue(new LinkedBlockingQueue()).threadFactory(new NamedThreadFactory(str, true)).rejectedHandler(rejectedExecutionHandler).build();
    }

    static {
        $assertionsDisabled = !RaftSharedContext.class.desiredAssertionStatus();
        LOG = Log.logger(RaftSharedContext.class);
    }
}
