/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.importer;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DefaultImporter
extends AbstractLifecycleExecutor
implements Importer {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultImporter.class);
    private static final DataRecordMerger MERGER = new DataRecordMerger();
    private final ImporterConfiguration importerConfig;
    private final PipelineDataSourceManager dataSourceManager;
    private final PipelineSQLBuilder pipelineSqlBuilder;
    private final PipelineChannel channel;
    private final PipelineJobProgressListener jobProgressListener;
    private final JobRateLimitAlgorithm rateLimitAlgorithm;

    public DefaultImporter(ImporterConfiguration importerConfig, PipelineDataSourceManager dataSourceManager, PipelineChannel channel, PipelineJobProgressListener jobProgressListener) {
        this.importerConfig = importerConfig;
        this.rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
        this.dataSourceManager = dataSourceManager;
        this.channel = channel;
        this.pipelineSqlBuilder = PipelineSQLBuilderFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType());
        this.jobProgressListener = jobProgressListener;
    }

    protected void runBlocking() {
        this.write();
    }

    private void write() {
        log.info("importer write");
        int round = 1;
        int rowCount = 0;
        boolean finishedByBreak = false;
        int batchSize = this.importerConfig.getBatchSize() * 2;
        while (this.isRunning()) {
            List records = this.channel.fetchRecords(batchSize, 3);
            if (null == records || records.isEmpty()) continue;
            rowCount += records.size();
            PipelineJobProgressUpdatedParameter updatedParameter = this.flush((DataSource)this.dataSourceManager.getDataSource(this.importerConfig.getDataSourceConfig()), records);
            this.channel.ack(records);
            this.jobProgressListener.onProgressUpdated(updatedParameter);
            if (0 == ++round % 50) {
                log.info("importer write, round={}, rowCount={}", (Object)round, (Object)rowCount);
            }
            if (!FinishedRecord.class.equals(((Record)records.get(records.size() - 1)).getClass())) continue;
            log.info("write, get FinishedRecord, break");
            finishedByBreak = true;
            break;
        }
        log.info("importer write done, rowCount={}, finishedByBreak={}", (Object)rowCount, (Object)finishedByBreak);
    }

    private PipelineJobProgressUpdatedParameter flush(DataSource dataSource, List<Record> buffer) {
        List<DataRecord> dataRecords = buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord)each).collect(Collectors.toList());
        int insertRecordNumber = 0;
        for (DataRecord each2 : dataRecords) {
            if (!"INSERT".equals(each2.getType())) continue;
            ++insertRecordNumber;
        }
        List<GroupedDataRecord> result = MERGER.group(dataRecords);
        for (GroupedDataRecord each3 : result) {
            this.flushInternal(dataSource, each3.getDeleteDataRecords());
            this.flushInternal(dataSource, each3.getInsertDataRecords());
            this.flushInternal(dataSource, each3.getUpdateDataRecords());
        }
        return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
    }

    private void flushInternal(DataSource dataSource, List<DataRecord> buffer) {
        if (null == buffer || buffer.isEmpty()) {
            return;
        }
        boolean success = this.tryFlush(dataSource, buffer);
        ShardingSpherePreconditions.checkState((!this.isRunning() || success ? 1 : 0) != 0, PipelineImporterJobWriteException::new);
    }

    private boolean tryFlush(DataSource dataSource, List<DataRecord> buffer) {
        for (int i = 0; this.isRunning() && i <= this.importerConfig.getRetryTimes(); ++i) {
            try {
                this.doFlush(dataSource, buffer);
                return true;
            }
            catch (SQLException ex) {
                log.error("flush failed {}/{} times.", new Object[]{i, this.importerConfig.getRetryTimes(), ex});
                ThreadUtil.sleep(Math.min(300000L, 1000L << i));
                continue;
            }
        }
        return false;
    }

    private void doFlush(DataSource dataSource, List<DataRecord> buffer) throws SQLException {
        try (Connection connection = dataSource.getConnection();){
            connection.setAutoCommit(false);
            switch (buffer.get(0).getType()) {
                case "INSERT": {
                    if (null != this.rateLimitAlgorithm) {
                        this.rateLimitAlgorithm.intercept(JobOperationType.INSERT, (Number)1);
                    }
                    this.executeBatchInsert(connection, buffer);
                    break;
                }
                case "UPDATE": {
                    if (null != this.rateLimitAlgorithm) {
                        this.rateLimitAlgorithm.intercept(JobOperationType.UPDATE, (Number)1);
                    }
                    this.executeUpdate(connection, buffer);
                    break;
                }
                case "DELETE": {
                    if (null != this.rateLimitAlgorithm) {
                        this.rateLimitAlgorithm.intercept(JobOperationType.DELETE, (Number)1);
                    }
                    this.executeBatchDelete(connection, buffer);
                    break;
                }
            }
            connection.commit();
        }
    }

    private void executeBatchInsert(Connection connection, List<DataRecord> dataRecords) throws SQLException {
        DataRecord dataRecord = dataRecords.get(0);
        String insertSql = this.pipelineSqlBuilder.buildInsertSQL(this.getSchemaName(dataRecord.getTableName()), dataRecord, this.importerConfig.getShardingColumnsMap());
        try (PreparedStatement ps = connection.prepareStatement(insertSql);){
            ps.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                for (int i = 0; i < each.getColumnCount(); ++i) {
                    ps.setObject(i + 1, each.getColumn(i).getValue());
                }
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }

    private String getSchemaName(String logicTableName) {
        return this.getImporterConfig().getSchemaName(new LogicTableName(logicTableName));
    }

    private void executeUpdate(Connection connection, List<DataRecord> dataRecords) throws SQLException {
        for (DataRecord each : dataRecords) {
            this.executeUpdate(connection, each);
        }
    }

    private void executeUpdate(Connection connection, DataRecord record) throws SQLException {
        Set shardingColumns = this.importerConfig.getShardingColumns(record.getTableName());
        if (null == shardingColumns) {
            log.error("executeUpdate, could not get shardingColumns, tableName={}, logicTableNames={}", (Object)record.getTableName(), (Object)this.importerConfig.getLogicTableNames());
        }
        List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, shardingColumns);
        List updatedColumns = this.pipelineSqlBuilder.extractUpdatedColumns(record, this.importerConfig.getShardingColumnsMap());
        String updateSql = this.pipelineSqlBuilder.buildUpdateSQL(this.getSchemaName(record.getTableName()), record, conditionColumns, this.importerConfig.getShardingColumnsMap());
        try (PreparedStatement ps = connection.prepareStatement(updateSql);){
            int i;
            for (i = 0; i < updatedColumns.size(); ++i) {
                ps.setObject(i + 1, ((Column)updatedColumns.get(i)).getValue());
            }
            for (i = 0; i < conditionColumns.size(); ++i) {
                Column keyColumn = conditionColumns.get(i);
                ps.setObject(updatedColumns.size() + i + 1, keyColumn.isUniqueKey() && keyColumn.isUpdated() ? keyColumn.getOldValue() : keyColumn.getValue());
            }
            int updateCount = ps.executeUpdate();
            if (1 != updateCount) {
                log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}, conditionColumns={}", new Object[]{updateCount, updateSql, updatedColumns, conditionColumns});
            }
        }
    }

    private void executeBatchDelete(Connection connection, List<DataRecord> dataRecords) throws SQLException {
        DataRecord dataRecord = dataRecords.get(0);
        List<Column> conditionColumns = RecordUtil.extractConditionColumns(dataRecord, this.importerConfig.getShardingColumns(dataRecord.getTableName()));
        String deleteSQL = this.pipelineSqlBuilder.buildDeleteSQL(this.getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
        try (PreparedStatement ps = connection.prepareStatement(deleteSQL);){
            ps.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                conditionColumns = RecordUtil.extractConditionColumns(each, this.importerConfig.getShardingColumns(each.getTableName()));
                for (int i = 0; i < conditionColumns.size(); ++i) {
                    ps.setObject(i + 1, conditionColumns.get(i).getValue());
                }
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }

    protected void doStop() {
    }

    @Generated
    protected ImporterConfiguration getImporterConfig() {
        return this.importerConfig;
    }
}

