R66EmbeddedServiceImpl.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.openr66.thrift;
import org.apache.thrift.TException;
import org.waarp.common.command.exception.CommandAbstractException;
import org.waarp.common.database.data.AbstractDbData;
import org.waarp.common.database.exception.WaarpDatabaseException;
import org.waarp.common.logging.SysErrLogger;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.openr66.client.AbstractTransfer;
import org.waarp.openr66.commander.ClientRunner;
import org.waarp.openr66.context.R66Session;
import org.waarp.openr66.context.filesystem.R66File;
import org.waarp.openr66.database.data.DbRule;
import org.waarp.openr66.database.data.DbTaskRunner;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.configuration.PartnerConfiguration;
import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
import org.waarp.openr66.protocol.localhandler.LocalServerHandler;
import org.waarp.openr66.protocol.localhandler.packet.ErrorPacket;
import org.waarp.openr66.protocol.localhandler.packet.RequestPacket;
import org.waarp.openr66.protocol.utils.TransferUtils;
import org.waarp.thrift.r66.Action;
import org.waarp.thrift.r66.ErrorCode;
import org.waarp.thrift.r66.R66Request;
import org.waarp.thrift.r66.R66Result;
import org.waarp.thrift.r66.R66Service;
import org.waarp.thrift.r66.RequestMode;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import static org.waarp.common.database.DbConstant.*;
/**
* Embedded service attached with the Thrift service
*/
public class R66EmbeddedServiceImpl implements R66Service.Iface {
/**
* Internal Logger
*/
private static final WaarpLogger logger =
WaarpLoggerFactory.getLogger(R66EmbeddedServiceImpl.class);
private DbTaskRunner initRequest(final R66Request request) {
Timestamp ttimestart = null;
if (request.isSetStart()) {
final Date date;
try {
final SimpleDateFormat dateFormat =
new SimpleDateFormat(AbstractTransfer.TIMESTAMP_FORMAT);
date = dateFormat.parse(request.getStart());
ttimestart = new Timestamp(date.getTime());
} catch (final ParseException ignored) {
// nothing
}
} else if (request.isSetDelay()) {
if (request.getDelay().charAt(0) == '+') {
ttimestart = new Timestamp(System.currentTimeMillis() + Long.parseLong(
request.getDelay().substring(1)));
} else {
ttimestart = new Timestamp(Long.parseLong(request.getDelay()));
}
}
final DbRule rule;
try {
rule = new DbRule(request.getRule());
} catch (final WaarpDatabaseException e) {
logger.warn("Cannot get Rule: " + request.getRule() + " : {}",
e.getMessage());
return null;
}
int mode = rule.getMode();
if (request.isMd5()) {
mode = RequestPacket.getModeMD5(mode);
}
final DbTaskRunner taskRunner;
long tid = ILLEGALVALUE;
if (request.isSetTid()) {
tid = request.getTid();
}
if (tid != ILLEGALVALUE) {
try {
taskRunner = new DbTaskRunner(tid, request.getDestuid());
// requested
taskRunner.setSenderByRequestToValidate(true);
} catch (final WaarpDatabaseException e) {
logger.warn("Cannot get task" + " : {}", e.getMessage());
return null;
}
} else {
final String sep =
PartnerConfiguration.getSeparator(request.getDestuid());
final RequestPacket requestPacket =
new RequestPacket(request.getRule(), mode, request.getFile(),
request.getBlocksize(), 0, tid, request.getInfo(),
-1, sep);
// Not isRecv since it is the requester, so send => isRetrieve is true
final boolean isRetrieve =
!RequestPacket.isRecvMode(requestPacket.getMode());
try {
taskRunner = new DbTaskRunner(rule, isRetrieve, requestPacket,
request.getDestuid(), ttimestart);
} catch (final WaarpDatabaseException e) {
logger.warn("Cannot get task" + " : {}", e.getMessage());
return null;
}
}
return taskRunner;
}
@Override
public R66Result transferRequestQuery(final R66Request request)
throws TException {
final DbTaskRunner runner = initRequest(request);
if (runner != null) {
runner.changeUpdatedInfo(AbstractDbData.UpdatedInfo.TOSUBMIT);
final boolean isSender = runner.isSender();
if (!runner.forceSaveStatus()) {
logger.warn("Cannot prepare task");
return new R66Result(request.getMode(), ErrorCode.CommandNotFound,
"ERROR: Cannot prepare transfer");
}
final R66Result result =
new R66Result(request.getMode(), ErrorCode.InitOk,
"Transfer Scheduled");
if (request.getMode() == RequestMode.SYNCTRANSFER) {
// now need to wait but first, reload the runner
try {
runner.select();
while (!runner.isFinished()) {
try {
Thread.sleep(1000);
runner.select();
} catch (final InterruptedException e) {//NOSONAR
SysErrLogger.FAKE_LOGGER.ignoreLog(e);
break;
}
}
runner.setSender(isSender);
} catch (final WaarpDatabaseException ignored) {
// nothing
}
setResultFromRunner(runner, result);
if (runner.isAllDone()) {
result.setCode(ErrorCode.CompleteOk);
result.setResultinfo("Transfer Done");
} else {
result.setCode(ErrorCode.valueOf(runner.getErrorInfo().name()));
result.setResultinfo(runner.getErrorInfo().getMesg());
}
} else {
try {
runner.select();
} catch (final WaarpDatabaseException ignored) {
// nothing
}
runner.setSender(isSender);
setResultFromRunner(runner, result);
}
return result;
} else {
logger.warn("ERROR: Transfer NOT scheduled");
return new R66Result(request.getMode(), ErrorCode.Internal,
"ERROR: Transfer NOT scheduled");
}
}
private void setResultFromRunner(final DbTaskRunner runner,
final R66Result result) {
result.setDestuid(runner.getRequested());
result.setFromuid(runner.getRequester());
result.setTid(runner.getSpecialId());
result.setRule(runner.getRuleId());
result.setBlocksize(runner.getBlocksize());
result.setFile(runner.getFilename());
result.setOriginalfilename(runner.getOriginalFilename());
result.setIsmoved(runner.isFileMoved());
result.setModetransfer(runner.getMode());
result.setRetrievemode(runner.isSender());
result.setStep(runner.getStep());
result.setGloballaststep(runner.getGloballaststep());
result.setRank(runner.getRank());
result.setStart(runner.getStart().toString());
result.setStop(runner.getStop().toString());
result.setResultinfo(runner.getFileInformation());
}
private void setResultFromLCR(final LocalChannelReference lcr,
final R66Result result) {
final R66Session session = lcr.getSession();
DbTaskRunner runner = null;
if (session != null) {
runner = session.getRunner();
} else {
final ClientRunner run = lcr.getClientRunner();
if (run != null) {
runner = run.getTaskRunner();
}
}
if (runner != null) {
setResultFromRunner(runner, result);
}
}
private int stopOrCancelRunner(final long id, final String reqd,
final String reqr,
final org.waarp.openr66.context.ErrorCode code) {
try {
final DbTaskRunner taskRunner =
new DbTaskRunner(null, null, id, reqr, reqd);
return taskRunner.stopOrCancelRunner(code)? 1 : 0;
} catch (final WaarpDatabaseException ignored) {
// nothing
}
logger.warn(
"Cannot accomplished action on task: " + id + ' ' + code.name());
return -1;
}
private R66Result stopOrCancel(final R66Request request,
final LocalChannelReference lcr,
final org.waarp.openr66.context.ErrorCode r66code) {
// stop the current transfer
final R66Result resulttest;
if (lcr != null) {
int rank = 0;
if (r66code == org.waarp.openr66.context.ErrorCode.StoppedTransfer &&
lcr.getSession() != null) {
final DbTaskRunner taskRunner = lcr.getSession().getRunner();
if (taskRunner != null) {
rank = taskRunner.getRank();
}
}
final ErrorPacket error =
new ErrorPacket(r66code.name() + ' ' + rank, r66code.getCode(),
ErrorPacket.FORWARDCLOSECODE);
try {
// inform local instead of remote
LocalServerHandler.channelRead0(lcr, error);
} catch (final Exception ignored) {
// nothing
}
resulttest = new R66Result(request.getMode(), ErrorCode.CompleteOk,
r66code.name());
setResultFromLCR(lcr, resulttest);
} else {
// Transfer is not running
// but maybe need action on database
final int test =
stopOrCancelRunner(request.getTid(), request.getDestuid(),
request.getFromuid(), r66code);
if (test > 0) {
resulttest = new R66Result(request.getMode(), ErrorCode.CompleteOk,
r66code.name());
} else if (test == 0) {
resulttest = new R66Result(request.getMode(), ErrorCode.TransferOk,
r66code.name());
} else {
resulttest = new R66Result(request.getMode(), ErrorCode.CommandNotFound,
"Error: cannot accomplished task on transfer");
}
}
return resulttest;
}
private R66Result restart(final R66Request request,
final LocalChannelReference lcr) {
// Try to validate a restarting transfer
// validLimit on requested side
if (Configuration.configuration.getConstraintLimitHandler()
.checkConstraints()) {
logger.warn(
"Limit exceeded {} while asking to relaunch a task " + request,
Configuration.configuration.getConstraintLimitHandler().lastAlert);
return new R66Result(request.getMode(), ErrorCode.ServerOverloaded,
"Limit exceeded while asking to relaunch a task");
}
// Try to validate a restarting transfer
// header = ?; middle = requested+blank+requester+blank+specialId
final DbTaskRunner taskRunner;
try {
taskRunner =
new DbTaskRunner(null, null, request.getTid(), request.getFromuid(),
request.getDestuid());
final org.waarp.openr66.context.R66Result resulttest =
TransferUtils.restartTransfer(taskRunner, lcr);
return new R66Result(request.getMode(),
ErrorCode.valueOf(resulttest.getCode().name()),
resulttest.getMessage());
} catch (final WaarpDatabaseException e1) {
logger.warn("Exception while trying to restart transfer" + " : {}",
e1.getMessage());
return new R66Result(request.getMode(), ErrorCode.Internal,
"Exception while trying to restart transfer");
}
}
@Override
public R66Result infoTransferQuery(final R66Request request)
throws TException {
final RequestMode mode = request.getMode();
if (mode != RequestMode.INFOREQUEST) {
// error
logger.warn("Mode is uncompatible with infoTransferQuery");
return new R66Result(request.getMode(), ErrorCode.Unimplemented,
"Mode is uncompatible with infoTransferQuery");
}
// now check if enough arguments are provided
if (!request.isSetTid() ||
!request.isSetDestuid() && !request.isSetFromuid() ||
!request.isSetAction()) {
// error
logger.warn("Not enough arguments");
return new R66Result(request.getMode(), ErrorCode.RemoteError,
"Not enough arguments");
}
// requested+blank+requester+blank+specialId
final LocalChannelReference lcr =
Configuration.configuration.getLocalTransaction().getFromRequest(
request.getDestuid() + ' ' + request.getFromuid() + ' ' +
request.getTid());
final org.waarp.openr66.context.ErrorCode r66code;
switch (request.getAction()) {
case Detail: {
final R66Result result =
new R66Result(request.getMode(), ErrorCode.CompleteOk,
"Existence test OK");
result.setAction(Action.Exist);
result.setDestuid(request.getDestuid());
result.setFromuid(request.getFromuid());
result.setTid(request.getTid());
if (lcr != null) {
setResultFromLCR(lcr, result);
} else {
try {
final DbTaskRunner runner =
new DbTaskRunner(null, null, request.getTid(),
request.getFromuid(), request.getDestuid());
setResultFromRunner(runner, result);
} catch (final WaarpDatabaseException e) {
result.setCode(ErrorCode.FileNotFound);
}
}
return result;
}
case Restart:
return restart(request, lcr);
case Cancel:
r66code = org.waarp.openr66.context.ErrorCode.CanceledTransfer;
return stopOrCancel(request, lcr, r66code);
case Stop:
r66code = org.waarp.openr66.context.ErrorCode.StoppedTransfer;
return stopOrCancel(request, lcr, r66code);
default:
logger.warn("Uncompatible with " + request.getAction().name());
return new R66Result(request.getMode(), ErrorCode.Unimplemented,
"Uncompatible with " + request.getAction().name());
}
}
@Override
public final boolean isStillRunning(final String fromuid, final String touid,
final long tid) throws TException {
// now check if enough arguments are provided
if (fromuid == null || touid == null || tid == ILLEGALVALUE) {
// error
logger.warn("Not enough arguments");
return false;
}
// header = ?; middle = requested+blank+requester+blank+specialId
final LocalChannelReference lcr =
Configuration.configuration.getLocalTransaction().getFromRequest(
touid + ' ' + fromuid + ' ' + tid);
return lcr != null;
}
@Override
public final List<String> infoListQuery(final R66Request request)
throws TException {
List<String> list = new ArrayList<String>();
final RequestMode mode = request.getMode();
if (mode != RequestMode.INFOFILE) {
// error
logger.warn("Not correct mode for infoListQuery");
list.add("Not correct mode for infoListQuery");
return list;
}
// now check if enough arguments are provided
if (!request.isSetRule() || !request.isSetAction()) {
// error
logger.warn("Not enough arguments");
list.add("Not enough arguments");
return list;
}
final R66Session session = new R66Session(false);
session.getAuth().specialNoSessionAuth(false,
Configuration.configuration.getHostId());
final DbRule rule;
try {
rule = new DbRule(request.getRule());
} catch (final WaarpDatabaseException e) {
logger.warn("Rule is unknown: " + request.getRule());
list.add("Rule is unknown: " + request.getRule());
return list;
}
try {
if (RequestPacket.isRecvMode(rule.getMode())) {
session.getDir().changeDirectory(rule.getWorkPath());
} else {
session.getDir().changeDirectory(rule.getSendPath());
}
if (request.getAction() == Action.List ||
request.getAction() == Action.Mlsx) {
// ls or mls from current directory
if (request.getAction() == Action.List) {
list = session.getDir().list(request.getFile());
} else {
list = session.getDir().listFull(request.getFile(), false);
}
} else {
// ls pr mls from current directory and filename
if (!request.isSetFile()) {
logger.warn("File missing");
list.add("File missing");
return list;
}
final R66File file =
(R66File) session.getDir().setFile(request.getFile(), false);
String sresult;
if (request.getAction() == Action.Exist) {
sresult = String.valueOf(file.exists());
list.add(sresult);
} else if (request.getAction() == Action.Detail) {
sresult = session.getDir().fileFull(request.getFile(), false);
final String[] slist = sresult.split("\n");
sresult = slist[1];
list.add(sresult);
}
}
return list;
} catch (final CommandAbstractException e) {
logger.warn("Error occurs during: " + request + " : {}", e.getMessage());
list.add("Error occurs during: " + request);
return list;
}
}
}