1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.waarp.ftp.core.data;
19
20 import java.net.InetAddress;
21 import java.net.InetSocketAddress;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.TimeUnit;
26
27 import io.netty.bootstrap.Bootstrap;
28 import io.netty.channel.Channel;
29 import io.netty.channel.ChannelFuture;
30
31 import org.waarp.common.command.ReplyCode;
32 import org.waarp.common.command.exception.CommandAbstractException;
33 import org.waarp.common.command.exception.Reply425Exception;
34 import org.waarp.common.crypto.ssl.WaarpSslUtility;
35 import org.waarp.common.future.WaarpChannelFuture;
36 import org.waarp.common.future.WaarpFuture;
37 import org.waarp.common.logging.WaarpLogger;
38 import org.waarp.common.logging.WaarpLoggerFactory;
39 import org.waarp.ftp.core.command.FtpCommandCode;
40 import org.waarp.ftp.core.command.service.ABOR;
41 import org.waarp.ftp.core.config.FtpConfiguration;
42 import org.waarp.ftp.core.config.FtpInternalConfiguration;
43 import org.waarp.ftp.core.control.NetworkHandler;
44 import org.waarp.ftp.core.data.handler.DataNetworkHandler;
45 import org.waarp.ftp.core.exception.FtpNoConnectionException;
46 import org.waarp.ftp.core.exception.FtpNoFileException;
47 import org.waarp.ftp.core.exception.FtpNoTransferException;
48 import org.waarp.ftp.core.file.FtpFile;
49 import org.waarp.ftp.core.session.FtpSession;
50
51
52
53
54
55
56
57 public class FtpTransferControl {
58
59
60
61 private static final WaarpLogger logger = WaarpLoggerFactory.getLogger(FtpTransferControl.class);
62
63
64
65
66 private final FtpSession session;
67
68
69
70
71 private volatile boolean isDataNetworkHandlerReady = false;
72
73
74
75
76 private volatile Channel dataChannel = null;
77
78
79
80
81 private volatile WaarpChannelFuture waitForOpenedDataChannel = new WaarpChannelFuture(true);
82
83
84
85
86 private volatile boolean isExecutingCommandFinished = true;
87
88
89
90 private volatile WaarpFuture commandSetup = null;
91
92
93
94
95 private volatile WaarpFuture commandFinishing = null;
96
97
98
99
100 private volatile FtpTransfer executingCommand = null;
101
102
103
104
105 private ExecutorService executorService = null;
106
107
108
109
110
111 private volatile WaarpFuture endOfCommand = null;
112
113
114
115
116 private volatile boolean isCheckAlreadyCalled = false;
117
118
119
120
121
/**
 * Creates the transfer controller attached to the given FTP session.
 *
 * @param session the session whose data transfers this object controls
 */
public FtpTransferControl(FtpSession session) {
    // endOfCommand needs no explicit reset here: as a field it already starts out null.
    this.session = session;
}
126
127
128
129
130
131
132 private void setDataNetworkHandlerReady() {
133 isCheckAlreadyCalled = false;
134 isDataNetworkHandlerReady = true;
135 }
136
137
138
139
140
141
142
143 public void waitForDataNetworkHandlerReady() throws InterruptedException {
144 if (!isDataNetworkHandlerReady) {
145 Thread.sleep(10);
146 if (!isDataNetworkHandlerReady) {
147
148 throw new InterruptedException("Bad initialization");
149 }
150 }
151 }
152
153
154
155
156
157
158
159 public void setOpenedDataChannel(Channel channel,
160 DataNetworkHandler dataNetworkHandler) {
161 logger.debug("SetOpenedDataChannel: " + (channel != null ? channel.remoteAddress() : "no channel"));
162 if (channel != null) {
163 session.getDataConn().setDataNetworkHandler(dataNetworkHandler);
164 waitForOpenedDataChannel.setChannel(channel);
165 waitForOpenedDataChannel.setSuccess();
166 } else {
167 waitForOpenedDataChannel.cancel();
168 }
169 }
170
171
172
173
174
175
176
177
178 public Channel waitForOpenedDataChannel() throws InterruptedException {
179 Channel channel = null;
180 if (waitForOpenedDataChannel.await(
181 session.getConfiguration().getTIMEOUTCON() + 1000,
182 TimeUnit.MILLISECONDS)) {
183 if (waitForOpenedDataChannel.isSuccess()) {
184 channel = waitForOpenedDataChannel.channel();
185 } else {
186 logger.warn("data connection is in error");
187 }
188 } else {
189 logger.warn("Timeout occurs during data connection");
190 }
191 return channel;
192 }
193
194
195
196
197 public void resetWaitForOpenedDataChannel() {
198 if (waitForOpenedDataChannel != null) {
199 waitForOpenedDataChannel.cancel();
200 }
201 waitForOpenedDataChannel = new WaarpChannelFuture(true);
202 }
203
204
205
206
207
208
209
210
211 public synchronized boolean openDataConnection() throws Reply425Exception {
212
213
214
215 if (commandSetup != null) {
216 commandSetup.cancel();
217 }
218 commandSetup = new WaarpFuture(true);
219 FtpDataAsyncConn dataAsyncConn = session.getDataConn();
220 if (!dataAsyncConn.isStreamFile()) {
221
222 if (dataAsyncConn.isActive()) {
223
224 logger.debug("Connection already open");
225 session.setReplyCode(
226 ReplyCode.REPLY_125_DATA_CONNECTION_ALREADY_OPEN,
227 dataAsyncConn.getType().name() +
228 " mode data connection already open");
229 return true;
230 }
231 } else {
232
233 if (dataAsyncConn.isActive()) {
234 logger
235 .error("Connection already open but should not since in Stream mode");
236 setTransferAbortedFromInternal(false);
237 throw new Reply425Exception(
238 "Connection already open but should not since in Stream mode");
239 }
240 }
241
242 session.setReplyCode(ReplyCode.REPLY_150_FILE_STATUS_OKAY, "Opening " +
243 dataAsyncConn.getType().name() + " mode data connection");
244 if (dataAsyncConn.isPassiveMode()) {
245 if (!dataAsyncConn.isBind()) {
246
247 throw new Reply425Exception(
248 "No passive data connection prepared");
249 }
250
251 logger.debug("Passive mode standby");
252 try {
253 dataChannel = waitForOpenedDataChannel();
254 dataAsyncConn.setNewOpenedDataChannel(dataChannel);
255 } catch (InterruptedException e) {
256 logger.warn("Connection abort in passive mode", e);
257
258 throw new Reply425Exception(
259 "Cannot open passive data connection");
260 }
261 logger.debug("Passive mode connected");
262 } else {
263
264 InetAddress inetAddress = dataAsyncConn.getLocalAddress().getAddress();
265 InetSocketAddress inetSocketAddress = dataAsyncConn.getRemoteAddress();
266 if (session.getConfiguration().getFtpInternalConfiguration().hasFtpSession(inetAddress, inetSocketAddress)) {
267 throw new Reply425Exception(
268 "Cannot open active data connection since remote address is already in use: "
269 +
270 inetSocketAddress);
271 }
272 logger.debug("Active mode standby");
273 Bootstrap bootstrap = session.getConfiguration().getFtpInternalConfiguration()
274 .getActiveBootstrap(session.isDataSsl());
275 session.getConfiguration().setNewFtpSession(inetAddress, inetSocketAddress, session);
276
277 String mylog = session.toString();
278 logger.debug("DataConn for: " + session.getCurrentCommand().getCommand() + " to "
279 + inetSocketAddress.toString());
280 ChannelFuture future = bootstrap.connect(inetSocketAddress, dataAsyncConn.getLocalAddress());
281 try {
282 future.await();
283 } catch (InterruptedException e1) {
284 }
285 if (!future.isSuccess()) {
286 logger.warn("Connection abort in active mode from future while session: " +
287 session.toString() +
288 "\nTrying connect to: " + inetSocketAddress.toString() +
289 " From: " + dataAsyncConn.getLocalAddress() +
290 "\nWas: " + mylog,
291 future.cause());
292
293 session.getConfiguration().delFtpSession(inetAddress,
294 inetSocketAddress);
295 throw new Reply425Exception(
296 "Cannot open active data connection");
297 }
298 try {
299 dataChannel = waitForOpenedDataChannel();
300 dataAsyncConn.setNewOpenedDataChannel(dataChannel);
301 } catch (InterruptedException e) {
302 logger.warn("Connection abort in active mode", e);
303
304 session.getConfiguration().delFtpSession(inetAddress,
305 inetSocketAddress);
306 throw new Reply425Exception(
307 "Cannot open active data connection");
308 }
309 logger.debug("Active mode connected");
310 }
311 if (dataChannel == null) {
312
313 if (!dataAsyncConn.isPassiveMode()) {
314 session.getConfiguration().getFtpInternalConfiguration()
315 .delFtpSession(
316 dataAsyncConn.getLocalAddress().getAddress(),
317 dataAsyncConn.getRemoteAddress());
318 }
319 throw new Reply425Exception(
320 "Cannot open data connection, shuting down");
321 }
322 return true;
323 }
324
325
326
327
328
329 private void runExecutor() {
330 endOfCommand = new WaarpFuture(true);
331 try {
332 session.getDataConn().getDataNetworkHandler().setFtpTransfer(executingCommand);
333 } catch (FtpNoConnectionException e1) {
334 }
335 waitForOpenedDataChannel.channel().config().setAutoRead(true);
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363 if (executorService == null) {
364 executorService = Executors.newSingleThreadExecutor();
365 }
366 executorService.execute(new FtpTransferExecutor(session,
367 executingCommand));
368 try {
369 commandFinishing.await();
370 if (commandFinishing.isFailed()) {
371 endOfCommand.cancel();
372 }
373 } catch (InterruptedException e) {
374 }
375 }
376
377
378
379
380
381
382
383
384
385 public void setNewFtpTransfer(FtpCommandCode command, FtpFile file) {
386 isExecutingCommandFinished = false;
387 commandFinishing = new WaarpFuture(true);
388 logger.debug("setNewCommand: {}", command);
389 setDataNetworkHandlerReady();
390 executingCommand = new FtpTransfer(command, file);
391 runExecutor();
392 commandFinishing = null;
393 commandSetup.setSuccess();
394 if (!session.getDataConn().isStreamFile()) {
395 waitForOpenedDataChannel.channel().config().setAutoRead(false);
396 }
397 }
398
399
400
401
402
403
404
405
406
407
408
409 public void setNewFtpTransfer(FtpCommandCode command, List<String> list,
410 String path) {
411 isExecutingCommandFinished = false;
412 commandFinishing = new WaarpFuture(true);
413 logger.debug("setNewCommand: {}", command);
414 setDataNetworkHandlerReady();
415 executingCommand = new FtpTransfer(command, list, path);
416 runExecutor();
417 commandFinishing = null;
418 commandSetup.setSuccess();
419 if (!session.getDataConn().isStreamFile()) {
420 waitForOpenedDataChannel.channel().config().setAutoRead(false);
421 }
422 try {
423 session.getDataConn().getDataNetworkHandler().setFtpTransfer(null);
424 } catch (FtpNoConnectionException e1) {
425 }
426 }
427
428 public boolean waitFtpTransferExecuting() {
429 boolean notFinished = true;
430 for (int i = 0; i < FtpInternalConfiguration.RETRYNB * 100; i++) {
431 if (isExecutingCommandFinished
432 || commandFinishing == null
433 || session.isCurrentCommandFinished()
434 ||
435 (commandFinishing != null && commandFinishing
436 .awaitUninterruptibly(FtpInternalConfiguration.RETRYINMS))) {
437 notFinished = false;
438 break;
439 }
440 }
441 return notFinished;
442 }
443
444
445
446
447
448
449
450 public boolean isFtpTransferExecuting() {
451 return !isExecutingCommandFinished;
452 }
453
454
455
456
457
458
459 public FtpTransfer getExecutingFtpTransfer() throws FtpNoTransferException {
460 if (executingCommand != null) {
461 return executingCommand;
462 }
463 throw new FtpNoTransferException("No Command currently running");
464 }
465
466
467
468
469
470
471
472
473 boolean isExecutingRetrLikeTransfer()
474 throws FtpNoTransferException, CommandAbstractException,
475 FtpNoFileException {
476 return !session.isCurrentCommandFinished() &&
477 FtpCommandCode.isRetrLikeCommand(getExecutingFtpTransfer()
478 .getCommand()) &&
479 getExecutingFtpTransfer().getFtpFile().isInReading();
480 }
481
482
483
484
485
486
487
488 private boolean checkFtpTransferStatus() throws FtpNoTransferException {
489 if (isCheckAlreadyCalled) {
490 logger.warn("Check: ALREADY CALLED");
491 return true;
492 }
493 if (isExecutingCommandFinished) {
494
495 logger.warn("Check: already Finished");
496 if (commandFinishing != null) {
497 commandFinishing.cancel();
498 }
499 throw new FtpNoTransferException("No transfer running");
500 }
501 if (!isDataNetworkHandlerReady) {
502
503 logger.warn("Check: already DNH not ready");
504 throw new FtpNoTransferException("No connection");
505 }
506 isCheckAlreadyCalled = true;
507 FtpTransfer executedTransfer = getExecutingFtpTransfer();
508 logger.debug("Check: command {}", executedTransfer.getCommand());
509
510 if (FtpCommandCode.isListLikeCommand(executedTransfer.getCommand())) {
511 if (executedTransfer.getStatus()) {
512
513 logger.debug("Check: List OK");
514 closeTransfer();
515 return false;
516 }
517 logger.debug("Check: List Ko");
518 abortTransfer();
519 return false;
520 } else if (FtpCommandCode.isRetrLikeCommand(executedTransfer
521 .getCommand())) {
522 FtpFile file = null;
523 try {
524 file = executedTransfer.getFtpFile();
525 } catch (FtpNoFileException e) {
526 logger.debug("Check: Retr no FtpFile for Retr");
527 abortTransfer();
528 return false;
529 }
530 try {
531 if (file.isInReading()) {
532 logger
533 .debug("Check: Retr FtpFile still in reading KO");
534 abortTransfer();
535 } else {
536 logger
537 .debug("Check: Retr FtpFile no more in reading OK");
538 closeTransfer();
539 }
540 } catch (CommandAbstractException e) {
541 logger.warn("Retr Test is in Reading problem", e);
542 closeTransfer();
543 }
544 return false;
545 } else if (FtpCommandCode.isStoreLikeCommand(executedTransfer
546 .getCommand())) {
547
548 closeTransfer();
549 return false;
550 } else {
551 logger.warn("Check: Unknown command");
552 abortTransfer();
553 }
554 return false;
555 }
556
557
558
559
560 private void abortTransfer() {
561 logger.debug("Will abort transfer and write: ", new Exception("trace only"));
562 FtpFile file = null;
563 FtpTransfer current = null;
564 try {
565 current = getExecutingFtpTransfer();
566 file = current.getFtpFile();
567 file.abortFile();
568 } catch (FtpNoTransferException e) {
569 logger.warn("Abort problem", e);
570 } catch (FtpNoFileException e) {
571 } catch (CommandAbstractException e) {
572 logger.warn("Abort problem", e);
573 }
574 if (current != null) {
575 current.setStatus(false);
576 }
577 endDataConnection();
578 session.setReplyCode(
579 ReplyCode.REPLY_426_CONNECTION_CLOSED_TRANSFER_ABORTED,
580 "Transfer aborted for " +
581 (current == null ? "Unknown command" : current
582 .toString()));
583 if (current != null) {
584 if (!FtpCommandCode.isListLikeCommand(current.getCommand())) {
585 try {
586 session.getBusinessHandler().afterTransferDoneBeforeAnswer(current);
587 } catch (CommandAbstractException e) {
588 session.setReplyCode(e);
589 }
590 }
591 }
592 finalizeExecution();
593 }
594
595
596
597
598
599 private void closeTransfer() {
600 logger.debug("Will close transfer");
601 FtpFile file = null;
602 FtpTransfer current = null;
603 try {
604 current = getExecutingFtpTransfer();
605 file = current.getFtpFile();
606 file.closeFile();
607 } catch (FtpNoTransferException e) {
608 logger.warn("Close problem", e);
609 } catch (FtpNoFileException e) {
610 } catch (CommandAbstractException e) {
611 logger.warn("Close problem", e);
612 }
613 if (current != null) {
614 current.setStatus(true);
615 }
616 if (session.getDataConn().isStreamFile()) {
617 endDataConnection();
618 }
619 session.setReplyCode(ReplyCode.REPLY_226_CLOSING_DATA_CONNECTION,
620 "Transfer complete for " +
621 (current == null ? "Unknown command" : current
622 .toString()));
623 if (current != null) {
624 if (!FtpCommandCode.isListLikeCommand(current.getCommand())) {
625 try {
626 session.getBusinessHandler().afterTransferDoneBeforeAnswer(current);
627 } catch (CommandAbstractException e) {
628 session.setReplyCode(e);
629 }
630 } else {
631
632 try {
633 Thread.sleep(FtpInternalConfiguration.RETRYINMS);
634 } catch (InterruptedException e) {
635 }
636 }
637 }
638 finalizeExecution();
639 }
640
641
642
643
644
645
646 public void setEndOfTransfer() {
647 try {
648 checkFtpTransferStatus();
649 } catch (FtpNoTransferException e) {
650 return;
651 }
652 }
653
654
655
656
657
658
659
660
661 public void setTransferAbortedFromInternal(boolean write) {
662 logger.debug("Set transfer aborted internal {}", write);
663 abortTransfer();
664 if (write) {
665 session.getNetworkHandler().writeIntermediateAnswer();
666 }
667 if (endOfCommand != null) {
668 endOfCommand.cancel();
669 }
670 }
671
672
673
674
675
676 public void setPreEndOfTransfer() {
677 if (endOfCommand != null) {
678 endOfCommand.setSuccess();
679 logger.debug("Transfer completed");
680 }
681 }
682
683
684
685
686
687
688 public void waitForEndOfTransfer() throws InterruptedException {
689 if (endOfCommand != null) {
690 endOfCommand.await();
691 if (endOfCommand.isFailed()) {
692 throw new InterruptedException("Transfer aborted");
693 }
694 }
695
696 }
697
698
699
700
701
702
703 private void finalizeExecution() {
704
705 if (commandFinishing != null) {
706 commandFinishing.setSuccess();
707 }
708 isExecutingCommandFinished = true;
709 executingCommand = null;
710 resetWaitForOpenedDataChannel();
711 }
712
713
714
715
716
717 private synchronized void endDataConnection() {
718 logger.debug("End Data connection");
719 if (isDataNetworkHandlerReady && dataChannel != null) {
720 try {
721 WaarpSslUtility.closingSslChannel(dataChannel).await(FtpConfiguration.getDATATIMEOUTCON(),
722 TimeUnit.MILLISECONDS);
723 } catch (InterruptedException e) {
724 }
725 isDataNetworkHandlerReady = false;
726
727 dataChannel = null;
728 }
729 }
730
731
732
733
734
735
736 public void clear() {
737
738 endDataConnection();
739 finalizeExecution();
740 if (endOfCommand != null) {
741 endOfCommand.cancel();
742 }
743 if (waitForOpenedDataChannel != null) {
744 waitForOpenedDataChannel.cancel();
745 }
746 if (commandSetup != null) {
747 commandSetup.cancel();
748 }
749 if (executorService != null) {
750 executorService.shutdownNow();
751 executorService = null;
752 }
753 }
754 }