View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.ftp.core.data.handler;
21  
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelException;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.SimpleChannelInboundHandler;
28  import org.waarp.common.crypto.ssl.WaarpSslUtility;
29  import org.waarp.common.exception.FileTransferException;
30  import org.waarp.common.exception.InvalidArgumentException;
31  import org.waarp.common.file.DataBlock;
32  import org.waarp.common.logging.SysErrLogger;
33  import org.waarp.common.logging.WaarpLogger;
34  import org.waarp.common.logging.WaarpLoggerFactory;
35  import org.waarp.common.utility.WaarpNettyUtil;
36  import org.waarp.common.utility.WaarpStringUtils;
37  import org.waarp.ftp.core.config.FtpConfiguration;
38  import org.waarp.ftp.core.config.FtpInternalConfiguration;
39  import org.waarp.ftp.core.control.NetworkHandler;
40  import org.waarp.ftp.core.data.FtpTransfer;
41  import org.waarp.ftp.core.data.FtpTransferControl;
42  import org.waarp.ftp.core.exception.FtpNoConnectionException;
43  import org.waarp.ftp.core.exception.FtpNoFileException;
44  import org.waarp.ftp.core.exception.FtpNoTransferException;
45  import org.waarp.ftp.core.session.FtpSession;
46  import org.waarp.ftp.core.utils.FtpChannelUtils;
47  
48  import java.io.IOException;
49  import java.net.BindException;
50  import java.net.ConnectException;
51  import java.nio.channels.CancelledKeyException;
52  import java.nio.channels.ClosedChannelException;
53  import java.nio.channels.NotYetConnectedException;
54  
55  /**
56   * Network handler for Data connections
57   */
58  public class DataNetworkHandler extends SimpleChannelInboundHandler<DataBlock> {
59    /**
60     * Internal Logger
61     */
62    private static final WaarpLogger logger =
63        WaarpLoggerFactory.getLogger(DataNetworkHandler.class);
64  
65    /**
66     * Business Data Handler
67     */
68    private DataBusinessHandler dataBusinessHandler;
69  
70    /**
71     * Configuration
72     */
73    protected final FtpConfiguration configuration;
74  
75    /**
76     * Is this Data Connection an Active or Passive one
77     */
78    private final boolean isActive;
79  
80    /**
81     * Internal store for the SessionInterface
82     */
83    protected FtpSession session;
84  
85    /**
86     * The associated Channel
87     */
88    private Channel dataChannel;
89  
90    /**
91     * Pipeline
92     */
93    private ChannelPipeline channelPipeline;
94  
95    /**
96     * The associated FtpTransfer
97     */
98    private FtpTransfer ftpTransfer;
99  
100   /**
101    * Constructor from DataBusinessHandler
102    *
103    * @param configuration
104    * @param handler
105    * @param active
106    */
107   public DataNetworkHandler(final FtpConfiguration configuration,
108                             final DataBusinessHandler handler,
109                             final boolean active) {
110     this.configuration = configuration;
111     dataBusinessHandler = handler;
112     dataBusinessHandler.setDataNetworkHandler(this);
113     isActive = active;
114   }
115 
116   /**
117    * @return the dataBusinessHandler
118    *
119    * @throws FtpNoConnectionException
120    */
121   public DataBusinessHandler getDataBusinessHandler()
122       throws FtpNoConnectionException {
123     if (dataBusinessHandler == null) {
124       throw new FtpNoConnectionException("No Data Connection active");
125     }
126     return dataBusinessHandler;
127   }
128 
129   /**
130    * @return the session
131    */
132   public final FtpSession getFtpSession() {
133     return session;
134   }
135 
136   /**
137    * @return the NetworkHandler associated with the control connection
138    */
139   public final NetworkHandler getNetworkHandler() {
140     return session.getBusinessHandler().getNetworkHandler();
141   }
142 
143   /**
144    * Run firstly executeChannelClosed.
145    *
146    * @throws Exception
147    */
148   @Override
149   public void channelInactive(final ChannelHandlerContext ctx)
150       throws Exception {
151     logger.debug("Data Channel closed with a session ? {}", session != null);
152     if (session != null) {
153       if (!session.getDataConn().checkCorrectChannel(ctx.channel())) {
154         for (int i = 0; i < FtpInternalConfiguration.RETRYNB * 10; i++) {
155           Thread.sleep(FtpInternalConfiguration.RETRYINMS);
156           if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
157             break;
158           }
159         }
160       }
161       if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
162         session.getDataConn().getFtpTransferControl().setPreEndOfTransfer();
163       } else {
164         session.getDataConn().getFtpTransferControl()
165                .setTransferAbortedFromInternal(true);
166       }
167       try {
168         getDataBusinessHandler().executeChannelClosed();
169         // release file and other permanent objects
170         getDataBusinessHandler().clear();
171       } catch (final FtpNoConnectionException ignored) {
172         // nothing
173       }
174       session.getDataConn().unbindData();
175       dataBusinessHandler = null;
176       channelPipeline = null;
177       dataChannel = null;
178     }
179     super.channelInactive(ctx);
180   }
181 
182   protected final void setSession(final Channel channel) {
183     if (session != null) {
184       return;
185     }
186     // First get the ftpSession from inetaddresses
187     session = configuration.getFtpSession(channel, isActive);
188     if (session == null) {
189       // Not found !!!
190       logger.error("Session not found for {}!", isActive? "Active" : "Passive");
191       WaarpSslUtility.closingSslChannel(channel);
192       // Problem: control connection could not be directly informed!!!
193       // Only timeout will occur
194     }
195   }
196 
197   /**
198    * Initialize the Handler.
199    */
200   @Override
201   public void channelActive(final ChannelHandlerContext ctx) {
202     final Channel channel = ctx.channel();
203     channel.config().setAutoRead(false);
204     if (session == null) {
205       setSession(channel);
206     }
207     logger.debug("Data Channel opened as {}", channel);
208     if (session == null) {
209       logger.debug(
210           "DataChannel immediately closed since no session is assigned");
211       WaarpSslUtility.closingSslChannel(ctx.channel());
212       return;
213     }
214     channelPipeline = ctx.pipeline();
215     dataChannel = channel;
216     dataBusinessHandler.setFtpSession(getFtpSession());
217     FtpChannelUtils.addDataChannel(channel, session.getConfiguration());
218     logger.debug("DataChannel connected: {}", session.getReplyCode());
219     if (session.getCurrentCommand() != null && session.getReplyCode() != null &&
220         session.getReplyCode().getCode() >= 400) {
221       // shall not be except if an error early occurs
222       switch (session.getCurrentCommand().getCode()) {
223         case RETR:
224         case APPE:
225         case STOR:
226         case STOU:
227           // close the data channel immediately
228           logger.info(
229               "DataChannel immediately closed since {} is not ok at startup",
230               session.getCurrentCommand().getCode());
231           WaarpSslUtility.closingSslChannel(ctx.channel());
232           session.getDataConn().getFtpTransferControl()
233                  .setOpenedDataChannel(null, this);
234           return;
235         default:
236           break;
237       }
238     }
239     if (isStillAlive()) {
240       try {
241         setCorrectCodec();
242       } catch (final FtpNoConnectionException e) {
243         logger.error(e.getMessage());
244       }
245       unlockModeCodec();
246       session.getDataConn().getFtpTransferControl()
247              .setOpenedDataChannel(channel, this);
248       logger.debug("DataChannel fully configured");
249     } else {
250       // Cannot continue
251       logger.info("Connected but no more alive so will disconnect");
252       session.getDataConn().getFtpTransferControl()
253              .setOpenedDataChannel(null, this);
254     }
255   }
256 
257   /**
258    * Set the CODEC according to the mode. Must be called after each call of
259    * MODE, STRU or TYPE
260    */
261   public final void setCorrectCodec() throws FtpNoConnectionException {
262     if (channelPipeline == null) {
263       logger.error("No channelPipeline defined while session is ", session);
264       throw new FtpNoConnectionException("No Data socket opened");
265     }
266     final FtpDataModeCodec modeCodec =
267         (FtpDataModeCodec) channelPipeline.get(FtpDataInitializer.CODEC_MODE);
268     final FtpDataTypeCodec typeCodec =
269         (FtpDataTypeCodec) channelPipeline.get(FtpDataInitializer.CODEC_TYPE);
270     final FtpDataStructureCodec structureCodec =
271         (FtpDataStructureCodec) channelPipeline.get(
272             FtpDataInitializer.CODEC_STRUCTURE);
273     if (modeCodec == null || typeCodec == null || structureCodec == null) {
274       return;
275     }
276     modeCodec.setMode(session.getDataConn().getMode());
277     modeCodec.setStructure(session.getDataConn().getStructure());
278     typeCodec.setFullType(session.getDataConn().getType(),
279                           session.getDataConn().getSubType());
280     structureCodec.setStructure(session.getDataConn().getStructure());
281     logger.debug("codec setup");
282   }
283 
284   /**
285    * Unlock the Mode Codec from openConnection of {@link FtpTransferControl}
286    */
287   public final void unlockModeCodec() {
288     final FtpDataModeCodec modeCodec =
289         (FtpDataModeCodec) channelPipeline.get(FtpDataInitializer.CODEC_MODE);
290     modeCodec.setCodecReady();
291   }
292 
293   /**
294    * Default exception task: close the current connection after calling
295    * exceptionLocalCaught.
296    */
297   @Override
298   public void exceptionCaught(final ChannelHandlerContext ctx,
299                               final Throwable cause) {
300     if (session == null) {
301       logger.info("Error without any session active {}", cause);
302       return;
303     }
304     if (cause instanceof ConnectException) {
305       final ConnectException e2 = (ConnectException) cause;
306       logger.warn("Connection impossible since {}", e2.getMessage());
307     } else if (cause instanceof ChannelException) {
308       final ChannelException e2 = (ChannelException) cause;
309       logger.warn("Connection (example: timeout) impossible since {}",
310                   e2.getMessage());
311     } else if (cause instanceof ClosedChannelException) {
312       logger.debug("Connection closed before end");
313     } else if (cause instanceof InvalidArgumentException) {
314       final InvalidArgumentException e2 = (InvalidArgumentException) cause;
315       logger.warn("Bad configuration in Codec in {}", e2.getMessage());
316     } else if (cause instanceof NullPointerException) {
317       final NullPointerException e2 = (NullPointerException) cause;
318       logger.warn("Null pointer Exception", e2);
319       try {
320         if (dataBusinessHandler != null) {
321           dataBusinessHandler.exceptionLocalCaught(cause);
322           if (session.getDataConn() != null &&
323               session.getDataConn().checkCorrectChannel(ctx.channel())) {
324             session.getDataConn().getFtpTransferControl()
325                    .setTransferAbortedFromInternal(true);
326           }
327         }
328       } catch (final NullPointerException ignored) {
329         // nothing
330       }
331       return;
332     } else if (cause instanceof CancelledKeyException) {
333       final CancelledKeyException e2 = (CancelledKeyException) cause;
334       logger.warn("Connection aborted since {}", e2.getMessage());
335       // XXX TODO FIXME is it really what we should do ?
336       // No action
337       return;
338     } else if (cause instanceof IOException) {
339       final IOException e2 = (IOException) cause;
340       logger.warn("Connection aborted since {}", e2.getMessage());
341     } else if (cause instanceof NotYetConnectedException) {
342       final NotYetConnectedException e2 = (NotYetConnectedException) cause;
343       logger.debug("Ignore this exception {}", e2.getMessage());
344       return;
345     } else if (cause instanceof BindException) {
346       final BindException e2 = (BindException) cause;
347       logger.warn("Address already in use {}", e2.getMessage());
348     } else {
349       logger.warn("Unexpected exception from Outband: {}", cause.getMessage(),
350                   cause);
351     }
352     if (dataBusinessHandler != null) {
353       dataBusinessHandler.exceptionLocalCaught(cause);
354     }
355     if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
356       session.getDataConn().getFtpTransferControl()
357              .setTransferAbortedFromInternal(true);
358     }
359   }
360 
361   public final synchronized void setFtpTransfer(final FtpTransfer ftpTransfer) {
362     this.ftpTransfer = ftpTransfer;
363   }
364 
365   /**
366    * Act as needed according to the receive DataBlock message
367    */
368   @Override
369   public synchronized void channelRead0(final ChannelHandlerContext ctx,
370                                         final DataBlock dataBlock) {
371     if (ftpTransfer == null) {
372       for (int i = 0; i < 20; i++) {
373         try {
374           ftpTransfer = session.getDataConn().getFtpTransferControl()
375                                .getExecutingFtpTransfer();
376           if (ftpTransfer != null) {
377             break;
378           }
379         } catch (final FtpNoTransferException e) {
380           try {
381             Thread.sleep(WaarpNettyUtil.SIMPLE_DELAY_MS);//NOSONAR
382           } catch (final InterruptedException e1) {//NOSONAR
383             SysErrLogger.FAKE_LOGGER.ignoreLog(e1);
384             break;
385           }
386         }
387       }
388       if (ftpTransfer == null) {
389         logger.info("No ExecutionFtpTransfer found");
390         session.getDataConn().getFtpTransferControl()
391                .setTransferAbortedFromInternal(true);
392         return;
393       }
394     }
395     if (isStillAlive()) {
396       try {
397         ftpTransfer.getFtpFile().writeDataBlock(dataBlock);
398       } catch (final FtpNoFileException e1) {
399         logger.warn(e1);
400         session.getDataConn().getFtpTransferControl()
401                .setTransferAbortedFromInternal(true);
402       } catch (final FileTransferException e1) {
403         logger.warn("File Transfer Exception: {}", e1.getMessage());
404         session.getDataConn().getFtpTransferControl()
405                .setTransferAbortedFromInternal(true);
406       }
407     } else {
408       // Shutdown
409       session.getDataConn().getFtpTransferControl()
410              .setTransferAbortedFromInternal(true);
411       WaarpSslUtility.closingSslChannel(ctx.channel());
412     }
413   }
414 
415   /**
416    * Write a simple message (like LIST) and wait for it
417    *
418    * @param message
419    *
420    * @return True if the message is correctly written
421    */
422   public final boolean writeMessage(final String message) {
423     final DataBlock dataBlock = new DataBlock();
424     dataBlock.setEOF(true);
425     dataBlock.setBlock(message.getBytes(WaarpStringUtils.UTF8));
426     final ChannelFuture future;
427     if (logger.isDebugEnabled()) {
428       logger.debug("Will write: {}", message);
429     }
430     future = dataChannel.writeAndFlush(dataBlock);
431     WaarpNettyUtil.awaitOrInterrupted(future);
432     logger.debug("Write result: {} {}", future.isSuccess(), future.cause());
433     return future.isSuccess();
434   }
435 
436   /**
437    * If the service is going to shutdown, it sends back a 421 message to the
438    * connection
439    *
440    * @return True if the service is alive, else False if the system is going
441    *     down
442    */
443   private boolean isStillAlive() {
444     if (session.getConfiguration().isShutdown()) {
445       session.setExitErrorCode("Service is going down: disconnect");
446       return false;
447     }
448     return true;
449   }
450 }