View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.openr66.protocol.networkhandler;
21  
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.SimpleChannelInboundHandler;
25  import io.netty.handler.timeout.IdleStateEvent;
26  import io.netty.handler.timeout.ReadTimeoutException;
27  import io.netty.util.AttributeKey;
28  import org.waarp.common.crypto.ssl.WaarpSslUtility;
29  import org.waarp.common.database.DbSession;
30  import org.waarp.common.database.exception.WaarpDatabaseNoConnectionException;
31  import org.waarp.common.logging.SysErrLogger;
32  import org.waarp.common.logging.WaarpLogger;
33  import org.waarp.common.logging.WaarpLoggerFactory;
34  import org.waarp.common.utility.WaarpShutdownHook;
35  import org.waarp.openr66.context.authentication.R66Auth;
36  import org.waarp.openr66.protocol.configuration.Configuration;
37  import org.waarp.openr66.protocol.exception.OpenR66Exception;
38  import org.waarp.openr66.protocol.exception.OpenR66ExceptionTrappedFactory;
39  import org.waarp.openr66.protocol.exception.OpenR66ProtocolBlackListedException;
40  import org.waarp.openr66.protocol.exception.OpenR66ProtocolBusinessNoWriteBackException;
41  import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
42  import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
43  import org.waarp.openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
44  import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
45  import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
46  import org.waarp.openr66.protocol.localhandler.LocalServerHandler;
47  import org.waarp.openr66.protocol.localhandler.packet.AbstractLocalPacket;
48  import org.waarp.openr66.protocol.localhandler.packet.ConnectionErrorPacket;
49  import org.waarp.openr66.protocol.localhandler.packet.KeepAlivePacket;
50  import org.waarp.openr66.protocol.localhandler.packet.LocalPacketCodec;
51  import org.waarp.openr66.protocol.localhandler.packet.LocalPacketFactory;
52  import org.waarp.openr66.protocol.networkhandler.packet.NetworkPacket;
53  import org.waarp.openr66.protocol.utils.ChannelCloseTimer;
54  import org.waarp.openr66.protocol.utils.ChannelUtils;
55  
56  import java.net.BindException;
57  import java.net.SocketAddress;
58  import java.util.concurrent.RejectedExecutionException;
59  import java.util.concurrent.atomic.AtomicInteger;
60  
61  import static org.waarp.common.database.DbConstant.*;
62  
63  /**
64   * Network Server Handler (Requester side)
65   */
66  public class NetworkServerHandler
67      extends SimpleChannelInboundHandler<NetworkPacket> {
68    /**
69     * Internal Logger
70     */
71    private static final WaarpLogger logger =
72        WaarpLoggerFactory.getLogger(NetworkServerHandler.class);
73    public static final String REUSABLE_AUTH_KEY_NAME = "ReusableAuthKey";
74    public static final AttributeKey<R66Auth> REUSABLE_AUTH_KEY =
75        AttributeKey.newInstance(REUSABLE_AUTH_KEY_NAME);
76    /**
77     * The associated Remote Address
78     */
79    private SocketAddress remoteAddress;
80    /**
81     * The associated NetworkChannelReference
82     */
83    private NetworkChannelReference networkChannelReference;
84    /**
85     * The Database connection attached to this NetworkChannelReference shared
86     * among all associated LocalChannels
87     */
88    private DbSession dbSession;
89    /**
90     * Does this Handler is for SSL
91     */
92    protected boolean isSSL;
93    /**
94     * To handle the keep alive
95     */
96    private final AtomicInteger keepAlivedSent = new AtomicInteger(0);
97    /**
98     * Is this network connection being refused (black listed)
99     */
100   protected boolean isBlackListed;
101   /**
102    * Is this network connection being refused (shutting down)
103    */
104   protected boolean isShuttingDown;
105 
106   /**
107    *
108    */
109   public NetworkServerHandler() {
110     // Empty
111   }
112 
113   @Override
114   public void channelInactive(final ChannelHandlerContext ctx) {
115     try {
116       if (Configuration.configuration.getServerConnectedChannelGroup() !=
117           null) {
118         Configuration.configuration.getServerConnectedChannelGroup()
119                                    .remove(ctx.channel());
120       }
121       if (networkChannelReference != null) {
122         if (networkChannelReference.nbLocalChannels() > 0) {
123           logger.info("Network Channel Closed: {} LocalChannels Left: {}",
124                       ctx.channel().id(),
125                       networkChannelReference.nbLocalChannels());
126           // Give an extra time if necessary to let the local channel being closed
127           final int nb =
128               Math.min(10, networkChannelReference.nbLocalChannels());
129           try {
130             Thread.sleep(Configuration.RETRYINMS * nb);
131           } catch (final InterruptedException e1) {//NOSONAR
132             SysErrLogger.FAKE_LOGGER.ignoreLog(e1);
133           }
134         }
135         try {
136           NetworkTransaction.closedNetworkChannel(networkChannelReference);
137         } catch (final RejectedExecutionException e) {
138           logger.debug(e);
139         }
140       } else {
141         if (remoteAddress == null) {
142           remoteAddress = ctx.channel().remoteAddress();
143         }
144         try {
145           NetworkTransaction.closedNetworkChannel(remoteAddress);
146         } catch (final RejectedExecutionException e) {
147           logger.debug(e);
148         }
149       }
150       // Now force the close of the database after a wait
151       if (dbSession != null && admin != null && admin.getSession() != null &&
152           !dbSession.equals(admin.getSession())) {
153         dbSession.forceDisconnect();
154         dbSession = null;
155       }
156     } catch (final RejectedExecutionException e) {
157       logger.debug(e);
158     }
159   }
160 
161   @Override
162   public void channelActive(final ChannelHandlerContext ctx) throws Exception {
163     try {
164       final Channel netChannel = ctx.channel();
165       if (Configuration.configuration.getServerConnectedChannelGroup() !=
166           null) {
167         Configuration.configuration.getServerConnectedChannelGroup()
168                                    .add(netChannel);
169       }
170       remoteAddress = netChannel.remoteAddress();
171       logger.debug(
172           "Will the Connection be refused if Partner is BlackListed from {}",
173           remoteAddress);
174       if (NetworkTransaction.isBlacklisted(netChannel)) {
175         logger.warn("Connection refused since Partner is BlackListed from {}",
176                     remoteAddress);
177         isBlackListed = true;
178         if (Configuration.configuration.getR66Mib() != null) {
179           Configuration.configuration.getR66Mib().notifyError(
180               "Black Listed connection temptative", "During connection");
181         }
182         // close immediately the connection
183         WaarpSslUtility.closingSslChannel(netChannel);
184         return;
185       }
186       try {
187         networkChannelReference =
188             NetworkTransaction.addNetworkChannel(netChannel, isSSL);
189       } catch (final OpenR66ProtocolRemoteShutdownException e2) {
190         logger.warn("Connection refused since Partner is in Shutdown from " +
191                     remoteAddress + " : {}", e2.getMessage());
192         isShuttingDown = true;
193         // close immediately the connection
194         WaarpSslUtility.closingSslChannel(netChannel);
195         return;
196       } catch (final OpenR66ProtocolBlackListedException e2) {
197         logger.warn("Connection refused since Partner is Black Listed from " +
198                     remoteAddress + " : {}", e2.getMessage());
199         isBlackListed = true;
200         // close immediately the connection
201         WaarpSslUtility.closingSslChannel(netChannel);
202         return;
203       }
204       if (admin.isCompatibleWithThreadSharedConnexion()) {
205         dbSession = new DbSession(admin, false);
206         dbSession.useConnection();
207       } else {
208         logger.debug("DbSession will be adjusted on LocalChannelReference");
209         dbSession = admin.getSession();
210       }
211     } catch (final WaarpDatabaseNoConnectionException e1) {
212       // Cannot connect so use default connection
213       logger.warn("Use default database connection");
214       dbSession = admin.getSession();
215     }
216     logger.debug("Network Channel Connected: {} ", ctx.channel().id());
217     ctx.read();
218   }
219 
220   @Override
221   public void userEventTriggered(final ChannelHandlerContext ctx,
222                                  final Object evt) throws Exception {
223     if (Configuration.configuration.isShutdown()) {
224       return;
225     }
226     if (evt instanceof IdleStateEvent) {
227       if (networkChannelReference != null &&
228           networkChannelReference.checkLastTime(
229               Configuration.configuration.getTimeoutCon() * 2) <= 0) {
230         resetKeepAlive();
231         return;
232       }
233       if (keepAlivedSent.get() > 0) {
234         final int nbLocalChannels = networkChannelReference != null?
235             networkChannelReference.nbLocalChannels() : 0;
236         if (nbLocalChannels > 0 && keepAlivedSent.get() < 5) {
237           // ignore this time
238           keepAlivedSent.getAndIncrement();
239           return;
240         }
241         if (networkChannelReference != null &&
242             networkChannelReference.isSomeLocalChannelsActive()) {
243           // Reset counter but still waiting for a KA
244           logger.info(
245               "No KAlive yet while {} LocalChannels and {} tentatives, reset " +
246               "KA to 1", nbLocalChannels, keepAlivedSent.get());
247           keepAlivedSent.set(1);
248           return;
249         }
250         if (keepAlivedSent.get() < 5) {
251           keepAlivedSent.getAndIncrement();
252           return;
253         }
254         logger.error(
255             "Not getting KAlive: closing channel while {} LocalChannels" +
256             " and {} tentatives", nbLocalChannels, keepAlivedSent.get());
257         if (Configuration.configuration.getR66Mib() != null) {
258           Configuration.configuration.getR66Mib()
259                                      .notifyWarning("KeepAlive get no answer",
260                                                     "Closing network connection");
261         }
262         ChannelCloseTimer.closeFutureChannel(ctx.channel());
263       } else {
264         keepAlivedSent.set(1);
265         final KeepAlivePacket keepAlivePacket = new KeepAlivePacket();
266         final NetworkPacket response =
267             new NetworkPacket(ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
268                               keepAlivePacket, null);
269         logger.info("Write KAlive");
270         ctx.channel().writeAndFlush(response);
271         if (networkChannelReference != null) {
272           networkChannelReference.useIfUsed();
273         }
274       }
275     }
276   }
277 
278   public final void resetKeepAlive() {
279     keepAlivedSent.set(0);
280     if (networkChannelReference != null) {
281       networkChannelReference.useIfUsed();
282     }
283   }
284 
285   @Override
286   public void channelRead0(final ChannelHandlerContext ctx,
287                            final NetworkPacket msg) {
288     try {
289       if (isBlackListed || isShuttingDown) {
290         // ignore message since close on going
291         msg.clear();
292         return;
293       }
294       resetKeepAlive();
295       final Channel channel = ctx.channel();
296       if (msg.getCode() == LocalPacketFactory.NOOPPACKET) {
297         msg.clear();
298         // Do nothing
299         return;
300       } else if (msg.getCode() == LocalPacketFactory.CONNECTERRORPACKET) {
301         logger.debug("NetworkRecv: {}", msg);
302         // Special code to STOP here
303         if (msg.getLocalId() == ChannelUtils.NOCHANNEL) {
304           final int nb = networkChannelReference.nbLocalChannels();
305           if (nb > 0) {
306             try {
307               logger.warn(
308                   "Tentative of connection failed ({}) but still some connection" +
309                   " are there so not closing the server channel immediately: {}",
310                   LocalPacketCodec.decodeNetworkPacket(msg.getBuffer()), nb);
311             } catch (final OpenR66ProtocolPacketException ignore) {
312               logger.warn(
313                   "Tentative of connection failed but still some connection" +
314                   " are there so not closing the server channel immediately: {}",
315                   nb);
316             }
317             msg.clear();
318             return;
319           }
320           // No way to know what is wrong: close all connections with
321           // remote host
322           logger.error(
323               "Will close NETWORK channel, Cannot continue connection with remote Host: " +
324               msg + " : " + channel.remoteAddress() + " : " + nb);
325           msg.clear();
326           WaarpSslUtility.closingSslChannel(channel);
327           return;
328         }
329       } else if (msg.getCode() == LocalPacketFactory.KEEPALIVEPACKET) {
330         try {
331           final KeepAlivePacket keepAlivePacket =
332               (KeepAlivePacket) LocalPacketCodec.decodeNetworkPacket(
333                   msg.getBuffer());
334           if (keepAlivePacket.isToValidate()) {
335             keepAlivePacket.validate();
336             final NetworkPacket response =
337                 new NetworkPacket(ChannelUtils.NOCHANNEL,
338                                   ChannelUtils.NOCHANNEL, keepAlivePacket,
339                                   null);
340             logger.info("Answer KAlive");
341             ctx.writeAndFlush(response);
342           } else {
343             logger.info("Get KAlive");
344           }
345         } catch (final OpenR66ProtocolPacketException ignored) {
346           // nothing
347         } finally {
348           msg.clear();
349         }
350         return;
351       }
352       networkChannelReference.use();
353       final LocalChannelReference localChannelReference;
354       if (msg.getLocalId() == ChannelUtils.NOCHANNEL) {
355         localChannelReference =
356             NetworkTransaction.createConnectionFromNetworkChannelStartup(
357                 networkChannelReference, msg, isSSL);
358       } else {
359         if (msg.getCode() == LocalPacketFactory.ENDREQUESTPACKET) {
360           // Coming from remote
361           try {
362             localChannelReference =
363                 Configuration.configuration.getLocalTransaction()
364                                            .getClient(msg.getRemoteId(),
365                                                       msg.getLocalId());
366           } catch (final OpenR66ProtocolSystemException e1) {
367             // do not send anything since the packet is external
368             try {
369               logger.info(
370                   "Cannot get LocalChannel while an end of request comes: {}",
371                   LocalPacketCodec.decodeNetworkPacket(msg.getBuffer()));
372             } catch (final OpenR66ProtocolPacketException e2) {
373               logger.info(
374                   "Cannot get LocalChannel while an end of request comes: {}",
375                   msg);
376             }
377             msg.clear();
378             return;
379           }
380           // OK continue and send to the local channel
381         } else if (msg.getCode() == LocalPacketFactory.CONNECTERRORPACKET) {
382           // Not a local error but a remote one
383           try {
384             localChannelReference =
385                 Configuration.configuration.getLocalTransaction()
386                                            .getClient(msg.getRemoteId(),
387                                                       msg.getLocalId());
388           } catch (final OpenR66ProtocolSystemException e1) {
389             // do not send anything since the packet is external
390             try {
391               logger.info(
392                   "Cannot get LocalChannel while an external error comes: {}",
393                   LocalPacketCodec.decodeNetworkPacket(msg.getBuffer()));
394             } catch (final OpenR66ProtocolPacketException e2) {
395               logger.info(
396                   "Cannot get LocalChannel while an external error comes: {}",
397                   msg);
398             }
399             msg.clear();
400             return;
401           }
402           // OK continue and send to the local channel
403         } else {
404           try {
405             localChannelReference =
406                 Configuration.configuration.getLocalTransaction()
407                                            .getClient(msg.getRemoteId(),
408                                                       msg.getLocalId());
409           } catch (final OpenR66ProtocolSystemException e1) {
410             if (remoteAddress == null) {
411               remoteAddress = channel.remoteAddress();
412             }
413             if (NetworkTransaction.isShuttingdownNetworkChannel(
414                 remoteAddress) || WaarpShutdownHook.isShutdownStarting()) {
415               // ignore
416               msg.clear();
417               return;
418             }
419             // try to send later
420             logger.info("Cannot get LocalChannel: {} due to {}", msg,
421                         e1.getMessage());
422             final ConnectionErrorPacket error = new ConnectionErrorPacket(
423                 "Cannot get localChannel since localId is not found anymore",
424                 String.valueOf(msg.getLocalId()));
425             writeError(channel, msg.getRemoteId(), msg.getLocalId(), error);
426             msg.clear();
427             return;
428           }
429         }
430       }
431       // check if not already in shutdown or closed
432       if (NetworkTransaction.isShuttingdownNetworkChannel(remoteAddress) ||
433           WaarpShutdownHook.isShutdownStarting()) {
434         logger.debug(
435             "Cannot use LocalChannel since already in shutdown: " + msg);
436         // ignore
437         msg.clear();
438         return;
439       }
440       LocalServerHandler.channelRead0(localChannelReference, msg);
441     } finally {
442       ctx.read();
443     }
444   }
445 
446   @Override
447   public void exceptionCaught(final ChannelHandlerContext ctx,
448                               final Throwable cause) {
449     final Channel channel = ctx.channel();
450     if (isBlackListed || isShuttingDown) {
451       logger.info("While partner is blacklisted, Network Channel Exception: {}",
452                   channel.id(), cause.getClass().getName() + " : " + cause);
453       // ignore
454       return;
455     }
456     logger.debug("Network Channel Exception: {}", channel.id(), cause);
457     if (cause instanceof ReadTimeoutException) {
458       final ReadTimeoutException exception = (ReadTimeoutException) cause;
459       // No read for too long
460       logger.error("ReadTimeout so Will close NETWORK channel {}",
461                    exception.getClass().getName() + " : " +
462                    exception.getMessage());
463       ChannelCloseTimer.closeFutureChannel(channel);
464       return;
465     }
466     if (cause instanceof BindException) {
467       // received when not yet connected
468       logger.debug("BindException");
469       ChannelCloseTimer.closeFutureChannel(channel);
470       return;
471     }
472     final OpenR66Exception exception =
473         OpenR66ExceptionTrappedFactory.getExceptionFromTrappedException(channel,
474                                                                         cause);
475     if (exception != null) {
476       if (exception instanceof OpenR66ProtocolBusinessNoWriteBackException) {
477         if (networkChannelReference != null &&
478             networkChannelReference.nbLocalChannels() > 0) {
479           logger.info("Network Channel Exception: {} {}", channel.id(),
480                       exception.getClass().getName() + " : " +
481                       exception.getMessage());
482         }
483         logger.debug("Will close NETWORK channel");
484         ChannelCloseTimer.closeFutureChannel(channel);
485         return;
486       } else if (exception instanceof OpenR66ProtocolNoConnectionException) {
487         logger.info("Connection impossible with NETWORK channel {}",
488                     exception.getClass().getName() + " : " +
489                     exception.getMessage());
490         channel.close();
491         return;
492       } else {
493         logger.info("Network Channel Exception: {} {}", channel.id(),
494                     exception.getClass().getName() + " : " +
495                     exception.getMessage());
496       }
497       final ConnectionErrorPacket errorPacket = new ConnectionErrorPacket(
498           exception.getClass().getName() + " : " + exception.getMessage(),
499           null);
500       writeError(channel, ChannelUtils.NOCHANNEL, ChannelUtils.NOCHANNEL,
501                  errorPacket);
502       logger.debug("Will close NETWORK channel: {}",
503                    exception.getClass().getName() + " : " +
504                    exception.getMessage());
505       ChannelCloseTimer.closeFutureChannel(channel);
506     } else {
507       // Nothing to do
508     }
509   }
510 
511   /**
512    * Write error back to remote client
513    *
514    * @param channel
515    * @param remoteId
516    * @param localId
517    * @param error
518    */
519   public static void writeError(final Channel channel, final Integer remoteId,
520                                 final Integer localId,
521                                 final AbstractLocalPacket error) {
522     if (channel.isActive()) {
523       NetworkPacket networkPacket = null;
524       try {
525         networkPacket = new NetworkPacket(localId, remoteId, error, null);
526       } catch (final OpenR66ProtocolPacketException ignored) {
527         // nothing
528       }
529       if (networkPacket != null) {
530         final NetworkPacket finalNP = networkPacket;
531         channel.eventLoop().submit(new finalNPWrite(channel, finalNP));
532       }
533     }
534   }
535 
536   private static class finalNPWrite implements Runnable {
537     private final Channel channel;
538     private final NetworkPacket finalNP;
539 
540     private finalNPWrite(final Channel channel, final NetworkPacket finalNP) {
541       this.channel = channel;
542       this.finalNP = finalNP;
543     }
544 
545     @Override
546     public void run() {
547       channel.writeAndFlush(finalNP).awaitUninterruptibly();
548       finalNP.clear();
549     }
550   }
551 
552   /**
553    * @return the dbSession
554    */
555   public final DbSession getDbSession() {
556     return dbSession;
557   }
558 
559   /**
560    * @return True if this Handler is for SSL
561    */
562   public final boolean isSsl() {
563     return isSSL;
564   }
565 }