RetrieveRunner.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.openr66.protocol.localhandler;
import io.netty.channel.ChannelFuture;
import org.waarp.common.digest.FilesystemBasedDigest;
import org.waarp.common.file.DataBlock;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.openr66.context.ErrorCode;
import org.waarp.openr66.context.R66FiniteDualStates;
import org.waarp.openr66.context.R66Result;
import org.waarp.openr66.context.R66Session;
import org.waarp.openr66.context.filesystem.R66File;
import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
import org.waarp.openr66.database.data.DbTaskRunner.TASKSTEP;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.exception.OpenR66Exception;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
import org.waarp.openr66.protocol.localhandler.packet.EndRequestPacket;
import org.waarp.openr66.protocol.localhandler.packet.ErrorPacket;
import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
import org.waarp.openr66.protocol.utils.ChannelUtils;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Retrieve transfer runner
*/
public class RetrieveRunner extends Thread {
private static final String END_RETRIEVE_IN_ERROR = "End Retrieve in Error";
/**
* Internal Logger
*/
private static final WaarpLogger logger =
WaarpLoggerFactory.getLogger(RetrieveRunner.class);
private final R66Session session;
private final LocalChannelReference localChannelReference;
private boolean done;
protected final AtomicBoolean running = new AtomicBoolean(true);
private final String nameThread;
protected RetrieveRunner() {
// empty constructor
session = null;
localChannelReference = null;
nameThread = "RetrieveRunner: None";
setName(nameThread);
setDaemon(true);
}
/**
* @param session
*/
public RetrieveRunner(final R66Session session) {
this.session = session;
localChannelReference = this.session.getLocalChannelReference();
nameThread = "RetrieveRunner: " + localChannelReference.getLocalId();
setName(nameThread);
setDaemon(true);
}
/**
* Try to stop the runner
*/
public final void stopRunner() {
running.set(false);
}
@Override
public void run() {
boolean requestValidDone = false;
setName(nameThread);
try {
try {
if (session.getRunner().getGloballaststep() ==
TASKSTEP.POSTTASK.ordinal()) {
logger.warn("Restart from POSTTASK: EndTransfer");
// restart from PostTask global step so just end now
try {
ChannelUtils.writeEndTransfer(localChannelReference);
} catch (final OpenR66ProtocolPacketException e) {
transferInError(e);
logger.error(END_RETRIEVE_IN_ERROR);
return;
}
} else {
logger.debug("Start retrieve operation (send)");
final R66File r66File = session.getFile();
if (r66File == null) {
logger.error("R66File null : {}", r66File);
transferInError(
new OpenR66RunnerErrorException("R66File not setup"));
logger.info(END_RETRIEVE_IN_ERROR);
return;
} else {
r66File.retrieveBlocking(running);
}
}
} catch (final OpenR66RunnerErrorException e) {
transferInError(e);
logger.info(END_RETRIEVE_IN_ERROR);
return;
} catch (final OpenR66ProtocolSystemException e) {
transferInError(e);
logger.info(END_RETRIEVE_IN_ERROR);
return;
} catch (final Exception e) {
logger.info("TRACE for unknown Exception ", e);
transferInError(new OpenR66RunnerErrorException(e));
logger.info(END_RETRIEVE_IN_ERROR);
return;
}
localChannelReference.getFutureEndTransfer().awaitOrInterruptible();
logger.debug("Await future End Transfer done: {}",
localChannelReference.getFutureEndTransfer().isSuccess());
if (localChannelReference.getFutureEndTransfer().isDone() &&
localChannelReference.getFutureEndTransfer().isSuccess()) {
// send a validation
localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
final EndRequestPacket validPacket =
new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
if (session.getExtendedProtocol() &&
session.getBusinessObject() != null &&
session.getBusinessObject().getInfo(session) != null) {
validPacket.setOptional(session.getBusinessObject().getInfo(session));
}
try {
ChannelUtils.writeAbstractLocalPacket(localChannelReference,
validPacket, false);
requestValidDone = true;
} catch (final OpenR66ProtocolPacketException ignored) {
// nothing
}
if (!localChannelReference.getFutureRequest().awaitOrInterruptible(
Configuration.configuration.getTimeoutCon()) ||
Thread.interrupted()) {
// valid it however
finalizeInternal();
}
if (session.getRunner() != null &&
session.getRunner().isRequestOnRequested()) {
localChannelReference.close();
}
done = true;
} else {
checkDoneNotAnswered();
if (!localChannelReference.getFutureRequest().isDone()) {
R66Result result =
localChannelReference.getFutureEndTransfer().getResult();
if (result == null) {
result = new R66Result(session, false, ErrorCode.TransferError,
session.getRunner());
}
localChannelReference.invalidateRequest(result);
}
done = true;
logger.info(END_RETRIEVE_IN_ERROR);
}
} finally {
try {
if (!done) {
finalizeRequestDone(requestValidDone);
}
NetworkTransaction.normalEndRetrieve(localChannelReference);
} finally {
setName("Finished_" + nameThread);
}
}
}
private void finalizeInternal() {
session.getRunner().setAllDone();
try {
session.getRunner().saveStatus();
} catch (final OpenR66RunnerErrorException e) {
// ignore
}
localChannelReference.validateRequest(
localChannelReference.getFutureEndTransfer().getResult());
}
private boolean checkDoneNotAnswered() {
if (localChannelReference.getFutureEndTransfer().isDone()) {
// Done and Not Success => error
if (!localChannelReference.getFutureEndTransfer().getResult()
.isAnswered()) {
localChannelReference.sessionNewState(R66FiniteDualStates.ERROR);
final ErrorPacket error =
new ErrorPacket(localChannelReference.getErrorMessage(),
localChannelReference.getFutureEndTransfer()
.getResult().getCode()
.getCode(),
ErrorPacket.FORWARDCLOSECODE);
try {
ChannelUtils.writeAbstractLocalPacket(localChannelReference, error,
false);
} catch (final OpenR66ProtocolPacketException ignored) {
// ignore
}
}
return true;
}
return false;
}
private void finalizeRequestDone(final boolean requestValidDone) {
if (localChannelReference.getFutureEndTransfer().isDone() &&
localChannelReference.getFutureEndTransfer().isSuccess()) {
if (!requestValidDone) {
localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
final EndRequestPacket validPacket =
new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
if (session.getExtendedProtocol() &&
session.getBusinessObject() != null &&
session.getBusinessObject().getInfo(session) != null) {
validPacket.setOptional(session.getBusinessObject().getInfo(session));
}
try {
ChannelUtils.writeAbstractLocalPacket(localChannelReference,
validPacket, false);
} catch (final OpenR66ProtocolPacketException ignored) {
// nothing
}
}
finalizeInternal();
if (session.getRunner() != null &&
session.getRunner().isRequestOnRequested()) {
localChannelReference.close();
}
} else {
if (!checkDoneNotAnswered()) {
R66Result result =
localChannelReference.getFutureEndTransfer().getResult();
if (result == null) {
result = new R66Result(session, false, ErrorCode.TransferError,
session.getRunner());
}
localChannelReference.invalidateRequest(result);
}
}
}
private void transferInError(final OpenR66Exception e) {
final R66Result result =
new R66Result(e, session, true, ErrorCode.TransferError,
session.getRunner());
logger.error("Transfer in error", e);
session.newState(R66FiniteDualStates.ERROR);
final ErrorPacket error =
new ErrorPacket("Transfer in error", ErrorCode.TransferError.getCode(),
ErrorPacket.FORWARDCLOSECODE);
try {
ChannelUtils.writeAbstractLocalPacket(localChannelReference, error,
false);
} catch (final OpenR66ProtocolPacketException ignored) {
// ignore
}
localChannelReference.invalidateRequest(result);
localChannelReference.close();
done = true;
}
/**
* Write the next block when the channel is ready to prevent OOM
*
* @param block
* @param localChannelReference
* @param digestGlobal
* @param digestBlock
*
* @return the ChannelFuture on the write operation
*
* @throws OpenR66ProtocolPacketException
*/
public static ChannelFuture writeWhenPossible(final DataBlock block,
final LocalChannelReference localChannelReference,
final FilesystemBasedDigest digestGlobal,
final FilesystemBasedDigest digestBlock)
throws OpenR66ProtocolPacketException {
return ChannelUtils.writeBackDataBlock(localChannelReference, digestGlobal,
block, digestBlock);
}
public final int getLocalId() {
return localChannelReference.getLocalId();
}
/**
* When submit RetrieveRunner cannot be done since Executor is already stopped
*/
public final void notStartRunner() {
transferInError(
new OpenR66RunnerErrorException("Cannot Start Runner: " + session));
stopRunner();
}
}