/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.task;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.sql.DataSource;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Inventory task.
 *
 * <p>Wires an {@link InventoryDumper} and an {@link Importer} to one shared
 * {@link PipelineChannel}, runs each of them on its own {@link ExecuteEngine},
 * and remembers the most recent non-placeholder position reported through the
 * channel callback so it can be exposed as {@link InventoryTaskProgress}.</p>
 */
public final class InventoryTask
implements PipelineTask,
AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(InventoryTask.class);
    private final String taskId;
    private final ExecuteEngine inventoryDumperExecuteEngine;
    private final ExecuteEngine inventoryImporterExecuteEngine;
    private final PipelineChannel channel;
    private final Dumper dumper;
    private final Importer importer;
    // Assigned from the channel callback and read by getTaskProgress()/toString();
    // volatile so a reader on another thread observes the latest assignment.
    private volatile IngestPosition<?> position;

    /**
     * Creates the task, its channel, its dumper and its importer.
     *
     * @param inventoryDumperConfig dumper configuration; also supplies the task id parts and the initial position
     * @param importerConfig importer configuration; its database type selects the importer creator
     * @param pipelineChannelCreator factory used to create the channel shared by dumper and importer
     * @param dataSourceManager data source manager handed to the importer
     * @param sourceDataSource data source handed to the dumper
     * @param sourceMetaDataLoader table meta data loader handed to the dumper
     * @param inventoryDumperExecuteEngine engine the dumper is submitted to by {@link #start()}
     * @param inventoryImporterExecuteEngine engine the importer is submitted to by {@link #start()}
     * @param jobProgressListener progress listener handed to the importer
     */
    public InventoryTask(InventoryDumperConfiguration inventoryDumperConfig, ImporterConfiguration importerConfig, PipelineChannelCreator pipelineChannelCreator, PipelineDataSourceManager dataSourceManager, DataSource sourceDataSource, PipelineTableMetaDataLoader sourceMetaDataLoader, ExecuteEngine inventoryDumperExecuteEngine, ExecuteEngine inventoryImporterExecuteEngine, PipelineJobProgressListener jobProgressListener) {
        this.taskId = this.generateTaskId(inventoryDumperConfig);
        this.inventoryDumperExecuteEngine = inventoryDumperExecuteEngine;
        this.inventoryImporterExecuteEngine = inventoryImporterExecuteEngine;
        // The channel must exist before the dumper and importer, both of which receive it.
        this.channel = this.createChannel(pipelineChannelCreator);
        this.dumper = new InventoryDumper(inventoryDumperConfig, this.channel, sourceDataSource, sourceMetaDataLoader);
        this.importer = ImporterCreatorFactory.getInstance((String)importerConfig.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfig, dataSourceManager, this.channel, jobProgressListener);
        this.position = inventoryDumperConfig.getPosition();
    }

    /**
     * Builds the task id as {@code <dataSourceName>.<actualTableName>}, with
     * {@code #<shardingItem>} appended when the sharding item is not null.
     *
     * @param inventoryDumperConfig configuration providing the id parts
     * @return generated task id
     */
    private String generateTaskId(InventoryDumperConfiguration inventoryDumperConfig) {
        String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getActualTableName());
        return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
    }

    /**
     * Submits the dumper and the importer to their execute engines.
     *
     * <p>Each submission registers a callback that logs the outcome; on failure
     * the callback also calls {@link #stop()} so that both sides are stopped.</p>
     *
     * @return future combining the dumper future and the importer future via {@link CompletableFuture#allOf}
     */
    @Override
    public CompletableFuture<?> start() {
        CompletableFuture<?> dumperFuture = this.inventoryDumperExecuteEngine.submit((LifecycleExecutor)this.dumper, new ExecuteCallback(){

            @Override
            public void onSuccess() {
                log.info("dumper onSuccess, taskId={}", (Object)InventoryTask.this.taskId);
            }

            @Override
            public void onFailure(Throwable throwable) {
                // Pass the throwable as the trailing argument so the failure cause is logged,
                // matching the importer callback below.
                log.error("dumper onFailure, taskId={}", (Object)InventoryTask.this.taskId, (Object)throwable);
                InventoryTask.this.stop();
            }
        });
        CompletableFuture<?> importerFuture = this.inventoryImporterExecuteEngine.submit((LifecycleExecutor)this.importer, new ExecuteCallback(){

            @Override
            public void onSuccess() {
                log.info("importer onSuccess, taskId={}", (Object)InventoryTask.this.taskId);
            }

            @Override
            public void onFailure(Throwable throwable) {
                log.error("importer onFailure, taskId={}", (Object)InventoryTask.this.taskId, (Object)throwable);
                InventoryTask.this.stop();
            }
        });
        return CompletableFuture.allOf(dumperFuture, importerFuture);
    }

    /**
     * Creates the channel shared by the dumper and the importer.
     *
     * <p>The callback given to the creator updates {@link #position} to the position of
     * the last non-placeholder record in each batch it receives.</p>
     *
     * @param pipelineChannelCreator channel factory
     * @return created channel
     */
    private PipelineChannel createChannel(PipelineChannelCreator pipelineChannelCreator) {
        // NOTE(review): the literal 1 is presumably the channel's output concurrency, and the
        // callback presumably fires on record acknowledgement - confirm against PipelineChannelCreator.
        return pipelineChannelCreator.createPipelineChannel(1, records -> {
            Record lastNormalRecord = this.getLastNormalRecord(records);
            if (null != lastNormalRecord) {
                this.position = lastNormalRecord.getPosition();
            }
        });
    }

    /**
     * Scans the list from the end and returns the first record whose position
     * is not a {@link PlaceholderPosition}.
     *
     * @param records records to scan
     * @return last record with a non-placeholder position, or {@code null} if there is none
     */
    private Record getLastNormalRecord(List<Record> records) {
        for (int index = records.size() - 1; index >= 0; --index) {
            Record record = records.get(index);
            if (record.getPosition() instanceof PlaceholderPosition) {
                continue;
            }
            return record;
        }
        return null;
    }

    /**
     * Stops the dumper, then the importer.
     */
    @Override
    public void stop() {
        this.dumper.stop();
        this.importer.stop();
    }

    /**
     * Returns a progress snapshot wrapping the current position.
     *
     * @return task progress built from the current position
     */
    public InventoryTaskProgress getTaskProgress() {
        return new InventoryTaskProgress(this.position);
    }

    /**
     * Closes the channel shared by the dumper and the importer.
     */
    @Override
    public void close() {
        this.channel.close();
    }

    @Generated
    public String toString() {
        return "InventoryTask(taskId=" + this.getTaskId() + ", position=" + this.position + ")";
    }

    /**
     * Returns the task id generated at construction time.
     *
     * @return task id
     */
    @Override
    @Generated
    public String getTaskId() {
        return this.taskId;
    }
}

