/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MultiplexMemoryPipelineChannel
implements PipelineChannel {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MultiplexMemoryPipelineChannel.class);
    private static final EmptyAckCallback EMPTY_ACK_CALLBACK = new EmptyAckCallback();
    private final int channelNumber;
    private final PipelineChannel[] channels;
    private final Map<String, Integer> channelAssignment = new HashMap<String, Integer>();

    public MultiplexMemoryPipelineChannel() {
        this(EMPTY_ACK_CALLBACK);
    }

    public MultiplexMemoryPipelineChannel(AckCallback ackCallback) {
        this(10000, ackCallback);
    }

    public MultiplexMemoryPipelineChannel(int blockQueueSize, AckCallback ackCallback) {
        this(1, blockQueueSize, ackCallback);
    }

    public MultiplexMemoryPipelineChannel(int channelNumber, int blockQueueSize, AckCallback ackCallback) {
        this.channelNumber = channelNumber;
        this.channels = new PipelineChannel[channelNumber];
        for (int i = 0; i < channelNumber; ++i) {
            this.channels[i] = new SimpleMemoryPipelineChannel(blockQueueSize, ackCallback);
        }
    }

    public void pushRecord(Record record) {
        if (FinishedRecord.class.equals(record.getClass())) {
            for (int i = 0; i < this.channelNumber; ++i) {
                this.pushRecord(record, i);
            }
        } else if (DataRecord.class.equals(record.getClass())) {
            this.pushRecord(record, Math.abs(record.hashCode() % this.channelNumber));
        } else if (PlaceholderRecord.class.equals(record.getClass())) {
            this.pushRecord(record, 0);
        } else {
            throw new RuntimeException("Not Support Record Type");
        }
    }

    private void pushRecord(Record record, int channelIndex) {
        PipelineChannel channel = this.channels[channelIndex];
        channel.pushRecord(record);
    }

    public List<Record> fetchRecords(int batchSize, int timeoutSeconds) {
        return this.findChannel().fetchRecords(batchSize, timeoutSeconds);
    }

    public void ack(List<Record> records) {
        this.findChannel().ack(records);
    }

    private PipelineChannel findChannel() {
        String threadId = Long.toString(Thread.currentThread().getId());
        this.checkAssignment(threadId);
        return this.channels[this.channelAssignment.get(threadId)];
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkAssignment(String threadId) {
        if (!this.channelAssignment.containsKey(threadId)) {
            MultiplexMemoryPipelineChannel multiplexMemoryPipelineChannel = this;
            synchronized (multiplexMemoryPipelineChannel) {
                if (!this.channelAssignment.containsKey(threadId)) {
                    this.assignmentChannel(threadId);
                }
            }
        }
    }

    private void assignmentChannel(String threadId) {
        for (int i = 0; i < this.channels.length; ++i) {
            if (this.channelAssignment.containsValue(i)) continue;
            this.channelAssignment.put(threadId, i);
            return;
        }
    }

    public void close() {
        for (PipelineChannel each : this.channels) {
            each.close();
        }
    }
}

