/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.hugegraph.backend.store.raft;

import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.raft.RaftBackendStore;
import com.baidu.hugegraph.backend.store.raft.RaftGroupManager;
import com.baidu.hugegraph.backend.store.raft.RaftGroupManagerImpl;
import com.baidu.hugegraph.backend.store.raft.RaftNode;
import com.baidu.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests;
import com.baidu.hugegraph.backend.store.raft.rpc.RpcForwarder;
import com.baidu.hugegraph.backend.store.raft.rpc.SetLeaderProcessor;
import com.baidu.hugegraph.backend.store.raft.rpc.StoreCommandProcessor;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.event.EventHub;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.GraphMode;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;

public final class RaftSharedContext {
    private static final Logger LOG = Log.logger(RaftSharedContext.class);
    public static final int NO_TIMEOUT = -1;
    public static final int POLL_INTERVAL = 3000;
    public static final int WAIT_RAFT_LOG_TIMEOUT = 1800000;
    public static final int WAIT_LEADER_TIMEOUT = 300000;
    public static final int BUSY_SLEEP_FACTOR = 3000;
    public static final int WAIT_RPC_TIMEOUT = 1800000;
    public static final int BLOCK_SIZE = 4096;
    public static final String DEFAULT_GROUP = "default";
    private final HugeGraphParams params;
    private final String schemaStoreName;
    private final String graphStoreName;
    private final String systemStoreName;
    private final RaftBackendStore[] stores;
    private final RpcServer rpcServer;
    private final ExecutorService readIndexExecutor;
    private final ExecutorService snapshotExecutor;
    private final ExecutorService backendExecutor;
    private RaftNode raftNode;
    private RaftGroupManager raftGroupManager;
    private RpcForwarder rpcForwarder;

    public RaftSharedContext(HugeGraphParams params) {
        this.params = params;
        HugeConfig config = params.configuration();
        this.schemaStoreName = (String)config.get(CoreOptions.STORE_SCHEMA);
        this.graphStoreName = (String)config.get(CoreOptions.STORE_GRAPH);
        this.systemStoreName = (String)config.get(CoreOptions.STORE_SYSTEM);
        this.stores = new RaftBackendStore[RaftRequests.StoreType.ALL.getNumber()];
        this.rpcServer = this.initAndStartRpcServer();
        if (((Boolean)config.get(CoreOptions.RAFT_SAFE_READ)).booleanValue()) {
            int readIndexThreads = (Integer)config.get(CoreOptions.RAFT_READ_INDEX_THREADS);
            this.readIndexExecutor = this.createReadIndexExecutor(readIndexThreads);
        } else {
            this.readIndexExecutor = null;
        }
        this.snapshotExecutor = (Boolean)config.get(CoreOptions.RAFT_USE_SNAPSHOT) != false ? this.createSnapshotExecutor(4) : null;
        int backendThreads = (Integer)config.get(CoreOptions.RAFT_BACKEND_THREADS);
        this.backendExecutor = this.createBackendExecutor(backendThreads);
        this.raftNode = null;
        this.raftGroupManager = null;
        this.rpcForwarder = null;
        this.registerRpcRequestProcessors();
    }

    private void registerRpcRequestProcessors() {
        this.rpcServer.registerProcessor((RpcProcessor)new StoreCommandProcessor(this));
        this.rpcServer.registerProcessor((RpcProcessor)new SetLeaderProcessor(this));
        this.rpcServer.registerProcessor((RpcProcessor)new ListPeersProcessor(this));
    }

    public void initRaftNode() {
        this.raftNode = new RaftNode(this);
        this.rpcForwarder = new RpcForwarder(this.raftNode);
        this.raftGroupManager = new RaftGroupManagerImpl(this);
    }

    public void waitRaftNodeStarted() {
        RaftNode node = this.node();
        node.waitLeaderElected(300000);
        if (node.selfIsLeader()) {
            node.waitStarted(-1);
        }
    }

    public void close() {
        LOG.info("Stopping raft nodes");
        this.rpcServer.shutdown();
    }

    public RaftNode node() {
        return this.raftNode;
    }

    public RpcForwarder rpcForwarder() {
        return this.rpcForwarder;
    }

    public RaftGroupManager raftNodeManager(String group) {
        E.checkArgument((boolean)DEFAULT_GROUP.equals(group), (String)"The group must be '%s' now, actual is '%s'", (Object[])new Object[]{DEFAULT_GROUP, group});
        return this.raftGroupManager;
    }

    public RpcServer rpcServer() {
        return this.rpcServer;
    }

    public String group() {
        return DEFAULT_GROUP;
    }

    public void addStore(RaftRequests.StoreType type, RaftBackendStore store) {
        this.stores[type.getNumber()] = store;
    }

    public RaftRequests.StoreType storeType(String store) {
        if (this.schemaStoreName.equals(store)) {
            return RaftRequests.StoreType.SCHEMA;
        }
        if (this.graphStoreName.equals(store)) {
            return RaftRequests.StoreType.GRAPH;
        }
        assert (this.systemStoreName.equals(store));
        return RaftRequests.StoreType.SYSTEM;
    }

    protected RaftBackendStore[] stores() {
        return this.stores;
    }

    public BackendStore originStore(RaftRequests.StoreType storeType) {
        RaftBackendStore raftStore = this.stores[storeType.getNumber()];
        E.checkState((raftStore != null ? 1 : 0) != 0, (String)"The raft store of type %s shouldn't be null", (Object[])new Object[]{storeType});
        return raftStore.originStore();
    }

    public NodeOptions nodeOptions() throws IOException {
        HugeConfig config = this.config();
        PeerId selfId = new PeerId();
        selfId.parse((String)config.get(CoreOptions.RAFT_ENDPOINT));
        NodeOptions nodeOptions = new NodeOptions();
        nodeOptions.setEnableMetrics(false);
        nodeOptions.setRpcProcessorThreadPoolSize(((Integer)config.get(CoreOptions.RAFT_RPC_THREADS)).intValue());
        nodeOptions.setRpcConnectTimeoutMs(((Integer)config.get(CoreOptions.RAFT_RPC_CONNECT_TIMEOUT)).intValue());
        nodeOptions.setRpcDefaultTimeout(((Integer)config.get(CoreOptions.RAFT_RPC_TIMEOUT)).intValue());
        int electionTimeout = (Integer)config.get(CoreOptions.RAFT_ELECTION_TIMEOUT);
        nodeOptions.setElectionTimeoutMs(electionTimeout);
        nodeOptions.setDisableCli(false);
        int snapshotInterval = (Integer)config.get(CoreOptions.RAFT_SNAPSHOT_INTERVAL);
        nodeOptions.setSnapshotIntervalSecs(snapshotInterval);
        Configuration groupPeers = new Configuration();
        String groupPeersStr = (String)config.get(CoreOptions.RAFT_GROUP_PEERS);
        if (!groupPeers.parse(groupPeersStr)) {
            throw new HugeException("Failed to parse group peers %s", groupPeersStr);
        }
        nodeOptions.setInitialConf(groupPeers);
        String raftPath = (String)config.get(CoreOptions.RAFT_PATH);
        String logUri = Paths.get(raftPath, "log").toString();
        FileUtils.forceMkdir((File)new File(logUri));
        nodeOptions.setLogUri(logUri);
        String metaUri = Paths.get(raftPath, "meta").toString();
        FileUtils.forceMkdir((File)new File(metaUri));
        nodeOptions.setRaftMetaUri(metaUri);
        if (((Boolean)config.get(CoreOptions.RAFT_USE_SNAPSHOT)).booleanValue()) {
            String snapshotUri = Paths.get(raftPath, "snapshot").toString();
            FileUtils.forceMkdir((File)new File(snapshotUri));
            nodeOptions.setSnapshotUri(snapshotUri);
        }
        RaftOptions raftOptions = nodeOptions.getRaftOptions();
        raftOptions.setApplyBatch(((Integer)config.get(CoreOptions.RAFT_APPLY_BATCH)).intValue());
        raftOptions.setDisruptorBufferSize(((Integer)config.get(CoreOptions.RAFT_QUEUE_SIZE)).intValue());
        raftOptions.setDisruptorPublishEventWaitTimeoutSecs(((Integer)config.get(CoreOptions.RAFT_QUEUE_PUBLISH_TIMEOUT)).intValue());
        raftOptions.setReplicatorPipeline(((Boolean)config.get(CoreOptions.RAFT_REPLICATOR_PIPELINE)).booleanValue());
        raftOptions.setOpenStatistics(false);
        return nodeOptions;
    }

    public void clearCache() {
        this.notifyCache("clear", HugeType.VERTEX_LABEL, null);
        this.notifyCache("clear", HugeType.VERTEX, null);
    }

    public void notifyCache(String action, HugeType type, Id id) {
        EventHub eventHub;
        if (type.isGraph()) {
            eventHub = this.params.graphEventHub();
        } else if (type.isSchema()) {
            eventHub = this.params.schemaEventHub();
        } else {
            return;
        }
        try {
            eventHub.notify("cache", new Object[]{action, type, id});
        }
        catch (RejectedExecutionException e) {
            LOG.warn("Can't update cache due to EventHub is too busy");
        }
    }

    public PeerId endpoint() {
        PeerId endpoint = new PeerId();
        String endpointStr = (String)this.config().get(CoreOptions.RAFT_ENDPOINT);
        if (!endpoint.parse(endpointStr)) {
            throw new HugeException("Failed to parse endpoint %s", endpointStr);
        }
        return endpoint;
    }

    public boolean isSafeRead() {
        return (Boolean)this.config().get(CoreOptions.RAFT_SAFE_READ);
    }

    public boolean useSnapshot() {
        return (Boolean)this.config().get(CoreOptions.RAFT_USE_SNAPSHOT);
    }

    public ExecutorService snapshotExecutor() {
        return this.snapshotExecutor;
    }

    public ExecutorService backendExecutor() {
        return this.backendExecutor;
    }

    public GraphMode graphMode() {
        return this.params.mode();
    }

    private HugeConfig config() {
        return this.params.configuration();
    }

    private RpcServer initAndStartRpcServer() {
        Whitebox.setInternalState(BoltRaftRpcFactory.class, (String)"CHANNEL_WRITE_BUF_LOW_WATER_MARK", (Object)this.config().get(CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK));
        Whitebox.setInternalState(BoltRaftRpcFactory.class, (String)"CHANNEL_WRITE_BUF_HIGH_WATER_MARK", (Object)this.config().get(CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK));
        PeerId serverId = new PeerId();
        serverId.parse((String)this.config().get(CoreOptions.RAFT_ENDPOINT));
        RpcServer rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer((Endpoint)serverId.getEndpoint());
        LOG.info("RPC server is started successfully");
        return rpcServer;
    }

    private ExecutorService createReadIndexExecutor(int coreThreads) {
        int maxThreads = coreThreads << 2;
        String name = "store-read-index-callback";
        ThreadPoolExecutor.AbortPolicy handler = new ThreadPoolExecutor.AbortPolicy();
        return RaftSharedContext.newPool(coreThreads, maxThreads, name, handler);
    }

    private ExecutorService createSnapshotExecutor(int coreThreads) {
        int maxThreads = coreThreads << 2;
        String name = "store-snapshot-executor";
        ThreadPoolExecutor.CallerRunsPolicy handler = new ThreadPoolExecutor.CallerRunsPolicy();
        return RaftSharedContext.newPool(coreThreads, maxThreads, name, handler);
    }

    private ExecutorService createBackendExecutor(int threads) {
        String name = "store-backend-executor";
        ThreadPoolExecutor.CallerRunsPolicy handler = new ThreadPoolExecutor.CallerRunsPolicy();
        return RaftSharedContext.newPool(threads, threads, name, handler);
    }

    private static ExecutorService newPool(int coreThreads, int maxThreads, String name, RejectedExecutionHandler handler) {
        LinkedBlockingQueue workQueue = new LinkedBlockingQueue();
        return ThreadPoolUtil.newBuilder().poolName(name).enableMetric(Boolean.valueOf(false)).coreThreads(Integer.valueOf(coreThreads)).maximumThreads(Integer.valueOf(maxThreads)).keepAliveSeconds(Long.valueOf(300L)).workQueue(workQueue).threadFactory((ThreadFactory)new NamedThreadFactory(name, true)).rejectedHandler(handler).build();
    }
}

