package fr.ina.dlweb.lap.writer;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.sleepycat.je.rep.impl.node.FeederManager;
import com.sleepycat.je.rep.utilint.HostPortPair;
import fr.ina.dlweb.lap.writer.SafeInputStream;
import fr.ina.dlweb.lap.writer.heartbeat.HeartbeatHelper;
import fr.ina.dlweb.lap.writer.metadata.Metadata;
import fr.ina.dlweb.lap.writer.metadata.MetadataParser;
import fr.ina.dlweb.lap.writer.writerInfo.WriterInfo;
import fr.ina.dlweb.lap.writer.writerInfo.WriterInfoHelper;
import fr.ina.dlweb.lap.writer.writerInfo.WriterInfoResponse;
import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.Charset;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/ina/dlweb/lap/writer/AbstractLapWriter.class */
public abstract class AbstractLapWriter<M extends Metadata> implements LapWriter<M>, PersistenceListener {
    private static final Logger log = LoggerFactory.getLogger(AbstractLapWriter.class);
    private static final String HANDSHAKE_SEND = "LAP-WRITER";
    private static final String HANDSHAKE_EXPECT = "LAP-MASTER";
    private static final String CONNECTION_CLOSED_MESSAGE = "the Live Archiving Proxy closed the connection";
    private final int lapPort;
    private final String lapHost;
    private MetadataParser<M> metadataParser;
    private Socket lapChannel;
    private boolean started = false;
    private SafeInputStream.OnSafetyTrigger streamSafety = SafeInputStream.OnSafetyTrigger.QUIET;
    private Long startTime = null;
    private long receivedCount = 0;
    private long receivedSize = 0;
    private final JsonFactory json = new JsonFactory();

    public AbstractLapWriter(String str, int i) {
        this.lapHost = str;
        this.lapPort = i;
    }

    @Override // fr.ina.dlweb.lap.writer.LapWriter
    public final synchronized void start(Integer num) throws Exception {
        if (this.started) {
            return;
        }
        this.started = true;
        this.startTime = Long.valueOf(System.currentTimeMillis());
        boolean z = false;
        WriterInfo info = getInfo();
        SafeInputStream.OnSafetyTrigger streamSafety = getStreamSafety();
        if (this.metadataParser == null) {
            this.metadataParser = createMetadataParser();
        }
        try {
            try {
                connectSocket(num);
                onStarted(doHandshake(num, info));
                mainLoop(streamSafety, createHeartbeatHelper(), new BufferedInputStream(this.lapChannel.getInputStream()));
                this.startTime = null;
                onStopped(false);
            } catch (Exception e) {
                z = true;
                forceStop();
                throw e;
            }
        } catch (Throwable th) {
            this.startTime = null;
            onStopped(z);
            throw th;
        }
    }

    private void mainLoop(SafeInputStream.OnSafetyTrigger onSafetyTrigger, HeartbeatHelper heartbeatHelper, BufferedInputStream bufferedInputStream) throws Exception {
        while (this.started) {
            checkOEF(bufferedInputStream);
            byte[] readLine = readLine(bufferedInputStream);
            JsonParser createJsonParser = this.json.createJsonParser(readLine);
            if (createJsonParser.nextToken() != JsonToken.START_ARRAY) {
                throw new Exception("unexpected message '" + new String(readLine, "UTF-8"));
            }
            createJsonParser.nextToken();
            String text = createJsonParser.getText();
            if ("report".equals(text)) {
                heartbeatHelper.emit();
            } else if ("meta".equals(text)) {
                M readMetadata = this.metadataParser.readMetadata(createJsonParser);
                SafeInputStream safeInputStream = null;
                Long toRead = readMetadata.getToRead();
                if (toRead != null) {
                    safeInputStream = new SafeInputStream(bufferedInputStream, toRead.longValue(), onSafetyTrigger);
                }
                this.receivedCount++;
                if (toRead != null) {
                    this.receivedSize += toRead.longValue();
                }
                try {
                    onContent(readMetadata, safeInputStream, readMetadata.getId(), toRead, this);
                    if (safeInputStream != null) {
                        safeInputStream.skipToLimit();
                    }
                } catch (Throwable th) {
                    throw new Exception("onContent implementation failed", th);
                }
            } else {
                continue;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onStopped(boolean z) {
    }

    protected byte[] readLine(BufferedInputStream bufferedInputStream) throws IOException {
        byte[] bArr = new byte[FeederManager.MASTER_CHANGE_CHECK_TIMEOUT];
        int i = 0;
        while (true) {
            int read = bufferedInputStream.read(bArr, i, 1);
            if (read == -1) {
                break;
            }
            i += read;
            if (bArr[i - 1] == 10) {
                break;
            }
            if (i == bArr.length) {
                byte[] bArr2 = bArr;
                bArr = new byte[bArr.length * 2];
                System.arraycopy(bArr2, 0, bArr, 0, i);
            }
        }
        return Arrays.copyOf(bArr, i);
    }

    protected HeartbeatHelper createHeartbeatHelper() throws IOException {
        return new HeartbeatHelper(this, this.lapChannel.getOutputStream());
    }

    protected final SafeInputStream.OnSafetyTrigger getStreamSafety() {
        return this.streamSafety;
    }

    public void setStreamSafety(SafeInputStream.OnSafetyTrigger onSafetyTrigger) {
        this.streamSafety = onSafetyTrigger;
    }

    public void setNoDelay(boolean z) throws SocketException {
        if (!this.started || this.lapChannel == null || this.lapChannel.isClosed()) {
            return;
        }
        this.lapChannel.setTcpNoDelay(z);
    }

    private void checkOEF(BufferedInputStream bufferedInputStream) throws IOException {
        bufferedInputStream.mark(1);
        try {
        } catch (SocketException e) {
            if ("Connection reset".equals(e.getMessage())) {
                throw new EOFException(CONNECTION_CLOSED_MESSAGE);
            }
        }
        if (bufferedInputStream.read() == -1) {
            throw new EOFException(CONNECTION_CLOSED_MESSAGE);
        }
        bufferedInputStream.reset();
    }

    private WriterInfoResponse doHandshake(Integer num, WriterInfo writerInfo) throws Exception {
        if (num.intValue() >= 0) {
            this.lapChannel.setSoTimeout(num.intValue() * FeederManager.MASTER_CHANGE_CHECK_TIMEOUT);
        }
        sendString(HANDSHAKE_SEND);
        String receiveString = receiveString(HANDSHAKE_EXPECT.length());
        if (!HANDSHAKE_EXPECT.equals(receiveString)) {
            throw new HandshakeException("unexpected handshake response (" + receiveString + ")");
        }
        WriterInfoHelper writerInfoHelper = new WriterInfoHelper();
        writerInfoHelper.sendWriterInfo(this.lapChannel.getOutputStream(), writerInfo);
        WriterInfoResponse receiveWriterInfoResponse = writerInfoHelper.receiveWriterInfoResponse(this.lapChannel.getInputStream());
        if (receiveWriterInfoResponse.getOk() != 1) {
            throw new HandshakeException("WriterInfo response: ok='" + receiveWriterInfoResponse.getOk() + "' status='" + receiveWriterInfoResponse.getStatus() + "'");
        }
        if (writerInfo.getDigest() != null && !writerInfo.getDigest().equalsIgnoreCase(receiveWriterInfoResponse.getDigest())) {
            throw new HandshakeException("Writer wants a digest (" + writerInfo.getDigest() + ") that is not available in this LAP (" + receiveWriterInfoResponse.getDigest() + ")");
        }
        this.lapChannel.setSoTimeout(0);
        return receiveWriterInfoResponse;
    }

    @Override // fr.ina.dlweb.lap.writer.PersistenceListener
    public void onDataPersisted(String str) {
        try {
            sendString("[\"done\",\"" + str + "\"]\n");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private synchronized void sendString(String str) throws IOException {
        this.lapChannel.getOutputStream().write(str.getBytes());
        this.lapChannel.getOutputStream().flush();
    }

    private String receiveString(int i) throws IOException {
        byte[] bArr = new byte[i];
        this.lapChannel.getInputStream().read(bArr, 0, bArr.length);
        return new String(bArr, Charset.forName("UTF-8"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onStarted(WriterInfoResponse writerInfoResponse) {
        log.info("LAP: '{}', LAP-Writer: '{}' (status: {})", new Object[]{writerInfoResponse.getVersion(), getInfo().getWriterAgent(), writerInfoResponse.getStatus()});
    }

    protected abstract MetadataParser<M> createMetadataParser();

    protected void forceStop() {
        this.started = false;
        try {
            this.lapChannel.close();
        } catch (Throwable th) {
        }
    }

    @Override // fr.ina.dlweb.lap.writer.LapWriter
    public final synchronized void stop() {
        this.started = false;
    }

    private void connectSocket(Integer num) throws Exception {
        try {
            this.lapChannel = new Socket();
            this.lapChannel.connect(new InetSocketAddress(this.lapHost, this.lapPort), num == null ? 0 : num.intValue() * FeederManager.MASTER_CHANGE_CHECK_TIMEOUT);
        } catch (Throwable th) {
            throw new Exception("Couldn't not connect to Lie Archiving Proxy at '" + this.lapHost + HostPortPair.SEPARATOR + this.lapPort + "' (timeout:" + num + "s)", th);
        }
    }

    public final Long getStartTime() {
        return this.startTime;
    }

    public long getReceivedCount() {
        return this.receivedCount;
    }

    public long getReceivedSize() {
        return this.receivedSize;
    }
}
