/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionHandler;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionMetaStoreHandler
extends HandlerState
implements ConnectionHandler.Connection,
Closeable,
TimerTask {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(TransactionMetaStoreHandler.class);
    /** Id of the transaction coordinator this handler talks to; used in commands, log lines and error messages. */
    private final long transactionCoordinatorId;
    /** Connection helper created in the constructor; supplies the current {@link ClientCnx} via {@code cnx()}. */
    private ConnectionHandler connectionHandler;
    /** In-flight operations keyed by request id; an entry is removed by its response handler or by the timeout sweep in {@code run}. */
    private final ConcurrentLongHashMap<OpBase<?>> pendingRequests = new ConcurrentLongHashMap(16, 1);
    /** Creation time of each issued request, in issue order; {@code run} polls it to expire old requests. */
    private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;
    /** When true (the constructor always sets it so), {@code canSendRequest} blocks on the semaphore instead of failing fast. */
    private final boolean blockIfReachMaxPendingOps;
    /** Bounds the number of in-flight requests (created with 1000 permits); acquired in {@code canSendRequest}, released in {@code onResponse}. */
    private final Semaphore semaphore;
    /** Handle of the most recently scheduled timeout sweep; reassigned at the end of every {@code run}. */
    private Timeout requestTimeout;
    /** Completed by {@code connectionOpened} and failed by {@code connectionFailed}. */
    private CompletableFuture<Void> connectFuture;

    /**
     * Creates the handler, schedules the first request-timeout sweep and starts connecting.
     *
     * @param transactionCoordinatorId id of the transaction coordinator this handler serves
     * @param pulsarClient client providing the timer, configuration and request ids
     * @param topic topic passed to the {@link HandlerState} super constructor
     * @param connectFuture future completed by {@code connectionOpened} or failed by
     *        {@code connectionFailed}
     */
    public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic, CompletableFuture<Void> connectFuture) {
        super(pulsarClient, topic);
        this.transactionCoordinatorId = transactionCoordinatorId;
        this.timeoutQueue = new ConcurrentLinkedQueue<RequestTime>();
        this.blockIfReachMaxPendingOps = true;
        this.semaphore = new Semaphore(1000);
        // Publish the future BEFORE starting the connection attempt: connectionOpened() and
        // connectionFailed() dereference it, and nothing in this class guarantees that they
        // cannot fire before the constructor has finished.
        this.connectFuture = connectFuture;
        this.requestTimeout = pulsarClient.timer().newTimeout(this, pulsarClient.getConfiguration().getOperationTimeoutMs(), TimeUnit.MILLISECONDS);
        this.connectionHandler = new ConnectionHandler(
                this,
                new BackoffBuilder()
                        .setInitialTime(pulsarClient.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
                        .setMax(pulsarClient.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
                        .setMandatoryStop(100L, TimeUnit.MILLISECONDS)
                        .create(),
                this);
        this.connectionHandler.grabCnx();
    }

    /**
     * Connection callback for a failed attempt: logs the failure, moves the handler to the
     * {@code Failed} state and fails the connect future with the same exception.
     *
     * @param exception cause of the connection failure
     */
    @Override
    public void connectionFailed(PulsarClientException exception) {
        final long tcId = this.transactionCoordinatorId;
        LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.", tcId, exception);
        this.setState(HandlerState.State.Failed);
        this.connectFuture.completeExceptionally(exception);
    }

    /**
     * Connection callback for an established connection: stores the connection, registers this
     * handler on it under the coordinator id and tries to switch to the ready state. If that
     * switch is refused the channel is closed; the connect future is completed in either case.
     *
     * @param cnx the newly opened connection
     */
    @Override
    public void connectionOpened(ClientCnx cnx) {
        final long tcId = this.transactionCoordinatorId;
        LOG.info("Transaction meta handler with transaction coordinator id {} connection opened.", tcId);
        this.connectionHandler.setClientCnx(cnx);
        cnx.registerTransactionMetaStoreHandler(tcId, this);
        final boolean becameReady = this.changeToReadyState();
        if (!becameReady) {
            cnx.channel().close();
        }
        this.connectFuture.complete(null);
    }

    /**
     * Asks the transaction coordinator to open a new transaction.
     *
     * @param timeout transaction timeout, expressed in {@code unit}
     * @param unit unit of {@code timeout}; the value is sent in milliseconds
     * @return future completed with the new transaction id by {@code handleNewTxnResponse}, or
     *         failed when the request is rejected by {@code canSendRequest} or expires in
     *         {@code run}
     */
    public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("New transaction with timeout in ms {}", unit.toMillis(timeout));
        }
        final CompletableFuture<TxnID> result = new CompletableFuture<TxnID>();
        if (this.canSendRequest(result)) {
            final long id = this.client.newRequestId();
            final ByteBuf request = Commands.newTxn(this.transactionCoordinatorId, id, unit.toMillis(timeout));
            final OpForTxnIdCallBack pending = OpForTxnIdCallBack.create(request, result);
            // Register the op and its creation time before writing, so both the response
            // handler and the timeout sweep can find it.
            this.pendingRequests.put(id, pending);
            this.timeoutQueue.add(new RequestTime(System.currentTimeMillis(), id));
            // Extra reference: the op keeps the buffer until onResponse() releases it.
            request.retain();
            this.cnx().ctx().writeAndFlush(request, this.cnx().ctx().voidPromise());
        }
        return result;
    }

    /**
     * Handles the coordinator's answer to a new-transaction request: completes the pending
     * future with the transaction id, or fails it with the mapped server error, then releases
     * the operation. A response with no matching pending request is only logged at debug level.
     *
     * @param response the new-transaction response received from the coordinator
     */
    void handleNewTxnResponse(PulsarApi.CommandNewTxnResponse response) {
        final OpForTxnIdCallBack pending = (OpForTxnIdCallBack) this.pendingRequests.remove(response.getRequestId());
        if (pending == null) {
            // Nothing registered under this id any more (e.g. already expired by the sweep).
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got new txn response for timeout {} - {}", response.getTxnidMostBits(), response.getTxnidLeastBits());
            }
            return;
        }
        if (response.hasError()) {
            LOG.error("Got new txn for request {} error {}", response.getRequestId(), response.getError());
            pending.callback.completeExceptionally(this.getExceptionByServerError(response.getError(), response.getMessage()));
        } else {
            final TxnID created = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got new txn response {} for request {}", created, response.getRequestId());
            }
            pending.callback.complete(created);
        }
        this.onResponse(pending);
    }

    /**
     * Sends an add-partition-to-transaction command for the given partitions.
     *
     * @param txnID transaction the partitions are added to
     * @param partitions partition names passed through to the command
     * @return future completed by {@code handleAddPublishPartitionToTxnResponse}, or failed when
     *         the request is rejected by {@code canSendRequest} or expires in {@code run}
     */
    public CompletableFuture<Void> addPublishPartitionToTxnAsync(TxnID txnID, List<String> partitions) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Add publish partition {} to txn {}", partitions, txnID);
        }
        final CompletableFuture<Void> result = new CompletableFuture<Void>();
        if (this.canSendRequest(result)) {
            final long id = this.client.newRequestId();
            final ByteBuf request = Commands.newAddPartitionToTxn(id, txnID.getLeastSigBits(), txnID.getMostSigBits(), partitions);
            final OpForVoidCallBack pending = OpForVoidCallBack.create(request, result);
            // Track the op for the response handler and the timeout sweep before writing.
            this.pendingRequests.put(id, pending);
            this.timeoutQueue.add(new RequestTime(System.currentTimeMillis(), id));
            // Extra reference: the op keeps the buffer until onResponse() releases it.
            request.retain();
            this.cnx().ctx().writeAndFlush(request, this.cnx().ctx().voidPromise());
        }
        return result;
    }

    /**
     * Handles the coordinator's answer to an add-partition request: completes the pending
     * future, or fails it with the mapped server error, then releases the operation. A response
     * with no matching pending request is only logged at debug level.
     *
     * @param response the add-partition response received from the coordinator
     */
    void handleAddPublishPartitionToTxnResponse(PulsarApi.CommandAddPartitionToTxnResponse response) {
        final OpForVoidCallBack pending = (OpForVoidCallBack) this.pendingRequests.remove(response.getRequestId());
        if (pending == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got add publish partition to txn response for timeout {} - {}", response.getTxnidMostBits(), response.getTxnidLeastBits());
            }
            return;
        }
        if (response.hasError()) {
            LOG.error("Add publish partition for request {} error {}.", response.getRequestId(), response.getError());
            pending.callback.completeExceptionally(this.getExceptionByServerError(response.getError(), response.getMessage()));
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Add publish partition for request {} success.", response.getRequestId());
            }
            pending.callback.complete(null);
        }
        this.onResponse(pending);
    }

    /**
     * Sends an add-subscription-to-transaction command for the given subscriptions.
     *
     * <p>Like the other request methods, the request first goes through
     * {@code canSendRequest}. That call both rejects the request when the handler is not ready
     * and acquires the semaphore permit that {@code onResponse} gives back when the response
     * (or the timeout) arrives. Without it every add-subscription request would release a
     * permit it never acquired.
     *
     * @param txnID transaction the subscriptions are added to
     * @param subscriptionList subscriptions passed through to the command
     * @return future completed by {@code handleAddSubscriptionToTxnResponse}, or failed when the
     *         request is rejected by {@code canSendRequest} or expires in {@code run}
     */
    public CompletableFuture<Void> addSubscriptionToTxn(TxnID txnID, List<PulsarApi.Subscription> subscriptionList) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Add subscription {} to txn {}.", subscriptionList, txnID);
        }
        CompletableFuture<Void> callback = new CompletableFuture<Void>();
        if (!this.canSendRequest(callback)) {
            return callback;
        }
        long requestId = this.client.newRequestId();
        ByteBuf cmd = Commands.newAddSubscriptionToTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList);
        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback);
        this.pendingRequests.put(requestId, op);
        this.timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
        // Extra reference: the op keeps the buffer until onResponse() releases it.
        cmd.retain();
        this.cnx().ctx().writeAndFlush(cmd, this.cnx().ctx().voidPromise());
        return callback;
    }

    /**
     * Handles the coordinator's answer to an add-subscription request: completes the pending
     * future, or fails it with the mapped server error, then releases the operation. A response
     * with no matching pending request is only logged at debug level.
     *
     * @param response the add-subscription response received from the coordinator
     */
    public void handleAddSubscriptionToTxnResponse(PulsarApi.CommandAddSubscriptionToTxnResponse response) {
        final OpForVoidCallBack pending = (OpForVoidCallBack) this.pendingRequests.remove(response.getRequestId());
        if (pending == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Add subscription to txn timeout for request {}.", response.getRequestId());
            }
            return;
        }
        if (response.hasError()) {
            LOG.error("Add subscription to txn failed for request {} error {}.", response.getRequestId(), response.getError());
            pending.callback.completeExceptionally(this.getExceptionByServerError(response.getError(), response.getMessage()));
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Add subscription to txn success for request {}.", response.getRequestId());
            }
            pending.callback.complete(null);
        }
        this.onResponse(pending);
    }

    /**
     * Sends an end-transaction command with action {@code COMMIT}.
     *
     * @param txnID transaction to commit
     * @param sendMessageIdList message ids sent with the command; every element must be a
     *        {@link MessageIdImpl}
     * @return future completed by {@code handleEndTxnResponse}, or failed when the request is
     *         rejected by {@code canSendRequest} or expires in {@code run}
     */
    public CompletableFuture<Void> commitAsync(TxnID txnID, List<MessageId> sendMessageIdList) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Commit txn {}", txnID);
        }
        return this.endTxnAsync(txnID, PulsarApi.TxnAction.COMMIT, sendMessageIdList);
    }

    /**
     * Sends an end-transaction command with action {@code ABORT}.
     *
     * @param txnID transaction to abort
     * @param sendMessageIdList message ids sent with the command; every element must be a
     *        {@link MessageIdImpl}
     * @return future completed by {@code handleEndTxnResponse}, or failed when the request is
     *         rejected by {@code canSendRequest} or expires in {@code run}
     */
    public CompletableFuture<Void> abortAsync(TxnID txnID, List<MessageId> sendMessageIdList) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Abort txn {}", txnID);
        }
        return this.endTxnAsync(txnID, PulsarApi.TxnAction.ABORT, sendMessageIdList);
    }

    /** Shared commit/abort path: builds, registers and writes one end-transaction request. */
    private CompletableFuture<Void> endTxnAsync(TxnID txnID, PulsarApi.TxnAction action, List<MessageId> sendMessageIdList) {
        CompletableFuture<Void> callback = new CompletableFuture<Void>();
        if (!this.canSendRequest(callback)) {
            return callback;
        }
        long requestId = this.client.newRequestId();
        ByteBuf cmd;
        try {
            cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), action, toMessageIdData(sendMessageIdList));
        } catch (RuntimeException e) {
            // canSendRequest() already took a permit and the op is not registered yet, so
            // neither a response nor the timeout sweep would ever give the permit back.
            this.semaphore.release();
            throw e;
        }
        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback);
        this.pendingRequests.put(requestId, op);
        this.timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
        // Extra reference: the op keeps the buffer until onResponse() releases it.
        cmd.retain();
        this.cnx().ctx().writeAndFlush(cmd, this.cnx().ctx().voidPromise());
        return callback;
    }

    /** Converts client message ids into their wire form (ledger id, entry id, partition). */
    private static List<PulsarApi.MessageIdData> toMessageIdData(List<MessageId> messageIds) {
        List<PulsarApi.MessageIdData> converted = new ArrayList<PulsarApi.MessageIdData>(messageIds.size());
        for (MessageId messageId : messageIds) {
            MessageIdImpl impl = (MessageIdImpl) messageId;
            converted.add(PulsarApi.MessageIdData.newBuilder()
                    .setLedgerId(impl.getLedgerId())
                    .setEntryId(impl.getEntryId())
                    .setPartition(impl.getPartitionIndex())
                    .build());
        }
        return converted;
    }

    /**
     * Handles the coordinator's answer to a commit/abort request: completes the pending future,
     * or fails it with the mapped server error, then releases the operation. A response with no
     * matching pending request is only logged at debug level.
     *
     * @param response the end-transaction response received from the coordinator
     */
    void handleEndTxnResponse(PulsarApi.CommandEndTxnResponse response) {
        final OpForVoidCallBack pending = (OpForVoidCallBack) this.pendingRequests.remove(response.getRequestId());
        if (pending == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got end txn response for timeout {} - {}", response.getTxnidMostBits(), response.getTxnidLeastBits());
            }
            return;
        }
        if (response.hasError()) {
            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
            pending.callback.completeExceptionally(this.getExceptionByServerError(response.getError(), response.getMessage()));
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got end txn response success for request {}", response.getRequestId());
            }
            pending.callback.complete(null);
        }
        this.onResponse(pending);
    }

    /**
     * Maps a server error code to the matching client exception type.
     *
     * @param serverError error code reported by the coordinator
     * @param msg message carried into the exception
     * @return a coordinator-not-found or invalid-status exception for those two codes, otherwise
     *         a plain {@link TransactionCoordinatorClientException}
     */
    private TransactionCoordinatorClientException getExceptionByServerError(PulsarApi.ServerError serverError, String msg) {
        final TransactionCoordinatorClientException mapped;
        switch (serverError) {
            case TransactionCoordinatorNotFound:
                mapped = new TransactionCoordinatorClientException.CoordinatorNotFoundException(msg);
                break;
            case InvalidTxnStatus:
                mapped = new TransactionCoordinatorClientException.InvalidTxnStatusException(msg);
                break;
            default:
                mapped = new TransactionCoordinatorClientException(msg);
                break;
        }
        return mapped;
    }

    /**
     * Finishes an operation: releases its command buffer, returns the op to its pool and gives
     * one semaphore permit back.
     *
     * @param op the operation that has been answered or expired
     */
    private void onResponse(OpBase<?> op) {
        final ByteBuf sentCommand = op.cmd;
        ReferenceCountUtil.safeRelease(sentCommand);
        op.recycle();
        this.semaphore.release();
    }

    /**
     * Checks whether a request may be sent right now and, if so, reserves a semaphore permit.
     *
     * @param callback future of the request; failed here when the request is refused
     * @return {@code true} when the handler is ready and a permit was acquired; {@code false}
     *         when {@code callback} has been failed instead
     */
    private boolean canSendRequest(CompletableFuture<?> callback) {
        if (!this.isValidHandlerState(callback)) {
            return false;
        }
        if (!this.blockIfReachMaxPendingOps) {
            // Fail-fast mode: refuse the request when no permit is free.
            if (this.semaphore.tryAcquire()) {
                return true;
            }
            callback.completeExceptionally(new TransactionCoordinatorClientException("Reach max pending ops."));
            return false;
        }
        try {
            // Blocking mode: wait until a permit is free.
            this.semaphore.acquire();
            return true;
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag for the caller before reporting the failure.
            Thread.currentThread().interrupt();
            callback.completeExceptionally(TransactionCoordinatorClientException.unwrap((Throwable) interrupted));
            return false;
        }
    }

    /**
     * Tells whether the handler is in the {@code Ready} state; in every other state the given
     * future is failed with a {@code MetaStoreHandlerNotReadyException} describing why.
     *
     * @param callback future to fail when the handler is not ready
     * @return {@code true} only when the current state is {@code Ready}
     */
    private boolean isValidHandlerState(CompletableFuture<?> callback) {
        final HandlerState.State current = this.getState();
        if (current == HandlerState.State.Ready) {
            return true;
        }
        final TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException notReady;
        switch (current) {
            case Connecting:
                notReady = new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException("Transaction meta store handler for tcId " + this.transactionCoordinatorId + " is connecting now, please try later.");
                break;
            case Closing:
            case Closed:
                notReady = new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException("Transaction meta store handler for tcId " + this.transactionCoordinatorId + " is closing or closed.");
                break;
            case Failed:
            case Uninitialized:
                notReady = new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException("Transaction meta store handler for tcId " + this.transactionCoordinatorId + " not connected.");
                break;
            default:
                // Any other state: use the exception's coordinator-id constructor.
                notReady = new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(this.transactionCoordinatorId);
                break;
        }
        callback.completeExceptionally(notReady);
        return false;
    }

    /**
     * Timer callback that expires requests older than the configured operation timeout and then
     * schedules the next sweep.
     *
     * <p>Every expired operation still present in {@code pendingRequests} is released through
     * {@code onResponse}, whether or not its future was already completed. Removing it from the
     * map makes this method its only owner, so skipping the release would leak the command
     * buffer, the pooled op and a semaphore permit.
     *
     * <p>Does nothing, and does not reschedule itself, when the timeout was cancelled or the
     * handler is closing or closed.
     *
     * @param timeout the timeout handle that fired
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled()) {
            return;
        }
        HandlerState.State state = this.getState();
        if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) {
            return;
        }
        long operationTimeoutMs = this.client.getConfiguration().getOperationTimeoutMs();
        RequestTime peeked = this.timeoutQueue.peek();
        while (peeked != null && peeked.creationTimeMs + operationTimeoutMs - System.currentTimeMillis() <= 0L) {
            RequestTime lastPolled = this.timeoutQueue.poll();
            if (lastPolled == null) {
                break;
            }
            OpBase<?> op = this.pendingRequests.remove(lastPolled.requestId);
            if (op != null) {
                // completeExceptionally() returns false when the future was already done
                // (e.g. cancelled by the caller); the op must be released in that case too.
                boolean timedOut = op.callback.completeExceptionally(new PulsarClientException.TimeoutException("Could not get response from transaction meta store within given timeout."));
                if (timedOut && LOG.isDebugEnabled()) {
                    LOG.debug("Transaction coordinator request {} is timeout.", lastPolled.requestId);
                }
                this.onResponse(op);
            }
            peeked = this.timeoutQueue.peek();
        }
        // Wake up when the oldest remaining request is due; with nothing pending (or a head
        // that is already overdue) fall back to one full operation timeout.
        long timeToWaitMs = operationTimeoutMs;
        if (peeked != null) {
            long diff = peeked.creationTimeMs + operationTimeoutMs - System.currentTimeMillis();
            if (diff > 0L) {
                timeToWaitMs = diff;
            }
        }
        this.requestTimeout = this.client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
    }

    /** Returns the connection currently held by the connection handler. */
    private ClientCnx cnx() {
        final ClientCnx current = this.connectionHandler.cnx();
        return current;
    }

    /**
     * Forwards a connection-closed notification to the connection handler.
     *
     * @param cnx the connection that was closed
     */
    void connectionClosed(ClientCnx cnx) {
        final ConnectionHandler handler = this.connectionHandler;
        handler.connectionClosed(cnx);
    }

    /**
     * Closes the handler: marks it {@code Closed} and cancels the scheduled timeout sweep.
     *
     * <p>The state is changed first so that new requests are rejected by
     * {@code isValidHandlerState} and a sweep that is already running stops rescheduling itself
     * on its next invocation ({@code run} returns early for closing/closed handlers).
     */
    @Override
    public void close() throws IOException {
        this.setState(HandlerState.State.Closed);
        Timeout scheduled = this.requestTimeout;
        if (scheduled != null) {
            scheduled.cancel();
        }
    }

    /** Returns a display name for this handler that includes the transaction coordinator id. */
    @Override
    public String getHandlerName() {
        final StringBuilder name = new StringBuilder("Transaction meta store handler [");
        name.append(this.transactionCoordinatorId).append("]");
        return name.toString();
    }

    /**
     * Pooled pending operation whose response carries no value; instances come from a Netty
     * {@link Recycler} and go back to it through {@link #recycle()}.
     */
    private static class OpForVoidCallBack
    extends OpBase<Void> {
        private static final Recycler<OpForVoidCallBack> RECYCLER = new Recycler<OpForVoidCallBack>() {

            @Override
            protected OpForVoidCallBack newObject(Recycler.Handle<OpForVoidCallBack> handle) {
                return new OpForVoidCallBack(handle);
            }
        };
        private final Recycler.Handle<OpForVoidCallBack> recyclerHandle;

        /**
         * Takes an instance from the pool and binds it to the given command and future.
         *
         * @param cmd command buffer sent for this operation
         * @param callback future to complete when the operation is answered
         * @return the initialised operation
         */
        static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback) {
            OpForVoidCallBack op = RECYCLER.get();
            op.callback = callback;
            op.cmd = cmd;
            return op;
        }

        private OpForVoidCallBack(Recycler.Handle<OpForVoidCallBack> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        /** Clears the per-request references and returns this instance to the pool. */
        @Override
        void recycle() {
            // Drop the references first so an idle pooled instance does not keep the buffer
            // and the future reachable until it happens to be reused.
            this.cmd = null;
            this.callback = null;
            this.recyclerHandle.recycle(this);
        }
    }

    /**
     * Pooled pending operation whose response yields a {@link TxnID}; instances come from a
     * Netty {@link Recycler} and go back to it through {@link #recycle()}.
     */
    private static class OpForTxnIdCallBack
    extends OpBase<TxnID> {
        private static final Recycler<OpForTxnIdCallBack> RECYCLER = new Recycler<OpForTxnIdCallBack>() {

            @Override
            protected OpForTxnIdCallBack newObject(Recycler.Handle<OpForTxnIdCallBack> handle) {
                return new OpForTxnIdCallBack(handle);
            }
        };
        private final Recycler.Handle<OpForTxnIdCallBack> recyclerHandle;

        /**
         * Takes an instance from the pool and binds it to the given command and future.
         *
         * @param cmd command buffer sent for this operation
         * @param callback future to complete with the transaction id
         * @return the initialised operation
         */
        static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback) {
            OpForTxnIdCallBack op = RECYCLER.get();
            op.callback = callback;
            op.cmd = cmd;
            return op;
        }

        private OpForTxnIdCallBack(Recycler.Handle<OpForTxnIdCallBack> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        /** Clears the per-request references and returns this instance to the pool. */
        @Override
        void recycle() {
            // Drop the references first so an idle pooled instance does not keep the buffer
            // and the future reachable until it happens to be reused.
            this.cmd = null;
            this.callback = null;
            this.recyclerHandle.recycle(this);
        }
    }

    /**
     * Base of the pooled pending operations: pairs the command buffer that was sent with the
     * future that reports its outcome.
     *
     * @param <T> value type produced by the operation's future
     */
    private static abstract class OpBase<T> {
        /** Future handed back to the caller of the request method. */
        protected CompletableFuture<T> callback;
        /** Command buffer written for this operation. */
        protected ByteBuf cmd;

        private OpBase() {
        }

        /** Returns this instance to its pool. */
        abstract void recycle();
    }

    /** Immutable pair of a request id and the wall-clock time (ms) at which it was issued. */
    private static class RequestTime {
        /** Id of the request this entry belongs to. */
        final long requestId;
        /** Creation time in milliseconds, as supplied by the caller. */
        final long creationTimeMs;

        public RequestTime(long creationTime, long requestId) {
            this.requestId = requestId;
            this.creationTimeMs = creationTime;
        }
    }
}

