ChannelUtils.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.openr66.protocol.utils;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.waarp.common.database.DbAdmin;
import org.waarp.common.digest.FilesystemBasedDigest;
import org.waarp.common.digest.FilesystemBasedDigest.DigestAlgo;
import org.waarp.common.file.DataBlock;
import org.waarp.common.logging.SysErrLogger;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.WaarpNettyUtil;
import org.waarp.common.utility.WaarpShutdownHook;
import org.waarp.common.utility.WaarpSystemUtil;
import org.waarp.openr66.context.R66FiniteDualStates;
import org.waarp.openr66.context.R66Session;
import org.waarp.openr66.context.task.localexec.LocalExecClient;
import org.waarp.openr66.database.data.DbTaskRunner;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.configuration.Messages;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
import org.waarp.openr66.protocol.localhandler.packet.AbstractLocalPacket;
import org.waarp.openr66.protocol.localhandler.packet.DataPacket;
import org.waarp.openr66.protocol.localhandler.packet.EndTransferPacket;
import org.waarp.openr66.protocol.localhandler.packet.ErrorPacket;
import org.waarp.openr66.protocol.localhandler.packet.LocalPacketFactory;
import org.waarp.openr66.protocol.localhandler.packet.RequestPacket;
import org.waarp.openr66.protocol.networkhandler.NetworkChannelReference;
import org.waarp.openr66.protocol.networkhandler.NetworkServerHandler;
import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
import org.waarp.openr66.protocol.networkhandler.packet.NetworkPacket;
import java.lang.management.ManagementFactory;
import static org.waarp.openr66.database.DbConstantR66.*;
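/**
 * Channel utilities: static helpers to count and close the registered channel
 * groups, to write local packets to the network channel and to run the global
 * exit sequence. An instance is also the task executed by the shutdown thread
 * created in {@link #startShutdown()}.
 */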
public class ChannelUtils extends Thread {
/**
* Internal Logger
*/
private static final WaarpLogger logger =
WaarpLoggerFactory.getLogger(ChannelUtils.class);
public static final Integer NOCHANNEL = Integer.MIN_VALUE;
/**
 * Closes the server channel group of the global configuration.
 *
 * @return the size of the server channel group just before it was closed, or
 *     0 when the configuration holds no server channel group
 */
private static int terminateCommandChannels() {
  final Configuration config = Configuration.configuration;
  int registered = 0;
  if (config.getServerChannelGroup() != null) {
    // Read the size first: it is the value reported to the caller
    registered = config.getServerChannelGroup().size();
    logger.info("ServerChannelGroup: {}", registered);
    config.getServerChannelGroup().close();
  }
  return registered;
}
/**
 * Closes the group of connected client channels of the global configuration.
 *
 * @return the size of the server connected channel group just before it was
 *     closed, or 0 when the configuration holds no such group
 */
private static int terminateClientChannels() {
  final Configuration config = Configuration.configuration;
  int registered = 0;
  if (config.getServerConnectedChannelGroup() != null) {
    // Read the size first: it is the value reported to the caller
    registered = config.getServerConnectedChannelGroup().size();
    logger.info("ServerConnectedChannelGroup: {}", registered);
    config.getServerConnectedChannelGroup().close();
  }
  return registered;
}
/**
 * Closes the HTTP channel group of the global configuration.
 *
 * @return the size of the HTTP channel group just before it was closed, or 0
 *     when the configuration holds no HTTP channel group
 */
private static int terminateHttpChannels() {
  final Configuration config = Configuration.configuration;
  int registered = 0;
  if (config.getHttpChannelGroup() != null) {
    // Read the size first: it is the value reported to the caller
    registered = config.getHttpChannelGroup().size();
    logger.debug("HttpChannelGroup: {}", registered);
    config.getHttpChannelGroup().close();
  }
  return registered;
}
/**
 * Return the current number of network connections known by the given
 * configuration: the size of its server connected channel group plus the size
 * of its HTTP channel group. A group that is {@code null} counts for 0.
 *
 * @param configuration the configuration whose channel groups are counted
 *
 * @return the current number of network connections
 */
public static int nbCommandChannels(final Configuration configuration) {
  int nb = 0;
  // The null check must target the very instance that is dereferenced on the
  // next line (the parameter), not the global Configuration.configuration
  if (configuration.getServerConnectedChannelGroup() != null) {
    nb += configuration.getServerConnectedChannelGroup().size();
  }
  if (configuration.getHttpChannelGroup() != null) {
    nb += configuration.getHttpChannelGroup().size();
  }
  return nb;
}
/**
 * Send one data block to the partner as a {@link DataPacket} and advance the
 * rank of the current runner.
 * <p>
 * Block hash selection:
 * <ul>
 * <li>when {@code digestBlock} is given, both digests (the global one only if
 * not null) are updated with the block and the packet key is
 * {@code digestBlock.Final()};</li>
 * <li>otherwise, when the runner mode is both "send through" and "MD5", the
 * key comes from {@code FileUtils.getHash} using the partner digest algorithm
 * (the global digest is handed over to that call);</li>
 * <li>otherwise the key stays empty and only the global digest, if any, is
 * updated.</li>
 * </ul>
 * The packet is compressed when compression is enabled on the session.
 *
 * @param localChannelReference the local channel reference used to write
 * @param digestGlobal the digest accumulated over the blocks, may be null
 * @param block the data block to send
 * @param digestBlock the digest used for this single block, may be null
 *
 * @return the ChannelFuture of this write operation
 *
 * @throws OpenR66ProtocolPacketException if the packet cannot be written
 */
public static ChannelFuture writeBackDataBlock(
    final LocalChannelReference localChannelReference,
    final FilesystemBasedDigest digestGlobal, final DataBlock block,
    final FilesystemBasedDigest digestBlock)
    throws OpenR66ProtocolPacketException {
  final R66Session session = localChannelReference.getSession();
  final DbTaskRunner runner = session.getRunner();
  final byte[] bytes = block.getByteBlock();
  final int byteCount = block.getByteCount();
  byte[] blockHash = {};
  if (digestBlock == null) {
    if (RequestPacket.isSendThroughMode(runner.getMode()) &&
        RequestPacket.isMD5Mode(runner.getMode())) {
      final DigestAlgo partnerAlgo =
          localChannelReference.getPartner().getDigestAlgo();
      blockHash = FileUtils.getHash(bytes, byteCount, partnerAlgo, digestGlobal);
    } else if (digestGlobal != null) {
      digestGlobal.Update(bytes, 0, byteCount);
    }
  } else {
    if (digestGlobal != null) {
      digestGlobal.Update(bytes, 0, byteCount);
    }
    digestBlock.Update(bytes, 0, byteCount);
    blockHash = digestBlock.Final();
  }
  // Re-assert the DATAS state every 100 ranks or whenever the session left it
  final boolean needDataState = runner.getRank() % 100 == 1 ||
                                localChannelReference.getSessionState() !=
                                R66FiniteDualStates.DATAS;
  if (needDataState) {
    localChannelReference.sessionNewState(R66FiniteDualStates.DATAS);
  }
  final DataPacket dataPacket =
      new DataPacket(runner.getRank(), bytes, byteCount, blockHash);
  if (session.isCompressionEnabled()) {
    R66Session.getCodec().compress(dataPacket, session);
  }
  if (logger.isDebugEnabled()) {
    logger.debug("DIGEST {} for {} to {} bytes at rank{} using {} at rank {}",
                 FilesystemBasedDigest.getHex(dataPacket.getKey()), byteCount,
                 dataPacket.getLengthPacket(), dataPacket.getPacketRank(),
                 localChannelReference.getPartner().getDigestAlgo(),
                 runner.getRank());
  }
  final ChannelFuture writeFuture =
      writeAbstractLocalPacket(localChannelReference, dataPacket, false);
  // The rank moves forward only once the write has been issued
  runner.incrementRank();
  return writeFuture;
}
/**
 * Send an EndTransfer packet (without global hash) after moving the session
 * to the ENDTRANSFERS state. The write is not awaited.
 *
 * @param localChannelReference the local channel reference used to write
 *
 * @throws OpenR66ProtocolPacketException if the packet cannot be written
 */
public static void writeEndTransfer(
    final LocalChannelReference localChannelReference)
    throws OpenR66ProtocolPacketException {
  final AbstractLocalPacket endTransfer =
      new EndTransferPacket(LocalPacketFactory.REQUESTPACKET);
  localChannelReference.sessionNewState(R66FiniteDualStates.ENDTRANSFERS);
  writeAbstractLocalPacket(localChannelReference, endTransfer, false);
}
/**
 * Send an EndTransfer packet carrying the global hash after moving the
 * session to the ENDTRANSFERS state. The write is not awaited.
 *
 * @param localChannelReference the local channel reference used to write
 * @param hash the global hash to place in the EndTransfer packet
 *
 * @throws OpenR66ProtocolPacketException if the packet cannot be written
 */
public static void writeEndTransfer(
    final LocalChannelReference localChannelReference, final String hash)
    throws OpenR66ProtocolPacketException {
  final AbstractLocalPacket endTransfer =
      new EndTransferPacket(LocalPacketFactory.REQUESTPACKET, hash);
  localChannelReference.sessionNewState(R66FiniteDualStates.ENDTRANSFERS);
  writeAbstractLocalPacket(localChannelReference, endTransfer, false);
}
/**
 * Wrap an AbstractLocalPacket into a NetworkPacket and write it (with flush)
 * to the network Channel of the given local channel reference.
 * <p>
 * When the packet is an ErrorPacket whose code is FORWARDCLOSECODE, the local
 * channel reference is closed once the write completes. After the write is
 * issued, the keep alive of the network server handler is reset and the
 * network channel object is marked as used, when they exist.
 *
 * @param localChannelReference the local channel reference used to write
 * @param packet the local packet to send
 * @param wait True to wait for the write operation before returning
 *
 * @return the ChannelFuture on write operation
 *
 * @throws OpenR66ProtocolPacketException if the NetworkPacket cannot be
 *     built from the local packet
 */
public static ChannelFuture writeAbstractLocalPacket(
    final LocalChannelReference localChannelReference,
    final AbstractLocalPacket packet, final boolean wait)
    throws OpenR66ProtocolPacketException {
  final NetworkPacket wrapped;
  try {
    wrapped = new NetworkPacket(localChannelReference.getLocalId(),
                                localChannelReference.getRemoteId(), packet,
                                localChannelReference);
  } catch (final OpenR66ProtocolPacketException e) {
    logger.error(Messages.getString("ChannelUtils.6") + packet, //$NON-NLS-1$
                 e);
    throw e;
  }
  // Decide before writing whether the local channel must be closed afterwards
  boolean closeAfterWrite = false;
  if (packet instanceof ErrorPacket) {
    closeAfterWrite =
        ((ErrorPacket) packet).getCode() == ErrorPacket.FORWARDCLOSECODE;
  }
  final ChannelFuture writeFuture =
      localChannelReference.getNetworkChannel().writeAndFlush(wrapped);
  if (closeAfterWrite) {
    writeFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
      @Override
      public void operationComplete(final Future<? super Void> completed) {
        localChannelReference.close();
      }
    });
  }
  final NetworkServerHandler serverHandler =
      localChannelReference.getNetworkServerHandler();
  if (serverHandler != null) {
    serverHandler.resetKeepAlive();
  }
  final NetworkChannelReference networkChannelReference =
      localChannelReference.getNetworkChannelObject();
  if (networkChannelReference != null) {
    networkChannelReference.use();
  }
  if (wait) {
    WaarpNettyUtil.awaitOrInterrupted(writeFuture);
  }
  return writeFuture;
}
/**
 * Exit global ChannelFactory.
 * <p>
 * Sequence: release the constraint limit handler, stop the selected transfers
 * (when a database admin exists), flag the configuration as shutting down,
 * shut down the local channels, unbind the server and close the server
 * channel group, wait for a grace delay, then release or close the remaining
 * resources (retrieve operations, traffic shaping, HTTP channels, local
 * transaction, LocalExec, connected client channels, server, database
 * connections) and finally stop the logger.
 * <p>
 * The grace delay is the connection timeout of the configuration, divided by
 * 3 when no local channel was registered at shutdown time.
 */
public static void exit() {
  logger.info("Current launched threads before exit: {}",
              ManagementFactory.getThreadMXBean().getThreadCount());
  if (Configuration.configuration.getConstraintLimitHandler() != null) {
    Configuration.configuration.getConstraintLimitHandler().release();
  }
  // First try to StopAll
  if (admin != null) {
    TransferUtils.stopSelectedTransfers(admin.getSession(), 0, null, null,
                                        null, null, null, null, null, null,
                                        null, true, true, true);
  }
  Configuration.configuration.setShutdown(true);
  Configuration.configuration.prepareServerStop();
  long delay = Configuration.configuration.getTimeoutCon();
  // Inform others that shutdown
  if (Configuration.configuration.getLocalTransaction() != null) {
    // Count taken before the local channels are asked to shut down
    final int nb = Configuration.configuration.getLocalTransaction()
                                              .getNumberLocalChannel();
    Configuration.configuration.getLocalTransaction().shutdownLocalChannels();
    // Nothing is running locally: no need to wait the full grace delay
    if (nb == 0) {
      delay /= 3;
    }
  }
  logger.info("Unbind server network services");
  Configuration.configuration.unbindServer();
  logger.info("Exit Shutdown Command");
  terminateCommandChannels();
  logger.warn(
      Messages.getString("ChannelUtils.7") + delay + " ms"); //$NON-NLS-1$
  try {
    Thread.sleep(delay);
  } catch (final InterruptedException e) {//NOSONAR
    // Deliberately ignored: the remaining shutdown steps must still run
    SysErrLogger.FAKE_LOGGER.ignoreLog(e);
  }
  NetworkTransaction.stopAllEndRetrieve();
  if (Configuration.configuration.getLocalTransaction() != null) {
    Configuration.configuration.getLocalTransaction()
                               .debugPrintActiveLocalChannels();
  }
  if (Configuration.configuration.getGlobalTrafficShapingHandler() != null) {
    Configuration.configuration.getGlobalTrafficShapingHandler().release();
  }
  logger.info("Exit Shutdown Http");
  terminateHttpChannels();
  logger.info("Exit Shutdown Local");
  if (Configuration.configuration.getLocalTransaction() != null) {
    Configuration.configuration.getLocalTransaction().closeAll();
  }
  logger.info("Exit Shutdown LocalExec");
  if (Configuration.configuration.isUseLocalExec()) {
    LocalExecClient.releaseResources();
  }
  logger.info("Exit Shutdown Connected Client");
  terminateClientChannels();
  logger.info("Exit Shutdown ServerStop");
  Configuration.configuration.serverStop();
  logger.info("Exit Shutdown Db Connection");
  DbAdmin.closeAllConnection();
  // Final message goes both to the logger and to stderr before the logger stops
  logger.warn(Messages.getString("ChannelUtils.15")); //$NON-NLS-1$
  SysErrLogger.FAKE_LOGGER.syserr(
      Messages.getString("ChannelUtils.15")); //$NON-NLS-1$
  WaarpSystemUtil.stopLogger(false);
}
/**
 * Entry point of the shutdown task: logs whether a restart is requested, then
 * delegates to {@code WaarpShutdownHook.terminate(false)}.
 */
@Override
public void run() {
  final boolean restartRequested = WaarpShutdownHook.isRestart();
  logger.info("Should restart? {}", restartRequested);
  WaarpShutdownHook.terminate(false);
}
/**
 * Start the shutdown in a dedicated non-daemon thread named
 * "R66 Shutdown Thread", unless a shutdown is already in progress.
 */
public static void startShutdown() {
  if (!WaarpShutdownHook.isInShutdown()) {
    final Thread shutdownThread =
        new Thread(new ChannelUtils(), "R66 Shutdown Thread");
    shutdownThread.setDaemon(false);
    shutdownThread.start();
  }
}
}