1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.openr66.protocol.networkhandler;
21
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import io.netty.handler.timeout.IdleStateEvent;
26 import io.netty.handler.timeout.ReadTimeoutException;
27 import io.netty.util.AttributeKey;
28 import org.waarp.common.crypto.ssl.WaarpSslUtility;
29 import org.waarp.common.database.DbSession;
30 import org.waarp.common.database.exception.WaarpDatabaseNoConnectionException;
31 import org.waarp.common.logging.SysErrLogger;
32 import org.waarp.common.logging.WaarpLogger;
33 import org.waarp.common.logging.WaarpLoggerFactory;
34 import org.waarp.common.utility.WaarpShutdownHook;
35 import org.waarp.openr66.context.authentication.R66Auth;
36 import org.waarp.openr66.protocol.configuration.Configuration;
37 import org.waarp.openr66.protocol.exception.OpenR66Exception;
38 import org.waarp.openr66.protocol.exception.OpenR66ExceptionTrappedFactory;
39 import org.waarp.openr66.protocol.exception.OpenR66ProtocolBlackListedException;
40 import org.waarp.openr66.protocol.exception.OpenR66ProtocolBusinessNoWriteBackException;
41 import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
42 import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
43 import org.waarp.openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
44 import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
45 import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
46 import org.waarp.openr66.protocol.localhandler.LocalServerHandler;
47 import org.waarp.openr66.protocol.localhandler.packet.AbstractLocalPacket;
48 import org.waarp.openr66.protocol.localhandler.packet.ConnectionErrorPacket;
49 import org.waarp.openr66.protocol.localhandler.packet.KeepAlivePacket;
50 import org.waarp.openr66.protocol.localhandler.packet.LocalPacketCodec;
51 import org.waarp.openr66.protocol.localhandler.packet.LocalPacketFactory;
52 import org.waarp.openr66.protocol.networkhandler.packet.NetworkPacket;
53 import org.waarp.openr66.protocol.utils.ChannelCloseTimer;
54 import org.waarp.openr66.protocol.utils.ChannelUtils;
55
56 import java.net.BindException;
57 import java.net.SocketAddress;
58 import java.util.concurrent.RejectedExecutionException;
59 import java.util.concurrent.atomic.AtomicInteger;
60
61 import static org.waarp.common.database.DbConstant.*;
62
63
64
65
66 public class NetworkServerHandler
67 extends SimpleChannelInboundHandler<NetworkPacket> {
68
69
70
71 private static final WaarpLogger logger =
72 WaarpLoggerFactory.getLogger(NetworkServerHandler.class);
73 public static final String REUSABLE_AUTH_KEY_NAME = "ReusableAuthKey";
74 public static final AttributeKey<R66Auth> REUSABLE_AUTH_KEY =
75 AttributeKey.newInstance(REUSABLE_AUTH_KEY_NAME);
76
77
78
79 private SocketAddress remoteAddress;
80
81
82
83 private NetworkChannelReference networkChannelReference;
84
85
86
87
88 private DbSession dbSession;
89
90
91
92 protected boolean isSSL;
93
94
95
96 private final AtomicInteger keepAlivedSent = new AtomicInteger(0);
97
98
99
100 protected boolean isBlackListed;
101
102
103
104 protected boolean isShuttingDown;
105
106
107
108
109 public NetworkServerHandler() {
110
111 }
112
113 @Override
114 public void channelInactive(final ChannelHandlerContext ctx) {
115 try {
116 if (Configuration.configuration.getServerConnectedChannelGroup() !=
117 null) {
118 Configuration.configuration.getServerConnectedChannelGroup()
119 .remove(ctx.channel());
120 }
121 if (networkChannelReference != null) {
122 if (networkChannelReference.nbLocalChannels() > 0) {
123 logger.info("Network Channel Closed: {} LocalChannels Left: {}",
124 ctx.channel().id(),
125 networkChannelReference.nbLocalChannels());
126
127 final int nb =
128 Math.min(10, networkChannelReference.nbLocalChannels());
129 try {
130 Thread.sleep(Configuration.RETRYINMS * nb);
131 } catch (final InterruptedException e1) {
132 SysErrLogger.FAKE_LOGGER.ignoreLog(e1);
133 }
134 }
135 try {
136 NetworkTransaction.closedNetworkChannel(networkChannelReference);
137 } catch (final RejectedExecutionException e) {
138 logger.debug(e);
139 }
140 } else {
141 if (remoteAddress == null) {
142 remoteAddress = ctx.channel().remoteAddress();
143 }
144 try {
145 NetworkTransaction.closedNetworkChannel(remoteAddress);
146 } catch (final RejectedExecutionException e) {
147 logger.debug(e);
148 }
149 }
150
151 if (dbSession != null && admin != null && admin.getSession() != null &&
152 !dbSession.equals(admin.getSession())) {
153 dbSession.forceDisconnect();
154 dbSession = null;
155 }
156 } catch (final RejectedExecutionException e) {
157 logger.debug(e);
158 }
159 }
160
161 @Override
162 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
163 try {
164 final Channel netChannel = ctx.channel();
165 if (Configuration.configuration.getServerConnectedChannelGroup() !=
166 null) {
167 Configuration.configuration.getServerConnectedChannelGroup()
168 .add(netChannel);
169 }
170 remoteAddress = netChannel.remoteAddress();
171 logger.debug(
172 "Will the Connection be refused if Partner is BlackListed from {}",
173 remoteAddress);
174 if (NetworkTransaction.isBlacklisted(netChannel)) {
175 logger.warn("Connection refused since Partner is BlackListed from {}",
176 remoteAddress);
177 isBlackListed = true;
178 if (Configuration.configuration.getR66Mib() != null) {
179 Configuration.configuration.getR66Mib().notifyError(
180 "Black Listed connection temptative", "During connection");
181 }
182
183 WaarpSslUtility.closingSslChannel(netChannel);
184 return;
185 }
186 try {
187 networkChannelReference =
188 NetworkTransaction.addNetworkChannel(netChannel, isSSL);
189 } catch (final OpenR66ProtocolRemoteShutdownException e2) {
190 logger.warn("Connection refused since Partner is in Shutdown from " +
191 remoteAddress + " : {}", e2.getMessage());
192 isShuttingDown = true;
193
194 WaarpSslUtility.closingSslChannel(netChannel);
195 return;
196 } catch (final OpenR66ProtocolBlackListedException e2) {
197 logger.warn("Connection refused since Partner is Black Listed from " +
198 remoteAddress + " : {}", e2.getMessage());
199 isBlackListed = true;
200
201 WaarpSslUtility.closingSslChannel(netChannel);
202 return;
203 }
204 if (admin.isCompatibleWithThreadSharedConnexion()) {
205 dbSession = new DbSession(admin, false);
206 dbSession.useConnection();
207 } else {
208 logger.debug("DbSession will be adjusted on LocalChannelReference");
209 dbSession = admin.getSession();
210 }
211 } catch (final WaarpDatabaseNoConnectionException e1) {
212
213 logger.warn("Use default database connection");
214 dbSession = admin.getSession();
215 }
216 logger.debug("Network Channel Connected: {} ", ctx.channel().id());
217 ctx.read();
218 }
219
220 @Override
221 public void userEventTriggered(final ChannelHandlerContext ctx,
222 final Object evt) throws Exception {
223 if (Configuration.configuration.isShutdown()) {
224 return;
225 }
226 if (evt instanceof IdleStateEvent) {
227 if (networkChannelReference != null &&
228 networkChannelReference.checkLastTime(
229 Configuration.configuration.getTimeoutCon() * 2) <= 0) {
230 resetKeepAlive();
231 return;
232 }
233 if (keepAlivedSent.get() > 0) {
234 final int nbLocalChannels = networkChannelReference != null?
235 networkChannelReference.nbLocalChannels() : 0;
236 if (nbLocalChannels > 0 && keepAlivedSent.get() < 5) {
237
238 keepAlivedSent.getAndIncrement();
239 return;
240 }
241 if (networkChannelReference != null &&
242 networkChannelReference.isSomeLocalChannelsActive()) {
243
244 logger.info(
245 "No KAlive yet while {} LocalChannels and {} tentatives, reset " +
246 "KA to 1", nbLocalChannels, keepAlivedSent.get());
247 keepAlivedSent.set(1);
248 return;
249 }
250 if (keepAlivedSent.get() < 5) {
251 keepAlivedSent.getAndIncrement();
252 return;
253 }
254 logger.error(
255 "Not getting KAlive: closing channel while {} LocalChannels" +
256 " and {} tentatives", nbLocalChannels, keepAlivedSent.get());
257 if (Configuration.configuration.getR66Mib() != null) {
258 Configuration.configuration.getR66Mib()
259 .notifyWarning("KeepAlive get no answer",
260 "Closing network connection");
261 }
262 ChannelCloseTimer.closeFutureChannel(ctx.channel());
263 } else {
264 keepAlivedSent.set(1);
265 final KeepAlivePacket keepAlivePacket = new KeepAlivePacket();
266 final NetworkPacket response =
267 new NetworkPacket(ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
268 keepAlivePacket, null);
269 logger.info("Write KAlive");
270 ctx.channel().writeAndFlush(response);
271 if (networkChannelReference != null) {
272 networkChannelReference.useIfUsed();
273 }
274 }
275 }
276 }
277
278 public final void resetKeepAlive() {
279 keepAlivedSent.set(0);
280 if (networkChannelReference != null) {
281 networkChannelReference.useIfUsed();
282 }
283 }
284
285 @Override
286 public void channelRead0(final ChannelHandlerContext ctx,
287 final NetworkPacket msg) {
288 try {
289 if (isBlackListed || isShuttingDown) {
290
291 msg.clear();
292 return;
293 }
294 resetKeepAlive();
295 final Channel channel = ctx.channel();
296 if (msg.getCode() == LocalPacketFactory.NOOPPACKET) {
297 msg.clear();
298
299 return;
300 } else if (msg.getCode() == LocalPacketFactory.CONNECTERRORPACKET) {
301 logger.debug("NetworkRecv: {}", msg);
302
303 if (msg.getLocalId() == ChannelUtils.NOCHANNEL) {
304 final int nb = networkChannelReference.nbLocalChannels();
305 if (nb > 0) {
306 try {
307 logger.warn(
308 "Tentative of connection failed ({}) but still some connection" +
309 " are there so not closing the server channel immediately: {}",
310 LocalPacketCodec.decodeNetworkPacket(msg.getBuffer()), nb);
311 } catch (final OpenR66ProtocolPacketException ignore) {
312 logger.warn(
313 "Tentative of connection failed but still some connection" +
314 " are there so not closing the server channel immediately: {}",
315 nb);
316 }
317 msg.clear();
318 return;
319 }
320
321
322 logger.error(
323 "Will close NETWORK channel, Cannot continue connection with remote Host: " +
324 msg + " : " + channel.remoteAddress() + " : " + nb);
325 msg.clear();
326 WaarpSslUtility.closingSslChannel(channel);
327 return;
328 }
329 } else if (msg.getCode() == LocalPacketFactory.KEEPALIVEPACKET) {
330 try {
331 final KeepAlivePacket keepAlivePacket =
332 (KeepAlivePacket) LocalPacketCodec.decodeNetworkPacket(
333 msg.getBuffer());
334 if (keepAlivePacket.isToValidate()) {
335 keepAlivePacket.validate();
336 final NetworkPacket response =
337 new NetworkPacket(ChannelUtils.NOCHANNEL,
338 ChannelUtils.NOCHANNEL, keepAlivePacket,
339 null);
340 logger.info("Answer KAlive");
341 ctx.writeAndFlush(response);
342 } else {
343 logger.info("Get KAlive");
344 }
345 } catch (final OpenR66ProtocolPacketException ignored) {
346
347 } finally {
348 msg.clear();
349 }
350 return;
351 }
352 networkChannelReference.use();
353 final LocalChannelReference localChannelReference;
354 if (msg.getLocalId() == ChannelUtils.NOCHANNEL) {
355 localChannelReference =
356 NetworkTransaction.createConnectionFromNetworkChannelStartup(
357 networkChannelReference, msg, isSSL);
358 } else {
359 if (msg.getCode() == LocalPacketFactory.ENDREQUESTPACKET) {
360
361 try {
362 localChannelReference =
363 Configuration.configuration.getLocalTransaction()
364 .getClient(msg.getRemoteId(),
365 msg.getLocalId());
366 } catch (final OpenR66ProtocolSystemException e1) {
367
368 try {
369 logger.info(
370 "Cannot get LocalChannel while an end of request comes: {}",
371 LocalPacketCodec.decodeNetworkPacket(msg.getBuffer()));
372 } catch (final OpenR66ProtocolPacketException e2) {
373 logger.info(
374 "Cannot get LocalChannel while an end of request comes: {}",
375 msg);
376 }
377 msg.clear();
378 return;
379 }
380
381 } else if (msg.getCode() == LocalPacketFactory.CONNECTERRORPACKET) {
382
383 try {
384 localChannelReference =
385 Configuration.configuration.getLocalTransaction()
386 .getClient(msg.getRemoteId(),
387 msg.getLocalId());
388 } catch (final OpenR66ProtocolSystemException e1) {
389
390 try {
391 logger.info(
392 "Cannot get LocalChannel while an external error comes: {}",
393 LocalPacketCodec.decodeNetworkPacket(msg.getBuffer()));
394 } catch (final OpenR66ProtocolPacketException e2) {
395 logger.info(
396 "Cannot get LocalChannel while an external error comes: {}",
397 msg);
398 }
399 msg.clear();
400 return;
401 }
402
403 } else {
404 try {
405 localChannelReference =
406 Configuration.configuration.getLocalTransaction()
407 .getClient(msg.getRemoteId(),
408 msg.getLocalId());
409 } catch (final OpenR66ProtocolSystemException e1) {
410 if (remoteAddress == null) {
411 remoteAddress = channel.remoteAddress();
412 }
413 if (NetworkTransaction.isShuttingdownNetworkChannel(
414 remoteAddress) || WaarpShutdownHook.isShutdownStarting()) {
415
416 msg.clear();
417 return;
418 }
419
420 logger.info("Cannot get LocalChannel: {} due to {}", msg,
421 e1.getMessage());
422 final ConnectionErrorPacket error = new ConnectionErrorPacket(
423 "Cannot get localChannel since localId is not found anymore",
424 String.valueOf(msg.getLocalId()));
425 writeError(channel, msg.getRemoteId(), msg.getLocalId(), error);
426 msg.clear();
427 return;
428 }
429 }
430 }
431
432 if (NetworkTransaction.isShuttingdownNetworkChannel(remoteAddress) ||
433 WaarpShutdownHook.isShutdownStarting()) {
434 logger.debug(
435 "Cannot use LocalChannel since already in shutdown: " + msg);
436
437 msg.clear();
438 return;
439 }
440 LocalServerHandler.channelRead0(localChannelReference, msg);
441 } finally {
442 ctx.read();
443 }
444 }
445
446 @Override
447 public void exceptionCaught(final ChannelHandlerContext ctx,
448 final Throwable cause) {
449 final Channel channel = ctx.channel();
450 if (isBlackListed || isShuttingDown) {
451 logger.info("While partner is blacklisted, Network Channel Exception: {}",
452 channel.id(), cause.getClass().getName() + " : " + cause);
453
454 return;
455 }
456 logger.debug("Network Channel Exception: {}", channel.id(), cause);
457 if (cause instanceof ReadTimeoutException) {
458 final ReadTimeoutException exception = (ReadTimeoutException) cause;
459
460 logger.error("ReadTimeout so Will close NETWORK channel {}",
461 exception.getClass().getName() + " : " +
462 exception.getMessage());
463 ChannelCloseTimer.closeFutureChannel(channel);
464 return;
465 }
466 if (cause instanceof BindException) {
467
468 logger.debug("BindException");
469 ChannelCloseTimer.closeFutureChannel(channel);
470 return;
471 }
472 final OpenR66Exception exception =
473 OpenR66ExceptionTrappedFactory.getExceptionFromTrappedException(channel,
474 cause);
475 if (exception != null) {
476 if (exception instanceof OpenR66ProtocolBusinessNoWriteBackException) {
477 if (networkChannelReference != null &&
478 networkChannelReference.nbLocalChannels() > 0) {
479 logger.info("Network Channel Exception: {} {}", channel.id(),
480 exception.getClass().getName() + " : " +
481 exception.getMessage());
482 }
483 logger.debug("Will close NETWORK channel");
484 ChannelCloseTimer.closeFutureChannel(channel);
485 return;
486 } else if (exception instanceof OpenR66ProtocolNoConnectionException) {
487 logger.info("Connection impossible with NETWORK channel {}",
488 exception.getClass().getName() + " : " +
489 exception.getMessage());
490 channel.close();
491 return;
492 } else {
493 logger.info("Network Channel Exception: {} {}", channel.id(),
494 exception.getClass().getName() + " : " +
495 exception.getMessage());
496 }
497 final ConnectionErrorPacket errorPacket = new ConnectionErrorPacket(
498 exception.getClass().getName() + " : " + exception.getMessage(),
499 null);
500 writeError(channel, ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
501 errorPacket);
502 logger.debug("Will close NETWORK channel: {}",
503 exception.getClass().getName() + " : " +
504 exception.getMessage());
505 ChannelCloseTimer.closeFutureChannel(channel);
506 } else {
507
508 }
509 }
510
511
512
513
514
515
516
517
518
519 public static void writeError(final Channel channel, final Integer remoteId,
520 final Integer localId,
521 final AbstractLocalPacket error) {
522 if (channel.isActive()) {
523 NetworkPacket networkPacket = null;
524 try {
525 networkPacket = new NetworkPacket(localId, remoteId, error, null);
526 } catch (final OpenR66ProtocolPacketException ignored) {
527
528 }
529 if (networkPacket != null) {
530 final NetworkPacket finalNP = networkPacket;
531 channel.eventLoop().submit(new finalNPWrite(channel, finalNP));
532 }
533 }
534 }
535
536 private static class finalNPWrite implements Runnable {
537 private final Channel channel;
538 private final NetworkPacket finalNP;
539
540 private finalNPWrite(final Channel channel, final NetworkPacket finalNP) {
541 this.channel = channel;
542 this.finalNP = finalNP;
543 }
544
545 @Override
546 public void run() {
547 channel.writeAndFlush(finalNP).awaitUninterruptibly();
548 finalNP.clear();
549 }
550 }
551
552
553
554
555 public final DbSession getDbSession() {
556 return dbSession;
557 }
558
559
560
561
562 public final boolean isSsl() {
563 return isSSL;
564 }
565 }