/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.springframework.amqp.AmqpApplicationContextClosedException;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ChannelProxy;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.AsyncConsumerStartedEvent;
import org.springframework.amqp.rabbit.listener.ConsumeOkEvent;
import org.springframework.amqp.rabbit.listener.support.ContainerUtils;
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.ApplicationEvent;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.backoff.BackOffExecution;

public class DirectMessageListenerContainer
extends AbstractMessageListenerContainer {
    private static final int START_WAIT_TIME = 60;
    private static final int DEFAULT_MONITOR_INTERVAL = 10000;
    private static final int DEFAULT_ACK_TIMEOUT = 20000;
    protected final List<SimpleConsumer> consumers = new LinkedList<SimpleConsumer>();
    private final Set<SimpleConsumer> consumersToRestart = new LinkedHashSet<SimpleConsumer>();
    private final Set<String> removedQueues = ConcurrentHashMap.newKeySet();
    private final MultiValueMap<String, SimpleConsumer> consumersByQueue = new LinkedMultiValueMap();
    private final ActiveObjectCounter<SimpleConsumer> cancellationLock = new ActiveObjectCounter();
    private TaskScheduler taskScheduler;
    private boolean taskSchedulerSet;
    private long monitorInterval = 10000L;
    private int messagesPerAck;
    private long ackTimeout = 20000L;
    private volatile boolean started;
    private volatile boolean aborted;
    private volatile boolean hasStopped;
    private volatile CountDownLatch startedLatch = new CountDownLatch(1);
    private volatile int consumersPerQueue = 1;
    private volatile ScheduledFuture<?> consumerMonitorTask;
    private volatile long lastAlertAt;
    private volatile long lastRestartAttempt;

    public DirectMessageListenerContainer() {
        this.setMissingQueuesFatal(false);
        this.doSetPossibleAuthenticationFailureFatal(false);
    }

    public DirectMessageListenerContainer(ConnectionFactory connectionFactory) {
        this.setConnectionFactory(connectionFactory);
        this.setMissingQueuesFatal(false);
        this.doSetPossibleAuthenticationFailureFatal(false);
    }

    public void setConsumersPerQueue(int consumersPerQueue) {
        if (this.isRunning()) {
            this.adjustConsumers(consumersPerQueue);
        }
        this.consumersPerQueue = consumersPerQueue;
    }

    @Override
    public final void setExclusive(boolean exclusive) {
        Assert.isTrue((!exclusive || this.consumersPerQueue == 1 ? 1 : 0) != 0, (String)"When the consumer is exclusive, the consumers per queue must be 1");
        super.setExclusive(exclusive);
    }

    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
        this.taskSchedulerSet = true;
    }

    public void setMonitorInterval(long monitorInterval) {
        this.monitorInterval = monitorInterval;
    }

    @Override
    public void setQueueNames(String ... queueName) {
        Assert.state((!this.isRunning() ? 1 : 0) != 0, (String)"Cannot set queue names while running, use add/remove");
        super.setQueueNames(queueName);
    }

    @Override
    public final void setMissingQueuesFatal(boolean missingQueuesFatal) {
        super.setMissingQueuesFatal(missingQueuesFatal);
    }

    public void setMessagesPerAck(int messagesPerAck) {
        this.messagesPerAck = messagesPerAck;
    }

    public void setAckTimeout(long ackTimeout) {
        this.ackTimeout = ackTimeout;
    }

    @Override
    public void addQueueNames(String ... queueNames) {
        Assert.notNull((Object)queueNames, (String)"'queueNames' cannot be null");
        Assert.noNullElements((Object[])queueNames, (String)"'queueNames' cannot contain null elements");
        try {
            Arrays.stream(queueNames).forEach(this.removedQueues::remove);
            this.addQueues(Arrays.stream(queueNames));
        }
        catch (AmqpIOException e) {
            throw new AmqpIOException("Failed to add " + Arrays.toString(queueNames), (Throwable)e);
        }
        super.addQueueNames(queueNames);
    }

    @Override
    public void addQueues(Queue ... queues) {
        Assert.notNull((Object)queues, (String)"'queues' cannot be null");
        Assert.noNullElements((Object[])queues, (String)"'queues' cannot contain null elements");
        try {
            Arrays.stream(queues).map(q -> q.getActualName()).forEach(this.removedQueues::remove);
            this.addQueues(Arrays.stream(queues).map(Queue::getName));
        }
        catch (AmqpIOException e) {
            throw new AmqpIOException("Failed to add " + Arrays.toString(queues), (Throwable)e);
        }
        super.addQueues(queues);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addQueues(Stream<String> queueNameStream) {
        if (this.isRunning()) {
            Object object = this.consumersMonitor;
            synchronized (object) {
                this.checkStartState();
                Set<String> current = this.getQueueNamesAsSet();
                queueNameStream.forEach(queue -> {
                    if (current.contains(queue)) {
                        this.logger.warn((Object)("Queue " + queue + " is already configured for this container: " + this + ", ignoring add"));
                    } else {
                        this.consumeFromQueue((String)queue);
                    }
                });
            }
        }
    }

    @Override
    public boolean removeQueueNames(String ... queueNames) {
        this.removeQueues(Arrays.stream(queueNames));
        return super.removeQueueNames(queueNames);
    }

    @Override
    public boolean removeQueues(Queue ... queues) {
        this.removeQueues(Arrays.stream(queues).map(Queue::getActualName));
        return super.removeQueues(queues);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeQueues(Stream<String> queueNames) {
        if (this.isRunning()) {
            Object object = this.consumersMonitor;
            synchronized (object) {
                this.checkStartState();
                queueNames.map(queue -> {
                    this.removedQueues.add((String)queue);
                    return (List)this.consumersByQueue.remove(queue);
                }).filter(Objects::nonNull).flatMap(Collection::stream).forEach(this::cancelConsumer);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void adjustConsumers(int newCount) {
        Object object = this.consumersMonitor;
        synchronized (object) {
            this.checkStartState();
            this.consumersToRestart.clear();
            for (String queue : this.getQueueNames()) {
                while (this.consumersByQueue.get((Object)queue) == null || ((List)this.consumersByQueue.get((Object)queue)).size() < newCount) {
                    List cBQ = (List)this.consumersByQueue.get((Object)queue);
                    int index = 0;
                    if (cBQ != null) {
                        List indices = cBQ.stream().map(cons -> cons.getIndex()).sorted().collect(Collectors.toList());
                        for (index = 0; index < indices.size() && index >= (Integer)indices.get(index); ++index) {
                        }
                    }
                    this.doConsumeFromQueue(queue, index);
                }
                this.reduceConsumersIfIdle(newCount, queue);
            }
        }
    }

    private void reduceConsumersIfIdle(int newCount, String queue) {
        List consumerList = (List)this.consumersByQueue.get((Object)queue);
        if (consumerList != null && consumerList.size() > newCount) {
            int delta = consumerList.size() - newCount;
            for (int i = 0; i < delta; ++i) {
                SimpleConsumer consumer;
                int index = this.findIdleConsumer();
                if (index < 0 || (consumer = (SimpleConsumer)((Object)consumerList.remove(index))) == null) continue;
                this.cancelConsumer(consumer);
            }
        }
    }

    protected int findIdleConsumer() {
        return 0;
    }

    private void checkStartState() {
        if (!this.isRunning()) {
            try {
                Assert.state((boolean)this.startedLatch.await(60L, TimeUnit.SECONDS), (String)"Container is not started - cannot adjust queues");
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AmqpException("Interrupted waiting for start", (Throwable)e);
            }
        }
    }

    @Override
    protected void doInitialize() {
        if (this.taskScheduler == null) {
            ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
            threadPoolTaskScheduler.setThreadNamePrefix(this.getBeanName() + "-consumerMonitor-");
            threadPoolTaskScheduler.afterPropertiesSet();
            this.taskScheduler = threadPoolTaskScheduler;
        }
        if (this.messagesPerAck > 0) {
            Assert.state((!this.isChannelTransacted() ? 1 : 0) != 0, (String)"'messagesPerAck' is not allowed with transactions");
        }
    }

    @Override
    protected void doStart() {
        if (!this.started) {
            this.actualStart();
        }
    }

    @Override
    protected void doStop() {
        super.doStop();
        if (!this.taskSchedulerSet && this.taskScheduler != null) {
            ((ThreadPoolTaskScheduler)this.taskScheduler).shutdown();
            this.taskScheduler = null;
        }
    }

    protected void actualStart() {
        this.aborted = false;
        this.hasStopped = false;
        if (this.getPrefetchCount() < this.messagesPerAck) {
            this.setPrefetchCount(this.messagesPerAck);
        }
        super.doStart();
        String[] queueNames = this.getQueueNames();
        this.checkMissingQueues(queueNames);
        this.checkConnect();
        long idleEventInterval = this.getIdleEventInterval();
        if (this.taskScheduler == null) {
            this.afterPropertiesSet();
        }
        if (idleEventInterval > 0L && this.monitorInterval > idleEventInterval) {
            this.monitorInterval = idleEventInterval / 2L;
        }
        if (this.getFailedDeclarationRetryInterval() < this.monitorInterval) {
            this.monitorInterval = this.getFailedDeclarationRetryInterval();
        }
        Map<String, Queue> namesToQueues = this.getQueueNamesToQueues();
        this.lastRestartAttempt = System.currentTimeMillis();
        this.startMonitor(idleEventInterval, namesToQueues);
        if (queueNames.length > 0) {
            this.doRedeclareElementsIfNecessary();
            this.getTaskExecutor().execute(() -> this.startConsumers(queueNames));
        } else {
            this.started = true;
            this.startedLatch.countDown();
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info((Object)("Container initialized for queues: " + Arrays.asList(queueNames)));
        }
    }

    protected void checkConnect() {
        if (this.isPossibleAuthenticationFailureFatal()) {
            try (Connection connection = null;){
                this.getConnectionFactory().createConnection();
            }
        }
    }

    private void startMonitor(long idleEventInterval, Map<String, Queue> namesToQueues) {
        this.consumerMonitorTask = this.taskScheduler.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            this.checkIdle(idleEventInterval, now);
            this.checkConsumers(now);
            if (this.lastRestartAttempt + this.getFailedDeclarationRetryInterval() < now) {
                Object object = this.consumersMonitor;
                synchronized (object) {
                    if (this.started) {
                        ArrayList<SimpleConsumer> restartableConsumers = new ArrayList<SimpleConsumer>(this.consumersToRestart);
                        this.consumersToRestart.clear();
                        if (restartableConsumers.size() > 0) {
                            this.doRedeclareElementsIfNecessary();
                        }
                        Iterator iterator = restartableConsumers.iterator();
                        while (iterator.hasNext()) {
                            SimpleConsumer consumer = (SimpleConsumer)((Object)((Object)iterator.next()));
                            iterator.remove();
                            if (this.removedQueues.contains(consumer.getQueue())) {
                                if (!this.logger.isDebugEnabled()) continue;
                                this.logger.debug((Object)("Skipping restart of consumer, queue removed " + (Object)((Object)consumer)));
                                continue;
                            }
                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug((Object)("Attempting to restart consumer " + (Object)((Object)consumer)));
                            }
                            if (this.restartConsumer(namesToQueues, restartableConsumers, consumer)) continue;
                            break;
                        }
                        this.lastRestartAttempt = now;
                    }
                }
            }
            this.processMonitorTask();
        }, this.monitorInterval);
    }

    private void checkIdle(long idleEventInterval, long now) {
        if (idleEventInterval > 0L && now - this.getLastReceive() > idleEventInterval && now - this.lastAlertAt > idleEventInterval) {
            this.publishIdleContainerEvent(now - this.getLastReceive());
            this.lastAlertAt = now;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkConsumers(long now) {
        List<SimpleConsumer> consumersToCancel;
        Object object = this.consumersMonitor;
        synchronized (object) {
            consumersToCancel = this.consumers.stream().filter(consumer -> {
                boolean open;
                boolean bl = open = consumer.getChannel().isOpen() && !consumer.isAckFailed() && !consumer.targetChanged();
                if (open && this.messagesPerAck > 1) {
                    try {
                        consumer.ackIfNecessary(now);
                    }
                    catch (IOException e) {
                        this.logger.error((Object)"Exception while sending delayed ack", (Throwable)e);
                    }
                }
                return !open;
            }).collect(Collectors.toList());
        }
        consumersToCancel.forEach(consumer -> {
            block2: {
                try {
                    RabbitUtils.closeMessageConsumer(consumer.getChannel(), Collections.singletonList(consumer.getConsumerTag()), this.isChannelTransacted());
                }
                catch (Exception e) {
                    if (!this.logger.isDebugEnabled()) break block2;
                    this.logger.debug((Object)("Error closing consumer " + (Object)consumer), (Throwable)e);
                }
            }
            this.logger.error((Object)("Consumer canceled - channel closed " + (Object)consumer));
            consumer.cancelConsumer("Consumer " + (Object)consumer + " channel closed");
        });
    }

    private boolean restartConsumer(Map<String, Queue> namesToQueues, List<SimpleConsumer> restartableConsumers, SimpleConsumer consumerArg) {
        String actualName;
        SimpleConsumer consumer = consumerArg;
        Queue queue = namesToQueues.get(consumer.getQueue());
        if (queue != null && !StringUtils.hasText((String)queue.getName()) && StringUtils.hasText((String)(actualName = queue.getActualName()))) {
            namesToQueues.remove(consumer.getQueue());
            namesToQueues.put(actualName, queue);
            consumer = new SimpleConsumer(null, null, actualName, consumer.getIndex());
        }
        try {
            this.doConsumeFromQueue(consumer.getQueue(), consumer.getIndex());
            return true;
        }
        catch (AmqpConnectException | AmqpIOException e) {
            this.logger.error((Object)"Cannot connect to server", e);
            if (e.getCause() instanceof AmqpApplicationContextClosedException) {
                this.logger.error((Object)"Application context is closed, terminating");
                this.taskScheduler.schedule(this::stop, new Date());
            }
            this.consumersToRestart.addAll(restartableConsumers);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace((Object)("After restart exception, consumers to restart now: " + this.consumersToRestart));
            }
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startConsumers(String[] queueNames) {
        Object object = this.consumersMonitor;
        synchronized (object) {
            if (this.hasStopped) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)"Consumer start aborted - container stopping");
                }
            } else {
                BackOffExecution backOffExecution = this.getRecoveryBackOff().start();
                while (!this.started && this.isRunning()) {
                    this.cancellationLock.reset();
                    try {
                        for (String queue : queueNames) {
                            this.consumeFromQueue(queue);
                        }
                    }
                    catch (AmqpConnectException | AmqpIOException e) {
                        long nextBackOff = backOffExecution.nextBackOff();
                        if (nextBackOff < 0L || e.getCause() instanceof AmqpApplicationContextClosedException) {
                            this.aborted = true;
                            this.shutdown();
                            this.logger.error((Object)"Failed to start container - fatal error or backOffs exhausted", e);
                            this.taskScheduler.schedule(this::stop, new Date());
                            break;
                        }
                        this.logger.error((Object)("Error creating consumer; retrying in " + nextBackOff), e);
                        this.doShutdown();
                        try {
                            Thread.sleep(nextBackOff);
                        }
                        catch (InterruptedException e1) {
                            Thread.currentThread().interrupt();
                        }
                        continue;
                    }
                    this.started = true;
                    this.startedLatch.countDown();
                }
            }
        }
    }

    protected void doRedeclareElementsIfNecessary() {
        String routingLookupKey = this.getRoutingLookupKey();
        if (routingLookupKey != null) {
            SimpleResourceHolder.push(this.getRoutingConnectionFactory(), routingLookupKey);
        }
        try {
            this.redeclareElementsIfNecessary();
        }
        catch (Exception e) {
            this.logger.error((Object)"Failed to redeclare elements", (Throwable)e);
        }
        finally {
            if (routingLookupKey != null) {
                SimpleResourceHolder.pop(this.getRoutingConnectionFactory());
            }
        }
    }

    protected void processMonitorTask() {
    }

    private void checkMissingQueues(String[] queueNames) {
        if (this.isMissingQueuesFatal()) {
            AmqpAdmin checkAdmin = this.getAmqpAdmin();
            if (checkAdmin == null) {
                try {
                    Class clazz = ClassUtils.forName((String)"org.springframework.amqp.rabbit.core.RabbitAdmin", (ClassLoader)ClassUtils.getDefaultClassLoader());
                    Constructor ctor = clazz.getConstructor(ConnectionFactory.class);
                    checkAdmin = (AmqpAdmin)ctor.newInstance(this.getConnectionFactory());
                    this.setAmqpAdmin(checkAdmin);
                }
                catch (Exception e) {
                    this.logger.error((Object)"Failed to create a RabbitAdmin", (Throwable)e);
                }
            }
            if (checkAdmin != null) {
                for (String queue : queueNames) {
                    Properties queueProperties = checkAdmin.getQueueProperties(queue);
                    if (queueProperties != null || !this.isMissingQueuesFatal()) continue;
                    throw new IllegalStateException("At least one of the configured queues is missing");
                }
            }
        }
    }

    private void consumeFromQueue(String queue) {
        List list = (List)this.consumersByQueue.get((Object)queue);
        if (CollectionUtils.isEmpty((Collection)list)) {
            for (int i = 0; i < this.consumersPerQueue; ++i) {
                this.doConsumeFromQueue(queue, i);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doConsumeFromQueue(String queue, int index) {
        if (!this.isActive()) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Consume from queue " + queue + " ignore, container stopping"));
            }
            return;
        }
        String routingLookupKey = this.getRoutingLookupKey();
        if (routingLookupKey != null) {
            SimpleResourceHolder.push(this.getRoutingConnectionFactory(), routingLookupKey);
        }
        Connection connection = null;
        try {
            connection = this.getConnectionFactory().createConnection();
        }
        catch (Exception e) {
            this.publishConsumerFailedEvent(e.getMessage(), false, e);
            this.addConsumerToRestart(new SimpleConsumer(null, null, queue, index));
            throw e instanceof AmqpConnectException ? (AmqpConnectException)((Object)e) : new AmqpConnectException(e);
        }
        finally {
            if (routingLookupKey != null) {
                SimpleResourceHolder.pop(this.getRoutingConnectionFactory());
            }
        }
        SimpleConsumer consumer = this.consume(queue, index, connection);
        Object object = this.consumersMonitor;
        synchronized (object) {
            if (consumer != null) {
                this.cancellationLock.add(consumer);
                this.consumers.add(consumer);
                this.consumersByQueue.add((Object)queue, (Object)consumer);
                if (this.logger.isInfoEnabled()) {
                    this.logger.info((Object)((Object)((Object)consumer) + " started"));
                }
                if (this.getApplicationEventPublisher() != null) {
                    this.getApplicationEventPublisher().publishEvent((ApplicationEvent)new AsyncConsumerStartedEvent(this, (Object)consumer));
                }
            }
        }
    }

    @Nullable
    private SimpleConsumer consume(String queue, int index, Connection connection) {
        Channel channel = null;
        SimpleConsumer consumer = null;
        try {
            if (this.getConsumeDelay() > 0L) {
                try {
                    Thread.sleep(this.getConsumeDelay());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            channel = connection.createChannel(this.isChannelTransacted());
            channel.basicQos(this.getPrefetchCount(), this.isGlobalQos());
            consumer = new SimpleConsumer(connection, channel, queue, index);
            channel.queueDeclarePassive(queue);
            consumer.consumerTag = channel.basicConsume(queue, this.getAcknowledgeMode().isAutoAck(), this.getConsumerTagStrategy() != null ? this.getConsumerTagStrategy().createConsumerTag(queue) : "", this.isNoLocal(), this.isExclusive(), this.getConsumerArguments(), (Consumer)consumer);
        }
        catch (AmqpApplicationContextClosedException e) {
            throw new AmqpConnectException((Exception)((Object)e));
        }
        catch (Exception e) {
            RabbitUtils.closeChannel(channel);
            RabbitUtils.closeConnection(connection);
            consumer = this.handleConsumeException(queue, index, consumer, e);
        }
        return consumer;
    }

    @Nullable
    private SimpleConsumer handleConsumeException(String queue, int index, @Nullable SimpleConsumer consumerArg, Exception e) {
        SimpleConsumer consumer = consumerArg;
        if (e.getCause() instanceof ShutdownSignalException && e.getCause().getMessage().contains("in exclusive use")) {
            this.getExclusiveConsumerExceptionLogger().log(this.logger, "Exclusive consumer failure", e.getCause());
            this.publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
        } else if (e.getCause() instanceof ShutdownSignalException && RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException)e.getCause())) {
            this.publishMissingQueueEvent(queue);
            this.logger.error((Object)("Queue not present, scheduling consumer " + (consumer == null ? "for queue " + queue : consumer) + " for restart"), (Throwable)e);
        } else if (this.logger.isWarnEnabled()) {
            this.logger.warn((Object)("basicConsume failed, scheduling consumer " + (consumer == null ? "for queue " + queue : consumer) + " for restart"), (Throwable)e);
        }
        if (consumer == null) {
            this.addConsumerToRestart(new SimpleConsumer(null, null, queue, index));
        } else {
            this.addConsumerToRestart(consumer);
            consumer = null;
        }
        return consumer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doShutdown() {
        LinkedList<SimpleConsumer> canceledConsumers = null;
        boolean waitForConsumers = false;
        Object object = this.consumersMonitor;
        synchronized (object) {
            if (this.started || this.aborted) {
                canceledConsumers = new LinkedList<SimpleConsumer>(this.consumers);
                this.actualShutDown(canceledConsumers);
                waitForConsumers = true;
            }
        }
        if (waitForConsumers) {
            try {
                if (this.cancellationLock.await(this.getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
                    this.logger.info((Object)"Successfully waited for consumers to finish.");
                } else {
                    this.logger.info((Object)"Consumers not finished.");
                    if (this.isForceCloseChannel()) {
                        canceledConsumers.forEach(consumer -> {
                            String eventMessage = "Closing channel for unresponsive consumer: " + (Object)consumer;
                            if (this.logger.isWarnEnabled()) {
                                this.logger.warn((Object)eventMessage);
                            }
                            consumer.cancelConsumer(eventMessage);
                        });
                    }
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.logger.warn((Object)"Interrupted waiting for consumers. Continuing with shutdown.");
            }
            finally {
                this.startedLatch = new CountDownLatch(1);
                this.started = false;
                this.aborted = false;
                this.hasStopped = true;
            }
        }
    }

    private void actualShutDown(List<SimpleConsumer> consumers) {
        Assert.state((this.getTaskExecutor() != null ? 1 : 0) != 0, (String)"Cannot shut down if not initialized");
        this.logger.debug((Object)"Shutting down");
        consumers.forEach(this::cancelConsumer);
        this.consumers.clear();
        this.consumersByQueue.clear();
        this.logger.debug((Object)"All consumers canceled");
        if (this.consumerMonitorTask != null) {
            this.consumerMonitorTask.cancel(true);
            this.consumerMonitorTask = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cancelConsumer(SimpleConsumer consumer) {
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Canceling " + (Object)((Object)consumer)));
            }
            SimpleConsumer simpleConsumer = consumer;
            synchronized (simpleConsumer) {
                consumer.setCanceled(true);
                if (this.messagesPerAck > 1) {
                    try {
                        consumer.ackIfNecessary(0L);
                    }
                    catch (IOException e) {
                        this.logger.error((Object)"Exception while sending delayed ack", (Throwable)e);
                    }
                }
            }
            RabbitUtils.cancel(consumer.getChannel(), consumer.getConsumerTag());
        }
        finally {
            this.consumers.remove((Object)consumer);
            this.consumerRemoved(consumer);
        }
    }

    private void addConsumerToRestart(SimpleConsumer consumer) {
        this.consumersToRestart.add(consumer);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)("Consumers to restart now: " + this.consumersToRestart));
        }
    }

    protected void consumerRemoved(SimpleConsumer consumer) {
    }

    final class SimpleConsumer
    extends DefaultConsumer {
        private final Log logger;
        private final Connection connection;
        private final String queue;
        private final int index;
        private final boolean ackRequired;
        private final ConnectionFactory connectionFactory;
        private final PlatformTransactionManager transactionManager;
        private final TransactionAttribute transactionAttribute;
        private final boolean isRabbitTxManager;
        private final int messagesPerAck;
        private final long ackTimeout;
        private final Channel targetChannel;
        private int pendingAcks;
        private long lastAck;
        private long latestDeferredDeliveryTag;
        private volatile String consumerTag;
        private volatile int epoch;
        private volatile TransactionTemplate transactionTemplate;
        private volatile boolean canceled;
        private volatile boolean ackFailed;

        SimpleConsumer(@Nullable Connection connection, Channel channel, String queue, int index) {
            super(channel);
            this.logger = DirectMessageListenerContainer.this.logger;
            this.connectionFactory = DirectMessageListenerContainer.this.getConnectionFactory();
            this.transactionManager = DirectMessageListenerContainer.this.getTransactionManager();
            this.transactionAttribute = DirectMessageListenerContainer.this.getTransactionAttribute();
            this.isRabbitTxManager = this.transactionManager instanceof RabbitTransactionManager;
            this.messagesPerAck = DirectMessageListenerContainer.this.messagesPerAck;
            this.ackTimeout = DirectMessageListenerContainer.this.ackTimeout;
            this.lastAck = System.currentTimeMillis();
            this.connection = connection;
            this.queue = queue;
            this.index = index;
            this.ackRequired = !DirectMessageListenerContainer.this.getAcknowledgeMode().isAutoAck() && !DirectMessageListenerContainer.this.getAcknowledgeMode().isManual();
            this.targetChannel = channel instanceof ChannelProxy ? ((ChannelProxy)channel).getTargetChannel() : null;
        }

        String getQueue() {
            return this.queue;
        }

        int getIndex() {
            return this.index;
        }

        public String getConsumerTag() {
            return this.consumerTag;
        }

        int getEpoch() {
            return this.epoch;
        }

        void setCanceled(boolean canceled) {
            this.canceled = canceled;
        }

        boolean isAckFailed() {
            return this.ackFailed;
        }

        boolean targetChanged() {
            return this.targetChannel != null && !((ChannelProxy)this.getChannel()).getTargetChannel().equals(this.targetChannel);
        }

        int incrementAndGetEpoch() {
            return ++this.epoch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * WARNING - void declaration
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            void var9_10;
            MessageProperties messageProperties = DirectMessageListenerContainer.this.getMessagePropertiesConverter().toMessageProperties(properties, envelope, "UTF-8");
            messageProperties.setConsumerTag(consumerTag);
            messageProperties.setConsumerQueue(this.queue);
            Message message = new Message(body, messageProperties);
            long deliveryTag = envelope.getDeliveryTag();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)((Object)((Object)this) + " received " + message));
            }
            DirectMessageListenerContainer.this.updateLastReceive();
            Message message2 = message;
            List<Message> debatched = DirectMessageListenerContainer.this.debatch(message);
            if (debatched != null) {
                List<Message> list = debatched;
            }
            if (this.transactionManager != null) {
                try {
                    this.executeListenerInTransaction(var9_10, deliveryTag);
                    return;
                }
                catch (AbstractMessageListenerContainer.WrappedTransactionException e) {
                    if (!(e.getCause() instanceof Error)) return;
                    throw (Error)e.getCause();
                }
                catch (Exception exception) {
                    return;
                }
                finally {
                    if (this.isRabbitTxManager) {
                        ConsumerChannelRegistry.unRegisterConsumerChannel();
                    }
                }
            }
            try {
                this.callExecuteListener(var9_10, deliveryTag);
                return;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }

        private void executeListenerInTransaction(Object data, long deliveryTag) {
            if (this.isRabbitTxManager) {
                ConsumerChannelRegistry.registerConsumerChannel(this.getChannel(), this.connectionFactory);
            }
            if (this.transactionTemplate == null) {
                this.transactionTemplate = new TransactionTemplate(this.transactionManager, (TransactionDefinition)this.transactionAttribute);
            }
            this.transactionTemplate.execute(s -> {
                RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(this.getChannel(), false), this.connectionFactory, true);
                if (resourceHolder != null) {
                    resourceHolder.addDeliveryTag(this.getChannel(), deliveryTag);
                }
                try {
                    this.callExecuteListener(data, deliveryTag);
                }
                catch (RuntimeException e1) {
                    DirectMessageListenerContainer.this.prepareHolderForRollback(resourceHolder, e1);
                    throw e1;
                }
                catch (Throwable e2) {
                    throw new AbstractMessageListenerContainer.WrappedTransactionException(e2);
                }
                return null;
            });
        }

        private void callExecuteListener(Object data, long deliveryTag) {
            boolean channelLocallyTransacted = DirectMessageListenerContainer.this.isChannelLocallyTransacted();
            try {
                DirectMessageListenerContainer.this.executeListener(this.getChannel(), data);
                this.handleAck(deliveryTag, channelLocallyTransacted);
            }
            catch (ImmediateAcknowledgeAmqpException e) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("User requested ack for failed delivery '" + e.getMessage() + "': " + deliveryTag));
                }
                this.handleAck(deliveryTag, channelLocallyTransacted);
            }
            catch (Exception e) {
                if (DirectMessageListenerContainer.this.causeChainHasImmediateAcknowledgeAmqpException(e)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("User requested ack for failed delivery: " + deliveryTag));
                    }
                    this.handleAck(deliveryTag, channelLocallyTransacted);
                } else {
                    this.logger.error((Object)"Failed to invoke listener", (Throwable)e);
                    if (this.transactionManager != null) {
                        if (this.transactionAttribute.rollbackOn((Throwable)e)) {
                            RabbitResourceHolder resourceHolder = (RabbitResourceHolder)((Object)TransactionSynchronizationManager.getResource((Object)DirectMessageListenerContainer.this.getConnectionFactory()));
                            if (resourceHolder == null) {
                                this.rollback(deliveryTag, e);
                            }
                            throw e;
                        }
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug((Object)("No rollback for " + e));
                        }
                    } else {
                        this.rollback(deliveryTag, e);
                    }
                }
            }
            catch (Error e) {
                this.logger.error((Object)"Failed to invoke listener", (Throwable)e);
                DirectMessageListenerContainer.this.getJavaLangErrorHandler().handle(e);
                throw e;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleAck(long deliveryTag, boolean channelLocallyTransacted) {
            boolean isLocallyTransacted = channelLocallyTransacted || DirectMessageListenerContainer.this.isChannelTransacted() && TransactionSynchronizationManager.getResource((Object)this.connectionFactory) == null;
            try {
                if (this.ackRequired) {
                    if (this.messagesPerAck > 1) {
                        SimpleConsumer simpleConsumer = this;
                        synchronized (simpleConsumer) {
                            this.latestDeferredDeliveryTag = deliveryTag;
                            ++this.pendingAcks;
                            this.ackIfNecessary(this.lastAck);
                        }
                    } else if (!DirectMessageListenerContainer.this.isChannelTransacted() || isLocallyTransacted) {
                        this.getChannel().basicAck(deliveryTag, false);
                    }
                }
                if (isLocallyTransacted) {
                    RabbitUtils.commitIfNecessary(this.getChannel());
                }
            }
            catch (Exception e) {
                this.ackFailed = true;
                this.logger.error((Object)"Error acking", (Throwable)e);
            }
        }

        synchronized void ackIfNecessary(long now) throws IOException {
            if (this.pendingAcks >= this.messagesPerAck || this.pendingAcks > 0 && (now - this.lastAck > this.ackTimeout || this.canceled)) {
                this.sendAck(now);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void rollback(long deliveryTag, Exception e) {
            if (DirectMessageListenerContainer.this.isChannelTransacted()) {
                RabbitUtils.rollbackIfNecessary(this.getChannel());
            }
            if (this.ackRequired || ContainerUtils.isRejectManual(e)) {
                try {
                    if (this.messagesPerAck > 1) {
                        SimpleConsumer simpleConsumer = this;
                        synchronized (simpleConsumer) {
                            if (this.pendingAcks > 0) {
                                this.sendAck(System.currentTimeMillis());
                            }
                        }
                    }
                    this.getChannel().basicNack(deliveryTag, !DirectMessageListenerContainer.this.isAsyncReplies(), ContainerUtils.shouldRequeue(DirectMessageListenerContainer.this.isDefaultRequeueRejected(), e, this.logger));
                }
                catch (IOException e1) {
                    this.logger.error((Object)"Failed to nack message", (Throwable)e1);
                }
            }
            if (DirectMessageListenerContainer.this.isChannelTransacted()) {
                RabbitUtils.commitIfNecessary(this.getChannel());
            }
        }

        protected synchronized void sendAck(long now) throws IOException {
            this.getChannel().basicAck(this.latestDeferredDeliveryTag, true);
            this.lastAck = now;
            this.pendingAcks = 0;
        }

        public void handleConsumeOk(String consumerTag) {
            super.handleConsumeOk(consumerTag);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("New " + (Object)((Object)this) + " consumeOk"));
            }
            if (DirectMessageListenerContainer.this.getApplicationEventPublisher() != null) {
                DirectMessageListenerContainer.this.getApplicationEventPublisher().publishEvent((ApplicationEvent)new ConsumeOkEvent((Object)this, this.getQueue(), consumerTag));
            }
        }

        public void handleCancelOk(String consumerTag) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("CancelOk " + (Object)((Object)this)));
            }
            this.finalizeConsumer();
        }

        public void handleCancel(String consumerTag) {
            this.logger.error((Object)("Consumer canceled - queue deleted? " + (Object)((Object)this)));
            this.cancelConsumer("Consumer " + (Object)((Object)this) + " canceled");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void cancelConsumer(String eventMessage) {
            DirectMessageListenerContainer.this.publishConsumerFailedEvent(eventMessage, true, null);
            Object object = DirectMessageListenerContainer.this.consumersMonitor;
            synchronized (object) {
                List list = (List)DirectMessageListenerContainer.this.consumersByQueue.get((Object)this.queue);
                if (list != null) {
                    list.remove((Object)this);
                }
                DirectMessageListenerContainer.this.consumers.remove((Object)this);
                DirectMessageListenerContainer.this.addConsumerToRestart(this);
            }
            this.finalizeConsumer();
        }

        private void finalizeConsumer() {
            RabbitUtils.setPhysicalCloseRequired(this.getChannel(), true);
            RabbitUtils.closeChannel(this.getChannel());
            RabbitUtils.closeConnection(this.connection);
            DirectMessageListenerContainer.this.cancellationLock.release(this);
            DirectMessageListenerContainer.this.consumerRemoved(this);
        }

        public int hashCode() {
            int prime = 31;
            int result = 1;
            result = 31 * result + this.getEnclosingInstance().hashCode();
            result = 31 * result + this.index;
            result = 31 * result + (this.queue == null ? 0 : this.queue.hashCode());
            return result;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (((Object)((Object)this)).getClass() != obj.getClass()) {
                return false;
            }
            SimpleConsumer other = (SimpleConsumer)((Object)obj);
            if (!this.getEnclosingInstance().equals(other.getEnclosingInstance())) {
                return false;
            }
            if (this.index != other.index) {
                return false;
            }
            return !(this.queue == null ? other.queue != null : !this.queue.equals(other.queue));
        }

        private DirectMessageListenerContainer getEnclosingInstance() {
            return DirectMessageListenerContainer.this;
        }

        public String toString() {
            return "SimpleConsumer [queue=" + this.queue + ", index=" + this.index + ", consumerTag=" + this.consumerTag + " identity=" + ObjectUtils.getIdentityHexString((Object)((Object)this)) + "]";
        }
    }
}

