package org.apache.storm.localizer;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.storm.DaemonConfig;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.localizer.LocallyCachedTopologyBlob;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.storm.thrift.transport.TTransportException;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.NimbusLeaderNotFoundException;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WrappedKeyNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/localizer/AsyncLocalizer.class */
public class AsyncLocalizer implements AutoCloseable {
    // Logger shared by this class and its nested DownloadBlobs task (initialiser not visible here).
    private static final Logger LOG;
    // Future stored in blobPending by recoverRunningTopology for topologies that are already running.
    // NOTE(review): presumably already completed - its initialiser is not visible here; confirm.
    private static final CompletableFuture<Void> ALL_DONE_FUTURE;
    // Pause between failed download attempts; downloadOrUpdate logs this value as milliseconds.
    private static final int ATTEMPTS_INTERVAL_TIME = 100;
    // Times the fetch-and-commit of one blob inside downloadOrUpdate.
    private final Timer singleBlobLocalizationDuration;
    // Times one full pass of updateBlobs.
    private final Timer blobCacheUpdateDuration;
    // Handed to TimePortAndAssignment in requestDownloadTopologyBlobs.
    private final Timer blobLocalizationDuration;
    // Marked when a blob that was already fully downloaded is about to be fetched again.
    private final Meter numBlobUpdateVersionChanged;
    // Marked when releaseSlotFor hits a FileNotFoundException while reading the local resources.
    private final Meter localResourceFileNotFoundWhenReleasingSlot;
    // Plain-file blobs: first key is the owning user, second key is the blob key.
    protected final ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> userFiles;
    // Archive blobs (created with the "uncompress" flag set): first key is the owning user, second key is the blob key.
    protected final ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> userArchives;
    // Result of ConfigUtils.isLocalMode(conf); passed on to every LocallyCachedTopologyBlob.
    private final boolean isLocalMode;
    // Per-topology-id future of the non-base blob download (see requestDownloadTopologyBlobs).
    private final ConcurrentHashMap<String, CompletableFuture<Void>> blobPending;
    // Configuration map supplied to the constructor.
    private final Map<String, Object> conf;
    // File-system helper used for directory creation, permissions and symlinks.
    private final AdvancedFSOps fsOps;
    // Value of "storm.disable.symlinks" read once at construction time.
    private final boolean symlinksDisabled;
    // Base topology blobs (jar, code, conf), keyed by the ConfigUtils.masterStorm*Key of the topology.
    private final ConcurrentHashMap<String, LocallyCachedBlob> topologyBlobs;
    // Per-topology-id future of the base blob download (see requestDownloadBaseTopologyBlobs).
    private final ConcurrentHashMap<String, CompletableFuture<Void>> topologyBasicDownloaded;
    // Root directory under which per-user localized resources live.
    private final Path localBaseDir;
    // Number of failed attempts tolerated per blob before downloadOrUpdate gives up.
    private final int blobDownloadRetries;
    // Executor that runs the individual blob downloads.
    private final ScheduledExecutorService downloadExecService;
    // Executor that runs DownloadBlobs tasks and the periodic updateBlobs / cleanup jobs.
    private final ScheduledExecutorService taskExecService;
    // Period of the cleanup job, in milliseconds (see start()).
    private final long cacheCleanupPeriod;
    // Registry the timers/meters above were registered with; also passed to created blobs.
    private final StormMetricsRegistry metricsRegistry;

    // Cache size target in bytes (the configured MB value shifted left by 20); handed to the retention set in cleanup.
    @VisibleForTesting
    protected long cacheTargetSize;
    // Decompiler-exposed flag guarding the null-user checks in getUserArchive / getUserFile.
    // NOTE(review): looks like the compiler-generated assertion switch - its initialiser is not visible here.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/localizer/AsyncLocalizer$ConsumePathAndId.class */
    /**
     * Callback invoked once per topology distribution directory.
     *
     * <p>It is a dedicated interface rather than {@code java.util.function.BiConsumer} because the
     * callback is allowed to throw {@link IOException}.
     */
    @FunctionalInterface
    public interface ConsumePathAndId {
        /**
         * Handles one directory.
         *
         * @param path the directory being visited
         * @param str the identifier associated with the directory (the caller in this file passes the directory's file name)
         * @throws IOException if processing the directory fails
         */
        void accept(Path path, String str) throws IOException;
    }

    /* loaded from: input_file:org/apache/storm/localizer/AsyncLocalizer$DownloadBlobs.class */
    /**
     * Task that localizes every non-base blob a topology needs and, unless symlinks are disabled,
     * links each one into the topology's distribution directory.
     */
    private class DownloadBlobs implements Supplier<Void> {
        private final PortAndAssignment pna;
        private final BlobChangingCallback cb;

        /**
         * @param portAndAssignment the slot/assignment the blobs are downloaded for
         * @param blobChangingCallback callback registered on blobs that ask for change notification
         */
        DownloadBlobs(PortAndAssignment portAndAssignment, BlobChangingCallback blobChangingCallback) {
            this.pna = portAndAssignment;
            this.cb = blobChangingCallback;
        }

        /**
         * Downloads the blobs, fixes up permissions, creates the symlinks and finally marks the
         * assignment complete.
         *
         * @return always {@code null}
         * @throws RuntimeException wrapping any exception raised along the way (after logging it)
         */
        @Override
        public Void get() {
            final AsyncLocalizer outer = AsyncLocalizer.this;
            try {
                final String topologyId = this.pna.getToplogyId();
                final String owner = this.pna.getOwner();
                final String distRoot = ConfigUtils.supervisorStormDistRoot(outer.conf, topologyId);
                final Map<?, ?> blobstoreMap =
                    (Map<?, ?>) ConfigUtils.readSupervisorStormConf(outer.conf, topologyId).get("topology.blobstore.map");
                final List<LocalResource> wanted = outer.getLocalResources(this.pna);

                if (!wanted.isEmpty()) {
                    final File userCacheDir = outer.getLocalUserFileCacheDir(owner);
                    if (!outer.fsOps.fileExists(userCacheDir)) {
                        outer.fsOps.forceMkdir(userCacheDir);
                    }
                    final List<LocalizedResource> localized = outer.getBlobs(wanted, this.pna, this.cb);
                    outer.fsOps.setupBlobPermissions(userCacheDir, owner);

                    if (!outer.symlinksDisabled) {
                        for (LocalizedResource resource : localized) {
                            final String blobKey = resource.getKey();
                            final File linkTarget = resource.getCurrentSymlinkPath().toFile();
                            final File link = new File(distRoot, linkNameFor(blobstoreMap, blobKey));
                            outer.fsOps.createSymlink(link, linkTarget);
                        }
                    }
                }

                this.pna.complete();
                return null;
            } catch (Exception e) {
                AsyncLocalizer.LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
                throw new RuntimeException(e);
            }
        }

        /**
         * Picks the symlink name for a blob: the "localname" entry of the blob's settings when one
         * is present, otherwise the blob key itself.
         */
        private String linkNameFor(Map<?, ?> blobstoreMap, String blobKey) {
            if (blobstoreMap == null) {
                return blobKey;
            }
            final Map<?, ?> settings = (Map<?, ?>) blobstoreMap.get(blobKey);
            if (settings != null && settings.containsKey("localname")) {
                return (String) settings.get("localname");
            }
            return blobKey;
        }
    }

    @VisibleForTesting
    AsyncLocalizer(Map<String, Object> map, AdvancedFSOps advancedFSOps, String str, StormMetricsRegistry stormMetricsRegistry) throws IOException {
        this.userFiles = new ConcurrentHashMap<>();
        this.userArchives = new ConcurrentHashMap<>();
        this.topologyBlobs = new ConcurrentHashMap<>();
        this.topologyBasicDownloaded = new ConcurrentHashMap<>();
        this.conf = map;
        this.singleBlobLocalizationDuration = stormMetricsRegistry.registerTimer("supervisor:single-blob-localization-duration");
        this.blobCacheUpdateDuration = stormMetricsRegistry.registerTimer("supervisor:blob-cache-update-duration");
        this.blobLocalizationDuration = stormMetricsRegistry.registerTimer("supervisor:blob-localization-duration");
        this.numBlobUpdateVersionChanged = stormMetricsRegistry.registerMeter("supervisor:num-blob-update-version-changed");
        this.localResourceFileNotFoundWhenReleasingSlot = stormMetricsRegistry.registerMeter("supervisor:local-resource-file-not-found-when-releasing-slot");
        this.metricsRegistry = stormMetricsRegistry;
        this.isLocalMode = ConfigUtils.isLocalMode(map);
        this.fsOps = advancedFSOps;
        this.localBaseDir = Paths.get(str, new String[0]);
        this.cacheTargetSize = ObjectReader.getInt(map.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB), 10240).longValue() << 20;
        this.cacheCleanupPeriod = ObjectReader.getInt(map.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30000).longValue();
        this.blobDownloadRetries = ObjectReader.getInt(map.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3).intValue();
        this.downloadExecService = Executors.newScheduledThreadPool(ObjectReader.getInt(map.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5).intValue(), new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Download Executor - %d").build());
        this.taskExecService = Executors.newScheduledThreadPool(3, new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Task Executor - %d").build());
        reconstructLocalizedResources();
        this.symlinksDisabled = ((Boolean) map.getOrDefault("storm.disable.symlinks", false)).booleanValue();
        this.blobPending = new ConcurrentHashMap<>();
    }

    /**
     * Builds a localizer whose file-system helper and local base directory are both derived from
     * the configuration ({@code AdvancedFSOps.make} and {@code ConfigUtils.supervisorLocalDir}).
     *
     * @param map the configuration
     * @param stormMetricsRegistry registry the timers and meters are registered with
     * @throws IOException propagated from the delegated constructor or from resolving the local directory
     */
    public AsyncLocalizer(Map<String, Object> map, StormMetricsRegistry stormMetricsRegistry) throws IOException {
        this(map, AdvancedFSOps.make(map), ConfigUtils.supervisorLocalDir(map), stormMetricsRegistry);
    }

    /**
     * Returns the cached topology-jar blob for a topology, creating and registering it on first use.
     *
     * @param str the topology id
     * @param str2 the owner passed to the newly created blob
     * @return the blob stored under the topology's jar key
     * @throws RuntimeException wrapping an {@link IOException} raised while creating the blob
     */
    @VisibleForTesting
    LocallyCachedBlob getTopoJar(String str, String str2) {
        final String jarKey = ConfigUtils.masterStormJarKey(str);
        return this.topologyBlobs.computeIfAbsent(jarKey, unusedKey -> {
            final LocallyCachedTopologyBlob.TopologyBlobType type = LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR;
            try {
                return new LocallyCachedTopologyBlob(str, this.isLocalMode, this.conf, this.fsOps, type, str2, this.metricsRegistry);
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        });
    }

    /**
     * Returns the cached topology-code blob for a topology, creating and registering it on first use.
     *
     * @param str the topology id
     * @param str2 the owner passed to the newly created blob
     * @return the blob stored under the topology's code key
     * @throws RuntimeException wrapping an {@link IOException} raised while creating the blob
     */
    @VisibleForTesting
    LocallyCachedBlob getTopoCode(String str, String str2) {
        final String codeKey = ConfigUtils.masterStormCodeKey(str);
        return this.topologyBlobs.computeIfAbsent(codeKey, unusedKey -> {
            final LocallyCachedTopologyBlob.TopologyBlobType type = LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE;
            try {
                return new LocallyCachedTopologyBlob(str, this.isLocalMode, this.conf, this.fsOps, type, str2, this.metricsRegistry);
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        });
    }

    /**
     * Returns the cached topology-conf blob for a topology, creating and registering it on first use.
     *
     * @param str the topology id
     * @param str2 the owner passed to the newly created blob
     * @return the blob stored under the topology's conf key
     * @throws RuntimeException wrapping an {@link IOException} raised while creating the blob
     */
    @VisibleForTesting
    LocallyCachedBlob getTopoConf(String str, String str2) {
        final String confKey = ConfigUtils.masterStormConfKey(str);
        return this.topologyBlobs.computeIfAbsent(confKey, unusedKey -> {
            final LocallyCachedTopologyBlob.TopologyBlobType type = LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF;
            try {
                return new LocallyCachedTopologyBlob(str, this.isLocalMode, this.conf, this.fsOps, type, str2, this.metricsRegistry);
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        });
    }

    /**
     * Returns the archive resource registered for a user and blob key, creating the per-user map
     * and the resource itself on first use.
     *
     * @param str the owning user; must not be {@code null}
     * @param str2 the blob key
     * @return the existing or newly created archive resource
     */
    private LocalizedResource getUserArchive(String str, String str2) {
        // Same check an assert statement would perform; only active when assertions are enabled.
        if (!$assertionsDisabled && str == null) {
            throw new AssertionError("All user archives require a user present");
        }
        final ConcurrentHashMap<String, LocalizedResource> archivesOfUser =
            this.userArchives.computeIfAbsent(str, user -> new ConcurrentHashMap<>());
        return archivesOfUser.computeIfAbsent(
            str2,
            key -> new LocalizedResource(key, this.localBaseDir, true, this.fsOps, this.conf, str, this.metricsRegistry));
    }

    /**
     * Returns the plain-file resource registered for a user and blob key, creating the per-user
     * map and the resource itself on first use.
     *
     * @param str the owning user; must not be {@code null}
     * @param str2 the blob key
     * @return the existing or newly created file resource
     */
    private LocalizedResource getUserFile(String str, String str2) {
        // Same check an assert statement would perform; only active when assertions are enabled.
        // The message now names files: the previous text was copied from getUserArchive.
        if (!$assertionsDisabled && str == null) {
            throw new AssertionError("All user files require a user present");
        }
        final ConcurrentHashMap<String, LocalizedResource> filesOfUser =
            this.userFiles.computeIfAbsent(str, user -> new ConcurrentHashMap<>());
        return filesOfUser.computeIfAbsent(
            str2,
            key -> new LocalizedResource(key, this.localBaseDir, false, this.fsOps, this.conf, str, this.metricsRegistry));
    }

    /**
     * Requests every blob a topology needs on this slot: first the base blobs (jar, code, conf),
     * then the remaining blobs listed by the topology.
     *
     * <p>The second stage is tracked per topology id in {@code blobPending}. The first request for
     * a topology submits a {@code DownloadBlobs} task; later requests only add their references to
     * the blobs and reuse the future already stored.
     *
     * @param localAssignment the assignment being scheduled
     * @param i the port of the slot
     * @param blobChangingCallback callback registered on blobs that ask for change notification
     * @return a future that completes when the topology's blobs are available locally
     * @throws IOException declared for callers; not thrown directly by the visible code
     */
    public CompletableFuture<Void> requestDownloadTopologyBlobs(LocalAssignment localAssignment, int i, BlobChangingCallback blobChangingCallback) throws IOException {
        final TimePortAndAssignment pna =
            new TimePortAndAssignment(new PortAndAssignmentImpl(i, localAssignment), this.blobLocalizationDuration);
        final String topologyId = pna.getToplogyId();

        return requestDownloadBaseTopologyBlobs(pna, blobChangingCallback).thenComposeAsync(ignored ->
            this.blobPending.compute(topologyId, (id, existing) -> {
                CompletableFuture<Void> pending = existing;
                if (pending == null) {
                    // First request for this topology: DownloadBlobs completes pna itself.
                    pending = CompletableFuture.supplyAsync(new DownloadBlobs(pna, blobChangingCallback), this.taskExecService);
                } else {
                    // A download already exists; just register this slot's references.
                    try {
                        addReferencesToBlobs(pna, blobChangingCallback);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    } finally {
                        // Exactly once, whether or not adding the references succeeded.
                        pna.complete();
                    }
                }
                LOG.debug("Reserved blobs {} {}", topologyId, pending);
                return pending;
            }));
    }

    /**
     * Registers the slot as a user of the topology's jar, code and conf blobs and returns the
     * future of their download, starting that download if none is recorded for the topology yet.
     *
     * @param portAndAssignment the slot/assignment that needs the blobs
     * @param blobChangingCallback callback passed along with each reference
     * @return the future stored in {@code topologyBasicDownloaded} for this topology id
     */
    @VisibleForTesting
    CompletableFuture<Void> requestDownloadBaseTopologyBlobs(PortAndAssignment portAndAssignment, BlobChangingCallback blobChangingCallback) {
        final String topologyId = portAndAssignment.getToplogyId();

        final LocallyCachedBlob jarBlob = getTopoJar(topologyId, portAndAssignment.getAssignment().get_owner());
        jarBlob.addReference(portAndAssignment, blobChangingCallback);

        final LocallyCachedBlob codeBlob = getTopoCode(topologyId, portAndAssignment.getAssignment().get_owner());
        codeBlob.addReference(portAndAssignment, blobChangingCallback);

        final LocallyCachedBlob confBlob = getTopoConf(topologyId, portAndAssignment.getAssignment().get_owner());
        confBlob.addReference(portAndAssignment, blobChangingCallback);

        // Only the first caller for a topology id triggers the download; everyone shares the future.
        return this.topologyBasicDownloaded.computeIfAbsent(
            topologyId,
            id -> downloadOrUpdate(jarBlob, codeBlob, confBlob));
    }

    /**
     * Varargs convenience form: wraps the given blobs in a list and delegates to the
     * collection-based overload.
     *
     * @param locallyCachedBlobArr the blobs to download or refresh
     * @return a future that completes when all of them have been processed
     */
    private CompletableFuture<Void> downloadOrUpdate(LocallyCachedBlob... locallyCachedBlobArr) {
        final List<LocallyCachedBlob> blobs = Arrays.asList(locallyCachedBlobArr);
        return downloadOrUpdate(blobs);
    }

    /**
     * Schedules a download-or-refresh of every given blob on the download executor.
     *
     * <p>The futures are gathered in a list rather than an array sized from
     * {@code collection.size()}: callers pass live views of concurrent maps, so the element count
     * can change between asking for the size and iterating.
     *
     * @param collection the blobs to download or refresh
     * @return a future that completes when every scheduled download has finished; it completes
     *         exceptionally if any of them failed
     */
    private CompletableFuture<Void> downloadOrUpdate(Collection<? extends LocallyCachedBlob> collection) {
        final List<CompletableFuture<Void>> downloads = new ArrayList<>();
        for (final LocallyCachedBlob blob : collection) {
            downloads.add(CompletableFuture.runAsync(() -> downloadWithRetries(blob), this.downloadExecService));
        }
        return CompletableFuture.allOf(downloads.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Brings one blob up to date, retrying failed attempts.
     *
     * <p>A single blob-store client is opened for the whole retry loop and closed only once the
     * loop is left, so every attempt works with an open client.
     *
     * @param blob the blob to download or refresh
     * @throws RuntimeException once more than {@code blobDownloadRetries} attempts have failed
     */
    private void downloadWithRetries(LocallyCachedBlob blob) {
        LOG.debug("STARTING download of {}", blob);
        try (ClientBlobStore blobStore = getClientBlobStore()) {
            long failures = 0;
            boolean done = false;
            while (!done) {
                try {
                    synchronized (blob) {
                        if (!blob.isUsed()) {
                            LOG.debug("Skipping update of unused blob {}", blob);
                        } else if (blob.getLocalVersion() != blob.getRemoteVersion(blobStore) || !blob.isFullyDownloaded()) {
                            if (blob.isFullyDownloaded()) {
                                // Only an already-downloaded blob counts as a version change;
                                // a first-time download does not.
                                this.numBlobUpdateVersionChanged.mark();
                            }
                            final Timer.Context timing = this.singleBlobLocalizationDuration.time();
                            try {
                                blob.informReferencesAndCommitNewVersion(blob.fetchUnzipToTemp(blobStore));
                                timing.stop();
                            } finally {
                                blob.cleanupOrphanedData();
                            }
                        }
                    }
                    done = true;
                } catch (Exception e) {
                    failures++;
                    if (failures > this.blobDownloadRetries) {
                        throw new RuntimeException("Could not download...", e);
                    }
                    LOG.warn("Failed to download blob {} will try again in {} ms", blob, ATTEMPTS_INTERVAL_TIME, e);
                    Utils.sleep(ATTEMPTS_INTERVAL_TIME);
                }
            }
        }
        LOG.debug("FINISHED download of {}", blob);
    }

    /**
     * Refreshes every known blob: always the base topology blobs, and the per-user archives and
     * files as well unless symlinks are disabled.
     *
     * <p>All refreshes are scheduled first and then awaited one by one. A failed refresh is logged
     * and does not stop the remaining ones from being awaited. The whole pass is timed with
     * {@code blobCacheUpdateDuration}.
     */
    @VisibleForTesting
    void updateBlobs() {
        try (Timer.Context ignored = this.blobCacheUpdateDuration.time()) {
            final List<CompletableFuture<Void>> updates = new ArrayList<>();
            updates.add(downloadOrUpdate(this.topologyBlobs.values()));
            if (this.symlinksDisabled) {
                LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
            } else {
                for (ConcurrentHashMap<String, LocalizedResource> archives : this.userArchives.values()) {
                    updates.add(downloadOrUpdate(archives.values()));
                }
                for (ConcurrentHashMap<String, LocalizedResource> files : this.userFiles.values()) {
                    updates.add(downloadOrUpdate(files.values()));
                }
            }

            for (CompletableFuture<Void> update : updates) {
                try {
                    update.get();
                } catch (Exception e) {
                    if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
                        LOG.error("Network error while updating blobs, will retry again later", e);
                    } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
                        LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
                    } else {
                        LOG.error("Could not update blob, will retry again later", e);
                    }
                }
            }
        }
    }

    /**
     * Schedules the two periodic jobs on the task executor: a blob refresh every 30 seconds
     * (measured from the end of the previous run) and a cache cleanup at the configured fixed rate.
     */
    public void start() {
        final long updatePeriodSeconds = 30L;
        this.taskExecService.scheduleWithFixedDelay(() -> updateBlobs(), updatePeriodSeconds, updatePeriodSeconds, TimeUnit.SECONDS);

        final long cleanupPeriodMillis = this.cacheCleanupPeriod;
        LOG.debug("Scheduling cleanup every {} millis", cleanupPeriodMillis);
        this.taskExecService.scheduleAtFixedRate(() -> cleanup(), cleanupPeriodMillis, cleanupPeriodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Initiates shutdown of both executors, download executor first. Does not wait for running
     * tasks to finish.
     *
     * @throws InterruptedException declared for callers; not thrown by the visible code
     */
    @Override
    public void close() throws InterruptedException {
        for (ScheduledExecutorService executor : Arrays.asList(this.downloadExecService, this.taskExecService)) {
            executor.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Lists every non-base blob a topology needs: the entries of its "topology.blobstore.map"
     * setting followed by its dependency jars and dependency artifacts.
     *
     * @param portAndAssignment the slot/assignment whose topology is inspected
     * @return the resources to localize; empty when the topology declares none
     * @throws IOException if the locally stored topology conf or topology cannot be read
     */
    public List<LocalResource> getLocalResources(PortAndAssignment portAndAssignment) throws IOException {
        final String topologyId = portAndAssignment.getToplogyId();

        @SuppressWarnings("unchecked") // the conf stores this entry as an untyped value
        final Map<String, Map<String, Object>> blobstoreMap =
            (Map<String, Map<String, Object>>) ConfigUtils.readSupervisorStormConf(this.conf, topologyId).get("topology.blobstore.map");

        final List<LocalResource> resources = new ArrayList<>();
        if (blobstoreMap != null) {
            final List<LocalResource> declared = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
            if (declared != null) {
                resources.addAll(declared);
            }
        }

        final StormTopology topology = ConfigUtils.readSupervisorTopology(this.conf, topologyId, this.fsOps);
        final List<String> dependencyKeys = new ArrayList<>();
        if (topology.is_set_dependency_jars()) {
            dependencyKeys.addAll(topology.get_dependency_jars());
        }
        if (topology.is_set_dependency_artifacts()) {
            dependencyKeys.addAll(topology.get_dependency_artifacts());
        }
        for (String dependencyKey : dependencyKeys) {
            resources.add(new LocalResource(dependencyKey, false, true));
        }
        return resources;
    }

    /**
     * Registers the slot as a user of every non-base blob its topology needs. Does nothing when
     * the topology needs none.
     *
     * @param portAndAssignment the slot/assignment to register
     * @param blobChangingCallback callback passed on to {@code getBlobs}
     * @throws IOException if the topology's resources cannot be read or fetched
     * @throws KeyNotFoundException propagated from {@code getBlobs}
     * @throws AuthorizationException propagated from {@code getBlobs}
     */
    @VisibleForTesting
    void addReferencesToBlobs(PortAndAssignment portAndAssignment, BlobChangingCallback blobChangingCallback) throws IOException, KeyNotFoundException, AuthorizationException {
        final List<LocalResource> wanted = getLocalResources(portAndAssignment);
        if (!wanted.isEmpty()) {
            getBlobs(wanted, portAndAssignment, blobChangingCallback);
        }
    }

    /**
     * Re-registers the blob references of a topology that is already running on a slot.
     *
     * <p>References are added to the base blobs (jar, code, conf). {@code ALL_DONE_FUTURE} is
     * recorded in {@code blobPending} when no future is tracked for the topology yet. Finally the
     * references to the remaining blobs are added. A missing key or an authorization failure in
     * that last step is logged, with its cause, and otherwise ignored.
     *
     * @param localAssignment the running assignment
     * @param i the port of the slot
     * @param blobChangingCallback callback passed along with each reference
     * @throws IOException if the topology's resources cannot be read or fetched
     */
    public void recoverRunningTopology(LocalAssignment localAssignment, int i, BlobChangingCallback blobChangingCallback) throws IOException {
        final PortAndAssignmentImpl pna = new PortAndAssignmentImpl(i, localAssignment);
        final String topologyId = pna.getToplogyId();

        getTopoJar(topologyId, pna.getAssignment().get_owner()).addReference(pna, blobChangingCallback);
        getTopoCode(topologyId, pna.getAssignment().get_owner()).addReference(pna, blobChangingCallback);
        getTopoConf(topologyId, pna.getAssignment().get_owner()).addReference(pna, blobChangingCallback);

        final CompletableFuture<Void> tracked = this.blobPending.computeIfAbsent(topologyId, id -> ALL_DONE_FUTURE);
        try {
            addReferencesToBlobs(pna, blobChangingCallback);
        } catch (KeyNotFoundException | AuthorizationException e) {
            // Best effort: keep recovering, but keep the cause in the log so it can be diagnosed.
            LOG.error("Could not recover all blob references for {}", pna, e);
        }
        LOG.debug("Recovered blobs {} {}", topologyId, tracked);
    }

    /**
     * Drops every blob reference held by a slot: first the base blobs (jar, code, conf), then each
     * blob listed by the topology's local resources.
     *
     * <p>If the local resources cannot be read because a file is missing, the
     * {@code localResourceFileNotFoundWhenReleasingSlot} meter is marked and the method returns
     * normally. Any other {@link IOException} is logged and rethrown.
     *
     * @param localAssignment the assignment being released
     * @param i the port of the slot
     * @throws IOException if reading the local resources fails for a reason other than a missing file
     */
    public void releaseSlotFor(LocalAssignment localAssignment, int i) throws IOException {
        final PortAndAssignmentImpl pna = new PortAndAssignmentImpl(i, localAssignment);
        final String topologyId = localAssignment.get_topology_id();
        LOG.debug("Releasing slot for {} {}", topologyId, i);

        final List<String> baseBlobKeys = Arrays.asList(
            ConfigUtils.masterStormJarKey(topologyId),
            ConfigUtils.masterStormCodeKey(topologyId),
            ConfigUtils.masterStormConfKey(topologyId));
        for (String baseBlobKey : baseBlobKeys) {
            final LocallyCachedBlob baseBlob = this.topologyBlobs.get(baseBlobKey);
            if (baseBlob != null) {
                baseBlob.removeReference(pna);
            }
        }

        try {
            for (LocalResource resource : getLocalResources(pna)) {
                removeBlobReference(resource.getBlobName(), pna, resource.shouldUncompress());
            }
        } catch (FileNotFoundException missing) {
            LOG.info("Port and assignment info: {}", pna);
            this.localResourceFileNotFoundWhenReleasingSlot.mark();
            LOG.warn("Local base blobs have not been downloaded yet. ", missing);
        } catch (IOException unreadable) {
            LOG.info("Port and assignment info: {}", pna);
            LOG.error("Unable to read local file. ", unreadable);
            throw unreadable;
        }
    }

    /**
     * Resolves a user's directory under the local base directory.
     *
     * @param str the user name
     * @return the directory as a {@link File}, as computed by {@code LocalizedResource.getLocalUserDir}
     */
    @VisibleForTesting
    File getLocalUserDir(String str) {
        final Path userDir = LocalizedResource.getLocalUserDir(this.localBaseDir, str);
        return userDir.toFile();
    }

    /**
     * Resolves a user's file-cache directory under the local base directory.
     *
     * @param str the user name
     * @return the directory as a {@link File}, as computed by {@code LocalizedResource.getLocalUserFileCacheDir}
     */
    @VisibleForTesting
    File getLocalUserFileCacheDir(String str) {
        final Path cacheDir = LocalizedResource.getLocalUserFileCacheDir(this.localBaseDir, str);
        return cacheDir.toFile();
    }

    /**
     * Registers an archive resource for every archive key reported for the user under the local
     * base directory.
     *
     * @param str the user name
     * @throws IOException if the user's archive keys cannot be listed
     */
    private void recoverLocalizedArchivesForUser(String str) throws IOException {
        for (String archiveKey : LocalizedResource.getLocalizedArchiveKeys(this.localBaseDir, str)) {
            getUserArchive(str, archiveKey);
        }
    }

    /**
     * Registers a file resource for every file key reported for the user under the local base
     * directory.
     *
     * @param str the user name
     * @throws IOException if the user's file keys cannot be listed
     */
    private void recoverLocalizedFilesForUser(String str) throws IOException {
        for (String fileKey : LocalizedResource.getLocalizedFileKeys(this.localBaseDir, str)) {
            getUserFile(str, fileKey);
        }
    }

    /**
     * Rebuilds the in-memory file and archive registries from what is found under the local base
     * directory. Best effort: any failure is logged and the localizer carries on with whatever was
     * recovered up to that point.
     */
    private void reconstructLocalizedResources() {
        try {
            LOG.info("Reconstruct localized resources");
            final Collection<String> users = LocalizedResource.getLocalizedUsers(this.localBaseDir);
            final boolean nothingLeftOver = users == null || users.isEmpty();
            if (nothingLeftOver) {
                LOG.debug("No left over resources found for any user");
                return;
            }
            for (String user : users) {
                LOG.debug("reconstructing resources owned by {}", user);
                recoverLocalizedFilesForUser(user);
                recoverLocalizedArchivesForUser(user);
            }
        } catch (Exception e) {
            LOG.error("ERROR reconstructing localized resources", e);
        }
    }

    /**
     * Removes one slot's reference to a user blob. Logs a warning and returns when the owner has
     * no resources of the requested kind or the key is unknown.
     *
     * @param str the blob key
     * @param portAndAssignment the slot/assignment giving up its reference; also supplies owner and topology id
     * @param z {@code true} to look the key up among the owner's archives, {@code false} among the owner's files
     */
    void removeBlobReference(String str, PortAndAssignment portAndAssignment, boolean z) {
        final String owner = portAndAssignment.getOwner();
        final String topologyId = portAndAssignment.getToplogyId();

        final ConcurrentHashMap<String, LocalizedResource> resourcesOfOwner;
        if (z) {
            resourcesOfOwner = this.userArchives.get(owner);
        } else {
            resourcesOfOwner = this.userFiles.get(owner);
        }
        if (resourcesOfOwner == null) {
            LOG.warn("trying to remove blob for non-existent resource set for user: " + owner + " key: " + str + " topo: " + topologyId);
            return;
        }

        final LocalizedResource resource = resourcesOfOwner.get(str);
        if (resource == null) {
            LOG.warn("trying to remove non-existent blob, key: " + str + " for user: " + owner + " topo: " + topologyId);
            return;
        }

        LOG.debug("removing blob reference to: {} for topo: {}", str, topologyId);
        resource.removeReference(portAndAssignment);
    }

    /**
     * Obtains a blob-store client for the supervisor from the configuration. Protected so that
     * subclasses can supply a different client.
     *
     * @return the client returned by {@code ServerUtils.getClientBlobStoreForSupervisor}
     */
    protected ClientBlobStore getClientBlobStore() {
        final ClientBlobStore supervisorClient = ServerUtils.getClientBlobStoreForSupervisor(this.conf);
        return supervisorClient;
    }

    /**
     * Registers the slot as a user of each requested blob, schedules a download-or-refresh for
     * each, and waits until all of them have finished.
     *
     * @param list the resources to localize
     * @param portAndAssignment the slot/assignment that needs them; also supplies the owner
     * @param blobChangingCallback callback attached to resources that ask for change notification
     * @return the localized resources, in the order of {@code list}
     * @throws AuthorizationException if a download failed with that cause
     * @throws KeyNotFoundException if a download failed with that cause, or if symlinks are disabled
     * @throws IOException on any other download failure, on rejection by the executor, or when the
     *                     waiting thread is interrupted (its interrupt status is restored first)
     */
    List<LocalizedResource> getBlobs(List<LocalResource> list, PortAndAssignment portAndAssignment, BlobChangingCallback blobChangingCallback) throws AuthorizationException, KeyNotFoundException, IOException {
        if ((Boolean) this.conf.getOrDefault("storm.disable.symlinks", false)) {
            throw new WrappedKeyNotFoundException("symlinks are disabled so blobs cannot be downloaded.");
        }

        final String owner = portAndAssignment.getOwner();
        final List<LocalizedResource> localized = new ArrayList<>(list.size());
        final List<CompletableFuture<Void>> downloads = new ArrayList<>(list.size());
        try {
            for (LocalResource wanted : list) {
                final String blobKey = wanted.getBlobName();
                final LocalizedResource resource =
                    wanted.shouldUncompress() ? getUserArchive(owner, blobKey) : getUserFile(owner, blobKey);
                LOG.debug("fetching blob: {}", blobKey);
                resource.addReference(portAndAssignment, wanted.needsCallback() ? blobChangingCallback : null);
                downloads.add(downloadOrUpdate(resource));
                localized.add(resource);
            }
            // Everything is scheduled before the first wait so the downloads can overlap.
            for (CompletableFuture<Void> download : downloads) {
                download.get();
            }
            return localized;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted Exception", e);
        } catch (ExecutionException e) {
            Utils.unwrapAndThrow(AuthorizationException.class, e);
            Utils.unwrapAndThrow(KeyNotFoundException.class, e);
            throw new IOException("Error getting blobs", e);
        } catch (RejectedExecutionException e) {
            throw new IOException("RejectedExecutionException: ", e);
        }
    }

    /**
     * Invokes the callback for every sub-directory of the supervisor's topology distribution
     * root, passing the directory and its file name. Does nothing when the root does not exist or
     * is not a directory.
     *
     * @param consumePathAndId the callback to invoke per directory
     * @throws IOException if the root cannot be listed or the callback fails
     */
    private void forEachTopologyDistDir(ConsumePathAndId consumePathAndId) throws IOException {
        final Path distRoot = Paths.get(ConfigUtils.supervisorStormDistRoot(this.conf));
        // isDirectory is already false for a path that does not exist.
        if (!Files.isDirectory(distRoot)) {
            return;
        }
        try (DirectoryStream<Path> children = Files.newDirectoryStream(distRoot)) {
            for (Path child : children) {
                if (Files.isDirectory(child)) {
                    consumePathAndId.accept(child, child.getFileName().toString());
                }
            }
        }
    }

    /**
     * Runs one pass of local resource cleanup.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Collect every per-user archive, every per-user file and every topology blob into a
     *       single {@code LocalizedResourceRetentionSet} built with {@code cacheTargetSize}, then
     *       ask it to clean up using a blob store client.</li>
     *   <li>Derive the set of topology ids from the keys still present in {@code topologyBlobs}
     *       and drop every entry of {@code topologyBasicDownloaded} and {@code blobPending} whose
     *       key is not in that set.</li>
     *   <li>Delete every topology dist directory whose name is not in that set. Failures in this
     *       step are logged and do not abort the remaining cleanup.</li>
     *   <li>For each user that has neither files nor archives left, remove the user's cached
     *       data from disk and forget the user. An {@code IOException} for one user is logged
     *       and the loop continues with the next user.</li>
     * </ol>
     *
     * <p>Any {@code Exception} escaping the steps above is logged and swallowed. Any {@code Error}
     * is logged and then the process is terminated through {@code Utils.exitProcess(20, ...)}.
     */
    @VisibleForTesting
    void cleanup() {
        try {
            LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(this.cacheTargetSize);
            for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> entry : this.userArchives.entrySet()) {
                toClean.addResources(entry.getValue());
                LOG.debug("Resources to be cleaned after adding {} archives : {}", entry.getKey(), toClean);
            }
            for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> entry : this.userFiles.entrySet()) {
                toClean.addResources(entry.getValue());
                LOG.debug("Resources to be cleaned after adding {} files : {}", entry.getKey(), toClean);
            }
            toClean.addResources(this.topologyBlobs);

            // The blob store client is only needed for this one call. Scoping it tightly ensures
            // it is closed exactly once, even when a later step in this method throws.
            try (ClientBlobStore store = getClientBlobStore()) {
                toClean.cleanup(store);
            }

            // Topology ids that still have at least one blob after the retention pass.
            HashSet<String> safeTopologyIds = new HashSet<>();
            for (String blobKey : this.topologyBlobs.keySet()) {
                safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
            }

            this.topologyBasicDownloaded.keySet().removeIf(topologyId -> !safeTopologyIds.contains(topologyId));
            this.blobPending.keySet().removeIf(topologyId -> !safeTopologyIds.contains(topologyId));

            try {
                forEachTopologyDistDir((path, topologyId) -> {
                    if (!safeTopologyIds.contains(topologyId)) {
                        this.fsOps.deleteIfExists(path.toFile());
                    }
                });
            } catch (Exception e) {
                // Best effort: a failure here must not prevent the per-user cleanup below.
                LOG.error("Could not read topology directories for cleanup", e);
            }

            LOG.debug("Resource cleanup: {}", toClean);

            // Iterate over a snapshot of user names so entries can be removed from the live maps inside the loop.
            HashSet<String> allUsers = new HashSet<>(this.userArchives.keySet());
            allUsers.addAll(this.userFiles.keySet());
            for (String user : allUsers) {
                ConcurrentHashMap<String, LocalizedResource> filesForUser = this.userFiles.get(user);
                ConcurrentHashMap<String, LocalizedResource> archivesForUser = this.userArchives.get(user);
                boolean noFiles = filesForUser == null || filesForUser.isEmpty();
                boolean noArchives = archivesForUser == null || archivesForUser.isEmpty();
                if (noFiles && noArchives) {
                    LOG.debug("removing empty set: {}", user);
                    try {
                        LocalizedResource.completelyRemoveUnusedUser(this.localBaseDir, user);
                        // Only forget the user once the on-disk removal succeeded.
                        this.userFiles.remove(user);
                        this.userArchives.remove(user);
                    } catch (IOException e) {
                        LOG.error("Error trying to delete cached user files", e);
                    }
                }
            }
        } catch (Error e) {
            LOG.error("AsyncLocalizer cleanup failure", e);
            Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
        } catch (Exception e) {
            LOG.error("AsyncLocalizer cleanup failure", e);
        }
    }

    // Class initializer: assigns the static fields used throughout AsyncLocalizer.
    static {
        // Stores the negation of this class's desired assertion status.
        // NOTE(review): the `$` prefix suggests a compiler/decompiler-synthesized field backing
        // `assert` statements — confirm against the field declaration before touching it.
        $assertionsDisabled = !AsyncLocalizer.class.desiredAssertionStatus();
        // SLF4J logger bound to AsyncLocalizer.
        LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
        // A future that is already completed with a null result.
        ALL_DONE_FUTURE = CompletableFuture.completedFuture(null);
    }
}
