package com.allstontrading.disco.worker;

import com.allstontrading.disco.DiscoUtils;
import com.allstontrading.disco.worker.protocol.DiscoIOChannel;
import com.allstontrading.disco.worker.protocol.decode.DiscoWorkerDecoder;
import com.allstontrading.disco.worker.protocol.decode.DiscoWorkerListener;
import com.allstontrading.disco.worker.protocol.decode.types.DiscoInput;
import com.allstontrading.disco.worker.protocol.decode.types.DiscoInputReplica;
import com.allstontrading.disco.worker.protocol.decode.types.DiscoTaskMode;
import com.allstontrading.disco.worker.protocol.encode.DoneEncoder;
import com.allstontrading.disco.worker.protocol.encode.ErrorEncoder;
import com.allstontrading.disco.worker.protocol.encode.FatalEncoder;
import com.allstontrading.disco.worker.protocol.encode.OutputEncoder;
import com.allstontrading.disco.worker.protocol.encode.RequestTaskEncoder;
import com.allstontrading.disco.worker.protocol.encode.WorkerAnnounceEncoder;
import com.allstontrading.disco.worker.protocol.encode.types.OutputType;
import com.allstontrading.disco.worker.task.DiscoMapTask;
import com.allstontrading.disco.worker.task.DiscoReduceTask;
import com.allstontrading.disco.worker.task.DiscoTask;
import java.io.File;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:com/allstontrading/disco/worker/DiscoWorker.class */
public class DiscoWorker implements DiscoWorkerListener {
    private static final String WORKER_PROTOCOL_VERSION = "1.0";
    private final DiscoIOChannel discoIOChannel;
    private DiscoMapTask map = null;
    private DiscoReduceTask reduce = null;
    private final WorkerAnnounceEncoder workerAnnounceEncoder = new WorkerAnnounceEncoder();
    private final RequestTaskEncoder requestTaskEncoder = new RequestTaskEncoder();
    private final OutputEncoder outputEncoder = new OutputEncoder();
    private final DoneEncoder doneEncoder = new DoneEncoder();
    private final ErrorEncoder errorEncoder = new ErrorEncoder();
    private final FatalEncoder fatalEncoder = new FatalEncoder();

    public DiscoWorker(ReadableByteChannel readableByteChannel, WritableByteChannel writableByteChannel) {
        this.discoIOChannel = new DiscoIOChannel(readableByteChannel, writableByteChannel, new DiscoWorkerDecoder().setListener(this));
    }

    public void requestTask() throws IOException {
        if (hasTask()) {
            return;
        }
        this.discoIOChannel.write(this.workerAnnounceEncoder.set(WORKER_PROTOCOL_VERSION, DiscoUtils.getPid()));
        this.discoIOChannel.write(this.requestTaskEncoder);
    }

    public ReadableByteChannel getMapInput() {
        return this.map.getMapInput();
    }

    public List<ReadableByteChannel> getReduceInputs() {
        return this.reduce.getReduceInputs();
    }

    public void reportOutputs(List<File> list) throws IOException {
        Iterator<File> it = list.iterator();
        while (it.hasNext()) {
            reportOutput(it.next(), OutputType.disco);
        }
    }

    public void reportOutput(File file, OutputType outputType) throws IOException {
        this.discoIOChannel.write(this.outputEncoder.set(getWorkingDir(), file, outputType, ""));
    }

    public void doneReportingOutput() throws IOException {
        this.discoIOChannel.write(this.doneEncoder);
    }

    public void reportError(String str) throws IOException {
        this.discoIOChannel.write(this.errorEncoder.set(str));
    }

    public void reportFatalError(String str) throws IOException {
        this.discoIOChannel.write(this.fatalEncoder.set(str));
    }

    public File getWorkingDir() {
        return getTask().getWorkingDir();
    }

    public boolean hasMapTask() {
        return this.map != null;
    }

    public boolean hasReduceTask() {
        return this.reduce != null;
    }

    @Override // com.allstontrading.disco.worker.protocol.decode.DiscoWorkerListener
    public void task(String str, String str2, String str3, int i, DiscoTaskMode discoTaskMode, int i2, int i3, File file, File file2, File file3) {
        switch (discoTaskMode) {
            case map:
                this.map = new DiscoMapTask(this.discoIOChannel, i, i2);
                return;
            case reduce:
                this.reduce = new DiscoReduceTask(this.discoIOChannel, i, i2);
                return;
            default:
                return;
        }
    }

    @Override // com.allstontrading.disco.worker.protocol.decode.DiscoWorkerListener
    public void ok() {
        if (hasTask()) {
            getTask().ok();
        }
    }

    @Override // com.allstontrading.disco.worker.protocol.decode.DiscoWorkerListener
    public void input(boolean z, List<DiscoInput> list) {
        getTask().input(z, list);
    }

    @Override // com.allstontrading.disco.worker.protocol.decode.DiscoWorkerListener
    public void fail(int i, List<Integer> list) {
        getTask().fail(i, list);
    }

    @Override // com.allstontrading.disco.worker.protocol.decode.DiscoWorkerListener
    public void retry(List<DiscoInputReplica> list) {
        getTask().retry(list);
    }

    @Override // com.allstontrading.disco.worker.protocol.decode.DiscoWorkerListener
    public void pause(int i) {
        getTask().pause(i);
    }

    private DiscoTask getTask() {
        return hasMapTask() ? this.map : this.reduce;
    }

    private boolean hasTask() {
        return hasMapTask() || hasReduceTask();
    }
}
