/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.task;

import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental task.
 *
 * <p>Connects one {@link Dumper} to {@code concurrency} {@link Importer}s through a single
 * {@link PipelineChannel}, submits them to the supplied {@link ExecuteEngine}, and records the
 * position of the last handled record in an {@link IncrementalTaskProgress}.</p>
 */
public final class IncrementalTask implements PipelineTask, AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(IncrementalTask.class);
    private final String taskId;
    private final ExecuteEngine incrementalExecuteEngine;
    private final PipelineChannel channel;
    private final Dumper dumper;
    private final Collection<Importer> importers;
    private final IncrementalTaskProgress taskProgress;

    /**
     * Builds the progress holder, channel, dumper and importers for this task.
     *
     * @param concurrency number of importers to create; also passed to the channel creator
     * @param dumperConfig dumper configuration; its data source name becomes the task id and its position is the start position
     * @param importerConfig importer configuration shared by all importers
     * @param pipelineChannelCreator factory for the channel between dumper and importers
     * @param dataSourceManager data source manager handed to each importer
     * @param sourceMetaDataLoader table meta data loader handed to the dumper
     * @param incrementalExecuteEngine engine on which the dumper and importers are submitted by {@link #start()}
     * @param jobProgressListener listener handed to each importer
     */
    public IncrementalTask(int concurrency, DumperConfiguration dumperConfig, ImporterConfiguration importerConfig, PipelineChannelCreator pipelineChannelCreator, PipelineDataSourceManager dataSourceManager, PipelineTableMetaDataLoader sourceMetaDataLoader, ExecuteEngine incrementalExecuteEngine, PipelineJobProgressListener jobProgressListener) {
        this.taskId = dumperConfig.getDataSourceName();
        this.incrementalExecuteEngine = incrementalExecuteEngine;
        IngestPosition position = dumperConfig.getPosition();
        // Progress must exist before the channel: the channel callback captures it.
        this.taskProgress = this.createIncrementalTaskProgress(position);
        this.channel = this.createChannel(concurrency, pipelineChannelCreator, this.taskProgress);
        this.dumper = IncrementalDumperCreatorFactory.getInstance((String)dumperConfig.getDataSourceConfig().getDatabaseType().getType()).createIncrementalDumper(dumperConfig, position, this.channel, sourceMetaDataLoader);
        this.importers = this.createImporters(concurrency, importerConfig, dataSourceManager, this.channel, jobProgressListener);
    }

    /**
     * Creates a progress object initialised with the given start position.
     *
     * @param position initial ingest position
     * @return new progress holding {@code position}
     */
    private IncrementalTaskProgress createIncrementalTaskProgress(IngestPosition<?> position) {
        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
        incrementalTaskProgress.setPosition(position);
        return incrementalTaskProgress;
    }

    /**
     * Creates {@code concurrency} importers that all read from the same channel.
     *
     * @param concurrency number of importers to create
     * @param importerConfig importer configuration; its database type selects the importer creator
     * @param dataSourceManager data source manager passed to each importer
     * @param channel channel passed to each importer
     * @param jobProgressListener listener passed to each importer
     * @return the created importers, in creation order
     */
    private Collection<Importer> createImporters(int concurrency, ImporterConfiguration importerConfig, PipelineDataSourceManager dataSourceManager, PipelineChannel channel, PipelineJobProgressListener jobProgressListener) {
        Collection<Importer> result = new LinkedList<>();
        for (int i = 0; i < concurrency; ++i) {
            result.add(ImporterCreatorFactory.getInstance((String)importerConfig.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfig, dataSourceManager, channel, jobProgressListener));
        }
        return result;
    }

    /**
     * Creates the channel together with the callback that keeps {@code progress} up to date.
     *
     * <p>The callback receives a batch of records (presumably once they have been handled
     * downstream -- confirm against the channel implementation). It stores the position and commit
     * time of the last record unless that position is a {@link PlaceholderPosition}, and always
     * refreshes the latest-active timestamp.</p>
     *
     * @param concurrency value forwarded to the channel creator
     * @param pipelineChannelCreator channel factory
     * @param progress progress object updated by the callback
     * @return the created channel
     */
    private PipelineChannel createChannel(int concurrency, PipelineChannelCreator pipelineChannelCreator, IncrementalTaskProgress progress) {
        return pipelineChannelCreator.createPipelineChannel(concurrency, records -> {
            // An empty batch carries no position; skip it instead of failing on get(-1).
            if (!records.isEmpty()) {
                Record lastHandledRecord = (Record)records.get(records.size() - 1);
                if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
                    progress.setPosition(lastHandledRecord.getPosition());
                    progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());
                }
            }
            progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
        });
    }

    /**
     * Submits the dumper and all importers to the execute engine.
     *
     * <p>If either side reports a failure, the whole task is stopped via {@link #stop()}.</p>
     *
     * @return a future combining the dumper future and the importers future with
     *         {@link CompletableFuture#allOf(CompletableFuture[])}
     */
    @Override
    public CompletableFuture<?> start() {
        this.taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
        CompletableFuture<?> dumperFuture = this.incrementalExecuteEngine.submit((LifecycleExecutor)this.dumper, new ExecuteCallback(){

            @Override
            public void onSuccess() {
                log.info("incremental dumper onSuccess, taskId={}", (Object)IncrementalTask.this.taskId);
            }

            @Override
            public void onFailure(Throwable throwable) {
                // Pass the throwable as the trailing argument so the logger records the stack trace,
                // matching the importer failure handler below.
                log.error("incremental dumper onFailure, taskId={}", (Object)IncrementalTask.this.taskId, (Object)throwable);
                IncrementalTask.this.stop();
            }
        });
        CompletableFuture<?> importerFuture = this.incrementalExecuteEngine.submitAll(this.importers, new ExecuteCallback(){

            @Override
            public void onSuccess() {
                log.info("importer onSuccess, taskId={}", (Object)IncrementalTask.this.taskId);
            }

            @Override
            public void onFailure(Throwable throwable) {
                log.error("importer onFailure, taskId={}", (Object)IncrementalTask.this.taskId, (Object)throwable);
                IncrementalTask.this.stop();
            }
        });
        return CompletableFuture.allOf(dumperFuture, importerFuture);
    }

    /**
     * Stops the dumper first, then every importer.
     */
    @Override
    public void stop() {
        this.dumper.stop();
        for (Importer each : this.importers) {
            each.stop();
        }
    }

    /**
     * Closes the channel shared by the dumper and the importers.
     */
    @Override
    public void close() {
        this.channel.close();
    }

    @Generated
    public String toString() {
        return "IncrementalTask(taskId=" + this.getTaskId() + ")";
    }

    /**
     * Gets the task id, which is the data source name of the dumper configuration.
     *
     * @return task id
     */
    @Override
    @Generated
    public String getTaskId() {
        return this.taskId;
    }

    /**
     * Gets the progress object updated by the channel callback.
     *
     * @return incremental task progress
     */
    @Generated
    public IncrementalTaskProgress getTaskProgress() {
        return this.taskProgress;
    }
}

