LocalChannelReference.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.openr66.protocol.localhandler;
import io.netty.channel.Channel;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import org.waarp.common.database.DbSession;
import org.waarp.common.guid.IntegerUuid;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.openr66.client.RecvThroughHandler;
import org.waarp.openr66.commander.ClientRunner;
import org.waarp.openr66.context.ErrorCode;
import org.waarp.openr66.context.R66FiniteDualStates;
import org.waarp.openr66.context.R66Result;
import org.waarp.openr66.context.R66Session;
import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.configuration.PartnerConfiguration;
import org.waarp.openr66.protocol.exception.OpenR66Exception;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
import org.waarp.openr66.protocol.networkhandler.NetworkChannelReference;
import org.waarp.openr66.protocol.networkhandler.NetworkServerHandler;
import org.waarp.openr66.protocol.networkhandler.NetworkServerInitializer;
import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
import org.waarp.openr66.protocol.utils.R66Future;
import org.waarp.openr66.protocol.utils.R66Versions;
import static org.waarp.common.database.DbConstant.*;
/**
* Reference of one object using Local Channel localId and containing local
* channel and network channel.
*/
public class LocalChannelReference {
/**
* Internal Logger
*/
private static final WaarpLogger logger =
WaarpLoggerFactory.getLogger(LocalChannelReference.class);
/**
* Network Channel Ref
*/
private final NetworkChannelReference networkChannelRef;
/**
* Traffic handler associated if any
*/
private final ChannelTrafficShapingHandler cts;
/**
* Network Server Handler
*/
private final NetworkServerHandler networkServerHandler;
/**
* Server Actions handler
*/
private final TransferActions serverHandler = new TransferActions();
/**
* Local Id
*/
private final Integer localId;
/**
* Remote Id
*/
private Integer remoteId;
/**
* Requested_requester_specialId
*/
private String requestId;
/**
* Future on Global Request
*/
private final R66Future futureRequest;
/**
* Future on Valid Starting Request
*/
private final R66Future futureValidRequest = new R66Future(true);
/**
* Future on Transfer if any
*/
private R66Future futureEndTransfer = new R66Future(true);
/**
* Future on Connection
*/
private final R66Future futureConnection = new R66Future(true);
/**
* Future on Startup
*/
private final R66Future futureStartup = new R66Future(true);
/**
* Session
*/
private R66Session session;
/**
* Last error message
*/
private String errorMessage = "NoError";
/**
* Last error code
*/
private ErrorCode code = ErrorCode.Unknown;
/**
* RecvThroughHandler
*/
private RecvThroughHandler recvThroughHandler;
private boolean isSendThroughMode;
/**
* Thread for ClientRunner if any
*/
private ClientRunner clientRunner;
/**
* To be able to check hash once all transfer is over once again
*/
private String hashComputeDuringTransfer;
/**
* If partial hash, no global hash validation can be done
*/
private boolean partialHash;
/**
* PartnerConfiguration
*/
private PartnerConfiguration partner;
/**
* @param networkChannelRef
* @param remoteId
* @param futureRequest
*
* @throws OpenR66ProtocolRemoteShutdownException
*/
public LocalChannelReference(final NetworkChannelReference networkChannelRef,
final Integer remoteId,
final R66Future futureRequest)
throws OpenR66ProtocolRemoteShutdownException {
this.networkChannelRef = networkChannelRef;
networkServerHandler =
(NetworkServerHandler) this.networkChannelRef.channel().pipeline().get(
NetworkServerInitializer.NETWORK_HANDLER);
localId = new IntegerUuid().getInt();
this.remoteId = remoteId;
if (futureRequest == null) {
this.futureRequest = new R66Future(true);
} else {
if (futureRequest.isDone()) {
futureRequest.reset();
}
this.futureRequest = futureRequest;
}
cts = (ChannelTrafficShapingHandler) networkChannelRef.channel().pipeline()
.get(
NetworkServerInitializer.LIMITCHANNEL);
LocalServerHandler.channelActive(serverHandler);
serverHandler.setLocalChannelReference(this);
networkChannelRef.add(this);
}
/**
* Special empty LCR constructor
*/
public LocalChannelReference() {
networkChannelRef = null;
networkServerHandler = null;
localId = 0;
futureRequest = new R66Future(true);
cts = null;
serverHandler.localChannelReference = this;
}
/**
* Close the localChannelReference
*/
public final void close() {
LocalServerHandler.channelInactive(serverHandler);
if (networkChannelRef != null) {
networkChannelRef.remove(this);
}
final LocalTransaction lt =
Configuration.configuration.getLocalTransaction();
if (lt != null) {
lt.remove(this);
}
}
/**
* @return the networkChannelRef
*/
public final Channel getNetworkChannel() {
return networkChannelRef.channel();
}
/**
* @return the id
*/
public final Integer getLocalId() {
return localId;
}
/**
* @return the remoteId
*/
public final Integer getRemoteId() {
return remoteId;
}
/**
* @return the ChannelTrafficShapingHandler
*/
public final ChannelTrafficShapingHandler getChannelTrafficShapingHandler() {
return cts;
}
/**
* @return the networkChannelObject
*/
public final NetworkChannelReference getNetworkChannelObject() {
return networkChannelRef;
}
/**
* @return the networkServerHandler
*/
public final NetworkServerHandler getNetworkServerHandler() {
return networkServerHandler;
}
/**
* @return the serverHandler
*/
public final TransferActions getServerHandler() {
return serverHandler;
}
/**
* @return the actual dbSession
*/
public final DbSession getDbSession() {
if (networkServerHandler != null) {
return networkServerHandler.getDbSession();
}
logger.info("SHOULD NOT BE");
return admin.getSession();
}
/**
* @param remoteId the remoteId to set
*/
public final void setRemoteId(final Integer remoteId) {
this.remoteId = remoteId;
}
/**
* @return the session
*/
public final R66Session getSession() {
return session;
}
/**
* @param session the session to set
*/
public final void setSession(final R66Session session) {
this.session = session;
}
/**
* @return the current errorMessage
*/
public final String getErrorMessage() {
return errorMessage;
}
/**
* @param errorMessage the errorMessage to set
*/
public final void setErrorMessage(final String errorMessage,
final ErrorCode code) {
this.errorMessage = errorMessage;
this.code = code;
}
/**
* @return the code
*/
public final ErrorCode getCurrentCode() {
return code;
}
/**
* Validate or not the Startup (before connection)
*
* @param validate
*/
public final void validateStartup(final boolean validate) {
if (futureStartup.isDone()) {
return;
}
if (validate) {
futureStartup.setSuccess();
} else {
futureStartup.cancel();
}
}
/**
* @return the futureValidateStartup
*/
public final R66Future getFutureValidateStartup() {
if (!futureStartup.awaitOrInterruptible()) {
validateStartup(false);
return futureStartup;
}
return futureStartup;
}
/**
* @return True if the connection is validated (in OK or KO status)
*/
public final boolean isConnectionValidate() {
return futureConnection.isDone();
}
/**
* Validate or Invalidate the connection (authentication)
*
* @param validate
*/
public final void validateConnection(final boolean validate,
final R66Result result) {
if (futureConnection.isDone()) {
logger.debug("LocalChannelReference already validated: {}",
futureConnection.isSuccess());
return;
}
logger.debug("Validation of connection {}", validate);
if (validate) {
futureConnection.setResult(result);
futureConnection.setSuccess();
} else {
futureConnection.setResult(result);
setErrorMessage(result.getMessage(), result.getCode());
futureConnection.cancel();
}
}
/**
* @return the futureValidateConnection
*/
public final R66Future getFutureValidateConnection() {
final R66Result result;
final Channel channel = networkChannelRef.channel();
if (channel != null && channel.isActive()) {
if (!futureConnection.awaitOrInterruptible()) {
if (futureConnection.isDone()) {
return futureConnection;
} else {
logger.warn("Cannot get Connection due to out of Time: {}", this);
result = new R66Result(
new OpenR66ProtocolNoConnectionException("Out of time"), session,
false, ErrorCode.ConnectionImpossible, null);
validateConnection(false, result);
return futureConnection;
}
} else {
return futureConnection;
}
}
if (futureConnection.isDone()) {
return futureConnection;
}
logger.info("Cannot get Connection due to out of Time: {}", this);
result =
new R66Result(new OpenR66ProtocolNoConnectionException("Out of time"),
session, false, ErrorCode.ConnectionImpossible, null);
validateConnection(false, result);
return futureConnection;
}
/**
* Validate the End of a Transfer
*
* @param finalValue
*/
public final void validateEndTransfer(final R66Result finalValue) {
if (!futureEndTransfer.isDone()) {
futureEndTransfer.setResult(finalValue);
futureEndTransfer.setSuccess();
} else {
logger.debug("Could not validate since Already validated: {} {}",
futureEndTransfer.isSuccess(), finalValue);
if (!futureEndTransfer.getResult().isAnswered()) {
futureEndTransfer.getResult().setAnswered(finalValue.isAnswered());
}
}
}
/**
* @return the futureEndTransfer
*/
public final R66Future getFutureEndTransfer() {
return futureEndTransfer;
}
/**
* Special waiter for Send Through method. It reset the EndTransfer future.
*
* @throws OpenR66Exception
*/
public final void waitReadyForSendThrough() throws OpenR66Exception {
logger.debug("Wait for End of Prepare Transfer");
futureEndTransfer.awaitOrInterruptible();
if (futureEndTransfer.isSuccess()) {
// reset since transfer will start now
futureEndTransfer = new R66Future(true);
} else {
if (futureEndTransfer.getResult() != null &&
futureEndTransfer.getResult().getException() != null) {
throw futureEndTransfer.getResult().getException();
} else if (futureEndTransfer.getCause() != null) {
throw new OpenR66RunnerErrorException(futureEndTransfer.getCause());
} else {
throw new OpenR66RunnerErrorException("Unknown reason");
}
}
}
/**
* @return the futureValidRequest
*/
public final R66Future getFutureValidRequest() {
return futureValidRequest;
}
/**
* @return the futureRequest
*/
public final R66Future getFutureRequest() {
return futureRequest;
}
/**
* Invalidate the current request
*
* @param finalvalue
*/
public final void invalidateRequest(final R66Result finalvalue) {
R66Result finalValue = finalvalue;
if (finalValue == null) {
finalValue =
new R66Result(session, false, ErrorCode.Unknown, session.getRunner());
}
if (logger.isDebugEnabled()) {
logger.debug(
"FST: " + futureStartup.isDone() + ":" + futureStartup.isSuccess() +
" FCT: " + futureConnection.isDone() + ':' +
futureConnection.isSuccess() + " FET: " + futureEndTransfer.isDone() +
':' + futureEndTransfer.isSuccess() + " FVR: " +
futureValidRequest.isDone() + ':' + futureValidRequest.isSuccess() +
" FR: " + futureRequest.isDone() + ':' + futureRequest.isSuccess() +
' ' + finalValue.getMessage());
}
if (!futureStartup.isDone()) {
futureStartup.setResult(finalValue);
if (finalValue.getException() != null) {
futureStartup.setFailure(finalValue.getException());
} else {
futureStartup.cancel();
}
}
if (!futureConnection.isDone()) {
futureConnection.setResult(finalValue);
if (finalValue.getException() != null) {
futureConnection.setFailure(finalValue.getException());
} else {
futureConnection.cancel();
}
}
if (!futureEndTransfer.isDone()) {
futureEndTransfer.setResult(finalValue);
if (finalValue.getException() != null) {
futureEndTransfer.setFailure(finalValue.getException());
} else {
futureEndTransfer.cancel();
}
}
if (!futureValidRequest.isDone()) {
futureValidRequest.setResult(finalValue);
if (finalValue.getException() != null) {
futureValidRequest.setFailure(finalValue.getException());
} else {
futureValidRequest.cancel();
}
}
if (logger.isTraceEnabled()) {
logger.trace("Invalidate Request",
new Exception("DEBUG Trace for Invalidation"));
}
if (finalValue.getCode() != ErrorCode.ServerOverloaded) {
if (!futureRequest.isDone()) {
setErrorMessage(finalValue.getMessage(), finalValue.getCode());
futureRequest.setResult(finalValue);
if (finalValue.getException() != null) {
futureRequest.setFailure(finalValue.getException());
} else {
futureRequest.cancel();
}
} else {
logger.debug("Could not invalidate since Already finished: {}",
futureEndTransfer.getResult());
}
} else {
setErrorMessage(finalValue.getMessage(), finalValue.getCode());
logger.info("Server Overloaded");
}
if (session != null) {
if (session.isSender()) {
NetworkTransaction.stopRetrieve(this);
}
}
}
/**
* Validate the current Request
*
* @param finalValue
*/
public final void validateRequest(final R66Result finalValue) {
setErrorMessage("NoError", null);
if (!futureEndTransfer.isDone()) {
logger.debug("Will validate EndTransfer");
validateEndTransfer(finalValue);
}
if (!futureValidRequest.isDone()) {
futureValidRequest.setResult(finalValue);
futureValidRequest.setSuccess();
}
logger.debug("Validate Request");
if (!futureRequest.isDone()) {
if (finalValue.getOther() == null &&
session.getBusinessObject() != null &&
session.getBusinessObject().getInfo(session) != null) {
finalValue.setOther(session.getBusinessObject().getInfo(session));
}
futureRequest.setResult(finalValue);
futureRequest.setSuccess();
} else {
logger.info("Already validated: {} {}", futureRequest.isSuccess(),
finalValue);
if (!futureRequest.getResult().isAnswered()) {
futureRequest.getResult().setAnswered(finalValue.isAnswered());
}
}
}
private long getMinLimit(final long a, final long b) {
long res = a;
if (a <= 0) {
res = b;
} else if (b > 0 && b < a) {
res = b;
}
return res;
}
public final void setChannelLimit(final boolean isSender, final long limit) {
final ChannelTrafficShapingHandler limitHandler =
(ChannelTrafficShapingHandler) networkChannelRef.channel().pipeline()
.get(
NetworkServerInitializer.LIMITCHANNEL);
if (isSender) {
limitHandler.setWriteLimit(limit);
logger.info("Will write at {} Bytes/sec", limit);
} else {
limitHandler.setReadLimit(limit);
logger.info("Will read at {} Bytes/sec", limit);
}
}
public final long getChannelLimit(final boolean isSender) {
final long global;
final long channel;
if (isSender) {
global = Configuration.configuration.getServerGlobalWriteLimit();
channel = Configuration.configuration.getServerChannelWriteLimit();
} else {
global = Configuration.configuration.getServerGlobalReadLimit();
channel = Configuration.configuration.getServerChannelReadLimit();
}
return getMinLimit(global, channel);
}
@Override
public final String toString() {
return "LCR: L: " + localId + " R: " + remoteId + " Startup[" +
futureStartup + "] Conn[" + futureConnection +
"] ValidRequestRequest[" + futureValidRequest + "] EndTransfer[" +
(futureEndTransfer != null? futureEndTransfer : "noEndTransfer") +
"] Request[" + (futureRequest != null? futureRequest : "noRequest") +
']';
}
/**
* @return the recvThroughHandler
*/
public final RecvThroughHandler getRecvThroughHandler() {
return recvThroughHandler;
}
/**
* @return True if in RecvThrough Mode
*/
public final boolean isRecvThroughMode() {
return recvThroughHandler != null;
}
/**
* @param recvThroughHandler the recvThroughHandler to set
*/
public final void setRecvThroughHandler(
final RecvThroughHandler recvThroughHandler) {
this.recvThroughHandler = recvThroughHandler;
}
/**
* @return True if in SendThrough Mode
*/
public final boolean isSendThroughMode() {
return isSendThroughMode;
}
/**
* @param isSendThroughMode the isSendThroughMode to set
*/
public final void setSendThroughMode(final boolean isSendThroughMode) {
this.isSendThroughMode = isSendThroughMode;
}
/**
* @return the clientRunner
*/
public final ClientRunner getClientRunner() {
return clientRunner;
}
/**
* @param clientRunner the clientRunner to set
*/
public final void setClientRunner(final ClientRunner clientRunner) {
this.clientRunner = clientRunner;
}
/**
* Shortcut to set a new state in Session
*
* @param desiredState
*/
public final void sessionNewState(final R66FiniteDualStates desiredState) {
if (session != null) {
session.newState(desiredState);
}
}
/**
* @return the current state or TEST if no session exists
*/
public final R66FiniteDualStates getSessionState() {
if (session != null) {
return session.getState();
}
return R66FiniteDualStates.TEST;
}
/**
* @return the hashComputeDuringTransfer
*/
public final String getHashComputeDuringTransfer() {
return hashComputeDuringTransfer;
}
/**
* @param hashComputeDuringTransfer the hashComputeDuringTransfer to
* set
*/
public final void setHashComputeDuringTransfer(
final String hashComputeDuringTransfer) {
this.hashComputeDuringTransfer = hashComputeDuringTransfer;
}
public final void setPartialHash() {
partialHash = true;
}
public final boolean isPartialHash() {
return partialHash;
}
/**
* @return the partner
*/
public final PartnerConfiguration getPartner() {
return partner;
}
/**
* @param hostId the partner to set
*/
public final void setPartner(final String hostId) {
logger.debug("host: {}", hostId);
partner = Configuration.configuration.getVersions().get(hostId);
if (partner == null) {
partner =
new PartnerConfiguration(hostId, R66Versions.V2_4_12.getVersion());
}
logger.debug("DEBUG {}", partner);
}
/**
* @return the requestId
*/
public final String getRequestId() {
return requestId;
}
/**
* @param requestId the requestId to set
*/
public final void setRequestId(final String requestId) {
this.requestId = requestId;
}
}