/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.RestoringTasks;
import org.apache.kafka.streams.processor.internals.StateRestorer;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;

public class StoreChangelogReader
implements ChangelogReader {
    private final Logger log;
    private final Consumer<byte[], byte[]> consumer;
    private final StateRestoreListener userStateRestoreListener;
    private final Map<TopicPartition, Long> endOffsets = new HashMap<TopicPartition, Long>();
    private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<String, List<PartitionInfo>>();
    private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<TopicPartition, StateRestorer>();
    private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<TopicPartition, StateRestorer>();
    private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<TopicPartition, StateRestorer>();

    public StoreChangelogReader(Consumer<byte[], byte[]> consumer, StateRestoreListener userStateRestoreListener, LogContext logContext) {
        this.consumer = consumer;
        this.log = logContext.logger(this.getClass());
        this.userStateRestoreListener = userStateRestoreListener;
    }

    @Override
    public void register(StateRestorer restorer) {
        restorer.setUserRestoreListener(this.userStateRestoreListener);
        this.stateRestorers.put(restorer.partition(), restorer);
        this.needsInitializing.put(restorer.partition(), restorer);
    }

    @Override
    public Collection<TopicPartition> restore(RestoringTasks active) {
        if (!this.needsInitializing.isEmpty()) {
            this.initialize();
        }
        if (this.needsRestoring.isEmpty()) {
            this.consumer.assign(Collections.emptyList());
            return this.completed();
        }
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>(this.needsRestoring.keySet());
        ConsumerRecords allRecords = this.consumer.poll(10L);
        for (TopicPartition partition : partitions) {
            this.restorePartition((ConsumerRecords<byte[], byte[]>)allRecords, partition, active.restoringTaskFor(partition));
        }
        if (this.needsRestoring.isEmpty()) {
            this.consumer.assign(Collections.emptyList());
        }
        return this.completed();
    }

    private void initialize() {
        if (!this.consumer.subscription().isEmpty()) {
            throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + this.consumer.subscription() + ")");
        }
        this.refreshChangelogInfo();
        HashMap<TopicPartition, StateRestorer> initializable = new HashMap<TopicPartition, StateRestorer>();
        for (Map.Entry<TopicPartition, StateRestorer> entry : this.needsInitializing.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            if (!this.hasPartition(topicPartition)) continue;
            initializable.put(entry.getKey(), entry.getValue());
        }
        try {
            this.endOffsets.putAll(this.consumer.endOffsets(initializable.keySet()));
        }
        catch (TimeoutException e) {
            this.log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
            return;
        }
        Iterator iter = initializable.keySet().iterator();
        while (iter.hasNext()) {
            TopicPartition topicPartition = (TopicPartition)iter.next();
            Long endOffset = this.endOffsets.get(topicPartition);
            if (endOffset != null) {
                StateRestorer restorer = this.needsInitializing.get(topicPartition);
                if (restorer.checkpoint() >= endOffset) {
                    restorer.setRestoredOffset(restorer.checkpoint());
                    iter.remove();
                } else if (restorer.offsetLimit() == 0L || endOffset == 0L) {
                    restorer.setRestoredOffset(0L);
                    iter.remove();
                } else {
                    restorer.setEndingOffset(endOffset);
                }
                this.needsInitializing.remove(topicPartition);
                continue;
            }
            this.log.info("End offset cannot be found form the returned metadata; removing this partition from the current run loop");
            iter.remove();
        }
        if (!initializable.isEmpty()) {
            this.startRestoration(initializable);
        }
    }

    private void startRestoration(Map<TopicPartition, StateRestorer> initialized) {
        this.log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
        HashSet<TopicPartition> assignment = new HashSet<TopicPartition>(this.consumer.assignment());
        assignment.addAll(initialized.keySet());
        this.consumer.assign(assignment);
        ArrayList<StateRestorer> needsPositionUpdate = new ArrayList<StateRestorer>();
        for (StateRestorer restorer : initialized.values()) {
            if (restorer.checkpoint() != -1L) {
                this.consumer.seek(restorer.partition(), restorer.checkpoint());
                this.logRestoreOffsets(restorer.partition(), restorer.checkpoint(), this.endOffsets.get(restorer.partition()));
                restorer.setStartingOffset(this.consumer.position(restorer.partition()));
                restorer.restoreStarted();
                continue;
            }
            this.consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
            needsPositionUpdate.add(restorer);
        }
        for (StateRestorer restorer : needsPositionUpdate) {
            long position = this.consumer.position(restorer.partition());
            this.logRestoreOffsets(restorer.partition(), position, this.endOffsets.get(restorer.partition()));
            restorer.setStartingOffset(position);
            restorer.restoreStarted();
        }
        this.needsRestoring.putAll(initialized);
    }

    private void logRestoreOffsets(TopicPartition partition, long startingOffset, Long endOffset) {
        this.log.debug("Restoring partition {} from offset {} to endOffset {}", new Object[]{partition, startingOffset, endOffset});
    }

    private Collection<TopicPartition> completed() {
        HashSet<TopicPartition> completed = new HashSet<TopicPartition>(this.stateRestorers.keySet());
        completed.removeAll(this.needsRestoring.keySet());
        this.log.trace("The set of restoration completed partitions so far: {}", completed);
        return completed;
    }

    private void refreshChangelogInfo() {
        try {
            this.partitionInfo.putAll(this.consumer.listTopics());
        }
        catch (TimeoutException e) {
            this.log.debug("Could not fetch topic metadata within the timeout, will retry in the next run loop");
        }
    }

    @Override
    public Map<TopicPartition, Long> restoredOffsets() {
        HashMap<TopicPartition, Long> restoredOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, StateRestorer> entry : this.stateRestorers.entrySet()) {
            StateRestorer restorer = entry.getValue();
            if (!restorer.isPersistent()) continue;
            restoredOffsets.put(entry.getKey(), restorer.restoredOffset());
        }
        return restoredOffsets;
    }

    @Override
    public void reset() {
        this.partitionInfo.clear();
        this.stateRestorers.clear();
        this.needsRestoring.clear();
        this.endOffsets.clear();
        this.needsInitializing.clear();
    }

    private void restorePartition(ConsumerRecords<byte[], byte[]> allRecords, TopicPartition topicPartition, Task task) {
        StateRestorer restorer = this.stateRestorers.get(topicPartition);
        Long endOffset = this.endOffsets.get(topicPartition);
        long pos = this.processNext(allRecords.records(topicPartition), restorer, endOffset);
        restorer.setRestoredOffset(pos);
        if (restorer.hasCompleted(pos, endOffset)) {
            if (pos > endOffset + 1L) {
                throw new TaskMigratedException(task, topicPartition, endOffset, pos);
            }
            this.log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}", new Object[]{topicPartition, restorer.restoredNumRecords(), restorer.startingOffset(), restorer.restoredOffset()});
            restorer.restoreDone();
            this.needsRestoring.remove(topicPartition);
        }
    }

    private long processNext(List<ConsumerRecord<byte[], byte[]>> records, StateRestorer restorer, Long endOffset) {
        ConsumerRecord<byte[], byte[]> record;
        ArrayList<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<KeyValue<byte[], byte[]>>();
        long offset = -1L;
        Iterator<ConsumerRecord<byte[], byte[]>> i$ = records.iterator();
        while (i$.hasNext() && !restorer.hasCompleted(offset = (record = i$.next()).offset(), endOffset)) {
            if (record.key() == null) continue;
            restoreRecords.add(KeyValue.pair(record.key(), record.value()));
        }
        if (offset == -1L) {
            offset = this.consumer.position(restorer.partition());
        }
        if (!restoreRecords.isEmpty()) {
            restorer.restore(restoreRecords);
            restorer.restoreBatchCompleted(offset + 1L, records.size());
        }
        return this.consumer.position(restorer.partition());
    }

    private boolean hasPartition(TopicPartition topicPartition) {
        List<PartitionInfo> partitions = this.partitionInfo.get(topicPartition.topic());
        if (partitions == null) {
            return false;
        }
        for (PartitionInfo partition : partitions) {
            if (partition.partition() != topicPartition.partition()) continue;
            return true;
        }
        return false;
    }
}

