/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.runtime;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.BoltConnectionLifetimeListener;
import org.neo4j.bolt.runtime.BoltConnectionQueueMonitor;
import org.neo4j.bolt.runtime.BoltScheduler;
import org.neo4j.bolt.runtime.ExecutorFactory;
import org.neo4j.bolt.v1.runtime.Job;
import org.neo4j.logging.Log;
import org.neo4j.logging.internal.LogService;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.util.concurrent.Futures;

public class ExecutorBoltScheduler
implements BoltScheduler,
BoltConnectionLifetimeListener,
BoltConnectionQueueMonitor {
    private final String connector;
    private final ExecutorFactory executorFactory;
    private final JobScheduler scheduler;
    private final Log log;
    private final ConcurrentHashMap<String, BoltConnection> activeConnections = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, CompletableFuture<Boolean>> activeWorkItems = new ConcurrentHashMap();
    private final int corePoolSize;
    private final int maxPoolSize;
    private final Duration keepAlive;
    private final int queueSize;
    private final ExecutorService forkJoinPool;
    private ExecutorService threadPool;

    public ExecutorBoltScheduler(String connector, ExecutorFactory executorFactory, JobScheduler scheduler, LogService logService, int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, ExecutorService forkJoinPool) {
        this.connector = connector;
        this.executorFactory = executorFactory;
        this.scheduler = scheduler;
        this.log = logService.getInternalLog(this.getClass());
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        this.keepAlive = keepAlive;
        this.queueSize = queueSize;
        this.forkJoinPool = forkJoinPool;
    }

    boolean isRegistered(BoltConnection connection) {
        return this.activeConnections.containsKey(connection.id());
    }

    boolean isActive(BoltConnection connection) {
        return this.activeWorkItems.containsKey(connection.id());
    }

    @Override
    public String connector() {
        return this.connector;
    }

    @Override
    public void start() {
        this.threadPool = this.executorFactory.create(this.corePoolSize, this.maxPoolSize, this.keepAlive, this.queueSize, true, new NameAppendingThreadFactory(this.connector, this.scheduler.threadFactory(Group.BOLT_WORKER)));
    }

    @Override
    public void stop() {
        if (this.threadPool != null) {
            this.activeConnections.values().forEach(this::stopConnection);
            this.threadPool.shutdown();
        }
    }

    @Override
    public void created(BoltConnection connection) {
        BoltConnection previous = this.activeConnections.put(connection.id(), connection);
        assert (previous == null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void closed(BoltConnection connection) {
        String id = connection.id();
        try {
            CompletableFuture<Boolean> currentFuture = this.activeWorkItems.remove(id);
            if (currentFuture != null) {
                currentFuture.cancel(false);
            }
        }
        finally {
            this.activeConnections.remove(id);
        }
    }

    @Override
    public void enqueued(BoltConnection to, Job job) {
        this.handleSubmission(to);
    }

    @Override
    public void drained(BoltConnection from, Collection<Job> batch) {
    }

    private void handleSubmission(BoltConnection connection) {
        this.activeWorkItems.computeIfAbsent(connection.id(), key -> this.scheduleBatchOrHandleError(connection).whenCompleteAsync((result, error) -> this.handleCompletion(connection, (Boolean)result, (Throwable)error), (Executor)this.forkJoinPool));
    }

    private CompletableFuture<Boolean> scheduleBatchOrHandleError(BoltConnection connection) {
        try {
            return CompletableFuture.supplyAsync(() -> this.executeBatch(connection), this.threadPool);
        }
        catch (RejectedExecutionException ex) {
            return Futures.failedFuture((Throwable)ex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean executeBatch(BoltConnection connection) {
        Thread currentThread = Thread.currentThread();
        String originalName = currentThread.getName();
        String newName = String.format("%s [%s] ", originalName, connection.remoteAddress());
        currentThread.setName(newName);
        try {
            boolean bl = connection.processNextBatch();
            return bl;
        }
        finally {
            currentThread.setName(originalName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleCompletion(BoltConnection connection, Boolean shouldContinueScheduling, Throwable error) {
        try {
            if (error != null && ExceptionUtils.hasCause((Throwable)error, RejectedExecutionException.class)) {
                connection.handleSchedulingError(error);
                return;
            }
        }
        finally {
            this.activeWorkItems.remove(connection.id());
        }
        if (error != null) {
            this.log.error(String.format("Unexpected error during job scheduling for session '%s'.", connection.id()), error);
            this.stopConnection(connection);
        } else if (shouldContinueScheduling.booleanValue() && connection.hasPendingJobs()) {
            this.handleSubmission(connection);
        }
    }

    private void stopConnection(BoltConnection connection) {
        try {
            connection.stop();
        }
        catch (Throwable t) {
            this.log.warn(String.format("An unexpected error occurred while stopping BoltConnection [%s]", connection.id()), t);
        }
    }

    private static class NameAppendingThreadFactory
    implements ThreadFactory {
        private final String nameToAppend;
        private final ThreadFactory factory;

        private NameAppendingThreadFactory(String nameToAppend, ThreadFactory factory) {
            this.nameToAppend = nameToAppend;
            this.factory = factory;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread newThread = this.factory.newThread(r);
            newThread.setName(String.format("%s [%s]", newThread.getName(), this.nameToAppend));
            return newThread;
        }
    }
}

