InternalRunner.java

/*
 * This file is part of Waarp Project (named also Waarp or GG).
 *
 *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
 *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
 * individual contributors.
 *
 *  All Waarp Project is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along with
 * Waarp . If not, see <http://www.gnu.org/licenses/>.
 */
package org.waarp.openr66.commander;

import org.waarp.common.database.data.AbstractDbData.UpdatedInfo;
import org.waarp.common.database.exception.WaarpDatabaseException;
import org.waarp.common.database.exception.WaarpDatabaseNoConnectionException;
import org.waarp.common.database.exception.WaarpDatabaseSqlException;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.WaarpThreadFactory;
import org.waarp.openr66.database.data.DbTaskRunner;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * This class launch and control the Commander and enable TaskRunner job
 * submissions
 */
public class InternalRunner {
  /**
   * Internal Logger
   */
  private static final WaarpLogger logger =
      WaarpLoggerFactory.getLogger(InternalRunner.class);

  private final ScheduledExecutorService scheduledExecutorService;
  private ScheduledFuture<?> scheduledFuture;
  private CommanderInterface commander;
  private boolean isRunning = true;
  private final ThreadPoolExecutor threadPoolExecutor;
  private final NetworkTransaction networkTransaction;

  /**
   * Create the structure to enable submission by database
   *
   * @throws WaarpDatabaseNoConnectionException
   * @throws WaarpDatabaseSqlException
   */
  public InternalRunner()
      throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
    commander = new Commander(this, true);
    // This is not daemon intentionally
    scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
        new WaarpThreadFactory("InternalRunner", false));
    isRunning = true;
    final BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>();
    threadPoolExecutor = new ThreadPoolExecutor(
        Configuration.configuration.getRunnerThread() / 2,
        Configuration.configuration.getRunnerThread() * 2, 1000,
        TimeUnit.MILLISECONDS, workQueue,
        new WaarpThreadFactory("ClientRunner"), new RejectedExecutionHandler() {
      @Override
      public final void rejectedExecution(final Runnable runnable,
                                          final ThreadPoolExecutor threadPoolExecutor) {
        logger.debug("Task rescheduled");
      }
    });
    scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(commander,
                                                                      Configuration.configuration.getDelayCommander(),
                                                                      Configuration.configuration.getDelayCommander(),
                                                                      TimeUnit.MILLISECONDS);
    networkTransaction = new NetworkTransaction();
  }

  public final NetworkTransaction getNetworkTransaction() {
    return networkTransaction;
  }

  public final int allowedToSubmit() {
    final int active = threadPoolExecutor.getActiveCount();
    if ((isRunning || !Configuration.configuration.isShutdown()) &&
        (active < Configuration.configuration.getRunnerThread())) {
      return Configuration.configuration.getRunnerThread() - active;
    }
    return 0;
  }

  /**
   * Submit a task
   *
   * @param taskRunner
   *
   * @return True if launched, False if not since exceeding capacity
   */
  public final boolean submitTaskRunner(final DbTaskRunner taskRunner) {
    if (isRunning || !Configuration.configuration.isShutdown()) {
      logger.debug("Will run {}", taskRunner);
      final ClientRunner runner =
          new ClientRunner(networkTransaction, taskRunner, null);
      if (taskRunner.isSendThrough() && (taskRunner.isRescheduledTransfer() ||
                                         taskRunner.isPreTaskStarting())) {
        runner.setSendThroughMode();
        taskRunner.checkThroughMode();
      }
      try {
        taskRunner.changeUpdatedInfo(UpdatedInfo.RUNNING);
        taskRunner.update();
      } catch (final WaarpDatabaseException e) {
        logger.error("Error in Commander: {}", e.getMessage());
        return false;
      }
      // create the client, connect and run
      threadPoolExecutor.execute(runner);
      return true;
    }
    return false;
  }

  /**
   * First step while shutting down the service
   */
  public final void prepareStopInternalRunner() {
    isRunning = false;
    scheduledFuture.cancel(false);
    if (commander != null) {
      commander.finalizeCommander();
    }
    scheduledExecutorService.shutdown();
    threadPoolExecutor.shutdown();
  }

  /**
   * This should be called when the server is shutting down, after stopping
   * active requests if possible.
   */
  public final void stopInternalRunner() {
    isRunning = false;
    logger.info("Stopping Commander and Runner Tasks");
    scheduledFuture.cancel(true);
    if (commander != null) {
      commander.finalizeCommander();
    }
    scheduledExecutorService.shutdownNow();
    threadPoolExecutor.shutdownNow();
    networkTransaction.closeAll(false);
  }

  public final int nbInternalRunner() {
    return threadPoolExecutor.getActiveCount();
  }

  public final void reloadInternalRunner()
      throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
    scheduledFuture.cancel(false);
    if (commander != null) {
      commander.finalizeCommander();
    }
    commander = new Commander(this);
    scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(commander,
                                                                      2 *
                                                                      Configuration.configuration.getDelayCommander(),
                                                                      Configuration.configuration.getDelayCommander(),
                                                                      TimeUnit.MILLISECONDS);
  }
}