/*
 * Decompiled with CFR 0.152.
 */
package org.irods.jargon.core.transfer;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.Callable;
import org.irods.jargon.core.connection.ConnectionConstants;
import org.irods.jargon.core.connection.ConnectionProgressStatus;
import org.irods.jargon.core.exception.JargonException;
import org.irods.jargon.core.transfer.AbstractParallelTransferThread;
import org.irods.jargon.core.transfer.ParallelPutFileTransferStrategy;
import org.irods.jargon.core.utils.Host;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ParallelPutTransferThread
extends AbstractParallelTransferThread
implements Callable<Object>,
Runnable {
    private final ParallelPutFileTransferStrategy parallelPutFileTransferStrategy;
    private BufferedInputStream bis = null;
    public static final Logger log = LoggerFactory.getLogger(ParallelPutTransferThread.class);

    public static ParallelPutTransferThread instance(ParallelPutFileTransferStrategy parallelPutFileTransferStrategy) throws JargonException {
        return new ParallelPutTransferThread(parallelPutFileTransferStrategy);
    }

    private ParallelPutTransferThread(ParallelPutFileTransferStrategy parallelPutFileTransferStrategy) throws JargonException {
        if (parallelPutFileTransferStrategy == null) {
            throw new JargonException("parallelPutFileTransferStrategy is null");
        }
        this.parallelPutFileTransferStrategy = parallelPutFileTransferStrategy;
        try {
            log.info("opening socket to parallel transfer (high) port at port:{}", parallelPutFileTransferStrategy.getPort());
            this.setS(new Socket(parallelPutFileTransferStrategy.getHost(), parallelPutFileTransferStrategy.getPort()));
            if (parallelPutFileTransferStrategy.getParallelSocketTimeoutInSecs() > 0) {
                log.info("timeout (in seconds) for parallel transfer sockets is:{}", parallelPutFileTransferStrategy.getParallelSocketTimeoutInSecs());
                this.getS().setSoTimeout(parallelPutFileTransferStrategy.getParallelSocketTimeoutInSecs() * 1000);
            }
            this.setOut(new BufferedOutputStream(this.getS().getOutputStream()));
            this.setIn(new BufferedInputStream(this.getS().getInputStream()));
        }
        catch (Exception e) {
            log.error("unable to create transfer thread", e);
            throw new JargonException(e);
        }
    }

    @Override
    public Object call() throws JargonException {
        try {
            log.info("getting input stream for local file");
            this.bis = new BufferedInputStream(new FileInputStream(this.parallelPutFileTransferStrategy.getLocalFile()), ConnectionConstants.OUTPUT_BUFFER_LENGTH);
            log.info("writing the cookie (password) for the output thread");
            byte[] b = new byte[4];
            Host.copyInt(this.parallelPutFileTransferStrategy.getPassword(), b);
            this.getOut().write(b);
            this.getOut().flush();
            log.debug("cookie written for output thread...calling put() to start read/write loop");
            this.put();
            log.debug("put operation completed");
        }
        catch (Exception e) {
            log.error("An exception occurred during a parallel file put operation", e);
            this.setExceptionInTransfer(e);
            throw new JargonException("error during parallel file put", e);
        }
        finally {
            log.info("closing sockets, this eats any exceptions");
            this.close();
            log.info("socket conns for parallel transfer closed, now close the file stream");
            try {
                this.bis.close();
                log.info("streams and files closed");
            }
            catch (IOException iOException) {}
        }
        return "DONE";
    }

    private void seekToStartingPoint(long offset) throws JargonException {
        long totalSkipped = 0L;
        long toSkip = 0L;
        try {
            if (offset > 0L) {
                long skipped = this.bis.skip(offset);
                totalSkipped += skipped;
                while (totalSkipped < offset) {
                    log.warn("did not skip entire offset amount, call skip again");
                    toSkip = offset - totalSkipped;
                    skipped = this.bis.skip(toSkip);
                }
                if (totalSkipped != offset) {
                    throw new JargonException("totalSkipped not equal to offset");
                }
            }
        }
        catch (IOException e) {
            log.error("IOException in seek", e);
            throw new JargonException(e);
        }
    }

    private void put() throws JargonException {
        log.info("put()..");
        byte[] buffer = null;
        boolean done = false;
        buffer = new byte[ConnectionConstants.OUTPUT_BUFFER_LENGTH];
        long currentOffset = 0L;
        try {
            while (!done) {
                log.debug("in main put() loop, reading header data");
                int operation = this.readInt();
                if (log.isInfoEnabled()) {
                    log.info("   operation:" + operation);
                }
                if (operation != 1) {
                    if (operation == 9999) {
                        log.info("done received");
                        done = true;
                        break;
                    }
                    throw new JargonException("unknown operation received");
                }
                log.debug("put operation");
                int flags = this.readInt();
                if (log.isInfoEnabled()) {
                    log.info("   flags:" + flags);
                }
                long offset = this.readLong();
                if (log.isInfoEnabled()) {
                    log.info("   offset:" + offset);
                }
                long length = this.readLong();
                if (log.isInfoEnabled()) {
                    log.info("   length:" + length);
                }
                if (offset != currentOffset) {
                    this.seekToStartingPoint(offset);
                    currentOffset = offset;
                }
                log.info("buffer length for put is: {}", buffer.length);
                this.readWriteLoopForCurrentHeaderDirective(buffer, length);
                currentOffset += length;
            }
        }
        catch (IOException e) {
            log.error("An IO exception occurred during a parallel file put operation", e);
            throw new JargonException("IOException during parallel file put", e);
        }
    }

    private void readWriteLoopForCurrentHeaderDirective(byte[] buffer, long length) throws IOException, JargonException {
        int read = 0;
        long totalRead = 0L;
        long transferLength = length;
        long totalWritten = 0L;
        log.debug("readWriteLoopForCurrentHeaderDirective()");
        try {
            while (transferLength > 0L) {
                log.debug("read/write loop at top");
                read = this.bis.read(buffer, 0, (int)Math.min((long)ConnectionConstants.OUTPUT_BUFFER_LENGTH, transferLength));
                log.debug("bytes read: {}", read);
                if (read > 0) {
                    totalRead += (long)read;
                    log.debug("getting ready to write to iRODS, new txfr length:{}", transferLength -= (long)read);
                    this.getOut().write(buffer, 0, read);
                    if (this.parallelPutFileTransferStrategy.getConnectionProgressStatusListener() != null) {
                        this.parallelPutFileTransferStrategy.getConnectionProgressStatusListener().connectionProgressStatusCallback(ConnectionProgressStatus.instanceForSend(read));
                    }
                    log.debug("wrote data to the buffer");
                    totalWritten += (long)read;
                } else {
                    log.debug("no read...break out of read/write");
                    break;
                }
                Thread.yield();
            }
            log.info("final flush of output buffer");
            this.getOut().flush();
            log.info("for thread, total read: {}", totalRead);
            log.info("   total written: {}", totalWritten);
            log.info("   transferLength: {}", transferLength);
        }
        catch (Exception e) {
            log.error("error writing to iRODS parallel transfer socket", e);
            this.setExceptionInTransfer(e);
            throw new JargonException(e);
        }
        if (totalRead != totalWritten) {
            throw new JargonException("totalRead and totalWritten do not agree");
        }
        if (transferLength != 0L) {
            throw new JargonException("transferLength and totalWritten do not agree");
        }
    }

    @Override
    public void run() {
        try {
            this.call();
        }
        catch (JargonException e) {
            this.setExceptionInTransfer(e);
            log.error("exception set in transfer to be picked up by caller", e);
        }
    }
}

