/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ots.internal.streamclient.core.task;

import com.aliyun.openservices.ots.internal.streamclient.core.DataFetcher;
import com.aliyun.openservices.ots.internal.streamclient.core.RecordProcessorCheckpointer;
import com.aliyun.openservices.ots.internal.streamclient.core.exceptions.ApplicationException;
import com.aliyun.openservices.ots.internal.streamclient.core.task.ITask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskResult;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskType;
import com.aliyun.openservices.ots.internal.streamclient.model.ICheckpointTracker;
import com.aliyun.openservices.ots.internal.streamclient.model.IRecordProcessor;
import com.aliyun.openservices.ots.internal.streamclient.model.IShutdownMarker;
import com.aliyun.openservices.ots.internal.streamclient.model.InitializationInput;
import com.aliyun.openservices.ots.internal.streamclient.model.ShardInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InitializeTask
implements ITask {
    private static final Logger LOG = LoggerFactory.getLogger(InitializeTask.class);
    private final ShardInfo shardInfo;
    private final IRecordProcessor recordProcessor;
    private final DataFetcher dataFetcher;
    private final ICheckpointTracker checkpointTracker;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final IShutdownMarker shutdownMarker;

    public InitializeTask(ShardInfo shardInfo, IRecordProcessor recordProcessor, ICheckpointTracker checkpointTracker, RecordProcessorCheckpointer recordProcessorCheckpointer, DataFetcher dataFetcher, IShutdownMarker shutdownMarker) {
        this.shardInfo = shardInfo;
        this.recordProcessor = recordProcessor;
        this.checkpointTracker = checkpointTracker;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.dataFetcher = dataFetcher;
        this.shutdownMarker = shutdownMarker;
    }

    @Override
    public TaskResult call() {
        LOG.debug("Start, ShardId: {}.", (Object)this.shardInfo.getShardId());
        try {
            String initialCheckpoint = this.checkpointTracker.getCheckpoint(this.shardInfo.getShardId());
            if (initialCheckpoint == null || initialCheckpoint.isEmpty()) {
                initialCheckpoint = "TRIM_HORIZON";
            }
            LOG.debug("ShardId: {}, InitialCheckpoint: {}.", (Object)this.shardInfo.getShardId(), (Object)initialCheckpoint);
            this.dataFetcher.initialize(initialCheckpoint);
            this.recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
            InitializationInput initializationInput = new InitializationInput();
            initializationInput.setShardInfo(this.shardInfo);
            initializationInput.setInitialCheckpoint(initialCheckpoint);
            initializationInput.setCheckpointer(this.recordProcessorCheckpointer);
            initializationInput.setShutdownMarker(this.shutdownMarker);
            try {
                this.recordProcessor.initialize(initializationInput);
            }
            catch (Exception e) {
                throw new ApplicationException("ApplicationInitializeError", e);
            }
            LOG.debug("Complete, ShardId: {}.", (Object)this.shardInfo.getShardId());
            return new TaskResult(true);
        }
        catch (Exception e) {
            LOG.warn("ShardId: {}, Exception: {}.", (Object)this.shardInfo.getShardId(), (Object)e);
            return new TaskResult(e);
        }
    }

    @Override
    public TaskType getTaskType() {
        return TaskType.INITIALIZE;
    }
}

