LocalTransaction.java

/*
 * This file is part of Waarp Project (named also Waarp or GG).
 *
 *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
 *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
 * individual contributors.
 *
 *  All Waarp Project is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along with
 * Waarp . If not, see <http://www.gnu.org/licenses/>.
 */
package org.waarp.openr66.protocol.localhandler;

import org.waarp.common.logging.SysErrLogger;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.WaarpShutdownHook;
import org.waarp.openr66.context.ErrorCode;
import org.waarp.openr66.context.R66FiniteDualStates;
import org.waarp.openr66.context.R66Result;
import org.waarp.openr66.context.R66Session;
import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
import org.waarp.openr66.database.data.DbTaskRunner;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolNotAuthenticatedException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolShutdownException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
import org.waarp.openr66.protocol.localhandler.packet.LocalPacketFactory;
import org.waarp.openr66.protocol.localhandler.packet.StartupPacket;
import org.waarp.openr66.protocol.localhandler.packet.ValidPacket;
import org.waarp.openr66.protocol.networkhandler.NetworkChannelReference;
import org.waarp.openr66.protocol.networkhandler.packet.NetworkPacket;
import org.waarp.openr66.protocol.utils.R66Future;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

/**
 * This class handles Local Transaction connections
 */
public class LocalTransaction {
  /**
   * Internal Logger
   */
  private static final WaarpLogger logger =
      WaarpLoggerFactory.getLogger(LocalTransaction.class);

  /**
   * HashMap of LocalChannelReference using LocalChannelId
   */
  private final ConcurrentHashMap<Integer, LocalChannelReference>
      localChannelHashMap =
      new ConcurrentHashMap<Integer, LocalChannelReference>();

  /**
   * HashMap of LocalChannelReference using requested_requester_specialId
   */
  private final ConcurrentHashMap<String, LocalChannelReference>
      localChannelHashMapIdBased =
      new ConcurrentHashMap<String, LocalChannelReference>();

  /**
   * Constructor
   */
  public LocalTransaction() {
    // EMpty
  }

  public final String hashStatus() {
    return "LocalTransaction: [localChannelHashMap: " +
           localChannelHashMap.size() + " localChannelHashMapIdBased: " +
           localChannelHashMapIdBased.size() + "] ";
  }

  /**
   * Get the corresponding LocalChannelReference and set the remoteId if
   * different
   *
   * @param remoteId
   * @param localId
   *
   * @return the LocalChannelReference
   *
   * @throws OpenR66ProtocolSystemException
   */
  public final LocalChannelReference getClient(final Integer remoteId,
                                               final Integer localId)
      throws OpenR66ProtocolSystemException {
    final LocalChannelReference localChannelReference = getFromId(localId);
    if (localChannelReference != null) {
      if (localChannelReference.getRemoteId().compareTo(remoteId) != 0) {
        localChannelReference.setRemoteId(remoteId);
      }
      return localChannelReference;
    }
    throw new OpenR66ProtocolSystemException(
        "Cannot find LocalChannelReference");
  }

  /**
   * Create a new Client
   *
   * @param networkChannelReference
   * @param remoteId might be set to ChannelUtils.NOCHANNEL (real
   *     creation)
   * @param futureRequest might be null (from NetworkChannel Startup)
   *
   * @return the LocalChannelReference
   *
   * @throws OpenR66ProtocolRemoteShutdownException
   * @throws OpenR66ProtocolNoConnectionException
   */
  public final LocalChannelReference createNewClient(
      final NetworkChannelReference networkChannelReference,
      final Integer remoteId, final R66Future futureRequest,
      final boolean fromSsl) throws OpenR66ProtocolRemoteShutdownException,
                                    OpenR66ProtocolNoConnectionException {
    if (WaarpShutdownHook.isShutdownStarting()) {
      // Do not try since already locally in shutdown
      throw new OpenR66ProtocolNoConnectionException(
          "Cannot create client since the server is in shutdown.");
    }
    final LocalChannelReference localChannelReference =
        new LocalChannelReference(networkChannelReference, remoteId,
                                  futureRequest);
    localChannelHashMap.put(localChannelReference.getLocalId(),
                            localChannelReference);
    logger.debug("Db connection done and Create LocalChannel entry: {}",
                 localChannelReference);
    // Now simulate sending first a Startup message
    final StartupPacket startup =
        new StartupPacket(localChannelReference.getLocalId(), fromSsl);
    try {
      localChannelReference.getServerHandler().startup(startup);
      // Try but may not be available yet
      localChannelReference.getServerHandler().validateAuthenticationReuse();
    } catch (final OpenR66ProtocolPacketException e) {
      throw new OpenR66ProtocolNoConnectionException(e);
    } catch (final OpenR66ProtocolNotAuthenticatedException e) {
      throw new OpenR66ProtocolNoConnectionException(e);
    }
    return localChannelReference;
  }

  /**
   * @param id
   *
   * @return the LocalChannelReference
   */
  public final LocalChannelReference getFromId(final Integer id) {
    return localChannelHashMap.get(id);
  }

  /**
   * Remove one local channel
   *
   * @param localChannelReference
   */
  protected final void remove(
      final LocalChannelReference localChannelReference) {
    logger.debug("DEBUG remove: {}", localChannelReference.getLocalId());
    localChannelHashMap.remove(localChannelReference.getLocalId());
    if (localChannelReference.getRequestId() != null) {
      localChannelHashMapIdBased.remove(localChannelReference.getRequestId());
    }
    if (localChannelReference.getNetworkChannelObject() != null) {
      localChannelReference.getNetworkChannelObject()
                           .remove(localChannelReference);
    }
  }

  /**
   * @param runner
   * @param lcr
   */
  public final void setFromId(final DbTaskRunner runner,
                              final LocalChannelReference lcr) {
    final String key = runner.getKey();
    lcr.setRequestId(key);
    localChannelHashMapIdBased.put(key, lcr);
  }

  /**
   * @param key as "requested requester specialId"
   *
   * @return the LocalChannelReference
   */
  public final LocalChannelReference getFromRequest(final String key) {
    return localChannelHashMapIdBased.get(key);
  }

  /**
   * @param key as "requested requester specialId"
   *
   * @return True if the LocalChannelReference exists
   */
  public final boolean contained(final String key) {
    return localChannelHashMapIdBased.containsKey(key);
  }

  /**
   * @param id
   *
   * @return True if the LocalChannelReference exists
   */
  public final boolean contained(final int id) {
    return localChannelHashMap.containsKey(id);
  }

  /**
   * @return the number of active local channels
   */
  public final int getNumberLocalChannel() {
    return localChannelHashMap.size();
  }

  /**
   * Debug function (while shutdown for instance)
   */
  public final void debugPrintActiveLocalChannels() {
    final Collection<LocalChannelReference> collection =
        localChannelHashMap.values();
    for (final LocalChannelReference localChannelReference : collection) {
      logger.debug("Will close local channel: {}", localChannelReference);
      logger.debug(" Containing: {}",
                   localChannelReference.getSession() != null?
                       localChannelReference.getSession() : "no session");
    }
  }

  /**
   * Informs all remote client that the server is shutting down
   */
  public final void shutdownLocalChannels() {
    logger.warn(
        "Will inform LocalChannels of Shutdown: " + localChannelHashMap.size());
    final Collection<LocalChannelReference> collection =
        localChannelHashMap.values();
    final Iterator<LocalChannelReference> iterator = collection.iterator();
    final ValidPacket packet = new ValidPacket("Shutdown forced", null,
                                               LocalPacketFactory.SHUTDOWNPACKET);
    while (iterator.hasNext()) {
      final LocalChannelReference localChannelReference = iterator.next();
      logger.info("Inform Shutdown {}", localChannelReference);
      packet.setSmiddle(null);
      packet.retain();
      // If a transfer is running, save the current rank and inform remote
      // host
      if (localChannelReference.getSession() != null) {
        final R66Session session = localChannelReference.getSession();
        final DbTaskRunner runner = session.getRunner();
        if (runner != null && runner.isInTransfer()) {
          if (!session.isSender()) {
            final int newrank = runner.getRank();
            packet.setSmiddle(Integer.toString(newrank));
          }
          // Save File status
          try {
            runner.saveStatus();
          } catch (final OpenR66RunnerErrorException ignored) {
            // nothing
          }
        }
        if (runner != null && !runner.isFinished()) {
          final R66Result result =
              new R66Result(new OpenR66ProtocolShutdownException(), session,
                            true, ErrorCode.Shutdown, runner);
          result.setOther(packet);
          try {
            final NetworkPacket message =
                new NetworkPacket(localChannelReference.getLocalId(),
                                  localChannelReference.getRemoteId(), packet,
                                  localChannelReference);
            localChannelReference.sessionNewState(R66FiniteDualStates.SHUTDOWN);
            try {
              localChannelReference.getNetworkChannel().writeAndFlush(message)
                                   .await(Configuration.WAITFORNETOP);
            } catch (final InterruptedException e1) {//NOSONAR
              SysErrLogger.FAKE_LOGGER.ignoreLog(e1);
            }
          } catch (final OpenR66ProtocolPacketException ignored) {
            // ignore
          }
          try {
            session.setFinalizeTransfer(false, result);
          } catch (final OpenR66RunnerErrorException ignored) {
            // ignore
          } catch (final OpenR66ProtocolSystemException ignored) {
            // ignore
          }
        }
        localChannelReference.close();
        continue;
      }
      try {
        final NetworkPacket message =
            new NetworkPacket(localChannelReference.getLocalId(),
                              localChannelReference.getRemoteId(), packet,
                              localChannelReference);
        localChannelReference.getNetworkChannel().writeAndFlush(message);
      } catch (final OpenR66ProtocolPacketException ignored) {
        // ignore
      }
      localChannelReference.close();
    }
  }

  /**
   * Close All Local Channels
   */
  public final void closeAll() {
    logger.debug("close All Local Channels");
    final Collection<LocalChannelReference> collection =
        localChannelHashMap.values();
    for (final LocalChannelReference localChannelReference : collection) {
      logger.info("Inform Shutdown {}", localChannelReference);
      localChannelReference.close();
    }
  }

}