package com.taobao.kelude.common.tunnel;

import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.taobao.kelude.common.exception.ExceptionLog;
import com.taobao.kelude.common.search.TSearchDriver;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/taobao/kelude/common/tunnel/OdpsTunnelTransferWorker.class */
class OdpsTunnelTransferWorker implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(OdpsTunnelTransferWorker.class);
    private OdpsTunnelTableWriter odpsTunnelTableWriter;
    private static final int MAX_ODPS_DATA_BLOCK_TRANSFER_TIME_INTERVAL = 30;
    private static final int MAX_ODPS_DATA_BLOCK_WAITING_NUMBER_LMT = 1000;
    private boolean keepWorking = true;
    private OdpsTunnelTransferPostHandler odpsTunnelTransferPostHandler = new OdpsTunnelTransferPostHandler();
    private BlockingQueue<OdpsTunnelDataBlock> queue = new LinkedBlockingQueue(4000);

    public OdpsTunnelTransferWorker(String str, String str2, String str3, TableTunnel tableTunnel) {
        this.odpsTunnelTableWriter = new OdpsTunnelTableWriter(str, str2, str3, tableTunnel);
    }

    public boolean enqueue(OdpsTunnelDataBlock odpsTunnelDataBlock) {
        return this.queue.add(odpsTunnelDataBlock);
    }

    @Override // java.lang.Runnable
    public void run() {
        int i = 0;
        boolean z = false;
        while (this.keepWorking) {
            if (i >= MAX_ODPS_DATA_BLOCK_TRANSFER_TIME_INTERVAL) {
                z = true;
                i = 0;
            }
            if (this.queue.size() > MAX_ODPS_DATA_BLOCK_WAITING_NUMBER_LMT) {
                z = true;
            }
            if (z) {
                fire();
                z = false;
            } else {
                try {
                    Thread.sleep(1000L);
                    i++;
                } catch (InterruptedException e) {
                }
            }
        }
        fire();
    }

    private void fire() {
        int size = this.queue.size();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (int i = 0; i < size; i++) {
            OdpsTunnelDataBlock poll = this.queue.poll();
            hashMap.put(poll.getId(), poll);
            Long l = (Long) hashMap2.get(poll.getId());
            if (l == null) {
                l = 0L;
            }
            hashMap2.put(poll.getId(), Long.valueOf(l.longValue() + 1));
        }
        try {
            this.odpsTunnelTableWriter.transfer(hashMap.values());
            if (this.odpsTunnelTransferPostHandler != null) {
                this.odpsTunnelTransferPostHandler.onDataBlockTransferSucceed(hashMap2);
                logger.info("write " + size + " records into ODPS table succeed");
            }
        } catch (IOException e) {
            ExceptionLog.printStackTrace(ExceptionLog.BUS_COMMON, e, TSearchDriver.QUERY_OP_NONE);
            if (this.odpsTunnelTransferPostHandler != null) {
                this.odpsTunnelTransferPostHandler.onDataBlockTransferFailure(hashMap2);
                logger.info("write " + size + " records into ODPS table failed because of IOException " + e.getMessage());
            }
        } catch (Exception e2) {
            ExceptionLog.printStackTrace(ExceptionLog.BUS_COMMON, e2, TSearchDriver.QUERY_OP_NONE);
            if (this.odpsTunnelTransferPostHandler != null) {
                this.odpsTunnelTransferPostHandler.onDataBlockTransferFailure(hashMap2);
                logger.info("write " + size + " records into ODPS table failed because of Exception " + e2.getMessage());
            }
        } catch (TunnelException e3) {
            ExceptionLog.printStackTrace(ExceptionLog.BUS_COMMON, e3, TSearchDriver.QUERY_OP_NONE);
            if (this.odpsTunnelTransferPostHandler != null) {
                this.odpsTunnelTransferPostHandler.onDataBlockTransferFailure(hashMap2);
                logger.info("write " + size + " records into ODPS table failed because of TunnelException " + e3.getMessage());
            }
        }
    }

    public void shutdown() {
        this.keepWorking = false;
    }
}
