/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.api.impl;

import com.google.common.base.Preconditions;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.TypedSPIConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
import org.apache.shardingsphere.scaling.core.job.environment.ScalingEnvironmentManager;
import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RuleAlteredJobAPIImpl
extends AbstractPipelineJobAPIImpl
implements RuleAlteredJobAPI {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RuleAlteredJobAPIImpl.class);
    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final Map<String, DataConsistencyCheckAlgorithm> DATA_CONSISTENCY_CHECK_ALGORITHM_MAP = new TreeMap<String, DataConsistencyCheckAlgorithm>(SingletonSPIRegistry.getTypedSingletonInstancesMap(DataConsistencyCheckAlgorithm.class));

    public boolean isDefault() {
        return false;
    }

    public List<JobInfo> list() {
        this.checkModeConfig();
        return this.getJobBriefInfos().map(each -> this.getJobInfo(each.getJobName())).collect(Collectors.toList());
    }

    private void checkModeConfig() {
        ModeConfiguration modeConfig = PipelineContext.getModeConfig();
        Preconditions.checkNotNull((Object)modeConfig, (Object)"Mode configuration is required.");
        Preconditions.checkArgument((boolean)"Cluster".equals(modeConfig.getType()), (Object)"Mode must be `Cluster`.");
    }

    private Stream<JobBriefInfo> getJobBriefInfos() {
        return PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"));
    }

    private JobInfo getJobInfo(String jobName) {
        JobInfo result = new JobInfo(jobName);
        JobConfigurationPOJO jobConfigPOJO = this.getElasticJobConfigPOJO(result.getJobId());
        JobConfiguration jobConfig = this.getJobConfig(jobConfigPOJO);
        result.setActive(!jobConfigPOJO.isDisabled());
        result.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount());
        result.setTables(jobConfig.getHandleConfig().getLogicTables());
        result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
        result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
        result.setJobParameter(jobConfigPOJO.getJobParameter());
        return result;
    }

    public List<Long> getUncompletedJobIds(String schemaName) {
        return this.getJobBriefInfos().filter(each -> {
            String jobId = each.getJobName();
            return this.isUncompletedJobOfSchema(schemaName, jobId);
        }).map(each -> Long.parseLong(each.getJobName())).collect(Collectors.toList());
    }

    private boolean isUncompletedJobOfSchema(String schemaName, String jobId) {
        WorkflowConfiguration workflowConfig;
        JobConfigurationPOJO jobConfigPOJO;
        try {
            jobConfigPOJO = this.getElasticJobConfigPOJO(jobId);
        }
        catch (PipelineJobNotFoundException ex) {
            log.warn("scaling job not found, jobId={}", (Object)jobId);
            return false;
        }
        JobConfiguration jobConfig = this.getJobConfig(jobConfigPOJO);
        HandleConfiguration handleConfig = jobConfig.getHandleConfig();
        if (null == handleConfig || null == (workflowConfig = jobConfig.getWorkflowConfig())) {
            log.warn("handleConfig or workflowConfig null, jobId={}", (Object)jobId);
            return false;
        }
        if (!schemaName.equals(workflowConfig.getSchemaName())) {
            return false;
        }
        return !jobConfigPOJO.isDisabled();
    }

    public Optional<String> start(JobConfiguration jobConfig) {
        jobConfig.buildHandleConfig();
        if (jobConfig.getHandleConfig().getJobShardingCount() == 0) {
            log.warn("Invalid scaling job config!");
            throw new PipelineJobCreationException("handleConfig shardingTotalCount is 0");
        }
        log.info("Start scaling job by {}", (Object)jobConfig.getHandleConfig());
        PipelineAPIFactory.getGovernanceRepositoryAPI().persist(String.format("%s/%s", "/scaling", jobConfig.getHandleConfig().getJobId()), RuleAlteredJob.class.getName());
        PipelineAPIFactory.getGovernanceRepositoryAPI().persist(String.format("%s/%s/config", "/scaling", jobConfig.getHandleConfig().getJobId()), this.createJobConfig(jobConfig));
        return Optional.of(jobConfig.getHandleConfig().getJobId());
    }

    private String createJobConfig(JobConfiguration jobConfig) {
        JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
        jobConfigPOJO.setJobName(jobConfig.getHandleConfig().getJobId());
        jobConfigPOJO.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount());
        jobConfigPOJO.setJobParameter(YamlEngine.marshal((Object)jobConfig));
        jobConfigPOJO.getProps().setProperty("create_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
        return YamlEngine.marshal((Object)jobConfigPOJO);
    }

    public Map<Integer, JobProgress> getProgress(String jobId) {
        this.checkModeConfig();
        log.info("getProgress for job {}", (Object)jobId);
        JobConfiguration jobConfig = this.getJobConfig(jobId);
        return this.getProgress(jobConfig);
    }

    public Map<Integer, JobProgress> getProgress(JobConfiguration jobConfig) {
        String jobId = jobConfig.getHandleConfig().getJobId();
        JobConfigurationPOJO jobConfigPOJO = this.getElasticJobConfigPOJO(jobId);
        return IntStream.range(0, jobConfig.getHandleConfig().getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
            JobProgress jobProgress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, (int)each);
            if (null != jobProgress) {
                jobProgress.setActive(!jobConfigPOJO.isDisabled());
            }
            map.put(each, jobProgress);
        }, HashMap::putAll);
    }

    public void stopClusterWriteDB(String jobId) {
        this.checkModeConfig();
    }

    public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
        this.checkModeConfig();
        return DATA_CONSISTENCY_CHECK_ALGORITHM_MAP.values().stream().map(each -> {
            DataConsistencyCheckAlgorithmInfo algorithmInfo = new DataConsistencyCheckAlgorithmInfo();
            algorithmInfo.setType(each.getType());
            algorithmInfo.setDescription(each.getDescription());
            algorithmInfo.setSupportedDatabaseTypes(each.getSupportedDatabaseTypes());
            algorithmInfo.setProvider(each.getProvider());
            return algorithmInfo;
        }).collect(Collectors.toList());
    }

    public boolean isDataConsistencyCheckNeeded(String jobId) {
        log.info("isDataConsistencyCheckNeeded for job {}", (Object)jobId);
        JobConfiguration jobConfig = this.getJobConfig(jobId);
        return this.isDataConsistencyCheckNeeded(jobConfig);
    }

    public boolean isDataConsistencyCheckNeeded(JobConfiguration jobConfig) {
        RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        return this.isDataConsistencyCheckNeeded(ruleAlteredContext);
    }

    private boolean isDataConsistencyCheckNeeded(RuleAlteredContext ruleAlteredContext) {
        return null != ruleAlteredContext.getDataConsistencyCheckAlgorithm();
    }

    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId) {
        this.checkModeConfig();
        log.info("Data consistency check for job {}", (Object)jobId);
        JobConfiguration jobConfig = this.getJobConfig(jobId);
        return this.dataConsistencyCheck(jobConfig);
    }

    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(JobConfiguration jobConfig) {
        RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        if (!this.isDataConsistencyCheckNeeded(ruleAlteredContext)) {
            log.info("dataConsistencyCheckAlgorithm is not configured, data consistency check is ignored.");
            return Collections.emptyMap();
        }
        return this.dataConsistencyCheck0(jobConfig, ruleAlteredContext.getDataConsistencyCheckAlgorithm());
    }

    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType) {
        this.checkModeConfig();
        log.info("Data consistency check for job {}, algorithmType: {}", (Object)jobId, (Object)algorithmType);
        JobConfiguration jobConfig = this.getJobConfig(jobId);
        ShardingSphereAlgorithmConfiguration typedSPIConfig = new ShardingSphereAlgorithmConfiguration(algorithmType, new Properties());
        DataConsistencyCheckAlgorithm checkAlgorithm = (DataConsistencyCheckAlgorithm)ShardingSphereAlgorithmFactory.createAlgorithm((TypedSPIConfiguration)typedSPIConfig, DataConsistencyCheckAlgorithm.class);
        return this.dataConsistencyCheck0(jobConfig, checkAlgorithm);
    }

    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck0(JobConfiguration jobConfig, DataConsistencyCheckAlgorithm checkAlgorithm) {
        String jobId = jobConfig.getHandleConfig().getJobId();
        DataConsistencyChecker dataConsistencyChecker = EnvironmentCheckerFactory.newInstance(jobConfig);
        Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.checkRecordsCount();
        if (result.values().stream().allMatch(DataConsistencyCheckResult::isRecordsCountMatched)) {
            Map<String, Boolean> contentCheckResult = dataConsistencyChecker.checkRecordsContent(checkAlgorithm);
            result.forEach((key, value) -> value.setRecordsContentMatched(contentCheckResult.getOrDefault(key, false).booleanValue()));
        }
        log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", new Object[]{jobId, checkAlgorithm.getClass().getName(), result});
        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, this.aggregateDataConsistencyCheckResults(jobId, result));
        return result;
    }

    public boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, DataConsistencyCheckResult> checkResultMap) {
        if (checkResultMap.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, DataConsistencyCheckResult> entry : checkResultMap.entrySet()) {
            boolean recordsCountMatched = entry.getValue().isRecordsCountMatched();
            boolean recordsContentMatched = entry.getValue().isRecordsContentMatched();
            if (recordsContentMatched && recordsCountMatched) continue;
            log.error("Scaling job: {}, table: {} data consistency check failed, recordsContentMatched: {}, recordsCountMatched: {}", new Object[]{jobId, entry.getKey(), recordsContentMatched, recordsCountMatched});
            return false;
        }
        return true;
    }

    public void switchClusterConfiguration(String jobId) {
        this.checkModeConfig();
        log.info("Switch cluster configuration for job {}", (Object)jobId);
        JobConfiguration jobConfig = this.getJobConfig(jobId);
        this.switchClusterConfiguration(jobConfig);
    }

    public void switchClusterConfiguration(JobConfiguration jobConfig) {
        Optional<Boolean> checkResultOptional;
        String jobId = jobConfig.getHandleConfig().getJobId();
        RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        if (!(!this.isDataConsistencyCheckNeeded(ruleAlteredContext) || (checkResultOptional = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobCheckResult(jobId)).isPresent() && checkResultOptional.get().booleanValue())) {
            throw new PipelineDataConsistencyCheckFailedException("Data consistency check not finished or failed.");
        }
        Optional<Collection<RuleAlteredJobContext>> optionalJobContexts = RuleAlteredJobSchedulerCenter.getJobContexts(jobId);
        optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> each.setStatus(JobStatus.ALMOST_FINISHED)));
        YamlRootConfiguration yamlRootConfig = (YamlRootConfiguration)YamlEngine.unmarshal((String)jobConfig.getPipelineConfig().getTarget().getParameter(), YamlRootConfiguration.class, (boolean)true);
        WorkflowConfiguration workflowConfig = jobConfig.getWorkflowConfig();
        String schemaName = workflowConfig.getSchemaName();
        String ruleCacheId = workflowConfig.getRuleCacheId();
        ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(schemaName, yamlRootConfig, ruleCacheId);
        ShardingSphereEventBus.getInstance().post((Object)taskFinishedEvent);
        optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> {
            each.setStatus(JobStatus.FINISHED);
            RuleAlteredJobSchedulerCenter.persistJobProgress(each);
        }));
        this.stop(jobId);
    }

    public void reset(String jobId) {
        this.checkModeConfig();
        log.info("Scaling job {} reset target table", (Object)jobId);
        PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJobProgress(jobId);
        try {
            new ScalingEnvironmentManager().cleanupTargetTables(this.getJobConfig(jobId));
        }
        catch (SQLException ex) {
            throw new PipelineJobExecutionException("Reset target table failed for job " + jobId);
        }
    }

    public JobConfiguration getJobConfig(String jobId) {
        return this.getJobConfig(this.getElasticJobConfigPOJO(jobId));
    }

    private JobConfiguration getJobConfig(JobConfigurationPOJO elasticJobConfigPOJO) {
        return (JobConfiguration)YamlEngine.unmarshal((String)elasticJobConfigPOJO.getJobParameter(), JobConfiguration.class, (boolean)true);
    }

    private JobConfigurationPOJO getElasticJobConfigPOJO(String jobId) {
        JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
        if (null == result) {
            throw new PipelineJobNotFoundException(String.format("Can not find scaling job %s", jobId), jobId);
        }
        return result;
    }
}

