View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.ftp.core.data;
21  
22  import io.netty.bootstrap.Bootstrap;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelFutureListener;
26  import org.waarp.common.command.ReplyCode;
27  import org.waarp.common.command.exception.CommandAbstractException;
28  import org.waarp.common.command.exception.Reply425Exception;
29  import org.waarp.common.crypto.ssl.WaarpSslUtility;
30  import org.waarp.common.future.WaarpChannelFuture;
31  import org.waarp.common.future.WaarpFuture;
32  import org.waarp.common.logging.SysErrLogger;
33  import org.waarp.common.logging.WaarpLogger;
34  import org.waarp.common.logging.WaarpLoggerFactory;
35  import org.waarp.common.utility.WaarpNettyUtil;
36  import org.waarp.ftp.core.command.FtpCommandCode;
37  import org.waarp.ftp.core.command.service.ABOR;
38  import org.waarp.ftp.core.config.FtpConfiguration;
39  import org.waarp.ftp.core.config.FtpInternalConfiguration;
40  import org.waarp.ftp.core.control.NetworkHandler;
41  import org.waarp.ftp.core.data.handler.DataNetworkHandler;
42  import org.waarp.ftp.core.exception.FtpNoConnectionException;
43  import org.waarp.ftp.core.exception.FtpNoFileException;
44  import org.waarp.ftp.core.exception.FtpNoTransferException;
45  import org.waarp.ftp.core.file.FtpFile;
46  import org.waarp.ftp.core.session.FtpSession;
47  
48  import java.net.InetSocketAddress;
49  import java.util.List;
50  import java.util.concurrent.ExecutorService;
51  import java.util.concurrent.Executors;
52  import java.util.concurrent.atomic.AtomicBoolean;
53  
54  /**
55   * Main class that handles transfers and their execution
56   */
57  public class FtpTransferControl {
58    /**
59     * Internal Logger
60     */
61    private static final WaarpLogger logger =
62        WaarpLoggerFactory.getLogger(FtpTransferControl.class);
63  
64    /**
65     * SessionInterface
66     */
67    private final FtpSession session;
68  
69    /**
70     * Step in order to wait that the DataNetworkHandler is ready
71     */
72    private final AtomicBoolean isDataNetworkHandlerReady =
73        new AtomicBoolean(false);
74  
75    /**
76     * The associated DataChannel
77     */
78    private Channel dataChannel;
79  
80    /**
81     * Waiter for the dataChannel to be opened
82     */
83    private WaarpChannelFuture waitForOpenedDataChannel =
84        new WaarpChannelFuture(true);
85  
86    /**
87     * Is the current Command Finished (or previously current command)
88     */
89    private final AtomicBoolean isExecutingCommandFinished =
90        new AtomicBoolean(true);
91    /**
92     * Waiter for the Command to be setup
93     */
94    private WaarpFuture commandSetup;
95  
96    /**
97     * Waiter for the Command finishing
98     */
99    private WaarpFuture commandFinishing;
100 
101   /**
102    * Current command executed
103    */
104   private FtpTransfer executingCommand;
105 
106   /**
107    * Thread pool for execution of transfer command
108    */
109   private ExecutorService executorService;
110 
111   /**
112    * Blocking step for the Executor in order to wait for the end of the
113    * command
114    * (internal wait, not to be used
115    * outside).
116    */
117   private WaarpFuture endOfCommand;
118 
119   /**
120    * A boolean to know if Check was called once
121    */
122   private final AtomicBoolean isCheckAlreadyCalled = new AtomicBoolean(false);
123 
124   /**
125    * @param session
126    */
127   public FtpTransferControl(final FtpSession session) {
128     this.session = session;
129     endOfCommand = null;
130   }
131 
132   // XXX DataNetworkHandler functions
133 
134   /**
135    * The DataNetworkHandler is ready (from setNewFtpExecuteTransfer)
136    */
137   private void setDataNetworkHandlerReady() {
138     isCheckAlreadyCalled.set(false);
139     isDataNetworkHandlerReady.set(true);
140   }
141 
142   /**
143    * Wait for the DataNetworkHandler to be ready (from trueRetrieve of {@link
144    * FtpFile})
145    *
146    * @throws InterruptedException
147    */
148   public final void waitForDataNetworkHandlerReady()
149       throws InterruptedException {
150     final AtomicBoolean isDNHReady = isDataNetworkHandlerReady;
151     if (!isDNHReady.get()) {
152       for (int i = 0; i < FtpInternalConfiguration.RETRYNB * 10; i++) {
153         Thread.sleep(FtpInternalConfiguration.RETRYINMS * 10);
154         if (isDNHReady.get()) {
155           return;
156         }
157       }
158       if (!isDNHReady.get()) {
159         throw new InterruptedException("Bad initialization");
160       }
161     }
162   }
163 
164   /**
165    * Set the new opened Channel (from channelConnected of {@link
166    * DataNetworkHandler})
167    *
168    * @param channel
169    * @param dataNetworkHandler
170    */
171   public final void setOpenedDataChannel(final Channel channel,
172                                          final DataNetworkHandler dataNetworkHandler) {
173     logger.debug("SetOpenedDataChannel: {}",
174                  (channel != null? channel.remoteAddress() : "no channel"));
175     final WaarpChannelFuture wait = waitForOpenedDataChannel;
176     if (channel != null) {
177       session.getDataConn().setDataNetworkHandler(dataNetworkHandler);
178       wait.setChannel(channel);
179       wait.setSuccess();
180     } else {
181       wait.cancel();
182     }
183   }
184 
185   /**
186    * Wait that the new opened connection is ready (same method in {@link
187    * FtpDataAsyncConn} from openConnection)
188    *
189    * @return the new opened Channel
190    *
191    * @throws InterruptedException
192    */
193   public final Channel waitForOpenedDataChannel() throws InterruptedException {
194     Channel channel = null;
195     final WaarpChannelFuture wait = waitForOpenedDataChannel;
196     if (wait.awaitOrInterruptible(FtpConfiguration.getDataTimeoutCon())) {
197       if (wait.isSuccess()) {
198         channel = wait.channel();
199       } else {
200         logger.warn("data connection is in error");
201         throw new InterruptedException("Cannot get data connection");
202       }
203       return channel;
204     } else {
205       wait.cancel();
206       final boolean isPassive = session.getDataConn().isPassiveMode();
207       final InetSocketAddress address =
208           isPassive? session.getDataConn().getLocalAddress() :
209               session.getDataConn().getRemoteAddress();
210       logger.error(
211           "Timeout occurs during data {} connection to {} after timeout {}",
212           isPassive? "Passive" : "Active", address,
213           FtpConfiguration.getDataTimeoutCon());
214       throw new InterruptedException(
215           "Cannot get data connection to " + address);
216     }
217   }
218 
219   /**
220    * Allow to reset the waitForOpenedDataChannel
221    */
222   public final synchronized void resetWaitForOpenedDataChannel() {
223     final WaarpChannelFuture wait = waitForOpenedDataChannel;
224     if (wait != null) {
225       wait.setSuccess();
226     }
227     waitForOpenedDataChannel = new WaarpChannelFuture(true);
228   }
229 
230   private void removeSession() {
231     if (session.getDataConn().isPassiveMode()) {
232       session.getConfiguration().delFtpSession(
233           session.getDataConn().getRemoteAddress().getAddress(),
234           session.getDataConn().getLocalAddress());
235     }
236     if (waitForOpenedDataChannel != null) {
237       resetWaitForOpenedDataChannel();
238     }
239   }
240 
241   /**
242    * Wait for the client to be connected (Passive) or Wait for the server to
243    * be
244    * connected to the client (Active)
245    *
246    * @throws Reply425Exception
247    */
248   public final synchronized void openDataConnection() throws Reply425Exception {
249     // Prepare this Data channel to be closed ;-)
250     // In fact, prepare the future close op which should occur since it is
251     // now opened
252     final WaarpFuture commandSetupFuture = commandSetup;
253     if (commandSetupFuture != null && !commandSetupFuture.isDone()) {
254       commandSetupFuture.cancel();
255     }
256     commandSetup = new WaarpFuture(true);
257     final FtpDataAsyncConn dataAsyncConn = session.getDataConn();
258     if (!dataAsyncConn.isStreamFile()) {
259       // FIXME isActive or isDNHReady ?
260       if (dataAsyncConn.isActive()) {
261         // Already connected
262         logger.debug("Connection already open");
263         session.setReplyCode(ReplyCode.REPLY_125_DATA_CONNECTION_ALREADY_OPEN,
264                              dataAsyncConn.getType().name() +
265                              " mode data connection already open");
266         return;
267       }
268     } else {
269       // Stream, Data Connection should not be opened
270       if (dataAsyncConn.isActive()) {
271         logger.error(
272             "Connection already open but should not since in Stream mode");
273         setTransferAbortedFromInternal(false);
274         throw new Reply425Exception(
275             "Connection already open but should not since in Stream mode");
276       }
277     }
278     // Need to open connection
279     session.setReplyCode(ReplyCode.REPLY_150_FILE_STATUS_OKAY,
280                          "Opening " + dataAsyncConn.getType().name() +
281                          " mode data connection");
282     if (dataAsyncConn.isPassiveMode()) {
283       if (!dataAsyncConn.isBind()) {
284         // No passive connection prepared
285         removeSession();
286         throw new Reply425Exception("No passive data connection prepared");
287       }
288       // Wait for the connection to be done by the client
289       logger.debug("Passive mode standby");
290       try {
291         dataChannel = waitForOpenedDataChannel();
292         dataAsyncConn.setNewOpenedDataChannel(dataChannel);
293       } catch (final InterruptedException e) {//NOSONAR
294         logger.warn("Connection abort in passive mode: {}", e.getMessage());
295         removeSession();
296         // Cannot open connection
297         throw new Reply425Exception("Cannot open passive data connection");
298       }
299       logger.debug("Passive mode connected");
300     } else {
301       // Wait for the server to be connected to the client
302       final InetSocketAddress inetSocketAddress =
303           dataAsyncConn.getRemoteAddress();
304       final InetSocketAddress localAddress = dataAsyncConn.getLocalAddress();
305       logger.debug("Active mode standby");
306       final Bootstrap bootstrap =
307           session.getConfiguration().getFtpInternalConfiguration()
308                  .getActiveBootstrap(session.isDataSsl());
309       final String mylog = session.toString();
310       logger.debug("DataConn for: {} to {}",
311                    session.getCurrentCommand().getCommand(), inetSocketAddress);
312       ChannelFuture future = null;
313       for (int i = 0; i < FtpInternalConfiguration.RETRYNB; i++) {
314         future =
315             activeInitConnection(bootstrap, inetSocketAddress, localAddress);
316         if (future.isSuccess()) {
317           break;
318         }
319         try {
320           Thread.sleep(FtpInternalConfiguration.RETRYINMS * 10); //NOSONAR
321         } catch (final InterruptedException e) { // NOSONAR
322           // ignore
323         }
324       }
325       if (future == null || !future.isSuccess()) {
326         logger.warn(
327             "Connection abort in active mode from future while session: " +
328             "{}\nTrying connect to: {} From: {}\nWas: {}", session,
329             inetSocketAddress, localAddress, mylog,
330             future != null? future.cause() : null);
331         // Cannot open connection
332         throw new Reply425Exception("Cannot open active data connection");
333       }
334       try {
335         dataChannel = waitForOpenedDataChannel();
336         dataAsyncConn.setNewOpenedDataChannel(dataChannel);
337       } catch (final InterruptedException e) {//NOSONAR
338         WaarpSslUtility.closingSslChannel(future.channel())
339                        .awaitUninterruptibly();
340         logger.warn("Connection abort in active mode ({})", e.getMessage());
341         // Cannot open connection
342         throw new Reply425Exception("Cannot open active data connection");
343       }
344       logger.debug("Active mode connected");
345     }
346     if (dataChannel == null) {
347       // Cannot have a new Data connection since shutdown
348       throw new Reply425Exception("Cannot open data connection, shutting down");
349     }
350   }
351 
352   private ChannelFuture activeInitConnection(final Bootstrap bootstrap,
353                                              final InetSocketAddress inetSocketAddress,
354                                              final InetSocketAddress localAddress) {
355     final ChannelFuture future =
356         bootstrap.connect(inetSocketAddress, localAddress);
357     // Set the session for the future dataChannel
358     future.addListener(new ChannelFutureListener() {
359       @Override
360       public void operationComplete(final ChannelFuture channelFuture) {
361         if (future.isSuccess()) {
362           future.channel().attr(FtpConfiguration.FTP_SESSION_ATTRIBUTE_KEY)
363                 .set(session);
364         }
365       }
366     });
367     try {
368       future.await(session.getConfiguration().getTimeoutCon());
369     } catch (final InterruptedException e1) {//NOSONAR
370       SysErrLogger.FAKE_LOGGER.ignoreLog(e1);
371     }
372     return future;
373   }
374 
375   // XXX FtpTransfer functions
376 
377   /**
378    * Run the command from an executor
379    */
380   private void runExecutor() {
381     final WaarpChannelFuture wait = waitForOpenedDataChannel;
382     endOfCommand = new WaarpFuture(true);
383     final WaarpFuture commandFinishingFuture = commandFinishing;
384     final WaarpFuture endCommandFuture = endOfCommand;
385     try {
386       session.getDataConn().getDataNetworkHandler()
387              .setFtpTransfer(executingCommand);
388     } catch (final FtpNoConnectionException ignored) {
389       // nothing
390     }
391     for (int i = 0; i < FtpInternalConfiguration.RETRYNB; i++) {
392       if (wait.awaitOrInterruptible(FtpConfiguration.getDataTimeoutCon())) {
393         break;
394       }
395     }
396     if (!wait.awaitOrInterruptible(FtpConfiguration.getDataTimeoutCon()) &&
397         !wait.isDone()) {
398       wait.cancel();
399       if (!commandFinishingFuture.isDone()) {
400         logger.error("Execution time out {}",
401                      FtpConfiguration.getDataTimeoutCon());
402         commandFinishingFuture.cancel();
403       }
404       if (!endCommandFuture.isDone()) {
405         logger.error("Command cancelled");
406         endCommandFuture.cancel();
407       }
408       return;
409     }
410     if (wait != null && (!wait.isSuccess() || wait.channel() == null)) {
411       if (!commandFinishingFuture.isDone()) {
412         logger.error("Connection aborted");
413         commandFinishingFuture.cancel();
414       }
415       if (!endCommandFuture.isDone()) {
416         logger.error("Command cancelled");
417         endCommandFuture.cancel();
418       }
419       return;
420     }
421     wait.channel().config().setAutoRead(true);
422     // Run the command
423     if (executorService == null) {
424       executorService = Executors.newSingleThreadExecutor();
425     }
426     executorService.execute(new FtpTransferExecutor(session, executingCommand));
427     commandFinishingFuture.awaitOrInterruptible();
428     if (commandFinishingFuture.isFailed()) {
429       endCommandFuture.cancel();
430     }
431   }
432 
433   /**
434    * Add a new transfer to be executed. This is to be called from Command
435    * after
436    * connection is opened and before
437    * answering to the client that command is ready to be executed (for Store
438    * or
439    * Retrieve like operations).
440    *
441    * @param command
442    * @param file
443    */
444   public final void setNewFtpTransfer(final FtpCommandCode command,
445                                       final FtpFile file) {
446     final WaarpChannelFuture wait = waitForOpenedDataChannel;
447     isExecutingCommandFinished.set(false);
448     commandFinishing = new WaarpFuture(true);
449     logger.debug("setNewCommand: {}", command);
450     setDataNetworkHandlerReady();
451     executingCommand = new FtpTransfer(command, file);
452     runExecutor();
453     commandFinishing = null;
454     commandSetup.setSuccess();
455     if (!session.getDataConn().isStreamFile()) {
456       wait.channel().config().setAutoRead(false);
457     }
458   }
459 
460   /**
461    * Add a new transfer to be executed. This is to be called from Command
462    * after
463    * connection is opened and before
464    * answering to the client that command is ready to be executed (for List
465    * like
466    * operations).
467    *
468    * @param command
469    * @param list
470    * @param path as Original Path
471    */
472   public final void setNewFtpTransfer(final FtpCommandCode command,
473                                       final List<String> list,
474                                       final String path) {
475     final WaarpChannelFuture wait = waitForOpenedDataChannel;
476     isExecutingCommandFinished.set(false);
477     commandFinishing = new WaarpFuture(true);
478     logger.debug("setNewCommand: {}", command);
479     setDataNetworkHandlerReady();
480     executingCommand = new FtpTransfer(command, list, path);
481     runExecutor();
482     commandFinishing = null;
483     commandSetup.setSuccess();
484     if (!session.getDataConn().isStreamFile()) {
485       wait.channel().config().setAutoRead(false);
486     }
487     try {
488       session.getDataConn().getDataNetworkHandler().setFtpTransfer(null);
489     } catch (final FtpNoConnectionException ignored) {
490       // nothing
491     }
492   }
493 
494   /**
495    * @return True if transfer is not yet finished
496    */
497   public final boolean waitFtpTransferExecuting() {
498     boolean notFinished = true;
499     final WaarpFuture commandFinishingFuture = commandFinishing;
500     final AtomicBoolean commandFinished = isExecutingCommandFinished;
501     for (int i = 0; i < FtpInternalConfiguration.RETRYNB * 100; i++) {
502       if (commandFinished.get() || commandFinishingFuture == null ||
503           session.isCurrentCommandFinished() ||
504           commandFinishingFuture != null &&
505           commandFinishingFuture.awaitOrInterruptible(
506               FtpInternalConfiguration.RETRYINMS)) {
507         notFinished = false;
508         break;
509       }
510     }
511     return notFinished;
512   }
513 
514   /**
515    * Is a command currently executing (called from {@link NetworkHandler} when
516    * a
517    * message is received to see if
518    * another transfer command is already in execution, which is not allowed)
519    *
520    * @return True if a command is currently executing
521    */
522   public final boolean isFtpTransferExecuting() {
523     return !isExecutingCommandFinished.get();
524   }
525 
526   /**
527    * @return the current executing FtpTransfer
528    *
529    * @throws FtpNoTransferException
530    */
531   public final FtpTransfer getExecutingFtpTransfer()
532       throws FtpNoTransferException {
533     for (int i = 0; i < 100; i++) {
534       if (executingCommand != null) {
535         return executingCommand;
536       }
537       try {
538         Thread.sleep(WaarpNettyUtil.MINIMAL_DELAY_MS);
539       } catch (final InterruptedException e1) {//NOSONAR
540         throw new FtpNoTransferException("No Command currently running", e1);
541       }
542     }
543 
544     throw new FtpNoTransferException("No Command currently running");
545   }
546 
547   /**
548    * @return True if the current FtpTransfer is a Retrieve like transfer
549    *
550    * @throws FtpNoTransferException
551    * @throws CommandAbstractException
552    * @throws FtpNoFileException
553    */
554   boolean isExecutingRetrLikeTransfer()
555       throws FtpNoTransferException, CommandAbstractException,
556              FtpNoFileException {
557     return !session.isCurrentCommandFinished() &&
558            FtpCommandCode.isRetrLikeCommand(
559                getExecutingFtpTransfer().getCommand()) &&
560            getExecutingFtpTransfer().getFtpFile().isInReading();
561   }
562 
563   /**
564    * Called when a transfer is finished from setEndOfTransfer
565    *
566    * @return True if it was already called before
567    *
568    * @throws FtpNoTransferException
569    */
570   private boolean checkFtpTransferStatus() throws FtpNoTransferException {
571     final WaarpFuture commandFinishingFuture = commandFinishing;
572     final AtomicBoolean commandFinished = isExecutingCommandFinished;
573     final AtomicBoolean isDNHReady = isDataNetworkHandlerReady;
574     if (isCheckAlreadyCalled.get()) {
575       logger.warn("Check: ALREADY CALLED");
576       return true;
577     }
578     for (int i = 0; i < 10; i++) {
579       if (!commandFinished.get()) {
580         break;
581       }
582       try {
583         Thread.sleep(WaarpNettyUtil.MINIMAL_DELAY_MS);
584       } catch (InterruptedException e) {//NOSONAR
585         throw new FtpNoTransferException("No transfer running and Interrupted",
586                                          e);
587       }
588     }
589     if (commandFinished.get()) {
590       // already done
591       logger.warn("Check: already Finished");
592       if (commandFinishingFuture != null && !commandFinishingFuture.isDone()) {
593         commandFinishingFuture.cancel();
594       }
595       throw new FtpNoTransferException("No transfer running");
596     }
597     for (int i = 0; i < 10; i++) {
598       if (isDNHReady.get()) {
599         break;
600       }
601       try {
602         Thread.sleep(WaarpNettyUtil.MINIMAL_DELAY_MS);
603       } catch (InterruptedException e) {//NOSONAR
604         throw new FtpNoTransferException("No connection and Interrupted", e);
605       }
606     }
607     if (!isDNHReady.get()) {
608       // already done
609       logger.warn("Check: already DNH not ready");
610       throw new FtpNoTransferException("No connection");
611     }
612     isCheckAlreadyCalled.set(true);
613     final FtpTransfer executedTransfer = getExecutingFtpTransfer();
614     logger.debug("Check: command {}", executedTransfer.getCommand());
615     // DNH is ready and Transfer is running
616     if (FtpCommandCode.isListLikeCommand(executedTransfer.getCommand())) {
617       if (executedTransfer.getStatus()) {
618         // Special status for List Like command
619         logger.debug("Check: List OK");
620         closeTransfer();
621         return false;
622       }
623       logger.info("Check: List Ko");
624       abortTransfer();
625       return false;
626     } else if (FtpCommandCode.isRetrLikeCommand(
627         executedTransfer.getCommand())) {
628       final FtpFile file;
629       try {
630         file = executedTransfer.getFtpFile();
631       } catch (final FtpNoFileException e) {
632         logger.info("Check: Retr no FtpFile for Retr");
633         abortTransfer();
634         return false;
635       }
636       try {
637         if (file.isInReading()) {
638           logger.info("Check: Retr FtpFile still in reading KO");
639           abortTransfer();
640         } else {
641           logger.debug("Check: Retr FtpFile no more in reading OK");
642           closeTransfer();
643         }
644       } catch (final CommandAbstractException e) {
645         logger.warn("Retr Test is in Reading problem : {}", e.getMessage());
646         closeTransfer();
647       }
648       return false;
649     } else if (FtpCommandCode.isStoreLikeCommand(
650         executedTransfer.getCommand())) {
651       closeTransfer();
652       return false;
653     } else {
654       logger.warn("Check: Unknown command");
655       abortTransfer();
656     }
657     return false;
658   }
659 
660   /**
661    * Abort the current transfer
662    */
663   private void abortTransfer() {
664     if (logger.isDebugEnabled()) {
665       logger.debug("Will abort transfer and write: ",
666                    new Exception("trace only"));
667     }
668     final FtpFile file;
669     FtpTransfer current = null;
670     try {
671       current = getExecutingFtpTransfer();
672       file = current.getFtpFile();
673       file.abortFile();
674     } catch (final FtpNoTransferException e) {
675       logger.info("Warning Abort problem in {} : {}", session, e.getMessage());
676     } catch (final FtpNoFileException ignored) {
677       // nothing
678     } catch (final CommandAbstractException e) {
679       logger.warn("Abort problem in {} : {}", session, e.getMessage());
680     }
681     if (current != null) {
682       current.setStatus(false);
683     }
684     endDataConnection();
685     session.setReplyCode(ReplyCode.REPLY_426_CONNECTION_CLOSED_TRANSFER_ABORTED,
686                          "Transfer aborted for " +
687                          (current == null? "Unknown command" :
688                              current.toString()));
689     if (current != null &&
690         !FtpCommandCode.isListLikeCommand(current.getCommand())) {
691       try {
692         session.getBusinessHandler().afterTransferDoneBeforeAnswer(current);
693       } catch (final CommandAbstractException e) {
694         session.setReplyCode(e);
695       }
696     }
697     finalizeExecution();
698   }
699 
700   /**
701    * Finish correctly a transfer
702    */
703   private void closeTransfer() {
704     logger.debug("Will close transfer");
705     final FtpFile file;
706     FtpTransfer current = null;
707     try {
708       current = getExecutingFtpTransfer();
709       file = current.getFtpFile();
710       file.closeFile();
711     } catch (final FtpNoTransferException e) {
712       logger.warn("Close problem" + " : {}", e.getMessage());
713     } catch (final FtpNoFileException ignored) {
714       // nothing
715     } catch (final CommandAbstractException e) {
716       logger.warn("Close problem" + " : {}", e.getMessage());
717     }
718     if (current != null) {
719       current.setStatus(true);
720     }
721     if (session.getDataConn().isStreamFile()) {
722       endDataConnection();
723     }
724     session.setReplyCode(ReplyCode.REPLY_226_CLOSING_DATA_CONNECTION,
725                          "Transfer complete for " +
726                          (current == null? "Unknown command" :
727                              current.toString()));
728     if (current != null) {
729       if (!FtpCommandCode.isListLikeCommand(current.getCommand())) {
730         try {
731           session.getBusinessHandler().afterTransferDoneBeforeAnswer(current);
732         } catch (final CommandAbstractException e) {
733           session.setReplyCode(e);
734         }
735       } else {
736         // Special wait to prevent fast LIST following by STOR or RETR command
737         try {
738           Thread.sleep(FtpInternalConfiguration.RETRYINMS);
739         } catch (final InterruptedException e) {//NOSONAR
740           SysErrLogger.FAKE_LOGGER.ignoreLog(e);
741         }
742       }
743     }
744     finalizeExecution();
745   }
746 
747   /**
748    * Set the current transfer as finished. Called from {@link
749    * FtpTransferExecutor} when a transfer is over.
750    */
751   public final void setEndOfTransfer() {
752     try {
753       checkFtpTransferStatus();
754     } catch (final FtpNoTransferException ignored) {
755       // nothing
756     }
757   }
758 
759   /**
760    * To enable abort from internal error
761    *
762    * @param write True means the message is write back to the control
763    *     command, false it is only prepared
764    */
765   public final void setTransferAbortedFromInternal(final boolean write) {
766     logger.debug("Set transfer aborted internal {}", write);
767     abortTransfer();
768     if (write && session.getNetworkHandler() != null) {
769       session.getNetworkHandler().writeIntermediateAnswer();
770     }
771     if (endOfCommand != null && !endOfCommand.isDone()) {
772       logger.warn("Command cancelled");
773       if (logger.isDebugEnabled()) {
774         logger.debug(new Exception("Trace only"));
775       }
776       endOfCommand.cancel();
777     }
778   }
779 
780   /**
781    * Called by channelClosed (from {@link DataNetworkHandler} ) or
782    * trueRetrieve
783    * (from {@link FtpFile}) when the
784    * transfer is over
785    */
786   public final void setPreEndOfTransfer() {
787     if (endOfCommand != null) {
788       endOfCommand.setSuccess();
789       logger.debug("Transfer completed");
790     }
791   }
792 
793   /**
794    * Wait for the current transfer to finish, called from {@link
795    * FtpTransferExecutor}
796    *
797    * @throws InterruptedException
798    */
799   public final void waitForEndOfTransfer() throws InterruptedException {
800     if (endOfCommand != null) {
801       endOfCommand.awaitOrInterruptible();
802       if (endOfCommand.isFailed()) {
803         logger.error("Transfer aborted");
804         if (logger.isDebugEnabled()) {
805           logger.debug(new Exception("Trace only"));
806         }
807         throw new InterruptedException("Transfer aborted");
808       }
809     }
810   }
811 
812   /**
813    * Finalize execution
814    */
815   private void finalizeExecution() {
816     if (commandFinishing != null) {
817       commandFinishing.setSuccess();
818     }
819     isExecutingCommandFinished.set(true);
820     executingCommand = null;
821     resetWaitForOpenedDataChannel();
822   }
823 
824   /**
825    * End the data connection if any
826    */
827   private synchronized void endDataConnection() {
828     logger.debug("End Data connection");
829     if (isDataNetworkHandlerReady.get() && dataChannel != null) {
830       WaarpNettyUtil.awaitOrInterrupted(
831           WaarpSslUtility.closingSslChannel(dataChannel));
832       isDataNetworkHandlerReady.set(false);
833       dataChannel = null;
834     } else if (dataChannel != null) {
835       WaarpSslUtility.closingSslChannel(dataChannel);
836       isDataNetworkHandlerReady.set(false);
837       dataChannel = null;
838     }
839     isDataNetworkHandlerReady.set(false);
840   }
841 
842   /**
843    * Clear the FtpTransferControl (called when the data connection must be
844    * over like from clear of
845    * {@link FtpDataAsyncConn}, abort from {@link ABOR} or ending control
846    * connection from {@link NetworkHandler}.
847    */
848   public final void clear() {
849     final WaarpFuture commandSetupFuture = commandSetup;
850     endDataConnection();
851     finalizeExecution();
852     if (endOfCommand != null && !endOfCommand.isDone()) {
853       logger.error("Command cancelled");
854       endOfCommand.cancel();
855     }
856     final WaarpChannelFuture wait = waitForOpenedDataChannel;
857     if (wait != null && !wait.isDone()) {
858       wait.cancel();
859     }
860     if (commandSetupFuture != null && !commandSetupFuture.isDone()) {
861       commandSetupFuture.cancel();
862     }
863     if (executorService != null) {
864       executorService.shutdownNow();
865       executorService = null;
866     }
867   }
868 }