/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.check.consistency;

import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SingleTableInventoryDataConsistencyChecker {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SingleTableInventoryDataConsistencyChecker.class);
    private final String jobId;
    private final PipelineDataSourceWrapper sourceDataSource;
    private final PipelineDataSourceWrapper targetDataSource;
    private final SchemaTableName sourceTable;
    private final SchemaTableName targetTable;
    private final PipelineColumnMetaData uniqueKey;
    private final PipelineTableMetaDataLoader metaDataLoader;
    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
    private final ConsistencyCheckJobItemContext consistencyCheckJobItemContext;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public DataConsistencyCheckResult check(DataConsistencyCalculateAlgorithm calculateAlgorithm) {
        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build((String)("job-" + this.getJobIdDigest(this.jobId) + "-check-%d"));
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory);
        try {
            DataConsistencyCheckResult dataConsistencyCheckResult = this.check(calculateAlgorithm, executor, this.consistencyCheckJobItemContext);
            return dataConsistencyCheckResult;
        }
        finally {
            executor.shutdown();
            executor.shutdownNow();
        }
    }

    private DataConsistencyCheckResult check(DataConsistencyCalculateAlgorithm calculateAlgorithm, ThreadPoolExecutor executor, ConsistencyCheckJobItemContext checkJobItemContext) {
        String sourceDatabaseType = this.sourceDataSource.getDatabaseType().getType();
        String targetDatabaseType = this.targetDataSource.getDatabaseType().getType();
        String sourceTableName = this.sourceTable.getTableName().getOriginal();
        String schemaName = this.sourceTable.getSchemaName().getOriginal();
        PipelineTableMetaData tableMetaData = this.metaDataLoader.getTableMetaData(schemaName, sourceTableName);
        if (null != checkJobItemContext) {
            checkJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
            InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI((String)PipelineJobIdUtils.parseJobType(this.jobId).getTypeName());
            Map jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(this.jobId);
            long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
            checkJobItemContext.setRecordsCount(recordsCount);
            log.info("check, get records count: {}", (Object)recordsCount);
        }
        ShardingSpherePreconditions.checkNotNull((Object)tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
        List columnNames = tableMetaData.getColumnNames();
        DataConsistencyCalculateParameter sourceParameter = this.buildParameter(this.sourceDataSource, schemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, this.uniqueKey);
        DataConsistencyCalculateParameter targetParameter = this.buildParameter(this.targetDataSource, this.targetTable.getSchemaName().getOriginal(), this.targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, sourceDatabaseType, this.uniqueKey);
        Iterator sourceCalculatedResults = calculateAlgorithm.calculate(sourceParameter).iterator();
        Iterator targetCalculatedResults = calculateAlgorithm.calculate(targetParameter).iterator();
        long sourceRecordsCount = 0L;
        long targetRecordsCount = 0L;
        boolean contentMatched = true;
        while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
            if (null != this.readRateLimitAlgorithm) {
                this.readRateLimitAlgorithm.intercept(JobOperationType.SELECT, (Number)1);
            }
            Future<DataConsistencyCalculatedResult> sourceFuture = executor.submit(sourceCalculatedResults::next);
            Future<DataConsistencyCalculatedResult> targetFuture = executor.submit(targetCalculatedResults::next);
            DataConsistencyCalculatedResult sourceCalculatedResult = this.waitFuture(sourceFuture);
            DataConsistencyCalculatedResult targetCalculatedResult = this.waitFuture(targetFuture);
            sourceRecordsCount += (long)sourceCalculatedResult.getRecordsCount();
            targetRecordsCount += (long)targetCalculatedResult.getRecordsCount();
            contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
            if (!contentMatched) {
                log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", new Object[]{this.jobId, this.sourceTable, this.targetTable, this.uniqueKey});
                break;
            }
            if (null == checkJobItemContext) continue;
            checkJobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
        }
        if (null != checkJobItemContext) {
            checkJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
        }
        return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
    }

    private String getJobIdDigest(String jobId) {
        return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
    }

    private DataConsistencyCalculateParameter buildParameter(PipelineDataSourceWrapper sourceDataSource, String schemaName, String tableName, Collection<String> columnNames, String sourceDatabaseType, String targetDatabaseType, PipelineColumnMetaData uniqueKey) {
        return new DataConsistencyCalculateParameter(sourceDataSource, schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
    }

    private <T> T waitFuture(Future<T> future) {
        try {
            return future.get();
        }
        catch (InterruptedException | ExecutionException ex) {
            if (ex.getCause() instanceof PipelineSQLException) {
                throw (PipelineSQLException)((Object)ex.getCause());
            }
            throw new SQLWrapperException(new SQLException(ex));
        }
    }

    @Generated
    public SingleTableInventoryDataConsistencyChecker(String jobId, PipelineDataSourceWrapper sourceDataSource, PipelineDataSourceWrapper targetDataSource, SchemaTableName sourceTable, SchemaTableName targetTable, PipelineColumnMetaData uniqueKey, PipelineTableMetaDataLoader metaDataLoader, JobRateLimitAlgorithm readRateLimitAlgorithm, ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
        this.jobId = jobId;
        this.sourceDataSource = sourceDataSource;
        this.targetDataSource = targetDataSource;
        this.sourceTable = sourceTable;
        this.targetTable = targetTable;
        this.uniqueKey = uniqueKey;
        this.metaDataLoader = metaDataLoader;
        this.readRateLimitAlgorithm = readRateLimitAlgorithm;
        this.consistencyCheckJobItemContext = consistencyCheckJobItemContext;
    }
}

