NetworkServerHandler.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.openr66.protocol.networkhandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.AttributeKey;
import org.waarp.common.crypto.ssl.WaarpSslUtility;
import org.waarp.common.database.DbSession;
import org.waarp.common.database.exception.WaarpDatabaseNoConnectionException;
import org.waarp.common.logging.SysErrLogger;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.WaarpShutdownHook;
import org.waarp.openr66.context.authentication.R66Auth;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.exception.OpenR66Exception;
import org.waarp.openr66.protocol.exception.OpenR66ExceptionTrappedFactory;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolBlackListedException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolBusinessNoWriteBackException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
import org.waarp.openr66.protocol.localhandler.LocalServerHandler;
import org.waarp.openr66.protocol.localhandler.packet.AbstractLocalPacket;
import org.waarp.openr66.protocol.localhandler.packet.ConnectionErrorPacket;
import org.waarp.openr66.protocol.localhandler.packet.KeepAlivePacket;
import org.waarp.openr66.protocol.localhandler.packet.LocalPacketCodec;
import org.waarp.openr66.protocol.localhandler.packet.LocalPacketFactory;
import org.waarp.openr66.protocol.networkhandler.packet.NetworkPacket;
import org.waarp.openr66.protocol.utils.ChannelCloseTimer;
import org.waarp.openr66.protocol.utils.ChannelUtils;
import java.net.BindException;
import java.net.SocketAddress;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.waarp.common.database.DbConstant.*;
/**
* Network Server Handler (Requester side)
*/
public class NetworkServerHandler
extends SimpleChannelInboundHandler<NetworkPacket> {
/**
* Internal Logger
*/
private static final WaarpLogger logger =
WaarpLoggerFactory.getLogger(NetworkServerHandler.class);
public static final String REUSABLE_AUTH_KEY_NAME = "ReusableAuthKey";
public static final AttributeKey<R66Auth> REUSABLE_AUTH_KEY =
AttributeKey.newInstance(REUSABLE_AUTH_KEY_NAME);
/**
* The associated Remote Address
*/
private SocketAddress remoteAddress;
/**
* The associated NetworkChannelReference
*/
private NetworkChannelReference networkChannelReference;
/**
* The Database connection attached to this NetworkChannelReference shared
* among all associated LocalChannels
*/
private DbSession dbSession;
/**
* Does this Handler is for SSL
*/
protected boolean isSSL;
/**
* To handle the keep alive
*/
private final AtomicInteger keepAlivedSent = new AtomicInteger(0);
/**
* Is this network connection being refused (black listed)
*/
protected boolean isBlackListed;
/**
* Is this network connection being refused (shutting down)
*/
protected boolean isShuttingDown;
/**
*
*/
public NetworkServerHandler() {
// Empty
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
try {
if (Configuration.configuration.getServerConnectedChannelGroup() !=
null) {
Configuration.configuration.getServerConnectedChannelGroup()
.remove(ctx.channel());
}
if (networkChannelReference != null) {
if (networkChannelReference.nbLocalChannels() > 0) {
logger.info("Network Channel Closed: {} LocalChannels Left: {}",
ctx.channel().id(),
networkChannelReference.nbLocalChannels());
// Give an extra time if necessary to let the local channel being closed
final int nb =
Math.min(10, networkChannelReference.nbLocalChannels());
try {
Thread.sleep(Configuration.RETRYINMS * nb);
} catch (final InterruptedException e1) {//NOSONAR
SysErrLogger.FAKE_LOGGER.ignoreLog(e1);
}
}
try {
NetworkTransaction.closedNetworkChannel(networkChannelReference);
} catch (final RejectedExecutionException e) {
logger.debug(e);
}
} else {
if (remoteAddress == null) {
remoteAddress = ctx.channel().remoteAddress();
}
try {
NetworkTransaction.closedNetworkChannel(remoteAddress);
} catch (final RejectedExecutionException e) {
logger.debug(e);
}
}
// Now force the close of the database after a wait
if (dbSession != null && admin != null && admin.getSession() != null &&
!dbSession.equals(admin.getSession())) {
dbSession.forceDisconnect();
dbSession = null;
}
} catch (final RejectedExecutionException e) {
logger.debug(e);
}
}
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
try {
final Channel netChannel = ctx.channel();
if (Configuration.configuration.getServerConnectedChannelGroup() !=
null) {
Configuration.configuration.getServerConnectedChannelGroup()
.add(netChannel);
}
remoteAddress = netChannel.remoteAddress();
logger.debug(
"Will the Connection be refused if Partner is BlackListed from {}",
remoteAddress);
if (NetworkTransaction.isBlacklisted(netChannel)) {
logger.warn("Connection refused since Partner is BlackListed from {}",
remoteAddress);
isBlackListed = true;
if (Configuration.configuration.getR66Mib() != null) {
Configuration.configuration.getR66Mib().notifyError(
"Black Listed connection temptative", "During connection");
}
// close immediately the connection
WaarpSslUtility.closingSslChannel(netChannel);
return;
}
try {
networkChannelReference =
NetworkTransaction.addNetworkChannel(netChannel, isSSL);
} catch (final OpenR66ProtocolRemoteShutdownException e2) {
logger.warn("Connection refused since Partner is in Shutdown from " +
remoteAddress + " : {}", e2.getMessage());
isShuttingDown = true;
// close immediately the connection
WaarpSslUtility.closingSslChannel(netChannel);
return;
} catch (final OpenR66ProtocolBlackListedException e2) {
logger.warn("Connection refused since Partner is Black Listed from " +
remoteAddress + " : {}", e2.getMessage());
isBlackListed = true;
// close immediately the connection
WaarpSslUtility.closingSslChannel(netChannel);
return;
}
if (admin.isCompatibleWithThreadSharedConnexion()) {
dbSession = new DbSession(admin, false);
dbSession.useConnection();
} else {
logger.debug("DbSession will be adjusted on LocalChannelReference");
dbSession = admin.getSession();
}
} catch (final WaarpDatabaseNoConnectionException e1) {
// Cannot connect so use default connection
logger.warn("Use default database connection");
dbSession = admin.getSession();
}
logger.debug("Network Channel Connected: {} ", ctx.channel().id());
ctx.read();
}
@Override
public void userEventTriggered(final ChannelHandlerContext ctx,
final Object evt) throws Exception {
if (Configuration.configuration.isShutdown()) {
return;
}
if (evt instanceof IdleStateEvent) {
if (networkChannelReference != null &&
networkChannelReference.checkLastTime(
Configuration.configuration.getTimeoutCon() * 2) <= 0) {
resetKeepAlive();
return;
}
if (keepAlivedSent.get() > 0) {
final int nbLocalChannels = networkChannelReference != null?
networkChannelReference.nbLocalChannels() : 0;
if (nbLocalChannels > 0 && keepAlivedSent.get() < 5) {
// ignore this time
keepAlivedSent.getAndIncrement();
return;
}
if (networkChannelReference != null &&
networkChannelReference.isSomeLocalChannelsActive()) {
// Reset counter but still waiting for a KA
logger.info(
"No KAlive yet while {} LocalChannels and {} tentatives, reset " +
"KA to 1", nbLocalChannels, keepAlivedSent.get());
keepAlivedSent.set(1);
return;
}
if (keepAlivedSent.get() < 5) {
keepAlivedSent.getAndIncrement();
return;
}
logger.error(
"Not getting KAlive: closing channel while {} LocalChannels" +
" and {} tentatives", nbLocalChannels, keepAlivedSent.get());
if (Configuration.configuration.getR66Mib() != null) {
Configuration.configuration.getR66Mib()
.notifyWarning("KeepAlive get no answer",
"Closing network connection");
}
ChannelCloseTimer.closeFutureChannel(ctx.channel());
} else {
keepAlivedSent.set(1);
final KeepAlivePacket keepAlivePacket = new KeepAlivePacket();
final NetworkPacket response =
new NetworkPacket(ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
keepAlivePacket, null);
logger.info("Write KAlive");
ctx.channel().writeAndFlush(response);
if (networkChannelReference != null) {
networkChannelReference.useIfUsed();
}
}
}
}
public final void resetKeepAlive() {
keepAlivedSent.set(0);
if (networkChannelReference != null) {
networkChannelReference.useIfUsed();
}
}
@Override
public void channelRead0(final ChannelHandlerContext ctx,
final NetworkPacket msg) {
try {
if (isBlackListed || isShuttingDown) {
// ignore message since close on going
msg.clear();
return;
}
resetKeepAlive();
final Channel channel = ctx.channel();
if (msg.getCode() == LocalPacketFactory.NOOPPACKET) {
msg.clear();
// Do nothing
return;
} else if (msg.getCode() == LocalPacketFactory.CONNECTERRORPACKET) {
logger.debug("NetworkRecv: {}", msg);
// Special code to STOP here
if (msg.getLocalId() == ChannelUtils.NOCHANNEL) {
final int nb = networkChannelReference.nbLocalChannels();
if (nb > 0) {
try {
logger.warn(
"Tentative of connection failed ({}) but still some connection" +
" are there so not closing the server channel immediately: {}",
LocalPacketCodec.decodeNetworkPacket(msg.getBuffer()), nb);
} catch (final OpenR66ProtocolPacketException ignore) {
logger.warn(
"Tentative of connection failed but still some connection" +
" are there so not closing the server channel immediately: {}",
nb);
}
msg.clear();
return;
}
// No way to know what is wrong: close all connections with
// remote host
logger.error(
"Will close NETWORK channel, Cannot continue connection with remote Host: " +
msg + " : " + channel.remoteAddress() + " : " + nb);
msg.clear();
WaarpSslUtility.closingSslChannel(channel);
return;
}
} else if (msg.getCode() == LocalPacketFactory.KEEPALIVEPACKET) {
try {
final KeepAlivePacket keepAlivePacket =
(KeepAlivePacket) LocalPacketCodec.decodeNetworkPacket(
msg.getBuffer());
if (keepAlivePacket.isToValidate()) {
keepAlivePacket.validate();
final NetworkPacket response =
new NetworkPacket(ChannelUtils.NOCHANNEL,
ChannelUtils.NOCHANNEL, keepAlivePacket,
null);
logger.info("Answer KAlive");
ctx.writeAndFlush(response);
} else {
logger.info("Get KAlive");
}
} catch (final OpenR66ProtocolPacketException ignored) {
// nothing
} finally {
msg.clear();
}
return;
}
networkChannelReference.use();
final LocalChannelReference localChannelReference;
if (msg.getLocalId() == ChannelUtils.NOCHANNEL) {
localChannelReference =
NetworkTransaction.createConnectionFromNetworkChannelStartup(
networkChannelReference, msg, isSSL);
} else {
if (msg.getCode() == LocalPacketFactory.ENDREQUESTPACKET) {
// Coming from remote
try {
localChannelReference =
Configuration.configuration.getLocalTransaction()
.getClient(msg.getRemoteId(),
msg.getLocalId());
} catch (final OpenR66ProtocolSystemException e1) {
// do not send anything since the packet is external
try {
logger.info(
"Cannot get LocalChannel while an end of request comes: {}",
LocalPacketCodec.decodeNetworkPacket(msg.getBuffer()));
} catch (final OpenR66ProtocolPacketException e2) {
logger.info(
"Cannot get LocalChannel while an end of request comes: {}",
msg);
}
msg.clear();
return;
}
// OK continue and send to the local channel
} else if (msg.getCode() == LocalPacketFactory.CONNECTERRORPACKET) {
// Not a local error but a remote one
try {
localChannelReference =
Configuration.configuration.getLocalTransaction()
.getClient(msg.getRemoteId(),
msg.getLocalId());
} catch (final OpenR66ProtocolSystemException e1) {
// do not send anything since the packet is external
try {
logger.info(
"Cannot get LocalChannel while an external error comes: {}",
LocalPacketCodec.decodeNetworkPacket(msg.getBuffer()));
} catch (final OpenR66ProtocolPacketException e2) {
logger.info(
"Cannot get LocalChannel while an external error comes: {}",
msg);
}
msg.clear();
return;
}
// OK continue and send to the local channel
} else {
try {
localChannelReference =
Configuration.configuration.getLocalTransaction()
.getClient(msg.getRemoteId(),
msg.getLocalId());
} catch (final OpenR66ProtocolSystemException e1) {
if (remoteAddress == null) {
remoteAddress = channel.remoteAddress();
}
if (NetworkTransaction.isShuttingdownNetworkChannel(
remoteAddress) || WaarpShutdownHook.isShutdownStarting()) {
// ignore
msg.clear();
return;
}
// try to send later
logger.info("Cannot get LocalChannel: {} due to {}", msg,
e1.getMessage());
final ConnectionErrorPacket error = new ConnectionErrorPacket(
"Cannot get localChannel since localId is not found anymore",
String.valueOf(msg.getLocalId()));
writeError(channel, msg.getRemoteId(), msg.getLocalId(), error);
msg.clear();
return;
}
}
}
// check if not already in shutdown or closed
if (NetworkTransaction.isShuttingdownNetworkChannel(remoteAddress) ||
WaarpShutdownHook.isShutdownStarting()) {
logger.debug(
"Cannot use LocalChannel since already in shutdown: " + msg);
// ignore
msg.clear();
return;
}
LocalServerHandler.channelRead0(localChannelReference, msg);
} finally {
ctx.read();
}
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx,
final Throwable cause) {
final Channel channel = ctx.channel();
if (isBlackListed || isShuttingDown) {
logger.info("While partner is blacklisted, Network Channel Exception: {}",
channel.id(), cause.getClass().getName() + " : " + cause);
// ignore
return;
}
logger.debug("Network Channel Exception: {}", channel.id(), cause);
if (cause instanceof ReadTimeoutException) {
final ReadTimeoutException exception = (ReadTimeoutException) cause;
// No read for too long
logger.error("ReadTimeout so Will close NETWORK channel {}",
exception.getClass().getName() + " : " +
exception.getMessage());
ChannelCloseTimer.closeFutureChannel(channel);
return;
}
if (cause instanceof BindException) {
// received when not yet connected
logger.debug("BindException");
ChannelCloseTimer.closeFutureChannel(channel);
return;
}
final OpenR66Exception exception =
OpenR66ExceptionTrappedFactory.getExceptionFromTrappedException(channel,
cause);
if (exception != null) {
if (exception instanceof OpenR66ProtocolBusinessNoWriteBackException) {
if (networkChannelReference != null &&
networkChannelReference.nbLocalChannels() > 0) {
logger.info("Network Channel Exception: {} {}", channel.id(),
exception.getClass().getName() + " : " +
exception.getMessage());
}
logger.debug("Will close NETWORK channel");
ChannelCloseTimer.closeFutureChannel(channel);
return;
} else if (exception instanceof OpenR66ProtocolNoConnectionException) {
logger.info("Connection impossible with NETWORK channel {}",
exception.getClass().getName() + " : " +
exception.getMessage());
channel.close();
return;
} else {
logger.info("Network Channel Exception: {} {}", channel.id(),
exception.getClass().getName() + " : " +
exception.getMessage());
}
final ConnectionErrorPacket errorPacket = new ConnectionErrorPacket(
exception.getClass().getName() + " : " + exception.getMessage(),
null);
writeError(channel, ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
errorPacket);
logger.debug("Will close NETWORK channel: {}",
exception.getClass().getName() + " : " +
exception.getMessage());
ChannelCloseTimer.closeFutureChannel(channel);
} else {
// Nothing to do
}
}
/**
* Write error back to remote client
*
* @param channel
* @param remoteId
* @param localId
* @param error
*/
public static void writeError(final Channel channel, final Integer remoteId,
final Integer localId,
final AbstractLocalPacket error) {
if (channel.isActive()) {
NetworkPacket networkPacket = null;
try {
networkPacket = new NetworkPacket(localId, remoteId, error, null);
} catch (final OpenR66ProtocolPacketException ignored) {
// nothing
}
if (networkPacket != null) {
final NetworkPacket finalNP = networkPacket;
channel.eventLoop().submit(new finalNPWrite(channel, finalNP));
}
}
}
private static class finalNPWrite implements Runnable {
private final Channel channel;
private final NetworkPacket finalNP;
private finalNPWrite(final Channel channel, final NetworkPacket finalNP) {
this.channel = channel;
this.finalNP = finalNP;
}
@Override
public void run() {
channel.writeAndFlush(finalNP).awaitUninterruptibly();
finalNP.clear();
}
}
/**
* @return the dbSession
*/
public final DbSession getDbSession() {
return dbSession;
}
/**
* @return True if this Handler is for SSL
*/
public final boolean isSsl() {
return isSSL;
}
}