/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.KeyValueStoreFacade;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.internals.WindowStoreFacade;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRegister;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TestDriverProducer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.streams.test.TestRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopologyTestDriver
implements Closeable {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(TopologyTestDriver.class);
    // Log context handed to the internal Streams components created by this driver.
    private final LogContext logContext;
    // Mocked wall-clock; only moves when advanceWallClockTime(...) is called.
    private final Time mockWallClockTime;
    // Set once in setupTopology(); used afterwards to look up source topic and state store names.
    private InternalTopologyBuilder internalTopologyBuilder;
    // NOTE(review): not referenced in the visible code; partitions are created with the literal 0.
    private static final int PARTITION_ID = 0;
    // Identifier of the single task driven by this test driver.
    private static final TaskId TASK_ID = new TaskId(0, 0);
    // Null when the topology has no source topics (see setupTask()).
    StreamTask task;
    // Both global fields are null when there is no global topology (see setupGlobalTask()).
    private GlobalStateUpdateTask globalStateTask;
    private GlobalStateManager globalStateManager;
    private StateDirectory stateDirectory;
    // Metrics registry created in setupMetrics() and exposed read-only via metrics().
    private Metrics metrics;
    ProcessorTopology processorTopology;
    // May be null; every use in this class is null-checked.
    ProcessorTopology globalTopology;
    private final MockConsumer<byte[], byte[]> consumer;
    private final MockProducer<byte[], byte[]> producer;
    private final TestDriverProducer testDriverProducer;
    // Source topic name (or pattern string) -> the single partition (0) used for it.
    private final Map<String, TopicPartition> partitionsByInputTopic = new HashMap<String, TopicPartition>();
    // Global source topic name -> the single partition (0) used for it.
    private final Map<String, TopicPartition> globalPartitionsByInputTopic = new HashMap<String, TopicPartition>();
    // Next offset to assign to a record piped into each (regular or global) input partition.
    private final Map<TopicPartition, AtomicLong> offsetsByTopicOrPatternPartition = new HashMap<TopicPartition, AtomicLong>();
    // Records captured from the mock producer, queued per output topic until a test reads them.
    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<String, Queue<ProducerRecord<byte[], byte[]>>>();
    private final StreamThread.ProcessingMode processingMode;
    // No-op restore listener passed to the global state manager; all callbacks are intentionally empty.
    private final StateRestoreListener stateRestoreListener = new StateRestoreListener(){

        public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
        }

        public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
        }

        public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
        }
    };

    /**
     * Creates a test driver whose mocked wall-clock starts at the current system time.
     *
     * @param topology the topology to be tested
     * @param config   the configuration for the topology
     */
    public TopologyTestDriver(final Topology topology, final Properties config) {
        // The cast makes the choice of the Instant-based overload explicit.
        this(topology, config, (Instant) null);
    }

    /**
     * Creates a test driver whose mocked wall-clock starts at the given epoch-millisecond value.
     *
     * @param topology               the topology to be tested
     * @param config                 the configuration for the topology
     * @param initialWallClockTimeMs the initial value of the mocked wall-clock time, in milliseconds
     * @deprecated see {@link #TopologyTestDriver(Topology, Properties, Instant)} for the
     *             {@link Instant}-based equivalent
     */
    @Deprecated
    public TopologyTestDriver(final Topology topology,
                              final Properties config,
                              final long initialWallClockTimeMs) {
        this(topology.internalTopologyBuilder, config, initialWallClockTimeMs);
    }

    /**
     * Creates a test driver whose mocked wall-clock starts at the given instant.
     *
     * @param topology             the topology to be tested
     * @param config               the configuration for the topology
     * @param initialWallClockTime the initial value of the mocked wall-clock time; when
     *                             {@code null}, the current system time is used
     */
    public TopologyTestDriver(final Topology topology,
                              final Properties config,
                              final Instant initialWallClockTime) {
        this(
            topology.internalTopologyBuilder,
            config,
            initialWallClockTime != null ? initialWallClockTime.toEpochMilli() : System.currentTimeMillis());
    }

    /**
     * Shared constructor behind all public constructors: wires up the mocked clock, metrics,
     * topology, mock consumer/producer and finally the global and regular tasks.
     *
     * The order of the steps matters: the clock must exist before metrics are set up, the
     * topology must be built before either task is created, and the consumer/producer must be
     * assigned before {@code setupTask} uses them.
     *
     * @param builder                the internal builder of the topology to be tested
     * @param config                 the configuration for the topology
     * @param initialWallClockTimeMs the initial value of the mocked wall-clock time, in milliseconds
     */
    private TopologyTestDriver(InternalTopologyBuilder builder, Properties config, long initialWallClockTimeMs) {
        QuietStreamsConfig streamsConfig = new QuietStreamsConfig((Map)config);
        TopologyTestDriver.logIfTaskIdleEnabled((StreamsConfig)streamsConfig);
        this.logContext = new LogContext("topology-test-driver ");
        this.mockWallClockTime = new MockTime(initialWallClockTimeMs);
        this.processingMode = StreamThread.processingMode((StreamsConfig)streamsConfig);
        StreamsMetricsImpl streamsMetrics = this.setupMetrics((StreamsConfig)streamsConfig);
        this.setupTopology(builder, (StreamsConfig)streamsConfig);
        // A negative configured cache size is clamped to zero.
        ThreadCache cache = new ThreadCache(this.logContext, Math.max(0L, streamsConfig.getLong("cache.max.bytes.buffering")), streamsMetrics);
        this.consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        ByteArraySerializer bytesSerializer = new ByteArraySerializer();
        // The mock producer reports exactly one partition (0) for any topic it is asked about.
        this.producer = new MockProducer<byte[], byte[]>(true, (Serializer)bytesSerializer, (Serializer)bytesSerializer){

            public List<PartitionInfo> partitionsFor(String topic) {
                return Collections.singletonList(new PartitionInfo(topic, 0, null, null, null));
            }
        };
        // Client supplier that only ever hands out the mock producer; requesting any consumer
        // from it is treated as a programming error.
        this.testDriverProducer = new TestDriverProducer((StreamsConfig)streamsConfig, new KafkaClientSupplier(){

            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return TopologyTestDriver.this.producer;
            }

            public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                throw new IllegalStateException();
            }

            public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
                throw new IllegalStateException();
            }

            public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) {
                throw new IllegalStateException();
            }
        }, this.logContext);
        this.setupGlobalTask((StreamsConfig)streamsConfig, streamsMetrics, cache);
        this.setupTask((StreamsConfig)streamsConfig, streamsMetrics, cache);
    }

    /**
     * Logs an informational hint when {@code max.task.idle.ms} is configured to a positive
     * value, because records may then stay enqueued until wall-clock time is advanced or all
     * partitions have data.
     *
     * @param streamsConfig the configuration to inspect
     */
    private static void logIfTaskIdleEnabled(final StreamsConfig streamsConfig) {
        final long taskIdleTimeMs = streamsConfig.getLong("max.task.idle.ms");
        if (taskIdleTimeMs > 0L) {
            log.info(
                "Detected {} config in use with TopologyTestDriver (set to {}ms)."
                    + " This means you might need to use TopologyTestDriver#advanceWallClockTime()"
                    + " or enqueue records on all partitions to allow Streams to make progress."
                    + " TopologyTestDriver will log a message each time it cannot process enqueued"
                    + " records due to {}.",
                "max.task.idle.ms",
                taskIdleTimeMs,
                "max.task.idle.ms");
        }
    }

    /**
     * Creates the metrics registry (driven by the mocked clock) and the Streams metrics wrapper
     * around it, and registers the dropped/skipped-records sensor for the driver's task.
     *
     * @param streamsConfig configuration supplying sample count, recording level, sample window
     *                      and built-in metrics version
     * @return the Streams metrics instance backed by {@code this.metrics}
     */
    private StreamsMetricsImpl setupMetrics(final StreamsConfig streamsConfig) {
        final String threadId = Thread.currentThread().getName();

        final int numSamples = streamsConfig.getInt("metrics.num.samples").intValue();
        final Sensor.RecordingLevel recordingLevel =
            Sensor.RecordingLevel.forName(streamsConfig.getString("metrics.recording.level"));
        final long sampleWindowMs = streamsConfig.getLong("metrics.sample.window.ms").longValue();
        final MetricConfig metricConfig = new MetricConfig()
            .samples(numSamples)
            .recordLevel(recordingLevel)
            .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS);

        metrics = new Metrics(metricConfig, mockWallClockTime);

        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
            metrics,
            "test-client",
            streamsConfig.getString("built.in.metrics.version"));
        streamsMetrics.setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger(mockWallClockTime));
        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, TASK_ID.toString(), streamsMetrics);
        return streamsMetrics;
    }

    /**
     * Builds the regular and global processor topologies, registers one partition (0) with a
     * fresh offset counter per source topic, and creates the state directory.
     *
     * @param builder       the internal builder of the topology to be tested
     * @param streamsConfig the configuration used to rewrite the topology and create the state directory
     */
    private void setupTopology(final InternalTopologyBuilder builder, final StreamsConfig streamsConfig) {
        internalTopologyBuilder = builder;
        internalTopologyBuilder.rewriteTopology(streamsConfig);
        processorTopology = internalTopologyBuilder.buildTopology();
        globalTopology = internalTopologyBuilder.buildGlobalStateTopology();

        for (final String sourceTopic : processorTopology.sourceTopics()) {
            final TopicPartition partition = new TopicPartition(sourceTopic, 0);
            partitionsByInputTopic.put(sourceTopic, partition);
            offsetsByTopicOrPatternPartition.put(partition, new AtomicLong());
        }

        // A directory on disk is only requested when some persistent store will use it.
        final boolean createStateDirectory = processorTopology.hasPersistentLocalStore()
            || (globalTopology != null && globalTopology.hasPersistentGlobalStore());
        stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime, createStateDirectory);
    }

    /**
     * Creates and initializes the global state manager and global update task when a global
     * topology exists; otherwise leaves both fields {@code null}.
     *
     * @param streamsConfig  the Streams configuration
     * @param streamsMetrics the metrics instance shared with the regular task
     * @param cache          the record cache shared with the regular task
     */
    private void setupGlobalTask(final StreamsConfig streamsConfig,
                                 final StreamsMetricsImpl streamsMetrics,
                                 final ThreadCache cache) {
        if (globalTopology == null) {
            globalStateManager = null;
            globalStateTask = null;
            return;
        }

        final MockConsumer<byte[], byte[]> globalConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
        for (final String globalTopic : globalTopology.sourceTopics()) {
            final TopicPartition globalPartition = new TopicPartition(globalTopic, 0);
            final PartitionInfo partitionInfo = new PartitionInfo(globalTopic, 0, null, null, null);

            globalPartitionsByInputTopic.put(globalTopic, globalPartition);
            offsetsByTopicOrPatternPartition.put(globalPartition, new AtomicLong());

            // Each global topic is presented to the consumer as a single, empty partition.
            globalConsumer.updatePartitions(globalTopic, Collections.singletonList(partitionInfo));
            globalConsumer.updateBeginningOffsets(Collections.singletonMap(globalPartition, 0L));
            globalConsumer.updateEndOffsets(Collections.singletonMap(globalPartition, 0L));
        }

        globalStateManager = new GlobalStateManagerImpl(
            logContext,
            globalTopology,
            globalConsumer,
            stateDirectory,
            stateRestoreListener,
            streamsConfig);

        final GlobalProcessorContextImpl globalProcessorContext =
            new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);
        globalStateManager.setGlobalProcessorContext(globalProcessorContext);

        globalStateTask = new GlobalStateUpdateTask(
            logContext,
            globalTopology,
            globalProcessorContext,
            globalStateManager,
            new LogAndContinueExceptionHandler());
        globalStateTask.initialize();

        // Placeholder record context installed after initialization.
        globalProcessorContext.setRecordContext(
            new ProcessorRecordContext(0L, -1L, -1, "__null_topic__", new RecordHeaders()));
    }

    /**
     * Creates, initializes and restores the single stream task when the topology has at least
     * one source topic; otherwise leaves {@code task} {@code null}.
     *
     * @param streamsConfig  the Streams configuration
     * @param streamsMetrics the metrics instance shared with the global task
     * @param cache          the record cache shared with the global task
     */
    private void setupTask(final StreamsConfig streamsConfig,
                           final StreamsMetricsImpl streamsMetrics,
                           final ThreadCache cache) {
        if (partitionsByInputTopic.isEmpty()) {
            task = null;
            return;
        }

        consumer.assign(partitionsByInputTopic.values());
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        for (final TopicPartition inputPartition : partitionsByInputTopic.values()) {
            beginningOffsets.put(inputPartition, 0L);
        }
        consumer.updateBeginningOffsets(beginningOffsets);

        final boolean eosEnabled = "exactly_once".equals(streamsConfig.getString("processing.guarantee"));
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            TASK_ID,
            Task.TaskType.ACTIVE,
            eosEnabled,
            logContext,
            stateDirectory,
            new MockChangelogRegister(),
            processorTopology.storeToChangelogTopic(),
            new HashSet<>(partitionsByInputTopic.values()));
        final RecordCollectorImpl recordCollector = new RecordCollectorImpl(
            logContext,
            TASK_ID,
            testDriverProducer,
            streamsConfig.defaultProductionExceptionHandler(),
            streamsMetrics);
        final ProcessorContextImpl context =
            new ProcessorContextImpl(TASK_ID, streamsConfig, stateManager, streamsMetrics, cache);

        task = new StreamTask(
            TASK_ID,
            new HashSet<>(partitionsByInputTopic.values()),
            processorTopology,
            consumer,
            streamsConfig,
            streamsMetrics,
            stateDirectory,
            cache,
            mockWallClockTime,
            stateManager,
            recordCollector,
            context);
        task.initializeIfNeeded();
        task.completeRestoration();

        // Placeholder record context installed after restoration completes.
        task.processorContext().setRecordContext(
            new ProcessorRecordContext(0L, -1L, -1, "__null_topic__", new RecordHeaders()));
    }

    /**
     * Returns a read-only view of the metrics registered with this driver.
     *
     * @return an unmodifiable map from metric name to metric
     */
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(metrics.metrics());
    }

    /**
     * Pipes a single raw record into the topology, using the record's topic, timestamp, key,
     * value and headers.
     *
     * @param consumerRecord the record to be processed
     * @deprecated see {@link #createInputTopic(String, Serializer, Serializer)} for the
     *             {@link TestInputTopic}-based way of piping records
     */
    @Deprecated
    public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) {
        pipeRecord(
            consumerRecord.topic(),
            consumerRecord.timestamp(),
            consumerRecord.key(),
            consumerRecord.value(),
            consumerRecord.headers());
    }

    /**
     * Routes a serialized record to the regular task and/or the global task, depending on which
     * of them consumes {@code topicName}.
     *
     * @param topicName the topic the record is piped into
     * @param timestamp the record timestamp
     * @param key       the serialized key, may be {@code null}
     * @param value     the serialized value, may be {@code null}
     * @param headers   the record headers
     * @throws IllegalArgumentException if the topic is neither a regular nor a global input topic
     */
    private void pipeRecord(final String topicName,
                            final long timestamp,
                            final byte[] key,
                            final byte[] value,
                            final Headers headers) {
        final TopicPartition taskPartition = getInputTopicOrPatternPartition(topicName);
        final TopicPartition globalPartition = globalPartitionsByInputTopic.get(topicName);

        final boolean isTaskInput = taskPartition != null;
        final boolean isGlobalInput = globalPartition != null;
        if (!isTaskInput && !isGlobalInput) {
            throw new IllegalArgumentException("Unknown topic: " + topicName);
        }

        if (isTaskInput) {
            enqueueTaskRecord(topicName, taskPartition, timestamp, key, value, headers);
            completeAllProcessableWork();
        }

        if (isGlobalInput) {
            processGlobalRecord(globalPartition, timestamp, key, value, headers);
        }
    }

    /**
     * Wraps the serialized key/value in a consumer record carrying the next offset of the given
     * partition and adds it to the task's input queue.
     *
     * @param inputTopic              the topic name stamped on the record
     * @param topicOrPatternPartition the partition the record is enqueued on
     * @param timestamp               the record timestamp
     * @param key                     the serialized key, may be {@code null}
     * @param value                   the serialized value, may be {@code null}
     * @param headers                 the record headers
     */
    private void enqueueTaskRecord(final String inputTopic,
                                   final TopicPartition topicOrPatternPartition,
                                   final long timestamp,
                                   final byte[] key,
                                   final byte[] value,
                                   final Headers headers) {
        final long offset = offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).getAndIncrement();
        // A serialized size of -1 marks a null key or value.
        final int keySize = key == null ? -1 : key.length;
        final int valueSize = value == null ? -1 : value.length;

        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            inputTopic,
            topicOrPatternPartition.partition(),
            offset,
            timestamp,
            TimestampType.CREATE_TIME,
            Long.valueOf(-1L),
            keySize,
            valueSize,
            key,
            value,
            headers);
        task.addRecords(topicOrPatternPartition, Collections.singleton(record));
    }

    /**
     * Captures pending producer output and then lets the task process records for as long as it
     * has queued records and reports itself processable at the current mocked wall-clock time.
     * Each processed record is followed by a stream-time punctuation attempt, a commit and
     * another output capture. Logs a hint if records remain queued afterwards.
     */
    private void completeAllProcessableWork() {
        captureOutputsAndReEnqueueInternalResults();
        if (task == null) {
            return;
        }

        while (task.hasRecordsQueued() && task.isProcessable(mockWallClockTime.milliseconds())) {
            task.process(mockWallClockTime.milliseconds());
            task.maybePunctuateStreamTime();
            commit(task.prepareCommit());
            task.postCommit();
            // Processing may have produced records for internal topics; feed them back in.
            captureOutputsAndReEnqueueInternalResults();
        }

        if (task.hasRecordsQueued()) {
            log.info(
                "Due to the {} configuration, there are currently some records that cannot be processed. Advancing wall-clock time or enqueuing records on the empty topics will allow Streams to process more.",
                "max.task.idle.ms");
        }
    }

    /**
     * Commits the given offsets: through the producer's transaction in either exactly-once
     * processing mode, otherwise synchronously through the mock consumer.
     *
     * @param offsets the offsets to commit
     */
    private void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        final boolean eosEnabled = processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA
            || processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;

        if (!eosEnabled) {
            consumer.commitSync(offsets);
            return;
        }
        testDriverProducer.commitTransaction(offsets, new ConsumerGroupMetadata("dummy-app-id"));
    }

    /**
     * Applies a single record to the global state task, using the next offset of the given
     * global partition, and flushes the global state afterwards.
     *
     * @param globalInputTopicPartition the global partition the record belongs to
     * @param timestamp                 the record timestamp
     * @param key                       the serialized key, may be {@code null}
     * @param value                     the serialized value, may be {@code null}
     * @param headers                   the record headers
     */
    private void processGlobalRecord(final TopicPartition globalInputTopicPartition,
                                     final long timestamp,
                                     final byte[] key,
                                     final byte[] value,
                                     final Headers headers) {
        final String topic = globalInputTopicPartition.topic();
        final int partition = globalInputTopicPartition.partition();
        final long offset = offsetsByTopicOrPatternPartition.get(globalInputTopicPartition).getAndIncrement();
        // A serialized size of -1 marks a null key or value.
        final int keySize = key == null ? -1 : key.length;
        final int valueSize = value == null ? -1 : value.length;

        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            topic,
            partition,
            offset,
            timestamp,
            TimestampType.CREATE_TIME,
            Long.valueOf(-1L),
            keySize,
            valueSize,
            key,
            value,
            headers);
        globalStateTask.update(record);
        globalStateTask.flushState();
    }

    /**
     * Rejects an input topic that is not equal to a registered source topic name but still
     * matches that name when the name is interpreted as a regular expression.
     *
     * @param inputRecordTopic the topic of the record being piped in
     * @throws TopologyException if such a source topic name is found
     */
    private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) {
        for (final String sourceTopicName : internalTopologyBuilder.sourceTopicNames()) {
            final boolean matchesOnlyAsRegex = !sourceTopicName.equals(inputRecordTopic)
                && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches();
            if (matchesOnlyAsRegex) {
                throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName + " cannot contain regex pattern for input record topic: " + inputRecordTopic + " and hence cannot process the message.");
            }
        }
    }

    /**
     * Resolves the task input partition for a topic: first by exact name, then by treating each
     * registered input key as a regular expression and returning the first match.
     *
     * @param topicName the topic to resolve
     * @return the matching partition, or {@code null} if the topic is not a task input
     */
    private TopicPartition getInputTopicOrPatternPartition(final String topicName) {
        if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
            validateSourceTopicNameRegexPattern(topicName);
        }

        final TopicPartition exactMatch = partitionsByInputTopic.get(topicName);
        if (exactMatch != null) {
            return exactMatch;
        }

        for (final Map.Entry<String, TopicPartition> candidate : partitionsByInputTopic.entrySet()) {
            if (Pattern.compile(candidate.getKey()).matcher(topicName).matches()) {
                return candidate.getValue();
            }
        }
        return null;
    }

    /**
     * Drains the mock producer's history into the per-topic output queues. Records written to a
     * topic that is also a task input are re-enqueued on the task, and records written to a
     * global input topic are applied to the global state task.
     */
    private void captureOutputsAndReEnqueueInternalResults() {
        final List<ProducerRecord<byte[], byte[]>> produced = producer.history();
        producer.clear();

        for (final ProducerRecord<byte[], byte[]> record : produced) {
            final String outputTopicName = record.topic();
            outputRecordsByTopic.computeIfAbsent(outputTopicName, k -> new LinkedList<>()).add(record);

            final TopicPartition taskPartition = getInputTopicOrPatternPartition(outputTopicName);
            final TopicPartition globalPartition = globalPartitionsByInputTopic.get(outputTopicName);

            if (taskPartition != null) {
                enqueueTaskRecord(
                    outputTopicName,
                    taskPartition,
                    record.timestamp(),
                    record.key(),
                    record.value(),
                    record.headers());
            }
            if (globalPartition != null) {
                processGlobalRecord(
                    globalPartition,
                    record.timestamp(),
                    record.key(),
                    record.value(),
                    record.headers());
            }
        }
    }

    /**
     * Pipes each of the given raw records into the topology, in list order.
     *
     * @param records the records to be processed
     * @deprecated see {@link #createInputTopic(String, Serializer, Serializer)} for the
     *             {@link TestInputTopic}-based way of piping records
     */
    @Deprecated
    public void pipeInput(final List<ConsumerRecord<byte[], byte[]>> records) {
        for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
            pipeInput(consumerRecord);
        }
    }

    /**
     * Advances the mocked wall-clock time by the given number of milliseconds.
     *
     * @param advanceMs the amount of time to advance, in milliseconds
     * @deprecated see {@link #advanceWallClockTime(Duration)} for the {@link Duration}-based equivalent
     */
    @Deprecated
    public void advanceWallClockTime(final long advanceMs) {
        final Duration advance = Duration.ofMillis(advanceMs);
        advanceWallClockTime(advance);
    }

    /**
     * Advances the mocked wall-clock time, gives the task a chance to run system-time
     * punctuations followed by a commit, and then processes whatever work became processable.
     *
     * @param advance the amount of time to advance; must not be {@code null}
     * @throws NullPointerException if {@code advance} is {@code null}
     */
    public void advanceWallClockTime(final Duration advance) {
        Objects.requireNonNull(advance, "advance cannot be null");

        final long advanceMs = advance.toMillis();
        mockWallClockTime.sleep(advanceMs);

        if (task != null) {
            task.maybePunctuateSystemTime();
            commit(task.prepareCommit());
            task.postCommit();
        }
        completeAllProcessableWork();
    }

    /**
     * Removes and returns the next captured raw record for the given topic.
     *
     * @param topic the name of the topic
     * @return the next record, or {@code null} if nothing has been captured for the topic or its
     *         queue is empty
     * @deprecated see {@link #createOutputTopic(String, Deserializer, Deserializer)} for the
     *             {@link TestOutputTopic}-based way of reading records
     */
    @Deprecated
    public ProducerRecord<byte[], byte[]> readOutput(final String topic) {
        final Queue<ProducerRecord<byte[], byte[]>> queue = outputRecordsByTopic.get(topic);
        return queue == null ? null : queue.poll();
    }

    /**
     * Removes the next captured record for the given topic and returns it with its key and value
     * deserialized.
     *
     * @param topic             the name of the topic
     * @param keyDeserializer   the deserializer for the key
     * @param valueDeserializer the deserializer for the value
     * @param <K>               the key type
     * @param <V>               the value type
     * @return the next record, or {@code null} if there is none
     * @deprecated see {@link #createOutputTopic(String, Deserializer, Deserializer)} for the
     *             {@link TestOutputTopic}-based way of reading records
     */
    @Deprecated
    public <K, V> ProducerRecord<K, V> readOutput(final String topic,
                                                  final Deserializer<K> keyDeserializer,
                                                  final Deserializer<V> valueDeserializer) {
        final ProducerRecord<byte[], byte[]> rawRecord = readOutput(topic);
        if (rawRecord == null) {
            return null;
        }

        final K key = keyDeserializer.deserialize(rawRecord.topic(), rawRecord.headers(), rawRecord.key());
        final V value = valueDeserializer.deserialize(rawRecord.topic(), rawRecord.headers(), rawRecord.value());
        return new ProducerRecord<>(
            rawRecord.topic(),
            rawRecord.partition(),
            rawRecord.timestamp(),
            key,
            value,
            rawRecord.headers());
    }

    /**
     * Looks up the output queue of a topic.
     *
     * @param topicName the name of the topic
     * @return the queue, or {@code null} if the topic is a known sink topic that has not
     *         received any record yet
     * @throws IllegalArgumentException if no queue exists and the topic is not a sink topic of
     *                                  the processor topology
     */
    private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
        final Queue<ProducerRecord<byte[], byte[]>> queue = outputRecordsByTopic.get(topicName);
        if (queue != null) {
            return queue;
        }
        if (!processorTopology.sinkTopics().contains(topicName)) {
            throw new IllegalArgumentException("Unknown topic: " + topicName);
        }
        return null;
    }

    /**
     * Creates a {@link TestInputTopic} bound to this driver, starting at {@link Instant#now()}
     * with an auto-advance of {@link Duration#ZERO}.
     *
     * @param topicName       the name of the topic
     * @param keySerializer   the serializer for the key type
     * @param valueSerializer the serializer for the value type
     * @param <K>             the key type
     * @param <V>             the value type
     * @return the new input topic
     */
    public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName,
                                                              final Serializer<K> keySerializer,
                                                              final Serializer<V> valueSerializer) {
        return new TestInputTopic<>(this, topicName, keySerializer, valueSerializer, Instant.now(), Duration.ZERO);
    }

    /**
     * Creates a {@link TestInputTopic} bound to this driver with an explicit start timestamp and
     * auto-advance step.
     *
     * @param topicName       the name of the topic
     * @param keySerializer   the serializer for the key type
     * @param valueSerializer the serializer for the value type
     * @param startTimestamp  the start timestamp passed to the input topic
     * @param autoAdvance     the auto-advance duration passed to the input topic
     * @param <K>             the key type
     * @param <V>             the value type
     * @return the new input topic
     */
    public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName,
                                                              final Serializer<K> keySerializer,
                                                              final Serializer<V> valueSerializer,
                                                              final Instant startTimestamp,
                                                              final Duration autoAdvance) {
        return new TestInputTopic<>(this, topicName, keySerializer, valueSerializer, startTimestamp, autoAdvance);
    }

    /**
     * Creates a {@link TestOutputTopic} bound to this driver.
     *
     * @param topicName         the name of the topic
     * @param keyDeserializer   the deserializer for the key type
     * @param valueDeserializer the deserializer for the value type
     * @param <K>               the key type
     * @param <V>               the value type
     * @return the new output topic
     */
    public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName,
                                                                final Deserializer<K> keyDeserializer,
                                                                final Deserializer<V> valueDeserializer) {
        return new TestOutputTopic<>(this, topicName, keyDeserializer, valueDeserializer);
    }

    /**
     * Returns the names of all topics for which at least one output record has been captured.
     *
     * @return an unmodifiable view of the captured topic names
     */
    public final Set<String> producedTopicNames() {
        final Set<String> topicNames = outputRecordsByTopic.keySet();
        return Collections.unmodifiableSet(topicNames);
    }

    /**
     * Removes and returns the next captured raw record of a topic.
     *
     * @param topic the name of the topic
     * @return the next record, or {@code null} if the topic has no queue yet or the queue is empty
     * @throws IllegalArgumentException if the topic is unknown (see {@code getRecordsQueue})
     */
    ProducerRecord<byte[], byte[]> readRecord(final String topic) {
        final Queue<ProducerRecord<byte[], byte[]>> queue = getRecordsQueue(topic);
        return queue == null ? null : queue.poll();
    }

    /**
     * Removes the next captured record of a topic and returns it as a deserialized
     * {@link TestRecord}.
     *
     * @param topic             the name of the topic
     * @param keyDeserializer   the deserializer for the key
     * @param valueDeserializer the deserializer for the value
     * @param <K>               the key type
     * @param <V>               the value type
     * @return the next record, never {@code null}
     * @throws NoSuchElementException   if the topic has no queue yet or the queue is empty
     * @throws IllegalArgumentException if the topic is unknown (see {@code getRecordsQueue})
     */
    <K, V> TestRecord<K, V> readRecord(final String topic,
                                       final Deserializer<K> keyDeserializer,
                                       final Deserializer<V> valueDeserializer) {
        final Queue<ProducerRecord<byte[], byte[]>> queue = getRecordsQueue(topic);
        if (queue == null) {
            throw new NoSuchElementException("Uninitialized topic: " + topic);
        }

        final ProducerRecord<byte[], byte[]> rawRecord = queue.poll();
        if (rawRecord == null) {
            throw new NoSuchElementException("Empty topic: " + topic);
        }

        final K key = keyDeserializer.deserialize(rawRecord.topic(), rawRecord.headers(), rawRecord.key());
        final V value = valueDeserializer.deserialize(rawRecord.topic(), rawRecord.headers(), rawRecord.value());
        return new TestRecord<>(key, value, rawRecord.headers(), rawRecord.timestamp());
    }

    /**
     * Serializes a {@link TestRecord} and pipes it into the topology. The {@code time} argument,
     * when present, takes precedence over the record's own timestamp.
     *
     * @param topic           the topic the record is piped into
     * @param record          the record to pipe
     * @param keySerializer   the serializer for the key
     * @param valueSerializer the serializer for the value
     * @param time            timestamp override, may be {@code null}
     * @param <K>             the key type
     * @param <V>             the value type
     * @throws IllegalStateException if neither {@code time} nor the record provides a timestamp
     */
    <K, V> void pipeRecord(final String topic,
                           final TestRecord<K, V> record,
                           final Serializer<K> keySerializer,
                           final Serializer<V> valueSerializer,
                           final Instant time) {
        final byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key());
        final byte[] valueBytes = valueSerializer.serialize(topic, record.headers(), record.value());

        final long timestamp;
        if (time != null) {
            timestamp = time.toEpochMilli();
        } else if (record.timestamp() != null) {
            timestamp = record.timestamp();
        } else {
            throw new IllegalStateException("Provided `TestRecord` does not have a timestamp and no timestamp overwrite was provided via `time` parameter.");
        }

        pipeRecord(topic, timestamp, keyBytes, valueBytes, record.headers());
    }

    /**
     * Returns the number of captured records currently queued for a topic.
     *
     * @param topic the name of the topic
     * @return the queue size, or {@code 0} if the topic has no queue yet
     * @throws IllegalArgumentException if the topic is unknown (see {@code getRecordsQueue})
     */
    final long getQueueSize(final String topic) {
        final Queue<ProducerRecord<byte[], byte[]>> queue = getRecordsQueue(topic);
        return queue == null ? 0L : queue.size();
    }

    /**
     * Tells whether a topic currently has no captured records queued.
     *
     * @param topic the name of the topic
     * @return {@code true} if the queue size of the topic is zero
     */
    final boolean isEmpty(final String topic) {
        final long queuedRecords = getQueueSize(topic);
        return queuedRecords == 0L;
    }

    /**
     * Returns every state store name known to the topology builder, mapped to the store found
     * for it. No built-in-store check is applied, and a name without a resolvable store maps to
     * {@code null}.
     *
     * @return a new mutable map from store name to store
     */
    public Map<String, StateStore> getAllStateStores() {
        final Map<String, StateStore> storesByName = new HashMap<>();
        for (final String name : internalTopologyBuilder.allStateStoreName()) {
            final StateStore store = getStateStore(name, false);
            storesByName.put(name, store);
        }
        return storesByName;
    }

    /**
     * Looks up a state store by name, rejecting the built-in store types that have dedicated
     * accessors on this class.
     *
     * @param name the name of the store
     * @return the store, or {@code null} if no store with that name is found
     * @throws IllegalArgumentException if the store is a built-in key-value, window or session store
     */
    public StateStore getStateStore(final String name) throws IllegalArgumentException {
        final boolean throwForBuiltInStores = true;
        return getStateStore(name, throwForBuiltInStores);
    }

    /**
     * Looks up a state store by name, first in the task's state manager and then in the global
     * state manager.
     *
     * @param name                  the name of the store
     * @param throwForBuiltInStores whether a found store is passed through {@code throwIfBuiltInStore}
     * @return the store, or {@code null} if neither manager has it
     */
    private StateStore getStateStore(final String name, final boolean throwForBuiltInStores) {
        if (task != null) {
            final StateStore taskStore =
                ((ProcessorContextImpl) task.processorContext()).stateManager().getStore(name);
            if (taskStore != null) {
                if (throwForBuiltInStores) {
                    throwIfBuiltInStore(taskStore);
                }
                return taskStore;
            }
        }

        if (globalStateManager != null) {
            final StateStore globalStore = globalStateManager.getStore(name);
            if (globalStore != null) {
                if (throwForBuiltInStores) {
                    throwIfBuiltInStore(globalStore);
                }
                return globalStore;
            }
        }

        return null;
    }

    /**
     * Rejects stores that should be fetched through one of the typed accessors. The timestamped
     * variants are tested before their plain counterparts so the message names the more specific
     * accessor.
     *
     * @param stateStore the store to check
     * @throws IllegalArgumentException if the store is a built-in key-value, window or session store
     */
    private void throwIfBuiltInStore(final StateStore stateStore) {
        if (stateStore instanceof TimestampedKeyValueStore) {
            throw new IllegalArgumentException("Store " + stateStore.name() + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
        } else if (stateStore instanceof ReadOnlyKeyValueStore) {
            throw new IllegalArgumentException("Store " + stateStore.name() + " is a key-value store and should be accessed via `getKeyValueStore()`");
        } else if (stateStore instanceof TimestampedWindowStore) {
            throw new IllegalArgumentException("Store " + stateStore.name() + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
        } else if (stateStore instanceof ReadOnlyWindowStore) {
            throw new IllegalArgumentException("Store " + stateStore.name() + " is a window store and should be accessed via `getWindowStore()`");
        } else if (stateStore instanceof ReadOnlySessionStore) {
            throw new IllegalArgumentException("Store " + stateStore.name() + " is a session store and should be accessed via `getSessionStore()`");
        }
    }

    /**
     * Looks up a key-value store by name. A timestamped key-value store is wrapped in a
     * {@link KeyValueStoreFacade} and an informational message is logged.
     *
     * @param name the name of the store
     * @param <K>  the key type
     * @param <V>  the value type
     * @return the store, or {@code null} if no key-value store with that name is found
     */
    @SuppressWarnings("unchecked")
    public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
        final StateStore store = getStateStore(name, false);
        if (store instanceof TimestampedKeyValueStore) {
            log.info("Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.");
            return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) store);
        }
        if (store instanceof KeyValueStore) {
            return (KeyValueStore<K, V>) store;
        }
        return null;
    }

    /**
     * Looks up a timestamped key-value store by name.
     *
     * @param name the name of the store
     * @param <K>  the key type
     * @param <V>  the value type
     * @return the store, or {@code null} if no timestamped key-value store with that name is found
     */
    @SuppressWarnings("unchecked")
    public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name) {
        final StateStore store = getStateStore(name, false);
        if (store instanceof TimestampedKeyValueStore) {
            return (TimestampedKeyValueStore<K, V>) store;
        }
        return null;
    }

    /**
     * Looks up a window store by name. A timestamped window store is wrapped in a
     * {@link WindowStoreFacade} and an informational message is logged.
     *
     * @param name the name of the store
     * @param <K>  the key type
     * @param <V>  the value type
     * @return the store, or {@code null} if no window store with that name is found
     */
    @SuppressWarnings("unchecked")
    public <K, V> WindowStore<K, V> getWindowStore(final String name) {
        final StateStore store = getStateStore(name, false);
        if (store instanceof TimestampedWindowStore) {
            log.info("Method #getTimestampedWindowStore() should be used to access a TimestampedWindowStore.");
            return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) store);
        }
        if (store instanceof WindowStore) {
            return (WindowStore<K, V>) store;
        }
        return null;
    }

    /**
     * Looks up a timestamped window store by name.
     *
     * @param name the name of the store
     * @param <K>  the key type
     * @param <V>  the value type
     * @return the store, or {@code null} if no timestamped window store with that name is found
     */
    @SuppressWarnings("unchecked")
    public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name) {
        final StateStore store = getStateStore(name, false);
        if (store instanceof TimestampedWindowStore) {
            return (TimestampedWindowStore<K, V>) store;
        }
        return null;
    }

    /**
     * Looks up a session store by name.
     *
     * @param name the name of the store
     * @param <K>  the key type
     * @param <V>  the value type
     * @return the store, or {@code null} if no session store with that name is found
     */
    @SuppressWarnings("unchecked")
    public <K, V> SessionStore<K, V> getSessionStore(final String name) {
        final StateStore store = getStateStore(name, false);
        if (store instanceof SessionStore) {
            return (SessionStore<K, V>) store;
        }
        return null;
    }

    /**
     * Shuts the driver down: suspends, commits and cleanly closes the task (if any), closes the
     * global state task (if any), captures any remaining output, closes the mock producer when
     * running in at-least-once mode, and finally cleans the state directory.
     *
     * A failure to close the global state task is logged and does not abort the shutdown.
     */
    @Override
    public void close() {
        if (this.task != null) {
            this.task.suspend();
            this.task.prepareCommit();
            this.task.postCommit();
            this.task.closeClean();
        }
        if (this.globalStateTask != null) {
            try {
                this.globalStateTask.close(false);
            } catch (final IOException e) {
                // Best effort: keep shutting down, but do not hide the failure from the test log.
                log.warn("Failed to close the global state task during TopologyTestDriver#close(); continuing.", e);
            }
        }
        this.completeAllProcessableWork();
        if (this.task != null && this.task.hasRecordsQueued()) {
            log.warn("Found some records that cannot be processed due to the {} configuration during TopologyTestDriver#close().", (Object)"max.task.idle.ms");
        }
        if (this.processingMode == StreamThread.ProcessingMode.AT_LEAST_ONCE) {
            this.producer.close();
        }
        this.stateDirectory.clean();
    }

    /**
     * A {@link Time} implementation whose clock only moves when {@link #sleep(long)} is called.
     * Millisecond and nanosecond readings are kept in two separate counters that are advanced
     * together.
     */
    static class MockTime implements Time {
        private final AtomicLong timeMs;
        private final AtomicLong highResTimeNs;

        /**
         * @param startTimestampMs the initial clock reading, in milliseconds
         */
        MockTime(final long startTimestampMs) {
            timeMs = new AtomicLong(startTimestampMs);
            highResTimeNs = new AtomicLong(startTimestampMs * 1_000_000L);
        }

        /** Returns the current mocked time in milliseconds. */
        public long milliseconds() {
            return timeMs.get();
        }

        /** Returns the current mocked time in nanoseconds. */
        public long nanoseconds() {
            return highResTimeNs.get();
        }

        /** Returns the nanosecond reading converted to milliseconds. */
        public long hiResClockMs() {
            final long nowNs = nanoseconds();
            return TimeUnit.NANOSECONDS.toMillis(nowNs);
        }

        /**
         * Advances both counters by the given amount without blocking.
         *
         * @param ms the non-negative number of milliseconds to advance
         * @throws IllegalArgumentException if {@code ms} is negative
         */
        public void sleep(final long ms) {
            if (ms < 0L) {
                throw new IllegalArgumentException("Sleep ms cannot be negative.");
            }
            final long deltaNs = TimeUnit.MILLISECONDS.toNanos(ms);
            timeMs.addAndGet(ms);
            highResTimeNs.addAndGet(deltaNs);
        }

        /** Not supported by this mock. */
        public void waitObject(final Object obj, final Supplier<Boolean> condition, final long timeoutMs) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * A {@link ChangelogRegister} that only tracks which partitions are currently registered,
     * in an in-memory set.
     */
    static class MockChangelogRegister implements ChangelogRegister {
        private final Set<TopicPartition> restoringPartitions = new HashSet<>();

        MockChangelogRegister() {
        }

        /**
         * Records the partition as registered; the state manager argument is not used.
         *
         * @param partition    the changelog partition to register
         * @param stateManager ignored
         */
        public void register(final TopicPartition partition, final ProcessorStateManager stateManager) {
            restoringPartitions.add(partition);
        }

        /**
         * Removes all of the given partitions from the registered set.
         *
         * @param partitions the changelog partitions to unregister
         */
        public void unregister(final Collection<TopicPartition> partitions) {
            restoringPartitions.removeAll(partitions);
        }
    }
}

