NetworkServerHandler.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.openr66.proxy.network;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import org.waarp.common.crypto.ssl.WaarpSslUtility;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.openr66.protocol.exception.OpenR66Exception;
import org.waarp.openr66.protocol.exception.OpenR66ExceptionTrappedFactory;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolBusinessNoWriteBackException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
import org.waarp.openr66.protocol.localhandler.packet.AbstractLocalPacket;
import org.waarp.openr66.protocol.localhandler.packet.ConnectionErrorPacket;
import org.waarp.openr66.protocol.localhandler.packet.KeepAlivePacket;
import org.waarp.openr66.protocol.localhandler.packet.LocalPacketCodec;
import org.waarp.openr66.protocol.localhandler.packet.LocalPacketFactory;
import org.waarp.openr66.protocol.networkhandler.packet.NetworkPacket;
import org.waarp.openr66.protocol.utils.ChannelCloseTimer;
import org.waarp.openr66.protocol.utils.ChannelUtils;
import org.waarp.openr66.protocol.utils.R66Future;
import java.net.BindException;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import static org.waarp.openr66.protocol.configuration.Configuration.*;
/**
* Network Server Handler (Requester side)
*/
public class NetworkServerHandler
extends SimpleChannelInboundHandler<NetworkPacket> {
/**
* Internal Logger
*/
private static final WaarpLogger logger =
WaarpLoggerFactory.getLogger(NetworkServerHandler.class);
/**
* The underlying Network Channel
*/
private Channel networkChannel;
/**
* The underlying Proxified associated Channel
*/
private Channel proxyChannel;
/**
* The associated bridge
*/
private ProxyBridge bridge;
/**
* Does this Handler is for SSL
*/
protected boolean isSSL;
/**
* Is this Handler a server side
*/
protected final boolean isServer;
/**
* To handle the keep alive
*/
private final AtomicInteger keepAlivedSent = new AtomicInteger();
/**
* Future to wait for Client to be setup
*/
protected volatile R66Future clientFuture;
/**
* @param isServer
*/
public NetworkServerHandler(final boolean isServer) {
this.isServer = isServer;
if (!this.isServer) {
clientFuture = new R66Future(true);
}
}
public final void setBridge(final ProxyBridge bridge) {
this.bridge = bridge;
if (this.bridge != null) {
proxyChannel = bridge.getSource().getNetworkChannel();
}
clientFuture.setSuccess();
logger.info("Proxy setBridge: {} {}", isServer, (bridge != null?
bridge.getProxyEntry() + " proxyChannelId: " + proxyChannel.id() :
"nobridge"));
}
/**
* @return the networkChannel
*/
public final Channel getNetworkChannel() {
return networkChannel;
}
public final void close() {
WaarpSslUtility.closingSslChannel(networkChannel);
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
if (proxyChannel != null) {
WaarpSslUtility.closingSslChannel(proxyChannel);
}
}
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
try {
networkChannel = ctx.channel();
/*
* The associated Local Address
*/
final SocketAddress localAddress = networkChannel.localAddress();
if (isServer) {
final ProxyEntry entry = ProxyEntry.get(localAddress.toString());
if (entry == null) {
// error
exceptionCaught(ctx, new OpenR66ProtocolNoConnectionException(
"Cannot found Proxy Entry: connection aborted"));
// WaarpSslUtility.closingSslChannel(networkChannel);
logger.error("No proxy configuration found for: " + localAddress);
return;
}
bridge = new ProxyBridge(entry, this);
bridge.initializeProxy();
if (!bridge.waitForRemoteConnection()) {
exceptionCaught(ctx, new OpenR66ProtocolNoConnectionException(
"Proxy Cannot connect to remote Server: connection aborted"));
logger.error("No connection for proxy: " + localAddress);
return;
}
proxyChannel = bridge.getProxified().networkChannel;
logger.warn("Connected: " + isServer + ' ' + bridge.getProxyEntry() +
" proxyChannelId: " + proxyChannel.id());
} else {
clientFuture.awaitOrInterruptible(configuration.getTimeoutCon());
if (bridge == null || !clientFuture.isSuccess()) {
exceptionCaught(ctx, new OpenR66ProtocolNoConnectionException(
"Proxy Cannot connect to remote Server: connection aborted"));
logger.error("No connection for proxy: " + localAddress);
return;
}
bridge.remoteConnected();
}
logger.debug("Proxy Network Channel Connected: {} ", ctx.channel().id());
} finally {
ctx.read();
}
}
@Override
public void userEventTriggered(final ChannelHandlerContext ctx,
final Object evt) throws Exception {
if (configuration.isShutdown()) {
return;
}
if (evt instanceof IdleStateEvent) {
if (keepAlivedSent.get() > 0) {
if (keepAlivedSent.get() < 5) {
// ignore this time
keepAlivedSent.getAndIncrement();
return;
}
logger.error("Proxy Not getting KAlive: closing channel");
if (configuration.getR66Mib() != null) {
configuration.getR66Mib()
.notifyWarning("Proxy KeepAlive get no answer",
"Closing network connection");
}
ChannelCloseTimer.closeFutureChannel(ctx.channel());
} else {
keepAlivedSent.set(1);
final KeepAlivePacket keepAlivePacket = new KeepAlivePacket();
final NetworkPacket response =
new NetworkPacket(ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
keepAlivePacket, null);
logger.info("Proxy Write KAlive");
ctx.channel().writeAndFlush(response);
}
}
}
public final void resetKeepAlive() {
keepAlivedSent.set(0);
}
@Override
public void channelRead0(final ChannelHandlerContext ctx,
final NetworkPacket msg) {
try {
if (msg.getCode() == LocalPacketFactory.NOOPPACKET) {
resetKeepAlive();
// Will forward
} else if (msg.getCode() == LocalPacketFactory.CONNECTERRORPACKET) {
logger.debug("Proxy NetworkRecv: {}", msg);
// Special code to STOP here
if (msg.getLocalId() == ChannelUtils.NOCHANNEL) {
// No way to know what is wrong: close all connections with
// remote host
logger.error(
"Proxy Will close NETWORK channel, Cannot continue connection with remote Host: " +
msg + " : " + ctx.channel().remoteAddress());
WaarpSslUtility.closingSslChannel(ctx.channel());
msg.clear();
return;
}
} else if (msg.getCode() == LocalPacketFactory.KEEPALIVEPACKET) {
resetKeepAlive();
try {
final KeepAlivePacket keepAlivePacket =
(KeepAlivePacket) LocalPacketCodec.decodeNetworkPacket(
msg.getBuffer());
if (keepAlivePacket.isToValidate()) {
keepAlivePacket.validate();
final NetworkPacket response =
new NetworkPacket(ChannelUtils.NOCHANNEL,
ChannelUtils.NOCHANNEL, keepAlivePacket,
null);
logger.info("Proxy Answer KAlive");
ctx.channel().writeAndFlush(response);
} else {
logger.info("Proxy Get KAlive");
}
} catch (final OpenR66ProtocolPacketException ignored) {
// nothing
}
msg.clear();
return;
}
// forward message
resetKeepAlive();
if (proxyChannel != null) {
proxyChannel.writeAndFlush(msg);
} else {
msg.clear();
}
} finally {
ctx.read();
}
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx,
final Throwable cause) {
final Channel channel = ctx.channel();
logger.debug("Proxy Network Channel Exception: {}", channel.id(), cause);
if (cause instanceof ReadTimeoutException) {
final ReadTimeoutException exception = (ReadTimeoutException) cause;
// No read for too long
logger.error("ReadTimeout so Will close NETWORK channel {}",
exception.getClass().getName() + " : " +
exception.getMessage());
ChannelCloseTimer.closeFutureChannel(channel);
return;
}
if (cause instanceof BindException) {
// received when not yet connected
logger.debug("BindException");
ChannelCloseTimer.closeFutureChannel(channel);
return;
}
final OpenR66Exception exception =
OpenR66ExceptionTrappedFactory.getExceptionFromTrappedException(channel,
cause);
if (exception != null) {
if (exception instanceof OpenR66ProtocolBusinessNoWriteBackException) {
logger.debug("Will close NETWORK channel");
ChannelCloseTimer.closeFutureChannel(channel);
return;
} else if (exception instanceof OpenR66ProtocolNoConnectionException) {
logger.info("Connection impossible with NETWORK channel {}",
exception.getMessage());
channel.close();
return;
} else {
logger.info("Network Channel Exception: {} {}", channel.id(),
exception.getMessage());
}
final ConnectionErrorPacket errorPacket =
new ConnectionErrorPacket(exception.getMessage(), null);
writeError(channel, ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
errorPacket);
if (proxyChannel != null) {
writeError(proxyChannel, ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
errorPacket);
}
logger.debug("Will close NETWORK channel: {}", exception.getMessage());
ChannelCloseTimer.closeFutureChannel(channel);
} else {
// Nothing to do
}
}
/**
* Write error back to remote client
*
* @param channel
* @param remoteId
* @param localId
* @param error
*/
final void writeError(final Channel channel, final Integer remoteId,
final Integer localId,
final AbstractLocalPacket error) {
if (channel.isActive()) {
NetworkPacket networkPacket = null;
logger.info("Proxy Error to send {}", error);
try {
networkPacket = new NetworkPacket(localId, remoteId, error, null);
} catch (final OpenR66ProtocolPacketException ignored) {
// nothing
}
if (networkPacket != null) {
final NetworkPacket finalNP = networkPacket;
channel.eventLoop().submit(new Runnable() {
@Override
public final void run() {
channel.writeAndFlush(finalNP).awaitUninterruptibly();
finalNP.clear();
}
});
}
}
}
/**
* @return True if this Handler is for SSL
*/
public final boolean isSsl() {
return isSSL;
}
}