/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.util.Optional;
import kafka.cluster.Partition;
import kafka.log.LogOffsetSnapshot;
import kafka.server.DelayedFetchMetrics$;
import kafka.server.DelayedOperation;
import kafka.server.DelayedOperation$;
import kafka.server.FetchHighWatermark$;
import kafka.server.FetchIsolation;
import kafka.server.FetchLogEnd$;
import kafka.server.FetchMetadata;
import kafka.server.FetchPartitionData;
import kafka.server.FetchPartitionStatus;
import kafka.server.FetchTxnCommitted$;
import kafka.server.LogOffsetMetadata;
import kafka.server.LogOffsetMetadata$;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.FetchRequest;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.StringOps;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.IntRef;
import scala.runtime.NonLocalReturnControl;
import scala.runtime.NonLocalReturnControl$mcZ$sp;

@ScalaSignature(bytes="\u0006\u0001]3AAC\u0006\u0001!!IQ\u0003\u0001B\u0001B\u0003%a\u0003\b\u0005\t;\u0001\u0011\t\u0011)A\u0005=!A\u0011\u0005\u0001B\u0001B\u0003%!\u0005\u0003\u0005&\u0001\t\u0005\t\u0015!\u0003'\u0011!I\u0003A!A!\u0002\u0013Q\u0003\"B$\u0001\t\u0003A\u0005\"B(\u0001\t\u0003\u0002\u0006\"\u0002+\u0001\t\u0003*\u0006\"\u0002,\u0001\t\u0003*&\u0001\u0004#fY\u0006LX\r\u001a$fi\u000eD'B\u0001\u0007\u000e\u0003\u0019\u0019XM\u001d<fe*\ta\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001\t\u0002C\u0001\n\u0014\u001b\u0005Y\u0011B\u0001\u000b\f\u0005A!U\r\\1zK\u0012|\u0005/\u001a:bi&|g.A\u0004eK2\f\u00170T:\u0011\u0005]QR\"\u0001\r\u000b\u0003e\tQa]2bY\u0006L!a\u0007\r\u0003\t1{gnZ\u0005\u0003+M\tQBZ3uG\"lU\r^1eCR\f\u0007C\u0001\n \u0013\t\u00013BA\u0007GKR\u001c\u0007.T3uC\u0012\fG/Y\u0001\u000fe\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3s!\t\u00112%\u0003\u0002%\u0017\tq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\u0018!B9v_R\f\u0007C\u0001\n(\u0013\tA3B\u0001\u0007SKBd\u0017nY1Rk>$\u0018-\u0001\tsKN\u0004xN\\:f\u0007\u0006dGNY1dWB!qcK\u0017E\u0013\ta\u0003DA\u0005Gk:\u001cG/[8ocA\u0019a&M\u001a\u000e\u0003=R!\u0001\r\r\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u00023_\t\u00191+Z9\u0011\t]!d'Q\u0005\u0003ka\u0011a\u0001V;qY\u0016\u0014\u0004CA\u001c@\u001b\u0005A$BA\u001d;\u0003\u0019\u0019w.\\7p]*\u0011ab\u000f\u0006\u0003yu\na!\u00199bG\",'\"\u0001 \u0002\u0007=\u0014x-\u0003\u0002Aq\tqAk\u001c9jGB\u000b'\u000f^5uS>t\u0007C\u0001\nC\u0013\t\u00195B\u0001\nGKR\u001c\u0007\u000eU1si&$\u0018n\u001c8ECR\f\u0007CA\fF\u0013\t1\u0005D\u0001\u0003V]&$\u0018A\u0002\u001fj]&$h\b\u0006\u0004J\u0015.cUJ\u0014\t\u0003%\u0001AQ!\u0006\u0004A\u0002YAQ!\b\u0004A\u0002yAQ!\t\u0004A\u0002\tBQ!\n\u0004A\u0002\u0019BQ!\u000b\u0004A\u0002)\n1\u0002\u001e:z\u0007>l\u0007\u000f\\3uKR\t\u0011\u000b\u0005\u0002\u0018%&\u00111\u000b\u0007\u0002\b\u0005>|G.Z1o\u00031yg.\u0012=qSJ\fG/[8o)\u0005!\u0015AC8o\u0007>l\u0007\u000f\\3uK\u0002")
/**
 * A delayed fetch operation, decompiled from the Scala class {@code kafka.server.DelayedFetch}.
 *
 * <p>{@link #tryComplete()} inspects every partition listed in the fetch metadata and completes
 * the operation either as soon as one partition hits an "answer now" condition or once the bytes
 * counted across partitions reach {@code fetchMinBytes}. {@link #onComplete()} performs the read
 * from the local log and hands the per-partition results to the response callback.
 */
public class DelayedFetch
extends DelayedOperation {
    private final FetchMetadata fetchMetadata;
    private final ReplicaManager replicaManager;
    private final ReplicaQuota quota;
    private final Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit> responseCallback;

    /**
     * Checks whether the fetch can be completed now.
     *
     * <p>The per-partition check runs inside a Scala {@code foreach}; an early exit from it is
     * signalled by a {@link NonLocalReturnControl} carrying a private key object and the boolean
     * result. A control exception with this invocation's key is translated into a normal return
     * value; one with any other key belongs to an outer frame and is rethrown untouched.
     *
     * @return the result of {@code forceComplete()} when a completion condition was met,
     *         {@code false} otherwise
     */
    @Override
    public boolean tryComplete() {
        final Object nonLocalReturnKey = new Object();
        try {
            IntRef accumulatedSize = IntRef.create(0);
            this.fetchMetadata.fetchPartitionStatus().foreach((Function1<Tuple2, Object> & Serializable & scala.Serializable)entry -> {
                DelayedFetch.$anonfun$tryComplete$1(this, nonLocalReturnKey, accumulatedSize, entry);
                return BoxedUnit.UNIT;
            });
            if (accumulatedSize.elem >= this.fetchMetadata.fetchMinBytes()) {
                return this.forceComplete();
            }
            return false;
        }
        catch (NonLocalReturnControl ex) {
            if (ex.key() == nonLocalReturnKey) {
                // Early exit requested by the per-partition check: deliver its value.
                return ex.value$mcZ$sp();
            }
            // Not ours: let the frame that owns the key handle it.
            throw ex;
        }
    }

    /**
     * Marks the expired-request meter for followers or consumers, depending on where the
     * fetch came from.
     */
    @Override
    public void onExpiration() {
        if (this.fetchMetadata.isFromFollower()) {
            DelayedFetchMetrics$.MODULE$.followerExpiredRequestMeter().mark();
        } else {
            DelayedFetchMetrics$.MODULE$.consumerExpiredRequestMeter().mark();
        }
    }

    /**
     * Reads the requested partitions from the local log, converts each read result into a
     * {@link FetchPartitionData} and passes the resulting sequence to the response callback.
     */
    @Override
    public void onComplete() {
        // Project each (partition, status) pair down to (partition, fetchInfo) for the read.
        Seq<Tuple2<TopicPartition, FetchRequest.PartitionData>> fetchInfos = this.fetchMetadata.fetchPartitionStatus().map((Function1<Tuple2, Tuple2> & Serializable & scala.Serializable)entry -> {
            if (entry == null) {
                throw new MatchError(entry);
            }
            TopicPartition tp = (TopicPartition)entry._1();
            FetchPartitionStatus status = (FetchPartitionStatus)entry._2();
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), status.fetchInfo());
        }, Seq$.MODULE$.canBuildFrom());
        Seq<Tuple2<TopicPartition, LogReadResult>> logReadResults = this.replicaManager.readFromLocalLog(this.fetchMetadata.replicaId(), this.fetchMetadata.fetchOnlyLeader(), this.fetchMetadata.fetchIsolation(), this.fetchMetadata.fetchMaxBytes(), this.fetchMetadata.hardMaxBytesLimit(), fetchInfos, this.quota);
        Seq fetchPartitionData = logReadResults.map((Function1<Tuple2, Tuple2> & Serializable & scala.Serializable)entry -> {
            if (entry == null) {
                throw new MatchError(entry);
            }
            TopicPartition tp = (TopicPartition)entry._1();
            LogReadResult result = (LogReadResult)entry._2();
            FetchPartitionData data = new FetchPartitionData(result.error(), result.highWatermark(), result.leaderLogStartOffset(), result.info().records(), result.lastStableOffset(), result.info().abortedTransactions());
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), data);
        }, Seq$.MODULE$.canBuildFrom());
        this.responseCallback.apply(fetchPartitionData);
    }

    /**
     * Per-partition body of the {@code foreach} in {@link #tryComplete()}.
     *
     * <p>Partitions whose start offset metadata equals {@code UnknownOffsetMetadata} are skipped.
     * For the others the end offset is chosen by the fetch isolation level and compared with the
     * fetch offset:
     * <ul>
     *   <li>end offset on an older segment than the fetch offset: complete immediately;</li>
     *   <li>fetch offset on an older segment than the end offset: complete immediately unless the
     *       leader should throttle this replica;</li>
     *   <li>otherwise, if the fetch offset is behind the end offset: add the available bytes
     *       (capped by the partition's {@code maxBytes}) to the accumulator unless throttled.</li>
     * </ul>
     * A storage, unknown-partition, fenced-epoch or not-leader error also completes immediately.
     * "Complete immediately" is expressed by throwing a {@link NonLocalReturnControl} tagged with
     * {@code nonLocalReturnKey1$1} and carrying the result of {@code forceComplete()}.
     *
     * @param $this                the delayed fetch being evaluated
     * @param nonLocalReturnKey1$1 key identifying the {@code tryComplete} invocation to return from
     * @param accumulatedSize$1    running total of bytes available across partitions
     * @param x0$1                 a (TopicPartition, FetchPartitionStatus) pair
     * @throws MatchError if the pair is {@code null} or the isolation level is not recognised
     */
    public static final /* synthetic */ void $anonfun$tryComplete$1(DelayedFetch $this, Object nonLocalReturnKey1$1, IntRef accumulatedSize$1, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(x0$1);
        }
        TopicPartition topicPartition = (TopicPartition)x0$1._1();
        FetchPartitionStatus fetchStatus = (FetchPartitionStatus)x0$1._2();
        LogOffsetMetadata fetchOffset = fetchStatus.startOffsetMetadata();
        Optional<Integer> fetchLeaderEpoch = fetchStatus.fetchInfo().currentLeaderEpoch;
        try {
            LogOffsetMetadata unknownOffset = LogOffsetMetadata$.MODULE$.UnknownOffsetMetadata();
            boolean offsetIsKnown = fetchOffset == null ? unknownOffset != null : !((Object)fetchOffset).equals(unknownOffset);
            if (!offsetIsKnown) {
                return;
            }
            Partition partition = $this.replicaManager.getPartitionOrException(topicPartition, $this.fetchMetadata.fetchOnlyLeader());
            LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, $this.fetchMetadata.fetchOnlyLeader());
            LogOffsetMetadata endOffset = DelayedFetch.endOffsetFor($this.fetchMetadata.fetchIsolation(), offsetSnapshot);
            if (endOffset.messageOffset() == fetchOffset.messageOffset()) {
                // Nothing new to read for this partition.
                return;
            }
            if (endOffset.onOlderSegment(fetchOffset)) {
                $this.debug((Function0<String> & Serializable & scala.Serializable)() -> new StringBuilder(68).append("Satisfying fetch ").append($this.fetchMetadata).append(" since it is fetching later segments of partition ").append(topicPartition).append(".").toString());
                throw new NonLocalReturnControl$mcZ$sp(nonLocalReturnKey1$1, $this.forceComplete());
            } else if (fetchOffset.onOlderSegment(endOffset)) {
                $this.debug((Function0<String> & Serializable & scala.Serializable)() -> new StringBuilder(66).append("Satisfying fetch ").append($this.fetchMetadata).append(" immediately since it is fetching older segments.").toString());
                // A throttled replica is not force-completed; it simply contributes no bytes.
                if (!$this.replicaManager.shouldLeaderThrottle($this.quota, topicPartition, $this.fetchMetadata.replicaId())) {
                    throw new NonLocalReturnControl$mcZ$sp(nonLocalReturnKey1$1, $this.forceComplete());
                }
            } else if (fetchOffset.messageOffset() < endOffset.messageOffset()) {
                // Only reached when neither offset is on an older segment than the other.
                // NOTE(review): positionDiff presumably requires both offsets on the same
                // segment, which is why this must stay an else-branch - confirm.
                int bytesAvailable = package$.MODULE$.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo().maxBytes);
                if (!$this.replicaManager.shouldLeaderThrottle($this.quota, topicPartition, $this.fetchMetadata.replicaId())) {
                    accumulatedSize$1.elem += bytesAvailable;
                }
            }
        }
        catch (KafkaStorageException kafkaStorageException) {
            $this.debug((Function0<String> & Serializable & scala.Serializable)() -> new StringBuilder(63).append("Partition ").append(topicPartition).append(" is in an offline log directory, satisfy ").append($this.fetchMetadata).append(" immediately").toString());
            throw new NonLocalReturnControl$mcZ$sp(nonLocalReturnKey1$1, $this.forceComplete());
        }
        catch (UnknownTopicOrPartitionException unknownTopicOrPartitionException) {
            $this.debug((Function0<String> & Serializable & scala.Serializable)() -> new StringBuilder(58).append("Broker no longer knows of partition ").append(topicPartition).append(", satisfy ").append($this.fetchMetadata).append(" immediately").toString());
            throw new NonLocalReturnControl$mcZ$sp(nonLocalReturnKey1$1, $this.forceComplete());
        }
        catch (FencedLeaderEpochException fencedLeaderEpochException) {
            $this.debug((Function0<String> & Serializable & scala.Serializable)() -> new StringBuilder(60).append("Broker is the leader of partition ").append(topicPartition).append(", but the requested epoch ").append(new StringBuilder(59).append(fetchLeaderEpoch).append(" is fenced by the latest leader epoch, satisfy ").append($this.fetchMetadata).append(" immediately").toString()).toString());
            throw new NonLocalReturnControl$mcZ$sp(nonLocalReturnKey1$1, $this.forceComplete());
        }
        catch (NotLeaderForPartitionException notLeaderForPartitionException) {
            $this.debug((Function0<String> & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Broker is no longer the leader of %s, satisfy %s immediately")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicPartition, $this.fetchMetadata})));
            throw new NonLocalReturnControl$mcZ$sp(nonLocalReturnKey1$1, $this.forceComplete());
        }
    }

    /**
     * Selects the end offset of the snapshot that corresponds to the given isolation level:
     * log end offset, high watermark or last stable offset.
     *
     * @throws MatchError if the isolation level is none of the three known singletons
     */
    private static LogOffsetMetadata endOffsetFor(FetchIsolation fetchIsolation, LogOffsetSnapshot offsetSnapshot) {
        if (FetchLogEnd$.MODULE$.equals(fetchIsolation)) {
            return offsetSnapshot.logEndOffset();
        }
        if (FetchHighWatermark$.MODULE$.equals(fetchIsolation)) {
            return offsetSnapshot.highWatermark();
        }
        if (FetchTxnCommitted$.MODULE$.equals(fetchIsolation)) {
            return offsetSnapshot.lastStableOffset();
        }
        throw new MatchError(fetchIsolation);
    }

    /**
     * Creates a delayed fetch.
     *
     * @param delayMs          delay passed to the {@link DelayedOperation} super constructor
     * @param fetchMetadata    description of the fetch request and its per-partition status
     * @param replicaManager   used to look up partitions, check throttling and read the log
     * @param quota            replica quota handed to the throttle check and the log read
     * @param responseCallback invoked with the per-partition fetch data on completion
     */
    public DelayedFetch(long delayMs, FetchMetadata fetchMetadata, ReplicaManager replicaManager, ReplicaQuota quota, Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit> responseCallback) {
        // The explicit super call must come first in a Java constructor.
        // NOTE(review): assumes the DelayedOperation constructor does not call overridable
        // methods that read the fields assigned below - confirm.
        super(delayMs, DelayedOperation$.MODULE$.$lessinit$greater$default$2());
        this.fetchMetadata = fetchMetadata;
        this.replicaManager = replicaManager;
        this.quota = quota;
        this.responseCallback = responseCallback;
    }
}

