HttpResumableSession.java

/*
 * This file is part of Waarp Project (named also Waarp or GG).
 *
 *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
 *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
 * individual contributors.
 *
 *  All Waarp Project is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along with
 * Waarp . If not, see <http://www.gnu.org/licenses/>.
 */

package org.waarp.http.protocol;

import org.waarp.common.command.exception.CommandAbstractException;
import org.waarp.common.digest.FilesystemBasedDigest;
import org.waarp.common.digest.FilesystemBasedDigest.DigestAlgo;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.lru.ConcurrentUtility;
import org.waarp.common.utility.ParametersChecker;
import org.waarp.common.utility.WaarpStringUtils;
import org.waarp.common.utility.WaarpSystemUtil;
import org.waarp.http.protocol.servlet.HttpAuthent;
import org.waarp.openr66.context.R66BusinessInterface;
import org.waarp.openr66.context.filesystem.R66Dir;
import org.waarp.openr66.context.filesystem.R66File;
import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
import org.waarp.openr66.database.data.DbTaskRunner;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.util.Set;
import java.util.UUID;

import static org.waarp.openr66.context.R66FiniteDualStates.*;

/**
 * Http Resumable session: receives a file as separately uploaded chunks, each
 * one described by an {@link HttpResumableInfo}, writes every chunk at its
 * own offset in the target file and finalizes the transfer once the number
 * of written bytes equals the announced total size.
 */
public class HttpResumableSession extends HttpSessionAbstract {
  private static final WaarpLogger logger =
      WaarpLoggerFactory.getLogger(HttpResumableSession.class);

  /**
   * Numbers of the chunks that were completely written
   */
  private final Set<HttpResumableChunkNumber> uploadedChunks =
      ConcurrentUtility.newConcurrentSet();
  /**
   * Description given at construction; later chunks are checked against it
   */
  private final HttpResumableInfo httpResumableInfo;
  /**
   * Number of bytes written by completely written chunks.
   * NOTE(review): plain long updated without synchronization while
   * uploadedChunks is a concurrent set; confirm that callers serialize the
   * calls to tryWrite / checkIfUploadFinished for a given session.
   */
  private long size = 0L;

  /**
   * Constructor for an Http Resumable Session. Unless
   * {@code WaarpSystemUtil.isJunit()} is true, it checks the authentication
   * against the business interface, builds the DbTaskRunner and runs the
   * pre tasks.
   *
   * @param resumableInfo the description of the upload, kept as reference
   * @param rulename the name of the rule used to build the DbTaskRunner
   * @param comment the comment used to build the DbTaskRunner
   * @param authent already initialized
   *
   * @throws IllegalArgumentException if something wrong happens
   */
  HttpResumableSession(final HttpResumableInfo resumableInfo,
                       final String rulename, final String comment,
                       final HttpAuthent authent)
      throws IllegalArgumentException {
    super(authent);
    this.httpResumableInfo = resumableInfo;
    if (!WaarpSystemUtil.isJunit()) {
      final R66BusinessInterface business =
          checkAuthentR66Business(this, session, authent);
      final DbTaskRunner runner =
          getDbTaskRunner(authent.getUserId(), rulename, comment, business);
      preTasks(business, runner);
    }
  }

  /**
   * Adapted method to resumable context: the filename and the chunk size
   * come from the reference HttpResumableInfo, and the numeric identifier
   * is the most significant half of the name based UUID computed from the
   * UTF8 bytes of its identifier.
   *
   * @param user
   * @param rulename
   * @param comment
   * @param business
   *
   * @return the DbTaskRunner
   */
  private DbTaskRunner getDbTaskRunner(final String user, final String rulename,
                                       final String comment,
                                       final R66BusinessInterface business) {
    final long uuid = UUID.nameUUIDFromBytes(
                              httpResumableInfo.getIdentifier().getBytes(WaarpStringUtils.UTF8))
                          .getMostSignificantBits();
    return getDbTaskRunner(user, httpResumableInfo.getFilename(), rulename,
                           uuid, comment, httpResumableInfo.getChunkSize(),
                           business, true);
  }

  /**
   * @return the current HttpResumableInfo
   */
  public HttpResumableInfo getHttpResumableInfo() {
    return httpResumableInfo;
  }

  /**
   * Try to write the data according to resumableInfo
   *
   * @param resumableInfo the description of the chunk to write
   * @param stream the content of the chunk; it is closed by this method
   *     only when the write is really attempted
   *
   * @return true if Write is OK, false if the session is not authenticated
   *     or not ready, if resumableInfo is not valid or if this chunk was
   *     already written
   *
   * @throws IOException if the chunk cannot be read or written; in that
   *     case the chunk is neither counted nor recorded, so it can be sent
   *     again
   */
  public final boolean tryWrite(final HttpResumableInfo resumableInfo,
                                final InputStream stream) throws IOException {
    if (!session.isAuthenticated() || !session.isReady()) {
      logger.error("Not authenticated or not Ready");
      return false;
    }
    if (!valid(resumableInfo)) {
      return false;
    }
    final HttpResumableChunkNumber chunkNumber =
        new HttpResumableChunkNumber(resumableInfo.getChunkNumber());
    if (uploadedChunks.contains(chunkNumber)) {
      return false;
    }
    final long written = write(resumableInfo, stream);
    // Count the bytes only once the chunk is completely written: a chunk
    // failing halfway and sent again must not be counted twice, else size
    // could never match the total size
    size += written;
    uploadedChunks.add(chunkNumber);
    session.getRunner().incrementRank();
    return true;
  }

  /**
   * Check if the resumableInfo is valid compared to current session
   *
   * @param resumableInfo
   *
   * @return true if ok, false if resumableInfo is null, if one of its
   *     sizes or its chunk number is not strictly positive, if one of its
   *     identifier, filename or relative path is empty, or if it is not
   *     compatible with the reference HttpResumableInfo of this session
   */
  public final boolean valid(final HttpResumableInfo resumableInfo) {
    if (resumableInfo == null) {
      return false;
    }
    return (resumableInfo.getChunkSize() > 0 &&
            resumableInfo.getTotalSize() > 0 &&
            resumableInfo.getChunkNumber() > 0 &&
            ParametersChecker.isNotEmpty(resumableInfo.getIdentifier()) &&
            ParametersChecker.isNotEmpty(resumableInfo.getFilename()) &&
            ParametersChecker.isNotEmpty(resumableInfo.getRelativePath()) &&
            httpResumableInfo.isCompatible(resumableInfo));
  }

  /**
   * Real write to the final file, at the offset
   * {@code (chunkNumber - 1) * chunkSize}. Both the file and the stream are
   * closed when this method returns or throws.
   *
   * @param info
   * @param stream
   *
   * @return the number of bytes copied from the stream to the file
   *
   * @throws IOException
   */
  private long write(final HttpResumableInfo info, final InputStream stream)
      throws IOException {
    long written = 0L;
    try {
      final File file = session.getFile().getTrueFile();
      final RandomAccessFile raf = new RandomAccessFile(file, "rw");
      try {
        //Seek to position
        raf.seek((info.getChunkNumber() - 1) * (long) info.getChunkSize());
        final byte[] bytes = new byte[info.getChunkSize()];
        while (true) {
          final int r = stream.read(bytes);
          if (r < 0) {
            break;
          }
          raf.write(bytes, 0, r);
          written += r;
        }
      } finally {
        raf.close();
      }
    } finally {
      // Separate finally: the stream is closed even if closing the file fails
      stream.close();
    }
    return written;
  }

  /**
   * Check if the resumableInfo block is already written (previously received)
   *
   * @param resumableInfo
   *
   * @return True if contained
   */
  public final boolean contains(final HttpResumableInfo resumableInfo) {
    final HttpResumableChunkNumber chunkNumber =
        new HttpResumableChunkNumber(resumableInfo.getChunkNumber());
    return uploadedChunks.contains(chunkNumber);
  }

  /**
   * Check if the current upload is finished or not. When the written size
   * reaches the total size and the session is not yet in CLOSEDCHANNEL
   * state, the session goes to ENDTRANSFERS state and, if the received file
   * exists, the optional digest is checked, the file is moved to its final
   * path, the runner is saved, the post tasks are run and the transfer is
   * finalized.
   *
   * @param sha256 if not empty, contains the sha256 in hex format
   *
   * @return True if finished, that is as soon as the written size equals
   *     the total size, even if the received file is not found
   */
  public final boolean checkIfUploadFinished(final String sha256) {
    logger.debug("Write until now: {} for {}", size,
                 httpResumableInfo.getTotalSize());
    //check if upload finished
    if (size != httpResumableInfo.getTotalSize()) {
      return false;
    }
    if (session.getState() == CLOSEDCHANNEL) {
      return true;
    }
    logger.debug("Final block received! {}", session);
    session.newState(ENDTRANSFERS);
    //Upload finished, change filename.
    final R66File r66File = session.getFile();
    final File file = r66File.getTrueFile();
    if (!file.isFile()) {
      // Nothing to check or move
      return true;
    }
    // Now if sha256 is given, compute it and compare
    if (ParametersChecker.isNotEmpty(sha256)) {
      try {
        final byte[] bin =
            FilesystemBasedDigest.getHash(file, false, DigestAlgo.SHA256);
        if (!FilesystemBasedDigest.digestEquals(sha256, bin)) {
          logger.error("Digests differs: {} {}", sha256,
                       FilesystemBasedDigest.getHex(bin));
          // NOTE(review): presumably error(...) throws and so aborts the
          // finalization; confirm in HttpSessionAbstract
          error(new OpenR66RunnerErrorException("Digest differs"),
                session.getBusinessObject());
        } else {
          logger.info("Digest OK");
        }
      } catch (final IOException ignore) {
        // Best effort: a digest that cannot be computed does not block the
        // end of the transfer
        logger.warn(ignore);
      }
    } else {
      logger.info("NO DIGEST given");
    }
    final DbTaskRunner runner = session.getRunner();
    try {
      final String finalpath = R66Dir.getFinalUniqueFilename(r66File);
      logger.debug("File to move from {} to {}",
                   r66File.getTrueFile().getAbsolutePath(), finalpath);
      if (!r66File.renameTo(runner.getRule().setRecvPath(finalpath))) {
        logger.error("Cannot move file to final position {}",
                     runner.getRule().setRecvPath(finalpath));
      }
      runner.setFilename(r66File.getFile());
      runner.saveStatus();
      runPostTask();
      authent.finalizeTransfer(this, session);
    } catch (final OpenR66RunnerErrorException e) {
      error(e, session.getBusinessObject());
    } catch (final CommandAbstractException e) {
      error(e, session.getBusinessObject());
    }
    return true;
  }

  @Override
  public String toString() {
    return "RS: {" + uploadedChunks.toString() + ", " + session.toString() +
           ", " + httpResumableInfo + "}";
  }
}