/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.check.consistency;

import com.google.common.base.Preconditions;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DataConsistencyCheckerImpl
implements DataConsistencyChecker {
    // Class logger. No code visible in this file writes to it.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DataConsistencyCheckerImpl.class);
    // Creates the source/target data source wrappers opened during a check.
    private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
    // Job configuration handed to the constructor; source/target/workflow settings are read from it.
    private final JobConfiguration jobConfig;
    // Built from jobConfig in the constructor; supplies the optional input rate limit algorithm.
    private final RuleAlteredContext ruleAlteredContext;
    // Job id taken from the handle configuration; its prefix is used in worker thread names.
    private final String jobId;
    // Logic table names taken from the handle configuration; each one is checked separately.
    private final Collection<String> logicTableNames;

    /**
     * Creates a checker bound to one job configuration.
     *
     * <p>The rule altered context, the job id and the logic table names are all derived
     * from {@code jobConfig} here and kept for the lifetime of the checker.</p>
     *
     * @param jobConfig job configuration to check
     */
    public DataConsistencyCheckerImpl(JobConfiguration jobConfig) {
        this.jobConfig = jobConfig;
        ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        jobId = jobConfig.getHandleConfig().getJobId();
        logicTableNames = jobConfig.getHandleConfig().splitLogicTableNames();
    }

    /**
     * Counts the records of every logic table on the source and on the target data source.
     *
     * <p>NOTE(review): the decompiler (CFR 0.152) could not recover the original body of this
     * method ("ConfusedCFRException: Started 4 blocks at once") and emitted a stub that always
     * threw {@code IllegalStateException("Decompilation failed")}. This body is a reconstruction
     * modelled on {@code checkRecordsContent} (data source and executor set-up) and on the private
     * helper {@code countCheck}, which has no other caller in this class. Confirm it against the
     * upstream implementation, in particular the thread name pattern, the failure message and the
     * map implementation, which are choices made here.</p>
     *
     * @return for each logic table name, the result holding the source and target record counts
     * @throws PipelineDataConsistencyCheckFailedException if a data source operation raises
     *         {@code SQLException} or counting a table fails
     */
    @Override
    public Map<String, DataConsistencyCheckResult> checkRecordsCount() {
        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
                this.jobConfig.getPipelineConfig().getSource().getType(), this.jobConfig.getPipelineConfig().getSource().getParameter());
        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
                this.jobConfig.getPipelineConfig().getTarget().getType(), this.jobConfig.getPipelineConfig().getTarget().getParameter());
        // Two workers: the source and the target count of one table run side by side.
        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + this.getJobIdPrefix(this.jobId) + "-countCheck-%d");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory);
        try (PipelineDataSourceWrapper sourceDataSource = this.dataSourceFactory.newInstance(sourceDataSourceConfig);
             PipelineDataSourceWrapper targetDataSource = this.dataSourceFactory.newInstance(targetDataSourceConfig)) {
            Map<String, DataConsistencyCheckResult> result = new HashMap<>();
            for (String each : this.logicTableNames) {
                result.put(each, this.countCheck(each, sourceDataSource, targetDataSource, executor));
            }
            return result;
        }
        catch (SQLException ex) {
            throw new PipelineDataConsistencyCheckFailedException("count check failed", ex);
        }
        finally {
            // Same shutdown sequence as checkRecordsContent.
            executor.shutdown();
            executor.shutdownNow();
        }
    }

    /**
     * Shortens a job id for use in worker thread names.
     *
     * @param jobId job id
     * @return the first six characters of {@code jobId}, or {@code jobId} itself when it has six characters or fewer
     */
    private String getJobIdPrefix(String jobId) {
        int prefixLength = Math.min(jobId.length(), 6);
        return jobId.substring(0, prefixLength);
    }

    /**
     * Counts one table on the source and on the target concurrently and pairs the two counts.
     *
     * @param table logic table name
     * @param sourceDataSource source data source
     * @param targetDataSource target data source
     * @param executor executor that runs the two count tasks
     * @return result built from the source count and the target count
     * @throws PipelineDataConsistencyCheckFailedException if either count task fails or the calling
     *         thread is interrupted while waiting; in the latter case the interrupt flag is restored
     */
    private DataConsistencyCheckResult countCheck(String table, PipelineDataSourceWrapper sourceDataSource, PipelineDataSourceWrapper targetDataSource, ThreadPoolExecutor executor) {
        try {
            Future<Long> sourceFuture = executor.submit(() -> this.count(sourceDataSource, table, sourceDataSource.getDatabaseType()));
            Future<Long> targetFuture = executor.submit(() -> this.count(targetDataSource, table, targetDataSource.getDatabaseType()));
            long sourceCount = sourceFuture.get();
            long targetCount = targetFuture.get();
            return new DataConsistencyCheckResult(sourceCount, targetCount);
        }
        catch (InterruptedException ex) {
            // Future.get() cleared the interrupt flag; set it again so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new PipelineDataConsistencyCheckFailedException(String.format("count check failed for table '%s'", table), ex);
        }
        catch (ExecutionException ex) {
            throw new PipelineDataConsistencyCheckFailedException(String.format("count check failed for table '%s'", table), ex);
        }
    }

    /*
     * Decompiler stub: the real body of this method is missing.
     *
     * CFR 0.152 failed on this method with
     * "org.benf.cfr.reader.util.ConfusedCFRException: Started 5 blocks at once"
     * and emitted the unconditional throw below instead, so every call currently fails
     * with IllegalStateException("Decompilation failed").
     *
     * The only visible caller is countCheck, which uses the returned long as the record
     * count of one table on one data source.
     *
     * NOTE(review): presumably the original ran a row count query for `table` on `dataSource`,
     * using `databaseType` to pick the SQL dialect -- restore it from the original sources
     * rather than guessing.
     */
    private long count(DataSource dataSource, String table, DatabaseType databaseType) {
        // Placeholder emitted by the decompiler; not the original logic.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Compares, table by table, the calculated results of the source and the target data source.
     *
     * <p>Both database types must be supported by {@code checkAlgorithm}. For every logic table the
     * source and the target calculator each produce a sequence of calculated results; the table is
     * reported as consistent only if both sequences have the same length and every pair of results
     * is equal. Each pair is fetched concurrently on a two-thread executor, and the input rate limit
     * algorithm, when configured, is consulted before every pair.</p>
     *
     * @param checkAlgorithm algorithm providing the supported database types and the per-type calculators
     * @return for each logic table name, {@code true} if source and target results match
     * @throws PipelineDataConsistencyCheckFailedException if a database type is unsupported, table
     *         meta data or a primary key is missing, a calculation fails, or the calling thread is
     *         interrupted (the interrupt flag is restored in that case)
     */
    @Override
    public Map<String, Boolean> checkRecordsContent(DataConsistencyCheckAlgorithm checkAlgorithm) {
        Collection<String> supportedDatabaseTypes = checkAlgorithm.getSupportedDatabaseTypes();
        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
                this.jobConfig.getPipelineConfig().getSource().getType(), this.jobConfig.getPipelineConfig().getSource().getParameter());
        this.checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, sourceDataSourceConfig.getDatabaseType().getName());
        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
                this.jobConfig.getPipelineConfig().getTarget().getType(), this.jobConfig.getPipelineConfig().getTarget().getParameter());
        this.checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, targetDataSourceConfig.getDatabaseType().getName());
        this.addDataSourceConfigToMySQL(sourceDataSourceConfig, targetDataSourceConfig);
        String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getName();
        String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getName();
        SingleTableDataCalculator sourceCalculator = checkAlgorithm.getSingleTableDataCalculator(sourceDatabaseType);
        SingleTableDataCalculator targetCalculator = checkAlgorithm.getSingleTableDataCalculator(targetDatabaseType);
        JobRateLimitAlgorithm inputRateLimitAlgorithm = this.ruleAlteredContext.getInputRateLimitAlgorithm();
        Map<String, Boolean> result = new HashMap<>();
        // Created last so that nothing between its creation and the try/finally below can throw.
        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + this.getJobIdPrefix(this.jobId) + "-dataCheck-%d");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory);
        try (PipelineDataSourceWrapper sourceDataSource = this.dataSourceFactory.newInstance(sourceDataSourceConfig);
             PipelineDataSourceWrapper targetDataSource = this.dataSourceFactory.newInstance(targetDataSourceConfig)) {
            Map<String, TableMetaData> tableMetaDataMap = this.getTableMetaDataMap(this.jobConfig.getWorkflowConfig().getSchemaName());
            // Validate every table up front so that no calculation starts when meta data is incomplete.
            for (String each : this.logicTableNames) {
                if (!tableMetaDataMap.containsKey(each)) {
                    throw new PipelineDataConsistencyCheckFailedException(String.format("could not get metadata for table '%s'", each));
                }
            }
            for (String each : this.logicTableNames) {
                TableMetaData tableMetaData = tableMetaDataMap.get(each);
                Collection<String> columnNames = tableMetaData.getColumns().keySet();
                Collection<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
                if (primaryKeyColumns.isEmpty()) {
                    // Previously surfaced as a bare IndexOutOfBoundsException from get(0).
                    throw new PipelineDataConsistencyCheckFailedException(String.format("could not get primary key for table '%s'", each));
                }
                // The first primary key column is used as the unique key.
                String uniqueKey = primaryKeyColumns.iterator().next();
                DataCalculateParameter sourceCalculateParameter = DataCalculateParameter.builder().dataSource(sourceDataSource).databaseType(sourceDatabaseType)
                        .peerDatabaseType(targetDatabaseType).logicTableName(each).columnNames(columnNames).uniqueKey(uniqueKey).build();
                DataCalculateParameter targetCalculateParameter = DataCalculateParameter.builder().dataSource(targetDataSource).databaseType(targetDatabaseType)
                        .peerDatabaseType(sourceDatabaseType).logicTableName(each).columnNames(columnNames).uniqueKey(uniqueKey).build();
                Iterator<?> sourceCalculatedResults = sourceCalculator.calculate(sourceCalculateParameter).iterator();
                Iterator<?> targetCalculatedResults = targetCalculator.calculate(targetCalculateParameter).iterator();
                result.put(each, this.calculatedResultsEqual(sourceCalculatedResults, targetCalculatedResults, executor, inputRateLimitAlgorithm));
            }
        }
        catch (InterruptedException ex) {
            // Future.get() cleared the interrupt flag; set it again so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new PipelineDataConsistencyCheckFailedException("data check failed", ex);
        }
        catch (SQLException | ExecutionException ex) {
            throw new PipelineDataConsistencyCheckFailedException("data check failed", ex);
        }
        finally {
            // shutdownNow() also stops the pool from accepting work, so a preceding shutdown() adds nothing.
            executor.shutdownNow();
        }
        return result;
    }

    /**
     * Walks two calculated-result sequences in lockstep and reports whether they are identical.
     *
     * @param sourceResults calculated results of the source table
     * @param targetResults calculated results of the target table
     * @param executor executor used to fetch the next source and target result concurrently
     * @param rateLimitAlgorithm rate limit algorithm consulted before each pair, may be {@code null}
     * @return {@code true} if every pair is equal and both sequences end together
     * @throws InterruptedException if the calling thread is interrupted while waiting for a result
     * @throws ExecutionException if fetching a result fails
     */
    private boolean calculatedResultsEqual(Iterator<?> sourceResults, Iterator<?> targetResults, ThreadPoolExecutor executor,
                                           JobRateLimitAlgorithm rateLimitAlgorithm) throws InterruptedException, ExecutionException {
        while (sourceResults.hasNext() && targetResults.hasNext()) {
            if (null != rateLimitAlgorithm) {
                rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
            }
            Future<?> sourceFuture = executor.submit(sourceResults::next);
            Future<?> targetFuture = executor.submit(targetResults::next);
            Object sourceCalculatedResult = sourceFuture.get();
            Object targetCalculatedResult = targetFuture.get();
            if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) {
                return false;
            }
        }
        // Results left over on one side mean the tables differ even though every compared pair matched.
        return !sourceResults.hasNext() && !targetResults.hasNext();
    }

    /**
     * Rejects a database type that is not among the supported ones.
     *
     * @param supportedDatabaseTypes names of the supported database types
     * @param databaseType name of the database type to test
     * @throws PipelineDataConsistencyCheckFailedException if {@code databaseType} is not contained in {@code supportedDatabaseTypes}
     */
    private void checkDatabaseTypeSupportedOrNot(Collection<String> supportedDatabaseTypes, String databaseType) {
        boolean supported = supportedDatabaseTypes.contains(databaseType);
        if (supported) {
            return;
        }
        throw new PipelineDataConsistencyCheckFailedException("database type " + databaseType + " is not supported in " + supportedDatabaseTypes);
    }

    /**
     * Looks up the tables of a schema through the pipeline's context manager.
     *
     * @param schemaName schema name
     * @return tables of the schema as returned by its meta data
     * @throws NullPointerException with message "contextManager null" if no context manager is available
     */
    private Map<String, TableMetaData> getTableMetaDataMap(String schemaName) {
        // checkNotNull hands back its argument, so the check and the assignment fit in one statement.
        ContextManager contextManager = Preconditions.checkNotNull(PipelineContext.getContextManager(), "contextManager null");
        return contextManager.getMetaDataContexts().getMetaData(schemaName).getSchema().getTables();
    }

    /**
     * Appends the JDBC query property {@code yearIsDateType=false} to both configurations when the
     * source database type is MySQL (name compared case-insensitively).
     *
     * <p>Only the source type is tested; the target configuration receives the property as well
     * whenever the source matches.</p>
     *
     * @param sourceDataSourceConfig source data source configuration
     * @param targetDataSourceConfig target data source configuration
     */
    private void addDataSourceConfigToMySQL(PipelineDataSourceConfiguration sourceDataSourceConfig, PipelineDataSourceConfiguration targetDataSourceConfig) {
        String sourceTypeName = sourceDataSourceConfig.getDatabaseType().getName();
        boolean sourceIsMySQL = sourceTypeName.equalsIgnoreCase(new MySQLDatabaseType().getName());
        if (!sourceIsMySQL) {
            return;
        }
        Properties queryProps = new Properties();
        queryProps.setProperty("yearIsDateType", Boolean.toString(false));
        sourceDataSourceConfig.appendJDBCQueryProperties(queryProps);
        targetDataSourceConfig.appendJDBCQueryProperties(queryProps);
    }

    /**
     * Gets the data source factory.
     *
     * @return factory used by this checker to create data source wrappers
     */
    @Generated
    public PipelineDataSourceFactory getDataSourceFactory() {
        return dataSourceFactory;
    }

    /**
     * Gets the job configuration.
     *
     * @return job configuration passed to the constructor
     */
    @Generated
    public JobConfiguration getJobConfig() {
        return jobConfig;
    }

    /**
     * Gets the rule altered context.
     *
     * @return context created from the job configuration in the constructor
     */
    @Generated
    public RuleAlteredContext getRuleAlteredContext() {
        return ruleAlteredContext;
    }

    /**
     * Gets the job id.
     *
     * @return job id read from the handle configuration
     */
    @Generated
    public String getJobId() {
        return jobId;
    }

    /**
     * Gets the logic table names.
     *
     * @return logic table names read from the handle configuration
     */
    @Generated
    public Collection<String> getLogicTableNames() {
        return logicTableNames;
    }
}

