View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.ftp.core.config;
21  
22  import io.netty.bootstrap.Bootstrap;
23  import io.netty.bootstrap.ServerBootstrap;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.EventLoopGroup;
28  import io.netty.channel.group.ChannelGroup;
29  import io.netty.channel.group.DefaultChannelGroup;
30  import io.netty.channel.nio.NioEventLoopGroup;
31  import io.netty.handler.traffic.ChannelTrafficShapingHandler;
32  import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler;
33  import io.netty.util.concurrent.EventExecutorGroup;
34  import org.waarp.common.command.exception.Reply425Exception;
35  import org.waarp.common.crypto.ssl.WaarpSslUtility;
36  import org.waarp.common.logging.WaarpLogger;
37  import org.waarp.common.logging.WaarpLoggerFactory;
38  import org.waarp.common.utility.DetectionUtils;
39  import org.waarp.common.utility.WaarpNettyUtil;
40  import org.waarp.common.utility.WaarpShutdownHook;
41  import org.waarp.common.utility.WaarpThreadFactory;
42  import org.waarp.ftp.core.control.FtpInitializer;
43  import org.waarp.ftp.core.control.ftps.FtpsInitializer;
44  import org.waarp.ftp.core.data.handler.FtpDataInitializer;
45  import org.waarp.ftp.core.data.handler.ftps.FtpsDataInitializer;
46  import org.waarp.ftp.core.exception.FtpNoConnectionException;
47  import org.waarp.ftp.core.session.FtpSession;
48  import org.waarp.ftp.core.session.FtpSessionReference;
49  import org.waarp.ftp.core.utils.FtpChannelUtils;
50  import org.waarp.ftp.core.utils.FtpShutdownHook;
51  
52  import java.net.InetAddress;
53  import java.net.InetSocketAddress;
54  import java.util.concurrent.ConcurrentHashMap;
55  import java.util.concurrent.ExecutorService;
56  import java.util.concurrent.Executors;
57  import java.util.concurrent.ScheduledExecutorService;
58  import java.util.concurrent.atomic.AtomicInteger;
59  
60  /**
61   * Internal configuration of the FTP server, related to Netty
62   */
63  public class FtpInternalConfiguration {
64    // Static values
65    /**
66     * Internal Logger
67     */
68    private static final WaarpLogger logger =
69        WaarpLoggerFactory.getLogger(FtpInternalConfiguration.class);
70  
71    // Network Internals
72    /**
73     * Time elapse for retry in ms
74     */
75    public static final long RETRYINMS = 10;
76  
77    /**
78     * Number of retry before error
79     */
80    public static final int RETRYNB = 10;
81  
82    /**
83     * Hack to say Windows or Unix (USR1 not OK on Windows)
84     */
85    static Boolean isUnix;
86  
87    // Dynamic values
88    /**
89     * List of all Command Channels to enable the close call on them using Netty
90     * ChannelGroup
91     */
92    private ChannelGroup commandChannelGroup;
93  
94    /**
95     * ExecutorService Server
96     */
97    private final EventLoopGroup execServer;
98  
99    /**
100    * ExecutorService Worker
101    */
102   private final EventLoopGroup execWorker;
103 
104   /**
105    * List of all Data Channels to enable the close call on them using Netty
106    * ChannelGroup
107    */
108   private ChannelGroup dataChannelGroup;
109 
110   /**
111    * ExecutorService Command Event Loop
112    */
113   private final EventLoopGroup execCommandEvent;
114 
115   /**
116    * ExecutorService Data Event Loop
117    */
118   private final EventLoopGroup execDataEvent;
119 
120   /**
121    * ExecutorService Data Active Server
122    */
123   private final EventLoopGroup execDataServer;
124 
125   /**
126    * ExecutorService Data Active Worker
127    */
128   private final EventLoopGroup execDataWorker;
129 
130   /**
131    * FtpSession references used by Data Connection process
132    */
133   private final FtpSessionReference ftpSessionReference =
134       new FtpSessionReference();
135 
136   /**
137    * Bootstrap for Active connections
138    */
139   private Bootstrap activeBootstrap;
140 
141   /**
142    * ServerBootStrap for Passive connections
143    */
144   private ServerBootstrap passiveBootstrap;
145 
146   /**
147    * Scheduler for TrafficCounter
148    */
149   private final ScheduledExecutorService executorService =
150       Executors.newScheduledThreadPool(2,
151                                        new WaarpThreadFactory("TimerTrafficFtp",
152                                                               false));
153 
154   /**
155    * Global TrafficCounter (set from global configuration)
156    */
157   private FtpGlobalTrafficShapingHandler globalTrafficShapingHandler;
158 
159   /**
160    * Does the FTP will be SSL native based (990 989 port)
161    */
162   private boolean usingNativeSsl;
163 
164   /**
165    * Does the FTP accept AUTH and PROT
166    */
167   private boolean acceptAuthProt;
168   /**
169    * Bootstrap for Active Ssl connections
170    */
171   private Bootstrap activeSslBootstrap;
172 
173   /**
174    * ServerBootStrap for Passive Ssl connections
175    */
176   private ServerBootstrap passiveSslBootstrap;
177 
178   /**
179    * org.waarp.ftp.core.config BindAddress
180    */
181   public static class BindAddress {
182     /**
183      * Parent passive channel
184      */
185     public final Channel parent;
186 
187     /**
188      * Number of binded Data connections
189      */
190     public final AtomicInteger nbBind = new AtomicInteger();
191 
192     /**
193      * Constructor
194      *
195      * @param channel
196      */
197     public BindAddress(final Channel channel) {
198       parent = channel;
199       nbBind.set(0);
200     }
201   }
202 
203   /**
204    * List of already bind local addresses for Passive connections
205    */
206   private final ConcurrentHashMap<InetSocketAddress, BindAddress>
207       hashBindPassiveDataConn =
208       new ConcurrentHashMap<InetSocketAddress, BindAddress>();
209 
210   /**
211    * Global Configuration
212    */
213   private final FtpConfiguration configuration;
214 
215   /**
216    * Constructor
217    *
218    * @param configuration
219    */
220   public FtpInternalConfiguration(final FtpConfiguration configuration) {
221     this.configuration = configuration;
222     isUnix = !DetectionUtils.isWindows();
223     configuration.getShutdownConfiguration().timeout =
224         configuration.getTimeoutCon();
225     new FtpShutdownHook(configuration.getShutdownConfiguration(),
226                         configuration);
227     execCommandEvent = new NioEventLoopGroup(configuration.getClientThread(),
228                                              new WaarpThreadFactory("Command"));
229     execDataEvent = new NioEventLoopGroup(configuration.getClientThread(),
230                                           new WaarpThreadFactory("Data"));
231     execServer = new NioEventLoopGroup(configuration.getServerThread(),
232                                        new WaarpThreadFactory("CommandServer"));
233     execWorker = new NioEventLoopGroup(configuration.getClientThread(),
234                                        new WaarpThreadFactory("CommandWorker"));
235     execDataServer = new NioEventLoopGroup(configuration.getServerThread(),
236                                            new WaarpThreadFactory(
237                                                "DataServer"));
238     execDataWorker = new NioEventLoopGroup(configuration.getClientThread() * 2,
239                                            new WaarpThreadFactory(
240                                                "DataWorker"));
241   }
242 
243   /**
244    * Startup the server
245    *
246    * @throws FtpNoConnectionException
247    */
248   public final void serverStartup() throws FtpNoConnectionException {
249     logger.debug("Start groups");
250     // Command
251     commandChannelGroup =
252         new DefaultChannelGroup(configuration.fromClass.getName(),
253                                 execWorker.next());
254     // Data
255     dataChannelGroup =
256         new DefaultChannelGroup(configuration.fromClass.getName() + ".data",
257                                 execWorker.next());
258 
259     logger.debug("Start data connections");
260     // Passive Data Connections
261     passiveBootstrap = new ServerBootstrap();
262     WaarpNettyUtil.setServerBootstrap(passiveBootstrap, execDataServer,
263                                       execDataWorker,
264                                       (int) configuration.getTimeoutCon(),
265                                       configuration.getBlocksize() + 1024,
266                                       true);
267     if (usingNativeSsl) {
268       passiveBootstrap.childHandler(
269           new FtpsDataInitializer(configuration.dataBusinessHandler,
270                                   configuration, false));
271     } else {
272       passiveBootstrap.childHandler(
273           new FtpDataInitializer(configuration.dataBusinessHandler,
274                                  configuration, false));
275     }
276     if (acceptAuthProt) {
277       passiveSslBootstrap = new ServerBootstrap();
278       WaarpNettyUtil.setServerBootstrap(passiveSslBootstrap, execDataServer,
279                                         execDataWorker,
280                                         (int) configuration.getTimeoutCon(),
281                                         configuration.getBlocksize() + 1024,
282                                         true);
283       passiveSslBootstrap.childHandler(
284           new FtpsDataInitializer(configuration.dataBusinessHandler,
285                                   configuration, false));
286     } else {
287       passiveSslBootstrap = passiveBootstrap;
288     }
289 
290     // Active Data Connections
291     activeBootstrap = new Bootstrap();
292     WaarpNettyUtil.setBootstrap(activeBootstrap, execDataWorker,
293                                 (int) configuration.getTimeoutCon(),
294                                 configuration.getBlocksize() + 1024, true);
295     if (usingNativeSsl) {
296       activeBootstrap.handler(
297           new FtpsDataInitializer(configuration.dataBusinessHandler,
298                                   configuration, true));
299     } else {
300       activeBootstrap.handler(
301           new FtpDataInitializer(configuration.dataBusinessHandler,
302                                  configuration, true));
303     }
304     if (acceptAuthProt) {
305       activeSslBootstrap = new Bootstrap();
306       WaarpNettyUtil.setBootstrap(activeSslBootstrap, execDataWorker,
307                                   (int) configuration.getTimeoutCon(),
308                                   configuration.getBlocksize() + 1024, true);
309       activeSslBootstrap.handler(
310           new FtpsDataInitializer(configuration.dataBusinessHandler,
311                                   configuration, true));
312     } else {
313       activeSslBootstrap = activeBootstrap;
314     }
315 
316     logger.debug("Start command connections {}", configuration.getServerPort());
317     // Main Command server
318     /*
319      * Bootstrap for Command server
320      */
321     final ServerBootstrap serverBootstrap = new ServerBootstrap();
322     WaarpNettyUtil.setServerBootstrap(serverBootstrap, execServer, execWorker,
323                                       (int) configuration.getTimeoutCon(),
324                                       configuration.getBlocksize(), true);
325     if (usingNativeSsl) {
326       serverBootstrap.childHandler(
327           new FtpsInitializer(configuration.businessHandler, configuration));
328     } else {
329       serverBootstrap.childHandler(
330           new FtpInitializer(configuration.businessHandler, configuration));
331     }
332     final InetSocketAddress socketAddress =
333         new InetSocketAddress(configuration.getServerPort());
334     ChannelFuture future = serverBootstrap.bind(socketAddress);
335     try {
336       future = future.sync();
337     } catch (final InterruptedException e) {//NOSONAR
338       logger.error("Cannot start command conections: {}", e.getMessage());
339       throw new FtpNoConnectionException("Can't initiate the FTP server", e);
340     }
341     if (!future.isSuccess()) {
342       logger.error("Cannot start command conections");
343       throw new FtpNoConnectionException("Can't initiate the FTP server");
344     }
345     FtpChannelUtils.addCommandChannel(future.channel(), configuration);
346 
347     // Init Shutdown Hook handler
348     configuration.getShutdownConfiguration().timeout =
349         configuration.getTimeoutCon();
350     WaarpShutdownHook.addShutdownHook();
351     // Factory for TrafficShapingHandler
352     globalTrafficShapingHandler =
353         new FtpGlobalTrafficShapingHandler(executorService,
354                                            configuration.getServerGlobalWriteLimit(),
355                                            configuration.getServerGlobalReadLimit(),
356                                            configuration.getServerChannelWriteLimit(),
357                                            configuration.getServerChannelReadLimit(),
358                                            configuration.getDelayLimit());
359   }
360 
361   /**
362    * @return an ExecutorService
363    */
364   public final ExecutorService getWorker() {
365     return execWorker;
366   }
367 
368   /**
369    * Add a session from a couple of addresses
370    *
371    * @param ipOnly
372    * @param fullIp
373    * @param session
374    */
375   public final void setNewFtpSession(final InetAddress ipOnly,
376                                      final InetSocketAddress fullIp,
377                                      final FtpSession session) {
378     ftpSessionReference.setNewFtpSession(ipOnly, fullIp, session);
379   }
380 
381   /**
382    * Return and remove the FtpSession
383    *
384    * @param channel
385    *
386    * @return the FtpSession if it exists associated to this channel
387    */
388   public final FtpSession getFtpSession(final Channel channel) {
389     return ftpSessionReference.getPassiveFtpSession(channel);
390   }
391 
392   /**
393    * Remove the FtpSession
394    *
395    * @param ipOnly
396    * @param fullIp
397    */
398   public final void delFtpSession(final InetAddress ipOnly,
399                                   final InetSocketAddress fullIp) {
400     ftpSessionReference.delFtpSession(ipOnly, fullIp);
401   }
402 
403   /**
404    * @return the number of Active Sessions
405    */
406   public final int getNumberSessions() {
407     return ftpSessionReference.sessionsNumber();
408   }
409 
410   /**
411    * @param channel
412    *
413    * @return the FtpSession if found
414    */
415   public final FtpSession findPassiveFtpSession(final Channel channel) {
416     return ftpSessionReference.findPassive(channel);
417   }
418 
419   /**
420    * Try to add a Passive Channel listening to the specified local address
421    *
422    * @param address
423    * @param ssl
424    *
425    * @throws Reply425Exception in case the channel cannot be opened
426    */
427   public final void bindPassive(final InetSocketAddress address,
428                                 final boolean ssl) throws Reply425Exception {
429     configuration.bindLock();
430     try {
431       BindAddress bindAddress = hashBindPassiveDataConn.get(address);
432       if (bindAddress == null) {
433         logger.debug("Bind really to {}", address);
434         final Channel parentChannel;
435         try {
436           final ChannelFuture future;
437           if (ssl) {
438             future = passiveSslBootstrap.bind(address);
439           } else {
440             future = passiveBootstrap.bind(address);
441           }
442           if (future.await(configuration.getTimeoutCon())) {
443             parentChannel = future.sync().channel();
444           } else {
445             logger.warn("Cannot open passive connection due to Timeout");
446             throw new Reply425Exception(
447                 "Cannot open a Passive Connection due to Timeout");
448           }
449         } catch (final ChannelException e) {
450           logger.warn("Cannot open passive connection {}", e.getMessage());
451           throw new Reply425Exception("Cannot open a Passive Connection");
452         } catch (final InterruptedException e) {//NOSONAR
453           logger.warn("Cannot open passive connection {}", e.getMessage());
454           throw new Reply425Exception("Cannot open a Passive Connection");
455         }
456         bindAddress = new BindAddress(parentChannel);
457         FtpChannelUtils.addDataChannel(parentChannel, configuration);
458         hashBindPassiveDataConn.put(address, bindAddress);
459       }
460       bindAddress.nbBind.getAndIncrement();
461       logger.debug("Bind number to {} is {}", address, bindAddress.nbBind);
462     } finally {
463       configuration.bindUnlock();
464     }
465   }
466 
467   /**
468    * Try to unbind (closing the parent channel) the Passive Channel listening
469    * to
470    * the specified local address if
471    * the last one. It returns only when the underlying parent channel is
472    * closed
473    * if this was the last session
474    * that wants to open on this local address.
475    *
476    * @param address
477    */
478   public final void unbindPassive(final InetSocketAddress address) {
479     configuration.bindLock();
480     try {
481       final BindAddress bindAddress = hashBindPassiveDataConn.get(address);
482       if (bindAddress != null) {
483         bindAddress.nbBind.getAndDecrement();
484         logger.debug("Bind number to {} left is {}", address,
485                      bindAddress.nbBind);
486         if (bindAddress.nbBind.get() == 0) {
487           final ChannelFuture future =
488               WaarpSslUtility.closingSslChannel(bindAddress.parent);
489           hashBindPassiveDataConn.remove(address);
490           future.awaitUninterruptibly();
491         }
492       } else {
493         logger.warn("No Bind to {}", address);
494       }
495     } finally {
496       configuration.bindUnlock();
497     }
498   }
499 
500   /**
501    * @return the number of Binded Passive Connections
502    */
503   public final int getNbBindedPassive() {
504     return hashBindPassiveDataConn.size();
505   }
506 
507   /**
508    * Return the associated Executor for Command Event
509    *
510    * @return the Command Event Executor
511    */
512   public final EventExecutorGroup getExecutor() {
513     return execCommandEvent;
514   }
515 
516   /**
517    * Return the associated Executor for Data Event
518    *
519    * @return the Data Event Executor
520    */
521   public final EventExecutorGroup getDataExecutor() {
522     return execDataEvent;
523   }
524 
525   /**
526    * @param ssl
527    *
528    * @return the ActiveBootstrap
529    */
530   public final Bootstrap getActiveBootstrap(final boolean ssl) {
531     if (ssl) {
532       return activeSslBootstrap;
533     } else {
534       return activeBootstrap;
535     }
536   }
537 
538   /**
539    * @return the commandChannelGroup
540    */
541   public final ChannelGroup getCommandChannelGroup() {
542     return commandChannelGroup;
543   }
544 
545   /**
546    * @return the dataChannelGroup
547    */
548   public final ChannelGroup getDataChannelGroup() {
549     return dataChannelGroup;
550   }
551 
552   /**
553    * @return The TrafficCounterFactory
554    */
555   public final FtpGlobalTrafficShapingHandler getGlobalTrafficShapingHandler() {
556     return globalTrafficShapingHandler;
557   }
558 
559   /**
560    * @return a new ChannelTrafficShapingHandler
561    */
562   public final ChannelTrafficShapingHandler newChannelTrafficShapingHandler() {
563     if (configuration.getServerChannelWriteLimit() == 0 &&
564         configuration.getServerChannelReadLimit() == 0) {
565       return null;
566     }
567     if (globalTrafficShapingHandler instanceof GlobalChannelTrafficShapingHandler) {
568       return null;
569     }
570     return new FtpChannelTrafficShapingHandler(
571         configuration.getServerChannelWriteLimit(),
572         configuration.getServerChannelReadLimit(),
573         configuration.getDelayLimit());
574   }
575 
576   public final void releaseResources() {
577     WaarpSslUtility.forceCloseAllSslChannels();
578     execWorker.shutdownGracefully();
579     execDataWorker.shutdownGracefully();
580     execServer.shutdownGracefully();
581     execDataServer.shutdownGracefully();
582     if (globalTrafficShapingHandler != null) {
583       globalTrafficShapingHandler.release();
584     }
585     executorService.shutdown();
586   }
587 
588   public final boolean isAcceptAuthProt() {
589     return acceptAuthProt;
590   }
591 
592   /**
593    * @return the usingNativeSsl
594    */
595   public final boolean isUsingNativeSsl() {
596     return usingNativeSsl;
597   }
598 
599   /**
600    * @param usingNativeSsl the usingNativeSsl to set
601    */
602   public final void setUsingNativeSsl(final boolean usingNativeSsl) {
603     this.usingNativeSsl = usingNativeSsl;
604   }
605 
606   /**
607    * @param acceptAuthProt the acceptAuthProt to set
608    */
609   public final void setAcceptAuthProt(final boolean acceptAuthProt) {
610     this.acceptAuthProt = acceptAuthProt;
611   }
612 
613 }