package com.volcengine.service.tls.consumer;

import com.volcengine.model.tls.Const;
import com.volcengine.model.tls.consumer.CheckpointInfo;
import com.volcengine.model.tls.consumer.ConsumeShard;
import com.volcengine.model.tls.consumer.ConsumerConfig;
import com.volcengine.model.tls.consumer.ConsumerStatus;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.model.tls.request.ConsumeLogsRequest;
import com.volcengine.model.tls.request.DescribeCheckpointRequest;
import com.volcengine.model.tls.request.DescribeCursorRequest;
import com.volcengine.model.tls.response.ConsumeLogsResponse;
import com.volcengine.model.tls.response.DescribeCheckpointResponse;
import com.volcengine.service.tls.TLSLogClient;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/volcengine/service/tls/consumer/LogConsumer.class */
public class LogConsumer {
    private static final Log LOG = LogFactory.getLog(LogConsumer.class);
    public static final int BACK_OFF_CODE = 429;
    private ConsumerConfig consumerConfig;
    private TLSLogClient tlsClient;
    private LogProcessor logProcessor;
    private ExecutorService executor;
    private final ConsumeShard shard;
    private String nextCheckpoint;
    private PutLogRequest.LogGroupList currLogGroupList;
    private LocalDateTime lastBackoffTime;
    private CheckpointTracker checkpointTracker;
    private volatile boolean isTaskFinished;
    private ConsumerStatus status = ConsumerStatus.PENDING;
    private ReentrantReadWriteLock statusLock = new ReentrantReadWriteLock();

    public LogConsumer(ConsumerImpl consumerImpl, ConsumeShard consumeShard) {
        this.consumerConfig = consumerImpl.consumerConfig;
        this.tlsClient = consumerImpl.tlsClient;
        this.logProcessor = consumerImpl.logProcessor;
        this.executor = consumerImpl.executor;
        this.shard = consumeShard;
        this.checkpointTracker = new CheckpointTracker(consumerImpl, consumeShard);
    }

    public void run() {
        switch (loadStatus()) {
            case PENDING:
                setStatus(ConsumerStatus.INITIALIZING);
                this.executor.execute(() -> {
                    this.isTaskFinished = false;
                    try {
                        init();
                        setStatus(ConsumerStatus.READY_TO_FETCH);
                    } catch (Exception e) {
                        LOG.error("Init log consumer failed!", e);
                        setStatus(ConsumerStatus.PENDING);
                    } finally {
                        this.isTaskFinished = true;
                    }
                });
                return;
            case READY_TO_FETCH:
                setStatus(ConsumerStatus.FETCHING);
                this.executor.execute(() -> {
                    this.isTaskFinished = false;
                    try {
                        try {
                            try {
                                fetchData();
                                setStatus(ConsumerStatus.READY_TO_CONSUME);
                                this.isTaskFinished = true;
                            } catch (Exception e) {
                                LOG.error("Fetch data failed!", e);
                                setStatus(ConsumerStatus.READY_TO_FETCH);
                                this.isTaskFinished = true;
                            }
                        } catch (LogException e2) {
                            if (e2.getErrorMessage().contains(Const.ERROR_CONSUMER_HEARTBEAT_EXPIRED)) {
                                try {
                                    this.checkpointTracker.uploadCheckpoint();
                                } catch (Exception e3) {
                                    LOG.error("Upload checkpoint failed when consumer expired.", e3);
                                }
                                this.checkpointTracker.stop();
                                setStatus(ConsumerStatus.WAIT_FOR_RESTART);
                            } else if (e2.getHttpCode() == 429) {
                                setStatus(ConsumerStatus.BACKOFF);
                            } else {
                                LOG.error("Fetch log data failed!", e2);
                                setStatus(ConsumerStatus.READY_TO_FETCH);
                            }
                            this.isTaskFinished = true;
                        }
                    } catch (Throwable th) {
                        this.isTaskFinished = true;
                        throw th;
                    }
                });
                return;
            case READY_TO_CONSUME:
                setStatus(ConsumerStatus.CONSUMING);
                this.executor.execute(() -> {
                    this.isTaskFinished = false;
                    try {
                        consume();
                        setStatus(ConsumerStatus.READY_TO_FETCH);
                    } catch (Exception e) {
                        LOG.error("consume error.", e);
                        setStatus(ConsumerStatus.READY_TO_CONSUME);
                    } finally {
                        this.isTaskFinished = true;
                    }
                });
                return;
            case BACKOFF:
                if (backoff()) {
                    setStatus(ConsumerStatus.BACKOFF);
                    return;
                } else {
                    setStatus(ConsumerStatus.READY_TO_FETCH);
                    return;
                }
            case INITIALIZING:
            case FETCHING:
            case CONSUMING:
            case WAIT_FOR_RESTART:
            default:
                return;
        }
    }

    public void setStatus(ConsumerStatus consumerStatus) {
        this.statusLock.writeLock().lock();
        this.status = consumerStatus;
        this.statusLock.writeLock().unlock();
    }

    public ConsumerStatus loadStatus() {
        this.statusLock.readLock().lock();
        ConsumerStatus consumerStatus = this.status;
        this.statusLock.readLock().unlock();
        return consumerStatus;
    }

    public void stop() {
        int i = 0;
        while (!this.isTaskFinished) {
            int i2 = i;
            i++;
            if (i2 >= this.consumerConfig.getStopTimeout()) {
                break;
            }
            ConsumerUtil.sleep(1000L);
            LOG.debug("LogConsumer stop failed " + this.status + " times: " + i);
        }
        this.checkpointTracker.stop();
        try {
            this.checkpointTracker.uploadCheckpoint();
        } catch (Exception e) {
            LOG.error("Upload checkpoint failed when received stop signal.", e);
        }
    }

    private void init() throws LogException {
        this.checkpointTracker.start();
        String projectID = this.consumerConfig.getProjectID();
        String topicID = this.shard.getTopicID();
        int shardID = this.shard.getShardID();
        try {
            DescribeCheckpointResponse describeCheckPoint = this.tlsClient.describeCheckPoint(new DescribeCheckpointRequest(projectID, topicID, shardID, this.consumerConfig.getConsumerGroupName()));
            if (!describeCheckPoint.getCheckpoint().isEmpty()) {
                this.nextCheckpoint = describeCheckPoint.getCheckpoint();
                return;
            }
            try {
                this.nextCheckpoint = this.tlsClient.describeCursor(new DescribeCursorRequest(topicID, Integer.valueOf(shardID), this.consumerConfig.getConsumeFrom())).getCursor();
            } catch (LogException e) {
                LOG.error("Initializing log consumer failed in getting cursor.");
                throw e;
            }
        } catch (LogException e2) {
            LOG.error("Initializing log consumer failed in getting checkpoint.");
            throw e2;
        }
    }

    private void fetchData() throws LogException {
        this.lastBackoffTime = LocalDateTime.now();
        ConsumeLogsRequest consumeLogsRequest = new ConsumeLogsRequest();
        consumeLogsRequest.setTopicId(this.shard.getTopicID());
        consumeLogsRequest.setShardId(Integer.valueOf(this.shard.getShardID()));
        consumeLogsRequest.setCursor(this.nextCheckpoint);
        consumeLogsRequest.setLogGroupCount(Integer.valueOf(this.consumerConfig.getMaxFetchLogGroupCount()));
        consumeLogsRequest.setCompression(Const.LZ4);
        consumeLogsRequest.setConsumerGroupName(this.consumerConfig.getConsumerGroupName());
        consumeLogsRequest.setConsumerName(this.consumerConfig.getConsumerName());
        ConsumeLogsResponse consumeLogs = this.tlsClient.consumeLogs(consumeLogsRequest);
        this.currLogGroupList = consumeLogs.getLogGroupList();
        this.nextCheckpoint = consumeLogs.getXTlsCursor();
    }

    private void consume() {
        if (this.currLogGroupList == null || this.currLogGroupList.getLogGroupsCount() == 0) {
            return;
        }
        this.logProcessor.process(this.shard.getTopicID(), this.shard.getShardID(), this.currLogGroupList);
        this.checkpointTracker.setCheckpoint(new CheckpointInfo(this.nextCheckpoint, this.shard));
    }

    private boolean backoff() {
        return Duration.between(this.lastBackoffTime, LocalDateTime.now()).compareTo(Duration.ofSeconds(5L)) <= 0;
    }
}
