/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.state.changelog.ChangelogAggregatingState;
import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
import org.apache.flink.state.changelog.ChangelogListState;
import org.apache.flink.state.changelog.ChangelogMapState;
import org.apache.flink.state.changelog.ChangelogReducingState;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.ChangelogValueState;
import org.apache.flink.state.changelog.KvStateChangeLogger;
import org.apache.flink.state.changelog.KvStateChangeLoggerImpl;
import org.apache.flink.state.changelog.PeriodicMaterializationManager;
import org.apache.flink.state.changelog.PriorityQueueStateChangeLoggerImpl;
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class ChangelogKeyedStateBackend<K>
implements CheckpointableKeyedStateBackend<K>,
CheckpointListener,
TestableKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogKeyedStateBackend.class);
    private static final CheckpointOptions CHECKPOINT_OPTIONS = new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES = Stream.of(Tuple2.of((Object)StateDescriptor.Type.VALUE, ChangelogValueState::create), Tuple2.of((Object)StateDescriptor.Type.LIST, ChangelogListState::create), Tuple2.of((Object)StateDescriptor.Type.REDUCING, ChangelogReducingState::create), Tuple2.of((Object)StateDescriptor.Type.AGGREGATING, ChangelogAggregatingState::create), Tuple2.of((Object)StateDescriptor.Type.MAP, ChangelogMapState::create)).collect(Collectors.toMap(t -> (StateDescriptor.Type)t.f0, t -> (StateFactory)t.f1));
    private final AbstractKeyedStateBackend<K> keyedStateBackend;
    private final Map<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
    private final Map<String, ChangelogState> changelogStates;
    private final Map<String, ChangelogKeyGroupedPriorityQueue<?>> priorityQueueStatesByName;
    private final ExecutionConfig executionConfig;
    private final TtlTimeProvider ttlTimeProvider;
    private final StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter;
    private final Closer closer = Closer.create();
    private final CheckpointStreamFactory streamFactory;
    private ChangelogSnapshotState changelogSnapshotState;
    private long lastCheckpointId = -1L;
    private long materializedId = 0L;
    private InternalKvState lastState;
    private String lastName;
    private final FunctionDelegationHelper functionDelegationHelper = new FunctionDelegationHelper();
    @Nullable
    private SequenceNumber lastUploadedFrom;
    @Nullable
    private SequenceNumber lastUploadedTo;
    private final String subtaskName;
    private short lastCreatedStateId = (short)-1;
    private final NavigableMap<Long, Long> materializationIdByCheckpointId = new TreeMap<Long, Long>();
    private long lastConfirmedMaterializationId = -1L;

    public ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, String subtaskName, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState, final CheckpointStorageWorkerView checkpointStorageWorkerView) {
        this.keyedStateBackend = keyedStateBackend;
        this.subtaskName = subtaskName;
        this.executionConfig = executionConfig;
        this.ttlTimeProvider = ttlTimeProvider;
        this.keyValueStatesByName = new HashMap();
        this.priorityQueueStatesByName = new HashMap();
        this.stateChangelogWriter = stateChangelogWriter;
        this.changelogStates = new HashMap<String, ChangelogState>();
        this.changelogSnapshotState = this.completeRestore(initialState);
        this.streamFactory = new CheckpointStreamFactory(){

            public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
                return checkpointStorageWorkerView.createTaskOwnedStateStream();
            }

            public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) throws IOException {
                return false;
            }

            public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
                return null;
            }
        };
        this.closer.register(keyedStateBackend);
    }

    public KeyGroupRange getKeyGroupRange() {
        return this.keyedStateBackend.getKeyGroupRange();
    }

    public void close() throws IOException {
        this.closer.close();
    }

    public void setCurrentKey(K newKey) {
        this.keyedStateBackend.setCurrentKey(newKey);
    }

    public K getCurrentKey() {
        return (K)this.keyedStateBackend.getCurrentKey();
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.keyedStateBackend.getKeySerializer();
    }

    public <N> Stream<K> getKeys(String state, N namespace) {
        return this.keyedStateBackend.getKeys(state, namespace);
    }

    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
        return this.keyedStateBackend.getKeysAndNamespaces(state);
    }

    public void dispose() {
        this.keyedStateBackend.dispose();
        this.lastName = null;
        this.lastState = null;
        this.keyValueStatesByName.clear();
        this.changelogStates.clear();
        this.priorityQueueStatesByName.clear();
    }

    public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) {
        this.keyedStateBackend.registerKeySelectionListener(listener);
    }

    public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) {
        return this.keyedStateBackend.deregisterKeySelectionListener(listener);
    }

    public <N, S extends State, T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> function) throws Exception {
        this.keyedStateBackend.applyToAllKeys(namespace, namespaceSerializer, stateDescriptor, function, this::getPartitionedState);
    }

    public <N, S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(namespace, (String)"Namespace");
        if (this.lastName != null && this.lastName.equals(stateDescriptor.getName())) {
            this.lastState.setCurrentNamespace(namespace);
            return (S)this.lastState;
        }
        InternalKvState<K, ?, ?> previous = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (previous != null) {
            this.lastState = previous;
            this.lastState.setCurrentNamespace(namespace);
            this.lastName = stateDescriptor.getName();
            this.functionDelegationHelper.addOrUpdate(stateDescriptor);
            return (S)previous;
        }
        S state = this.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
        InternalKvState kvState = (InternalKvState)state;
        this.lastName = stateDescriptor.getName();
        this.lastState = kvState;
        kvState.setCurrentNamespace(namespace);
        return state;
    }

    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        this.lastCheckpointId = checkpointId;
        this.lastUploadedFrom = this.changelogSnapshotState.lastMaterializedTo();
        this.lastUploadedTo = this.stateChangelogWriter.nextSequenceNumber();
        LOG.info("snapshot of {} for checkpoint {}, change range: {}..{}", new Object[]{this.subtaskName, checkpointId, this.lastUploadedFrom, this.lastUploadedTo});
        ChangelogSnapshotState changelogStateBackendStateCopy = this.changelogSnapshotState;
        this.materializationIdByCheckpointId.put(checkpointId, changelogStateBackendStateCopy.materializationID);
        return ChangelogKeyedStateBackend.toRunnableFuture(this.stateChangelogWriter.persist(this.lastUploadedFrom).thenApply(delta -> this.buildSnapshotResult((ChangelogStateHandle)delta, changelogStateBackendStateCopy)));
    }

    private SnapshotResult<KeyedStateHandle> buildSnapshotResult(ChangelogStateHandle delta, ChangelogSnapshotState changelogStateBackendStateCopy) {
        ArrayList<ChangelogStateHandle> prevDeltaCopy = new ArrayList<ChangelogStateHandle>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
        long persistedSizeOfThisCheckpoint = 0L;
        if (delta != null && delta.getStateSize() > 0L) {
            prevDeltaCopy.add(delta);
            persistedSizeOfThisCheckpoint += delta.getCheckpointedSize();
        }
        if (prevDeltaCopy.isEmpty() && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
            return SnapshotResult.empty();
        }
        return SnapshotResult.of((StateObject)new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(changelogStateBackendStateCopy.getMaterializedSnapshot(), prevDeltaCopy, this.getKeyGroupRange(), changelogStateBackendStateCopy.materializationID, persistedSizeOfThisCheckpoint));
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        ChangelogKeyGroupedPriorityQueue<Object> queue = this.priorityQueueStatesByName.get(stateName);
        if (queue == null) {
            this.lastCreatedStateId = (short)(this.lastCreatedStateId + 1);
            PriorityQueueStateChangeLoggerImpl priorityQueueStateChangeLogger = new PriorityQueueStateChangeLoggerImpl(byteOrderedElementSerializer, this.keyedStateBackend.getKeyContext(), this.stateChangelogWriter, new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer), this.lastCreatedStateId);
            this.closer.register(priorityQueueStateChangeLogger);
            queue = new ChangelogKeyGroupedPriorityQueue(this.keyedStateBackend.create(stateName, byteOrderedElementSerializer), priorityQueueStateChangeLogger, byteOrderedElementSerializer);
            this.priorityQueueStatesByName.put(stateName, queue);
        }
        return queue;
    }

    @VisibleForTesting
    public int numKeyValueStateEntries() {
        return this.keyedStateBackend.numKeyValueStateEntries();
    }

    public boolean isSafeToReuseKVState() {
        return this.keyedStateBackend.isSafeToReuseKVState();
    }

    @Nonnull
    public SavepointResources<K> savepoint() throws Exception {
        return this.keyedStateBackend.savepoint();
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        Long materializationID;
        if (this.lastCheckpointId == checkpointId) {
            this.stateChangelogWriter.confirm(this.lastUploadedFrom, this.lastUploadedTo);
        }
        if ((materializationID = (Long)this.materializationIdByCheckpointId.remove(checkpointId)) != null && materializationID > this.lastConfirmedMaterializationId) {
            this.keyedStateBackend.notifyCheckpointComplete(materializationID.longValue());
            this.lastConfirmedMaterializationId = materializationID;
        }
        this.materializationIdByCheckpointId.headMap(checkpointId, true).clear();
    }

    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (this.lastCheckpointId == checkpointId) {
            this.stateChangelogWriter.reset(this.lastUploadedFrom, this.lastUploadedTo);
        }
    }

    public <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(namespaceSerializer, (String)"Namespace serializer");
        Preconditions.checkNotNull(this.getKeySerializer(), (String)"State key serializer has not been configured in the config. This operation cannot use partitioned state.");
        InternalKvState kvState = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            }
            kvState = LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled((InternalKvState)((InternalKvState)TtlStateFactory.createStateAndWrapWithTtlIfEnabled(namespaceSerializer, stateDescriptor, (KeyedStateBackend)this, (TtlTimeProvider)this.ttlTimeProvider)), stateDescriptor, (LatencyTrackingStateConfig)this.keyedStateBackend.getLatencyTrackingStateConfig());
            this.keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            this.keyedStateBackend.publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        this.functionDelegationHelper.addOrUpdate(stateDescriptor);
        return (S)kvState;
    }

    @Nonnull
    public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getType());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s", stateDesc.getClass(), this.getClass());
            throw new FlinkRuntimeException(message);
        }
        RegisteredKeyValueStateBackendMetaInfo meta = new RegisteredKeyValueStateBackendMetaInfo(stateDesc.getType(), stateDesc.getName(), namespaceSerializer, stateDesc.getSerializer(), snapshotTransformFactory);
        InternalKvState state = (InternalKvState)this.keyedStateBackend.createInternalState(namespaceSerializer, stateDesc, snapshotTransformFactory);
        this.lastCreatedStateId = (short)(this.lastCreatedStateId + 1);
        KvStateChangeLoggerImpl kvStateChangeLogger = new KvStateChangeLoggerImpl(state.getKeySerializer(), state.getNamespaceSerializer(), state.getValueSerializer(), this.keyedStateBackend.getKeyContext(), this.stateChangelogWriter, (RegisteredStateMetaInfoBase)meta, stateDesc.getTtlConfig(), stateDesc.getDefaultValue(), this.lastCreatedStateId);
        this.closer.register(kvStateChangeLogger);
        Object is = stateFactory.create(state, kvStateChangeLogger, this.keyedStateBackend);
        this.changelogStates.put(stateDesc.getName(), (ChangelogState)is);
        return is;
    }

    public void registerCloseable(@Nullable Closeable closeable) {
        this.closer.register(closeable);
    }

    private ChangelogSnapshotState completeRestore(Collection<ChangelogStateBackendHandle> stateHandles) {
        long materializationId = 0L;
        ArrayList<KeyedStateHandle> materialized = new ArrayList<KeyedStateHandle>();
        ArrayList<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<ChangelogStateHandle>();
        for (ChangelogStateBackendHandle h : stateHandles) {
            if (h == null) continue;
            materialized.addAll(h.getMaterializedStateHandles());
            restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
            materializationId = Math.max(materializationId, h.getMaterializationID());
        }
        this.materializedId = materializationId + 1L;
        return new ChangelogSnapshotState(materialized, restoredNonMaterialized, this.stateChangelogWriter.initialSequenceNumber(), materializationId);
    }

    public Optional<PeriodicMaterializationManager.MaterializationRunnable> initMaterialization() throws Exception {
        SequenceNumber upTo = this.stateChangelogWriter.nextSequenceNumber();
        SequenceNumber lastMaterializedTo = this.changelogSnapshotState.lastMaterializedTo();
        LOG.info("Initialize Materialization. Current changelog writers last append to sequence number {}", (Object)upTo);
        if (upTo.compareTo((Object)lastMaterializedTo) > 0) {
            LOG.info("Starting materialization from {} : {}", (Object)lastMaterializedTo, (Object)upTo);
            long materializationID = this.materializedId++;
            PeriodicMaterializationManager.MaterializationRunnable materializationRunnable = new PeriodicMaterializationManager.MaterializationRunnable(this.keyedStateBackend.snapshot(materializationID, System.currentTimeMillis(), this.streamFactory, CHECKPOINT_OPTIONS), materializationID, upTo);
            for (ChangelogState changelogState : this.changelogStates.values()) {
                changelogState.resetWritingMetaFlag();
            }
            for (ChangelogKeyGroupedPriorityQueue changelogKeyGroupedPriorityQueue : this.priorityQueueStatesByName.values()) {
                changelogKeyGroupedPriorityQueue.resetWritingMetaFlag();
            }
            return Optional.of(materializationRunnable);
        }
        LOG.debug("Skip materialization, last materialized to {} : last log to {}", (Object)lastMaterializedTo, (Object)upTo);
        return Optional.empty();
    }

    public void updateChangelogSnapshotState(SnapshotResult<KeyedStateHandle> materializedSnapshot, long materializationID, SequenceNumber upTo) throws Exception {
        LOG.info("Task {} finishes materialization, updates the snapshotState upTo {} : {}", new Object[]{this.subtaskName, upTo, materializedSnapshot});
        this.changelogSnapshotState = new ChangelogSnapshotState(this.getMaterializedResult(materializedSnapshot), Collections.emptyList(), upTo, materializationID);
        this.stateChangelogWriter.truncate(upTo);
    }

    private List<KeyedStateHandle> getMaterializedResult(@Nonnull SnapshotResult<KeyedStateHandle> materializedSnapshot) {
        KeyedStateHandle jobManagerOwned = (KeyedStateHandle)materializedSnapshot.getJobManagerOwnedSnapshot();
        return jobManagerOwned == null ? Collections.emptyList() : Collections.singletonList(jobManagerOwned);
    }

    public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
        return this.keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
    }

    public ChangelogState getExistingStateForRecovery(String name, StateMetaInfoSnapshot.BackendStateType type) throws NoSuchElementException, UnsupportedOperationException {
        ChangelogState state;
        switch (type) {
            case KEY_VALUE: {
                state = this.changelogStates.get(name);
                break;
            }
            case PRIORITY_QUEUE: {
                state = this.priorityQueueStatesByName.get(name);
                break;
            }
            default: {
                throw new UnsupportedOperationException(String.format("Unknown state type %s (%s)", type, name));
            }
        }
        if (state == null) {
            throw new NoSuchElementException(String.format("%s state %s not found", type, name));
        }
        return state;
    }

    private static <T> RunnableFuture<T> toRunnableFuture(final CompletableFuture<T> f) {
        return new RunnableFuture<T>(){

            @Override
            public void run() {
                f.join();
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return f.cancel(mayInterruptIfRunning);
            }

            @Override
            public boolean isCancelled() {
                return f.isCancelled();
            }

            @Override
            public boolean isDone() {
                return f.isDone();
            }

            @Override
            public T get() throws InterruptedException, ExecutionException {
                return f.get();
            }

            @Override
            public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return f.get(timeout, unit);
            }
        };
    }

    @VisibleForTesting
    StateChangelogWriter<? extends ChangelogStateHandle> getChangelogWriter() {
        return this.stateChangelogWriter;
    }

    private static class ChangelogSnapshotState {
        private final List<KeyedStateHandle> materializedSnapshot;
        private final SequenceNumber materializedTo;
        private final List<ChangelogStateHandle> restoredNonMaterialized;
        private final long materializationID;

        public ChangelogSnapshotState(List<KeyedStateHandle> materializedSnapshot, List<ChangelogStateHandle> restoredNonMaterialized, SequenceNumber materializedTo, long materializationID) {
            this.materializedSnapshot = Collections.unmodifiableList(materializedSnapshot);
            this.restoredNonMaterialized = Collections.unmodifiableList(restoredNonMaterialized);
            this.materializedTo = materializedTo;
            this.materializationID = materializationID;
        }

        public List<KeyedStateHandle> getMaterializedSnapshot() {
            return this.materializedSnapshot;
        }

        public SequenceNumber lastMaterializedTo() {
            return this.materializedTo;
        }

        public List<ChangelogStateHandle> getRestoredNonMaterialized() {
            return this.restoredNonMaterialized;
        }

        public long getMaterializationID() {
            return this.materializationID;
        }
    }

    private static interface StateFactory {
        public <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> var1, KvStateChangeLogger<SV, N> var2, InternalKeyContext<K> var3) throws Exception;
    }
}

