ExecOutputTask.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.openr66.context.task;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.waarp.commandexec.utils.LocalExecResult;
import org.waarp.common.command.exception.CommandAbstractException;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.openr66.context.ErrorCode;
import org.waarp.openr66.context.R66Result;
import org.waarp.openr66.context.R66Session;
import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
import org.waarp.openr66.context.task.localexec.LocalExecClient;
import org.waarp.openr66.protocol.configuration.Configuration;
import java.io.File;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
/**
* Execute an external command and Use the output if an error occurs.<br>
* <p>
* The output is ignored if the command has a correct status.<br>
* In case of error, if the output finishes with <tt>NEWFINALNAME:xxx</tt> then
* this part is removed from the
* output and the xxx is used as the last valid name for the file (meaning the
* file was moved or renamed even
* in case of error)<br>
* <br>
* <p>
* waitForValidation (#NOWAIT#) must not be set since it will prevent to have
* the feedback in case of error.
* So it is ignored.
*/
public class ExecOutputTask extends AbstractExecTask {
/**
* Internal Logger
*/
private static final WaarpLogger logger =
WaarpLoggerFactory.getLogger(ExecOutputTask.class);
/**
* In the final line output, the filename must prefixed by the following
* field
*/
public static final String DELIMITER = "NEWFINALNAME:";
/**
* @param argRule
* @param delay
* @param argTransfer
* @param session
*/
public ExecOutputTask(final String argRule, final int delay,
final String argTransfer, final R66Session session) {
super(TaskType.EXECOUTPUT, delay, argRule, argTransfer, session);
}
@Override
public void run() {
/*
* First apply all replacements and format to argRule from context and argTransfer. Will call exec (from first
* element of resulting string) with arguments as the following value from the replacements. Return 0 if OK,
* else 1 for a warning else as an error. In case of an error (> 0), all the line from output will be send
* back to the partner with the Error code. No change is made to the file.
*/
logger.info("ExecOutput with {}:{} and {}", argRule, argTransfer, session);
final String finalname = applyTransferSubstitutions(argRule);
//
// Force the WaitForValidation
waitForValidation = true;
if (Configuration.configuration.isUseLocalExec() && useLocalExec) {
final LocalExecClient localExecClient = new LocalExecClient();
if (localExecClient.connect()) {
localExecClient.runOneCommand(finalname, delay, waitForValidation,
futureCompletion);
final LocalExecResult result = localExecClient.getLocalExecResult();
finalizeExec(result.getStatus(), result.getResult(), finalname);
localExecClient.disconnect();
return;
} // else continue
}
final PrepareCommandExec prepareCommandExec =
new PrepareCommandExec(this, finalname, false,
waitForValidation).invoke();
if (prepareCommandExec.isError()) {
return;
}
final CommandLine commandLine = prepareCommandExec.getCommandLine();
final DefaultExecutor defaultExecutor =
prepareCommandExec.getDefaultExecutor();
final PipedInputStream inputStream = prepareCommandExec.getInputStream();
final PipedOutputStream outputStream = prepareCommandExec.getOutputStream();
final PumpStreamHandler pumpStreamHandler =
prepareCommandExec.getPumpStreamHandler();
final ExecuteWatchdog watchdog = prepareCommandExec.getWatchdog();
final AllLineReader allLineReader = new AllLineReader(inputStream);
allLineReader.setName(
"LastLineReader" + session.getRunner().getSpecialId());
allLineReader.setDaemon(true);
Configuration.configuration.getExecutorService().execute(allLineReader);
final ExecuteCommand executeCommand =
new ExecuteCommand(this, commandLine, defaultExecutor, inputStream,
outputStream, pumpStreamHandler,
allLineReader).invoke();
if (executeCommand.isError()) {
return;
}
int status = executeCommand.getStatus();
final String newname;
if (defaultExecutor.isFailure(status) && watchdog != null &&
watchdog.killedProcess()) {
// kill by the watchdoc (time out)
status = -1;
newname = "TimeOut";
} else {
newname = allLineReader.getLastLine().toString();
}
finalizeExec(status, newname, commandLine.toString());
}
private void finalizeExec(final int status, final String newName,
final String commandLine) {
String newname = newName;
if (status == 0) {
final R66Result result =
new R66Result(session, true, ErrorCode.CompleteOk,
session.getRunner());
result.setOther(newName);
futureCompletion.setResult(result);
futureCompletion.setSuccess();
logger.info("Exec OK with {} returns {}", commandLine, newname);
} else if (status == 1) {
final R66Result result =
new R66Result(session, true, ErrorCode.Warning, session.getRunner());
result.setOther(newName);
futureCompletion.setResult(result);
logger.warn(
"Exec in warning with " + commandLine + " returns " + newname);
session.getRunner().setErrorExecutionStatus(ErrorCode.Warning);
futureCompletion.setSuccess();
} else {
final int pos = newname.lastIndexOf(DELIMITER);
if (pos >= 0) {
final String newfilename = newname.substring(pos + DELIMITER.length());
newname = newname.substring(0, pos);
if (newfilename.indexOf(' ') > 0) {
logger.warn(
"Exec returns a multiple string in final line: " + newfilename);
// XXX FIXME: should not split String[] args = newfilename.split(" ")
// newfilename = args[args.length - 1]
}
// now test if the previous file was deleted (should be)
final File file = new File(newfilename);
if (!file.exists()) {
logger.warn(
"New file does not exist at the end of the exec: " + newfilename);
}
// now replace the file with the new one
try {
session.getFile().replaceFilename(newfilename, true);
} catch (final CommandAbstractException e) {
logger.warn("Exec in warning with " + commandLine + " : {}",
e.getMessage());
}
session.getRunner().setFileMoved(newfilename, true);
}
logger.error("Status: " + status + " Exec in error with " + commandLine +
" returns " + newname);
final OpenR66RunnerErrorException exc = new OpenR66RunnerErrorException(
"<STATUS>" + status + "</STATUS><ERROR>" + newname + "</ERROR>");
futureCompletion.setFailure(exc);
}
}
@Override
final void finalizeFromError(final Runnable threadReader, final int status,
final CommandLine commandLine,
final Exception e) {
final String result =
((AllLineReader) threadReader).getLastLine().toString();
logger.error("Status: " + status + " Exec in error with " + commandLine +
" returns " + result);
final OpenR66RunnerErrorException exc = new OpenR66RunnerErrorException(
"<STATUS>" + status + "</STATUS><ERROR>" + result + "</ERROR>");
futureCompletion.setFailure(exc);
}
}