View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.ftp.core.control;
21  
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelException;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelHandler;
26  import io.netty.channel.ChannelHandlerContext;
27  import io.netty.channel.SimpleChannelInboundHandler;
28  import io.netty.handler.ssl.SslHandler;
29  import io.netty.util.concurrent.Future;
30  import io.netty.util.concurrent.GenericFutureListener;
31  import org.waarp.common.command.ReplyCode;
32  import org.waarp.common.command.exception.CommandAbstractException;
33  import org.waarp.common.command.exception.Reply421Exception;
34  import org.waarp.common.command.exception.Reply503Exception;
35  import org.waarp.common.crypto.ssl.WaarpSslUtility;
36  import org.waarp.common.logging.WaarpLogger;
37  import org.waarp.common.logging.WaarpLoggerFactory;
38  import org.waarp.common.utility.WaarpNettyUtil;
39  import org.waarp.ftp.core.command.AbstractCommand;
40  import org.waarp.ftp.core.command.FtpCommandCode;
41  import org.waarp.ftp.core.command.access.USER;
42  import org.waarp.ftp.core.command.internal.ConnectionCommand;
43  import org.waarp.ftp.core.command.internal.IncorrectCommand;
44  import org.waarp.ftp.core.control.ftps.FtpsInitializer;
45  import org.waarp.ftp.core.data.FtpTransferControl;
46  import org.waarp.ftp.core.exception.FtpNoConnectionException;
47  import org.waarp.ftp.core.session.FtpSession;
48  import org.waarp.ftp.core.utils.FtpChannelUtils;
49  
50  import java.io.IOException;
51  import java.net.BindException;
52  import java.net.ConnectException;
53  import java.nio.channels.ClosedChannelException;
54  import java.util.concurrent.RejectedExecutionException;
55  
56  /**
57   * Main Network Handler (Control part) implementing RFC 959, 775, 2389, 2428,
58   * 3659 and supports XCRC and XMD5
59   * commands.
60   */
61  public class NetworkHandler extends SimpleChannelInboundHandler<String> {
62    private static final String INTERNAL_ERROR_DISCONNECT =
63        "Internal error: disconnect";
64  
65    /**
66     * Internal Logger
67     */
68    private static final WaarpLogger logger =
69        WaarpLoggerFactory.getLogger(NetworkHandler.class);
70  
71    /**
72     * Business Handler
73     */
74    private final BusinessHandler businessHandler;
75  
76    /**
77     * Internal store for the SessionInterface
78     */
79    private final FtpSession session;
80  
81    /**
82     * The associated Channel
83     */
84    private Channel controlChannel;
85    /**
86     * ChannelHandlerContext that could be used whenever needed
87     */
88    private ChannelHandlerContext ctx;
89  
90    /**
91     * Constructor from session
92     *
93     * @param session
94     */
95    public NetworkHandler(final FtpSession session) {
96      this.session = session;
97      businessHandler = session.getBusinessHandler();
98      businessHandler.setNetworkHandler(this);
99    }
100 
101   /**
102    * @return the businessHandler
103    */
104   public final BusinessHandler getBusinessHandler() {
105     return businessHandler;
106   }
107 
108   /**
109    * @return the session
110    */
111   public final FtpSession getFtpSession() {
112     return session;
113   }
114 
115   /**
116    * @return the Control Channel
117    */
118   public final Channel getControlChannel() {
119     return controlChannel;
120   }
121 
122   /**
123    * Run firstly executeChannelClosed.
124    */
125   @Override
126   public void channelInactive(final ChannelHandlerContext ctx)
127       throws Exception {
128     if (session == null || session.getDataConn() == null ||
129         session.getDataConn().getFtpTransferControl() == null) {
130       super.channelInactive(ctx);
131       return;
132     }
133     // Wait for any command running before closing (bad client sometimes
134     // don't wait for answer)
135     int limit = 200;
136     while (session.getDataConn().getFtpTransferControl()
137                   .isFtpTransferExecuting()) {
138       Thread.sleep(WaarpNettyUtil.MINIMAL_DELAY_MS);
139       limit--;
140       if (limit <= 0) {
141         logger.warn("Waiting for transfer finished but 2s is not enough");
142         break; // wait at most 1s
143       }
144     }
145     businessHandler.executeChannelClosed();
146     // release file and other permanent objects
147     businessHandler.clear();
148     session.clear();
149     super.channelInactive(ctx);
150   }
151 
152   /**
153    * Initialize the Handler.
154    */
155   @Override
156   public void channelActive(final ChannelHandlerContext ctx) throws Exception {
157     this.ctx = ctx;
158     final Channel channel = ctx.channel();
159     controlChannel = channel;
160     session.setControlConnected();
161     FtpChannelUtils.addCommandChannel(channel, session.getConfiguration());
162     if (isStillAlive(ctx)) {
163       // Make the first execution ready
164       final AbstractCommand command = new ConnectionCommand(getFtpSession());
165       session.setNextCommand(command);
166       // This command can change the next Command
167       businessHandler.executeChannelConnected(channel);
168       // Answer ready to continue from first command = Connection
169       messageRunAnswer(ctx);
170       getFtpSession().setReady(true);
171     }
172   }
173 
174   /**
175    * If the service is going to shutdown, it sends back a 421 message to the
176    * connection
177    *
178    * @return True if the service is alive, else False if the system is going
179    *     down
180    */
181   private boolean isStillAlive(final ChannelHandlerContext ctx) {
182     if (session.getConfiguration().isShutdown()) {
183       session.setExitErrorCode("Service is going down: disconnect");
184       writeFinalAnswer(ctx);
185       return false;
186     }
187     return true;
188   }
189 
190   /**
191    * Default exception task: close the current connection after calling
192    * exceptionLocalCaught and writing if
193    * possible the current replyCode.
194    */
195   @Override
196   public void exceptionCaught(final ChannelHandlerContext ctx,
197                               final Throwable cause) {
198     this.ctx = ctx;
199     final Channel channel = ctx.channel();
200     if (session == null) {
201       // should not be
202       logger.warn("NO SESSION", cause);
203       return;
204     }
205     if (cause instanceof ConnectException) {
206       final ConnectException e2 = (ConnectException) cause;
207       logger.warn("Connection impossible since {} with Channel {}",
208                   e2.getMessage(), channel);
209     } else if (cause instanceof ChannelException) {
210       final ChannelException e2 = (ChannelException) cause;
211       logger.warn(
212           "Connection (example: timeout) impossible since {} with Channel {}",
213           e2.getMessage(), channel);
214     } else if (cause instanceof ClosedChannelException) {
215       logger.debug("Connection closed before end");
216       session.setExitErrorCode(INTERNAL_ERROR_DISCONNECT);
217       if (channel.isActive()) {
218         writeFinalAnswer(ctx);
219       }
220       return;
221     } else if (cause instanceof CommandAbstractException) {
222       // FTP Exception: not close if not necessary
223       final CommandAbstractException e2 = (CommandAbstractException) cause;
224       logger.warn("Command Error Reply {}", e2.getMessage());
225       session.setReplyCode(e2);
226       businessHandler.afterRunCommandKo(e2);
227       if (channel.isActive()) {
228         writeFinalAnswer(ctx);
229       }
230       return;
231     } else if (cause instanceof NullPointerException) {
232       final NullPointerException e2 = (NullPointerException) cause;
233       logger.warn("Null pointer Exception: " + ctx.channel(), e2);
234       try {
235         session.setExitErrorCode(INTERNAL_ERROR_DISCONNECT);
236         if (businessHandler != null && session.getDataConn() != null) {
237           businessHandler.exceptionLocalCaught(cause);
238           if (channel.isActive()) {
239             writeFinalAnswer(ctx);
240           }
241         }
242       } catch (final NullPointerException ignored) {
243         // nothing
244       }
245       return;
246     } else if (cause instanceof BindException) {
247       final BindException e2 = (BindException) cause;
248       logger.warn("Connection aborted since {} with Channel {}",
249                   e2.getMessage(), channel);
250       logger.debug("DEBUG", cause);
251     } else if (cause instanceof IOException) {
252       final IOException e2 = (IOException) cause;
253       logger.warn("Connection aborted since {} with Channel {}",
254                   e2.getMessage(), channel);
255       logger.debug("DEBUG", cause);
256     } else if (cause instanceof RejectedExecutionException) {
257       logger.debug("Rejected execution (shutdown) from {}", channel);
258       return;
259     } else {
260       logger.warn("Unexpected exception from Outband Ref Channel: " + channel +
261                   " Exception: " + cause.getMessage(), cause);
262     }
263     session.setExitErrorCode(INTERNAL_ERROR_DISCONNECT);
264     businessHandler.exceptionLocalCaught(cause);
265     if (channel.isActive()) {
266       writeFinalAnswer(ctx);
267     }
268   }
269 
270   /**
271    * Simply call messageRun with the received message
272    */
273   @Override
274   public void channelRead0(final ChannelHandlerContext ctx, final String e) {
275     this.ctx = ctx;
276     if (isStillAlive(ctx)) {
277       // First wait for the initialization to be fully done
278       if (!session.isReady()) {
279         session.setReplyCode(
280             ReplyCode.REPLY_421_SERVICE_NOT_AVAILABLE_CLOSING_CONTROL_CONNECTION,
281             null);
282         businessHandler.afterRunCommandKo(
283             new Reply421Exception(session.getReplyCode().getMesg()));
284         writeIntermediateAnswer(ctx);
285         return;
286       }
287       AbstractCommand command = FtpCommandCode.getFromLine(getFtpSession(), e);
288       logger.debug("RECVMSG: {} CMD: {} {}", command.getCode(), e,
289                    command.getCommand());
290       // First check if the command is an ABORT, QUIT or STAT
291       if (!FtpCommandCode.isSpecialCommand(command.getCode())) {
292         // Now check if a transfer is on its way: illegal to have at
293         // same time two commands (except ABORT). Wait is at most 100x
294         // RETRYINMS=1s
295         final FtpTransferControl control =
296             session.getDataConn().getFtpTransferControl();
297         final boolean notFinished = control.waitFtpTransferExecuting();
298         if (notFinished) {
299           session.setReplyCode(ReplyCode.REPLY_503_BAD_SEQUENCE_OF_COMMANDS,
300                                "Previous transfer command is not finished yet");
301           businessHandler.afterRunCommandKo(
302               new Reply503Exception(session.getReplyCode().getMesg()));
303           writeIntermediateAnswer(ctx);
304           return;
305         }
306       }
307       // Default message
308       session.setReplyCode(ReplyCode.REPLY_200_COMMAND_OKAY, null);
309       // Special check for SSL AUTH/PBSZ/PROT/USER/PASS/ACCT/CCC
310       if (FtpCommandCode.isSslOrAuthCommand(command.getCode())) {
311         session.setNextCommand(command);
312         messageRunAnswer(ctx);
313         return;
314       }
315       if (session.getCurrentCommand().isNextCommandValid(command)) {
316         logger.debug("Previous: {} Next: {}",
317                      session.getCurrentCommand().getCode(), command.getCode());
318         session.setNextCommand(command);
319         messageRunAnswer(ctx);
320       } else {
321         if (!session.getAuth().isIdentified()) {
322           session.setReplyCode(ReplyCode.REPLY_530_NOT_LOGGED_IN, null);
323           session.setNextCommand(new USER());
324           writeFinalAnswer(ctx);
325           return;
326         }
327         command = new IncorrectCommand();
328         command.setArgs(getFtpSession(), e, null,
329                         FtpCommandCode.IncorrectSequence);
330         session.setNextCommand(command);
331         messageRunAnswer(ctx);
332       }
333     }
334   }
335 
336   /**
337    * Write the current answer and eventually close channel if necessary (421
338    * or
339    * 221)
340    *
341    * @return True if the channel is closed due to the code
342    */
343   private boolean writeFinalAnswer(final ChannelHandlerContext ctx) {
344     if (session.getReplyCode() ==
345         ReplyCode.REPLY_421_SERVICE_NOT_AVAILABLE_CLOSING_CONTROL_CONNECTION ||
346         session.getReplyCode() ==
347         ReplyCode.REPLY_221_CLOSING_CONTROL_CONNECTION) {
348       session.getDataConn().getFtpTransferControl().clear();
349       writeIntermediateAnswer(ctx).addListener(WaarpSslUtility.SSLCLOSE);
350       return true;
351     }
352     writeIntermediateAnswer(ctx);
353     session.setCurrentCommandFinished();
354     return false;
355   }
356 
357   /**
358    * Write an intermediate Answer from Business before last answer also set by
359    * the Business
360    *
361    * @return the ChannelFuture associated with the write
362    */
363   public final ChannelFuture writeIntermediateAnswer(
364       final ChannelHandlerContext ctx) {
365     logger.debug("Answer: {}", session.getAnswer());
366     return ctx.writeAndFlush(session.getAnswer());
367   }
368 
369   /**
370    * Write an intermediate Answer from Business before last answer also set by
371    * the Business
372    *
373    * @return the ChannelFuture associated with the write
374    */
375   public final ChannelFuture writeIntermediateAnswer() {
376     return writeIntermediateAnswer(ctx);
377   }
378 
379   /**
380    * To be extended to inform of an error to SNMP support
381    *
382    * @param error1
383    * @param error2
384    */
385   protected void callForSnmp(final String error1, final String error2) {
386     // ignore
387   }
388 
389   /**
390    * Execute one command and write the following answer
391    */
392   private void messageRunAnswer(final ChannelHandlerContext ctx) {
393     boolean error = false;
394     logger.debug("Code: {}", session.getCurrentCommand().getCode());
395     try {
396       businessHandler.beforeRunCommand();
397       final AbstractCommand command = session.getCurrentCommand();
398       logger.debug("Run {}", command.getCommand());
399       command.exec();
400       businessHandler.afterRunCommandOk();
401     } catch (final CommandAbstractException e) {
402       logger.debug("Command in error", e);
403       error = true;
404       session.setReplyCode(e);
405       businessHandler.afterRunCommandKo(e);
406     }
407     logger.debug("Code: {} [{}]", session.getCurrentCommand().getCode(),
408                  session.getReplyCode());
409     if (error) {
410       if (session.getCurrentCommand().getCode() !=
411           FtpCommandCode.INTERNALSHUTDOWN) {
412         writeFinalAnswer(ctx);
413       }
414       // In error so Check that Data is closed
415       if (session.getDataConn().isActive()) {
416         logger.debug("Closing DataChannel while command is in error");
417         try {
418           session.getDataConn().getCurrentDataChannel().close();
419         } catch (final FtpNoConnectionException e) {
420           // ignore
421         }
422       }
423       return;
424     }
425     if (session.getCurrentCommand().getCode() == FtpCommandCode.AUTH ||
426         session.getCurrentCommand().getCode() == FtpCommandCode.CCC) {
427       controlChannel.config().setAutoRead(false);
428       final ChannelFuture future = writeIntermediateAnswer(ctx);
429       session.setCurrentCommandFinished();
430       if (session.getCurrentCommand().getCode() == FtpCommandCode.AUTH) {
431         logger.debug("SSL to be added to pipeline");
432         ChannelHandler sslHandler = ctx.pipeline().first();
433         if (sslHandler instanceof SslHandler) {
434           logger.debug("Already got a SslHandler");
435         } else {
436           logger.debug("Add Explicitely SSL support to Command");
437           // add the SSL support
438           sslHandler =
439               FtpsInitializer.waarpSslContextFactory.createHandlerServer(
440                   FtpsInitializer.waarpSslContextFactory.needClientAuthentication(),
441                   false, ctx.channel());
442           session.prepareSsl();
443           WaarpSslUtility.addSslHandler(future, ctx.pipeline(), sslHandler,
444                                         new GenericFutureListener<Future<? super Channel>>() {
445                                           @Override
446                                           public final void operationComplete(
447                                               final Future<? super Channel> future) {
448                                             if (!future.isSuccess()) {
449                                               final String error2 =
450                                                   future.cause() != null?
451                                                       future.cause()
452                                                             .getMessage() :
453                                                       "During Handshake";
454                                               logger.error(
455                                                   "Cannot finalize Ssl Command channel {}",
456                                                   error2, future.cause());
457                                               callForSnmp(
458                                                   "SSL Connection Error",
459                                                   error2);
460                                               session.setSsl(false);
461                                               ctx.close();
462                                             } else {
463                                               logger.debug(
464                                                   "End of initialization of SSL and command channel: {}",
465                                                   ctx.channel());
466                                               session.setSsl(true);
467                                             }
468                                           }
469                                         });
470         }
471       } else if (session.getCurrentCommand().getCode() == FtpCommandCode.CCC) {
472         logger.debug("SSL to be removed from pipeline");
473         // remove the SSL support
474         session.prepareSsl();
475         WaarpSslUtility.removingSslHandler(future, controlChannel, false);
476       }
477     } else if (session.getCurrentCommand().getCode() !=
478                FtpCommandCode.INTERNALSHUTDOWN) {
479       writeFinalAnswer(ctx);
480     }
481   }
482 }