/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.listener.AcknowledgingMessageListener;
import org.springframework.integration.kafka.listener.DefaultAcknowledgment;
import org.springframework.integration.kafka.listener.ErrorHandler;
import org.springframework.integration.kafka.listener.MessageListener;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import reactor.core.processor.RingBufferProcessor;

class QueueingMessageListenerInvoker {
    private final MessageListener messageListener;
    private final AcknowledgingMessageListener acknowledgingMessageListener;
    private final OffsetManager offsetManager;
    private final ErrorHandler errorHandler;
    private final int capacity;
    private final ExecutorService executorService;
    private RingBufferProcessor<KafkaMessage> ringBufferProcessor;
    private volatile boolean running = false;
    private volatile CountDownLatch shutdownLatch;

    public QueueingMessageListenerInvoker(int capacity, OffsetManager offsetManager, Object delegate, ErrorHandler errorHandler, Executor executor) {
        this.capacity = capacity;
        if (delegate instanceof MessageListener) {
            this.messageListener = (MessageListener)delegate;
            this.acknowledgingMessageListener = null;
        } else if (delegate instanceof AcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (AcknowledgingMessageListener)delegate;
            this.messageListener = null;
        } else {
            throw new IllegalArgumentException("Either a " + MessageListener.class.getName() + " or a " + AcknowledgingMessageListener.class.getName() + " must be provided");
        }
        this.offsetManager = offsetManager;
        this.errorHandler = errorHandler;
        this.executorService = executor != null ? new ExecutorServiceAdapter((TaskExecutor)new ConcurrentTaskExecutor(executor)) : null;
    }

    public void enqueue(KafkaMessage message) {
        if (this.running) {
            this.ringBufferProcessor.onNext((Object)message);
        }
    }

    public synchronized void start() {
        if (!this.running) {
            this.running = true;
            ExecutorService service = this.executorService != null ? this.executorService : Executors.newSingleThreadExecutor();
            this.ringBufferProcessor = RingBufferProcessor.share((ExecutorService)service, (int)this.capacity);
            this.ringBufferProcessor.subscribe((Subscriber)new KafkaMessageDispatchingSubscriber());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void stop(long stopTimeout) {
        if (this.running) {
            this.running = false;
            if (this.ringBufferProcessor != null) {
                this.shutdownLatch = new CountDownLatch(1);
                this.ringBufferProcessor.onComplete();
                this.ringBufferProcessor = null;
                try {
                    this.shutdownLatch.await(stopTimeout, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finally {
                    this.shutdownLatch = null;
                }
            }
        }
    }

    private class KafkaMessageDispatchingSubscriber
    implements Subscriber<KafkaMessage> {
        private KafkaMessageDispatchingSubscriber() {
        }

        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onNext(KafkaMessage kafkaMessage) {
            try {
                if (QueueingMessageListenerInvoker.this.messageListener != null) {
                    QueueingMessageListenerInvoker.this.messageListener.onMessage(kafkaMessage);
                } else {
                    QueueingMessageListenerInvoker.this.acknowledgingMessageListener.onMessage(kafkaMessage, new DefaultAcknowledgment(QueueingMessageListenerInvoker.this.offsetManager, kafkaMessage));
                }
            }
            catch (Exception e) {
                if (QueueingMessageListenerInvoker.this.errorHandler != null) {
                    QueueingMessageListenerInvoker.this.errorHandler.handle(e, kafkaMessage);
                }
            }
            finally {
                if (QueueingMessageListenerInvoker.this.messageListener != null) {
                    QueueingMessageListenerInvoker.this.offsetManager.updateOffset(kafkaMessage.getMetadata().getPartition(), kafkaMessage.getMetadata().getNextOffset());
                }
            }
        }

        public void onError(Throwable t) {
        }

        public void onComplete() {
            CountDownLatch latch = QueueingMessageListenerInvoker.this.shutdownLatch;
            if (latch != null) {
                latch.countDown();
            }
        }
    }
}

