/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ots.internal.streamclient.core;

import com.alicloud.openservices.tablestore.model.StreamDetails;
import com.aliyun.openservices.ots.internal.streamclient.ClientConfig;
import com.aliyun.openservices.ots.internal.streamclient.DependencyException;
import com.aliyun.openservices.ots.internal.streamclient.StreamClientException;
import com.aliyun.openservices.ots.internal.streamclient.StreamConfig;
import com.aliyun.openservices.ots.internal.streamclient.core.CheckpointTracker;
import com.aliyun.openservices.ots.internal.streamclient.core.ShardConsumer;
import com.aliyun.openservices.ots.internal.streamclient.core.ShardSyncer;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskRetryStrategy;
import com.aliyun.openservices.ots.internal.streamclient.lease.LeaseCoordinator;
import com.aliyun.openservices.ots.internal.streamclient.lease.LeaseManager;
import com.aliyun.openservices.ots.internal.streamclient.lease.LeaseManagerRetryStrategy;
import com.aliyun.openservices.ots.internal.streamclient.lease.ShardLease;
import com.aliyun.openservices.ots.internal.streamclient.lease.ShardLeaseSerializer;
import com.aliyun.openservices.ots.internal.streamclient.lease.interfaces.ILeaseManager;
import com.aliyun.openservices.ots.internal.streamclient.model.ICheckpointTracker;
import com.aliyun.openservices.ots.internal.streamclient.model.IRecordProcessor;
import com.aliyun.openservices.ots.internal.streamclient.model.IRecordProcessorFactory;
import com.aliyun.openservices.ots.internal.streamclient.model.IRetryStrategy;
import com.aliyun.openservices.ots.internal.streamclient.model.ShardInfo;
import com.aliyun.openservices.ots.internal.streamclient.model.ShutdownReason;
import com.aliyun.openservices.ots.internal.streamclient.model.WorkerStatus;
import com.aliyun.openservices.ots.internal.streamclient.utils.OTSHelper;
import com.aliyun.openservices.ots.internal.streamclient.utils.Preconditions;
import com.aliyun.openservices.ots.internal.streamclient.utils.TimeUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InnerWorker
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(InnerWorker.class);
    private final String workerIdentifier;
    private final ClientConfig clientConfig;
    private final StreamConfig streamConfig;
    private final IRecordProcessorFactory recordProcessorFactory;
    private final ILeaseManager<ShardLease> leaseManager;
    private final ExecutorService executorService;
    private final LeaseCoordinator<ShardLease> leaseCoordinator;
    private final ShardSyncer shardSyncer;
    private final ICheckpointTracker checkpointTracker;
    private final IRetryStrategy taskRetryStrategy;
    private final IRetryStrategy leaseManagerRetryStrategy;
    long lastSyncShardTimeMillis;
    private ConcurrentMap<ShardInfo, ShardConsumer> shardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();
    private volatile boolean running;
    private volatile boolean shutdown;
    private Exception exception;
    private String streamId;

    public InnerWorker(String workerIdentifier, ClientConfig clientConfig, StreamConfig streamConfig, IRecordProcessorFactory recordProcessorFactory, ExecutorService executorService, ILeaseManager<ShardLease> leaseManager, LeaseCoordinator<ShardLease> shardLeaseCoordinator, ICheckpointTracker checkpointTracker) {
        Preconditions.checkArgument(workerIdentifier != null && !workerIdentifier.isEmpty(), "workerIdentifier should not be null or empty");
        Preconditions.checkNotNull(clientConfig);
        Preconditions.checkNotNull(streamConfig);
        Preconditions.checkNotNull(streamConfig.getOTSClient());
        Preconditions.checkNotNull(recordProcessorFactory);
        Preconditions.checkNotNull(executorService);
        LOG.info("Initialize inner worker.");
        LOG.info("ClientConfig: {}", (Object)clientConfig);
        LOG.info("StreamConfig: {}", (Object)streamConfig);
        this.workerIdentifier = workerIdentifier;
        this.clientConfig = clientConfig;
        this.streamConfig = streamConfig;
        this.recordProcessorFactory = recordProcessorFactory;
        this.executorService = executorService;
        StreamDetails streamDetails = OTSHelper.getStreamDetails(this.streamConfig.getOTSClient(), this.streamConfig.getDataTableName());
        if (!streamDetails.isEnableStream()) {
            throw new IllegalArgumentException("The data table does not enable stream.");
        }
        this.streamId = streamDetails.getStreamId();
        this.taskRetryStrategy = clientConfig.getTaskRetryStrategy() != null ? clientConfig.getTaskRetryStrategy() : new TaskRetryStrategy();
        this.leaseManagerRetryStrategy = clientConfig.getLeaseManagerRetryStrategy() != null ? clientConfig.getLeaseManagerRetryStrategy() : new LeaseManagerRetryStrategy();
        this.leaseManager = leaseManager != null ? leaseManager : new LeaseManager<ShardLease>(this.streamConfig.getOTSClient(), this.streamConfig.getStatusTableName(), new ShardLeaseSerializer(this.streamConfig.getStatusTableName(), this.streamId), this.leaseManagerRetryStrategy, this.clientConfig.getCheckTableReadyIntervalMillis());
        this.leaseCoordinator = shardLeaseCoordinator != null ? shardLeaseCoordinator : new LeaseCoordinator<ShardLease>(this.leaseManager, this.workerIdentifier, this.clientConfig);
        this.checkpointTracker = checkpointTracker != null ? checkpointTracker : new CheckpointTracker(this.leaseManager, this.leaseCoordinator);
        this.shardSyncer = new ShardSyncer(this.streamConfig, this.leaseManager, this.executorService, this.taskRetryStrategy);
    }

    @Override
    public void run() {
        try {
            if (this.running || this.shutdown) {
                throw new StreamClientException("Can't rerun a worker.");
            }
            this.running = true;
            this.initialize();
            this.leaseCoordinator.start();
            this.lastSyncShardTimeMillis = System.currentTimeMillis();
            while (!this.shutdown) {
                this.runProcessLoop();
                TimeUtils.sleepMillis(this.clientConfig.getWorkerIdleTimeMillis());
            }
        }
        catch (Throwable ex) {
            LOG.error("Exception: {}.", ex);
            this.exception = ex instanceof Exception ? (Exception)ex : new RuntimeException(ex);
            this.shutdown();
        }
    }

    void runProcessLoop() throws Exception {
        ArrayList<ShardInfo> heldShards = new ArrayList<ShardInfo>();
        ArrayList<ShardInfo> stolenShards = new ArrayList<ShardInfo>();
        this.getCurrentlyHeldShards(heldShards, stolenShards);
        for (ShardInfo shardInfo : heldShards) {
            ShardConsumer shardConsumer = this.createOrGetShardConsumer(shardInfo);
            if (shardConsumer.isShutdown()) continue;
            shardConsumer.consumeShard();
        }
        if (System.currentTimeMillis() - this.lastSyncShardTimeMillis > this.clientConfig.getSyncShardIntervalMillis() && this.shardSyncer.syncShardAndLeaseInfo(false)) {
            this.lastSyncShardTimeMillis = System.currentTimeMillis();
        }
        this.cleanupShardConsumers(heldShards, stolenShards);
        this.leaseCoordinator.checkRenewerAndTakerStatus(this.clientConfig.getMaxDurationBeforeLastSuccessfulRenewOrTakeLease());
    }

    public void shutdown() {
        this.running = false;
        this.shutdown = true;
        this.leaseCoordinator.stop();
    }

    public WorkerStatus getWorkerStatus() {
        if (this.exception != null) {
            return WorkerStatus.ERROR;
        }
        if (this.running) {
            return WorkerStatus.RUNNING;
        }
        if (this.shutdown) {
            return WorkerStatus.SHUTDOWN;
        }
        return WorkerStatus.NOT_RUNNING;
    }

    public Exception getException() {
        return this.exception;
    }

    private void initialize() throws StreamClientException, DependencyException {
        this.leaseCoordinator.initialize();
        this.shardSyncer.syncShardAndLeaseInfo(true);
    }

    private void getCurrentlyHeldShards(List<ShardInfo> heldShards, List<ShardInfo> stolenShards) {
        heldShards.clear();
        stolenShards.clear();
        Collection<ShardLease> heldLeases = this.leaseCoordinator.getCurrentlyHeldLeases();
        for (ShardLease shardLease : heldLeases) {
            if (shardLease.getLeaseStealer().isEmpty()) {
                LOG.debug("Currently held shard: {}", (Object)shardLease);
                heldShards.add(shardLease.toShardInfo());
                continue;
            }
            LOG.debug("Currently stolen shard: {}", (Object)shardLease);
            stolenShards.add(shardLease.toShardInfo());
        }
    }

    private ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo) {
        ShardConsumer consumer = (ShardConsumer)this.shardConsumerMap.get(shardInfo);
        if (consumer == null || consumer.isShutdown() && consumer.getShutdownReason() == ShutdownReason.PROCESS_RESTART) {
            IRecordProcessor recordProcessor = this.recordProcessorFactory.createProcessor();
            consumer = new ShardConsumer(shardInfo, this.streamConfig, this.checkpointTracker, recordProcessor, this.leaseManager, this.clientConfig.getParentShardPollIntervalMillis(), this.executorService, this.shardSyncer, this.taskRetryStrategy);
            this.shardConsumerMap.put(shardInfo, consumer);
            LOG.info("CreateNewConsumer, ShardInfo: {}.", (Object)shardInfo);
        }
        return consumer;
    }

    private void cleanupShardConsumers(List<ShardInfo> heldShards, List<ShardInfo> stolenShards) throws StreamClientException, DependencyException {
        HashSet<ShardInfo> heldShardSet = new HashSet<ShardInfo>();
        HashSet<ShardInfo> stolenShardSet = new HashSet<ShardInfo>();
        for (ShardInfo shardInfo : heldShards) {
            heldShardSet.add(shardInfo);
        }
        for (ShardInfo shardInfo : stolenShards) {
            stolenShardSet.add(shardInfo);
        }
        for (ShardInfo shardInfo : this.shardConsumerMap.keySet()) {
            if (heldShardSet.contains(shardInfo)) continue;
            boolean stolen = stolenShardSet.contains(shardInfo);
            ShutdownReason reason = stolen ? ShutdownReason.STOLEN : ShutdownReason.ZOMBIE;
            boolean isShutdown = ((ShardConsumer)this.shardConsumerMap.get(shardInfo)).beginShutdown(reason);
            LOG.info("CleanConsumer, ShardInfo: {}, Reason: {}, IsShutDown: {}.", new Object[]{shardInfo, reason, isShutdown});
            if (!isShutdown) continue;
            this.shardConsumerMap.remove(shardInfo);
            if (!stolen) continue;
            LOG.info("Transfer lease: {}.", (Object)shardInfo);
            this.leaseCoordinator.transferLease(shardInfo.getShardId(), shardInfo.getLeaseIdentifier());
        }
    }
}

