R66Session.java

/*
 * This file is part of Waarp Project (named also Waarp or GG).
 *
 *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
 *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
 * individual contributors.
 *
 *  All Waarp Project is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along with
 * Waarp . If not, see <http://www.gnu.org/licenses/>.
 */
package org.waarp.openr66.context;

import org.waarp.common.command.exception.CommandAbstractException;
import org.waarp.common.database.data.AbstractDbData.UpdatedInfo;
import org.waarp.common.digest.FilesystemBasedDigest;
import org.waarp.common.exception.IllegalFiniteStateException;
import org.waarp.common.exception.NoRestartException;
import org.waarp.common.file.SessionInterface;
import org.waarp.common.file.filesystembased.FilesystemBasedFileParameterImpl;
import org.waarp.common.logging.SysErrLogger;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.state.MachineState;
import org.waarp.compress.WaarpZstdCodec;
import org.waarp.openr66.context.authentication.R66Auth;
import org.waarp.openr66.context.filesystem.R66Dir;
import org.waarp.openr66.context.filesystem.R66File;
import org.waarp.openr66.context.filesystem.R66Restart;
import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
import org.waarp.openr66.database.data.DbTaskRunner;
import org.waarp.openr66.database.data.DbTaskRunner.TASKSTEP;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
import org.waarp.openr66.protocol.localhandler.packet.RequestPacket;
import org.waarp.openr66.protocol.localhandler.packet.compression.ZstdCompressionCodecDataPacket;
import org.waarp.openr66.protocol.utils.FileUtils;

import java.io.File;
import java.lang.ref.SoftReference;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * The global object session in OpenR66, a session by local channel
 */
public class R66Session implements SessionInterface {
  /**
   * Internal Logger
   */
  private static final WaarpLogger logger =
      WaarpLoggerFactory.getLogger(R66Session.class);
  private static final String FILE_IS_IN_THROUGH_MODE =
      "File is in through mode: {}";
  private static final String FILE_CANNOT_BE_WRITE = "File cannot be write";

  /**
   * Global Codec for packet compression
   */
  private static final ZstdCompressionCodecDataPacket codec =
      Configuration.configuration.isCompressionAvailable()?
          new ZstdCompressionCodecDataPacket() : null;

  /**
   * Block size used during file transfer
   */
  private int blockSize = Configuration.configuration.getBlockSize();
  /**
   * The local channel reference
   */
  private LocalChannelReference localChannelReference;
  /**
   * Authentication
   */
  private final R66Auth auth;
  /**
   * Remote Address
   */
  private SocketAddress raddress;
  /**
   * Local Address
   */
  private SocketAddress laddress;

  /**
   * Current directory
   */
  private final R66Dir dir;
  /**
   * Current file
   */
  private R66File file;
  /**
   * Does this session is Ready to serve a request
   */
  private boolean isReady;
  /**
   * Used to prevent deny of service
   */
  private final AtomicInteger numOfError = new AtomicInteger(0);

  /**
   * Current Restart information
   */
  private final R66Restart restart;

  /**
   * DbTaskRunner
   */
  private DbTaskRunner runner;

  private String status = "NoStatus";

  /**
   * The Finite Machine State
   */
  private final MachineState<R66FiniteDualStates> state;
  private Exception traceState = null;
  /**
   * Business Object if used
   */
  private R66BusinessInterface businessObject;
  /**
   * Extended protocol or not
   */
  private final boolean extendedProtocol =
      Configuration.configuration.isExtendedProtocol();

  private final HashMap<String, R66Dir> dirsFromSession =
      new HashMap<String, R66Dir>();
  private static SoftReference<byte[]> reusableBufferStatic = null;
  private static SoftReference<byte[]> reusableDataPacketBufferStatic = null;
  private static SoftReference<byte[]> reusableCompressionBufferStatic = null;
  private SoftReference<byte[]> reusableBuffer = null;
  private SoftReference<byte[]> reusableDataPacketBuffer = null;
  private SoftReference<byte[]> reusableCompressionBuffer = null;
  private FilesystemBasedDigest digestBlock = null;
  private boolean isSender;
  private boolean isCompressionEnabled;
  private int compressionMaxSize =
      WaarpZstdCodec.getMaxCompressedSize(blockSize);

  /**
   * @return the compression codec
   */
  public static ZstdCompressionCodecDataPacket getCodec() {
    return codec;
  }

  /**
   * Create the session
   */
  public R66Session() {
    isReady = false;
    auth = new R66Auth(this);
    dir = new R66Dir(this);
    restart = new R66Restart(this);
    state = R66FiniteDualStates.newSessionMachineState();
    isCompressionEnabled = Configuration.configuration.isCompressionAvailable();
    synchronized (logger) {
      reusableBuffer = reusableBufferStatic;
      reusableBufferStatic = null;
      reusableDataPacketBuffer = reusableDataPacketBufferStatic;
      reusableDataPacketBufferStatic = null;
    }
  }

  /**
   * Create the session without Buffers
   */
  public R66Session(final boolean noBuffer) {
    isReady = false;
    auth = new R66Auth(this);
    dir = new R66Dir(this);
    restart = new R66Restart(this);
    state = R66FiniteDualStates.newSessionMachineState();
    isCompressionEnabled = Configuration.configuration.isCompressionAvailable();
  }

  /**
   * @return extendedProtocol
   */
  public final boolean getExtendedProtocol() {
    return extendedProtocol;
  }

  /**
   * @return the businessObject
   */
  public final R66BusinessInterface getBusinessObject() {
    return businessObject;
  }

  /**
   * @param businessObject the businessObject to set
   */
  public final void setBusinessObject(
      final R66BusinessInterface businessObject) {
    this.businessObject = businessObject;
  }

  /**
   * Propose a new State
   *
   * @param desiredstate
   *
   * @throws IllegalFiniteStateException if the new status if not ok
   */
  public final void newState(final R66FiniteDualStates desiredstate) {
    try {
      state.setCurrent(desiredstate);
      if (logger.isDebugEnabled()) {
        traceState = new Exception("Trace for debugging");
      }
    } catch (final IllegalFiniteStateException e) {
      logger.warn("Should not changed of State: {} {}", this, e.getMessage(),
                  e);
      if (logger.isDebugEnabled()) {
        logger.warn("Previous condition of state: {}", this, traceState);
        traceState = new Exception("Trace for debugging");
      }
      state.setDryCurrent(desiredstate);
    }
  }

  public final void setErrorState() {
    try {
      state.setCurrent(R66FiniteDualStates.ERROR);
    } catch (final IllegalFiniteStateException e) {
      logger.error("Couldn't pass to error state. This should not happen");
    }
  }

  /**
   * @return the current state in the finite state machine
   */
  public final R66FiniteDualStates getState() {
    return state.getCurrent();
  }

  /**
   * Debugging purpose (trace)
   *
   * @param stat
   */
  public final void setStatus(final int stat) {
    if (logger.isDebugEnabled()) {
      final StackTraceElement elt = Thread.currentThread().getStackTrace()[2];
      status =
          '(' + elt.getFileName() + ':' + elt.getLineNumber() + "):" + stat;
    } else {
      status = ":" + stat;
    }
  }

  @Override
  public final void clear() {
    partialClear();
    if (dir != null) {
      dir.clear();
    }
    if (auth != null) {
      auth.clear();
    }
    if (runner != null) {
      runner.clear();
    }
    if (state != null) {
      try {
        state.setCurrent(R66FiniteDualStates.CLOSEDCHANNEL);
      } catch (final IllegalFiniteStateException ignored) {
        // nothing
      }
      R66FiniteDualStates.endSessionMachineSate(state);
    }
  }

  public final void partialClear() {
    // First check if a transfer was on going
    if (runner != null && !runner.isFinished() && !runner.continueTransfer()) {
      if (localChannelReference != null) {
        if (!localChannelReference.getFutureRequest().isDone()) {
          final R66Result result = new R66Result(
              new OpenR66RunnerErrorException("Close before ending"), this,
              true, ErrorCode.Disconnection,
              runner);// True since called from closed
          result.setRunner(runner);
          try {
            setFinalizeTransfer(false, result);
          } catch (final OpenR66RunnerErrorException ignored) {
            // nothing
          } catch (final OpenR66ProtocolSystemException ignored) {
            // nothing
          }
        }
      }
    }
    // No clean of file since it can be used after channel is closed
    isReady = false;
    if (businessObject != null) {
      businessObject.releaseResources(this);
      businessObject = null;
    }
    digestBlock = null;
    if (reusableBuffer != null && reusableBufferStatic == null) {
      reusableBufferStatic = reusableBuffer;
    }
    if (reusableDataPacketBuffer != null &&
        reusableDataPacketBufferStatic == null) {
      reusableDataPacketBufferStatic = reusableDataPacketBuffer;
    }
    if (reusableCompressionBuffer != null &&
        reusableCompressionBufferStatic == null) {
      reusableCompressionBufferStatic = reusableCompressionBuffer;
    }
    reusableBuffer = null;
    reusableDataPacketBuffer = null;
    reusableCompressionBuffer = null;
  }

  @Override
  public final R66Auth getAuth() {
    return auth;
  }

  @Override
  public final int getBlockSize() {
    return blockSize;
  }

  /**
   * @param blocksize the blocksize to set
   */
  public final void setBlockSize(final int blocksize) {
    blockSize = blocksize;
    compressionMaxSize = WaarpZstdCodec.getMaxCompressedSize(blockSize);
  }

  private SoftReference<byte[]> getBuffer(SoftReference<byte[]> softReference,
                                          final int length) {
    if (softReference == null) {
      softReference = new SoftReference<byte[]>(new byte[length]);
      return softReference;
    }
    final byte[] buffer = softReference.get();
    if (buffer != null && buffer.length >= length) {
      return softReference;
    }
    softReference.clear();
    softReference = new SoftReference<byte[]>(new byte[length]);
    return softReference;
  }

  /**
   * @param length the target size
   *
   * @return the reusable buffer for sending per Session
   */
  public final byte[] getReusableBuffer(final int length) {
    reusableBuffer = getBuffer(reusableBuffer, length);
    return reusableBuffer.get();
  }

  /**
   * @param length the target size
   *
   * @return the reusable buffer for received DataPacket per Session
   */
  public final byte[] getReusableDataPacketBuffer(final int length) {
    reusableDataPacketBuffer = getBuffer(reusableDataPacketBuffer, length);
    return reusableDataPacketBuffer.get();
  }

  /**
   * @param length the original size
   *
   * @return the possible reusable compression buffer per Session
   */
  public final byte[] getSessionReusableCompressionBuffer(final int length) {
    reusableCompressionBuffer = getBuffer(reusableCompressionBuffer,
                                          WaarpZstdCodec.getMaxCompressedSize(
                                              length));
    return reusableCompressionBuffer.get();
  }

  /**
   * @return True if compression is enabled in this session
   */
  public final boolean isCompressionEnabled() {
    return isCompressionEnabled;
  }

  /**
   * @param compressionEnabled True if compression is enabled in this session
   */
  public final void setCompressionEnabled(final boolean compressionEnabled) {
    logger.debug("Compression enabled? {} => {}", isCompressionEnabled,
                 compressionEnabled);
    isCompressionEnabled = compressionEnabled;
    if (isCompressionEnabled && reusableCompressionBuffer == null) {
      synchronized (logger) {
        reusableCompressionBuffer = reusableCompressionBufferStatic;
        reusableCompressionBufferStatic = null;
      }
    } else if (!isCompressionEnabled && reusableCompressionBuffer != null) {
      synchronized (logger) {
        reusableDataPacketBufferStatic = reusableCompressionBuffer;
        reusableCompressionBuffer = null;
      }
    }
  }

  /**
   * @return the current max compression size according to block size
   */
  public final int getCompressionMaxSize() {
    return compressionMaxSize;
  }

  /**
   * Initialize per block digest
   */
  public final void initializeDigest() {
    if (digestBlock == null && RequestPacket.isMD5Mode(getRunner().getMode())) {
      try {
        digestBlock = new FilesystemBasedDigest(
            localChannelReference.getPartner().getDigestAlgo());
      } catch (final NoSuchAlgorithmException e) {
        SysErrLogger.FAKE_LOGGER.ignoreLog(e);
      }
    }
  }

  /**
   * @return the digest used per block
   */
  public final FilesystemBasedDigest getDigestBlock() {
    return digestBlock;
  }

  @Override
  public final R66Dir getDir() {
    return dir;
  }

  @Override
  public final FilesystemBasedFileParameterImpl getFileParameter() {
    return Configuration.getFileParameter();
  }

  @Override
  public final R66Restart getRestart() {
    return restart;
  }

  /**
   * @return True if the connection is currently authenticated
   */
  public final boolean isAuthenticated() {
    if (auth == null) {
      return false;
    }
    return auth.isIdentified();
  }

  /**
   * @return True if the Channel is ready to accept transfer
   */
  public final boolean isReady() {
    return isReady;
  }

  /**
   * @param isReady the isReady for transfer to set
   */
  public final void setReady(final boolean isReady) {
    this.isReady = isReady;
  }

  /**
   * @return the runner
   */
  public final DbTaskRunner getRunner() {
    return runner;
  }

  /**
   * @param localChannelReference the localChannelReference to set
   */
  public final void setLocalChannelReference(
      final LocalChannelReference localChannelReference) {
    this.localChannelReference = localChannelReference;
    this.localChannelReference.setSession(this);
    if (this.localChannelReference.getNetworkChannel() != null) {
      raddress = this.localChannelReference.getNetworkChannel().remoteAddress();
      laddress = this.localChannelReference.getNetworkChannel().localAddress();
    } else {
      raddress = laddress = new InetSocketAddress(0);
    }
  }

  /**
   * @return the remote SocketAddress
   */
  public final SocketAddress getRemoteAddress() {
    return raddress;
  }

  /**
   * @return the local SocketAddress
   */
  public final SocketAddress getLocalAddress() {
    return laddress;
  }

  /**
   * @return the localChannelReference
   */
  public final LocalChannelReference getLocalChannelReference() {
    return localChannelReference;
  }

  /**
   * To be called in case of No Session not from a valid LocalChannelHandler
   *
   * @param runner
   * @param localChannelReference
   */
  public final void setNoSessionRunner(final DbTaskRunner runner,
                                       final LocalChannelReference localChannelReference) {
    this.runner = runner;
    // Warning: the file is not correctly setup
    auth.specialNoSessionAuth(false, Configuration.configuration.getHostId());
    try {
      file = (R66File) dir.setFile(this.runner.getFilename(), false);
    } catch (final CommandAbstractException ignored) {
      // nothing
    }
    this.localChannelReference = localChannelReference;
    if (this.localChannelReference == null) {
      if (this.runner.getLocalChannelReference() != null) {
        this.localChannelReference = this.runner.getLocalChannelReference();
      } else {
        this.localChannelReference = new LocalChannelReference();
      }
      this.localChannelReference.setErrorMessage(
          this.runner.getErrorInfo().getMesg(), this.runner.getErrorInfo());
    }
    runner.setLocalChannelReference(this.localChannelReference);
    this.localChannelReference.setSession(this);
  }

  /**
   * Set the File from the runner before PRE operation are done
   *
   * @throws OpenR66RunnerErrorException
   */
  public final void setFileBeforePreRunner()
      throws OpenR66RunnerErrorException {
    // check first if the next step is the PRE task from beginning
    try {
      file = FileUtils.getFile(logger, this, runner.getOriginalFilename(),
                               runner.isPreTaskStarting(), isSender,
                               runner.isSendThrough(), file);
    } catch (final OpenR66RunnerErrorException e) {
      runner.setErrorExecutionStatus(ErrorCode.FileNotFound);
      throw e;
    }
    if (isSender && !runner.isSendThrough()) {
      // possibly resolved filename
      try {
        runner.setOriginalFilename(file.getFile());
        runner.setFilename(file.getFile());
        logger.debug("Old size: {} => {}", runner.getOriginalSize(),
                     file.length());
        if (runner.getOriginalSize() <= 0) {
          final long originalSize = file.length();
          if (originalSize > 0) {
            runner.setOriginalSize(originalSize);
          }
        }
      } catch (final CommandAbstractException e) {
        throw new OpenR66RunnerErrorException(e);
      }
    }
  }

  /**
   * Set the File from the runner once PRE operation are done, only for Receiver
   *
   * @param createFile When True, the file can be newly created if
   *     needed.
   *     If False, no new file will be
   *     created, thus having an Exception.
   *
   * @throws OpenR66RunnerErrorException
   * @throws CommandAbstractException only when new received created
   *     file
   *     cannot be created
   */
  public final void setFileAfterPreRunnerReceiver(final boolean createFile)
      throws OpenR66RunnerErrorException, CommandAbstractException {
    if (businessObject != null) {
      businessObject.checkAtChangeFilename(this);
    }
    // File should not exist except if restart
    if (runner.getRank() > 0) {
      // Filename should be get back from runner load from database
      try {
        file = (R66File) dir.setFile(runner.getFilename(), true);
        if (runner.isRecvThrough()) {
          // no test on file since it does not really exist
          logger.debug(FILE_IS_IN_THROUGH_MODE, file);
        } else if (!file.canWrite()) {
          throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
        }
      } catch (final CommandAbstractException e) {
        throw new OpenR66RunnerErrorException(e);
      }
    } else {
      // New FILENAME if necessary and store it
      if (createFile) {
        file = null;
        String newfilename = runner.getOriginalFilename();
        if (newfilename.charAt(1) == ':') {
          // Windows path
          newfilename = newfilename.substring(2);
        }
        newfilename = R66File.getBasename(newfilename);
        try {
          file = dir.setUniqueFile(runner.getSpecialId(), newfilename);
          runner.setFilename(file.getBasename());
        } catch (final CommandAbstractException e) {
          runner.deleteTempFile();
          throw e;
        }
        try {
          if (runner.isRecvThrough()) {
            // no test on file since it does not really exist
            logger.debug(FILE_IS_IN_THROUGH_MODE, file);
            runner.deleteTempFile();
          } else if (!file.canWrite()) {
            runner.deleteTempFile();
            throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
          }
        } catch (final CommandAbstractException e) {
          runner.deleteTempFile();
          throw new OpenR66RunnerErrorException(e);
        }
      } else {
        throw new OpenR66RunnerErrorException("No file created");
      }
    }
    // Store TRUEFILENAME
    try {
      if (runner.isFileMoved()) {
        runner.setFileMoved(file.getFile(), true);
      } else {
        runner.setFilename(file.getFile());
      }
    } catch (final CommandAbstractException e) {
      runner.deleteTempFile();
      throw new OpenR66RunnerErrorException(e);
    }
  }

  /**
   * Set the File from the runner once PRE operation are done
   *
   * @param createFile When True, the file can be newly created if
   *     needed.
   *     If False, no new file will be
   *     created, thus having an Exception.
   *
   * @throws OpenR66RunnerErrorException
   * @throws CommandAbstractException only when new received created
   *     file
   *     cannot be created
   */
  public final void setFileAfterPreRunner(final boolean createFile)
      throws OpenR66RunnerErrorException, CommandAbstractException {
    if (businessObject != null) {
      businessObject.checkAtChangeFilename(this);
    }
    // Now create the associated file
    if (isSender != runner.isSender()) {
      logger.warn("Not same SenderSide {} {}", isSender, runner.isSender());
    }
    if (isSender) {
      try {
        if (file == null) {
          try {
            file = (R66File) dir.setFile(runner.getFilename(), false);
          } catch (final CommandAbstractException e) {
            // file is not under normal base directory, so is external
            // File must already exist but can be using special code ('*?')
            file = dir.setFileNoCheck(runner.getFilename());
          }
        }
        if (runner.isSendThrough()) {
          // no test on file since it does not really exist
          logger.debug(FILE_IS_IN_THROUGH_MODE, file);
        } else if (!file.canRead()) {
          // file is not under normal base directory, so is external
          // File must already exist but cannot used special code ('*?')
          final R66File newFile = new R66File(this, dir, runner.getFilename());
          if (!newFile.canRead()) {
            runner.setErrorExecutionStatus(ErrorCode.FileNotFound);
            throw new OpenR66RunnerErrorException(
                "File cannot be read: " + file.getTrueFile().getAbsolutePath() +
                " to " + newFile.getTrueFile().getAbsolutePath());
          }
        }
      } catch (final CommandAbstractException e) {
        throw new OpenR66RunnerErrorException(e);
      }
    } else {
      // File should not exist except if restart
      if (runner.getRank() > 0) {
        // Filename should be get back from runner load from database
        try {
          file = (R66File) dir.setFile(runner.getFilename(), true);
          if (runner.isRecvThrough()) {
            // no test on file since it does not really exist
            logger.debug(FILE_IS_IN_THROUGH_MODE, file);
          } else if (!file.canWrite()) {
            throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
          }
        } catch (final CommandAbstractException e) {
          throw new OpenR66RunnerErrorException(e);
        }
      } else {
        // New FILENAME if necessary and store it
        if (createFile) {
          file = null;
          String newfilename = runner.getOriginalFilename();
          if (newfilename.charAt(1) == ':') {
            // Windows path
            newfilename = newfilename.substring(2);
          }
          newfilename = R66File.getBasename(newfilename);
          try {
            file = dir.setUniqueFile(runner.getSpecialId(), newfilename);
            runner.setFilename(file.getBasename());
          } catch (final CommandAbstractException e) {
            runner.deleteTempFile();
            throw e;
          }
          try {
            if (runner.isRecvThrough()) {
              // no test on file since it does not really exist
              logger.debug(FILE_IS_IN_THROUGH_MODE, file);
              runner.deleteTempFile();
            } else if (!file.canWrite()) {
              runner.deleteTempFile();
              throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
            }
          } catch (final CommandAbstractException e) {
            runner.deleteTempFile();
            throw new OpenR66RunnerErrorException(e);
          }
        } else {
          throw new OpenR66RunnerErrorException("No file created");
        }
      }
    }
    // Store TRUEFILENAME
    try {
      if (runner.isFileMoved()) {
        runner.setFileMoved(file.getFile(), true);
      } else {
        runner.setFilename(file.getFile());
      }
    } catch (final CommandAbstractException e) {
      runner.deleteTempFile();
      throw new OpenR66RunnerErrorException(e);
    }
    // check fileSize
    if (isSender && file != null) {
      logger.debug("could change size: {} => {}", runner.getOriginalSize(),
                   file.length());
      if (runner.getOriginalSize() < 0) {
        final long originalSize = file.length();
        if (originalSize > 0) {
          runner.setOriginalSize(originalSize);
        }
      }
    }
  }

  /**
   * To be used when a request comes with a bad code so it cannot be set
   * normally
   *
   * @param runner
   * @param code
   */
  public final void setBadRunner(final DbTaskRunner runner,
                                 final ErrorCode code) {
    this.runner = runner;
    if (code == ErrorCode.QueryAlreadyFinished) {
      if (this.runner.isSender()) {
        // Change dir
        try {
          dir.changeDirectory(this.runner.getRule().getSendPath());
        } catch (final CommandAbstractException ignored) {
          // nothing
        }
      } else {
        // Change dir
        try {
          dir.changeDirectory(this.runner.getRule().getWorkPath());
        } catch (final CommandAbstractException ignored) {
          // nothing
        }
      }
      if (businessObject != null) {
        businessObject.checkAtError(this);
      }
      this.runner.setPostTask();
      try {
        setFileAfterPreRunner(false);
      } catch (final OpenR66RunnerErrorException ignored) {
        // nothing
      } catch (final CommandAbstractException ignored) {
        // nothing
      }
    }
  }

  /**
   * Set the runner, and setup the directory first.
   * <p>
   * This call should be followed by a startup() call.
   *
   * @param runner the runner to set
   *
   * @throws OpenR66RunnerErrorException
   */
  public final void setRunner(final DbTaskRunner runner)
      throws OpenR66RunnerErrorException {
    this.runner = runner;
    if (localChannelReference != null) {
      this.runner.setLocalChannelReference(localChannelReference);
    }
    this.isSender = runner.isSender();
    logger.debug("Runner to set: {} {}", runner.shallIgnoreSave(), runner);
    this.runner.checkThroughMode();
    if (businessObject != null) {
      businessObject.checkAtStartup(this);
    }
    if (this.runner.isSender()) {
      if (runner.isSendThrough()) {
        // May not change dir as needed
        // Change dir
        try {
          dir.changeDirectory(this.runner.getRule().getSendPath());
        } catch (final CommandAbstractException e) {
          // ignore
        }
      } else {
        // Change dir
        try {
          dir.changeDirectory(this.runner.getRule().getSendPath());
        } catch (final CommandAbstractException e) {
          throw new OpenR66RunnerErrorException(e);
        }
      }
    } else {
      if (runner.isRecvThrough()) {
        // May not change dir as needed
        // Change dir
        try {
          dir.changeDirectory(this.runner.getRule().getWorkPath());
        } catch (final CommandAbstractException ignored) {
          // nothing
        }
      } else {
        // Change dir
        try {
          dir.changeDirectory(this.runner.getRule().getWorkPath());
        } catch (final CommandAbstractException e) {
          throw new OpenR66RunnerErrorException(e);
        }
      }
    }
    logger.debug("Dir is: {}", dir.getFullPath());
  }

  /**
   * START from the PreTask if necessary, and prepare the file
   *
   * @param checkNotExternal if True, the file as Sender should not be
   *     external to current directory
   *
   * @throws OpenR66RunnerErrorException
   */
  public final void startup(final boolean checkNotExternal)
      throws OpenR66RunnerErrorException {
    setRestartMarker();
    logger.debug("GlobalLastStep: {} vs {}:{}", runner.getGloballaststep(),
                 TASKSTEP.NOTASK.ordinal(), TASKSTEP.PRETASK.ordinal());
    initializeTransfer(checkNotExternal);
    // Now create the associated file
    try {
      setFileAfterPreRunner(true);
    } catch (final CommandAbstractException e2) {
      // generated due to a possible wildcard not ready
      file = null;
    }
    logger.debug("GlobalLastStep: {}", runner.getGloballaststep());
    if (runner.getGloballaststep() == TASKSTEP.TRANSFERTASK.ordinal()) {
      if (businessObject != null) {
        businessObject.checkAfterPreCommand(this);
      }
      if (!runner.isSender()) {
        if (initializeReceiver()) {
          return;
        }
      } else {
        initializeSender();
      }
    }
    runner.saveStatus();
    logger.debug("Final init: {} {}", runner, file != null);
  }

  private void initializeSender() throws OpenR66RunnerErrorException {
    if (file != null) {
      try {
        localChannelReference.getFutureRequest().setFilesize(file.length());
      } catch (final CommandAbstractException ignored) {
        // nothing
      }
      try {
        file.restartMarker(restart);
      } catch (final CommandAbstractException e) {
        runner.deleteTempFile();
        throw new OpenR66RunnerErrorException(e);
      }
    }
  }

  private boolean initializeReceiver() throws OpenR66RunnerErrorException {
    // Check file length according to rank
    if (runner.isRecvThrough()) {
      // no size can be checked
    } else {
      long length = 0;
      if (file != null) {
        try {
          length = file.length();
        } catch (final CommandAbstractException ignored) {
          // nothing
        }
      }
      final long needed = runner.getOriginalSize() - length;
      long available = 0;
      String targetDir = null;
      try {
        available = dir.getFreeSpace();
        targetDir = dir.getPwd();
      } catch (final CommandAbstractException ignored) {
        // nothing
      }
      if (file != null) {
        final File truefile = file.getTrueFile().getParentFile();
        available = truefile.getFreeSpace();
        targetDir = truefile.getPath();
      }
      logger.debug("Check available space: {} >? {}(+{})", available, needed,
                   length);
      // Available > 0 since some system returns 0 (wrong size)
      if (available > 0 && needed > available) {
        // not enough space
        runner.setErrorExecutionStatus(ErrorCode.Internal);
        throw new OpenR66RunnerErrorException(
            "File cannot be written due to unsufficient space available: " +
            targetDir + " need " + needed + " more while available is " +
            available);
      }
      if (file == null) {
        runner.saveStatus();
        logger.info("Final PARTIAL init: {}", runner);
        return true;
      }
      checkPosition(length);
    }
    return false;
  }

  private void checkPosition(final long length)
      throws OpenR66RunnerErrorException {
    // First check available space
    try {
      final long oldPosition = restart.getPosition();
      restart.setSet(true);
      if (oldPosition > length) {
        int newRank = (int) (length / runner.getBlocksize()) -
                      Configuration.getRankRestart();
        if (newRank <= 0) {
          newRank = 1;
        }
        logger.info("OldPos: {}:{} curLength: {}:{}", oldPosition,
                    runner.getRank(), length, newRank);
        logger.warn("Decreased Rank Restart for {} at " + newRank, runner);
        runner.setTransferTask(newRank);
        restart.restartMarker(runner.getBlocksize() * runner.getRank());
      }
      try {
        file.restartMarker(restart);
      } catch (final CommandAbstractException e) {
        runner.deleteTempFile();
        throw new OpenR66RunnerErrorException(e);
      }
    } catch (final NoRestartException e) {
      // length is not to be changed
    }
  }

  private void initializeTransfer(final boolean checkNotExternal)
      throws OpenR66RunnerErrorException {
    if (runner.isSelfRequest() && runner.getRule().isSendMode()) {
      // It might be the sender and initiator side, already changed by
      // receiver, reset globalLastStep and globalStep
      runner.setInitialTask();
    }
    if (runner.getGloballaststep() == TASKSTEP.NOTASK.ordinal() ||
        runner.getGloballaststep() == TASKSTEP.PRETASK.ordinal()) {
      setFileBeforePreRunner();
      if (runner.isSender() && !runner.isSendThrough() && file != null &&
          checkNotExternal) {
        String path = null;
        try {
          path = file.getFile();
        } catch (final CommandAbstractException ignored) {
          // nothing
        }
        if (file.isExternal() ||
            path != null && !dir.isPathInCurrentDir(path)) {
          // should not be
          logger.error(
              "File cannot be found in the current output directory: {} not in {}",
              file, dir);
          runner.setErrorExecutionStatus(ErrorCode.FileNotAllowed);
          throw new OpenR66RunnerErrorException(
              "File cannot be found in the current output directory");
        }
      }
      runner.setPreTask();
      runner.run();
      if (runner.isSender() && !runner.isSendThrough()) {
        if (file != null) {
          try {
            final long originalSize = file.length();
            if (originalSize > 0) {
              runner.setOriginalSize(originalSize);
            }
          } catch (final CommandAbstractException e) {
            // ignore
          }
        }
      }
      runner.setTransferTask(runner.getRank());
    } else {
      runner.reset();
      runner.changeUpdatedInfo(UpdatedInfo.RUNNING);
    }
  }

  private void setRestartMarker() {
    if (runner.getRank() > 0) {
      logger.debug("restart at {} {}", runner.getRank(), runner);
      logger.debug("restart at {} {}", runner.getRank(), dir);
      runner.setTransferTask(runner.getRank());
      restart.restartMarker(runner.getBlocksize() * runner.getRank());
    } else {
      restart.restartMarker(0);
    }
  }

  /**
   * Rename the current receive file from the very beginning since the sender
   * has a post action that changes its
   * name
   *
   * @param newFilename
   *
   * @throws OpenR66RunnerErrorException
   */
  public final void renameReceiverFile(final String newFilename)
      throws OpenR66RunnerErrorException {
    if (runner == null) {
      return;
    }
    // First delete the temporary file if needed
    if (runner.getRank() > 0) {
      logger.error(
          "Renaming file is not correct since transfer does not start from first block");
      // Not correct
      throw new OpenR66RunnerErrorException(
          "Renaming file not correct since transfer already started");
    }
    if (!runner.isRecvThrough()) {
      runner.deleteTempFile();
    }
    // Now rename it
    runner.setOriginalFilename(newFilename);
    try {
      setFileAfterPreRunnerReceiver(true);
    } catch (final CommandAbstractException e) {
      throw new OpenR66RunnerErrorException(e);
    }
  }

  /**
   * Finalize the transfer step by running the error or post operation
   * according
   * to the status.
   *
   * @param status
   * @param finalValue
   *
   * @throws OpenR66RunnerErrorException
   * @throws OpenR66ProtocolSystemException
   */
  public final void setFinalizeTransfer(final boolean status,
                                        final R66Result finalValue)
      throws OpenR66RunnerErrorException, OpenR66ProtocolSystemException {
    logger.debug("{}:{}:{}", status, finalValue, runner);
    if (runner == null) {
      if (localChannelReference != null) {
        if (status) {
          localChannelReference.validateRequest(finalValue);
        } else {
          localChannelReference.invalidateRequest(finalValue);
        }
      }
      if (businessObject != null) {
        if (status) {
          businessObject.checkAfterTransfer(this);
        } else {
          businessObject.checkAtError(this);
        }
      }
      return;
    }
    if (businessObject != null) {
      if (status) {
        businessObject.checkAfterTransfer(this);
      } else {
        businessObject.checkAtError(this);
      }
    }
    if (runner.isAllDone()) {
      if (logger.isDebugEnabled()) {
        logger.debug("Transfer already done but {} on {} {}", status, file,
                     runner.toShortString(),
                     new OpenR66RunnerErrorException(finalValue.toString()));
      }
      // FIXME ??
      /*
       * if (! status) runner.finalizeTransfer(localChannelReference, file, finalValue, status)
       */
      return;
    }
    if (runner.isInError()) {
      if (logger.isDebugEnabled()) {
        logger.debug("Transfer already done in error but {} on {} {}", status,
                     file, runner.toShortString(),
                     new OpenR66RunnerErrorException(finalValue.toString()));
      }
      // FIXME ??
      /*
       * if (! status) runner.finalizeTransfer(localChannelReference, file, finalValue, status)
       */
      return;
    }
    if (localChannelReference.getFutureRequest().isDone()) {
      if (logger.isDebugEnabled()) {
        logger.debug("Request already done but {} on {} {}" + status, file,
                     runner.toShortString(),
                     new OpenR66RunnerErrorException(finalValue.toString()));
      }
      // Already finished once so do nothing more
      return;
    }
    if (!status) {
      runner.deleteTempFile();
      runner.setErrorExecutionStatus(finalValue.getCode());
    }
    if (status) {
      runner.finishTransferTask(ErrorCode.TransferOk);
    } else {
      runner.finishTransferTask(finalValue.getCode());
    }
    logger.debug("Transfer {} on {} and {}", status, file, runner);
    if (!runner.ready()) {
      // Pre task in error (or even before)
      final OpenR66RunnerErrorException runnerErrorException;
      if (!status && finalValue.getException() != null) {
        runnerErrorException = new OpenR66RunnerErrorException(
            "Pre task in error (or even before)", finalValue.getException());
      } else {
        runnerErrorException = new OpenR66RunnerErrorException(
            "Pre task in error (or even before)");
      }
      finalValue.setException(runnerErrorException);
      logger.info("Pre task in error (or even before) : {}",
                  runnerErrorException.getMessage());
      if (Configuration.configuration.isExecuteErrorBeforeTransferAllowed()) {
        runner.finalizeTransfer(localChannelReference, file, finalValue,
                                status);
      }
      runner.saveStatus();
      localChannelReference.invalidateRequest(finalValue);
      throw runnerErrorException;
    }
    try {
      if (file != null) {
        file.closeFile();
      }
    } catch (final CommandAbstractException e1) {
      R66Result result = finalValue;
      if (status) {
        result = new R66Result(new OpenR66RunnerErrorException(e1), this, false,
                               ErrorCode.Internal, runner);
      }
      localChannelReference.invalidateRequest(result);
      throw (OpenR66RunnerErrorException) result.getException();
    }
    runner.finalizeTransfer(localChannelReference, file, finalValue, status);
    if (businessObject != null) {
      businessObject.checkAfterPost(this);
    }
  }

  /**
   * Try to finalize the request if possible
   *
   * @param errorValue
   *
   * @throws OpenR66RunnerErrorException
   * @throws OpenR66ProtocolSystemException
   */
  public final void tryFinalizeRequest(final R66Result errorValue)
      throws OpenR66RunnerErrorException, OpenR66ProtocolSystemException {
    if (getLocalChannelReference() == null) {
      return;
    }
    if (getLocalChannelReference().getFutureRequest().isDone()) {
      return;
    }
    if (runner == null) {
      localChannelReference.invalidateRequest(errorValue);
      return;
    }
    // do the real end
    if (runner.getStatus() == ErrorCode.CompleteOk) {
      runner.setAllDone();
      runner.forceSaveStatus();
      localChannelReference.validateRequest(
          new R66Result(this, true, ErrorCode.CompleteOk, runner));
    } else if (runner.getStatus() == ErrorCode.TransferOk &&
               (!runner.isSender() ||
                errorValue.getCode() == ErrorCode.QueryAlreadyFinished)) {
      // Try to finalize it
      try {
        setFinalizeTransfer(true,
                            new R66Result(this, true, ErrorCode.CompleteOk,
                                          runner));
        localChannelReference.validateRequest(
            localChannelReference.getFutureEndTransfer().getResult());
      } catch (final OpenR66ProtocolSystemException e) {
        logger.error("Cannot validate runner:     {}", runner.toShortString());
        runner.changeUpdatedInfo(UpdatedInfo.INERROR);
        runner.setErrorExecutionStatus(errorValue.getCode());
        runner.forceSaveStatus();
        setFinalizeTransfer(false, errorValue);
      } catch (final OpenR66RunnerErrorException e) {
        logger.error("Cannot validate runner:     {}", runner.toShortString());
        runner.changeUpdatedInfo(UpdatedInfo.INERROR);
        runner.setErrorExecutionStatus(errorValue.getCode());
        runner.forceSaveStatus();
        setFinalizeTransfer(false, errorValue);
      }
    } else {
      // invalidate Request
      logger.error(
          "Runner {} will be shutdown while in status {} and future status will be {}",
          runner.getSpecialId(), runner.getStatus().getMesg(),
          errorValue.getCode().getMesg());
      setFinalizeTransfer(false, errorValue);
    }
  }

  /**
   * @return the file
   */
  public final R66File getFile() {
    return file;
  }

  /**
   * @return True if the number of Error is still acceptable
   */
  public final boolean addError() {
    final int value = numOfError.incrementAndGet();
    return value < Configuration.RETRYNB;
  }

  @Override
  public final String toString() {
    return "Session: FS[" + state.getCurrent() + "] " + status + "  " +
           (auth != null? auth.toString() : "no Auth") + "     " +
           (dir != null? dir.toString() : "no Dir") + "     " +
           (file != null? file.toString() : "no File") + "     " +
           (runner != null? runner.toShortString() : "no Runner");
  }

  @Override
  public final String getUniqueExtension() {
    return Configuration.EXT_R66;
  }

  /**
   * @return the dirsFromSession
   */
  public final HashMap<String, R66Dir> getDirsFromSession() {
    return dirsFromSession;
  }

  /**
   * @return True if according to session, it is the sender side (to byPass
   *     send to itself issue)
   */
  public final boolean isSender() {
    return isSender;
  }
}