package com.aliyun.odps.graph.local.worker;

import com.aliyun.odps.counter.Counters;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.Aggregator;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.JobConf;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.VertexResolver;
import com.aliyun.odps.graph.WorkerComputer;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.graph.local.BaseRecordReader;
import com.aliyun.odps.graph.local.COUNTER;
import com.aliyun.odps.graph.local.EmptyRecordReader;
import com.aliyun.odps.graph.local.GraphTaskAttemptID;
import com.aliyun.odps.graph.local.InputSplit;
import com.aliyun.odps.graph.local.LocalRecordReader;
import com.aliyun.odps.graph.local.LocalRecordWriter;
import com.aliyun.odps.graph.local.LocalVertexMutations;
import com.aliyun.odps.graph.local.RuntimeContext;
import com.aliyun.odps.graph.local.SQLRecord;
import com.aliyun.odps.graph.local.TaskContextImpl;
import com.aliyun.odps.graph.local.master.Master;
import com.aliyun.odps.graph.local.message.MsgManager;
import com.aliyun.odps.graph.local.utils.LocalGraphRunUtils;
import com.aliyun.odps.graph.utils.VerifyUtils;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableComparable;
import com.aliyun.odps.io.WritableUtils;
import com.aliyun.odps.utils.ReflectionUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/aliyun/odps/graph/local/worker/Worker.class */
public class Worker<VERTEX_ID extends WritableComparable<?>, VERTEX_VALUE extends Writable, EDGE_VALUE extends Writable, MESSAGE extends Writable, VALUE extends Writable> {
    /** Logger shared by every worker instance; never reassigned. */
    private static final Log LOG = LogFactory.getLog(Worker.class);
    /** Aggregators resolved from the job configuration in the constructor. */
    private List<Aggregator> mAggregators;
    /** Per-superstep aggregator values, rebuilt by {@link #processNextStep()}. */
    private List<Writable> mAggregatorValues;
    /** Master handed to the constructor; queried for the current superstep. */
    private Master master;
    /** Runtime context handed to the constructor (output/counter directories, message routing). */
    private RuntimeContext mCtx;
    /** Input split assigned to this worker; {@link #loadGraph()} tolerates a split without a table. */
    private InputSplit mInput;
    /** Job configuration handed to the constructor. */
    private JobConf mJob;
    /** Aggregator values of the previous superstep, see {@link #setLastAggregatedValue(List)}. */
    private List<Writable> mLastAggregatorValues;
    /** Output tables keyed by label; one writer per key is opened in {@link #loadGraph()}. */
    private Map<String, TableInfo> mOutputs;
    /** Attempt id; used to name the output file and the counter file. */
    private GraphTaskAttemptID mTaskAttemptID;
    /** Context object passed to user code (loader, vertices, aggregators, worker computer). */
    private TaskContextImpl mTaskContext;
    /** User-supplied worker computer, instantiated during {@link #init()}. */
    private WorkerComputer mWorkerComputer;
    /** Index of this worker as passed to the constructor. */
    private int mWorkerID;
    /** Worker count as passed to the constructor. */
    private int mWorkerNum;
    /** Open record writers keyed like {@link #mOutputs}; set to {@code null} by {@link #close()}. */
    private Map<String, LocalRecordWriter> mWriters;
    /** Value object handed to the worker computer's {@code setup}. */
    private Writable mWorkerValue;
    /** Optional message combiner; {@code null} when the job configures none. */
    private Combiner mCombiner;
    /** Messages delivered to each vertex for the superstep being computed. */
    private Map<Vertex, Iterable<Writable>> mLastStepMessage = new HashMap<Vertex, Iterable<Writable>>();
    /** Vertices owned by this worker, keyed by vertex id. */
    private Map<VERTEX_ID, Vertex> vertices = new HashMap<VERTEX_ID, Vertex>();
    /** Counters of this worker; written to the counter directory on {@link #close()}. */
    private Counters mCounters = new Counters();
    /** Message store used to push, pop and query messages. */
    private MsgManager mMsgManager = new MsgManager();
    /** Pending mutations per vertex id; replaced with a fresh map by {@code processWorkerMutations}. */
    private Map<VERTEX_ID, LocalVertexMutations> mVertexMutations = new HashMap<VERTEX_ID, LocalVertexMutations>();

    /**
     * Creates a worker and wires up its task context.
     *
     * @param jobConf job configuration
     * @param runtimeContext local runtime context
     * @param master master coordinating the workers
     * @param graphTaskAttemptID attempt id of this worker task
     * @param i id of this worker
     * @param i2 number of workers
     * @param inputSplit input split assigned to this worker
     * @param map output tables keyed by label
     */
    public Worker(JobConf jobConf, RuntimeContext runtimeContext, Master master, GraphTaskAttemptID graphTaskAttemptID, int i, int i2, InputSplit inputSplit, Map<String, TableInfo> map) throws InstantiationException, IllegalAccessException, IOException, ClassNotFoundException {
        // Plain state first: everything below depends only on the arguments.
        this.mJob = jobConf;
        this.mCtx = runtimeContext;
        this.master = master;
        this.mTaskAttemptID = graphTaskAttemptID;
        this.mWorkerID = i;
        this.mWorkerNum = i2;
        this.mInput = inputSplit;
        this.mOutputs = map;

        // Derived state: aggregators come from the job, the context wraps this worker.
        this.mAggregators = LocalGraphRunUtils.getAggregator(jobConf);
        this.mTaskContext = new TaskContextImpl(runtimeContext, jobConf, this, i, i2, map, this.mCounters);
    }

    /**
     * Tells whether this worker has nothing left to do.
     *
     * @return {@code true} when every vertex is halted and the message manager
     *         reports no messages for the next superstep
     */
    public boolean allVertexVoltHalt() {
        // Return on the first active vertex. The previous form
        // "z = z && it.next().isHalted()" stopped advancing the iterator once z
        // became false and therefore never left the loop.
        for (Vertex vertex : this.vertices.values()) {
            if (!vertex.isHalted()) {
                return false;
            }
        }
        return !this.mMsgManager.hasNextStepMessages();
    }

    /**
     * Instantiates and configures the combiner class named by the job, or
     * leaves {@code mCombiner} as {@code null} when the job names none.
     *
     * @throws RuntimeException wrapping any failure to instantiate or configure
     */
    private void initCombiner() {
        Class combinerType = this.mJob.getCombinerClass();
        if (combinerType != null) {
            try {
                this.mCombiner = (Combiner) combinerType.newInstance();
                this.mCombiner.configure(this.mJob);
            } catch (Exception failure) {
                throw new RuntimeException("exception occored when Instantiate combiner ", failure);
            }
        } else {
            this.mCombiner = null;
        }
    }

    /**
     * Instantiates the job's worker computer, creates its worker value and
     * calls {@code setup} on it.
     *
     * <p>The worker value is {@link NullWritable} when the computer declares no
     * type argument (or the argument resolves to {@code null}); otherwise an
     * instance of the declared type is created reflectively.
     *
     * @throws IOException wrapping any failure, including the
     *         "more than one type argument" check raised inside
     */
    private void initWorkerComputer() throws IOException {
        try {
            Class computerType = this.mJob.getWorkerComputerClass();
            this.mWorkerComputer = (WorkerComputer) computerType.newInstance();

            List declaredTypes = ReflectionUtils.getTypeArguments(WorkerComputer.class, computerType);
            int declaredCount = declaredTypes.size();
            if (declaredCount > 1) {
                throw new IOException("more than ONE workerValue Type Declared");
            }

            // No declared type and a null type argument are handled the same way.
            Object valueType = declaredCount == 0 ? null : declaredTypes.get(0);
            if (valueType == null) {
                this.mWorkerValue = NullWritable.get();
            } else {
                this.mWorkerValue = (Writable) ReflectionUtils.newInstance((Class) valueType, this.mJob);
            }

            this.mWorkerComputer.setup(this.mTaskContext, this.mWorkerValue);
        } catch (Exception failure) {
            throw new IOException(failure);
        }
    }

    /**
     * Seeds the "last aggregated" list with each aggregator's startup value,
     * in aggregator order.
     */
    private void initAggregatorValues() throws IOException {
        List<Writable> startupValues = new ArrayList<Writable>();
        this.mLastAggregatorValues = startupValues;
        for (Aggregator aggregator : this.mAggregators) {
            startupValues.add(aggregator.createStartupValue(this.mTaskContext));
        }
    }

    /**
     * Runs the job's {@link GraphLoader} over this worker's input split and
     * opens one record writer per configured output.
     *
     * <p>A worker without an input split, or whose split has no table, loads
     * from an empty reader. Otherwise the table directory is located by walking
     * up from the split file until a directory named like the table is found.
     *
     * @throws IOException if the table directory cannot be located above the
     *         split file, or if reading or loading fails
     */
    public void loadGraph() throws IOException {
        GraphLoader graphLoader = (GraphLoader) ReflectionUtils.newInstance(this.mJob.getGraphLoaderClass(), this.mJob);
        // Resolve the table null-safely: the previous code dereferenced mInput
        // here before the null check further down could run.
        TableInfo inputTable = this.mInput == null ? null : this.mInput.getTable();
        graphLoader.setup(this.mTaskContext.getConfiguration(), this.mTaskContext.getWorkerId(), inputTable, this.mTaskContext);

        BaseRecordReader reader;
        if (inputTable == null) {
            reader = new EmptyRecordReader();
        } else {
            File splitDir = this.mInput.getFile().getParentFile();
            // Walk up until the directory carrying the table name is reached.
            File tableDir = splitDir;
            while (tableDir != null && !tableDir.getName().equals(inputTable.getTableName())) {
                tableDir = tableDir.getParentFile();
            }
            if (tableDir == null) {
                throw new IOException("Cannot locate directory of table " + inputTable.getTableName() + " above " + this.mInput.getFile());
            }
            reader = new LocalRecordReader(splitDir, tableDir, this.mCounters.findCounter(COUNTER.TASK_INPUT_RECORD), this.mCounters.findCounter(COUNTER.TASK_INPUT_BYTE));
        }

        this.mWriters = new HashMap<String, LocalRecordWriter>();
        for (String label : this.mOutputs.keySet()) {
            this.mWriters.put(label, new LocalRecordWriter(new File(this.mCtx.getOutputDir(label), this.mTaskAttemptID.toString()), this.mCounters.findCounter(COUNTER.TASK_OUTPUT_RECORD), this.mCounters.findCounter(COUNTER.TASK_OUTPUT_BYTE)));
        }

        try {
            while (reader.nextKeyValue()) {
                // Hand the loader its own key and record copies for every row.
                LongWritable recordKey = new LongWritable();
                recordKey.set(reader.getCurrentKey().get());
                graphLoader.load(recordKey, ((SQLRecord) reader.getCurrentValue()).m7clone(), this.mTaskContext);
            }
        } finally {
            // Release the reader even when the loader throws.
            reader.close();
        }
        this.mTaskContext.setOutputWriters(this.mWriters);
    }

    /**
     * Prepares the worker for computation: combiner, worker computer and
     * aggregator startup values first, then {@code setup} on every vertex.
     */
    public void init() throws IOException {
        initCombiner();
        initWorkerComputer();
        initAggregatorValues();
        for (Vertex vertex : this.vertices.values()) {
            vertex.setup(this.mTaskContext);
        }
    }

    /**
     * Starts a new superstep: resets the aggregator values to each
     * aggregator's initial value and advances the message manager.
     *
     * @throws RuntimeException if an aggregator returns {@code null} as its
     *         initial value
     */
    public void processNextStep() throws IOException {
        List<Writable> initialValues = new ArrayList<Writable>();
        this.mAggregatorValues = initialValues;
        for (Aggregator aggregator : this.mAggregators) {
            Writable initial = aggregator.createInitialValue(this.mTaskContext);
            if (initial == null) {
                throw new RuntimeException("ODPS-0730001: " + aggregator.getClass().getName() + " createInitialValue return null");
            }
            initialValues.add(initial);
        }
        this.mMsgManager.nextSuperStep(this.mCtx);
    }

    /**
     * Runs one superstep of vertex computation.
     *
     * <p>Messages are gathered first via {@code prepareMsg()}. A halted vertex
     * with at least one message is woken up; every vertex that is not halted
     * afterwards has {@code compute} called with its messages.
     */
    public void Compute() throws IOException {
        prepareMsg();
        for (Vertex current : this.vertices.values()) {
            Iterable<Writable> inbox = this.mLastStepMessage.get(current);
            // The inbox is only inspected for halted vertices.
            if (current.isHalted() && inbox.iterator().hasNext()) {
                current.wakeUp();
            }
            if (current.isHalted()) {
                continue;
            }
            current.compute(this.mTaskContext, inbox);
        }
    }

    /**
     * Resolves the pending mutations of one vertex and applies the outcome to
     * the vertex map: a {@code null} result removes the vertex, any other
     * result is verified and stored under the id.
     *
     * @param vertex_id id of the vertex being resolved
     * @param localVertexMutations mutations recorded for the id, may be {@code null}
     * @param vertexResolver resolver deciding the resulting vertex
     * @throws IOException if no resolver is supplied
     */
    public void processMutation(VERTEX_ID vertex_id, LocalVertexMutations localVertexMutations, VertexResolver vertexResolver) throws IOException {
        Vertex existing = this.vertices.get(vertex_id);
        boolean hasMessages = this.mMsgManager.hasMessageForVertex(this.mCtx, this.master.getSuperStep(), vertex_id);
        if (vertexResolver == null) {
            throw new IOException("ODPS-0730001: encounter mutations in compute but not set the mutation resolver.");
        }
        Vertex resolved = vertexResolver.resolve(vertex_id, existing, localVertexMutations, hasMessages);
        if (resolved != null) {
            VerifyUtils.verifyVertex(resolved);
            this.vertices.put(vertex_id, resolved);
            return;
        }
        this.vertices.remove(vertex_id);
    }

    /**
     * Resolves every vertex id that needs attention in this step, then starts
     * a fresh mutation map.
     *
     * <p>The ids handled are those with recorded mutations, plus ids the
     * message manager lists for which no vertex exists in this worker.
     *
     * @param vertexResolver resolver passed on to
     *        {@link #processMutation(WritableComparable, LocalVertexMutations, VertexResolver)}
     * @throws IOException propagated from resolving a single vertex
     */
    @SuppressWarnings("unchecked")
    public void processWorkerMutations(VertexResolver vertexResolver) throws IOException {
        // Copy the keys: the mutation map must not be iterated while resolving.
        HashSet<VERTEX_ID> pendingIds = new HashSet<VERTEX_ID>(this.mVertexMutations.keySet());
        for (WritableComparable<?> messageTarget : this.mMsgManager.getVertexIDList()) {
            if (this.vertices.get(messageTarget) == null) {
                // Unchecked: the cast assumes ids held by the message manager are
                // this worker's VERTEX_ID type - TODO confirm against MsgManager.
                pendingIds.add((VERTEX_ID) messageTarget);
            }
        }
        for (VERTEX_ID vertexId : pendingIds) {
            processMutation(vertexId, this.mVertexMutations.get(vertexId), vertexResolver);
        }
        this.mVertexMutations = new HashMap<VERTEX_ID, LocalVertexMutations>();
    }

    /** @return the aggregator value list of the current superstep (the live list, not a copy) */
    public List<Writable> getAggregatorValues() {
        return mAggregatorValues;
    }

    /** @return the counters object owned by this worker */
    public Counters getCounters() {
        return mCounters;
    }

    /**
     * Sums the edge counts of all vertices held by this worker.
     *
     * @return total number of edges, {@code 0} when the worker has no vertices
     */
    public long getEgeNumber() {
        long edgeTotal = 0;
        // Single pass over the vertices. The previous loop created a new
        // iterator on every test and read from an undefined variable.
        for (Vertex vertex : this.vertices.values()) {
            edgeTotal += vertex.getNumEdges();
        }
        return edgeTotal;
    }

    /** @return the aggregator values recorded for the previous superstep (the live list, not a copy) */
    public List<Writable> getLastAggregatedValue() {
        return mLastAggregatorValues;
    }

    /** @return the master this worker was constructed with */
    public Master getMaster() {
        return master;
    }

    /** @return the task context created in the constructor, exposed as a {@link WorkerContext} */
    public WorkerContext getTaskContext() {
        return mTaskContext;
    }

    /** @return the number of vertices currently held by this worker */
    public long getVertexNumber() {
        int vertexCount = vertices.size();
        return vertexCount;
    }

    /**
     * Returns the mutation record of a vertex id, creating and registering an
     * empty one when none is stored yet.
     *
     * @param vertex_id id whose mutations are requested
     * @return the stored (possibly newly created) mutation record, never {@code null}
     */
    public LocalVertexMutations getVertexMutations(VERTEX_ID vertex_id) {
        LocalVertexMutations known = this.mVertexMutations.get(vertex_id);
        if (known != null) {
            return known;
        }
        LocalVertexMutations created = new LocalVertexMutations();
        this.mVertexMutations.put(vertex_id, created);
        return created;
    }

    /** @return the same list as {@link #getAggregatorValues()}: this worker's current aggregator values */
    public List<Writable> partialAggregate() {
        return mAggregatorValues;
    }

    /**
     * Forwards a message to this worker's message manager unchanged.
     *
     * @param runtimeContext runtime context passed through to the manager
     * @param j passed through as the manager's second argument (presumably the superstep - confirm against MsgManager)
     * @param writableComparable id of the destination vertex
     * @param writable message payload
     */
    public void pushMsg(RuntimeContext runtimeContext, long j, WritableComparable<?> writableComparable, Writable writable) {
        MsgManager manager = this.mMsgManager;
        manager.pushMsg(runtimeContext, j, writableComparable, writable);
    }

    /**
     * Closes all output writers and persists the counters to a file named
     * after the task id inside the runtime's counter directory.
     *
     * <p>Safe to call when no writers are open, i.e. before
     * {@link #loadGraph()} or after an earlier {@code close()}.
     *
     * @throws IOException if a writer fails to close or the counter file
     *         cannot be written
     */
    public void close() throws IOException {
        // mWriters is null until loadGraph() runs and again after the first
        // close(); guard so a repeated call does not throw NullPointerException.
        if (this.mWriters != null) {
            for (LocalRecordWriter writer : this.mWriters.values()) {
                writer.close();
            }
            this.mWriters = null;
        }
        FileUtils.writeStringToFile(new File(this.mCtx.getCounterDir(), String.valueOf(this.mTaskAttemptID.getTaskId())), this.mCounters.toString());
        LOG.debug(this.mCounters);
    }

    /**
     * Calls {@code cleanup} on every vertex and then on the worker computer,
     * each with this worker's task context.
     */
    public void cleanup() throws IOException {
        for (Vertex vertex : this.vertices.values()) {
            vertex.cleanup(this.mTaskContext);
        }
        this.mWorkerComputer.cleanup(this.mTaskContext);
    }

    /**
     * Folds the messages of one vertex into a single message using the
     * configured combiner.
     *
     * <p>Without a combiner the input is returned as is. Otherwise the first
     * non-null message becomes the accumulator and every later message is
     * combined into it; the result holds that accumulator, or is empty when no
     * accumulator was found.
     *
     * @param writableComparable id of the vertex the messages are addressed to
     * @param iterable messages to combine
     * @return the input itself, or a list with at most one element
     */
    private Iterable<Writable> combineMsg(WritableComparable writableComparable, Iterable<Writable> iterable) throws IOException {
        if (this.mCombiner == null) {
            return iterable;
        }
        Writable accumulator = null;
        Iterator<Writable> pending = iterable.iterator();
        while (pending.hasNext()) {
            Writable message = pending.next();
            if (accumulator != null) {
                this.mCombiner.combine(writableComparable, accumulator, message);
                continue;
            }
            accumulator = message;
        }
        List<Writable> combined = new ArrayList<Writable>();
        if (accumulator != null) {
            combined.add(accumulator);
        }
        return combined;
    }

    /**
     * Rebuilds the per-vertex inbox for the current superstep: pops each
     * vertex's messages from the message manager and, when a combiner is
     * configured, combines them before storing.
     */
    private void prepareMsg() throws IOException {
        long currentStep = this.master.getSuperStep();
        LOG.debug("worker super step " + currentStep + ", vertices count " + this.vertices.size());
        this.mLastStepMessage.clear();
        for (Vertex target : this.vertices.values()) {
            Iterable<Writable> inbox = this.mMsgManager.popMsges(this.mCtx, currentStep, target.getId());
            Iterable<Writable> delivered = this.mCombiner == null ? inbox : combineMsg(target.getId(), inbox);
            this.mLastStepMessage.put(target, delivered);
        }
    }

    /**
     * Replaces the "last aggregated" values with clones of the given values;
     * {@code null} entries are kept as {@code null}, order is preserved.
     *
     * @param list aggregated values of the finished superstep
     */
    public void setLastAggregatedValue(List<Writable> list) {
        List<Writable> copies = new ArrayList<Writable>(list.size());
        this.mLastAggregatorValues = copies;
        for (Writable value : list) {
            if (value == null) {
                copies.add(null);
            } else {
                Writable copy = WritableUtils.clone(value, this.mJob);
                copies.add(copy);
            }
        }
    }

    /**
     * Publishes the job-wide totals to the task context.
     *
     * @param i total number of vertices
     * @param i2 total number of edges
     */
    public void setTotalNumVerticesAndEdges(int i, int i2) {
        TaskContextImpl context = this.mTaskContext;
        context.setTotalNumVertices(i);
        context.setTotalNumEdges(i2);
    }

    /** @return the worker value created while initializing the worker computer; {@code null} before {@link #init()} */
    public Writable getWorkerValue() {
        return mWorkerValue;
    }
}
