SpooledDirectoryTransfer.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.openr66.client;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.waarp.common.database.exception.WaarpDatabaseException;
import org.waarp.common.filemonitor.FileMonitor;
import org.waarp.common.filemonitor.FileMonitor.FileItem;
import org.waarp.common.filemonitor.FileMonitorCommandFactory;
import org.waarp.common.filemonitor.FileMonitorCommandRunnableFuture;
import org.waarp.common.filemonitor.RegexFileFilter;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.logging.WaarpSlf4JLoggerFactory;
import org.waarp.common.utility.WaarpShutdownHook;
import org.waarp.common.utility.WaarpSystemUtil;
import org.waarp.common.utility.WaarpThreadFactory;
import org.waarp.common.xml.XmlDecl;
import org.waarp.common.xml.XmlHash;
import org.waarp.common.xml.XmlType;
import org.waarp.common.xml.XmlUtil;
import org.waarp.common.xml.XmlValue;
import org.waarp.openr66.configuration.FileBasedConfiguration;
import org.waarp.openr66.context.ErrorCode;
import org.waarp.openr66.context.R66Result;
import org.waarp.openr66.context.task.SpooledInformTask;
import org.waarp.openr66.database.data.DbRule;
import org.waarp.openr66.database.data.DbTaskRunner;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.configuration.Messages;
import org.waarp.openr66.protocol.localhandler.packet.BusinessRequestPacket;
import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
import org.waarp.openr66.protocol.utils.R66Future;
import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.waarp.common.database.DbConstant.*;
import static org.waarp.openr66.context.ErrorCode.*;
/**
* Direct Transfer from a client with or without database connection or Submit
* Transfer from a client with
* database connection to transfer files from a spooled directory to possibly
* multiple hosts at once.<br>
* -to Hosts will have to be separated by ','.<br>
* -rule Rule to be used to send files to partners<br>
* <br>
* Mandatory additional elements:<br>
* -directory source (directory to spooled on ; many directories can be
* specified using a comma separated list
* as "directory1,directory2,directory3")<br>
* -statusfile file (file to use as permanent status (if process is killed or
* aborts))<br>
* -stopfile file (file when created will stop the dameon)<br>
* Other options:<br>
* -info info to be send with the file as filetransfer information<br>
* -md5 for md5 option<br>
* -block size for block size specification<br>
* -nolog to prevent saving action locally<br>
* -regex regex (regular expression to filter file names from directory
* source)<br>
* -elapse elapse (elapse time in ms > 100 ms between 2 checks of the
* directory)<br>
* -submit (to submit only: default: only one between submit and direct is
* allowed)<br>
* -direct (to directly transfer only: only one between submit and direct is
* allowed)<br>
* -recursive (to scan recursively from the root)<br>
* -waarp WaarpHosts (seperated by ',') to inform of running spooled directory
* (information stays in memory of
* Waarp servers, not in database)<br>
* -name name to be used as name in list printing in Waarp servers. Note this
* name must be unique
* globally.<br>
* -elapseWaarp elapse to specify a specific timing > 1000ms between to
* information sent to Waarp servers
* (default: 5000ms)<br>
* -parallel to allow (default) parallelism between send actions and
* information<br>
* -sequential to not allow parallelism between send actions and
* information<br>
* -limitParallel limit to specify the number of concurrent actions in -direct
* mode only<br>
* -minimalSize limit to specify the minimal size of each file that will be
* transferred (default: no
* limit)<br>
* -notlogWarn | -logWarn to deactivate or activate (default) the logging in
* Warn mode of Send/Remove
* information of the spool<br>
*/
public class SpooledDirectoryTransfer implements Runnable {
public static final String NEEDFULL = "needfull";
public static final String PARTIALOK = "Validated";
/**
* Internal Logger
*/
protected static volatile WaarpLogger logger;
protected static String infoArgs =
Messages.getString("SpooledDirectoryTransfer.0"); //$NON-NLS-1$
protected static final String NO_INFO_ARGS = "noinfo";
protected final R66Future future;
public final String name;
protected final List<String> directory;
protected final String statusFile;
protected final String stopFile;
protected final String ruleName;
protected final String fileInfo;
protected final boolean isMD5;
protected final List<String> remoteHosts;
protected final String regexFilter;
protected final List<String> waarpHosts;
protected final int blocksize;
protected final long elapseTime;
protected final long elapseWaarpTime;
protected final boolean parallel;
protected final int limitParallelTasks;
protected final boolean submit;
protected final boolean nolog;
protected final boolean recurs;
protected final long minimalSize;
protected final boolean normalInfoAsWarn;
protected final boolean ignoreAlreadyUsed;
protected final NetworkTransaction networkTransaction;
protected FileMonitor monitor;
private long sent;
private long error;
/**
* @param future
* @param arguments
* @param networkTransaction
*/
public SpooledDirectoryTransfer(final R66Future future,
final Arguments arguments,
final NetworkTransaction networkTransaction) {
if (logger == null) {
logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
}
this.future = future;
this.name = arguments.name;
this.directory = arguments.localDirectory;
statusFile = arguments.statusFile;
stopFile = arguments.stopFile;
this.ruleName = arguments.rule;
this.fileInfo = arguments.fileInfo;
this.isMD5 = arguments.isMd5;
this.remoteHosts = arguments.remoteHosts;
this.blocksize = arguments.block;
regexFilter = arguments.regex;
elapseTime = arguments.elapsed;
this.submit = arguments.toSubmit;
this.nolog = arguments.noLog && !arguments.toSubmit;
AbstractTransfer.nolog = this.nolog;
recurs = arguments.recursive;
elapseWaarpTime = arguments.elapsedWaarp;
if (this.submit) {
this.parallel = false;
} else {
this.parallel = arguments.isParallel;
}
limitParallelTasks = arguments.limitParallel;
waarpHosts = arguments.waarpHosts;
this.minimalSize = arguments.minimalSize;
normalInfoAsWarn = arguments.logWarn;
this.ignoreAlreadyUsed = arguments.ignoreAlreadyUsed;
this.networkTransaction = networkTransaction;
}
@Override
public void run() {
setSent(0);
setError(0);
// first check if rule is for SEND
final DbRule dbrule;
try {
dbrule = new DbRule(ruleName);
} catch (final WaarpDatabaseException e1) {
logger.error(Messages.getString("Transfer.18") + ": {}",
e1.getMessage()); //$NON-NLS-1$
future.setFailure(e1);
if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
null) {
Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
new Exception(Messages.getString("Transfer.18") + e1.getMessage()));
}
return;
}
if (dbrule.isRecvMode()) {
logger.error(
Messages.getString("SpooledDirectoryTransfer.5")); //$NON-NLS-1$
future.cancel();
if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
null) {
Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
new Exception(Messages.getString("SpooledDirectoryTransfer.5")));
}
return;
}
final File status = new File(statusFile);
if (status.isDirectory()) {
logger.error(
Messages.getString("SpooledDirectoryTransfer.6")); //$NON-NLS-1$
future.cancel();
if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
null) {
Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
new Exception(Messages.getString("SpooledDirectoryTransfer.6")));
}
return;
}
final File stop = new File(stopFile);
if (stop.isDirectory()) {
logger.error(
Messages.getString("SpooledDirectoryTransfer.7")); //$NON-NLS-1$
future.cancel();
if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
null) {
Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
new Exception(Messages.getString("SpooledDirectoryTransfer.7")));
}
return;
} else if (stop.exists()) {
logger.warn(
Messages.getString("SpooledDirectoryTransfer.8")); //$NON-NLS-1$
future.setSuccess();
if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
null) {
Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
new Exception(Messages.getString("SpooledDirectoryTransfer.8")));
}
return;
}
for (final String dirname : directory) {
final File dir = new File(dirname);
if (!dir.isDirectory()) {
logger.error(Messages.getString("SpooledDirectoryTransfer.9") + " : " +
dir); //$NON-NLS-1$
future.cancel();
if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
null) {
Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
new Exception(Messages.getString("SpooledDirectoryTransfer.9")));
}
return;
}
}
FileFilter filter = null;
if (regexFilter != null) {
filter = new RegexFileFilter(regexFilter, minimalSize);
} else if (minimalSize > 0) {
filter = new RegexFileFilter(minimalSize);
}
final FileMonitorCommandRunnableFuture commandValidFile =
initializeFileMonitorCommandRunnableFuture(status, stop, filter);
File dir;
if (!monitor.initialized()) {
// wrong
logger.error(
Messages.getString("Configuration.WrongInit") + " : already running");
future.cancel();
if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
null) {
Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
new Exception(Messages.getString("Configuration.WrongInit") +
" : already running"));
}
return;
}
commandValidFile.setMonitor(monitor);
if (parallel) {
final FileMonitorCommandFactory factory =
new FileMonitorCommandFactory() {
@Override
public final FileMonitorCommandRunnableFuture create(
final FileItem fileItem) {
final SpooledRunner runner = new SpooledRunner(fileItem);
runner.setMonitor(monitor);
return runner;
}
};
monitor.setCommandValidFileFactory(factory, limitParallelTasks);
}
final FileMonitor monitorArg = monitor;
if (waarpHosts != null && !waarpHosts.isEmpty()) {
setWaarpHostCommand(monitorArg);
}
for (int i = 1; i < directory.size(); i++) {
dir = new File(directory.get(i));
monitor.addDirectory(dir);
}
logger.warn("SpooledDirectoryTransfer starts name:" + name + " directory:" +
directory + " statusFile:" + statusFile + " stopFile:" +
stopFile + " rulename:" + ruleName + " fileinfo:" + fileInfo +
" hosts:" + remoteHosts + " regex:" + regexFilter +
" minimalSize:" + minimalSize + " waarp:" + waarpHosts +
" elapse:" + elapseTime + " waarpElapse:" + elapseWaarpTime +
" parallel:" + parallel + " limitParallel:" +
limitParallelTasks + " submit:" + submit + " recursive:" +
recurs);
monitor.start();
monitor.waitForStopFile();
future.setSuccess();
if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
null) {
Configuration.configuration.getShutdownConfiguration().serviceFuture.setSuccess();
}
}
private void setWaarpHostCommand(final FileMonitor monitorArg) {
final FileMonitorCommandRunnableFuture waarpHostCommand;
waarpHostCommand = new FileMonitorCommandRunnableFuture() {
@Override
public void run(final FileItem notused) {
try {
Thread.currentThread().setName("FileMonitorInformation_" + name);
if (admin.getSession() != null && admin.getSession().isDisActive()) {
admin.getSession().checkConnectionNoException();
}
String status = monitorArg.getStatus();
if (normalInfoAsWarn) {
logger.warn("Will inform back Waarp hosts of current history: " +
monitorArg.getCurrentHistoryNb());
} else {
logger.info("Will inform back Waarp hosts of current history: {}",
monitorArg.getCurrentHistoryNb());
}
for (String host : waarpHosts) {
if (host == null) {
continue;
}
host = host.trim();
if (!host.isEmpty()) {
final R66Future r66Future = new R66Future(true);
final BusinessRequestPacket packet = new BusinessRequestPacket(
SpooledInformTask.class.getName() + ' ' + status, 0);
final BusinessRequest transaction =
new BusinessRequest(networkTransaction, r66Future, host,
packet);
transaction.run();
r66Future.awaitOrInterruptible();
if (!r66Future.isSuccess()) {
logger.info("Can't inform Waarp server: {} since {}", host,
r66Future.getCause());
} else {
final R66Result result = r66Future.getResult();
if (result == null) {
monitorArg.setNextAsFullStatus();
} else {
status = (String) result.getOther();
if (status == null || status.equalsIgnoreCase(NEEDFULL)) {
monitorArg.setNextAsFullStatus();
}
}
logger.debug("Inform back Waarp hosts over for: {}", host);
}
}
}
} catch (final Throwable e) {
logger.error("Issue during Waarp information", e);
// ignore
}
}
};
monitor.setCommandCheckIteration(waarpHostCommand);
monitor.setElapseWaarpTime(elapseWaarpTime);
}
private FileMonitorCommandRunnableFuture initializeFileMonitorCommandRunnableFuture(
final File status, final File stop, final FileFilter filter) {
// Will be used if no parallelism
final FileMonitorCommandRunnableFuture commandValidFile =
new SpooledRunner(null);
final FileMonitorCommandRunnableFuture waarpRemovedCommand =
new FileMonitorCommandRunnableFuture() {
@Override
public void run(final FileItem file) {
if (normalInfoAsWarn) {
logger.warn("File removed: {}", file.file);
} else {
logger.info("File removed: {}", file.file);
}
}
};
final File dir = new File(directory.get(0));
monitor = new FileMonitor(name, status, stop, dir, null, elapseTime, filter,
recurs, commandValidFile, waarpRemovedCommand,
null);
monitor.setIgnoreAlreadyUsed(ignoreAlreadyUsed);
return commandValidFile;
}
public void stop() {
if (monitor != null) {
logger.info("Stop Monitor");
monitor.stop();
logger.info("Monitor Stopped");
} else {
logger.warn("NO MONITOR found");
}
}
public class SpooledRunner extends FileMonitorCommandRunnableFuture {
private static final String REQUEST_INFORMATION_FAILURE =
"RequestInformation.Failure";
private static final String REMOTE2 = "</REMOTE>";
private static final String REMOTE = "<REMOTE>";
public SpooledRunner(final FileItem fileItem) {
super(fileItem);
if (logger == null) {
logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
}
}
@Override
public void run(final FileItem fileItem) {
setFileItem(fileItem);
checkReuse(ignoreAlreadyUsed);
if (admin.getSession() != null && admin.getSession().isDisActive()) {
admin.getSession().checkConnectionNoException();
}
boolean finalStatus = false;
int ko = 0;
long specialId =
remoteHosts.size() > 1? ILLEGALVALUE : fileItem.specialId;
long newSpecialId = ILLEGALVALUE;
// check if already launched before
if (isIgnored(ignoreAlreadyUsed)) {
// Nothing to do
return;
}
specialId = checkReuseUniqueHost(fileItem, specialId);
try {
for (String host : remoteHosts) {
if (host == null) {
continue;
}
host = host.trim();
if (!host.isEmpty()) {
final String filename = fileItem.file.getAbsolutePath();
logger.info(
"Launch transfer to " + host + " with file " + filename);
R66Future r66Future = new R66Future(true);
final String text;
if (submit) {
text = submitTransfer(specialId, host, filename, r66Future);
} else {
if (specialId != ILLEGALVALUE) {
// Clean previously transfer if any
cleanPreviousTransfer(specialId, host, filename, r66Future);
}
text = "Direct Transfer: ";
r66Future = new R66Future(true);
directTransfer(host, filename, r66Future, text);
}
r66Future.awaitOrInterruptible();
final R66Result r66result = r66Future.getResult();
if (r66Future.isSuccess()) {
finalStatus = true;
newSpecialId =
finalizeInSuccess(newSpecialId, host, text, r66result);
} else {
setError(getError() + 1);
ko++;
final DbTaskRunner runner;
if (r66result != null) {
String errMsg = "Unknown Error Message";
if (r66Future.getCause() != null) {
errMsg = r66Future.getCause().getMessage();
}
final boolean isConnectionImpossible =
r66result.getCode() == ErrorCode.ConnectionImpossible &&
!normalInfoAsWarn;
runner = r66result.getRunner();
if (runner != null) {
newSpecialId = koOnFoundRunner(host, text, runner, errMsg,
isConnectionImpossible);
} else {
ko = getKoOnNoRunner(ko, host, r66Future, text, r66result,
isConnectionImpossible);
}
} else {
logger.error(
text + Messages.getString(REQUEST_INFORMATION_FAILURE)
//$NON-NLS-1$
+ REMOTE + host + REMOTE2, r66Future.getCause());
}
}
}
}
} catch (final Throwable e) {
// catch any exception
logger.error("Error in SpooledDirectory", e);
finalStatus = false;
}
specialId = remoteHosts.size() > 1? ILLEGALVALUE : newSpecialId;
if (ko > 0) {
// If at least one is in error, the transfer is in error so should be redone
finalStatus = false;
}
finalizeValidFile(finalStatus, specialId);
}
private void directTransfer(final String host, final String filename,
final R66Future r66Future, final String text) {
final DirectTransfer transaction =
new DirectTransfer(r66Future, host, filename, ruleName, fileInfo,
isMD5, blocksize, ILLEGALVALUE,
networkTransaction);
if (!fileInfo.contains("-nofollow")) {
TransferArgs.forceAnalyzeFollow(transaction);
}
// If retry indefinitely is useful transaction.setLimitRetryConnection(true)
transaction.normalInfoAsWarn = normalInfoAsWarn;
logger.info("{}{}", text, host);
transaction.run();
}
private String submitTransfer(final long specialId, final String host,
final String filename,
final R66Future r66Future) {
final String text;
text = "Submit Transfer: ";
final SubmitTransfer transaction =
new SubmitTransfer(r66Future, host, filename, ruleName, fileInfo,
isMD5, blocksize, specialId, null);
if (!fileInfo.contains("-nofollow")) {
TransferArgs.forceAnalyzeFollow(transaction);
}
transaction.normalInfoAsWarn = normalInfoAsWarn;
logger.info("{}{}", text, host);
transaction.run();
return text;
}
private long koOnFoundRunner(final String host, final String text,
final DbTaskRunner runner, final String errMsg,
final boolean isConnectionImpossible) {
final long newSpecialId;
newSpecialId = runner.getSpecialId();
DbTaskRunner.removeNoDbSpecialId(newSpecialId);
if (isConnectionImpossible) {
if (logger.isInfoEnabled()) {
logger.info("{}{}{}{}{}{}{}{}", text,
Messages.getString(REQUEST_INFORMATION_FAILURE),
//$NON-NLS-1$
runner.toShortString(), REMOTE, host, "</REMOTE><REASON>",
errMsg, "</REASON>");
}
} else {
logger.error(text + Messages.getString(REQUEST_INFORMATION_FAILURE) +
//$NON-NLS-1$
runner.toShortString() + REMOTE + host +
"</REMOTE><REASON>" + errMsg + "</REASON>");
}
return newSpecialId;
}
private int getKoOnNoRunner(int ko, final String host,
final R66Future r66Future, final String text,
final R66Result r66result,
final boolean isConnectionImpossible) {
if (isConnectionImpossible) {
logger.info("{}{}{}{}{}", text,
Messages.getString(REQUEST_INFORMATION_FAILURE), REMOTE,
host, REMOTE2, r66Future.getCause());
} else {
if (r66result.getCode() == QueryRemotelyUnknown) {
logger.info("Transfer not found {}{}{}", REMOTE, host, REMOTE2);
// False negative
ko--;
setError(getError() - 1);
} else {
logger.error(
text + Messages.getString(REQUEST_INFORMATION_FAILURE) + REMOTE +
host + REMOTE2, r66Future.getCause());
}
}
return ko;
}
private long finalizeInSuccess(long newSpecialId, final String host,
final String text,
final R66Result r66result) {
setSent(getSent() + 1);
final DbTaskRunner runner;
if (r66result != null) {
runner = r66result.getRunner();
if (runner != null) {
newSpecialId = runner.getSpecialId();
String status =
Messages.getString("RequestInformation.Success"); //$NON-NLS-1$
if (runner.getErrorInfo() == ErrorCode.Warning) {
status =
Messages.getString("RequestInformation.Warned"); //$NON-NLS-1$
}
if (normalInfoAsWarn) {
logger.warn(
text + " status: " + status + " " + runner.toShortString() +
" <REMOTE>" + host + REMOTE2 + " <FILEFINAL>" +
(r66result.getFile() != null?
r66result.getFile() + "</FILEFINAL>" : "no file"));
} else if (logger.isInfoEnabled()) {
logger.info(
"{} status: {} {} <REMOTE>{}</REMOTE> <FILEFINAL>{}",
text, status, runner.toShortString(), host,
(r66result.getFile() != null?
r66result.getFile() + "</FILEFINAL>" : "no file"));
}
if (nolog && !submit) {
// In case of success, delete the runner
try {
runner.delete();
} catch (final WaarpDatabaseException e) {
logger.warn(
"Cannot apply nolog to " + runner.toShortString() +
" : {}", e.getMessage());
}
}
DbTaskRunner.removeNoDbSpecialId(newSpecialId);
} else {
if (normalInfoAsWarn) {
logger.warn(text + Messages.getString("RequestInformation.Success")
//$NON-NLS-1$
+ REMOTE + host + REMOTE2);
} else {
logger.info("{}{}{}{}{}", text,
Messages.getString("RequestInformation.Success"),
//$NON-NLS-1$
REMOTE, host, REMOTE2);
}
}
} else {
if (normalInfoAsWarn) {
logger.warn(text + Messages.getString("RequestInformation.Success")
//$NON-NLS-1$
+ REMOTE + host + REMOTE2);
} else {
logger.info("{}{}{}{}{}", text,
Messages.getString("RequestInformation.Success"),
//$NON-NLS-1$
REMOTE, host, REMOTE2);
}
}
return newSpecialId;
}
private void cleanPreviousTransfer(final long specialId, final String host,
final String filename,
final R66Future r66Future) {
final String text;
try {
final String srequester = Configuration.configuration.getHostId(host);
text =
"Request Transfer Cancelled: " + specialId + ' ' + filename + ' ';
// Cancel
logger.debug("Will try to cancel {}", specialId);
final RequestTransfer transaction2 =
new RequestTransfer(r66Future, specialId, host, srequester, true,
false, false, networkTransaction);
transaction2.normalInfoAsWarn = normalInfoAsWarn;
logger.warn(text + host);
transaction2.run();
// special task
r66Future.awaitOrInterruptible();
} catch (final WaarpDatabaseException e) {
if (admin.getSession() != null) {
admin.getSession().checkConnectionNoException();
}
logger.warn(Messages.getString("RequestTransfer.5") + host + " : {}",
e.getMessage()); //$NON-NLS-1$
}
}
private long checkReuseUniqueHost(final FileItem fileItem, long specialId) {
if (isReuse() && remoteHosts.size() == 1) {
// if specialId is not IllegalValue, then used was necessarily true
// before
if (!submit) {
// reset fileItem usage
setValid(fileItem);
} else {
// Cancel the unique previous transfer
String host = remoteHosts.get(0);
if (host == null) {
return specialId;
}
host = host.trim();
if (!host.isEmpty()) {
final String filename = fileItem.file.getAbsolutePath();
final String text =
"Request Transfer to be cancelled: " + fileItem.specialId +
' ' + filename + ' ';
try {
final R66Future r66Future = new R66Future(true);
final String srequester =
Configuration.configuration.getHostId(host);
// Try restart
final RequestTransfer transaction =
new RequestTransfer(r66Future, fileItem.specialId, host,
srequester, true, false, false,
networkTransaction);
transaction.normalInfoAsWarn = normalInfoAsWarn;
logger.info("{}{}", text, host);
// special task
transaction.run();
r66Future.awaitOrInterruptible();
// reset fileItem usage
setValid(fileItem);
specialId = fileItem.specialId;
} catch (final WaarpDatabaseException e) {
if (admin.getSession() != null) {
admin.getSession().checkConnectionNoException();
}
logger.warn(
Messages.getString("RequestTransfer.5") + host + " : {}",
e.getMessage()); //$NON-NLS-1$
}
}
}
}
return specialId;
}
}
/**
* Default arguments
*/
public static class Arguments {
private String name;
private final List<String> remoteHosts = new ArrayList<String>();
private final List<String> localDirectory = new ArrayList<String>();
private String rule;
private String fileInfo = NO_INFO_ARGS;
private boolean isMd5;
private int block = 0x10000; // 64K as default
private String statusFile;
private String stopFile;
private String regex;
private long elapsed = 1000;
private long elapsedWaarp = 5000;
private boolean toSubmit = true;
private boolean noLog;
private boolean recursive;
private final List<String> waarpHosts = new ArrayList<String>();
private boolean isParallel = true;
private int limitParallel;
private long minimalSize;
private boolean logWarn = true;
private boolean ignoreAlreadyUsed = false;
public final String getName() {
return name;
}
public final void setName(final String name) {
this.name = name;
}
public final List<String> getRemoteHosts() {
return remoteHosts;
}
public final List<String> getLocalDirectory() {
return localDirectory;
}
public final String getRule() {
return rule;
}
public final void setRule(final String rule) {
this.rule = rule;
}
public final String getFileInfo() {
return fileInfo;
}
public final void setFileInfo(final String fileInfo) {
this.fileInfo = fileInfo;
}
public final boolean isMd5() {
return isMd5;
}
public final void setMd5(final boolean md5) {
this.isMd5 = md5;
}
public final int getBlock() {
return block;
}
public final void setBlock(final int block) {
this.block = block;
}
public final String getStatusFile() {
return statusFile;
}
public final void setStatusFile(final String statusFile) {
this.statusFile = statusFile;
}
public final String getStopFile() {
return stopFile;
}
public final void setStopFile(final String stopFile) {
this.stopFile = stopFile;
}
public final String getRegex() {
return regex;
}
public final void setRegex(final String regex) {
this.regex = regex;
}
public final long getElapsed() {
return elapsed;
}
public final void setElapsed(final long elapsed) {
this.elapsed = elapsed;
}
public final long getElapsedWaarp() {
return elapsedWaarp;
}
public final void setElapsedWaarp(final long elapsedWaarp) {
this.elapsedWaarp = elapsedWaarp;
}
public final boolean isToSubmit() {
return toSubmit;
}
public final void setToSubmit(final boolean toSubmit) {
this.toSubmit = toSubmit;
}
public final boolean isNoLog() {
return noLog;
}
public final void setNoLog(final boolean noLog) {
this.noLog = noLog;
}
public final boolean isRecursive() {
return recursive;
}
public final void setRecursive(final boolean recursive) {
this.recursive = recursive;
}
public final List<String> getWaarpHosts() {
return waarpHosts;
}
public final boolean isParallel() {
return isParallel;
}
public final void setParallel(final boolean parallel) {
this.isParallel = parallel;
}
public final int getLimitParallel() {
return limitParallel;
}
public final void setLimitParallel(final int limitParallel) {
this.limitParallel = limitParallel;
}
public final long getMinimalSize() {
return minimalSize;
}
public final void setMinimalSize(final long minimalSize) {
this.minimalSize = minimalSize;
}
public final boolean isLogWarn() {
return logWarn;
}
public final void setLogWarn(final boolean logWarn) {
this.logWarn = logWarn;
}
public final boolean isIgnoreAlreadyUsed() {
return ignoreAlreadyUsed;
}
public final void setIgnoreAlreadyUsed(final boolean ignoreAlreadyUsed) {
this.ignoreAlreadyUsed = ignoreAlreadyUsed;
}
}
protected static final List<Arguments> arguments = new ArrayList<Arguments>();
private static final String XML_ROOT = "/config/";
private static final String XML_SPOOLEDDAEMON = "spooleddaemon";
private static final String XML_STOPFILE = "stopfile";
private static final String XML_SPOOLED = "spooled";
private static final String XML_NAME = "name";
private static final String XML_TO = "to";
private static final String XML_RULE = "rule";
private static final String XML_STATUSFILE = "statusfile";
private static final String XML_DIRECTORY = "directory";
private static final String XML_REGEX = "regex";
private static final String XML_RECURSIVE = "recursive";
private static final String XML_ELAPSE = "elapse";
private static final String XML_SUBMIT = "submit";
private static final String XML_PARALLEL = "parallel";
private static final String XML_LIMIT_PARALLEL = "limitParallel";
private static final String XML_INFO = "info";
private static final String XML_MD_5 = "md5";
private static final String XML_BLOCK = "block";
private static final String XML_NOLOG = "nolog";
private static final String XML_WAARP = "waarp";
private static final String XML_ELAPSE_WAARP = "elapseWaarp";
private static final String XML_MINIMAL_SIZE = "minimalSize";
private static final String XML_LOG_WARN = "logWarn";
private static final String XML_IGNORED_ALREADY_USED = "ignoreAlreadyUsed";
private static final XmlDecl[] subSpooled = {
new XmlDecl(XmlType.STRING, XML_NAME),
new XmlDecl(XML_TO, XmlType.STRING, XML_TO, true),
new XmlDecl(XmlType.STRING, XML_RULE),
new XmlDecl(XmlType.STRING, XML_STATUSFILE),
new XmlDecl(XML_DIRECTORY, XmlType.STRING, XML_DIRECTORY, true),
new XmlDecl(XmlType.STRING, XML_REGEX),
new XmlDecl(XmlType.BOOLEAN, XML_RECURSIVE),
new XmlDecl(XmlType.LONG, XML_ELAPSE),
new XmlDecl(XmlType.BOOLEAN, XML_SUBMIT),
new XmlDecl(XmlType.BOOLEAN, XML_PARALLEL),
new XmlDecl(XmlType.INTEGER, XML_LIMIT_PARALLEL),
new XmlDecl(XmlType.STRING, XML_INFO),
new XmlDecl(XmlType.BOOLEAN, XML_MD_5),
new XmlDecl(XmlType.INTEGER, XML_BLOCK),
new XmlDecl(XmlType.BOOLEAN, XML_NOLOG),
new XmlDecl(XML_WAARP, XmlType.STRING, XML_WAARP, true),
new XmlDecl(XmlType.LONG, XML_ELAPSE_WAARP),
new XmlDecl(XmlType.BOOLEAN, XML_IGNORED_ALREADY_USED),
new XmlDecl(XmlType.LONG, XML_MINIMAL_SIZE)
};
private static final XmlDecl[] spooled = {
new XmlDecl(XmlType.STRING, XML_STOPFILE),
new XmlDecl(XmlType.BOOLEAN, XML_LOG_WARN),
new XmlDecl(XML_SPOOLED, XmlType.XVAL, XML_SPOOLED, subSpooled, true)
};
private static final XmlDecl[] configSpooled = {
new XmlDecl(XML_SPOOLEDDAEMON, XmlType.XVAL, XML_ROOT + XML_SPOOLEDDAEMON,
spooled, false)
};
@SuppressWarnings("unchecked")
protected static boolean getParamsFromConfigFile(final String filename) {
if (logger == null) {
logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
}
final Document document;
// Open config file
try {
document = XmlUtil.getNewSaxReader().read(filename);
} catch (final DocumentException e) {
logger.error(Messages.getString("FileBasedConfiguration.CannotReadXml") +
filename + ": {}", e.getMessage()); //$NON-NLS-1$
return false;
}
if (document == null) {
logger.error(Messages.getString("FileBasedConfiguration.CannotReadXml") +
filename); //$NON-NLS-1$
return false;
}
final XmlValue[] configuration = XmlUtil.read(document, configSpooled);
final XmlHash hashConfig = new XmlHash(configuration);
XmlValue value = hashConfig.get(XML_STOPFILE);
final String stopfile;
if (value == null || value.isEmpty()) {
return false;
}
stopfile = value.getString();
value = hashConfig.get(XML_LOG_WARN);
boolean logWarn = true;
if (value != null && !value.isEmpty()) {
logWarn = value.getBoolean();
}
value = hashConfig.get(XML_SPOOLED);
if (value != null && value.getList() != null) {
for (final XmlValue[] xml : (Iterable<XmlValue[]>) value.getList()) {
final Arguments arg = new Arguments();
arg.setStopFile(stopfile);
arg.setLogWarn(logWarn);
final XmlHash subHash = new XmlHash(xml);
value = subHash.get(XML_NAME);
if (value != null && !value.isEmpty()) {
arg.setName(value.getString());
}
value = subHash.get(XML_TO);
if (value != null && value.getList() != null) {
for (final String to : (Iterable<String>) value.getList()) {
if (to.trim().isEmpty()) {
continue;
}
arg.getRemoteHosts().add(to.trim());
}
if (arg.getRemoteHosts().isEmpty()) {
logger.warn("to directive is empty but must not");
continue;
}
} else {
logger.warn("to directive is empty but must not");
continue;
}
value = subHash.get(XML_RULE);
if (value != null && !value.isEmpty()) {
arg.setRule(value.getString());
} else {
logger.warn("rule directive is empty but must not");
continue;
}
value = subHash.get(XML_STATUSFILE);
if (value != null && !value.isEmpty()) {
arg.setStatusFile(value.getString());
} else {
logger.warn("statusfile directive is empty but must not");
continue;
}
value = subHash.get(XML_DIRECTORY);
if (value != null && value.getList() != null) {
for (final String dir : (Iterable<String>) value.getList()) {
if (dir.trim().isEmpty()) {
continue;
}
arg.getLocalDirectory().add(dir.trim());
}
if (arg.getLocalDirectory().isEmpty()) {
logger.warn("directory directive is empty but must not");
continue;
}
} else {
logger.warn("directory directive is empty but must not");
continue;
}
value = subHash.get(XML_REGEX);
if (value != null && !value.isEmpty()) {
arg.setRegex(value.getString());
}
value = subHash.get(XML_RECURSIVE);
if (value != null && !value.isEmpty()) {
arg.setRecursive(value.getBoolean());
}
value = subHash.get(XML_ELAPSE);
if (value != null && !value.isEmpty()) {
arg.setElapsed(value.getLong());
}
value = subHash.get(XML_SUBMIT);
if (value != null && !value.isEmpty()) {
arg.setToSubmit(value.getBoolean());
}
value = subHash.get(XML_PARALLEL);
if (value != null && !value.isEmpty()) {
arg.setParallel(value.getBoolean());
}
value = subHash.get(XML_LIMIT_PARALLEL);
if (value != null && !value.isEmpty()) {
arg.setLimitParallel(value.getInteger());
}
value = subHash.get(XML_INFO);
if (value != null && !value.isEmpty()) {
arg.setFileInfo(value.getString());
}
value = subHash.get(XML_MD_5);
if (value != null && !value.isEmpty()) {
arg.setMd5(value.getBoolean());
}
value = subHash.get(XML_BLOCK);
if (value != null && !value.isEmpty()) {
arg.setBlock(value.getInteger());
}
value = subHash.get(XML_NOLOG);
if (value != null && !value.isEmpty()) {
arg.setNoLog(value.getBoolean());
}
value = subHash.get(XML_WAARP);
if (value != null && value.getList() != null) {
for (final String host : (Iterable<String>) value.getList()) {
if (host.trim().isEmpty()) {
continue;
}
arg.getWaarpHosts().add(host.trim());
}
}
value = subHash.get(XML_ELAPSE_WAARP);
if (value != null && !value.isEmpty()) {
arg.setElapsedWaarp(value.getLong());
}
value = subHash.get(XML_MINIMAL_SIZE);
if (value != null && !value.isEmpty()) {
arg.setMinimalSize(value.getLong());
}
value = subHash.get(XML_IGNORED_ALREADY_USED);
if (value != null && !value.isEmpty()) {
arg.setIgnoreAlreadyUsed(value.getBoolean());
}
arguments.add(arg);
}
}
hashConfig.clear();
return !arguments.isEmpty();
}
/**
* Parse the parameter and set current values
*
* @param args
*
* @return True if all parameters were found and correct
*/
protected static boolean getParams(final String[] args) {
if (logger == null) {
logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
}
infoArgs = Messages.getString("SpooledDirectoryTransfer.0"); //$NON-NLS-1$
if (args.length < 1) {
logger.error(infoArgs);
return false;
}
if (!FileBasedConfiguration.setClientConfigurationFromXml(
Configuration.configuration, args[0])) {
logger.error(
Messages.getString("Configuration.NeedCorrectConfig")); //$NON-NLS-1$
return false;
}
// Now check if the configuration file contains already elements of specifications
if (!getParamsFromConfigFile(args[0])) {
if (args.length < 11) {
logger.error(infoArgs);
return false;
}
// Now set default values from configuration
final Arguments arg = new Arguments();
arg.setBlock(Configuration.configuration.getBlockSize());
int i = 1;
try {
for (i = 1; i < args.length; i++) {
if ("-to".equalsIgnoreCase(args[i])) {
i++;
final String[] rhosts = args[i].split(",");
for (String string : rhosts) {
string = string.trim();
if (string.isEmpty()) {
continue;
}
if (Configuration.configuration.getAliases()
.containsKey(string)) {
string = Configuration.configuration.getAliases().get(string);
}
arg.getRemoteHosts().add(string);
}
} else if ("-name".equalsIgnoreCase(args[i])) {
i++;
arg.setName(args[i]);
} else if ("-directory".equalsIgnoreCase(args[i])) {
i++;
final String[] dir = args[i].split(",");
for (final String string : dir) {
if (string.trim().isEmpty()) {
continue;
}
arg.getLocalDirectory().add(string.trim());
}
} else if ("-rule".equalsIgnoreCase(args[i])) {
i++;
arg.setRule(args[i]);
} else if ("-statusfile".equalsIgnoreCase(args[i])) {
i++;
arg.setStatusFile(args[i]);
} else if ("-stopfile".equalsIgnoreCase(args[i])) {
i++;
arg.setStopFile(args[i]);
} else if ("-info".equalsIgnoreCase(args[i])) {
i++;
arg.setFileInfo(args[i]);
} else if ("-md5".equalsIgnoreCase(args[i])) {
arg.setMd5(true);
} else if ("-block".equalsIgnoreCase(args[i])) {
i++;
arg.setBlock(Integer.parseInt(args[i]));
if (arg.getBlock() < 100) {
logger.error(Messages.getString("AbstractTransfer.1") +
arg.getBlock()); //$NON-NLS-1$
return false;
}
} else if ("-nolog".equalsIgnoreCase(args[i])) {
arg.setNoLog(true);
} else if ("-submit".equalsIgnoreCase(args[i])) {
arg.setToSubmit(true);
} else if ("-direct".equalsIgnoreCase(args[i])) {
arg.setToSubmit(false);
} else if ("-recursive".equalsIgnoreCase(args[i])) {
arg.setRecursive(true);
} else if ("-logWarn".equalsIgnoreCase(args[i])) {
arg.setLogWarn(true);
} else if ("-notlogWarn".equalsIgnoreCase(args[i])) {
arg.setLogWarn(false);
} else if ("-regex".equalsIgnoreCase(args[i])) {
i++;
arg.setRegex(args[i]);
} else if ("-waarp".equalsIgnoreCase(args[i])) {
i++;
final String[] host = args[i].split(",");
for (final String string : host) {
if (string.trim().isEmpty()) {
continue;
}
arg.getWaarpHosts().add(string.trim());
}
} else if ("-elapse".equalsIgnoreCase(args[i])) {
i++;
arg.setElapsed(Long.parseLong(args[i]));
} else if ("-elapseWaarp".equalsIgnoreCase(args[i])) {
i++;
arg.setElapsedWaarp(Long.parseLong(args[i]));
} else if ("-minimalSize".equalsIgnoreCase(args[i])) {
i++;
arg.setMinimalSize(Long.parseLong(args[i]));
} else if ("-limitParallel".equalsIgnoreCase(args[i])) {
i++;
arg.setLimitParallel(Integer.parseInt(args[i]));
} else if ("-parallel".equalsIgnoreCase(args[i])) {
arg.setParallel(true);
} else if ("-sequential".equalsIgnoreCase(args[i])) {
arg.setParallel(false);
} else if ("-ignoreAlreadyUsed".equalsIgnoreCase(args[i])) {
arg.setIgnoreAlreadyUsed(true);
}
}
} catch (final NumberFormatException e) {
logger.error(
Messages.getString("AbstractTransfer.20") + i); //$NON-NLS-1$
return false;
}
if (arg.getFileInfo() == null) {
arg.setFileInfo(NO_INFO_ARGS);
}
if (arg.getName() == null) {
arg.setName(Configuration.configuration.getHostId() + " : " +
arg.getLocalDirectory());
}
if (!arg.getRemoteHosts().isEmpty() && arg.getRule() != null &&
!arg.getLocalDirectory().isEmpty() && arg.getStatusFile() != null &&
arg.getStopFile() != null) {
arguments.add(arg);
return true;
}
logger.error(Messages.getString("SpooledDirectoryTransfer.56") +
//$NON-NLS-1$
infoArgs);
return false;
}
return !arguments.isEmpty();
}
public static void main(final String[] args) {
WaarpLoggerFactory.setDefaultFactoryIfNotSame(
new WaarpSlf4JLoggerFactory(null));
if (logger == null) {
logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
}
initialize(args, true);
}
public static final List<SpooledDirectoryTransfer> list =
new ArrayList<SpooledDirectoryTransfer>();
public static NetworkTransaction networkTransactionStatic;
public static ExecutorService executorService;
/**
* @param args
* @param normalStart if True, will exit JVM when all daemons are
* stopped;
* else False let the caller do (used
* by SpooledEngine)
*/
public static boolean initialize(final String[] args,
final boolean normalStart) {
if (logger == null) {
logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
}
arguments.clear();
if (!getParams(args)) {
logger.error(Messages.getString("Configuration.WrongInit")); //$NON-NLS-1$
if (admin != null) {
admin.close();
}
if (normalStart) {
WaarpSystemUtil.systemExit(2);
}
return false;
}
Configuration.configuration.pipelineInit();
networkTransactionStatic = new NetworkTransaction();
try {
executorService = Executors.newCachedThreadPool(
new WaarpThreadFactory("SpooledDirectoryDaemon"));
for (final Arguments arg : arguments) {
final R66Future future = new R66Future(true);
final SpooledDirectoryTransfer spooled =
new SpooledDirectoryTransfer(future, arg, networkTransactionStatic);
executorService.submit(spooled);
list.add(spooled);
}
arguments.clear();
Thread.sleep(1000);
executorService.shutdown();
Configuration.configuration.launchStatistics();
if (normalStart) {
while (!executorService.awaitTermination(
Configuration.configuration.getTimeoutCon(),
TimeUnit.MILLISECONDS)) {
Thread.sleep(Configuration.configuration.getTimeoutCon());
}
for (final SpooledDirectoryTransfer spooledDirectoryTransfer : list) {
logger.warn(Messages.getString("SpooledDirectoryTransfer.58") +
spooledDirectoryTransfer.name + ": " +
spooledDirectoryTransfer.getSent() + " success, " +
spooledDirectoryTransfer.getError() + Messages.getString(
"SpooledDirectoryTransfer.60")); //$NON-NLS-1$
}
list.clear();
}
return true;
} catch (final Throwable e) {
logger.error("Exception", e);
return false;
} finally {
if (normalStart) {
WaarpShutdownHook.shutdownWillStart();
networkTransactionStatic.closeAll();
WaarpSystemUtil.systemExit(0);
}
}
}
/**
* @return the sent
*/
public final long getSent() {
return sent;
}
/**
* @param sent the sent to set
*/
private void setSent(final long sent) {
this.sent = sent;
}
/**
* @return the error
*/
public final long getError() {
return error;
}
/**
* @param error the error to set
*/
private void setError(final long error) {
this.error = error;
}
}