1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.ftp.core.data;
21
22 import io.netty.bootstrap.Bootstrap;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelFutureListener;
26 import org.waarp.common.command.ReplyCode;
27 import org.waarp.common.command.exception.CommandAbstractException;
28 import org.waarp.common.command.exception.Reply425Exception;
29 import org.waarp.common.crypto.ssl.WaarpSslUtility;
30 import org.waarp.common.future.WaarpChannelFuture;
31 import org.waarp.common.future.WaarpFuture;
32 import org.waarp.common.logging.SysErrLogger;
33 import org.waarp.common.logging.WaarpLogger;
34 import org.waarp.common.logging.WaarpLoggerFactory;
35 import org.waarp.common.utility.WaarpNettyUtil;
36 import org.waarp.ftp.core.command.FtpCommandCode;
37 import org.waarp.ftp.core.command.service.ABOR;
38 import org.waarp.ftp.core.config.FtpConfiguration;
39 import org.waarp.ftp.core.config.FtpInternalConfiguration;
40 import org.waarp.ftp.core.control.NetworkHandler;
41 import org.waarp.ftp.core.data.handler.DataNetworkHandler;
42 import org.waarp.ftp.core.exception.FtpNoConnectionException;
43 import org.waarp.ftp.core.exception.FtpNoFileException;
44 import org.waarp.ftp.core.exception.FtpNoTransferException;
45 import org.waarp.ftp.core.file.FtpFile;
46 import org.waarp.ftp.core.session.FtpSession;
47
48 import java.net.InetSocketAddress;
49 import java.util.List;
50 import java.util.concurrent.ExecutorService;
51 import java.util.concurrent.Executors;
52 import java.util.concurrent.atomic.AtomicBoolean;
53
54
55
56
57 public class FtpTransferControl {
58
59
60
61 private static final WaarpLogger logger =
62 WaarpLoggerFactory.getLogger(FtpTransferControl.class);
63
64
65
66
67 private final FtpSession session;
68
69
70
71
72 private final AtomicBoolean isDataNetworkHandlerReady =
73 new AtomicBoolean(false);
74
75
76
77
78 private Channel dataChannel;
79
80
81
82
83 private WaarpChannelFuture waitForOpenedDataChannel =
84 new WaarpChannelFuture(true);
85
86
87
88
89 private final AtomicBoolean isExecutingCommandFinished =
90 new AtomicBoolean(true);
91
92
93
94 private WaarpFuture commandSetup;
95
96
97
98
99 private WaarpFuture commandFinishing;
100
101
102
103
104 private FtpTransfer executingCommand;
105
106
107
108
109 private ExecutorService executorService;
110
111
112
113
114
115
116
117 private WaarpFuture endOfCommand;
118
119
120
121
122 private final AtomicBoolean isCheckAlreadyCalled = new AtomicBoolean(false);
123
124
125
126
/**
 * Creates the transfer control attached to the given session.
 *
 * @param session the session this control belongs to
 */
public FtpTransferControl(final FtpSession session) {
  // endOfCommand keeps its default null value until runExecutor() creates it
  this.session = session;
}
131
132
133
134
135
136
137 private void setDataNetworkHandlerReady() {
138 isCheckAlreadyCalled.set(false);
139 isDataNetworkHandlerReady.set(true);
140 }
141
142
143
144
145
146
147
148 public final void waitForDataNetworkHandlerReady()
149 throws InterruptedException {
150 final AtomicBoolean isDNHReady = isDataNetworkHandlerReady;
151 if (!isDNHReady.get()) {
152 for (int i = 0; i < FtpInternalConfiguration.RETRYNB * 10; i++) {
153 Thread.sleep(FtpInternalConfiguration.RETRYINMS * 10);
154 if (isDNHReady.get()) {
155 return;
156 }
157 }
158 if (!isDNHReady.get()) {
159 throw new InterruptedException("Bad initialization");
160 }
161 }
162 }
163
164
165
166
167
168
169
170
171 public final void setOpenedDataChannel(final Channel channel,
172 final DataNetworkHandler dataNetworkHandler) {
173 logger.debug("SetOpenedDataChannel: {}",
174 (channel != null? channel.remoteAddress() : "no channel"));
175 final WaarpChannelFuture wait = waitForOpenedDataChannel;
176 if (channel != null) {
177 session.getDataConn().setDataNetworkHandler(dataNetworkHandler);
178 wait.setChannel(channel);
179 wait.setSuccess();
180 } else {
181 wait.cancel();
182 }
183 }
184
185
186
187
188
189
190
191
192
193 public final Channel waitForOpenedDataChannel() throws InterruptedException {
194 Channel channel = null;
195 final WaarpChannelFuture wait = waitForOpenedDataChannel;
196 if (wait.awaitOrInterruptible(FtpConfiguration.getDataTimeoutCon())) {
197 if (wait.isSuccess()) {
198 channel = wait.channel();
199 } else {
200 logger.warn("data connection is in error");
201 throw new InterruptedException("Cannot get data connection");
202 }
203 return channel;
204 } else {
205 wait.cancel();
206 final boolean isPassive = session.getDataConn().isPassiveMode();
207 final InetSocketAddress address =
208 isPassive? session.getDataConn().getLocalAddress() :
209 session.getDataConn().getRemoteAddress();
210 logger.error(
211 "Timeout occurs during data {} connection to {} after timeout {}",
212 isPassive? "Passive" : "Active", address,
213 FtpConfiguration.getDataTimeoutCon());
214 throw new InterruptedException(
215 "Cannot get data connection to " + address);
216 }
217 }
218
219
220
221
222 public final synchronized void resetWaitForOpenedDataChannel() {
223 final WaarpChannelFuture wait = waitForOpenedDataChannel;
224 if (wait != null) {
225 wait.setSuccess();
226 }
227 waitForOpenedDataChannel = new WaarpChannelFuture(true);
228 }
229
230 private void removeSession() {
231 if (session.getDataConn().isPassiveMode()) {
232 session.getConfiguration().delFtpSession(
233 session.getDataConn().getRemoteAddress().getAddress(),
234 session.getDataConn().getLocalAddress());
235 }
236 if (waitForOpenedDataChannel != null) {
237 resetWaitForOpenedDataChannel();
238 }
239 }
240
241
242
243
244
245
246
247
248 public final synchronized void openDataConnection() throws Reply425Exception {
249
250
251
252 final WaarpFuture commandSetupFuture = commandSetup;
253 if (commandSetupFuture != null && !commandSetupFuture.isDone()) {
254 commandSetupFuture.cancel();
255 }
256 commandSetup = new WaarpFuture(true);
257 final FtpDataAsyncConn dataAsyncConn = session.getDataConn();
258 if (!dataAsyncConn.isStreamFile()) {
259
260 if (dataAsyncConn.isActive()) {
261
262 logger.debug("Connection already open");
263 session.setReplyCode(ReplyCode.REPLY_125_DATA_CONNECTION_ALREADY_OPEN,
264 dataAsyncConn.getType().name() +
265 " mode data connection already open");
266 return;
267 }
268 } else {
269
270 if (dataAsyncConn.isActive()) {
271 logger.error(
272 "Connection already open but should not since in Stream mode");
273 setTransferAbortedFromInternal(false);
274 throw new Reply425Exception(
275 "Connection already open but should not since in Stream mode");
276 }
277 }
278
279 session.setReplyCode(ReplyCode.REPLY_150_FILE_STATUS_OKAY,
280 "Opening " + dataAsyncConn.getType().name() +
281 " mode data connection");
282 if (dataAsyncConn.isPassiveMode()) {
283 if (!dataAsyncConn.isBind()) {
284
285 removeSession();
286 throw new Reply425Exception("No passive data connection prepared");
287 }
288
289 logger.debug("Passive mode standby");
290 try {
291 dataChannel = waitForOpenedDataChannel();
292 dataAsyncConn.setNewOpenedDataChannel(dataChannel);
293 } catch (final InterruptedException e) {
294 logger.warn("Connection abort in passive mode: {}", e.getMessage());
295 removeSession();
296
297 throw new Reply425Exception("Cannot open passive data connection");
298 }
299 logger.debug("Passive mode connected");
300 } else {
301
302 final InetSocketAddress inetSocketAddress =
303 dataAsyncConn.getRemoteAddress();
304 final InetSocketAddress localAddress = dataAsyncConn.getLocalAddress();
305 logger.debug("Active mode standby");
306 final Bootstrap bootstrap =
307 session.getConfiguration().getFtpInternalConfiguration()
308 .getActiveBootstrap(session.isDataSsl());
309 final String mylog = session.toString();
310 logger.debug("DataConn for: {} to {}",
311 session.getCurrentCommand().getCommand(), inetSocketAddress);
312 ChannelFuture future = null;
313 for (int i = 0; i < FtpInternalConfiguration.RETRYNB; i++) {
314 future =
315 activeInitConnection(bootstrap, inetSocketAddress, localAddress);
316 if (future.isSuccess()) {
317 break;
318 }
319 try {
320 Thread.sleep(FtpInternalConfiguration.RETRYINMS * 10);
321 } catch (final InterruptedException e) {
322
323 }
324 }
325 if (future == null || !future.isSuccess()) {
326 logger.warn(
327 "Connection abort in active mode from future while session: " +
328 "{}\nTrying connect to: {} From: {}\nWas: {}", session,
329 inetSocketAddress, localAddress, mylog,
330 future != null? future.cause() : null);
331
332 throw new Reply425Exception("Cannot open active data connection");
333 }
334 try {
335 dataChannel = waitForOpenedDataChannel();
336 dataAsyncConn.setNewOpenedDataChannel(dataChannel);
337 } catch (final InterruptedException e) {
338 WaarpSslUtility.closingSslChannel(future.channel())
339 .awaitUninterruptibly();
340 logger.warn("Connection abort in active mode ({})", e.getMessage());
341
342 throw new Reply425Exception("Cannot open active data connection");
343 }
344 logger.debug("Active mode connected");
345 }
346 if (dataChannel == null) {
347
348 throw new Reply425Exception("Cannot open data connection, shutting down");
349 }
350 }
351
352 private ChannelFuture activeInitConnection(final Bootstrap bootstrap,
353 final InetSocketAddress inetSocketAddress,
354 final InetSocketAddress localAddress) {
355 final ChannelFuture future =
356 bootstrap.connect(inetSocketAddress, localAddress);
357
358 future.addListener(new ChannelFutureListener() {
359 @Override
360 public void operationComplete(final ChannelFuture channelFuture) {
361 if (future.isSuccess()) {
362 future.channel().attr(FtpConfiguration.FTP_SESSION_ATTRIBUTE_KEY)
363 .set(session);
364 }
365 }
366 });
367 try {
368 future.await(session.getConfiguration().getTimeoutCon());
369 } catch (final InterruptedException e1) {
370 SysErrLogger.FAKE_LOGGER.ignoreLog(e1);
371 }
372 return future;
373 }
374
375
376
377
378
379
380 private void runExecutor() {
381 final WaarpChannelFuture wait = waitForOpenedDataChannel;
382 endOfCommand = new WaarpFuture(true);
383 final WaarpFuture commandFinishingFuture = commandFinishing;
384 final WaarpFuture endCommandFuture = endOfCommand;
385 try {
386 session.getDataConn().getDataNetworkHandler()
387 .setFtpTransfer(executingCommand);
388 } catch (final FtpNoConnectionException ignored) {
389
390 }
391 for (int i = 0; i < FtpInternalConfiguration.RETRYNB; i++) {
392 if (wait.awaitOrInterruptible(FtpConfiguration.getDataTimeoutCon())) {
393 break;
394 }
395 }
396 if (!wait.awaitOrInterruptible(FtpConfiguration.getDataTimeoutCon()) &&
397 !wait.isDone()) {
398 wait.cancel();
399 if (!commandFinishingFuture.isDone()) {
400 logger.error("Execution time out {}",
401 FtpConfiguration.getDataTimeoutCon());
402 commandFinishingFuture.cancel();
403 }
404 if (!endCommandFuture.isDone()) {
405 logger.error("Command cancelled");
406 endCommandFuture.cancel();
407 }
408 return;
409 }
410 if (wait != null && (!wait.isSuccess() || wait.channel() == null)) {
411 if (!commandFinishingFuture.isDone()) {
412 logger.error("Connection aborted");
413 commandFinishingFuture.cancel();
414 }
415 if (!endCommandFuture.isDone()) {
416 logger.error("Command cancelled");
417 endCommandFuture.cancel();
418 }
419 return;
420 }
421 wait.channel().config().setAutoRead(true);
422
423 if (executorService == null) {
424 executorService = Executors.newSingleThreadExecutor();
425 }
426 executorService.execute(new FtpTransferExecutor(session, executingCommand));
427 commandFinishingFuture.awaitOrInterruptible();
428 if (commandFinishingFuture.isFailed()) {
429 endCommandFuture.cancel();
430 }
431 }
432
433
434
435
436
437
438
439
440
441
442
443
444 public final void setNewFtpTransfer(final FtpCommandCode command,
445 final FtpFile file) {
446 final WaarpChannelFuture wait = waitForOpenedDataChannel;
447 isExecutingCommandFinished.set(false);
448 commandFinishing = new WaarpFuture(true);
449 logger.debug("setNewCommand: {}", command);
450 setDataNetworkHandlerReady();
451 executingCommand = new FtpTransfer(command, file);
452 runExecutor();
453 commandFinishing = null;
454 commandSetup.setSuccess();
455 if (!session.getDataConn().isStreamFile()) {
456 wait.channel().config().setAutoRead(false);
457 }
458 }
459
460
461
462
463
464
465
466
467
468
469
470
471
472 public final void setNewFtpTransfer(final FtpCommandCode command,
473 final List<String> list,
474 final String path) {
475 final WaarpChannelFuture wait = waitForOpenedDataChannel;
476 isExecutingCommandFinished.set(false);
477 commandFinishing = new WaarpFuture(true);
478 logger.debug("setNewCommand: {}", command);
479 setDataNetworkHandlerReady();
480 executingCommand = new FtpTransfer(command, list, path);
481 runExecutor();
482 commandFinishing = null;
483 commandSetup.setSuccess();
484 if (!session.getDataConn().isStreamFile()) {
485 wait.channel().config().setAutoRead(false);
486 }
487 try {
488 session.getDataConn().getDataNetworkHandler().setFtpTransfer(null);
489 } catch (final FtpNoConnectionException ignored) {
490
491 }
492 }
493
494
495
496
497 public final boolean waitFtpTransferExecuting() {
498 boolean notFinished = true;
499 final WaarpFuture commandFinishingFuture = commandFinishing;
500 final AtomicBoolean commandFinished = isExecutingCommandFinished;
501 for (int i = 0; i < FtpInternalConfiguration.RETRYNB * 100; i++) {
502 if (commandFinished.get() || commandFinishingFuture == null ||
503 session.isCurrentCommandFinished() ||
504 commandFinishingFuture != null &&
505 commandFinishingFuture.awaitOrInterruptible(
506 FtpInternalConfiguration.RETRYINMS)) {
507 notFinished = false;
508 break;
509 }
510 }
511 return notFinished;
512 }
513
514
515
516
517
518
519
520
521
522 public final boolean isFtpTransferExecuting() {
523 return !isExecutingCommandFinished.get();
524 }
525
526
527
528
529
530
531 public final FtpTransfer getExecutingFtpTransfer()
532 throws FtpNoTransferException {
533 for (int i = 0; i < 100; i++) {
534 if (executingCommand != null) {
535 return executingCommand;
536 }
537 try {
538 Thread.sleep(WaarpNettyUtil.MINIMAL_DELAY_MS);
539 } catch (final InterruptedException e1) {
540 throw new FtpNoTransferException("No Command currently running", e1);
541 }
542 }
543
544 throw new FtpNoTransferException("No Command currently running");
545 }
546
547
548
549
550
551
552
553
554 boolean isExecutingRetrLikeTransfer()
555 throws FtpNoTransferException, CommandAbstractException,
556 FtpNoFileException {
557 return !session.isCurrentCommandFinished() &&
558 FtpCommandCode.isRetrLikeCommand(
559 getExecutingFtpTransfer().getCommand()) &&
560 getExecutingFtpTransfer().getFtpFile().isInReading();
561 }
562
563
564
565
566
567
568
569
570 private boolean checkFtpTransferStatus() throws FtpNoTransferException {
571 final WaarpFuture commandFinishingFuture = commandFinishing;
572 final AtomicBoolean commandFinished = isExecutingCommandFinished;
573 final AtomicBoolean isDNHReady = isDataNetworkHandlerReady;
574 if (isCheckAlreadyCalled.get()) {
575 logger.warn("Check: ALREADY CALLED");
576 return true;
577 }
578 for (int i = 0; i < 10; i++) {
579 if (!commandFinished.get()) {
580 break;
581 }
582 try {
583 Thread.sleep(WaarpNettyUtil.MINIMAL_DELAY_MS);
584 } catch (InterruptedException e) {
585 throw new FtpNoTransferException("No transfer running and Interrupted",
586 e);
587 }
588 }
589 if (commandFinished.get()) {
590
591 logger.warn("Check: already Finished");
592 if (commandFinishingFuture != null && !commandFinishingFuture.isDone()) {
593 commandFinishingFuture.cancel();
594 }
595 throw new FtpNoTransferException("No transfer running");
596 }
597 for (int i = 0; i < 10; i++) {
598 if (isDNHReady.get()) {
599 break;
600 }
601 try {
602 Thread.sleep(WaarpNettyUtil.MINIMAL_DELAY_MS);
603 } catch (InterruptedException e) {
604 throw new FtpNoTransferException("No connection and Interrupted", e);
605 }
606 }
607 if (!isDNHReady.get()) {
608
609 logger.warn("Check: already DNH not ready");
610 throw new FtpNoTransferException("No connection");
611 }
612 isCheckAlreadyCalled.set(true);
613 final FtpTransfer executedTransfer = getExecutingFtpTransfer();
614 logger.debug("Check: command {}", executedTransfer.getCommand());
615
616 if (FtpCommandCode.isListLikeCommand(executedTransfer.getCommand())) {
617 if (executedTransfer.getStatus()) {
618
619 logger.debug("Check: List OK");
620 closeTransfer();
621 return false;
622 }
623 logger.info("Check: List Ko");
624 abortTransfer();
625 return false;
626 } else if (FtpCommandCode.isRetrLikeCommand(
627 executedTransfer.getCommand())) {
628 final FtpFile file;
629 try {
630 file = executedTransfer.getFtpFile();
631 } catch (final FtpNoFileException e) {
632 logger.info("Check: Retr no FtpFile for Retr");
633 abortTransfer();
634 return false;
635 }
636 try {
637 if (file.isInReading()) {
638 logger.info("Check: Retr FtpFile still in reading KO");
639 abortTransfer();
640 } else {
641 logger.debug("Check: Retr FtpFile no more in reading OK");
642 closeTransfer();
643 }
644 } catch (final CommandAbstractException e) {
645 logger.warn("Retr Test is in Reading problem : {}", e.getMessage());
646 closeTransfer();
647 }
648 return false;
649 } else if (FtpCommandCode.isStoreLikeCommand(
650 executedTransfer.getCommand())) {
651 closeTransfer();
652 return false;
653 } else {
654 logger.warn("Check: Unknown command");
655 abortTransfer();
656 }
657 return false;
658 }
659
660
661
662
663 private void abortTransfer() {
664 if (logger.isDebugEnabled()) {
665 logger.debug("Will abort transfer and write: ",
666 new Exception("trace only"));
667 }
668 final FtpFile file;
669 FtpTransfer current = null;
670 try {
671 current = getExecutingFtpTransfer();
672 file = current.getFtpFile();
673 file.abortFile();
674 } catch (final FtpNoTransferException e) {
675 logger.info("Warning Abort problem in {} : {}", session, e.getMessage());
676 } catch (final FtpNoFileException ignored) {
677
678 } catch (final CommandAbstractException e) {
679 logger.warn("Abort problem in {} : {}", session, e.getMessage());
680 }
681 if (current != null) {
682 current.setStatus(false);
683 }
684 endDataConnection();
685 session.setReplyCode(ReplyCode.REPLY_426_CONNECTION_CLOSED_TRANSFER_ABORTED,
686 "Transfer aborted for " +
687 (current == null? "Unknown command" :
688 current.toString()));
689 if (current != null &&
690 !FtpCommandCode.isListLikeCommand(current.getCommand())) {
691 try {
692 session.getBusinessHandler().afterTransferDoneBeforeAnswer(current);
693 } catch (final CommandAbstractException e) {
694 session.setReplyCode(e);
695 }
696 }
697 finalizeExecution();
698 }
699
700
701
702
703 private void closeTransfer() {
704 logger.debug("Will close transfer");
705 final FtpFile file;
706 FtpTransfer current = null;
707 try {
708 current = getExecutingFtpTransfer();
709 file = current.getFtpFile();
710 file.closeFile();
711 } catch (final FtpNoTransferException e) {
712 logger.warn("Close problem" + " : {}", e.getMessage());
713 } catch (final FtpNoFileException ignored) {
714
715 } catch (final CommandAbstractException e) {
716 logger.warn("Close problem" + " : {}", e.getMessage());
717 }
718 if (current != null) {
719 current.setStatus(true);
720 }
721 if (session.getDataConn().isStreamFile()) {
722 endDataConnection();
723 }
724 session.setReplyCode(ReplyCode.REPLY_226_CLOSING_DATA_CONNECTION,
725 "Transfer complete for " +
726 (current == null? "Unknown command" :
727 current.toString()));
728 if (current != null) {
729 if (!FtpCommandCode.isListLikeCommand(current.getCommand())) {
730 try {
731 session.getBusinessHandler().afterTransferDoneBeforeAnswer(current);
732 } catch (final CommandAbstractException e) {
733 session.setReplyCode(e);
734 }
735 } else {
736
737 try {
738 Thread.sleep(FtpInternalConfiguration.RETRYINMS);
739 } catch (final InterruptedException e) {
740 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
741 }
742 }
743 }
744 finalizeExecution();
745 }
746
747
748
749
750
751 public final void setEndOfTransfer() {
752 try {
753 checkFtpTransferStatus();
754 } catch (final FtpNoTransferException ignored) {
755
756 }
757 }
758
759
760
761
762
763
764
765 public final void setTransferAbortedFromInternal(final boolean write) {
766 logger.debug("Set transfer aborted internal {}", write);
767 abortTransfer();
768 if (write && session.getNetworkHandler() != null) {
769 session.getNetworkHandler().writeIntermediateAnswer();
770 }
771 if (endOfCommand != null && !endOfCommand.isDone()) {
772 logger.warn("Command cancelled");
773 if (logger.isDebugEnabled()) {
774 logger.debug(new Exception("Trace only"));
775 }
776 endOfCommand.cancel();
777 }
778 }
779
780
781
782
783
784
785
786 public final void setPreEndOfTransfer() {
787 if (endOfCommand != null) {
788 endOfCommand.setSuccess();
789 logger.debug("Transfer completed");
790 }
791 }
792
793
794
795
796
797
798
799 public final void waitForEndOfTransfer() throws InterruptedException {
800 if (endOfCommand != null) {
801 endOfCommand.awaitOrInterruptible();
802 if (endOfCommand.isFailed()) {
803 logger.error("Transfer aborted");
804 if (logger.isDebugEnabled()) {
805 logger.debug(new Exception("Trace only"));
806 }
807 throw new InterruptedException("Transfer aborted");
808 }
809 }
810 }
811
812
813
814
815 private void finalizeExecution() {
816 if (commandFinishing != null) {
817 commandFinishing.setSuccess();
818 }
819 isExecutingCommandFinished.set(true);
820 executingCommand = null;
821 resetWaitForOpenedDataChannel();
822 }
823
824
825
826
827 private synchronized void endDataConnection() {
828 logger.debug("End Data connection");
829 if (isDataNetworkHandlerReady.get() && dataChannel != null) {
830 WaarpNettyUtil.awaitOrInterrupted(
831 WaarpSslUtility.closingSslChannel(dataChannel));
832 isDataNetworkHandlerReady.set(false);
833 dataChannel = null;
834 } else if (dataChannel != null) {
835 WaarpSslUtility.closingSslChannel(dataChannel);
836 isDataNetworkHandlerReady.set(false);
837 dataChannel = null;
838 }
839 isDataNetworkHandlerReady.set(false);
840 }
841
842
843
844
845
846
847
848 public final void clear() {
849 final WaarpFuture commandSetupFuture = commandSetup;
850 endDataConnection();
851 finalizeExecution();
852 if (endOfCommand != null && !endOfCommand.isDone()) {
853 logger.error("Command cancelled");
854 endOfCommand.cancel();
855 }
856 final WaarpChannelFuture wait = waitForOpenedDataChannel;
857 if (wait != null && !wait.isDone()) {
858 wait.cancel();
859 }
860 if (commandSetupFuture != null && !commandSetupFuture.isDone()) {
861 commandSetupFuture.cancel();
862 }
863 if (executorService != null) {
864 executorService.shutdownNow();
865 executorService = null;
866 }
867 }
868 }