package com.aliyun.odps.mapred.bridge.streaming;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.Reducer;
import com.aliyun.odps.mapred.bridge.streaming.io.InputWriter;
import com.aliyun.odps.mapred.bridge.streaming.io.OutputReader;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.UTF8ByteArrayUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.Iterator;

/* loaded from: input_file:com/aliyun/odps/mapred/bridge/streaming/PipeReducer.class */
/**
 * Reducer side of the streaming bridge.
 *
 * <p>When a reduce stream processor command is configured, every (key, value) pair handed to
 * {@link #reduce} is written to the subprocess through {@code inWriter_}. When no command is
 * configured (or it is the literal {@code "NONE"}), pairs are copied straight to the task output.
 */
public class PipeReducer extends PipeMapRed implements Reducer {
    /** UTF-8 bytes of the separator configured by {@code stream.reduce.output.field.separator}. */
    private byte[] reduceOutFieldSeparator;
    /** UTF-8 bytes of the separator configured by {@code stream.reduce.input.field.separator}. */
    private byte[] reduceInputFieldSeparator;
    /** Value of {@code stream.num.reduce.output.key.fields}; defaults to 1. */
    private int numOfReduceOutputKeyFields = 1;
    /** Never assigned inside this class, so the flush at the end of {@link #reduce} stays inactive. */
    private boolean skipping = false;

    /**
     * Returns the URL-decoded value of {@code stream.reduce.streamprocessor}.
     *
     * @param jobConf job configuration to read the command from
     * @return the decoded command, or {@code null} when the key is absent or UTF-8 decoding is
     *     unsupported
     */
    @Override
    String getPipeCommand(JobConf jobConf) {
        String encodedCommand = jobConf.get("stream.reduce.streamprocessor");
        if (encodedCommand == null) {
            return null;
        }
        try {
            return URLDecoder.decode(encodedCommand, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // NOTE(review): this branch means the UTF-8 charset is unavailable, not that the key is
            // missing; the message text is kept unchanged for compatibility with existing logs.
            System.err.println("stream.reduce.streamprocessor in jobconf not found");
            return null;
        }
    }

    /**
     * Tells whether records should be piped to a subprocess.
     *
     * @return {@code true} when a pipe command is configured and it is not the literal
     *     {@code "NONE"}
     */
    @Override
    boolean getDoPipe() {
        String pipeCommand = getPipeCommand(this.job_);
        return pipeCommand != null && !"NONE".equals(pipeCommand);
    }

    /**
     * Runs the superclass configuration, then loads the reduce-side separators and the number of
     * output key fields from the job configuration. Both separators default to a tab and are
     * passed through {@code UTF8ByteArrayUtils.unescapeSeparator} before being encoded as UTF-8.
     *
     * @param jobConf job configuration handed to the superclass
     * @throws RuntimeException if the platform does not support UTF-8
     */
    @Override
    public void configure(JobConf jobConf) {
        super.configure(jobConf);
        try {
            this.reduceOutFieldSeparator = UTF8ByteArrayUtils.unescapeSeparator(this.job_.get("stream.reduce.output.field.separator", "\t")).getBytes("UTF-8");
            this.reduceInputFieldSeparator = UTF8ByteArrayUtils.unescapeSeparator(this.job_.get("stream.reduce.input.field.separator", "\t")).getBytes("UTF-8");
            this.numOfReduceOutputKeyFields = this.job_.getInt("stream.num.reduce.output.key.fields", 1);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
        }
    }

    /**
     * Configures this reducer from the task's job configuration.
     *
     * @param taskContext context supplying the {@link JobConf}
     * @throws IOException declared by the reducer contract
     */
    public void setup(Reducer.TaskContext taskContext) throws IOException {
        configure(taskContext.getJobConf());
    }

    /**
     * Finishes the reducer by delegating to {@link #close()}.
     *
     * @param taskContext unused
     * @throws IOException declared by the reducer contract
     */
    public void cleanup(Reducer.TaskContext taskContext) throws IOException {
        close();
    }

    /**
     * Processes all values of one key.
     *
     * <p>In pipe mode each pair is written to the subprocess via {@code inWriter_}; otherwise
     * column 0 of the key and column 0 of the value are copied into columns 0 and 1 of a new
     * output record.
     *
     * @param record key record; only column 0 is read
     * @param it values grouped under the key; only column 0 of each is read
     * @param taskContext context used to start the output threads and to emit records
     * @throws IOException if writing fails or an output/error thread has reported a failure; the
     *     message is prefixed with the subprocess status and the original exception is attached
     *     as the cause
     */
    public void reduce(Record record, Iterator<Record> it, Reducer.TaskContext taskContext) throws IOException {
        // Start the output threads lazily, on the first reduce call in pipe mode.
        if (this.doPipe_ && this.outThread_ == null) {
            startOutputThreads(taskContext);
        }
        while (it.hasNext()) {
            try {
                Record value = it.next();
                this.numRecRead_++;
                maybeLogRecord();
                if (!this.doPipe_) {
                    Record output = taskContext.createOutputRecord();
                    output.setString(0, record.getString(0));
                    output.setString(1, value.getString(0));
                    taskContext.write(output);
                } else {
                    if (this.outerrThreadsThrowable != null) {
                        mapRedFinished();
                        throw new IOException("MROutput/MRErrThread failed:", this.outerrThreadsThrowable);
                    }
                    this.inWriter_.writeKey(record.get(0));
                    this.inWriter_.writeValue(value.get(0));
                }
            } catch (IOException e) {
                String subprocessStatus = describeSubprocessStatus();
                mapRedFinished();
                // Keep the original exception as the cause so its stack trace is not lost.
                throw new IOException(subprocessStatus + getContext() + e.getMessage(), e);
            }
        }
        if (this.doPipe_ && this.skipping) {
            this.clientOut_.flush();
        }
    }

    /**
     * Builds the status line that prefixes a failure message raised from {@link #reduce}.
     * Guards against a missing subprocess handle so that the original I/O failure is not masked
     * by a {@link NullPointerException}.
     */
    private String describeSubprocessStatus() {
        if (this.sim == null) {
            return "subprocess not started\n";
        }
        try {
            int exitValue = this.sim.exitValue();
            if (exitValue == 0) {
                return "subprocess exited successfully\n";
            }
            return "subprocess exited with error code " + exitValue + "\n";
        } catch (IllegalThreadStateException stillRunning) {
            // Thrown by exitValue() here; treated as "the subprocess has not terminated yet".
            return "subprocess still running\n";
        }
    }

    /** Finishes the reducer by calling {@code mapRedFinished()}. */
    public void close() {
        mapRedFinished();
    }

    /** @return the configured reduce input field separator bytes */
    @Override
    public byte[] getInputSeparator() {
        return this.reduceInputFieldSeparator;
    }

    /** @return the configured reduce output field separator bytes */
    @Override
    public byte[] getFieldSeparator() {
        return this.reduceOutFieldSeparator;
    }

    /** @return the configured number of key fields in the reduce output */
    @Override
    public int getNumOfKeyFields() {
        return this.numOfReduceOutputKeyFields;
    }

    /**
     * Creates the input writer from {@code reduceInputWriterClass_}.
     *
     * @throws IOException if the superclass factory fails
     */
    @Override
    InputWriter createInputWriter() throws IOException {
        return super.createInputWriter(this.reduceInputWriterClass_);
    }

    /**
     * Creates the output reader from {@code reduceOutputReaderClass_}.
     *
     * @throws IOException if the superclass factory fails
     */
    @Override
    OutputReader createOutputReader() throws IOException {
        return super.createOutputReader(this.reduceOutputReaderClass_);
    }
}
