/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.Errors;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.net.ssl.SSLSession;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerResponse;
import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.tls.TlsHostnameVerifier;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientCnx
extends PulsarHandler {
    protected final Authentication authentication;
    private State state;
    private final ConcurrentLongHashMap<CompletableFuture<? extends Object>> pendingRequests = new ConcurrentLongHashMap(16, 1);
    private final Queue<Pair<Long, Pair<ByteBuf, CompletableFuture<BinaryProtoLookupService.LookupDataResult>>>> waitingLookupRequests;
    private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap(16, 1);
    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap(16, 1);
    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap(16, 1);
    private final CompletableFuture<Void> connectionFuture = new CompletableFuture();
    private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue();
    private final Semaphore pendingLookupRequestSemaphore;
    private final Semaphore maxLookupRequestSemaphore;
    private final EventLoopGroup eventLoopGroup;
    private static final AtomicIntegerFieldUpdater<ClientCnx> NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ClientCnx.class, "numberOfRejectRequests");
    private volatile int numberOfRejectRequests = 0;
    private static int maxMessageSize = 0x500000;
    private final int maxNumberOfRejectedRequestPerConnection;
    private final int rejectedRequestResetTimeSec = 60;
    private final int protocolVersion;
    private final long operationTimeoutMs;
    protected String proxyToTargetBrokerAddress = null;
    protected String remoteHostName = null;
    private boolean isTlsHostnameVerificationEnable;
    private static final TlsHostnameVerifier HOSTNAME_VERIFIER = new TlsHostnameVerifier();
    private ScheduledFuture<?> timeoutTask;
    protected AuthenticationDataProvider authenticationDataProvider;
    private TransactionBufferHandler transactionBufferHandler;
    private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);

    public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
        this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
    }

    public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion) {
        super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
        Preconditions.checkArgument((conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest() ? 1 : 0) != 0);
        this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), false);
        this.maxLookupRequestSemaphore = new Semaphore(conf.getMaxLookupRequest() - conf.getConcurrentLookupRequest(), false);
        this.waitingLookupRequests = Queues.newConcurrentLinkedQueue();
        this.authentication = conf.getAuthentication();
        this.eventLoopGroup = eventLoopGroup;
        this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection();
        this.operationTimeoutMs = conf.getOperationTimeoutMs();
        this.state = State.None;
        this.isTlsHostnameVerificationEnable = conf.isTlsHostnameVerificationEnable();
        this.protocolVersion = protocolVersion;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.timeoutTask = this.eventLoopGroup.scheduleAtFixedRate(() -> this.checkRequestTimeout(), this.operationTimeoutMs, this.operationTimeoutMs, TimeUnit.MILLISECONDS);
        if (this.proxyToTargetBrokerAddress == null) {
            if (log.isDebugEnabled()) {
                log.debug("{} Connected to broker", (Object)ctx.channel());
            }
        } else {
            log.info("{} Connected through proxy to target broker at {}", (Object)ctx.channel(), (Object)this.proxyToTargetBrokerAddress);
        }
        ctx.writeAndFlush((Object)this.newConnectCommand()).addListener(future -> {
            if (future.isSuccess()) {
                if (log.isDebugEnabled()) {
                    log.debug("Complete: {}", (Object)future.isSuccess());
                }
                this.state = State.SentConnectFrame;
            } else {
                log.warn("Error during handshake", future.cause());
                ctx.close();
            }
        });
    }

    protected ByteBuf newConnectCommand() throws Exception {
        this.authenticationDataProvider = this.authentication.getAuthData(this.remoteHostName);
        AuthData authData = this.authenticationDataProvider.authenticate(AuthData.of((byte[])AuthData.INIT_AUTH_DATA));
        return Commands.newConnect((String)this.authentication.getAuthMethodName(), (AuthData)authData, (int)this.protocolVersion, (String)PulsarVersion.getVersion(), (String)this.proxyToTargetBrokerAddress, null, null, null);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.info("{} Disconnected", (Object)ctx.channel());
        if (!this.connectionFuture.isDone()) {
            this.connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed"));
        }
        PulsarClientException e = new PulsarClientException("Disconnected from server at " + ctx.channel().remoteAddress());
        this.pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
        this.waitingLookupRequests.forEach(pair -> ((CompletableFuture)((Pair)pair.getRight()).getRight()).completeExceptionally(e));
        this.producers.forEach((id, producer) -> producer.connectionClosed(this));
        this.consumers.forEach((id, consumer) -> consumer.connectionClosed(this));
        this.transactionMetaStoreHandlers.forEach((id, handler) -> handler.connectionClosed(this));
        this.pendingRequests.clear();
        this.waitingLookupRequests.clear();
        this.producers.clear();
        this.consumers.clear();
        this.timeoutTask.cancel(true);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (this.state != State.Failed) {
            log.warn("[{}] Got exception {}", (Object)this.remoteAddress, ClientCnx.isKnownException(cause) ? cause : ExceptionUtils.getStackTrace((Throwable)cause));
            this.state = State.Failed;
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Got exception: {}", new Object[]{this.remoteAddress, cause.getMessage(), cause});
        }
        ctx.close();
    }

    public static boolean isKnownException(Throwable t) {
        return t instanceof Errors.NativeIoException || t instanceof ClosedChannelException;
    }

    protected void handleConnected(PulsarApi.CommandConnected connected) {
        if (this.isTlsHostnameVerificationEnable && this.remoteHostName != null && !this.verifyTlsHostName(this.remoteHostName, this.ctx)) {
            log.warn("[{}] Failed to verify hostname of {}", (Object)this.ctx.channel(), (Object)this.remoteHostName);
            this.ctx.close();
            return;
        }
        Preconditions.checkArgument((this.state == State.SentConnectFrame || this.state == State.Connecting ? 1 : 0) != 0);
        if (connected.hasMaxMessageSize()) {
            if (log.isDebugEnabled()) {
                log.debug("{} Connection has max message size setting, replace old frameDecoder with server frame size {}", (Object)this.ctx.channel(), (Object)connected.getMaxMessageSize());
            }
            maxMessageSize = connected.getMaxMessageSize();
            this.ctx.pipeline().replace("frameDecoder", "newFrameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize() + 10240, 0, 4, 0, 4));
        }
        if (log.isDebugEnabled()) {
            log.debug("{} Connection is ready", (Object)this.ctx.channel());
        }
        this.remoteEndpointProtocolVersion = connected.getProtocolVersion();
        this.connectionFuture.complete(null);
        this.state = State.Ready;
    }

    protected void handleAuthChallenge(PulsarApi.CommandAuthChallenge authChallenge) {
        Preconditions.checkArgument((boolean)authChallenge.hasChallenge());
        Preconditions.checkArgument((boolean)authChallenge.getChallenge().hasAuthData());
        try {
            AuthData authData = this.authenticationDataProvider.authenticate(AuthData.of((byte[])authChallenge.getChallenge().getAuthData().toByteArray()));
            Preconditions.checkState((!authData.isComplete() ? 1 : 0) != 0);
            ByteBuf request = Commands.newAuthResponse((String)this.authentication.getAuthMethodName(), (AuthData)authData, (int)this.protocolVersion, (String)PulsarVersion.getVersion());
            if (log.isDebugEnabled()) {
                log.debug("{} Mutual auth {}", (Object)this.ctx.channel(), (Object)this.authentication.getAuthMethodName());
            }
            this.ctx.writeAndFlush((Object)request).addListener(writeFuture -> {
                if (!writeFuture.isSuccess()) {
                    log.warn("{} Failed to send request for mutual auth to broker: {}", (Object)this.ctx.channel(), (Object)writeFuture.cause().getMessage());
                    this.connectionFuture.completeExceptionally(writeFuture.cause());
                }
            });
            if (this.state == State.SentConnectFrame) {
                this.state = State.Connecting;
            }
        }
        catch (Exception e) {
            log.error("{} Error mutual verify: {}", (Object)this.ctx.channel(), (Object)e);
            this.connectionFuture.completeExceptionally(e);
            return;
        }
    }

    protected void handleSendReceipt(PulsarApi.CommandSendReceipt sendReceipt) {
        Preconditions.checkArgument((this.state == State.Ready ? 1 : 0) != 0);
        long producerId = sendReceipt.getProducerId();
        long sequenceId = sendReceipt.getSequenceId();
        long highestSequenceId = sendReceipt.getHighestSequenceId();
        long ledgerId = -1L;
        long entryId = -1L;
        if (sendReceipt.hasMessageId()) {
            ledgerId = sendReceipt.getMessageId().getLedgerId();
            entryId = sendReceipt.getMessageId().getEntryId();
        }
        if (ledgerId == -1L && entryId == -1L) {
            log.warn("[{}] Message has been dropped for non-persistent topic producer-id {}-{}", new Object[]{this.ctx.channel(), producerId, sequenceId});
        }
        if (log.isDebugEnabled()) {
            log.debug("{} Got receipt for producer: {} -- msg: {} -- id: {}:{}", new Object[]{this.ctx.channel(), producerId, sequenceId, ledgerId, entryId});
        }
        ((ProducerImpl)this.producers.get(producerId)).ackReceived(this, sequenceId, highestSequenceId, ledgerId, entryId);
    }

    protected void handleAckResponse(PulsarApi.CommandAckResponse ackResponse) {
        Preconditions.checkArgument((this.state == State.Ready ? 1 : 0) != 0);
        Preconditions.checkArgument((ackResponse.getRequestId() >= 0L ? 1 : 0) != 0);
        long consumerId = ackResponse.getConsumerId();
        if (!ackResponse.hasError()) {
            ((ConsumerImpl)this.consumers.get(consumerId)).ackReceipt(ackResponse.getRequestId());
        } else {
            ((ConsumerImpl)this.consumers.get(consumerId)).ackError(ackResponse.getRequestId(), this.getPulsarClientException(ackResponse.getError(), ackResponse.getMessage()));
        }
    }

    protected void handleMessage(PulsarApi.CommandMessage cmdMessage, ByteBuf headersAndPayload) {
        ConsumerImpl consumer;
        Preconditions.checkArgument((this.state == State.Ready ? 1 : 0) != 0);
        if (log.isDebugEnabled()) {
            log.debug("{} Received a message from the server: {}", (Object)this.ctx.channel(), (Object)cmdMessage);
        }
        if ((consumer = (ConsumerImpl)this.consumers.get(cmdMessage.getConsumerId())) != null) {
            consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), cmdMessage.getAckSetList(), headersAndPayload, this);
        }
    }

    protected void handleActiveConsumerChange(PulsarApi.CommandActiveConsumerChange change) {
        ConsumerImpl consumer;
        Preconditions.checkArgument((this.state == State.Ready ? 1 : 0) != 0);
        if (log.isDebugEnabled()) {
            log.debug("{} Received a consumer group change message from the server : {}", (Object)this.ctx.channel(), (Object)change);
        }
        if ((consumer = (ConsumerImpl)this.consumers.get(change.getConsumerId())) != null) {
            consumer.activeConsumerChanged(change.getIsActive());
        }
    }

    protected void handleSuccess(PulsarApi.CommandSuccess success) {
        long requestId;
        CompletableFuture requestFuture;
        Preconditions.checkArgument((this.state == State.Ready ? 1 : 0) != 0);
        if (log.isDebugEnabled()) {
            log.debug("{} Received success response from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
        if ((requestFuture = (CompletableFuture)this.pendingRequests.remove(requestId = success.getRequestId())) != null) {
            requestFuture.complete(null);
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
    }

    protected void handleGetLastMessageIdSuccess(PulsarApi.CommandGetLastMessageIdResponse success) {
        long requestId;
        CompletableFuture requestFuture;
        Preconditions.checkArgument((this.state == State.Ready ? 1 : 0) != 0);
        if (log.isDebugEnabled()) {
            log.debug("{} Received success GetLastMessageId response from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
        if ((requestFuture = (CompletableFuture)this.pendingRequests.remove(requestId = success.getRequestId())) != null) {
            requestFuture.complete(success.getLastMessageId());
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
    }

    protected void handleProducerSuccess(PulsarApi.CommandProducerSuccess success) {
        long requestId;
        CompletableFuture requestFuture;
        Preconditions.checkArgument((this.state == State.Ready ? 1 : 0) != 0);
        if (log.isDebugEnabled()) {
            log.debug("{} Received producer success response from server: {} - producer-name: {}", new Object[]{this.ctx.channel(), success.getRequestId(), success.getProducerName()});
        }
        if ((requestFuture = (CompletableFuture)this.pendingRequests.remove(requestId = success.getRequestId())) != null) {
            requestFuture.complete(new ProducerResponse(success.getProducerName(), success.getLastSequenceId(), success.getSchemaVersion().toByteArray()));
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
    }

    protected void handleLookupResponse(PulsarApi.CommandLookupTopicResponse lookupResult) {
        long requestId;
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> requestFuture;
        if (log.isDebugEnabled()) {
            log.debug("Received Broker lookup response: {}", (Object)lookupResult.getResponse());
        }
        if ((requestFuture = this.getAndRemovePendingLookupRequest(requestId = lookupResult.getRequestId())) != null) {
            if (requestFuture.isCompletedExceptionally()) {
                if (log.isDebugEnabled()) {
                    log.debug("{} Request {} already timed-out", (Object)this.ctx.channel(), (Object)lookupResult.getRequestId());
                }
                return;
            }
            if (!lookupResult.hasResponse() || PulsarApi.CommandLookupTopicResponse.LookupType.Failed.equals((Object)lookupResult.getResponse())) {
                if (lookupResult.hasError()) {
                    this.checkServerError(lookupResult.getError(), lookupResult.getMessage());
                    requestFuture.completeExceptionally(this.getPulsarClientException(lookupResult.getError(), lookupResult.getMessage()));
                } else {
                    requestFuture.completeExceptionally((Throwable)new PulsarClientException.LookupException("Empty lookup response"));
                }
            } else {
                requestFuture.complete(new BinaryProtoLookupService.LookupDataResult(lookupResult));
            }
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)lookupResult.getRequestId());
        }
    }

    protected void handlePartitionResponse(PulsarApi.CommandPartitionedTopicMetadataResponse lookupResult) {
        long requestId;
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> requestFuture;
        if (log.isDebugEnabled()) {
            log.debug("Received Broker Partition response: {}", (Object)lookupResult.getPartitions());
        }
        if ((requestFuture = this.getAndRemovePendingLookupRequest(requestId = lookupResult.getRequestId())) != null) {
            if (requestFuture.isCompletedExceptionally()) {
                if (log.isDebugEnabled()) {
                    log.debug("{} Request {} already timed-out", (Object)this.ctx.channel(), (Object)lookupResult.getRequestId());
                }
                return;
            }
            if (!lookupResult.hasResponse() || PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed.equals((Object)lookupResult.getResponse())) {
                if (lookupResult.hasError()) {
                    this.checkServerError(lookupResult.getError(), lookupResult.getMessage());
                    requestFuture.completeExceptionally(this.getPulsarClientException(lookupResult.getError(), lookupResult.getMessage()));
                } else {
                    requestFuture.completeExceptionally((Throwable)new PulsarClientException.LookupException("Empty lookup response"));
                }
            } else {
                requestFuture.complete(new BinaryProtoLookupService.LookupDataResult(lookupResult.getPartitions()));
            }
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)lookupResult.getRequestId());
        }
    }

    protected void handleReachedEndOfTopic(PulsarApi.CommandReachedEndOfTopic commandReachedEndOfTopic) {
        long consumerId = commandReachedEndOfTopic.getConsumerId();
        log.info("[{}] Broker notification reached the end of topic: {}", (Object)this.remoteAddress, (Object)consumerId);
        ConsumerImpl consumer = (ConsumerImpl)this.consumers.get(consumerId);
        if (consumer != null) {
            consumer.setTerminated();
        }
    }

    private void addPendingLookupRequests(long requestId, CompletableFuture<BinaryProtoLookupService.LookupDataResult> future) {
        this.pendingRequests.put(requestId, future);
        this.eventLoopGroup.schedule(() -> {
            if (!future.isDone()) {
                future.completeExceptionally((Throwable)new PulsarClientException.TimeoutException(requestId + " lookup request timedout after ms " + this.operationTimeoutMs));
            }
        }, this.operationTimeoutMs, TimeUnit.MILLISECONDS);
    }

    private CompletableFuture<BinaryProtoLookupService.LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
        CompletableFuture result = (CompletableFuture)this.pendingRequests.remove(requestId);
        if (result != null) {
            Pair<Long, Pair<ByteBuf, CompletableFuture<BinaryProtoLookupService.LookupDataResult>>> firstOneWaiting = this.waitingLookupRequests.poll();
            if (firstOneWaiting != null) {
                this.maxLookupRequestSemaphore.release();
                this.eventLoopGroup.submit(() -> {
                    long newId = (Long)firstOneWaiting.getLeft();
                    CompletableFuture newFuture = (CompletableFuture)((Pair)firstOneWaiting.getRight()).getRight();
                    this.addPendingLookupRequests(newId, newFuture);
                    this.ctx.writeAndFlush(((Pair)firstOneWaiting.getRight()).getLeft()).addListener(writeFuture -> {
                        if (!writeFuture.isSuccess()) {
                            log.warn("{} Failed to send request {} to broker: {}", new Object[]{this.ctx.channel(), newId, writeFuture.cause().getMessage()});
                            this.getAndRemovePendingLookupRequest(newId);
                            newFuture.completeExceptionally(writeFuture.cause());
                        }
                    });
                });
            } else {
                this.pendingLookupRequestSemaphore.release();
            }
        }
        return result;
    }

    protected void handleSendError(PulsarApi.CommandSendError sendError) {
        log.warn("{} Received send error from server: {} : {}", new Object[]{this.ctx.channel(), sendError.getError(), sendError.getMessage()});
        long producerId = sendError.getProducerId();
        long sequenceId = sendError.getSequenceId();
        switch (sendError.getError()) {
            case ChecksumError: {
                ((ProducerImpl)this.producers.get(producerId)).recoverChecksumError(this, sequenceId);
                break;
            }
            case TopicTerminatedError: {
                ((ProducerImpl)this.producers.get(producerId)).terminated(this);
                break;
            }
            default: {
                this.ctx.close();
            }
        }
    }

    protected void handleError(PulsarApi.CommandError error) {
        CompletableFuture requestFuture;
        Preconditions.checkArgument((this.state == State.SentConnectFrame || this.state == State.Ready ? 1 : 0) != 0);
        log.warn("{} Received error from server: {}", (Object)this.ctx.channel(), (Object)error.getMessage());
        long requestId = error.getRequestId();
        if (error.getError() == PulsarApi.ServerError.ProducerBlockedQuotaExceededError) {
            log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic", (Object)this.ctx.channel());
        }
        if (error.getError() == PulsarApi.ServerError.AuthenticationError) {
            this.connectionFuture.completeExceptionally((Throwable)new PulsarClientException.AuthenticationException(error.getMessage()));
            log.error("{} Failed to authenticate the client", (Object)this.ctx.channel());
        }
        if ((requestFuture = (CompletableFuture)this.pendingRequests.remove(requestId)) != null) {
            requestFuture.completeExceptionally(this.getPulsarClientException(error.getError(), error.getMessage()));
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)error.getRequestId());
        }
    }

    protected void handleCloseProducer(PulsarApi.CommandCloseProducer closeProducer) {
        log.info("[{}] Broker notification of Closed producer: {}", (Object)this.remoteAddress, (Object)closeProducer.getProducerId());
        long producerId = closeProducer.getProducerId();
        ProducerImpl producer = (ProducerImpl)this.producers.get(producerId);
        if (producer != null) {
            producer.connectionClosed(this);
        } else {
            log.warn("Producer with id {} not found while closing producer ", (Object)producerId);
        }
    }

    protected void handleCloseConsumer(PulsarApi.CommandCloseConsumer closeConsumer) {
        log.info("[{}] Broker notification of Closed consumer: {}", (Object)this.remoteAddress, (Object)closeConsumer.getConsumerId());
        long consumerId = closeConsumer.getConsumerId();
        ConsumerImpl consumer = (ConsumerImpl)this.consumers.get(consumerId);
        if (consumer != null) {
            consumer.connectionClosed(this);
        } else {
            log.warn("Consumer with id {} not found while closing consumer ", (Object)consumerId);
        }
    }

    protected boolean isHandshakeCompleted() {
        return this.state == State.Ready;
    }

    public CompletableFuture<BinaryProtoLookupService.LookupDataResult> newLookup(ByteBuf request, long requestId) {
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> future = new CompletableFuture<BinaryProtoLookupService.LookupDataResult>();
        if (this.pendingLookupRequestSemaphore.tryAcquire()) {
            this.addPendingLookupRequests(requestId, future);
            this.ctx.writeAndFlush((Object)request).addListener(writeFuture -> {
                if (!writeFuture.isSuccess()) {
                    log.warn("{} Failed to send request {} to broker: {}", new Object[]{this.ctx.channel(), requestId, writeFuture.cause().getMessage()});
                    this.getAndRemovePendingLookupRequest(requestId);
                    future.completeExceptionally(writeFuture.cause());
                }
            });
        } else {
            if (log.isDebugEnabled()) {
                log.debug("{} Failed to add lookup-request into pending queue", (Object)requestId);
            }
            if (this.maxLookupRequestSemaphore.tryAcquire()) {
                this.waitingLookupRequests.add((Pair<Long, Pair<ByteBuf, CompletableFuture<BinaryProtoLookupService.LookupDataResult>>>)Pair.of((Object)requestId, (Object)Pair.of((Object)request, future)));
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("{} Failed to add lookup-request into waiting queue", (Object)requestId);
                }
                future.completeExceptionally((Throwable)new PulsarClientException.TooManyRequestsException(String.format("Requests number out of config: There are {%s} lookup requests outstanding and {%s} requests pending.", this.pendingLookupRequestSemaphore.availablePermits(), this.waitingLookupRequests.size())));
            }
        }
        return future;
    }

    public CompletableFuture<List<String>> newGetTopicsOfNamespace(ByteBuf request, long requestId) {
        return this.sendRequestAndHandleTimeout(request, requestId, RequestType.GetTopics);
    }

    protected void handleGetTopicsOfNamespaceSuccess(PulsarApi.CommandGetTopicsOfNamespaceResponse success) {
        CompletableFuture requestFuture;
        Preconditions.checkArgument((this.state == State.Ready ? 1 : 0) != 0);
        long requestId = success.getRequestId();
        List topics = success.getTopicsList();
        if (log.isDebugEnabled()) {
            log.debug("{} Received get topics of namespace success response from server: {} - topics.size: {}", new Object[]{this.ctx.channel(), success.getRequestId(), topics.size()});
        }
        if ((requestFuture = (CompletableFuture)this.pendingRequests.remove(requestId)) != null) {
            requestFuture.complete(topics);
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
    }

    protected void handleGetSchemaResponse(PulsarApi.CommandGetSchemaResponse commandGetSchemaResponse) {
        Preconditions.checkArgument((this.state == State.Ready ? 1 : 0) != 0);
        long requestId = commandGetSchemaResponse.getRequestId();
        CompletableFuture future = (CompletableFuture)this.pendingRequests.remove(requestId);
        if (future == null) {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)requestId);
            return;
        }
        future.complete(commandGetSchemaResponse);
    }

    protected void handleGetOrCreateSchemaResponse(PulsarApi.CommandGetOrCreateSchemaResponse commandGetOrCreateSchemaResponse) {
        Preconditions.checkArgument((this.state == State.Ready ? 1 : 0) != 0);
        long requestId = commandGetOrCreateSchemaResponse.getRequestId();
        CompletableFuture future = (CompletableFuture)this.pendingRequests.remove(requestId);
        if (future == null) {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)requestId);
            return;
        }
        future.complete(commandGetOrCreateSchemaResponse);
    }

    Promise<Void> newPromise() {
        return this.ctx.newPromise();
    }

    public ChannelHandlerContext ctx() {
        return this.ctx;
    }

    Channel channel() {
        return this.ctx.channel();
    }

    SocketAddress serverAddrees() {
        return this.remoteAddress;
    }

    CompletableFuture<Void> connectionFuture() {
        return this.connectionFuture;
    }

    CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long requestId) {
        return this.sendRequestAndHandleTimeout(cmd, requestId, RequestType.Command);
    }

    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {
        CompletableFuture future = new CompletableFuture();
        this.pendingRequests.put(requestId, future);
        this.ctx.writeAndFlush((Object)requestMessage).addListener(writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send {} to broker: {}", new Object[]{this.ctx.channel(), requestType.getDescription(), writeFuture.cause().getMessage()});
                this.pendingRequests.remove(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        });
        this.requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId, requestType));
        return future;
    }

    public CompletableFuture<PulsarApi.MessageIdData> sendGetLastMessageId(ByteBuf request, long requestId) {
        return this.sendRequestAndHandleTimeout(request, requestId, RequestType.GetLastMessageId);
    }

    public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf request, long requestId) {
        return this.sendGetRawSchema(request, requestId).thenCompose(commandGetSchemaResponse -> {
            if (commandGetSchemaResponse.hasErrorCode()) {
                PulsarApi.ServerError rc = commandGetSchemaResponse.getErrorCode();
                if (rc == PulsarApi.ServerError.TopicNotFound) {
                    return CompletableFuture.completedFuture(Optional.empty());
                }
                return FutureUtil.failedFuture((Throwable)this.getPulsarClientException(rc, commandGetSchemaResponse.getErrorMessage()));
            }
            return CompletableFuture.completedFuture(Optional.of(SchemaInfoUtil.newSchemaInfo((PulsarApi.Schema)commandGetSchemaResponse.getSchema())));
        });
    }

    public CompletableFuture<PulsarApi.CommandGetSchemaResponse> sendGetRawSchema(ByteBuf request, long requestId) {
        return this.sendRequestAndHandleTimeout(request, requestId, RequestType.GetSchema);
    }

    public CompletableFuture<byte[]> sendGetOrCreateSchema(ByteBuf request, long requestId) {
        CompletableFuture future = this.sendRequestAndHandleTimeout(request, requestId, RequestType.GetOrCreateSchema);
        return future.thenCompose(response -> {
            if (response.hasErrorCode()) {
                PulsarApi.ServerError rc = response.getErrorCode();
                if (rc == PulsarApi.ServerError.TopicNotFound) {
                    return CompletableFuture.completedFuture(SchemaVersion.Empty.bytes());
                }
                return FutureUtil.failedFuture((Throwable)this.getPulsarClientException(rc, response.getErrorMessage()));
            }
            return CompletableFuture.completedFuture(response.getSchemaVersion().toByteArray());
        });
    }

    protected void handleNewTxnResponse(PulsarApi.CommandNewTxnResponse command) {
        TransactionMetaStoreHandler handler = this.checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits());
        if (handler != null) {
            handler.handleNewTxnResponse(command);
        }
    }

    protected void handleAddPartitionToTxnResponse(PulsarApi.CommandAddPartitionToTxnResponse command) {
        TransactionMetaStoreHandler handler = this.checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits());
        if (handler != null) {
            handler.handleAddPublishPartitionToTxnResponse(command);
        }
    }

    protected void handleAddSubscriptionToTxnResponse(PulsarApi.CommandAddSubscriptionToTxnResponse command) {
        TransactionMetaStoreHandler handler = this.checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits());
        if (handler != null) {
            handler.handleAddSubscriptionToTxnResponse(command);
        }
    }

    protected void handleEndTxnOnPartitionResponse(PulsarApi.CommandEndTxnOnPartitionResponse command) {
        log.info("handleEndTxnOnPartitionResponse");
        TransactionBufferHandler handler = this.checkAndGetTransactionBufferHandler();
        if (handler != null) {
            handler.handleEndTxnOnTopicResponse(command.getRequestId(), command);
        }
    }

    protected void handleEndTxnOnSubscriptionResponse(PulsarApi.CommandEndTxnOnSubscriptionResponse command) {
        TransactionBufferHandler handler = this.checkAndGetTransactionBufferHandler();
        if (handler != null) {
            handler.handleEndTxnOnSubscriptionResponse(command.getRequestId(), command);
        }
    }

    protected void handleEndTxnResponse(PulsarApi.CommandEndTxnResponse command) {
        TransactionMetaStoreHandler handler = this.checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits());
        if (handler != null) {
            handler.handleEndTxnResponse(command);
        }
    }

    private TransactionMetaStoreHandler checkAndGetTransactionMetaStoreHandler(long tcId) {
        TransactionMetaStoreHandler handler = (TransactionMetaStoreHandler)this.transactionMetaStoreHandlers.get(tcId);
        if (handler == null) {
            this.channel().close();
            log.warn("Close the channel since can't get the transaction meta store handler, will reconnect later.");
        }
        return handler;
    }

    private TransactionBufferHandler checkAndGetTransactionBufferHandler() {
        if (this.transactionBufferHandler == null) {
            this.channel().close();
            log.warn("Close the channel since can't get the transaction buffer handler.");
        }
        return this.transactionBufferHandler;
    }

    private void checkServerError(PulsarApi.ServerError error, String errMsg) {
        if (PulsarApi.ServerError.ServiceNotReady.equals((Object)error)) {
            log.error("{} Close connection because received internal-server error {}", (Object)this.ctx.channel(), (Object)errMsg);
            this.ctx.close();
        } else if (PulsarApi.ServerError.TooManyRequests.equals((Object)error)) {
            long rejectedRequests = NUMBER_OF_REJECTED_REQUESTS_UPDATER.getAndIncrement(this);
            if (rejectedRequests == 0L) {
                this.eventLoopGroup.schedule(() -> NUMBER_OF_REJECTED_REQUESTS_UPDATER.set(this, 0), 60L, TimeUnit.SECONDS);
            } else if (rejectedRequests >= (long)this.maxNumberOfRejectedRequestPerConnection) {
                log.error("{} Close connection because received {} rejected request in {} seconds ", new Object[]{this.ctx.channel(), NUMBER_OF_REJECTED_REQUESTS_UPDATER.get(this), 60});
                this.ctx.close();
            }
        }
    }

    private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
        ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
        SSLSession sslSession = null;
        if (sslHandler != null) {
            sslSession = ((SslHandler)sslHandler).engine().getSession();
            if (log.isDebugEnabled()) {
                log.debug("Verifying HostName for {}, Cipher {}, Protocols {}", new Object[]{hostname, sslSession.getCipherSuite(), sslSession.getProtocol()});
            }
            return HOSTNAME_VERIFIER.verify(hostname, sslSession);
        }
        return false;
    }

    void registerConsumer(long consumerId, ConsumerImpl<?> consumer) {
        this.consumers.put(consumerId, consumer);
    }

    void registerProducer(long producerId, ProducerImpl<?> producer) {
        this.producers.put(producerId, producer);
    }

    void registerTransactionMetaStoreHandler(long transactionMetaStoreId, TransactionMetaStoreHandler handler) {
        this.transactionMetaStoreHandlers.put(transactionMetaStoreId, (Object)handler);
    }

    public void registerTransactionBufferHandler(TransactionBufferHandler handler) {
        this.transactionBufferHandler = handler;
    }

    void removeProducer(long producerId) {
        this.producers.remove(producerId);
    }

    void removeConsumer(long consumerId) {
        this.consumers.remove(consumerId);
    }

    void setTargetBroker(InetSocketAddress targetBrokerAddress) {
        this.proxyToTargetBrokerAddress = String.format("%s:%d", targetBrokerAddress.getHostString(), targetBrokerAddress.getPort());
    }

    void setRemoteHostName(String remoteHostName) {
        this.remoteHostName = remoteHostName;
    }

    private PulsarClientException getPulsarClientException(PulsarApi.ServerError error, String errorMsg) {
        switch (error) {
            case AuthenticationError: {
                return new PulsarClientException.AuthenticationException(errorMsg);
            }
            case AuthorizationError: {
                return new PulsarClientException.AuthorizationException(errorMsg);
            }
            case ProducerBusy: {
                return new PulsarClientException.ProducerBusyException(errorMsg);
            }
            case ConsumerBusy: {
                return new PulsarClientException.ConsumerBusyException(errorMsg);
            }
            case MetadataError: {
                return new PulsarClientException.BrokerMetadataException(errorMsg);
            }
            case PersistenceError: {
                return new PulsarClientException.BrokerPersistenceException(errorMsg);
            }
            case ServiceNotReady: {
                return new PulsarClientException.LookupException(errorMsg);
            }
            case TooManyRequests: {
                return new PulsarClientException.TooManyRequestsException(errorMsg);
            }
            case ProducerBlockedQuotaExceededError: {
                return new PulsarClientException.ProducerBlockedQuotaExceededError(errorMsg);
            }
            case ProducerBlockedQuotaExceededException: {
                return new PulsarClientException.ProducerBlockedQuotaExceededException(errorMsg);
            }
            case TopicTerminatedError: {
                return new PulsarClientException.TopicTerminatedException(errorMsg);
            }
            case IncompatibleSchema: {
                return new PulsarClientException.IncompatibleSchemaException(errorMsg);
            }
            case TopicNotFound: {
                return new PulsarClientException.TopicDoesNotExistException(errorMsg);
            }
            case ConsumerAssignError: {
                return new PulsarClientException.ConsumerAssignException(errorMsg);
            }
            case NotAllowedError: {
                return new PulsarClientException.NotAllowedException(errorMsg);
            }
            case TransactionConflict: {
                return new PulsarClientException.TransactionConflictException(errorMsg);
            }
        }
        return new PulsarClientException(errorMsg);
    }

    @VisibleForTesting
    public void close() {
        if (this.ctx != null) {
            this.ctx.close();
        }
    }

    private void checkRequestTimeout() {
        RequestTime request;
        while (!this.requestTimeoutQueue.isEmpty() && (request = this.requestTimeoutQueue.peek()) != null && System.currentTimeMillis() - request.creationTimeMs >= this.operationTimeoutMs) {
            String timeoutMessage;
            request = this.requestTimeoutQueue.poll();
            CompletableFuture requestFuture = (CompletableFuture)this.pendingRequests.remove(request.requestId);
            if (requestFuture == null || requestFuture.isDone() || !requestFuture.completeExceptionally((Throwable)new PulsarClientException.TimeoutException(timeoutMessage = String.format("%d %s timedout after ms %d", request.requestId, request.requestType.getDescription(), this.operationTimeoutMs)))) continue;
            log.warn("{} {}", (Object)this.ctx.channel(), (Object)timeoutMessage);
        }
    }

    public static int getMaxMessageSize() {
        return maxMessageSize;
    }

    private static enum RequestType {
        Command,
        GetLastMessageId,
        GetTopics,
        GetSchema,
        GetOrCreateSchema;


        String getDescription() {
            if (this == Command) {
                return "request";
            }
            return this.name() + " request";
        }
    }

    private static class RequestTime {
        final long creationTimeMs;
        final long requestId;
        final RequestType requestType;

        RequestTime(long creationTime, long requestId, RequestType requestType) {
            this.creationTimeMs = creationTime;
            this.requestId = requestId;
            this.requestType = requestType;
        }
    }

    static enum State {
        None,
        SentConnectFrame,
        Ready,
        Failed,
        Connecting;

    }
}

