package org.apache.dolphinscheduler.server.master.runner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.class */
public class MasterSchedulerThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerThread.class);
    private final ExecutorService masterExecService;
    private final ProcessDao processDao;
    private final ZKMasterClient zkMasterClient;
    private int masterExecThreadNum;
    private final Configuration conf;

    public MasterSchedulerThread(ZKMasterClient zKMasterClient, ProcessDao processDao, Configuration configuration, int i) {
        this.processDao = processDao;
        this.zkMasterClient = zKMasterClient;
        this.conf = configuration;
        this.masterExecThreadNum = i;
        this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", i);
    }

    @Override // java.lang.Runnable
    public void run() {
        while (Stopper.isRunning()) {
            InterProcessMutex interProcessMutex = null;
            try {
                if (OSUtils.checkResource(this.conf, true).booleanValue() && this.zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
                    interProcessMutex = new InterProcessMutex(this.zkMasterClient.getZkClient(), this.zkMasterClient.getMasterLockPath());
                    interProcessMutex.acquire();
                    int activeCount = ((ThreadPoolExecutor) this.masterExecService).getActiveCount();
                    Command findOneCommand = this.processDao.findOneCommand();
                    if (findOneCommand != null) {
                        logger.info(String.format("find one command: id: %d, type: %s", Integer.valueOf(findOneCommand.getId()), findOneCommand.getCommandType().toString()));
                        int parentCommandId = findOneCommand.getParentCommandId();
                        String subId = findOneCommand.getSubId();
                        try {
                            ProcessInstance handleCommand = this.processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, findOneCommand);
                            if (handleCommand != null) {
                                logger.info("start master exec thread , split DAG ...");
                                if (parentCommandId != -1) {
                                    handleCommand.setCommandId(parentCommandId);
                                } else {
                                    handleCommand.setCommandId(findOneCommand.getId());
                                }
                                if (StringUtils.isNotBlank(subId)) {
                                    handleCommand.setSubId(subId);
                                }
                                this.masterExecService.execute(new MasterExecThread(handleCommand, this.processDao));
                            }
                        } catch (Exception e) {
                            logger.error("scan command error ", e);
                            this.processDao.moveToErrorCommand(findOneCommand, e.toString());
                        }
                    }
                }
                Thread.sleep(1000L);
                interProcessMutex = interProcessMutex;
            } catch (Exception e2) {
                logger.error("master scheduler thread exception : " + e2.getMessage(), e2);
            } finally {
                AbstractZKClient.releaseMutex((InterProcessMutex) null);
            }
        }
    }
}
