/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.channel;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.core.log.LogMessage;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.util.Assert;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.ContextView;

public class FluxMessageChannel
extends AbstractMessageChannel
implements Publisher<Message<?>>,
ReactiveStreamsSubscribableChannel {
    private final Sinks.Many<Message<?>> sink = Sinks.many().multicast().onBackpressureBuffer(1, false);
    private final Disposable.Composite upstreamSubscriptions = Disposables.composite();
    private volatile boolean active = true;

    @Override
    protected boolean doSend(Message<?> message, long timeout) {
        Assert.state((this.active && this.sink.currentSubscriberCount() > 0 ? 1 : 0) != 0, () -> "The [" + this + "] doesn't have subscribers to accept messages");
        long remainingTime = 0L;
        if (timeout > 0L) {
            remainingTime = timeout;
        }
        long parkTimeout = 10L;
        long parkTimeoutNs = TimeUnit.MILLISECONDS.toNanos(parkTimeout);
        while (this.active && !this.tryEmitMessage(message)) {
            if (timeout >= 0L && (remainingTime -= parkTimeout) <= 0L) {
                return false;
            }
            LockSupport.parkNanos(parkTimeoutNs);
        }
        return true;
    }

    private boolean tryEmitMessage(Message<?> message) {
        Message<?> messageToEmit = message;
        ContextView contextView = IntegrationReactiveUtils.captureReactorContext();
        if (!contextView.isEmpty()) {
            messageToEmit = MutableMessageBuilder.fromMessage(message).setHeader("reactorContext", contextView).build();
        }
        return switch (this.sink.tryEmitNext(messageToEmit)) {
            default -> throw new IncompatibleClassChangeError();
            case Sinks.EmitResult.OK -> true;
            case Sinks.EmitResult.FAIL_NON_SERIALIZED, Sinks.EmitResult.FAIL_OVERFLOW -> false;
            case Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER -> throw new IllegalStateException("The [" + this + "] doesn't have subscribers to accept messages");
            case Sinks.EmitResult.FAIL_TERMINATED, Sinks.EmitResult.FAIL_CANCELLED -> throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink: " + this.sink);
        };
    }

    public void subscribe(Subscriber<? super Message<?>> subscriber) {
        this.sink.asFlux().publish(1).refCount().subscribe(subscriber);
    }

    private void addPublisherToSubscribe(Flux<?> publisher) {
        AtomicReference<Disposable> disposableReference = new AtomicReference<Disposable>();
        Disposable disposable = publisher.doOnTerminate(() -> this.disposeUpstreamSubscription(disposableReference)).subscribe();
        if (!disposable.isDisposed() && this.upstreamSubscriptions.add(disposable)) {
            disposableReference.set(disposable);
        }
    }

    private void disposeUpstreamSubscription(AtomicReference<Disposable> disposableReference) {
        Disposable disposable = disposableReference.get();
        if (disposable != null) {
            this.upstreamSubscriptions.remove(disposable);
            disposable.dispose();
        }
    }

    @Override
    public void subscribeTo(Publisher<? extends Message<?>> publisher) {
        Flux upstreamPublisher = Flux.from(publisher).delaySubscription((Publisher)Mono.fromCallable(() -> this.sink.currentSubscriberCount()).filter(value -> value > 0).repeatWhenEmpty(repeat -> this.active ? repeat.delayElements(Duration.ofMillis(100L)) : repeat)).flatMap(message -> Mono.just((Object)message).handle((messageToHandle, syncSink) -> this.sendReactiveMessage((Message<?>)messageToHandle)).contextWrite(StaticMessageHeaderAccessor.getReactorContext(message))).contextCapture();
        this.addPublisherToSubscribe(upstreamPublisher);
    }

    private void sendReactiveMessage(Message<?> message) {
        Message<?> messageToSend = message;
        if (messageToSend.getHeaders().containsKey((Object)"reactorContext")) {
            messageToSend = MutableMessageBuilder.fromMessage(message).removeHeader("reactorContext").build();
        }
        try {
            if (!this.send(messageToSend)) {
                this.logger.warn((Throwable)new MessageDeliveryException(messageToSend, "Failed to send message to channel '" + this), (CharSequence)"Message was not delivered");
            }
        }
        catch (Exception ex) {
            this.logger.warn((Throwable)ex, (CharSequence)LogMessage.format((String)"Error during processing event: %s", messageToSend));
        }
    }

    @Override
    public void destroy() {
        this.active = false;
        this.upstreamSubscriptions.dispose();
        this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        super.destroy();
    }
}

