/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.QuickUnion;
import org.apache.kafka.streams.processor.internals.RepartitionTopicConfig;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.WindowedChangelogTopicConfig;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InternalTopologyBuilder {
    private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class);
    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
    private static final String[] NO_PREDECESSORS = new String[0];
    private final Map<String, NodeFactory> nodeFactories = new LinkedHashMap<String, NodeFactory>();
    private final Map<String, StateStoreFactory> stateFactories = new HashMap<String, StateStoreFactory>();
    private final Map<String, StoreBuilder> globalStateBuilders = new LinkedHashMap<String, StoreBuilder>();
    private final Map<String, StateStore> globalStateStores = new LinkedHashMap<String, StateStore>();
    private final Set<String> sourceTopicNames = new HashSet<String>();
    private final Set<String> internalTopicNames = new HashSet<String>();
    private final List<Set<String>> copartitionSourceGroups = new ArrayList<Set<String>>();
    private final Map<String, List<String>> nodeToSourceTopics = new HashMap<String, List<String>>();
    private final Map<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<String, Pattern>();
    private final Map<String, String> nodeToSinkTopic = new HashMap<String, String>();
    private final Map<String, Pattern> topicToPatterns = new HashMap<String, Pattern>();
    private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<String, Set<String>>();
    private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<String, Set<Pattern>>();
    private final Map<String, String> storeToChangelogTopic = new HashMap<String, String>();
    private final Set<String> globalTopics = new HashSet<String>();
    private final Set<String> earliestResetTopics = new HashSet<String>();
    private final Set<String> latestResetTopics = new HashSet<String>();
    private final Set<Pattern> earliestResetPatterns = new HashSet<Pattern>();
    private final Set<Pattern> latestResetPatterns = new HashSet<Pattern>();
    private final QuickUnion<String> nodeGrouper = new QuickUnion();
    private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
    private String applicationId = null;
    private Pattern topicPattern = null;
    private Map<Integer, Set<String>> nodeGroups = null;
    private static final NodeComparator NODE_COMPARATOR = new NodeComparator();
    private static final GlobalStoreComparator GLOBALSTORE_COMPARATOR = new GlobalStoreComparator();
    private static final SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator();

    public final synchronized InternalTopologyBuilder setApplicationId(String applicationId) {
        Objects.requireNonNull(applicationId, "applicationId can't be null");
        this.applicationId = applicationId;
        return this;
    }

    public final synchronized InternalTopologyBuilder rewriteTopology(StreamsConfig config) {
        Objects.requireNonNull(config, "config can't be null");
        this.setApplicationId(config.getString("application.id"));
        if (config.getLong("cache.max.bytes.buffering") == 0L) {
            for (StateStoreFactory storeFactory : this.stateFactories.values()) {
                storeFactory.builder.withCachingDisabled();
            }
            for (StoreBuilder storeBuilder : this.globalStateBuilders.values()) {
                storeBuilder.withCachingDisabled();
            }
        }
        for (StoreBuilder storeBuilder : this.globalStateBuilders.values()) {
            this.globalStateStores.put(storeBuilder.name(), (StateStore)storeBuilder.build());
        }
        return this;
    }

    public final void addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer keyDeserializer, Deserializer valDeserializer, String ... topics) {
        if (topics.length == 0) {
            throw new TopologyException("You must provide at least one topic");
        }
        Objects.requireNonNull(name, "name must not be null");
        if (this.nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }
        for (String topic : topics) {
            Objects.requireNonNull(topic, "topic names cannot be null");
            this.validateTopicNotAlreadyRegistered(topic);
            this.maybeAddToResetList(this.earliestResetTopics, this.latestResetTopics, offsetReset, topic);
            this.sourceTopicNames.add(topic);
        }
        this.nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
        this.nodeToSourceTopics.put(name, Arrays.asList(topics));
        this.nodeGrouper.add(name);
        this.nodeGroups = null;
    }

    public final void addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
        Objects.requireNonNull(name, "name can't be null");
        if (this.nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }
        for (String sourceTopicName : this.sourceTopicNames) {
            if (!topicPattern.matcher(sourceTopicName).matches()) continue;
            throw new TopologyException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
        }
        for (Pattern otherPattern : this.earliestResetPatterns) {
            if (!topicPattern.pattern().contains(otherPattern.pattern()) && !otherPattern.pattern().contains(topicPattern.pattern())) continue;
            throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
        }
        for (Pattern otherPattern : this.latestResetPatterns) {
            if (!topicPattern.pattern().contains(otherPattern.pattern()) && !otherPattern.pattern().contains(topicPattern.pattern())) continue;
            throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
        }
        this.maybeAddToResetList(this.earliestResetPatterns, this.latestResetPatterns, offsetReset, topicPattern);
        this.nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
        this.nodeToSourcePatterns.put(name, topicPattern);
        this.nodeGrouper.add(name);
        this.nodeGroups = null;
    }

    public final <K, V> void addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String ... predecessorNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(topic, "topic must not be null");
        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
        if (predecessorNames.length == 0) {
            throw new TopologyException("Sink " + name + " must have at least one parent");
        }
        this.addSink(name, new StaticTopicNameExtractor(topic), keySerializer, valSerializer, partitioner, predecessorNames);
        this.nodeToSinkTopic.put(name, topic);
        this.nodeGroups = null;
    }

    public final <K, V> void addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String ... predecessorNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(topicExtractor, "topic extractor must not be null");
        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
        if (this.nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }
        if (predecessorNames.length == 0) {
            throw new TopologyException("Sink " + name + " must have at least one parent");
        }
        for (String predecessor : predecessorNames) {
            Objects.requireNonNull(predecessor, "predecessor name can't be null");
            if (predecessor.equals(name)) {
                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
            }
            if (!this.nodeFactories.containsKey(predecessor)) {
                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
            }
            if (!this.nodeToSinkTopic.containsKey(predecessor)) continue;
            throw new TopologyException("Sink " + predecessor + " cannot be used a parent.");
        }
        this.nodeFactories.put(name, new SinkNodeFactory(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner));
        this.nodeGrouper.add(name);
        this.nodeGrouper.unite(name, (String[])predecessorNames);
        this.nodeGroups = null;
    }

    public final void addProcessor(String name, ProcessorSupplier supplier, String ... predecessorNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(supplier, "supplier must not be null");
        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
        if (this.nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }
        if (predecessorNames.length == 0) {
            throw new TopologyException("Processor " + name + " must have at least one parent");
        }
        for (String predecessor : predecessorNames) {
            Objects.requireNonNull(predecessor, "predecessor name must not be null");
            if (predecessor.equals(name)) {
                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
            }
            if (this.nodeFactories.containsKey(predecessor)) continue;
            throw new TopologyException("Predecessor processor " + predecessor + " is not added yet for " + name);
        }
        this.nodeFactories.put(name, new ProcessorNodeFactory(name, predecessorNames, supplier));
        this.nodeGrouper.add(name);
        this.nodeGrouper.unite(name, (String[])predecessorNames);
        this.nodeGroups = null;
    }

    public final void addStateStore(StoreBuilder<?> storeBuilder, String ... processorNames) {
        this.addStateStore(storeBuilder, false, processorNames);
    }

    public final void addStateStore(StoreBuilder<?> storeBuilder, boolean allowOverride, String ... processorNames) {
        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
        if (!allowOverride && this.stateFactories.containsKey(storeBuilder.name())) {
            throw new TopologyException("StateStore " + storeBuilder.name() + " is already added.");
        }
        this.stateFactories.put(storeBuilder.name(), new StateStoreFactory(storeBuilder));
        if (processorNames != null) {
            for (String processorName : processorNames) {
                Objects.requireNonNull(processorName, "processor name must not be null");
                this.connectProcessorAndStateStore(processorName, storeBuilder.name());
            }
        }
        this.nodeGroups = null;
    }

    public final void addGlobalStore(StoreBuilder storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier) {
        Objects.requireNonNull(storeBuilder, "store builder must not be null");
        this.validateGlobalStoreArguments(sourceName, topic, processorName, stateUpdateSupplier, storeBuilder.name(), storeBuilder.loggingEnabled());
        this.validateTopicNotAlreadyRegistered(topic);
        String[] topics = new String[]{topic};
        String[] predecessors = new String[]{sourceName};
        ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
        this.globalTopics.add(topic);
        this.nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
        this.nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
        this.nodeGrouper.add(sourceName);
        nodeFactory.addStateStore(storeBuilder.name());
        this.nodeFactories.put(processorName, nodeFactory);
        this.nodeGrouper.add(processorName);
        this.nodeGrouper.unite(processorName, (String[])predecessors);
        this.globalStateBuilders.put(storeBuilder.name(), storeBuilder);
        this.connectSourceStoreAndTopic(storeBuilder.name(), topic);
        this.nodeGroups = null;
    }

    private void validateTopicNotAlreadyRegistered(String topic) {
        if (this.sourceTopicNames.contains(topic) || this.globalTopics.contains(topic)) {
            throw new TopologyException("Topic " + topic + " has already been registered by another source.");
        }
        for (Pattern pattern : this.nodeToSourcePatterns.values()) {
            if (!pattern.matcher(topic).matches()) continue;
            throw new TopologyException("Topic " + topic + " matches a Pattern already registered by another source.");
        }
    }

    public final void connectProcessorAndStateStores(String processorName, String ... stateStoreNames) {
        Objects.requireNonNull(processorName, "processorName can't be null");
        Objects.requireNonNull(stateStoreNames, "state store list must not be null");
        if (stateStoreNames.length == 0) {
            throw new TopologyException("Must provide at least one state store name.");
        }
        for (String stateStoreName : stateStoreNames) {
            Objects.requireNonNull(stateStoreName, "state store name must not be null");
            this.connectProcessorAndStateStore(processorName, stateStoreName);
        }
        this.nodeGroups = null;
    }

    public void connectSourceStoreAndTopic(String sourceStoreName, String topic) {
        if (this.storeToChangelogTopic.containsKey(sourceStoreName)) {
            throw new TopologyException("Source store " + sourceStoreName + " is already added.");
        }
        this.storeToChangelogTopic.put(sourceStoreName, topic);
    }

    public final void addInternalTopic(String topicName) {
        Objects.requireNonNull(topicName, "topicName can't be null");
        this.internalTopicNames.add(topicName);
    }

    public final void copartitionSources(Collection<String> sourceNodes) {
        this.copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<String>(sourceNodes)));
    }

    private void validateGlobalStoreArguments(String sourceName, String topic, String processorName, ProcessorSupplier stateUpdateSupplier, String storeName, boolean loggingEnabled) {
        Objects.requireNonNull(sourceName, "sourceName must not be null");
        Objects.requireNonNull(topic, "topic must not be null");
        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
        Objects.requireNonNull(processorName, "processorName must not be null");
        if (this.nodeFactories.containsKey(sourceName)) {
            throw new TopologyException("Processor " + sourceName + " is already added.");
        }
        if (this.nodeFactories.containsKey(processorName)) {
            throw new TopologyException("Processor " + processorName + " is already added.");
        }
        if (this.stateFactories.containsKey(storeName) || this.globalStateBuilders.containsKey(storeName)) {
            throw new TopologyException("StateStore " + storeName + " is already added.");
        }
        if (loggingEnabled) {
            throw new TopologyException("StateStore " + storeName + " for global table must not have logging enabled.");
        }
        if (sourceName.equals(processorName)) {
            throw new TopologyException("sourceName and processorName must be different.");
        }
    }

    private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
        if (this.globalStateBuilders.containsKey(stateStoreName)) {
            throw new TopologyException("Global StateStore " + stateStoreName + " can be used by a Processor without being specified; it should not be explicitly passed.");
        }
        if (!this.stateFactories.containsKey(stateStoreName)) {
            throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
        }
        if (!this.nodeFactories.containsKey(processorName)) {
            throw new TopologyException("Processor " + processorName + " is not added yet.");
        }
        StateStoreFactory stateStoreFactory = this.stateFactories.get(stateStoreName);
        Iterator iter = stateStoreFactory.users().iterator();
        if (iter.hasNext()) {
            String user = (String)iter.next();
            this.nodeGrouper.unite(user, (String[])new String[]{processorName});
        }
        stateStoreFactory.users().add(processorName);
        NodeFactory nodeFactory = this.nodeFactories.get(processorName);
        if (!(nodeFactory instanceof ProcessorNodeFactory)) {
            throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
        }
        ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory)nodeFactory;
        processorNodeFactory.addStateStore(stateStoreName);
        this.connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
    }

    private Set<SourceNodeFactory> findSourcesForProcessorPredecessors(String[] predecessors) {
        HashSet<SourceNodeFactory> sourceNodes = new HashSet<SourceNodeFactory>();
        for (String predecessor : predecessors) {
            NodeFactory nodeFactory = this.nodeFactories.get(predecessor);
            if (nodeFactory instanceof SourceNodeFactory) {
                sourceNodes.add((SourceNodeFactory)nodeFactory);
                continue;
            }
            if (!(nodeFactory instanceof ProcessorNodeFactory)) continue;
            sourceNodes.addAll(this.findSourcesForProcessorPredecessors(((ProcessorNodeFactory)nodeFactory).predecessors));
        }
        return sourceNodes;
    }

    private void connectStateStoreNameToSourceTopicsOrPattern(String stateStoreName, ProcessorNodeFactory processorNodeFactory) {
        if (this.stateStoreNameToSourceTopics.containsKey(stateStoreName) || this.stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
            return;
        }
        HashSet sourceTopics = new HashSet();
        HashSet<Pattern> sourcePatterns = new HashSet<Pattern>();
        Set<SourceNodeFactory> sourceNodesForPredecessor = this.findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
        for (SourceNodeFactory sourceNodeFactory : sourceNodesForPredecessor) {
            if (sourceNodeFactory.pattern != null) {
                sourcePatterns.add(sourceNodeFactory.pattern);
                continue;
            }
            sourceTopics.addAll(sourceNodeFactory.topics);
        }
        if (!sourceTopics.isEmpty()) {
            this.stateStoreNameToSourceTopics.put(stateStoreName, Collections.unmodifiableSet(sourceTopics));
        }
        if (!sourcePatterns.isEmpty()) {
            this.stateStoreNameToSourceRegex.put(stateStoreName, Collections.unmodifiableSet(sourcePatterns));
        }
    }

    private <T> void maybeAddToResetList(Collection<T> earliestResets, Collection<T> latestResets, Topology.AutoOffsetReset offsetReset, T item) {
        if (offsetReset != null) {
            switch (offsetReset) {
                case EARLIEST: {
                    earliestResets.add(item);
                    break;
                }
                case LATEST: {
                    latestResets.add(item);
                    break;
                }
                default: {
                    throw new TopologyException(String.format("Unrecognized reset format %s", new Object[]{offsetReset}));
                }
            }
        }
    }

    public synchronized Map<Integer, Set<String>> nodeGroups() {
        if (this.nodeGroups == null) {
            this.nodeGroups = this.makeNodeGroups();
        }
        return this.nodeGroups;
    }

    private Map<Integer, Set<String>> makeNodeGroups() {
        LinkedHashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<Integer, Set<String>>();
        HashMap<String, Set<String>> rootToNodeGroup = new HashMap<String, Set<String>>();
        int nodeGroupId = 0;
        HashSet<String> allSourceNodes = new HashSet<String>(this.nodeToSourceTopics.keySet());
        allSourceNodes.addAll(this.nodeToSourcePatterns.keySet());
        for (String nodeName : Utils.sorted(allSourceNodes)) {
            nodeGroupId = this.putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup);
        }
        for (String nodeName : Utils.sorted(this.nodeFactories.keySet())) {
            if (this.nodeToSourceTopics.containsKey(nodeName)) continue;
            nodeGroupId = this.putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup);
        }
        return nodeGroups;
    }

    private int putNodeGroupName(String nodeName, int nodeGroupId, Map<Integer, Set<String>> nodeGroups, Map<String, Set<String>> rootToNodeGroup) {
        int newNodeGroupId = nodeGroupId;
        String root = this.nodeGrouper.root(nodeName);
        Set<String> nodeGroup = rootToNodeGroup.get(root);
        if (nodeGroup == null) {
            nodeGroup = new HashSet<String>();
            rootToNodeGroup.put(root, nodeGroup);
            nodeGroups.put(newNodeGroupId++, nodeGroup);
        }
        nodeGroup.add(nodeName);
        return newNodeGroupId;
    }

    public synchronized ProcessorTopology build() {
        return this.build((Integer)null);
    }

    public synchronized ProcessorTopology build(Integer topicGroupId) {
        Set<String> nodeGroup;
        if (topicGroupId != null) {
            nodeGroup = this.nodeGroups().get(topicGroupId);
        } else {
            Set<String> globalNodeGroups = this.globalNodeGroups();
            Collection<Set<String>> values = this.nodeGroups().values();
            nodeGroup = new HashSet<String>();
            for (Set<String> value : values) {
                nodeGroup.addAll(value);
            }
            nodeGroup.removeAll(globalNodeGroups);
        }
        return this.build(nodeGroup);
    }

    public synchronized ProcessorTopology buildGlobalStateTopology() {
        Objects.requireNonNull(this.applicationId, "topology has not completed optimization");
        Set<String> globalGroups = this.globalNodeGroups();
        if (globalGroups.isEmpty()) {
            return null;
        }
        return this.build(globalGroups);
    }

    private Set<String> globalNodeGroups() {
        HashSet<String> globalGroups = new HashSet<String>();
        for (Map.Entry<Integer, Set<String>> nodeGroup : this.nodeGroups().entrySet()) {
            Set<String> nodes = nodeGroup.getValue();
            for (String node : nodes) {
                if (!this.isGlobalSource(node)) continue;
                globalGroups.addAll(nodes);
            }
        }
        return globalGroups;
    }

    private ProcessorTopology build(Set<String> nodeGroup) {
        Objects.requireNonNull(this.applicationId, "topology has not completed optimization");
        LinkedHashMap<String, ProcessorNode> processorMap = new LinkedHashMap<String, ProcessorNode>();
        HashMap<String, SourceNode> topicSourceMap = new HashMap<String, SourceNode>();
        HashMap<String, SinkNode> topicSinkMap = new HashMap<String, SinkNode>();
        LinkedHashMap<String, StateStore> stateStoreMap = new LinkedHashMap<String, StateStore>();
        HashSet<String> repartitionTopics = new HashSet<String>();
        for (NodeFactory factory : this.nodeFactories.values()) {
            if (nodeGroup != null && !nodeGroup.contains(factory.name)) continue;
            ProcessorNode node = factory.build();
            processorMap.put(node.name(), node);
            if (factory instanceof ProcessorNodeFactory) {
                this.buildProcessorNode(processorMap, stateStoreMap, (ProcessorNodeFactory)factory, node);
                continue;
            }
            if (factory instanceof SourceNodeFactory) {
                this.buildSourceNode(topicSourceMap, repartitionTopics, (SourceNodeFactory)factory, (SourceNode)node);
                continue;
            }
            if (factory instanceof SinkNodeFactory) {
                this.buildSinkNode(processorMap, topicSinkMap, repartitionTopics, (SinkNodeFactory)factory, (SinkNode)node);
                continue;
            }
            throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
        }
        return new ProcessorTopology(new ArrayList<ProcessorNode>(processorMap.values()), topicSourceMap, topicSinkMap, new ArrayList<StateStore>(stateStoreMap.values()), new ArrayList<StateStore>(this.globalStateStores.values()), this.storeToChangelogTopic, repartitionTopics);
    }

    private void buildSinkNode(Map<String, ProcessorNode> processorMap, Map<String, SinkNode> topicSinkMap, Set<String> repartitionTopics, SinkNodeFactory sinkNodeFactory, SinkNode node) {
        for (String predecessor : sinkNodeFactory.predecessors) {
            processorMap.get(predecessor).addChild(node);
            if (!(sinkNodeFactory.topicExtractor instanceof StaticTopicNameExtractor)) continue;
            String topic = ((StaticTopicNameExtractor)((SinkNodeFactory)sinkNodeFactory).topicExtractor).topicName;
            if (this.internalTopicNames.contains(topic)) {
                String decoratedTopic = this.decorateTopic(topic);
                topicSinkMap.put(decoratedTopic, node);
                repartitionTopics.add(decoratedTopic);
                continue;
            }
            topicSinkMap.put(topic, node);
        }
    }

    private void buildSourceNode(Map<String, SourceNode> topicSourceMap, Set<String> repartitionTopics, SourceNodeFactory sourceNodeFactory, SourceNode node) {
        List<String> topics = sourceNodeFactory.pattern != null ? sourceNodeFactory.getTopics(this.subscriptionUpdates.getUpdates()) : sourceNodeFactory.topics;
        for (String topic : topics) {
            if (this.internalTopicNames.contains(topic)) {
                String decoratedTopic = this.decorateTopic(topic);
                topicSourceMap.put(decoratedTopic, node);
                repartitionTopics.add(decoratedTopic);
                continue;
            }
            topicSourceMap.put(topic, node);
        }
    }

    private void buildProcessorNode(Map<String, ProcessorNode> processorMap, Map<String, StateStore> stateStoreMap, ProcessorNodeFactory factory, ProcessorNode node) {
        for (String predecessor : factory.predecessors) {
            ProcessorNode predecessorNode = processorMap.get(predecessor);
            predecessorNode.addChild(node);
        }
        for (String stateStoreName : factory.stateStoreNames) {
            if (stateStoreMap.containsKey(stateStoreName)) continue;
            if (this.stateFactories.containsKey(stateStoreName)) {
                StateStoreFactory stateStoreFactory = this.stateFactories.get(stateStoreName);
                if (stateStoreFactory.loggingEnabled() && !this.storeToChangelogTopic.containsKey(stateStoreName)) {
                    String changelogTopic = ProcessorStateManager.storeChangelogTopic(this.applicationId, stateStoreName);
                    this.storeToChangelogTopic.put(stateStoreName, changelogTopic);
                }
                stateStoreMap.put(stateStoreName, stateStoreFactory.build());
                continue;
            }
            stateStoreMap.put(stateStoreName, this.globalStateStores.get(stateStoreName));
        }
    }

    public Map<String, StateStore> globalStateStores() {
        Objects.requireNonNull(this.applicationId, "topology has not completed optimization");
        return Collections.unmodifiableMap(this.globalStateStores);
    }

    public Set<String> allStateStoreName() {
        Objects.requireNonNull(this.applicationId, "topology has not completed optimization");
        HashSet<String> allNames = new HashSet<String>(this.stateFactories.keySet());
        allNames.addAll(this.globalStateStores.keySet());
        return Collections.unmodifiableSet(allNames);
    }

    public synchronized Map<Integer, TopicsInfo> topicGroups() {
        LinkedHashMap<Integer, TopicsInfo> topicGroups = new LinkedHashMap<Integer, TopicsInfo>();
        if (this.nodeGroups == null) {
            this.nodeGroups = this.makeNodeGroups();
        }
        for (Map.Entry<Integer, Set<String>> entry : this.nodeGroups.entrySet()) {
            HashSet<String> sinkTopics = new HashSet<String>();
            HashSet<String> sourceTopics = new HashSet<String>();
            HashMap<String, RepartitionTopicConfig> repartitionTopics = new HashMap<String, RepartitionTopicConfig>();
            HashMap<String, InternalTopicConfig> stateChangelogTopics = new HashMap<String, InternalTopicConfig>();
            for (String node : entry.getValue()) {
                String topic;
                List<String> topics = this.nodeToSourceTopics.get(node);
                if (topics != null) {
                    for (String topic2 : topics) {
                        if (this.globalTopics.contains(topic2)) continue;
                        if (this.internalTopicNames.contains(topic2)) {
                            String internalTopic = this.decorateTopic(topic2);
                            repartitionTopics.put(internalTopic, new RepartitionTopicConfig(internalTopic, Collections.emptyMap()));
                            sourceTopics.add(internalTopic);
                            continue;
                        }
                        sourceTopics.add(topic2);
                    }
                }
                if ((topic = this.nodeToSinkTopic.get(node)) != null) {
                    if (this.internalTopicNames.contains(topic)) {
                        sinkTopics.add(this.decorateTopic(topic));
                    } else {
                        sinkTopics.add(topic);
                    }
                }
                for (StateStoreFactory stateFactory : this.stateFactories.values()) {
                    String topicName;
                    if (!stateFactory.loggingEnabled() || !stateFactory.users().contains(node) || stateChangelogTopics.containsKey(topicName = this.storeToChangelogTopic.containsKey(stateFactory.name()) ? this.storeToChangelogTopic.get(stateFactory.name()) : ProcessorStateManager.storeChangelogTopic(this.applicationId, stateFactory.name()))) continue;
                    InternalTopicConfig internalTopicConfig = this.createChangelogTopicConfig(stateFactory, topicName);
                    stateChangelogTopics.put(topicName, internalTopicConfig);
                }
            }
            if (sourceTopics.isEmpty()) continue;
            topicGroups.put(entry.getKey(), new TopicsInfo(Collections.unmodifiableSet(sinkTopics), Collections.unmodifiableSet(sourceTopics), Collections.unmodifiableMap(repartitionTopics), Collections.unmodifiableMap(stateChangelogTopics)));
        }
        return Collections.unmodifiableMap(topicGroups);
    }

    private void setRegexMatchedTopicsToSourceNodes() {
        if (this.subscriptionUpdates.hasUpdates()) {
            for (Map.Entry<String, Pattern> stringPatternEntry : this.nodeToSourcePatterns.entrySet()) {
                SourceNodeFactory sourceNode = (SourceNodeFactory)this.nodeFactories.get(stringPatternEntry.getKey());
                this.nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(this.subscriptionUpdates.getUpdates()));
                log.debug("nodeToSourceTopics {}", this.nodeToSourceTopics);
            }
        }
    }

    private void setRegexMatchedTopicToStateStore() {
        if (this.subscriptionUpdates.hasUpdates()) {
            for (Map.Entry<String, Set<Pattern>> storePattern : this.stateStoreNameToSourceRegex.entrySet()) {
                HashSet<String> updatedTopicsForStateStore = new HashSet<String>();
                for (String subscriptionUpdateTopic : this.subscriptionUpdates.getUpdates()) {
                    for (Pattern pattern : storePattern.getValue()) {
                        if (!pattern.matcher(subscriptionUpdateTopic).matches()) continue;
                        updatedTopicsForStateStore.add(subscriptionUpdateTopic);
                    }
                }
                if (updatedTopicsForStateStore.isEmpty()) continue;
                Collection storeTopics = this.stateStoreNameToSourceTopics.get(storePattern.getKey());
                if (storeTopics != null) {
                    updatedTopicsForStateStore.addAll(storeTopics);
                }
                this.stateStoreNameToSourceTopics.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore));
            }
        }
    }

    private InternalTopicConfig createChangelogTopicConfig(StateStoreFactory factory, String name) {
        if (factory.isWindowStore()) {
            WindowedChangelogTopicConfig config = new WindowedChangelogTopicConfig(name, factory.logConfig());
            config.setRetentionMs(factory.retentionPeriod());
            return config;
        }
        return new UnwindowedChangelogTopicConfig(name, factory.logConfig());
    }

    public synchronized Pattern earliestResetTopicsPattern() {
        return this.resetTopicsPattern(this.earliestResetTopics, this.earliestResetPatterns);
    }

    public synchronized Pattern latestResetTopicsPattern() {
        return this.resetTopicsPattern(this.latestResetTopics, this.latestResetPatterns);
    }

    private Pattern resetTopicsPattern(Set<String> resetTopics, Set<Pattern> resetPatterns) {
        List<String> topics = this.maybeDecorateInternalSourceTopics(resetTopics);
        return InternalTopologyBuilder.buildPatternForOffsetResetTopics(topics, resetPatterns);
    }

    private static Pattern buildPatternForOffsetResetTopics(Collection<String> sourceTopics, Collection<Pattern> sourcePatterns) {
        StringBuilder builder = new StringBuilder();
        for (String topic : sourceTopics) {
            builder.append(topic).append("|");
        }
        for (Pattern sourcePattern : sourcePatterns) {
            builder.append(sourcePattern.pattern()).append("|");
        }
        if (builder.length() > 0) {
            builder.setLength(builder.length() - 1);
            return Pattern.compile(builder.toString());
        }
        return EMPTY_ZERO_LENGTH_PATTERN;
    }

    public Map<String, List<String>> stateStoreNameToSourceTopics() {
        HashMap<String, List<String>> results = new HashMap<String, List<String>>();
        for (Map.Entry<String, Set<String>> entry : this.stateStoreNameToSourceTopics.entrySet()) {
            results.put(entry.getKey(), this.maybeDecorateInternalSourceTopics((Collection<String>)entry.getValue()));
        }
        return results;
    }

    public synchronized Collection<Set<String>> copartitionGroups() {
        ArrayList list = new ArrayList(this.copartitionSourceGroups.size());
        for (Set<String> nodeNames : this.copartitionSourceGroups) {
            HashSet<String> copartitionGroup = new HashSet<String>();
            for (String node : nodeNames) {
                List<String> topics = this.nodeToSourceTopics.get(node);
                if (topics == null) continue;
                copartitionGroup.addAll(this.maybeDecorateInternalSourceTopics(topics));
            }
            list.add(Collections.unmodifiableSet(copartitionGroup));
        }
        return Collections.unmodifiableList(list);
    }

    private List<String> maybeDecorateInternalSourceTopics(Collection<String> sourceTopics) {
        ArrayList<String> decoratedTopics = new ArrayList<String>();
        for (String topic : sourceTopics) {
            if (this.internalTopicNames.contains(topic)) {
                decoratedTopics.add(this.decorateTopic(topic));
                continue;
            }
            decoratedTopics.add(topic);
        }
        return decoratedTopics;
    }

    private String decorateTopic(String topic) {
        if (this.applicationId == null) {
            throw new TopologyException("there are internal topics and applicationId hasn't been set. Call setApplicationId first");
        }
        return this.applicationId + "-" + topic;
    }

    SubscriptionUpdates subscriptionUpdates() {
        return this.subscriptionUpdates;
    }

    synchronized Pattern sourceTopicPattern() {
        if (this.topicPattern == null) {
            ArrayList<String> allSourceTopics = new ArrayList<String>();
            if (!this.nodeToSourceTopics.isEmpty()) {
                for (List<String> topics : this.nodeToSourceTopics.values()) {
                    allSourceTopics.addAll(this.maybeDecorateInternalSourceTopics(topics));
                }
            }
            Collections.sort(allSourceTopics);
            this.topicPattern = InternalTopologyBuilder.buildPatternForOffsetResetTopics(allSourceTopics, this.nodeToSourcePatterns.values());
        }
        return this.topicPattern;
    }

    synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates, String logPrefix) {
        log.debug("{}updating builder with {} topic(s) with possible matching regex subscription(s)", (Object)logPrefix, (Object)subscriptionUpdates);
        this.subscriptionUpdates = subscriptionUpdates;
        this.setRegexMatchedTopicsToSourceNodes();
        this.setRegexMatchedTopicToStateStore();
    }

    private boolean isGlobalSource(String nodeName) {
        NodeFactory nodeFactory = this.nodeFactories.get(nodeName);
        if (nodeFactory instanceof SourceNodeFactory) {
            List topics = ((SourceNodeFactory)nodeFactory).topics;
            return topics != null && topics.size() == 1 && this.globalTopics.contains(topics.get(0));
        }
        return false;
    }

    public TopologyDescription describe() {
        TopologyDescription description = new TopologyDescription();
        for (Map.Entry<Integer, Set<String>> nodeGroup : this.makeNodeGroups().entrySet()) {
            Set<String> allNodesOfGroups = nodeGroup.getValue();
            boolean isNodeGroupOfGlobalStores = this.nodeGroupContainsGlobalSourceNode(allNodesOfGroups);
            if (!isNodeGroupOfGlobalStores) {
                this.describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
                continue;
            }
            this.describeGlobalStore(description, allNodesOfGroups, nodeGroup.getKey());
        }
        return description;
    }

    private void describeGlobalStore(TopologyDescription description, Set<String> nodes, int id) {
        Iterator<String> it = nodes.iterator();
        while (it.hasNext()) {
            String node = it.next();
            if (!this.isGlobalSource(node)) continue;
            it.remove();
            String processorNode = nodes.iterator().next();
            description.addGlobalStore(new GlobalStore(node, processorNode, (String)((ProcessorNodeFactory)this.nodeFactories.get(processorNode)).stateStoreNames.iterator().next(), this.nodeToSourceTopics.get(node).get(0), id));
            break;
        }
    }

    private boolean nodeGroupContainsGlobalSourceNode(Set<String> allNodesOfGroups) {
        for (String node : allNodesOfGroups) {
            if (!this.isGlobalSource(node)) continue;
            return true;
        }
        return false;
    }

    private static void updateSize(AbstractNode node, int delta) {
        node.size += delta;
        for (TopologyDescription.Node predecessor : node.predecessors()) {
            InternalTopologyBuilder.updateSize((AbstractNode)predecessor, delta);
        }
    }

    private void describeSubtopology(TopologyDescription description, Integer subtopologyId, Set<String> nodeNames) {
        HashMap<String, AbstractNode> nodesByName = new HashMap<String, AbstractNode>();
        for (String nodeName : nodeNames) {
            nodesByName.put(nodeName, this.nodeFactories.get(nodeName).describe());
        }
        for (AbstractNode node : nodesByName.values()) {
            for (String predecessorName : this.nodeFactories.get((Object)node.name()).predecessors) {
                AbstractNode predecessor = (AbstractNode)nodesByName.get(predecessorName);
                node.addPredecessor(predecessor);
                predecessor.addSuccessor(node);
                InternalTopologyBuilder.updateSize(predecessor, node.size);
            }
        }
        description.addSubtopology(new Subtopology(subtopologyId, new HashSet<TopologyDescription.Node>(nodesByName.values())));
    }

    private static String nodeNames(Set<TopologyDescription.Node> nodes) {
        StringBuilder sb = new StringBuilder();
        if (!nodes.isEmpty()) {
            for (TopologyDescription.Node n : nodes) {
                sb.append(n.name());
                sb.append(", ");
            }
        } else {
            return "none";
        }
        sb.deleteCharAt(sb.length() - 1);
        sb.deleteCharAt(sb.length() - 1);
        return sb.toString();
    }

    void updateSubscribedTopics(Set<String> topics, String logPrefix) {
        SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
        log.debug("{}found {} topics possibly matching regex", (Object)logPrefix, topics);
        subscriptionUpdates.updateTopics(topics);
        this.updateSubscriptions(subscriptionUpdates, logPrefix);
    }

    public synchronized Set<String> getSourceTopicNames() {
        return this.sourceTopicNames;
    }

    public synchronized Map<String, StateStoreFactory> getStateStores() {
        return this.stateFactories;
    }

    static /* synthetic */ SubtopologyComparator access$2300() {
        return SUBTOPOLOGY_COMPARATOR;
    }

    static /* synthetic */ GlobalStoreComparator access$2400() {
        return GLOBALSTORE_COMPARATOR;
    }

    public static class SubscriptionUpdates {
        private final Set<String> updatedTopicSubscriptions = new HashSet<String>();

        private void updateTopics(Collection<String> topicNames) {
            this.updatedTopicSubscriptions.clear();
            this.updatedTopicSubscriptions.addAll(topicNames);
        }

        public Collection<String> getUpdates() {
            return Collections.unmodifiableSet(this.updatedTopicSubscriptions);
        }

        boolean hasUpdates() {
            return !this.updatedTopicSubscriptions.isEmpty();
        }

        public String toString() {
            return String.format("SubscriptionUpdates{updatedTopicSubscriptions=%s}", this.updatedTopicSubscriptions);
        }
    }

    public static final class TopologyDescription
    implements org.apache.kafka.streams.TopologyDescription {
        private final TreeSet<TopologyDescription.Subtopology> subtopologies = new TreeSet<TopologyDescription.Subtopology>(InternalTopologyBuilder.access$2300());
        private final TreeSet<TopologyDescription.GlobalStore> globalStores = new TreeSet<TopologyDescription.GlobalStore>(InternalTopologyBuilder.access$2400());

        public void addSubtopology(TopologyDescription.Subtopology subtopology) {
            this.subtopologies.add(subtopology);
        }

        public void addGlobalStore(TopologyDescription.GlobalStore globalStore) {
            this.globalStores.add(globalStore);
        }

        @Override
        public Set<TopologyDescription.Subtopology> subtopologies() {
            return Collections.unmodifiableSet(this.subtopologies);
        }

        @Override
        public Set<TopologyDescription.GlobalStore> globalStores() {
            return Collections.unmodifiableSet(this.globalStores);
        }

        public String toString() {
            TopologyDescription.Subtopology subtopology;
            StringBuilder sb = new StringBuilder();
            sb.append("Topologies:\n ");
            TopologyDescription.Subtopology[] sortedSubtopologies = this.subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[0]);
            TopologyDescription.GlobalStore[] sortedGlobalStores = this.globalStores.descendingSet().toArray(new TopologyDescription.GlobalStore[0]);
            int expectedId = 0;
            int subtopologiesIndex = sortedSubtopologies.length - 1;
            int globalStoresIndex = sortedGlobalStores.length - 1;
            while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
                sb.append("  ");
                subtopology = sortedSubtopologies[subtopologiesIndex];
                TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
                if (subtopology.id() == expectedId) {
                    sb.append(subtopology);
                    --subtopologiesIndex;
                } else {
                    sb.append(globalStore);
                    --globalStoresIndex;
                }
                ++expectedId;
            }
            while (subtopologiesIndex != -1) {
                subtopology = sortedSubtopologies[subtopologiesIndex];
                sb.append("  ");
                sb.append(subtopology);
                --subtopologiesIndex;
            }
            while (globalStoresIndex != -1) {
                TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
                sb.append("  ");
                sb.append(globalStore);
                --globalStoresIndex;
            }
            return sb.toString();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TopologyDescription that = (TopologyDescription)o;
            return this.subtopologies.equals(that.subtopologies) && this.globalStores.equals(that.globalStores);
        }

        public int hashCode() {
            return Objects.hash(this.subtopologies, this.globalStores);
        }
    }

    private static class SubtopologyComparator
    implements Comparator<TopologyDescription.Subtopology>,
    Serializable {
        private SubtopologyComparator() {
        }

        @Override
        public int compare(TopologyDescription.Subtopology subtopology1, TopologyDescription.Subtopology subtopology2) {
            if (subtopology1.equals(subtopology2)) {
                return 0;
            }
            return subtopology1.id() - subtopology2.id();
        }
    }

    private static class GlobalStoreComparator
    implements Comparator<TopologyDescription.GlobalStore>,
    Serializable {
        private GlobalStoreComparator() {
        }

        @Override
        public int compare(TopologyDescription.GlobalStore globalStore1, TopologyDescription.GlobalStore globalStore2) {
            if (globalStore1.equals(globalStore2)) {
                return 0;
            }
            return globalStore1.id() - globalStore2.id();
        }
    }

    public static class TopicsInfo {
        final Set<String> sinkTopics;
        final Set<String> sourceTopics;
        public final Map<String, InternalTopicConfig> stateChangelogTopics;
        public final Map<String, InternalTopicConfig> repartitionSourceTopics;

        TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
            this.sinkTopics = sinkTopics;
            this.sourceTopics = sourceTopics;
            this.stateChangelogTopics = stateChangelogTopics;
            this.repartitionSourceTopics = repartitionSourceTopics;
        }

        public boolean equals(Object o) {
            if (o instanceof TopicsInfo) {
                TopicsInfo other = (TopicsInfo)o;
                return other.sourceTopics.equals(this.sourceTopics) && other.stateChangelogTopics.equals(this.stateChangelogTopics);
            }
            return false;
        }

        public int hashCode() {
            long n = (long)this.sourceTopics.hashCode() << 32 | (long)this.stateChangelogTopics.hashCode();
            return (int)(n % 0xFFFFFFFFL);
        }

        public String toString() {
            return "TopicsInfo{sinkTopics=" + this.sinkTopics + ", sourceTopics=" + this.sourceTopics + ", repartitionSourceTopics=" + this.repartitionSourceTopics + ", stateChangelogTopics=" + this.stateChangelogTopics + '}';
        }
    }

    public static final class Subtopology
    implements TopologyDescription.Subtopology {
        private final int id;
        private final Set<TopologyDescription.Node> nodes;

        public Subtopology(int id, Set<TopologyDescription.Node> nodes) {
            this.id = id;
            this.nodes = new TreeSet<TopologyDescription.Node>(NODE_COMPARATOR);
            this.nodes.addAll(nodes);
        }

        @Override
        public int id() {
            return this.id;
        }

        @Override
        public Set<TopologyDescription.Node> nodes() {
            return Collections.unmodifiableSet(this.nodes);
        }

        Iterator<TopologyDescription.Node> nodesInOrder() {
            return this.nodes.iterator();
        }

        public String toString() {
            return "Sub-topology: " + this.id + "\n" + this.nodesAsString() + "\n";
        }

        private String nodesAsString() {
            StringBuilder sb = new StringBuilder();
            for (TopologyDescription.Node node : this.nodes) {
                sb.append("    ");
                sb.append(node);
                sb.append('\n');
            }
            return sb.toString();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Subtopology that = (Subtopology)o;
            return this.id == that.id && this.nodes.equals(that.nodes);
        }

        public int hashCode() {
            return Objects.hash(this.id, this.nodes);
        }
    }

    public static final class Sink
    extends AbstractNode
    implements TopologyDescription.Sink {
        private final TopicNameExtractor topicNameExtractor;

        public Sink(String name, TopicNameExtractor topicNameExtractor) {
            super(name);
            this.topicNameExtractor = topicNameExtractor;
        }

        public Sink(String name, String topic) {
            super(name);
            this.topicNameExtractor = new StaticTopicNameExtractor(topic);
        }

        @Override
        public String topic() {
            if (this.topicNameExtractor instanceof StaticTopicNameExtractor) {
                return ((StaticTopicNameExtractor)this.topicNameExtractor).topicName;
            }
            return null;
        }

        @Override
        public TopicNameExtractor topicNameExtractor() {
            if (this.topicNameExtractor instanceof StaticTopicNameExtractor) {
                return null;
            }
            return this.topicNameExtractor;
        }

        @Override
        public void addSuccessor(TopologyDescription.Node successor) {
            throw new UnsupportedOperationException("Sinks don't have successors.");
        }

        public String toString() {
            if (this.topicNameExtractor instanceof StaticTopicNameExtractor) {
                return "Sink: " + this.name + " (topic: " + this.topic() + ")\n      <-- " + InternalTopologyBuilder.nodeNames(this.predecessors);
            }
            return "Sink: " + this.name + " (extractor class: " + this.topicNameExtractor + ")\n      <-- " + InternalTopologyBuilder.nodeNames(this.predecessors);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Sink sink = (Sink)o;
            return this.name.equals(sink.name) && this.topicNameExtractor.equals(sink.topicNameExtractor) && this.predecessors.equals(sink.predecessors);
        }

        public int hashCode() {
            return Objects.hash(this.name, this.topicNameExtractor);
        }
    }

    public static final class Processor
    extends AbstractNode
    implements TopologyDescription.Processor {
        private final Set<String> stores;

        public Processor(String name, Set<String> stores) {
            super(name);
            this.stores = stores;
        }

        @Override
        public Set<String> stores() {
            return Collections.unmodifiableSet(this.stores);
        }

        public String toString() {
            return "Processor: " + this.name + " (stores: " + this.stores + ")\n      --> " + InternalTopologyBuilder.nodeNames(this.successors) + "\n      <-- " + InternalTopologyBuilder.nodeNames(this.predecessors);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Processor processor = (Processor)o;
            return this.name.equals(processor.name) && this.stores.equals(processor.stores) && this.predecessors.equals(processor.predecessors);
        }

        public int hashCode() {
            return Objects.hash(this.name, this.stores);
        }
    }

    public static final class Source
    extends AbstractNode
    implements TopologyDescription.Source {
        private final Set<String> topics;
        private final Pattern topicPattern;

        public Source(String name, Set<String> topics, Pattern pattern) {
            super(name);
            if (topics == null && pattern == null) {
                throw new IllegalArgumentException("Either topics or pattern must be not-null, but both are null.");
            }
            if (topics != null && pattern != null) {
                throw new IllegalArgumentException("Either topics or pattern must be null, but both are not null.");
            }
            this.topics = topics;
            this.topicPattern = pattern;
        }

        @Override
        @Deprecated
        public String topics() {
            return this.topics.toString();
        }

        @Override
        public Set<String> topicSet() {
            return this.topics;
        }

        @Override
        public Pattern topicPattern() {
            return this.topicPattern;
        }

        @Override
        public void addPredecessor(TopologyDescription.Node predecessor) {
            throw new UnsupportedOperationException("Sources don't have predecessors.");
        }

        public String toString() {
            String topicsString = this.topics == null ? this.topicPattern.toString() : this.topics.toString();
            return "Source: " + this.name + " (topics: " + topicsString + ")\n      --> " + InternalTopologyBuilder.nodeNames(this.successors);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Source source = (Source)o;
            return this.name.equals(source.name) && Objects.equals(this.topics, source.topics) && (this.topicPattern == null ? source.topicPattern == null : this.topicPattern.pattern().equals(source.topicPattern.pattern()));
        }

        public int hashCode() {
            return Objects.hash(this.name, this.topics, this.topicPattern);
        }
    }

    public static abstract class AbstractNode
    implements TopologyDescription.Node {
        final String name;
        final Set<TopologyDescription.Node> predecessors = new TreeSet<TopologyDescription.Node>(InternalTopologyBuilder.access$1900());
        final Set<TopologyDescription.Node> successors = new TreeSet<TopologyDescription.Node>(InternalTopologyBuilder.access$1900());
        int size;

        AbstractNode(String name) {
            Objects.requireNonNull(name, "name cannot be null");
            this.name = name;
            this.size = 1;
        }

        @Override
        public String name() {
            return this.name;
        }

        @Override
        public Set<TopologyDescription.Node> predecessors() {
            return Collections.unmodifiableSet(this.predecessors);
        }

        @Override
        public Set<TopologyDescription.Node> successors() {
            return Collections.unmodifiableSet(this.successors);
        }

        public void addPredecessor(TopologyDescription.Node predecessor) {
            this.predecessors.add(predecessor);
        }

        public void addSuccessor(TopologyDescription.Node successor) {
            this.successors.add(successor);
        }
    }

    public static final class GlobalStore
    implements TopologyDescription.GlobalStore {
        private final Source source;
        private final Processor processor;
        private final int id;

        public GlobalStore(String sourceName, String processorName, String storeName, String topicName, int id) {
            this.source = new Source(sourceName, Collections.singleton(topicName), null);
            this.processor = new Processor(processorName, Collections.singleton(storeName));
            this.source.successors.add(this.processor);
            this.processor.predecessors.add(this.source);
            this.id = id;
        }

        @Override
        public int id() {
            return this.id;
        }

        @Override
        public TopologyDescription.Source source() {
            return this.source;
        }

        @Override
        public TopologyDescription.Processor processor() {
            return this.processor;
        }

        public String toString() {
            return "Sub-topology: " + this.id + " for global store (will not generate tasks)\n    " + this.source.toString() + "\n    " + this.processor.toString() + "\n";
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            GlobalStore that = (GlobalStore)o;
            return this.source.equals(that.source) && this.processor.equals(that.processor);
        }

        public int hashCode() {
            return Objects.hash(this.source, this.processor);
        }
    }

    private static class NodeComparator
    implements Comparator<TopologyDescription.Node>,
    Serializable {
        private NodeComparator() {
        }

        @Override
        public int compare(TopologyDescription.Node node1, TopologyDescription.Node node2) {
            if (node1.equals(node2)) {
                return 0;
            }
            int size1 = ((AbstractNode)node1).size;
            int size2 = ((AbstractNode)node2).size;
            if (size1 != size2) {
                return size2 - size1;
            }
            return node1.name().compareTo(node2.name());
        }
    }

    private class SinkNodeFactory<K, V>
    extends NodeFactory {
        private final Serializer<K> keySerializer;
        private final Serializer<V> valSerializer;
        private final StreamPartitioner<? super K, ? super V> partitioner;
        private final TopicNameExtractor<K, V> topicExtractor;

        private SinkNodeFactory(String name, String[] predecessors, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
            super(name, (String[])predecessors.clone());
            this.topicExtractor = topicExtractor;
            this.keySerializer = keySerializer;
            this.valSerializer = valSerializer;
            this.partitioner = partitioner;
        }

        @Override
        public ProcessorNode build() {
            if (this.topicExtractor instanceof StaticTopicNameExtractor) {
                String topic = ((StaticTopicNameExtractor)this.topicExtractor).topicName;
                if (InternalTopologyBuilder.this.internalTopicNames.contains(topic)) {
                    return new SinkNode<K, V>(this.name, new StaticTopicNameExtractor(InternalTopologyBuilder.this.decorateTopic(topic)), this.keySerializer, this.valSerializer, this.partitioner);
                }
                return new SinkNode<K, V>(this.name, this.topicExtractor, this.keySerializer, this.valSerializer, this.partitioner);
            }
            return new SinkNode<K, V>(this.name, this.topicExtractor, this.keySerializer, this.valSerializer, this.partitioner);
        }

        @Override
        Sink describe() {
            return new Sink(this.name, this.topicExtractor);
        }
    }

    private class SourceNodeFactory
    extends NodeFactory {
        private final List<String> topics;
        private final Pattern pattern;
        private final Deserializer<?> keyDeserializer;
        private final Deserializer<?> valDeserializer;
        private final TimestampExtractor timestampExtractor;

        private SourceNodeFactory(String name, String[] topics, Pattern pattern, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) {
            super(name, NO_PREDECESSORS);
            this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
            this.pattern = pattern;
            this.keyDeserializer = keyDeserializer;
            this.valDeserializer = valDeserializer;
            this.timestampExtractor = timestampExtractor;
        }

        List<String> getTopics(Collection<String> subscribedTopics) {
            if (subscribedTopics.isEmpty()) {
                return Collections.singletonList(String.valueOf(this.pattern));
            }
            ArrayList<String> matchedTopics = new ArrayList<String>();
            for (String update : subscribedTopics) {
                if (this.pattern == InternalTopologyBuilder.this.topicToPatterns.get(update)) {
                    matchedTopics.add(update);
                    continue;
                }
                if (InternalTopologyBuilder.this.topicToPatterns.containsKey(update) && this.isMatch(update)) {
                    throw new TopologyException("Topic " + update + " is already matched for another regex pattern " + InternalTopologyBuilder.this.topicToPatterns.get(update) + " and hence cannot be matched to this regex pattern " + this.pattern + " any more.");
                }
                if (!this.isMatch(update)) continue;
                InternalTopologyBuilder.this.topicToPatterns.put(update, this.pattern);
                matchedTopics.add(update);
            }
            return matchedTopics;
        }

        @Override
        public ProcessorNode build() {
            List sourceTopics = (List)InternalTopologyBuilder.this.nodeToSourceTopics.get(this.name);
            if (sourceTopics == null) {
                return new SourceNode(this.name, Collections.singletonList(String.valueOf(this.pattern)), this.timestampExtractor, this.keyDeserializer, this.valDeserializer);
            }
            return new SourceNode(this.name, InternalTopologyBuilder.this.maybeDecorateInternalSourceTopics(sourceTopics), this.timestampExtractor, this.keyDeserializer, this.valDeserializer);
        }

        private boolean isMatch(String topic) {
            return this.pattern.matcher(topic).matches();
        }

        @Override
        Source describe() {
            return new Source(this.name, (Set<String>)(this.topics.size() == 0 ? null : new HashSet<String>(this.topics)), this.pattern);
        }
    }

    private static class ProcessorNodeFactory
    extends NodeFactory {
        private final ProcessorSupplier<?, ?> supplier;
        private final Set<String> stateStoreNames = new HashSet<String>();

        ProcessorNodeFactory(String name, String[] predecessors, ProcessorSupplier<?, ?> supplier) {
            super(name, (String[])predecessors.clone());
            this.supplier = supplier;
        }

        public void addStateStore(String stateStoreName) {
            this.stateStoreNames.add(stateStoreName);
        }

        @Override
        public ProcessorNode build() {
            return new ProcessorNode(this.name, this.supplier.get(), this.stateStoreNames);
        }

        @Override
        Processor describe() {
            return new Processor(this.name, new HashSet<String>(this.stateStoreNames));
        }
    }

    private static abstract class NodeFactory {
        final String name;
        final String[] predecessors;

        NodeFactory(String name, String[] predecessors) {
            this.name = name;
            this.predecessors = predecessors;
        }

        public abstract ProcessorNode build();

        abstract AbstractNode describe();
    }

    public static class StateStoreFactory {
        private final StoreBuilder builder;
        private final Set<String> users = new HashSet<String>();

        private StateStoreFactory(StoreBuilder<?> builder) {
            this.builder = builder;
        }

        public StateStore build() {
            return this.builder.build();
        }

        long retentionPeriod() {
            if (this.builder instanceof WindowStoreBuilder) {
                return ((WindowStoreBuilder)this.builder).retentionPeriod();
            }
            if (this.builder instanceof TimestampedWindowStoreBuilder) {
                return ((TimestampedWindowStoreBuilder)this.builder).retentionPeriod();
            }
            if (this.builder instanceof SessionStoreBuilder) {
                return ((SessionStoreBuilder)this.builder).retentionPeriod();
            }
            throw new IllegalStateException("retentionPeriod is not supported when not a window store");
        }

        private Set<String> users() {
            return this.users;
        }

        public boolean loggingEnabled() {
            return this.builder.loggingEnabled();
        }

        private String name() {
            return this.builder.name();
        }

        private boolean isWindowStore() {
            return this.builder instanceof WindowStoreBuilder || this.builder instanceof TimestampedWindowStoreBuilder || this.builder instanceof SessionStoreBuilder;
        }

        private Map<String, String> logConfig() {
            return this.builder.logConfig();
        }
    }
}

