/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.api.impl;

import com.google.common.base.Preconditions;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton implementation of {@link PipelineJobAPI}.
 *
 * <p>Provides job id marshalling, listing, starting, stopping and dropping of pipeline jobs,
 * plus persistence of per-sharding-item error messages. Job-type specific parts are delegated
 * to the abstract methods declared here.</p>
 */
public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractPipelineJobAPIImpl.class);

    /** Formatter used for the {@code create_time} and {@code stop_time} job properties. */
    protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();

    /**
     * Marshal a job id as the common prefix from {@link PipelineJobIdUtils} followed by the
     * part produced by {@link #marshalJobIdLeftPart(PipelineJobId)}.
     *
     * @param pipelineJobId pipeline job id to marshal
     * @return marshalled job id text
     */
    @Override
    public final String marshalJobId(PipelineJobId pipelineJobId) {
        String commonPrefix = PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId);
        return commonPrefix + marshalJobIdLeftPart(pipelineJobId);
    }

    /**
     * Marshal the part of the job id that follows the common prefix.
     *
     * @param var1 pipeline job id
     * @return text appended after the common prefix
     */
    protected abstract String marshalJobIdLeftPart(PipelineJobId var1);

    /**
     * List job infos of all jobs whose type matches this API's job type.
     *
     * <p>The mode configuration is validated first via {@link #checkModeConfig()}.</p>
     *
     * @return job infos, one per matching job brief info
     */
    public List<? extends PipelineJobInfo> list() {
        checkModeConfig();
        Stream<String> jobNames = getJobBriefInfos().map(JobBriefInfo::getJobName);
        return jobNames.map(this::getJobInfo).collect(Collectors.toList());
    }

    /**
     * Verify that a mode configuration exists and that its type is {@code Cluster}
     * (compared case-insensitively).
     *
     * @throws NullPointerException if no mode configuration is available
     * @throws IllegalArgumentException if the mode type is not {@code Cluster}
     */
    protected void checkModeConfig() {
        ModeConfiguration modeConfig = PipelineContext.getModeConfig();
        Preconditions.checkNotNull(modeConfig, "Mode configuration is required.");
        boolean clusterMode = "Cluster".equalsIgnoreCase(modeConfig.getType());
        Preconditions.checkArgument(clusterMode, "Mode must be `Cluster`.");
    }

    /**
     * Stream the brief infos of all jobs, skipping those whose name starts with {@code _}
     * and those whose parsed job type differs from this API's job type.
     *
     * @return filtered stream of job brief infos
     */
    private Stream<JobBriefInfo> getJobBriefInfos() {
        return PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> {
            String jobName = each.getJobName();
            return !jobName.startsWith("_") && PipelineJobIdUtils.parseJobType(jobName) == getJobType();
        });
    }

    /**
     * Build the job info for a job.
     *
     * @param var1 job name, as taken from the job brief info
     * @return job info
     */
    protected abstract PipelineJobInfo getJobInfo(String var1);

    /**
     * Start a job by persisting its job class name and configuration text.
     *
     * <p>If the job configuration path already exists, nothing is persisted and a warning is logged.</p>
     *
     * @param jobConfig job configuration
     * @return the job id, in both the newly persisted and the already existing case
     * @throws PipelineJobCreationWithInvalidShardingCountException if the job sharding count is zero
     */
    @Override
    public Optional<String> start(PipelineJobConfiguration jobConfig) {
        String jobId = jobConfig.getJobId();
        boolean hasShardingItems = 0 != jobConfig.getJobShardingCount();
        ShardingSpherePreconditions.checkState(hasShardingItems, () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
        log.info("Start job by {}", jobConfig);
        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
        String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
        if (repositoryAPI.isExisted(jobConfigKey)) {
            log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
        } else {
            repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getJobClassName());
            repositoryAPI.persist(jobConfigKey, convertJobConfigurationToText(jobConfig));
        }
        return Optional.of(jobId);
    }

    /**
     * Get the value persisted at the job root path when a job is started.
     *
     * @return job class name
     */
    protected abstract String getJobClassName();

    /**
     * Convert a pipeline job configuration into the YAML text of a {@link JobConfigurationPOJO}.
     *
     * <p>The POJO carries the job id as job name, the sharding count, the YAML of the swapped
     * job configuration as job parameter, and a {@code create_time} property set to the current time.</p>
     *
     * @param jobConfig pipeline job configuration
     * @return YAML text
     */
    private String convertJobConfigurationToText(PipelineJobConfiguration jobConfig) {
        JobConfigurationPOJO result = new JobConfigurationPOJO();
        result.setJobName(jobConfig.getJobId());
        result.setShardingTotalCount(jobConfig.getJobShardingCount());
        String jobParameter = YamlEngine.marshal(swapToYamlJobConfiguration(jobConfig));
        result.setJobParameter(jobParameter);
        String createTime = LocalDateTime.now().format(DATE_TIME_FORMATTER);
        result.getProps().setProperty("create_time", createTime);
        return YamlEngine.marshal(result);
    }

    /**
     * Swap a pipeline job configuration to its YAML counterpart.
     *
     * @param var1 pipeline job configuration
     * @return YAML pipeline job configuration
     */
    protected abstract YamlPipelineJobConfiguration swapToYamlJobConfiguration(PipelineJobConfiguration var1);

    /**
     * Get the pipeline job configuration for a job configuration POJO.
     *
     * @param var1 job configuration POJO
     * @return pipeline job configuration
     */
    protected abstract PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO var1);

    /**
     * Start a job that is currently disabled.
     *
     * <p>Removes the disable barrier node, clears the disabled flag and the {@code stop_time} /
     * {@code stop_time_millis} properties, updates the job configuration, then registers the
     * enable barrier and awaits it for up to 5 seconds.</p>
     *
     * @param jobId job id
     * @throws PipelineJobNotFoundException if no job configuration exists for the job id
     * @throws PipelineJobHasAlreadyStartedException if the job is not disabled
     */
    public void startDisabledJob(String jobId) {
        log.info("Start disabled pipeline job {}", jobId);
        pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
        ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
        jobConfigPOJO.setDisabled(false);
        jobConfigPOJO.getProps().remove("stop_time");
        jobConfigPOJO.getProps().remove("stop_time_millis");
        PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
        registerAndAwaitBarrier(PipelineMetaDataNode.getJobBarrierEnablePath(jobId), jobConfigPOJO.getShardingTotalCount());
    }

    /**
     * Stop a job.
     *
     * <p>Removes the enable barrier node, marks the job disabled, records {@code stop_time} and
     * {@code stop_time_millis} properties, updates the job configuration, then registers the
     * disable barrier and awaits it for up to 5 seconds.</p>
     *
     * @param jobId job id
     * @throws PipelineJobNotFoundException if no job configuration exists for the job id
     */
    public void stop(String jobId) {
        log.info("Stop pipeline job {}", jobId);
        pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
        jobConfigPOJO.setDisabled(true);
        String stopTime = LocalDateTime.now().format(DATE_TIME_FORMATTER);
        jobConfigPOJO.getProps().setProperty("stop_time", stopTime);
        String stopTimeMillis = String.valueOf(System.currentTimeMillis());
        jobConfigPOJO.getProps().setProperty("stop_time_millis", stopTimeMillis);
        PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
        registerAndAwaitBarrier(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), jobConfigPOJO.getShardingTotalCount());
    }

    /**
     * Register a barrier path with the given count and await it for up to 5 seconds.
     * The result of the await call is not inspected.
     *
     * @param barrierPath barrier path to register and await
     * @param shardingTotalCount count passed to the barrier registration
     */
    private void registerAndAwaitBarrier(String barrierPath, int shardingTotalCount) {
        pipelineDistributedBarrier.register(barrierPath, shardingTotalCount);
        pipelineDistributedBarrier.await(barrierPath, 5L, TimeUnit.SECONDS);
    }

    /**
     * Drop a job: remove it through the job operate API, then delete it from the governance repository.
     *
     * @param jobId job id
     */
    protected void dropJob(String jobId) {
        log.info("Drop job {}", jobId);
        // String.valueOf is kept on purpose: it maps a null id to the text "null".
        String jobName = String.valueOf(jobId);
        PipelineAPIFactory.getJobOperateAPI().remove(jobName, null);
        PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
    }

    /**
     * Load the job configuration POJO of a job.
     *
     * @param jobId job id
     * @return job configuration POJO, never {@code null}
     * @throws PipelineJobNotFoundException if the job configuration API returns {@code null}
     */
    protected final JobConfigurationPOJO getElasticJobConfigPOJO(String jobId) {
        JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
        ShardingSpherePreconditions.checkNotNull(jobConfigPOJO, () -> new PipelineJobNotFoundException(jobId));
        return jobConfigPOJO;
    }

    /**
     * Get the type name of this API's job type.
     *
     * @return job type name
     */
    public String getType() {
        return getJobType().getTypeName();
    }

    /**
     * Get the error message stored for a job sharding item.
     *
     * @param jobId job id
     * @param shardingItem sharding item
     * @return stored error message, or an empty string when the repository returns {@code null}
     */
    @Override
    public String getJobItemErrorMessage(String jobId, int shardingItem) {
        String errorMessage = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemErrorMessage(jobId, shardingItem);
        return null == errorMessage ? "" : errorMessage;
    }

    /**
     * Persist the error of a job sharding item.
     *
     * <p>A {@code null} error is stored as an empty string, a {@link Throwable} as its stack
     * trace, and any other object as its {@code toString()} text.</p>
     *
     * @param jobId job id
     * @param shardingItem sharding item
     * @param error error to persist, may be {@code null}
     */
    @Override
    public void persistJobItemErrorMessage(String jobId, int shardingItem, Object error) {
        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
        String value;
        if (null == error) {
            value = "";
        } else if (error instanceof Throwable) {
            value = ExceptionUtils.getStackTrace((Throwable) error);
        } else {
            value = error.toString();
        }
        PipelineAPIFactory.getGovernanceRepositoryAPI().persist(key, value);
    }

    /**
     * Clean the error message stored for a job sharding item.
     *
     * @param jobId job id
     * @param shardingItem sharding item
     */
    @Override
    public void cleanJobItemErrorMessage(String jobId, int shardingItem) {
        PipelineAPIFactory.getGovernanceRepositoryAPI().cleanJobItemErrorMessage(jobId, shardingItem);
    }
}

