/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.datahub.client.http.converter.batch;

import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.converter.batch.BatchBinaryRecord;
import com.aliyun.datahub.client.http.converter.batch.BinaryRecord;
import com.aliyun.datahub.client.impl.schemaregistry.SchemaRegistryClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.CompressType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordRespMeta;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.InflaterInputStream;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
import org.apache.commons.io.IOUtils;

public class BatchDeserializer {
    private static final LZ4SafeDecompressor LZ4_DECOMPRESSOR = LZ4Factory.fastestInstance().safeDecompressor();
    private final String projectName;
    private final String topicName;
    private final RecordSchema initSchema;
    private final SchemaRegistryClient schemaRegistry;

    public BatchDeserializer(String projectName, String topicName, RecordSchema initSchema, SchemaRegistryClient schemaRegistry) {
        this.projectName = projectName;
        this.topicName = topicName;
        this.initSchema = initSchema;
        this.schemaRegistry = schemaRegistry;
    }

    public List<RecordEntry> deserialize(byte[] data, RecordRespMeta meta, String shardId) {
        BatchBinaryRecord batch = this.parseBatchRecordFrom(data);
        ArrayList<RecordEntry> records = new ArrayList<RecordEntry>();
        int index = 0;
        int recordSize = batch.getRecords().size();
        for (BinaryRecord binaryRecord : batch.getRecords()) {
            RecordEntry recordEntry = BatchDeserializer.convertFromBinaryRecord(binaryRecord, meta, binaryRecord.getSchema());
            recordEntry.innerSetSegmentInfo(binaryRecord.getSchemaVersionId(), recordSize, index++);
            recordEntry.setShardId(shardId);
            records.add(recordEntry);
        }
        return records;
    }

    private static RecordEntry convertFromBinaryRecord(BinaryRecord binaryRecord, RecordRespMeta meta, RecordSchema schema) {
        RecordEntry record = new RecordEntry();
        record.setCursor(meta.getCursor());
        record.setSequence(meta.getSequence());
        record.setSerial(meta.getSerial());
        record.setSystemTime(meta.getSystemTime());
        record.setLatestSequence(meta.getLatestSequence());
        record.setLatestTime(meta.getLatestTime());
        record.setAttributes(binaryRecord.getAttrMap());
        if (schema != null) {
            TupleRecordData data = new TupleRecordData(schema);
            for (int i = 0; i < schema.getFields().size(); ++i) {
                data.setField(i, binaryRecord.getField(i));
            }
            record.setRecordData(data);
        } else {
            byte[] bytes = (byte[])binaryRecord.getField(0);
            BlobRecordData data = new BlobRecordData(bytes);
            record.setRecordData(data);
        }
        return record;
    }

    public BatchBinaryRecord parseBatchRecordFrom(byte[] bytes) {
        int byteLength = bytes.length;
        BatchBinaryRecord.BatchHeader header = BatchBinaryRecord.BatchHeader.parseFrom(bytes);
        ByteArrayInputStream inputStream = null;
        CompressType compressType = CompressType.fromValue(header.getAttributes() & 3);
        if (compressType != CompressType.NONE) {
            ByteArrayInputStream input = new ByteArrayInputStream(bytes, 26, byteLength - 26);
            byte[] rawData = BatchDeserializer.decompress(compressType, input, header.getRawSize());
            inputStream = new ByteArrayInputStream(rawData);
        } else {
            inputStream = new ByteArrayInputStream(bytes, 26, byteLength - 26);
        }
        BatchBinaryRecord batch = new BatchBinaryRecord();
        for (int i = 0; i < header.getRecordCount(); ++i) {
            BinaryRecord record = this.parseBinaryRecordFrom(inputStream);
            batch.addRecord(record);
        }
        return batch;
    }

    public BinaryRecord parseBinaryRecordFrom(ByteArrayInputStream input) {
        int left = input.available();
        input.mark(0);
        BinaryRecord.RecordHeader header = BinaryRecord.RecordHeader.parseFrom(input);
        if (left < header.getTotalSize()) {
            throw new DatahubClientException("Check record header length fail");
        }
        RecordSchema schema = null;
        if (header.getSchemaVersion() != -1) {
            schema = this.schemaRegistry != null ? this.schemaRegistry.getSchema(this.projectName, this.topicName, header.getSchemaVersion()) : this.initSchema;
        }
        input.reset();
        byte[] bytes = new byte[header.getTotalSize()];
        int length = input.read(bytes, 0, header.getTotalSize());
        if (length != header.getTotalSize()) {
            throw new DatahubClientException("Check record total size fail");
        }
        return new BinaryRecord(bytes, header, schema, header.getSchemaVersion());
    }

    private static byte[] decompress(CompressType type, ByteArrayInputStream input, int rawSize) {
        try {
            if (type == CompressType.DEFLATE) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                InflaterInputStream iis = new InflaterInputStream(input);
                IOUtils.copy((InputStream)iis, (OutputStream)out);
                return out.toByteArray();
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            IOUtils.copy((InputStream)input, (OutputStream)out);
            byte[] restored = new byte[rawSize];
            int size = LZ4_DECOMPRESSOR.decompress(out.toByteArray(), restored);
            if (rawSize != size) {
                return Arrays.copyOfRange(restored, 0, size);
            }
            return restored;
        }
        catch (Exception e) {
            throw new DatahubClientException("Decompress data fail. error:" + e.getMessage());
        }
    }
}

