package com.taobao.kelude.common.search.dump;

import com.taobao.kelude.common.exception.ExceptionLog;
import com.taobao.kelude.common.search.SearchClient;
import com.taobao.kelude.common.search.dataloader.DataLoader;
import com.taobao.kelude.common.search.notify.DumpObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:com/taobao/kelude/common/search/dump/AsyncSearchDumper.class */
public class AsyncSearchDumper<T> implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(AsyncSearchDumper.class);
    private SearchClient<T> searchClient;
    private DumpObserver dumpObserver;
    private DataLoader<T> dataLoader;
    private final int maxDumpTask = 1024;
    private final int dumpWorkerNumber = 4;
    private LinkedBlockingQueue<T> dumpQueue = new LinkedBlockingQueue<>(1024);
    private LinkedBlockingQueue<Integer> dumpFailureQueue = new LinkedBlockingQueue<>();
    private final int NOTIFY_LATENCY = 60000;

    public SearchClient<T> getSearchClient() {
        return this.searchClient;
    }

    public void setSearchClient(SearchClient<T> searchClient) {
        this.searchClient = searchClient;
    }

    public DumpObserver getDumpObserver() {
        return this.dumpObserver;
    }

    public void setDumpObserver(DumpObserver dumpObserver) {
        this.dumpObserver = dumpObserver;
    }

    public DataLoader<T> getDataLoader() {
        return this.dataLoader;
    }

    public void setDataLoader(DataLoader<T> dataLoader) {
        this.dataLoader = dataLoader;
    }

    public void initialize() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
        Runnable runnable = new Runnable() { // from class: com.taobao.kelude.common.search.dump.AsyncSearchDumper.1
            /* JADX WARN: Multi-variable type inference failed */
            @Override // java.lang.Runnable
            public void run() {
                AsyncSearchDumper.logger.info(AsyncSearchDumper.this.dataLoader.getDataTypeName() + " dump worker started.");
                while (true) {
                    try {
                        Object take = AsyncSearchDumper.this.dumpQueue.take();
                        if (AsyncSearchDumper.this.searchClient.dump(take)) {
                            AsyncSearchDumper.logger.info("dumping " + AsyncSearchDumper.this.dataLoader.getDataTypeName() + " " + AsyncSearchDumper.this.dataLoader.fetchObjectId(take));
                        } else {
                            AsyncSearchDumper.logger.error("dump " + AsyncSearchDumper.this.dataLoader.getDataTypeName() + " " + AsyncSearchDumper.this.dataLoader.fetchObjectId(take) + " failed");
                            AsyncSearchDumper.this.dumpFailureQueue.add(AsyncSearchDumper.this.dataLoader.fetchObjectId(take));
                        }
                    } catch (InterruptedException e) {
                        AsyncSearchDumper.logger.error(AsyncSearchDumper.this.dataLoader.getDataTypeName() + " dump worker received interrupt exception, exiting worker thread.");
                        ExceptionLog.printStackTrace(ExceptionLog.BUS_SEARCH, e, "dump error");
                        if (AsyncSearchDumper.this.dumpObserver != null) {
                            AsyncSearchDumper.this.dumpObserver.sendNotify("Dumper.Died", AsyncSearchDumper.this.dataLoader.getDataTypeName() + " dump thread has been interrupted");
                        }
                    } catch (Exception e2) {
                        ExceptionLog.printStackTrace(ExceptionLog.BUS_SEARCH, e2, "dump error");
                    }
                }
            }
        };
        for (int i = 0; i < 4; i++) {
            newFixedThreadPool.execute(runnable);
        }
        newFixedThreadPool.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<Integer> getDumpFailureObjects() {
        ArrayList arrayList = new ArrayList();
        try {
            arrayList.addAll(this.dumpFailureQueue);
            this.dumpFailureQueue.clear();
        } catch (Exception e) {
        }
        return arrayList;
    }

    public void initializeNotifyWorker() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        Runnable runnable = new Runnable() { // from class: com.taobao.kelude.common.search.dump.AsyncSearchDumper.2
            @Override // java.lang.Runnable
            public void run() {
                AsyncSearchDumper.logger.info(AsyncSearchDumper.this.dataLoader.getDataTypeName() + " notify worker started");
                while (true) {
                    try {
                        Thread.sleep(60000L);
                        if (!AsyncSearchDumper.this.dumpFailureQueue.isEmpty()) {
                            AsyncSearchDumper.this.dumpObserver.sendNotify("Dump.Failure", AsyncSearchDumper.this.dataLoader.getFailureReport(AsyncSearchDumper.this.getDumpFailureObjects()));
                        }
                    } catch (InterruptedException e) {
                        AsyncSearchDumper.logger.error(AsyncSearchDumper.this.dataLoader.getDataTypeName() + " notify worker received interrupt exception, exiting notify worker.");
                        AsyncSearchDumper.this.dumpObserver.sendNotify("Notify.Died", AsyncSearchDumper.this.dataLoader.getDataTypeName() + " notify thread has been interrupted");
                    } catch (Exception e2) {
                        AsyncSearchDumper.this.dumpObserver.sendNotify("Notify.Died", AsyncSearchDumper.this.dataLoader.getDataTypeName() + " notify thread has encounterred an unknown exception" + e2.getMessage());
                    }
                }
            }
        };
        for (int i = 0; i < 1; i++) {
            newFixedThreadPool.execute(runnable);
        }
        newFixedThreadPool.shutdown();
    }

    public boolean enqueue(T t) {
        if (t == null) {
            return false;
        }
        try {
            return this.dumpQueue.add(t);
        } catch (Exception e) {
            return false;
        }
    }

    public void afterPropertiesSet() throws Exception {
        initialize();
        initializeNotifyWorker();
    }
}
