/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;

import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobScheduler;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static registry of running {@link RuleAlteredJobScheduler}s, keyed by job id and then by sharding item.
 *
 * <p>When the class is initialised, a single-threaded scheduled executor is set up that
 * persists the progress of every registered scheduler once per minute.</p>
 */
public final class RuleAlteredJobSchedulerCenter {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RuleAlteredJobSchedulerCenter.class);

    /** Job id -> (sharding item -> scheduler). Both levels are concurrent maps. */
    private static final Map<String, Map<Integer, RuleAlteredJobScheduler>> JOB_SCHEDULER_MAP = Maps.newConcurrentMap();

    /** Executor that runs the periodic progress persistence task. */
    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-persist-%d"));

    /** Repository API used to persist job progress. */
    private static final GovernanceRepositoryAPI REGISTRY_REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();

    /**
     * Start a scheduler for the given job context and register it under its job id and sharding item.
     *
     * <p>If a scheduler is already registered for the same job id and sharding item, a warning is
     * logged and the call returns without starting anything.</p>
     *
     * @param jobContext job context supplying the job id and sharding item
     */
    public static void start(RuleAlteredJobContext jobContext) {
        String jobId = jobContext.getJobId();
        Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> Maps.newConcurrentMap());
        int shardingItem = jobContext.getShardingItem();
        // NOTE(review): this check and the put below are separate steps, not one atomic operation.
        if (schedulerMap.containsKey(shardingItem)) {
            log.warn("schedulerMap contains shardingItem {}, ignore", shardingItem);
            return;
        }
        log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}", jobId, shardingItem);
        RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext);
        // The scheduler is started before it becomes visible in the registry.
        jobScheduler.start();
        schedulerMap.put(shardingItem, jobScheduler);
    }

    /**
     * Unregister the job and stop every scheduler that was registered for it.
     *
     * <p>Does nothing when no scheduler is registered for the job id.</p>
     *
     * @param jobId job id
     */
    public static void stop(String jobId) {
        Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.remove(jobId);
        if (null == schedulerMap) {
            return;
        }
        for (RuleAlteredJobScheduler each : schedulerMap.values()) {
            each.stop();
        }
    }

    /**
     * Get the job contexts of all schedulers registered for the job id.
     *
     * @param jobId job id
     * @return the job contexts, or an empty {@link Optional} when the job id is not registered
     */
    public static Optional<Collection<RuleAlteredJobContext>> getJobContexts(String jobId) {
        Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.get(jobId);
        if (null == schedulerMap) {
            return Optional.empty();
        }
        return Optional.of(schedulerMap.values().stream().map(RuleAlteredJobScheduler::getJobContext).collect(Collectors.toList()));
    }

    /**
     * Persist the progress of the given job context through the governance repository API.
     *
     * @param jobContext job context to persist
     */
    public static void persistJobProgress(RuleAlteredJobContext jobContext) {
        REGISTRY_REPOSITORY_API.persistJobProgress(jobContext);
    }

    @Generated
    private RuleAlteredJobSchedulerCenter() {
    }

    static {
        // First run after one minute, then one minute after each run completes.
        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 1L, 1L, TimeUnit.MINUTES);
    }

    /**
     * Periodic task that persists the progress of every registered scheduler.
     */
    private static final class PersistJobContextRunnable
    implements Runnable {
        private PersistJobContextRunnable() {
        }

        @Override
        public void run() {
            for (Map.Entry<String, Map<Integer, RuleAlteredJobScheduler>> entry : JOB_SCHEDULER_MAP.entrySet()) {
                try {
                    entry.getValue().forEach((shardingItem, jobScheduler) -> REGISTRY_REPOSITORY_API.persistJobProgress(jobScheduler.getJobContext()));
                }
                catch (Exception ex) {
                    // Caught per job so a failure neither skips the remaining jobs nor escapes run();
                    // an exception thrown from a fixed-delay task suppresses its later executions.
                    log.error("persist job {} context failed.", entry.getKey(), ex);
                }
            }
        }
    }
}

