Commander.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.openr66.commander;
import org.waarp.common.database.DbPreparedStatement;
import org.waarp.common.database.data.AbstractDbData;
import org.waarp.common.database.data.AbstractDbData.UpdatedInfo;
import org.waarp.common.database.exception.WaarpDatabaseException;
import org.waarp.common.database.exception.WaarpDatabaseNoConnectionException;
import org.waarp.common.database.exception.WaarpDatabaseNoDataException;
import org.waarp.common.database.exception.WaarpDatabaseSqlException;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.WaarpShutdownHook;
import org.waarp.openr66.database.data.DbConfiguration;
import org.waarp.openr66.database.data.DbHostAuth;
import org.waarp.openr66.database.data.DbHostConfiguration;
import org.waarp.openr66.database.data.DbMultipleMonitor;
import org.waarp.openr66.database.data.DbRule;
import org.waarp.openr66.database.data.DbTaskRunner;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
import static org.waarp.openr66.database.DbConstantR66.*;
/**
* Commander is responsible to read from database updated data from time to time
* in order to achieve new
* runner or new configuration updates.
*/
public class Commander implements CommanderInterface {
private static final String DATABASE_ERROR_CANNOT_EXECUTE_COMMANDER =
"Database Error: Cannot execute Commander";
private static final String CONFIG = "Config {}";
private static final String DATABASE_SQL_ERROR_CANNOT_EXECUTE_COMMANDER =
"Database SQL Error: Cannot execute Commander";
private static final String
DATABASE_NO_CONNECTION_ERROR_CANNOT_EXECUTE_COMMANDER =
"Database No Connection Error: Cannot execute Commander";
/**
* Internal Logger
*/
private static final WaarpLogger logger =
WaarpLoggerFactory.getLogger(Commander.class);
public static final int LIMIT_MAX_SUBMIT = 50000;
public static final int LIMIT_SUBMIT = 1000;
private InternalRunner internalRunner;
private DbPreparedStatement preparedStatementLock;
private long totalRuns = 0;
/**
* Prepare requests that will be executed from time to time
*
* @param runner
*
* @throws WaarpDatabaseNoConnectionException
* @throws WaarpDatabaseSqlException
*/
public Commander(final InternalRunner runner)
throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
internalConstructor(runner);
}
/**
* Prepare requests that will be executed from time to time
*
* @param runner
* @param fromStartup True if call from startup of the server
*
* @throws WaarpDatabaseNoConnectionException
* @throws WaarpDatabaseSqlException
*/
public Commander(final InternalRunner runner, final boolean fromStartup)
throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
internalConstructor(runner);
if (fromStartup) {
// Change RUNNING or INTERRUPTED to TOSUBMIT since they should be ready
DbTaskRunner.resetToSubmit(admin.getSession());
}
}
private void internalConstructor(final InternalRunner runner)
throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
try {
if (Configuration.configuration.getMultipleMonitors() > 1) {
preparedStatementLock = DbMultipleMonitor.getUpdatedPrepareStament(
noCommitAdmin.getSession());
} else {
preparedStatementLock = null;
}
// Clean tasks (CompleteOK and ALLDONE => DONE)
DbTaskRunner.changeFinishedToDone();
internalRunner = runner;
} finally {
if (internalRunner == null) {
// An error occurs
if (preparedStatementLock != null) {
preparedStatementLock.realClose();
}
} else {
if (preparedStatementLock != null) {
noCommitAdmin.getSession()
.addLongTermPreparedStatement(preparedStatementLock);
}
}
}
}
/**
* Finalize internal data
*/
@Override
public final void finalizeCommander() {
if (preparedStatementLock != null) {
try {
noCommitAdmin.getSession().commit();
} catch (final WaarpDatabaseSqlException ignored) {
// nothing
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
preparedStatementLock.realClose();
noCommitAdmin.getSession()
.removeLongTermPreparedStatements(preparedStatementLock);
// DbConstant.noCommitAdmin.session.removeLongTermPreparedStatements()
}
// DbConstant.admin.session.removeLongTermPreparedStatements()
}
@Override
public void run() {
Thread.currentThread().setName("OpenR66Commander");
if (Configuration.configuration.isShutdown()) {
// Stop
return;
}
if (admin.getSession() != null && admin.getSession().isDisActive()) {
admin.getSession().checkConnectionNoException();
}
// each time it is runned, it parses all database for updates
DbMultipleMonitor multipleMonitor = null;
// Open a lock to prevent other "HA" monitors to retrieve access as Commander
try {
try {
if (preparedStatementLock != null) {
preparedStatementLock.executeQuery();
preparedStatementLock.getNext();
multipleMonitor =
DbMultipleMonitor.getFromStatement(preparedStatementLock);
}
} catch (final WaarpDatabaseNoConnectionException e) {
logger.error(
DATABASE_NO_CONNECTION_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
e.getMessage());
try {
noCommitAdmin.getDbModel()
.validConnection(noCommitAdmin.getSession());
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
return;
} catch (final WaarpDatabaseSqlException e) {
logger.error(DATABASE_SQL_ERROR_CANNOT_EXECUTE_COMMANDER, e);
try {
noCommitAdmin.getDbModel()
.validConnection(noCommitAdmin.getSession());
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
return;
}
logger.debug("Before {}", multipleMonitor);
boolean shallReturnInCaseError = true;
try {
// First check Configuration
checkConfiguration(multipleMonitor);
// check HostConfiguration
shallReturnInCaseError = false;
checkHostConfiguration(multipleMonitor);
} catch (final WaarpDatabaseNoConnectionException e) {
try {
admin.getDbModel().validConnection(admin.getSession());
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
logger.error(
DATABASE_NO_CONNECTION_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
e.getMessage());
return;
} catch (final WaarpDatabaseSqlException e) {
try {
admin.getDbModel().validConnection(admin.getSession());
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
logger.error(DATABASE_SQL_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
e.getMessage());
// XXX no return since table might not be initialized
if (shallReturnInCaseError) {
return;
}
} catch (final WaarpDatabaseException e) {
try {
admin.getDbModel().validConnection(admin.getSession());
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
logger.error(DATABASE_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
e.getMessage());
// XXX no return since table might not be initialized
if (shallReturnInCaseError) {
return;
}
}
// Do not fusion with previous cases since last one could be in error but
// still continue
try {
// ConsistencyCheck HostAuthent
checkHostAuthent(multipleMonitor);
// Check Rules
checkRule(multipleMonitor);
if (WaarpShutdownHook.isShutdownStarting()) {
// no more task to submit
return;
}
// Lauch Transfer ready to be submited
logger.debug("start runner");
checkTaskRunner(multipleMonitor);
} catch (final WaarpDatabaseNoConnectionException e) {
try {
admin.getDbModel().validConnection(admin.getSession());
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
logger.error(
DATABASE_NO_CONNECTION_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
e.getMessage());
return;
} catch (final WaarpDatabaseSqlException e) {
try {
admin.getDbModel().validConnection(admin.getSession());
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
logger.error(DATABASE_SQL_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
e.getMessage());
return;
} catch (final WaarpDatabaseNoDataException e) {
try {
admin.getDbModel().validConnection(admin.getSession());
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
logger.error(DATABASE_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
e.getMessage());
return;
} catch (final WaarpDatabaseException e) {
try {
admin.getDbModel().validConnection(admin.getSession());
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
logger.error(DATABASE_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
e.getMessage());
return;
}
logger.debug("end commander");
} finally {
if (multipleMonitor != null) {
try {
// Now update and Commit so releasing the lock
logger.debug("Update {}", multipleMonitor);
multipleMonitor.update();
noCommitAdmin.getSession().commit();
} catch (final WaarpDatabaseException e) {
try {
noCommitAdmin.getDbModel()
.validConnection(noCommitAdmin.getSession());
} catch (final WaarpDatabaseNoConnectionException ignored) {
// nothing
}
}
}
}
}
private void checkConfiguration(final DbMultipleMonitor multipleMonitor)
throws WaarpDatabaseException {
if (Configuration.configuration.isShutdown()) {
// Stop
return;
}
final DbConfiguration[] configurations =
DbConfiguration.getUpdatedPrepareStament();
int i = 0;
while (i < configurations.length) {
// should be only one...
final DbConfiguration configuration = configurations[i];
if (configuration.isOwnConfiguration()) {
configuration.updateConfiguration();
}
if (multipleMonitor != null) {
// update the configuration in HA mode
if (multipleMonitor.checkUpdateConfig()) {
configuration.changeUpdatedInfo(
AbstractDbData.UpdatedInfo.NOTUPDATED);
configuration.update();
logger.debug(CONFIG, multipleMonitor);
} else {
configuration.update();
logger.debug(CONFIG, multipleMonitor);
}
} else {
configuration.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
configuration.update();
}
i++;
}
}
private void checkHostConfiguration(final DbMultipleMonitor multipleMonitor)
throws WaarpDatabaseException {
if (Configuration.configuration.isShutdown()) {
// Stop
return;
}
final DbHostConfiguration[] configurations =
DbHostConfiguration.getUpdatedPrepareStament();
int i = 0;
while (i < configurations.length) {
// should be only one...
final DbHostConfiguration configuration = configurations[i];
if (configuration.isOwnConfiguration()) {
configuration.updateConfiguration();
}
if (multipleMonitor != null) {
// update the configuration in HA mode
if (multipleMonitor.checkUpdateConfig()) {
configuration.changeUpdatedInfo(
AbstractDbData.UpdatedInfo.NOTUPDATED);
configuration.update();
logger.debug(CONFIG, multipleMonitor);
} else {
configuration.update();
logger.debug(CONFIG, multipleMonitor);
}
} else {
configuration.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
configuration.update();
}
i++;
}
}
private void checkHostAuthent(final DbMultipleMonitor multipleMonitor)
throws WaarpDatabaseException {
if (Configuration.configuration.isShutdown()) {
// Stop
return;
}
final DbHostAuth[] auths = DbHostAuth.getUpdatedPreparedStatement();
int i = 0;
boolean mm = false;
boolean lastUpdate = false;
while (i < auths.length) {
// Maybe multiple
final DbHostAuth hostAuth = auths[i];
if (multipleMonitor != null) {
if (!mm) {
// not already set from a previous hostAuth
mm = true;
lastUpdate = multipleMonitor.checkUpdateHost();
} // else already set so no action on multipleMonitor
// Update the Host configuration in HA mode
if (lastUpdate) {
hostAuth.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
} else {
// Nothing to do except validate
}
hostAuth.update();
logger.debug("Host {}", multipleMonitor);
} else {
// Nothing to do except validate
hostAuth.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
hostAuth.update();
}
i++;
}
}
private void checkRule(final DbMultipleMonitor multipleMonitor)
throws WaarpDatabaseException {
if (Configuration.configuration.isShutdown()) {
// Stop
return;
}
final DbRule[] rules = DbRule.getUpdatedPrepareStament();
int i = 0;
boolean mm = false;
boolean lastUpdate = false;
while (i < rules.length) {
final DbRule rule = rules[i];
if (multipleMonitor != null) {
if (!mm) {
// not already set from a previous hostAuth
mm = true;
lastUpdate = multipleMonitor.checkUpdateRule();
} // else already set so no action on multipleMonitor
// Update the Rules in HA mode
if (lastUpdate) {
rule.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
} else {
// Nothing to do except validate
}
rule.update();
logger.debug("Rule {}", multipleMonitor);
} else {
// Nothing to do except validate
rule.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
rule.update();
}
i++;
}
}
private void checkTaskRunner(final DbMultipleMonitor multipleMonitor)
throws WaarpDatabaseException {
if (Configuration.configuration.isShutdown()) {
// Stop
return;
}
// No specific HA mode since the other servers will wait for the commit on Lock
final int maxRunnable =
Math.min(Configuration.configuration.getRunnerThread(),
internalRunner.allowedToSubmit());
if (maxRunnable > 0) {
final DbTaskRunner[] tasks =
DbTaskRunner.getSelectFromInfoPrepareStatement(UpdatedInfo.TOSUBMIT,
true, maxRunnable);
logger.info("TaskRunner to launch: {} (launched: {}, active: {}) {}",
tasks.length, totalRuns, internalRunner.nbInternalRunner(),
NetworkTransaction.hashStatus());
int i = 0;
while (i < tasks.length) {
if (WaarpShutdownHook.isShutdownStarting()) {
logger.info("Will not start transfers, server is in shutdown.");
return;
}
final DbTaskRunner taskRunner = tasks[i];
i++;
logger.debug("get a task: {}", taskRunner);
// Launch if possible this task
final String key =
taskRunner.getRequested() + ' ' + taskRunner.getRequester() + ' ' +
taskRunner.getSpecialId();
if (Configuration.configuration.getLocalTransaction()
.getFromRequest(key) != null) {
// already running
continue;
}
if (taskRunner.isRequestOnRequested()) {
// cannot schedule a request where the host is the requested host
taskRunner.changeUpdatedInfo(UpdatedInfo.INTERRUPTED);
try {
taskRunner.update();
} catch (final WaarpDatabaseNoDataException e) {
logger.warn("Update failed, no transfer found");
}
continue;
}
// last check: number can have raised up since Commander checks
if (i > (tasks.length - 10) && internalRunner.nbInternalRunner() >=
Configuration.configuration.getRunnerThread()) {
break;
}
if (internalRunner.submitTaskRunner(taskRunner)) {
totalRuns++;
} else {
break;
}
}
}
}
}