View Javadoc

1   /**
2    * This file is part of Waarp Project.
3    * 
4    * Copyright 2009, Frederic Bregier, and individual contributors by the @author tags. See the
5    * COPYRIGHT.txt in the distribution for a full listing of individual contributors.
6    * 
7    * All Waarp Project is free software: you can redistribute it and/or modify it under the terms of
8    * the GNU General Public License as published by the Free Software Foundation, either version 3 of
9    * the License, or (at your option) any later version.
10   * 
11   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even
12   * the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
13   * Public License for more details.
14   * 
15   * You should have received a copy of the GNU General Public License along with Waarp . If not, see
16   * <http://www.gnu.org/licenses/>.
17   */
18  package org.waarp.ftp.core.data.handler;
19  
20  import java.io.IOException;
21  import java.net.BindException;
22  import java.net.ConnectException;
23  import java.nio.channels.CancelledKeyException;
24  import java.nio.channels.ClosedChannelException;
25  import java.nio.channels.NotYetConnectedException;
26  
27  import io.netty.buffer.ByteBuf;
28  import io.netty.buffer.Unpooled;
29  import io.netty.channel.Channel;
30  import io.netty.channel.ChannelException;
31  import io.netty.channel.ChannelFuture;
32  import io.netty.channel.ChannelHandlerContext;
33  import io.netty.channel.ChannelPipeline;
34  import io.netty.channel.SimpleChannelInboundHandler;
35  
36  import org.waarp.common.crypto.ssl.WaarpSslUtility;
37  import org.waarp.common.exception.FileTransferException;
38  import org.waarp.common.exception.InvalidArgumentException;
39  import org.waarp.common.file.DataBlock;
40  import org.waarp.common.logging.WaarpLogger;
41  import org.waarp.common.logging.WaarpLoggerFactory;
42  import org.waarp.common.utility.WaarpStringUtils;
43  import org.waarp.ftp.core.config.FtpConfiguration;
44  import org.waarp.ftp.core.config.FtpInternalConfiguration;
45  import org.waarp.ftp.core.control.NetworkHandler;
46  import org.waarp.ftp.core.data.FtpTransfer;
47  import org.waarp.ftp.core.data.FtpTransferControl;
48  import org.waarp.ftp.core.exception.FtpNoConnectionException;
49  import org.waarp.ftp.core.exception.FtpNoFileException;
50  import org.waarp.ftp.core.exception.FtpNoTransferException;
51  import org.waarp.ftp.core.session.FtpSession;
52  import org.waarp.ftp.core.utils.FtpChannelUtils;
53  
54  /**
55   * Network handler for Data connections
56   * 
57   * @author Frederic Bregier
58   * 
59   */
60  public class DataNetworkHandler extends SimpleChannelInboundHandler<DataBlock> {
61      /**
62       * Internal Logger
63       */
64      private static final WaarpLogger logger = WaarpLoggerFactory
65              .getLogger(DataNetworkHandler.class);
66  
67      /**
68       * Business Data Handler
69       */
70      private DataBusinessHandler dataBusinessHandler = null;
71  
72      /**
73       * Configuration
74       */
75      protected final FtpConfiguration configuration;
76  
77      /**
78       * Is this Data Connection an Active or Passive one
79       */
80      private final boolean isActive;
81  
82      /**
83       * Internal store for the SessionInterface
84       */
85      protected FtpSession session = null;
86  
87      /**
88       * The associated Channel
89       */
90      private Channel dataChannel = null;
91  
92      /**
93       * Pipeline
94       */
95      private ChannelPipeline channelPipeline = null;
96  
97      /**
98       * The associated FtpTransfer
99       */
100     private volatile FtpTransfer ftpTransfer = null;
101     
102     /**
103      * Constructor from DataBusinessHandler
104      * 
105      * @param configuration
106      * @param handler
107      * @param active
108      */
109     public DataNetworkHandler(FtpConfiguration configuration,
110             DataBusinessHandler handler, boolean active) {
111         super();
112         this.configuration = configuration;
113         dataBusinessHandler = handler;
114         dataBusinessHandler.setDataNetworkHandler(this);
115         isActive = active;
116     }
117 
118     /**
119      * @return the dataBusinessHandler
120      * @throws FtpNoConnectionException
121      */
122     public DataBusinessHandler getDataBusinessHandler()
123             throws FtpNoConnectionException {
124         if (dataBusinessHandler == null) {
125             throw new FtpNoConnectionException("No Data Connection active");
126         }
127         return dataBusinessHandler;
128     }
129 
130     /**
131      * @return the session
132      */
133     public FtpSession getFtpSession() {
134         return session;
135     }
136 
137     /**
138      * 
139      * @return the NetworkHandler associated with the control connection
140      */
141     public NetworkHandler getNetworkHandler() {
142         return session.getBusinessHandler().getNetworkHandler();
143     }
144 
145     /**
146      * Run firstly executeChannelClosed.
147      * 
148      * @throws Exception
149      */
150     @Override
151     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
152         logger.debug("Data Channel closed with a session ? "+(session !=null));
153         if (session != null) {
154             if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
155                 session.getDataConn().getFtpTransferControl().setPreEndOfTransfer();
156             } else {
157                 session.getDataConn().getFtpTransferControl().setTransferAbortedFromInternal(true);
158             }
159             session.getDataConn().unbindPassive();
160             try {
161                 getDataBusinessHandler().executeChannelClosed();
162                 // release file and other permanent objects
163                 getDataBusinessHandler().clear();
164             } catch (FtpNoConnectionException e1) {
165             }
166             dataBusinessHandler = null;
167             channelPipeline = null;
168             dataChannel = null;
169         }
170         super.channelInactive(ctx);
171     }
172 
173     protected void setSession(Channel channel) {
174         // First get the ftpSession from inetaddresses
175         for (int i = 0; i < FtpInternalConfiguration.RETRYNB; i++) {
176             session = configuration.getFtpSession(channel, isActive);
177             if (session == null) {
178                 logger.warn("Session not found at try " + i);
179                 try {
180                     Thread.sleep(FtpInternalConfiguration.RETRYINMS);
181                 } catch (InterruptedException e1) {
182                     break;
183                 }
184             } else {
185                 break;
186             }
187         }
188         if (session == null) {
189             // Not found !!!
190             logger.error("Session not found!");
191             WaarpSslUtility.closingSslChannel(channel);
192             // Problem: control connection could not be directly informed!!!
193             // Only timeout will occur
194             return;
195         }
196     }
197 
198     /**
199      * Initialize the Handler.
200      * 
201      */
202     @Override
203     public void channelActive(ChannelHandlerContext ctx) throws Exception {
204         Channel channel = ctx.channel();
205         channel.config().setAutoRead(false);
206         if (session == null) {
207             setSession(channel);
208         }
209         logger.debug("Data Channel opened as "+channel);
210         if (session == null) {
211             logger.debug("DataChannel immediately closed since no session is assigned");
212             WaarpSslUtility.closingSslChannel(ctx.channel());
213             return;
214         }
215         channelPipeline = ctx.pipeline();
216         dataChannel = channel;
217         dataBusinessHandler.setFtpSession(getFtpSession());
218         FtpChannelUtils.addDataChannel(channel, session.getConfiguration());
219         logger.debug("DataChannel connected: " + session.getReplyCode());
220         if (session.getReplyCode().getCode() >= 400) {
221             // shall not be except if an error early occurs
222             switch (session.getCurrentCommand().getCode()) {
223                 case RETR:
224                 case APPE:
225                 case STOR:
226                 case STOU:
227                     // close the data channel immediately
228                     logger.debug("DataChannel immediately closed since " + session.getCurrentCommand().getCode()
229                             + " is not ok at startup");
230                     WaarpSslUtility.closingSslChannel(ctx.channel());
231                     return;
232                 default:
233                     break;
234             }
235         }
236         if (isStillAlive()) {
237             setCorrectCodec();
238             unlockModeCodec();
239             session.getDataConn().getFtpTransferControl().setOpenedDataChannel(channel, this);
240             logger.debug("DataChannel fully configured");
241         } else {
242             // Cannot continue
243             logger.debug("Connected but no more alive so will disconnect");
244             session.getDataConn().getFtpTransferControl().setOpenedDataChannel(null, this);
245             return;
246         }
247     }
248 
249     /**
250      * Set the CODEC according to the mode. Must be called after each call of MODE, STRU or TYPE
251      */
252     public void setCorrectCodec() {
253         FtpDataModeCodec modeCodec = (FtpDataModeCodec) channelPipeline
254                 .get(FtpDataInitializer.CODEC_MODE);
255         FtpDataTypeCodec typeCodec = (FtpDataTypeCodec) channelPipeline
256                 .get(FtpDataInitializer.CODEC_TYPE);
257         FtpDataStructureCodec structureCodec = (FtpDataStructureCodec) channelPipeline
258                 .get(FtpDataInitializer.CODEC_STRUCTURE);
259         if (modeCodec == null || typeCodec == null || structureCodec == null) {
260             return;
261         }
262         modeCodec.setMode(session.getDataConn().getMode());
263         modeCodec.setStructure(session.getDataConn().getStructure());
264         typeCodec.setFullType(session.getDataConn().getType(), session
265                 .getDataConn().getSubType());
266         structureCodec.setStructure(session.getDataConn().getStructure());
267         logger.debug("codec setup");
268     }
269 
270     /**
271      * Unlock the Mode Codec from openConnection of {@link FtpTransferControl}
272      * 
273      */
274     public void unlockModeCodec() {
275         FtpDataModeCodec modeCodec = (FtpDataModeCodec) channelPipeline
276                 .get(FtpDataInitializer.CODEC_MODE);
277         modeCodec.setCodecReady();
278     }
279 
280     /**
281      * Default exception task: close the current connection after calling exceptionLocalCaught.
282      * 
283      */
284     @Override
285     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
286         if (session == null) {
287             logger.debug("Error without any session active {}", cause);
288             return;
289         }
290         Throwable e1 = cause;
291         if (e1 instanceof ConnectException) {
292             ConnectException e2 = (ConnectException) e1;
293             logger.warn("Connection impossible since {}", e2.getMessage());
294         } else if (e1 instanceof ChannelException) {
295             ChannelException e2 = (ChannelException) e1;
296             logger.warn("Connection (example: timeout) impossible since {}", e2
297                     .getMessage());
298         } else if (e1 instanceof ClosedChannelException) {
299             logger.debug("Connection closed before end");
300         } else if (e1 instanceof InvalidArgumentException) {
301             InvalidArgumentException e2 = (InvalidArgumentException) e1;
302             logger.warn("Bad configuration in Codec in {}", e2.getMessage());
303         } else if (e1 instanceof NullPointerException) {
304             NullPointerException e2 = (NullPointerException) e1;
305             logger.warn("Null pointer Exception", e2);
306             try {
307                 if (dataBusinessHandler != null) {
308                     dataBusinessHandler.exceptionLocalCaught(e1);
309                     if (session.getDataConn() != null) {
310                         if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
311                             session.getDataConn().getFtpTransferControl()
312                                     .setTransferAbortedFromInternal(true);
313                         }
314                     }
315                 }
316             } catch (NullPointerException e3) {
317             }
318             return;
319         } else if (e1 instanceof CancelledKeyException) {
320             CancelledKeyException e2 = (CancelledKeyException) e1;
321             logger.warn("Connection aborted since {}", e2.getMessage());
322             // XXX TODO FIXME is it really what we should do ?
323             // No action
324             return;
325         } else if (e1 instanceof IOException) {
326             IOException e2 = (IOException) e1;
327             logger.warn("Connection aborted since {}", e2.getMessage());
328         } else if (e1 instanceof NotYetConnectedException) {
329             NotYetConnectedException e2 = (NotYetConnectedException) e1;
330             logger.debug("Ignore this exception {}", e2.getMessage());
331             return;
332         } else if (e1 instanceof BindException) {
333             BindException e2 = (BindException) e1;
334             logger.warn("Address already in use {}", e2.getMessage());
335         } else if (e1 instanceof ConnectException) {
336             ConnectException e2 = (ConnectException) e1;
337             logger.warn("Timeout occurs {}", e2.getMessage());
338         } else {
339             logger.warn("Unexpected exception from Outband: {}", e1.getMessage(), e1);
340         }
341         if (dataBusinessHandler != null) {
342             dataBusinessHandler.exceptionLocalCaught(e1);
343         }
344         if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
345             session.getDataConn().getFtpTransferControl()
346                     .setTransferAbortedFromInternal(true);
347         }
348     }
349 
350     public void setFtpTransfer(FtpTransfer ftpTransfer) {
351         this.ftpTransfer = ftpTransfer;
352     }
353     /**
354      * Act as needed according to the receive DataBlock message
355      * 
356      */
357     @Override
358     public void channelRead0(ChannelHandlerContext ctx, DataBlock dataBlock) {
359         if (ftpTransfer == null) {
360             try {
361                 ftpTransfer = session.getDataConn().getFtpTransferControl().getExecutingFtpTransfer();
362             } catch (FtpNoTransferException e) {
363                 logger.debug(e);
364                 session.getDataConn().getFtpTransferControl()
365                         .setTransferAbortedFromInternal(true);
366             }
367             if (ftpTransfer == null) {
368                 logger.debug("No ExecutionFtpTransfer found");
369                 session.getDataConn().getFtpTransferControl()
370                     .setTransferAbortedFromInternal(true);
371                 return;
372             }
373         }
374         try {
375             if (isStillAlive()) {
376                 try {
377                     ftpTransfer.getFtpFile().writeDataBlock(dataBlock);
378                 } catch (FtpNoFileException e1) {
379                     logger.debug(e1);
380                     session.getDataConn().getFtpTransferControl()
381                             .setTransferAbortedFromInternal(true);
382                     return;
383                 } catch (FileTransferException e1) {
384                     logger.debug(e1);
385                     session.getDataConn().getFtpTransferControl()
386                             .setTransferAbortedFromInternal(true);
387                 }
388             } else {
389                 // Shutdown
390                 session.getDataConn().getFtpTransferControl()
391                         .setTransferAbortedFromInternal(true);
392                 WaarpSslUtility.closingSslChannel(ctx.channel());
393             }
394         } finally {
395             dataBlock.getBlock().release();
396         }
397     }
398 
399     /**
400      * Write a simple message (like LIST) and wait for it
401      * 
402      * @param message
403      * @return True if the message is correctly written
404      */
405     public boolean writeMessage(String message) {
406         DataBlock dataBlock = new DataBlock();
407         dataBlock.setEOF(true);
408         ByteBuf buffer = Unpooled.wrappedBuffer(message.getBytes(WaarpStringUtils.UTF8));
409         dataBlock.setBlock(buffer);
410         ChannelFuture future;
411         logger.debug("Will write: " + buffer.toString(WaarpStringUtils.UTF8));
412         try {
413             future = dataChannel.writeAndFlush(dataBlock);
414             future.await(FtpConfiguration.getDATATIMEOUTCON());
415         } catch (InterruptedException e) {
416             logger.debug("Interrupted", e);
417             return false;
418         }
419         logger.debug("Write result: " + future.isSuccess(), future.cause());
420         return future.isSuccess();
421     }
422 
423     /**
424      * If the service is going to shutdown, it sends back a 421 message to the connection
425      * 
426      * @return True if the service is alive, else False if the system is going down
427      */
428     private boolean isStillAlive() {
429         if (session.getConfiguration().isShutdown()) {
430             session.setExitErrorCode("Service is going down: disconnect");
431             return false;
432         }
433         return true;
434     }
435 }