package org.apache.storm.windowing.persistence;

import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.windowing.Event;
import org.apache.storm.windowing.persistence.WindowPartitionCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/windowing/persistence/WindowState.class */
public class WindowState<T> extends AbstractCollection<Event<T>> {
    /**
     * Number of events per partition. Logged by {@code initCache()} as the partition size;
     * {@code add()} and {@code initCache()} use the literal 1000, which matches this value.
     */
    public static final int MAX_PARTITION_EVENTS = 1000;
    // NOTE(review): not referenced by name in this file; the constructor's 10000 floor looks like
    // MAX_PARTITION_EVENTS * MIN_PARTITIONS -- confirm.
    public static final int MIN_PARTITIONS = 10;
    private static final Logger LOG = LoggerFactory.getLogger(WindowState.class);
    /** Key under which the deque of partition ids is stored in {@link #partitionIdsState}. */
    private static final String PARTITION_IDS_KEY = "pk";
    /** Backing state holding the deque of partition ids (under {@link #PARTITION_IDS_KEY}). */
    private final KeyValueState<String, Deque<Long>> partitionIdsState;
    /** Backing state mapping a partition id to its {@link WindowPartition}. */
    private final KeyValueState<Long, WindowPartition<T>> windowPartitionsState;
    /** Backing state that receives the entries produced by {@link #windowSystemStateSupplier} on flush. */
    private final KeyValueState<String, Optional<?>> windowSystemState;
    /** Event budget used by {@code initCache()} to size the partition cache; never below 10000. */
    private final long maxEventsInMemory;
    /** In-memory deque of partition ids; modified while holding {@link #partitionIdsLock}. */
    private volatile Deque<Long> partitionIds;
    /** Id of the most recently allocated partition (the last element of {@link #partitionIds}). */
    private volatile long latestPartitionId;
    /** Partition that {@code add()} currently appends to; obtained pinned from {@link #cache}. */
    private volatile WindowPartition<T> latestPartition;
    /** Partition cache built by {@code initCache()}. */
    private volatile WindowPartitionCache<Long, WindowPartition<T>> cache;
    /** Supplies the key/value pairs that {@code flush()} copies into {@link #windowSystemState}. */
    private Supplier<Map<String, Optional<?>>> windowSystemStateSupplier;
    /** Fair lock taken around reads and updates of {@link #partitionIds}. */
    private final ReentrantLock partitionIdsLock = new ReentrantLock(true);
    /** Per-partition-id locks taken around loads, removals and flushes of individual partitions. */
    private final WindowPartitionLock windowPartitionsLock = new WindowPartitionLock();
    /** Ids of partitions pinned by iterators of this collection; drained by {@code clearIteratorPins()}. */
    private Set<Long> iteratorPins = new HashSet();

    /* loaded from: input_file:org/apache/storm/windowing/persistence/WindowState$WindowPartition.class */
    /**
     * A group of window events identified by a numeric id.
     *
     * <p>Events are held in a {@link ConcurrentLinkedQueue}; the element count is tracked
     * separately in an {@link AtomicInteger}. A {@code modified} flag records whether events were
     * added or removed since the flag was last cleared.
     *
     * @param <T> payload type carried by the events
     */
    public static class WindowPartition<T> implements Iterable<Event<T>> {
        private final ConcurrentLinkedQueue<Event<T>> events = new ConcurrentLinkedQueue<>();
        private final AtomicInteger size = new AtomicInteger();
        private final long id;
        private volatile transient boolean modified;

        /**
         * Creates an empty partition.
         *
         * @param j the partition id
         */
        public WindowPartition(long j) {
            id = j;
        }

        /** Appends an event, bumps the tracked size and flags the partition as modified. */
        void add(Event<T> event) {
            events.add(event);
            size.incrementAndGet();
            setModified();
        }

        /** Returns whether the partition has been flagged as modified. */
        boolean isModified() {
            return modified;
        }

        /** Flags the partition as modified; the flag is only written when it is not already set. */
        void setModified() {
            if (!modified) {
                modified = true;
            }
        }

        /** Resets the modified flag. */
        void clearModified() {
            modified = false;
        }

        /** Returns whether the underlying event queue holds no events. */
        boolean isEmpty() {
            return events.isEmpty();
        }

        /**
         * Returns an iterator over the events of this partition. Removing through the iterator
         * also decrements the tracked size and flags the partition as modified.
         */
        @Override
        public Iterator<Event<T>> iterator() {
            final Iterator<Event<T>> delegate = events.iterator();
            return new Iterator<Event<T>>() {
                @Override
                public boolean hasNext() {
                    return delegate.hasNext();
                }

                @Override
                public Event<T> next() {
                    return delegate.next();
                }

                @Override
                public void remove() {
                    delegate.remove();
                    // Keep the bookkeeping in step with the queue.
                    size.decrementAndGet();
                    setModified();
                }
            };
        }

        /** Returns the tracked number of events. */
        public int size() {
            return size.get();
        }

        /** Returns the partition id. */
        public long getId() {
            return id;
        }

        /** Returns a read-only view of the events. */
        public Collection<Event<T>> getEvents() {
            return Collections.unmodifiableCollection(events);
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("WindowPartition{id=");
            text.append(id);
            text.append(", size=");
            text.append(size.get());
            text.append('}');
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/windowing/persistence/WindowState$WindowPartitionLock.class */
    /**
     * A fixed set of fair {@link ReentrantLock}s shared by all partitions. A partition id is
     * mapped to one of the {@code numLocks} locks by taking the id modulo {@code numLocks}, so
     * different ids may share a lock.
     */
    public static class WindowPartitionLock {
        private final int numLocks = 8;
        private final ImmutableMap<Long, ReentrantLock> locks;

        /** Creates the {@code numLocks} fair locks, keyed {@code 0 .. numLocks - 1}. */
        WindowPartitionLock() {
            ImmutableMap.Builder<Long, ReentrantLock> builder = ImmutableMap.builder();
            for (long slot = 0; slot < numLocks; slot++) {
                builder.put(Long.valueOf(slot), new ReentrantLock(true));
            }
            locks = builder.build();
        }

        /**
         * Acquires the lock that guards the given partition id.
         *
         * @param j the partition id
         */
        public void lock(long j) {
            lockFor(j).lock();
        }

        /**
         * Releases the lock that guards the given partition id.
         *
         * @param j the partition id
         */
        public void unlock(long j) {
            lockFor(j).unlock();
        }

        /** Returns the lock assigned to the given partition id. */
        private ReentrantLock lockFor(long j) {
            // floorMod keeps the slot in [0, numLocks) even for a negative id, where the plain
            // remainder operator would produce a negative key that has no lock.
            return locks.get(Long.valueOf(Math.floorMod(j, (long) numLocks)));
        }
    }

    public WindowState(KeyValueState<Long, WindowPartition<T>> keyValueState, KeyValueState<String, Deque<Long>> keyValueState2, KeyValueState<String, Optional<?>> keyValueState3, Supplier<Map<String, Optional<?>>> supplier, long j) {
        this.windowPartitionsState = keyValueState;
        this.partitionIdsState = keyValueState2;
        this.windowSystemState = keyValueState3;
        this.windowSystemStateSupplier = supplier;
        this.maxEventsInMemory = Math.max(10000L, j);
        init();
    }

    /**
     * Appends an event to the latest partition. When the latest partition already holds 1000 or
     * more events it is unpinned and a freshly allocated partition becomes the latest one first.
     *
     * @param event the event to store
     * @return always {@code true}
     */
    @Override
    public boolean add(Event<T> event) {
        final boolean latestIsFull = latestPartition.size() >= 1000;
        if (latestIsFull) {
            // Release the full partition and roll over to a new pinned one.
            final Long fullPartitionId = Long.valueOf(latestPartition.getId());
            cache.unpin(fullPartitionId);
            latestPartition = getPinnedPartition(getNextPartitionId());
        }
        latestPartition.add(event);
        return true;
    }

    /**
     * Returns an iterator over all events, walking the partitions in the order of a snapshot of
     * the partition ids taken when the iterator is created.
     *
     * <p>Each partition is pinned in the cache while it is being iterated and its id is recorded
     * in {@code iteratorPins}; it is unpinned when the iterator moves on to the next partition or
     * runs out of events. Pins left behind by an iterator that was not run to the end can be
     * released with {@link #clearIteratorPins()}.
     */
    @Override
    public Iterator<Event<T>> iterator() {
        return new Iterator<Event<T>>() {
            private final Iterator<Long> ids = getIds();
            private Iterator<Event<T>> current = Collections.emptyIterator();
            private Iterator<Event<T>> removeFrom;
            private WindowPartition<T> curPartition;

            /** Copies the current partition ids under the partition-ids lock. */
            private Iterator<Long> getIds() {
                // Acquire before entering try so that finally never unlocks a lock we do not hold.
                WindowState.this.partitionIdsLock.lock();
                try {
                    WindowState.LOG.debug("Iterator partitionIds: {}", WindowState.this.partitionIds);
                    return new ArrayList<Long>(WindowState.this.partitionIds).iterator();
                } finally {
                    WindowState.this.partitionIdsLock.unlock();
                }
            }

            @Override
            public void remove() {
                if (this.removeFrom == null) {
                    throw new IllegalStateException("No calls to next() since last call to remove()");
                }
                this.removeFrom.remove();
                this.removeFrom = null;
            }

            @Override
            public boolean hasNext() {
                boolean hasNext = this.current.hasNext();
                // Advance through the id snapshot until a partition with remaining events is found.
                while (!hasNext && this.ids.hasNext()) {
                    if (this.curPartition != null) {
                        unpin(this.curPartition.getId());
                    }
                    this.curPartition = WindowState.this.getPinnedPartition(this.ids.next().longValue());
                    if (this.curPartition != null) {
                        WindowState.this.iteratorPins.add(Long.valueOf(this.curPartition.getId()));
                        this.current = this.curPartition.iterator();
                        hasNext = this.current.hasNext();
                    }
                }
                // Exhausted: release the pin held on the last partition visited.
                if (!hasNext && this.curPartition != null) {
                    unpin(this.curPartition.getId());
                    this.curPartition = null;
                }
                return hasNext;
            }

            @Override
            public Event<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                // Remember which partition iterator produced the element so remove() can target it.
                this.removeFrom = this.current;
                return this.current.next();
            }

            /** Unpins the partition in the cache and forgets the iterator pin for it. */
            private void unpin(long j) {
                WindowState.this.cache.unpin(Long.valueOf(j));
                WindowState.this.iteratorPins.remove(Long.valueOf(j));
            }
        };
    }

    /**
     * Unpins every partition whose id is recorded in {@code iteratorPins} and empties that set,
     * one id at a time.
     */
    public void clearIteratorPins() {
        LOG.debug("clearIteratorPins '{}'", iteratorPins);
        for (Iterator<Long> pins = iteratorPins.iterator(); pins.hasNext(); ) {
            final Long pinnedId = pins.next();
            cache.unpin(pinnedId);
            pins.remove();
        }
    }

    /**
     * Not supported: this collection does not report its element count.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // java.util.AbstractCollection, java.util.Collection
    public int size() {
        throw new UnsupportedOperationException();
    }

    /**
     * Flushes the cached partitions and the window system state into the backing states, then
     * asks each backing state to prepare a commit.
     *
     * @param j value forwarded to each state's {@code prepareCommit} (presumably the transaction
     *          id -- confirm against the caller)
     */
    public void prepareCommit(long j) {
        // The flush must come first so the states see the latest data before preparing.
        flush();

        partitionIdsState.prepareCommit(j);
        windowPartitionsState.prepareCommit(j);
        windowSystemState.prepareCommit(j);
    }

    /**
     * Commits the three backing states, in the order partition ids, partitions, system state.
     *
     * @param j value forwarded to each state's {@code commit} (presumably the transaction id --
     *          confirm against the caller)
     */
    public void commit(long j) {
        partitionIdsState.commit(j);
        windowPartitionsState.commit(j);
        windowSystemState.commit(j);
    }

    /**
     * Rolls back the three backing states and optionally re-initializes the in-memory view.
     *
     * @param z when {@code true}, the partition cache and partition ids are rebuilt from the
     *          rolled-back states via {@code init()}
     */
    public void rollback(boolean z) {
        partitionIdsState.rollback();
        windowPartitionsState.rollback();
        windowSystemState.rollback();

        if (!z) {
            return;
        }
        init();
    }

    /**
     * (Re)builds the in-memory view of the state. The cache is created first because
     * {@code initPartitions()} loads the latest partition through it.
     */
    private void init() {
        initCache();
        initPartitions();
    }

    /**
     * Loads the partition ids from the backing state and pins the latest partition.
     *
     * <p>When no ids are stored yet, partition {@code 0} is registered and the id list is written
     * back to the state. The last id in the deque becomes the latest partition id and the
     * corresponding partition is obtained pinned from the cache.
     */
    private void initPartitions() {
        // Typed default instead of a raw LinkedList, so no unchecked conversion is needed.
        this.partitionIds = this.partitionIdsState.get(PARTITION_IDS_KEY, new LinkedList<Long>());
        if (this.partitionIds.isEmpty()) {
            this.partitionIds.add(0L);
            this.partitionIdsState.put(PARTITION_IDS_KEY, this.partitionIds);
        }
        // The deque is non-empty at this point, so peekLast() cannot return null.
        this.latestPartitionId = this.partitionIds.peekLast().longValue();
        this.latestPartition = this.cache.pinAndGet(Long.valueOf(this.latestPartitionId));
    }

    /**
     * Builds the partition cache.
     *
     * <p>The cache holds at most {@code maxEventsInMemory / MAX_PARTITION_EVENTS} partitions.
     * Its loader reads a partition from the backing state (or creates an empty one when none is
     * stored). Its removal listener, for a partition leaving the cache:
     * <ul>
     *   <li>deletes the partition from the id list and the backing state when it is empty, is
     *       not the latest partition and was not removed explicitly;</li>
     *   <li>otherwise writes it to the backing state when it is flagged as modified.</li>
     * </ul>
     * Both callbacks run while holding the lock for the partition id.
     */
    private void initCache() {
        final long maxPartitions = maxEventsInMemory / MAX_PARTITION_EVENTS;
        LOG.info("maxEventsInMemory: {}, partition size: {}, number of partitions: {}",
                maxEventsInMemory, MAX_PARTITION_EVENTS, maxPartitions);

        final WindowPartitionCache.RemovalListener<Long, WindowPartition<T>> removalListener =
            new WindowPartitionCache.RemovalListener<Long, WindowPartition<T>>() {
                @Override
                public void onRemoval(Long pid, WindowPartition<T> partition, WindowPartitionCache.RemovalCause removalCause) {
                    Objects.requireNonNull(pid, "Null partition id");
                    Objects.requireNonNull(partition, "Null window partition");
                    LOG.debug("onRemoval for id '{}', WindowPartition '{}'", pid, partition);
                    final long id = pid.longValue();
                    // Acquire before entering try so that finally never unlocks a lock we do not hold.
                    windowPartitionsLock.lock(id);
                    try {
                        if (partition.isEmpty() && id != latestPartitionId) {
                            // An explicitly removed empty partition needs no clean-up here.
                            if (removalCause != WindowPartitionCache.RemovalCause.EXPLICIT) {
                                deletePartition(id);
                                windowPartitionsState.delete(pid);
                            }
                        } else if (partition.isModified()) {
                            windowPartitionsState.put(pid, partition);
                        } else {
                            LOG.debug("WindowPartition '{}' is not modified", pid);
                        }
                    } finally {
                        windowPartitionsLock.unlock(id);
                    }
                }
            };

        final WindowPartitionCache.CacheLoader<Long, WindowPartition<T>> loader =
            new WindowPartitionCache.CacheLoader<Long, WindowPartition<T>>() {
                @Override
                public WindowPartition<T> load(Long pid) {
                    LOG.debug("Load partition: {}", pid);
                    final long id = pid.longValue();
                    windowPartitionsLock.lock(id);
                    try {
                        return windowPartitionsState.get(pid, new WindowPartition<T>(id));
                    } finally {
                        windowPartitionsLock.unlock(id);
                    }
                }
            };

        // NOTE(review): the raw casts are kept because the builder's generic signature is not
        // visible from this file; if newBuilder() is generic, an explicit type witness would
        // make them unnecessary -- confirm against SimpleWindowPartitionCache.
        this.cache = SimpleWindowPartitionCache.newBuilder()
            .maximumSize(maxPartitions)
            .removalListener((WindowPartitionCache.RemovalListener) removalListener)
            .build((WindowPartitionCache.CacheLoader) loader);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes a partition id from the id list and writes the updated list to the backing state,
     * all while holding the partition-ids lock.
     *
     * @param j id of the partition to remove
     */
    public void deletePartition(long j) {
        LOG.debug("Delete partition: {}", Long.valueOf(j));
        // Acquire before entering try so that finally never unlocks a lock we do not hold.
        this.partitionIdsLock.lock();
        try {
            // Boxed on purpose: remove(Object) drops the first matching id from the deque.
            this.partitionIds.remove(Long.valueOf(j));
            this.partitionIdsState.put(PARTITION_IDS_KEY, this.partitionIds);
        } finally {
            this.partitionIdsLock.unlock();
        }
    }

    /**
     * Allocates the next partition id: increments the latest id, appends it to the id list and
     * writes the updated list to the backing state, all while holding the partition-ids lock.
     *
     * @return the newly allocated partition id
     */
    private long getNextPartitionId() {
        // Acquire before entering try so that finally never unlocks a lock we do not hold.
        this.partitionIdsLock.lock();
        try {
            final long nextId = this.latestPartitionId + 1;
            this.latestPartitionId = nextId;
            this.partitionIds.add(Long.valueOf(nextId));
            this.partitionIdsState.put(PARTITION_IDS_KEY, this.partitionIds);
            // Return the id allocated here rather than re-reading the volatile field.
            return nextId;
        } finally {
            this.partitionIdsLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fetches a partition through the cache's {@code pinAndGet}.
     *
     * @param j id of the partition to fetch
     * @return the partition returned by the cache for that id
     */
    public WindowPartition<T> getPinnedPartition(long j) {
        final Long partitionId = Long.valueOf(j);
        return cache.pinAndGet(partitionId);
    }

    /**
     * Writes the in-memory state to the backing states.
     *
     * <p>For every cached partition, under the lock for its id:
     * <ul>
     *   <li>an empty partition that is not the latest one is removed from the id list and the
     *       backing state, and afterwards invalidated in the cache;</li>
     *   <li>otherwise a partition flagged as modified has its flag cleared and is written to
     *       the backing state.</li>
     * </ul>
     * Finally every entry produced by the window system state supplier is stored.
     */
    private void flush() {
        LOG.debug("Flushing modified partitions");
        this.cache.asMap().forEach((partitionId, partition) -> {
            // Set only when the partition turns out to be an empty, non-latest one.
            Long idToInvalidate = null;
            final long id = partitionId.longValue();
            // Acquire before entering try so that finally never unlocks a lock we do not hold.
            this.windowPartitionsLock.lock(id);
            try {
                if (partition.isEmpty() && id != this.latestPartitionId) {
                    LOG.debug("Invalidating empty partition {}", partitionId);
                    deletePartition(id);
                    this.windowPartitionsState.delete(partitionId);
                    idToInvalidate = partitionId;
                } else if (partition.isModified()) {
                    LOG.debug("Updating modified partition {}", partitionId);
                    partition.clearModified();
                    this.windowPartitionsState.put(partitionId, partition);
                }
            } finally {
                this.windowPartitionsLock.unlock(id);
            }
            // Invalidate only after the partition lock has been released, so the call into the
            // cache is not made while this thread still holds that lock.
            if (idToInvalidate != null) {
                this.cache.invalidate(idToInvalidate);
            }
        });
        for (Map.Entry<String, Optional<?>> entry : this.windowSystemStateSupplier.get().entrySet()) {
            this.windowSystemState.put(entry.getKey(), entry.getValue());
        }
    }
}
