1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.openr66.proxy.network;
21
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import io.netty.handler.timeout.IdleStateEvent;
26 import io.netty.handler.timeout.ReadTimeoutException;
27 import org.waarp.common.crypto.ssl.WaarpSslUtility;
28 import org.waarp.common.logging.WaarpLogger;
29 import org.waarp.common.logging.WaarpLoggerFactory;
30 import org.waarp.openr66.protocol.exception.OpenR66Exception;
31 import org.waarp.openr66.protocol.exception.OpenR66ExceptionTrappedFactory;
32 import org.waarp.openr66.protocol.exception.OpenR66ProtocolBusinessNoWriteBackException;
33 import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
34 import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
35 import org.waarp.openr66.protocol.localhandler.packet.AbstractLocalPacket;
36 import org.waarp.openr66.protocol.localhandler.packet.ConnectionErrorPacket;
37 import org.waarp.openr66.protocol.localhandler.packet.KeepAlivePacket;
38 import org.waarp.openr66.protocol.localhandler.packet.LocalPacketCodec;
39 import org.waarp.openr66.protocol.localhandler.packet.LocalPacketFactory;
40 import org.waarp.openr66.protocol.networkhandler.packet.NetworkPacket;
41 import org.waarp.openr66.protocol.utils.ChannelCloseTimer;
42 import org.waarp.openr66.protocol.utils.ChannelUtils;
43 import org.waarp.openr66.protocol.utils.R66Future;
44
45 import java.net.BindException;
46 import java.net.SocketAddress;
47 import java.util.concurrent.atomic.AtomicInteger;
48
49 import static org.waarp.openr66.protocol.configuration.Configuration.*;
50
51
52
53
54 public class NetworkServerHandler
55 extends SimpleChannelInboundHandler<NetworkPacket> {
56
57
58
59 private static final WaarpLogger logger =
60 WaarpLoggerFactory.getLogger(NetworkServerHandler.class);
61
62
63
64
65 private Channel networkChannel;
66
67
68
69 private Channel proxyChannel;
70
71
72
73 private ProxyBridge bridge;
74
75
76
77 protected boolean isSSL;
78
79
80
81 protected final boolean isServer;
82
83
84
85 private final AtomicInteger keepAlivedSent = new AtomicInteger();
86
87
88
89 protected volatile R66Future clientFuture;
90
91
92
93
94 public NetworkServerHandler(final boolean isServer) {
95 this.isServer = isServer;
96 if (!this.isServer) {
97 clientFuture = new R66Future(true);
98 }
99 }
100
101 public final void setBridge(final ProxyBridge bridge) {
102 this.bridge = bridge;
103 if (this.bridge != null) {
104 proxyChannel = bridge.getSource().getNetworkChannel();
105 }
106 clientFuture.setSuccess();
107 logger.info("Proxy setBridge: {} {}", isServer, (bridge != null?
108 bridge.getProxyEntry() + " proxyChannelId: " + proxyChannel.id() :
109 "nobridge"));
110 }
111
112
113
114
115 public final Channel getNetworkChannel() {
116 return networkChannel;
117 }
118
119 public final void close() {
120 WaarpSslUtility.closingSslChannel(networkChannel);
121 }
122
123 @Override
124 public void channelInactive(final ChannelHandlerContext ctx) {
125 if (proxyChannel != null) {
126 WaarpSslUtility.closingSslChannel(proxyChannel);
127 }
128 }
129
130 @Override
131 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
132 try {
133 networkChannel = ctx.channel();
134
135
136
137 final SocketAddress localAddress = networkChannel.localAddress();
138 if (isServer) {
139 final ProxyEntry entry = ProxyEntry.get(localAddress.toString());
140 if (entry == null) {
141
142 exceptionCaught(ctx, new OpenR66ProtocolNoConnectionException(
143 "Cannot found Proxy Entry: connection aborted"));
144
145 logger.error("No proxy configuration found for: " + localAddress);
146 return;
147 }
148 bridge = new ProxyBridge(entry, this);
149 bridge.initializeProxy();
150 if (!bridge.waitForRemoteConnection()) {
151 exceptionCaught(ctx, new OpenR66ProtocolNoConnectionException(
152 "Proxy Cannot connect to remote Server: connection aborted"));
153 logger.error("No connection for proxy: " + localAddress);
154 return;
155 }
156 proxyChannel = bridge.getProxified().networkChannel;
157 logger.warn("Connected: " + isServer + ' ' + bridge.getProxyEntry() +
158 " proxyChannelId: " + proxyChannel.id());
159 } else {
160 clientFuture.awaitOrInterruptible(configuration.getTimeoutCon());
161 if (bridge == null || !clientFuture.isSuccess()) {
162 exceptionCaught(ctx, new OpenR66ProtocolNoConnectionException(
163 "Proxy Cannot connect to remote Server: connection aborted"));
164 logger.error("No connection for proxy: " + localAddress);
165 return;
166 }
167 bridge.remoteConnected();
168 }
169 logger.debug("Proxy Network Channel Connected: {} ", ctx.channel().id());
170 } finally {
171 ctx.read();
172 }
173 }
174
175 @Override
176 public void userEventTriggered(final ChannelHandlerContext ctx,
177 final Object evt) throws Exception {
178 if (configuration.isShutdown()) {
179 return;
180 }
181 if (evt instanceof IdleStateEvent) {
182 if (keepAlivedSent.get() > 0) {
183 if (keepAlivedSent.get() < 5) {
184
185 keepAlivedSent.getAndIncrement();
186 return;
187 }
188 logger.error("Proxy Not getting KAlive: closing channel");
189 if (configuration.getR66Mib() != null) {
190 configuration.getR66Mib()
191 .notifyWarning("Proxy KeepAlive get no answer",
192 "Closing network connection");
193 }
194 ChannelCloseTimer.closeFutureChannel(ctx.channel());
195 } else {
196 keepAlivedSent.set(1);
197 final KeepAlivePacket keepAlivePacket = new KeepAlivePacket();
198 final NetworkPacket response =
199 new NetworkPacket(ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
200 keepAlivePacket, null);
201 logger.info("Proxy Write KAlive");
202 ctx.channel().writeAndFlush(response);
203 }
204 }
205 }
206
207 public final void resetKeepAlive() {
208 keepAlivedSent.set(0);
209 }
210
211 @Override
212 public void channelRead0(final ChannelHandlerContext ctx,
213 final NetworkPacket msg) {
214 try {
215 if (msg.getCode() == LocalPacketFactory.NOOPPACKET) {
216 resetKeepAlive();
217
218 } else if (msg.getCode() == LocalPacketFactory.CONNECTERRORPACKET) {
219 logger.debug("Proxy NetworkRecv: {}", msg);
220
221 if (msg.getLocalId() == ChannelUtils.NOCHANNEL) {
222
223
224 logger.error(
225 "Proxy Will close NETWORK channel, Cannot continue connection with remote Host: " +
226 msg + " : " + ctx.channel().remoteAddress());
227 WaarpSslUtility.closingSslChannel(ctx.channel());
228 msg.clear();
229 return;
230 }
231 } else if (msg.getCode() == LocalPacketFactory.KEEPALIVEPACKET) {
232 resetKeepAlive();
233 try {
234 final KeepAlivePacket keepAlivePacket =
235 (KeepAlivePacket) LocalPacketCodec.decodeNetworkPacket(
236 msg.getBuffer());
237 if (keepAlivePacket.isToValidate()) {
238 keepAlivePacket.validate();
239 final NetworkPacket response =
240 new NetworkPacket(ChannelUtils.NOCHANNEL,
241 ChannelUtils.NOCHANNEL, keepAlivePacket,
242 null);
243 logger.info("Proxy Answer KAlive");
244 ctx.channel().writeAndFlush(response);
245 } else {
246 logger.info("Proxy Get KAlive");
247 }
248 } catch (final OpenR66ProtocolPacketException ignored) {
249
250 }
251 msg.clear();
252 return;
253 }
254
255 resetKeepAlive();
256 if (proxyChannel != null) {
257 proxyChannel.writeAndFlush(msg);
258 } else {
259 msg.clear();
260 }
261 } finally {
262 ctx.read();
263 }
264 }
265
266 @Override
267 public void exceptionCaught(final ChannelHandlerContext ctx,
268 final Throwable cause) {
269 final Channel channel = ctx.channel();
270 logger.debug("Proxy Network Channel Exception: {}", channel.id(), cause);
271 if (cause instanceof ReadTimeoutException) {
272 final ReadTimeoutException exception = (ReadTimeoutException) cause;
273
274 logger.error("ReadTimeout so Will close NETWORK channel {}",
275 exception.getClass().getName() + " : " +
276 exception.getMessage());
277 ChannelCloseTimer.closeFutureChannel(channel);
278 return;
279 }
280 if (cause instanceof BindException) {
281
282 logger.debug("BindException");
283 ChannelCloseTimer.closeFutureChannel(channel);
284 return;
285 }
286 final OpenR66Exception exception =
287 OpenR66ExceptionTrappedFactory.getExceptionFromTrappedException(channel,
288 cause);
289 if (exception != null) {
290 if (exception instanceof OpenR66ProtocolBusinessNoWriteBackException) {
291 logger.debug("Will close NETWORK channel");
292 ChannelCloseTimer.closeFutureChannel(channel);
293 return;
294 } else if (exception instanceof OpenR66ProtocolNoConnectionException) {
295 logger.info("Connection impossible with NETWORK channel {}",
296 exception.getMessage());
297 channel.close();
298 return;
299 } else {
300 logger.info("Network Channel Exception: {} {}", channel.id(),
301 exception.getMessage());
302 }
303 final ConnectionErrorPacket errorPacket =
304 new ConnectionErrorPacket(exception.getMessage(), null);
305 writeError(channel, ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
306 errorPacket);
307 if (proxyChannel != null) {
308 writeError(proxyChannel, ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
309 errorPacket);
310 }
311 logger.debug("Will close NETWORK channel: {}", exception.getMessage());
312 ChannelCloseTimer.closeFutureChannel(channel);
313 } else {
314
315 }
316 }
317
318
319
320
321
322
323
324
325
326 final void writeError(final Channel channel, final Integer remoteId,
327 final Integer localId,
328 final AbstractLocalPacket error) {
329 if (channel.isActive()) {
330 NetworkPacket networkPacket = null;
331 logger.info("Proxy Error to send {}", error);
332 try {
333 networkPacket = new NetworkPacket(localId, remoteId, error, null);
334 } catch (final OpenR66ProtocolPacketException ignored) {
335
336 }
337 if (networkPacket != null) {
338 final NetworkPacket finalNP = networkPacket;
339 channel.eventLoop().submit(new Runnable() {
340 @Override
341 public final void run() {
342 channel.writeAndFlush(finalNP).awaitUninterruptibly();
343 finalNP.clear();
344 }
345 });
346 }
347 }
348 }
349
350
351
352
353 public final boolean isSsl() {
354 return isSSL;
355 }
356 }