1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.openr66.client;
21
22 import org.dom4j.Document;
23 import org.dom4j.DocumentException;
24 import org.waarp.common.database.exception.WaarpDatabaseException;
25 import org.waarp.common.filemonitor.FileMonitor;
26 import org.waarp.common.filemonitor.FileMonitor.FileItem;
27 import org.waarp.common.filemonitor.FileMonitorCommandFactory;
28 import org.waarp.common.filemonitor.FileMonitorCommandRunnableFuture;
29 import org.waarp.common.filemonitor.RegexFileFilter;
30 import org.waarp.common.logging.WaarpLogger;
31 import org.waarp.common.logging.WaarpLoggerFactory;
32 import org.waarp.common.logging.WaarpSlf4JLoggerFactory;
33 import org.waarp.common.utility.WaarpShutdownHook;
34 import org.waarp.common.utility.WaarpSystemUtil;
35 import org.waarp.common.utility.WaarpThreadFactory;
36 import org.waarp.common.xml.XmlDecl;
37 import org.waarp.common.xml.XmlHash;
38 import org.waarp.common.xml.XmlType;
39 import org.waarp.common.xml.XmlUtil;
40 import org.waarp.common.xml.XmlValue;
41 import org.waarp.openr66.configuration.FileBasedConfiguration;
42 import org.waarp.openr66.context.ErrorCode;
43 import org.waarp.openr66.context.R66Result;
44 import org.waarp.openr66.context.task.SpooledInformTask;
45 import org.waarp.openr66.database.data.DbRule;
46 import org.waarp.openr66.database.data.DbTaskRunner;
47 import org.waarp.openr66.protocol.configuration.Configuration;
48 import org.waarp.openr66.protocol.configuration.Messages;
49 import org.waarp.openr66.protocol.localhandler.packet.BusinessRequestPacket;
50 import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
51 import org.waarp.openr66.protocol.utils.R66Future;
52
53 import java.io.File;
54 import java.io.FileFilter;
55 import java.util.ArrayList;
56 import java.util.List;
57 import java.util.concurrent.ExecutorService;
58 import java.util.concurrent.Executors;
59 import java.util.concurrent.TimeUnit;
60
61 import static org.waarp.common.database.DbConstant.*;
62 import static org.waarp.openr66.context.ErrorCode.*;
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public class SpooledDirectoryTransfer implements Runnable {
116 public static final String NEEDFULL = "needfull";
117 public static final String PARTIALOK = "Validated";
118
119
120
121
122 protected static volatile WaarpLogger logger;
123
124 protected static String infoArgs =
125 Messages.getString("SpooledDirectoryTransfer.0");
126
127 protected static final String NO_INFO_ARGS = "noinfo";
128
129 protected final R66Future future;
130
131 public final String name;
132
133 protected final List<String> directory;
134
135 protected final String statusFile;
136
137 protected final String stopFile;
138
139 protected final String ruleName;
140
141 protected final String fileInfo;
142
143 protected final boolean isMD5;
144
145 protected final List<String> remoteHosts;
146
147 protected final String regexFilter;
148
149 protected final List<String> waarpHosts;
150
151 protected final int blocksize;
152
153 protected final long elapseTime;
154
155 protected final long elapseWaarpTime;
156
157 protected final boolean parallel;
158
159 protected final int limitParallelTasks;
160
161 protected final boolean submit;
162
163 protected final boolean nolog;
164
165 protected final boolean recurs;
166
167 protected final long minimalSize;
168
169 protected final boolean normalInfoAsWarn;
170
171 protected final boolean ignoreAlreadyUsed;
172
173 protected final NetworkTransaction networkTransaction;
174
175 protected FileMonitor monitor;
176
177 private long sent;
178 private long error;
179
180
181
182
183
184
185 public SpooledDirectoryTransfer(final R66Future future,
186 final Arguments arguments,
187 final NetworkTransaction networkTransaction) {
188 if (logger == null) {
189 logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
190 }
191 this.future = future;
192 this.name = arguments.name;
193 this.directory = arguments.localDirectory;
194 statusFile = arguments.statusFile;
195 stopFile = arguments.stopFile;
196 this.ruleName = arguments.rule;
197 this.fileInfo = arguments.fileInfo;
198 this.isMD5 = arguments.isMd5;
199 this.remoteHosts = arguments.remoteHosts;
200 this.blocksize = arguments.block;
201 regexFilter = arguments.regex;
202 elapseTime = arguments.elapsed;
203 this.submit = arguments.toSubmit;
204 this.nolog = arguments.noLog && !arguments.toSubmit;
205 AbstractTransfer.nolog = this.nolog;
206 recurs = arguments.recursive;
207 elapseWaarpTime = arguments.elapsedWaarp;
208 if (this.submit) {
209 this.parallel = false;
210 } else {
211 this.parallel = arguments.isParallel;
212 }
213 limitParallelTasks = arguments.limitParallel;
214 waarpHosts = arguments.waarpHosts;
215 this.minimalSize = arguments.minimalSize;
216 normalInfoAsWarn = arguments.logWarn;
217 this.ignoreAlreadyUsed = arguments.ignoreAlreadyUsed;
218 this.networkTransaction = networkTransaction;
219 }
220
221 @Override
222 public void run() {
223 setSent(0);
224 setError(0);
225
226 final DbRule dbrule;
227 try {
228 dbrule = new DbRule(ruleName);
229 } catch (final WaarpDatabaseException e1) {
230 logger.error(Messages.getString("Transfer.18") + ": {}",
231 e1.getMessage());
232 future.setFailure(e1);
233 if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
234 null) {
235 Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
236 new Exception(Messages.getString("Transfer.18") + e1.getMessage()));
237 }
238 return;
239 }
240 if (dbrule.isRecvMode()) {
241 logger.error(
242 Messages.getString("SpooledDirectoryTransfer.5"));
243 future.cancel();
244 if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
245 null) {
246 Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
247 new Exception(Messages.getString("SpooledDirectoryTransfer.5")));
248 }
249 return;
250 }
251 final File status = new File(statusFile);
252 if (status.isDirectory()) {
253 logger.error(
254 Messages.getString("SpooledDirectoryTransfer.6"));
255 future.cancel();
256 if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
257 null) {
258 Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
259 new Exception(Messages.getString("SpooledDirectoryTransfer.6")));
260 }
261 return;
262 }
263 final File stop = new File(stopFile);
264 if (stop.isDirectory()) {
265 logger.error(
266 Messages.getString("SpooledDirectoryTransfer.7"));
267 future.cancel();
268 if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
269 null) {
270 Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
271 new Exception(Messages.getString("SpooledDirectoryTransfer.7")));
272 }
273 return;
274 } else if (stop.exists()) {
275 logger.warn(
276 Messages.getString("SpooledDirectoryTransfer.8"));
277 future.setSuccess();
278 if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
279 null) {
280 Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
281 new Exception(Messages.getString("SpooledDirectoryTransfer.8")));
282 }
283 return;
284 }
285 for (final String dirname : directory) {
286 final File dir = new File(dirname);
287 if (!dir.isDirectory()) {
288 logger.error(Messages.getString("SpooledDirectoryTransfer.9") + " : " +
289 dir);
290 future.cancel();
291 if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
292 null) {
293 Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
294 new Exception(Messages.getString("SpooledDirectoryTransfer.9")));
295 }
296 return;
297 }
298 }
299 FileFilter filter = null;
300 if (regexFilter != null) {
301 filter = new RegexFileFilter(regexFilter, minimalSize);
302 } else if (minimalSize > 0) {
303 filter = new RegexFileFilter(minimalSize);
304 }
305 final FileMonitorCommandRunnableFuture commandValidFile =
306 initializeFileMonitorCommandRunnableFuture(status, stop, filter);
307 File dir;
308 if (!monitor.initialized()) {
309
310 logger.error(
311 Messages.getString("Configuration.WrongInit") + " : already running");
312 future.cancel();
313 if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
314 null) {
315 Configuration.configuration.getShutdownConfiguration().serviceFuture.setFailure(
316 new Exception(Messages.getString("Configuration.WrongInit") +
317 " : already running"));
318 }
319 return;
320 }
321 commandValidFile.setMonitor(monitor);
322 if (parallel) {
323 final FileMonitorCommandFactory factory =
324 new FileMonitorCommandFactory() {
325
326 @Override
327 public final FileMonitorCommandRunnableFuture create(
328 final FileItem fileItem) {
329 final SpooledRunner runner = new SpooledRunner(fileItem);
330 runner.setMonitor(monitor);
331 return runner;
332 }
333 };
334 monitor.setCommandValidFileFactory(factory, limitParallelTasks);
335 }
336 final FileMonitor monitorArg = monitor;
337 if (waarpHosts != null && !waarpHosts.isEmpty()) {
338 setWaarpHostCommand(monitorArg);
339 }
340 for (int i = 1; i < directory.size(); i++) {
341 dir = new File(directory.get(i));
342 monitor.addDirectory(dir);
343 }
344 logger.warn("SpooledDirectoryTransfer starts name:" + name + " directory:" +
345 directory + " statusFile:" + statusFile + " stopFile:" +
346 stopFile + " rulename:" + ruleName + " fileinfo:" + fileInfo +
347 " hosts:" + remoteHosts + " regex:" + regexFilter +
348 " minimalSize:" + minimalSize + " waarp:" + waarpHosts +
349 " elapse:" + elapseTime + " waarpElapse:" + elapseWaarpTime +
350 " parallel:" + parallel + " limitParallel:" +
351 limitParallelTasks + " submit:" + submit + " recursive:" +
352 recurs);
353 monitor.start();
354 monitor.waitForStopFile();
355 future.setSuccess();
356 if (Configuration.configuration.getShutdownConfiguration().serviceFuture !=
357 null) {
358 Configuration.configuration.getShutdownConfiguration().serviceFuture.setSuccess();
359 }
360 }
361
362 private void setWaarpHostCommand(final FileMonitor monitorArg) {
363 final FileMonitorCommandRunnableFuture waarpHostCommand;
364 waarpHostCommand = new FileMonitorCommandRunnableFuture() {
365 @Override
366 public void run(final FileItem notused) {
367 try {
368 Thread.currentThread().setName("FileMonitorInformation_" + name);
369 if (admin.getSession() != null && admin.getSession().isDisActive()) {
370 admin.getSession().checkConnectionNoException();
371 }
372 String status = monitorArg.getStatus();
373 if (normalInfoAsWarn) {
374 logger.warn("Will inform back Waarp hosts of current history: " +
375 monitorArg.getCurrentHistoryNb());
376 } else {
377 logger.info("Will inform back Waarp hosts of current history: {}",
378 monitorArg.getCurrentHistoryNb());
379 }
380 for (String host : waarpHosts) {
381 if (host == null) {
382 continue;
383 }
384 host = host.trim();
385 if (!host.isEmpty()) {
386 final R66Future r66Future = new R66Future(true);
387 final BusinessRequestPacket packet = new BusinessRequestPacket(
388 SpooledInformTask.class.getName() + ' ' + status, 0);
389 final BusinessRequest transaction =
390 new BusinessRequest(networkTransaction, r66Future, host,
391 packet);
392 transaction.run();
393 r66Future.awaitOrInterruptible();
394 if (!r66Future.isSuccess()) {
395 logger.info("Can't inform Waarp server: {} since {}", host,
396 r66Future.getCause());
397 } else {
398 final R66Result result = r66Future.getResult();
399 if (result == null) {
400 monitorArg.setNextAsFullStatus();
401 } else {
402 status = (String) result.getOther();
403 if (status == null || status.equalsIgnoreCase(NEEDFULL)) {
404 monitorArg.setNextAsFullStatus();
405 }
406 }
407 logger.debug("Inform back Waarp hosts over for: {}", host);
408 }
409 }
410 }
411 } catch (final Throwable e) {
412 logger.error("Issue during Waarp information", e);
413
414 }
415 }
416 };
417 monitor.setCommandCheckIteration(waarpHostCommand);
418 monitor.setElapseWaarpTime(elapseWaarpTime);
419 }
420
421 private FileMonitorCommandRunnableFuture initializeFileMonitorCommandRunnableFuture(
422 final File status, final File stop, final FileFilter filter) {
423
424 final FileMonitorCommandRunnableFuture commandValidFile =
425 new SpooledRunner(null);
426 final FileMonitorCommandRunnableFuture waarpRemovedCommand =
427 new FileMonitorCommandRunnableFuture() {
428 @Override
429 public void run(final FileItem file) {
430 if (normalInfoAsWarn) {
431 logger.warn("File removed: {}", file.file);
432 } else {
433 logger.info("File removed: {}", file.file);
434 }
435 }
436 };
437 final File dir = new File(directory.get(0));
438 monitor = new FileMonitor(name, status, stop, dir, null, elapseTime, filter,
439 recurs, commandValidFile, waarpRemovedCommand,
440 null);
441 monitor.setIgnoreAlreadyUsed(ignoreAlreadyUsed);
442 return commandValidFile;
443 }
444
445 public void stop() {
446 if (monitor != null) {
447 logger.info("Stop Monitor");
448 monitor.stop();
449 logger.info("Monitor Stopped");
450 } else {
451 logger.warn("NO MONITOR found");
452 }
453 }
454
455 public class SpooledRunner extends FileMonitorCommandRunnableFuture {
456 private static final String REQUEST_INFORMATION_FAILURE =
457 "RequestInformation.Failure";
458 private static final String REMOTE2 = "</REMOTE>";
459 private static final String REMOTE = "<REMOTE>";
460
461 public SpooledRunner(final FileItem fileItem) {
462 super(fileItem);
463 if (logger == null) {
464 logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
465 }
466 }
467
468 @Override
469 public void run(final FileItem fileItem) {
470 setFileItem(fileItem);
471 checkReuse(ignoreAlreadyUsed);
472 if (admin.getSession() != null && admin.getSession().isDisActive()) {
473 admin.getSession().checkConnectionNoException();
474 }
475 boolean finalStatus = false;
476 int ko = 0;
477 long specialId =
478 remoteHosts.size() > 1? ILLEGALVALUE : fileItem.specialId;
479 long newSpecialId = ILLEGALVALUE;
480
481 if (isIgnored(ignoreAlreadyUsed)) {
482
483 return;
484 }
485 specialId = checkReuseUniqueHost(fileItem, specialId);
486 try {
487 for (String host : remoteHosts) {
488 if (host == null) {
489 continue;
490 }
491 host = host.trim();
492 if (!host.isEmpty()) {
493 final String filename = fileItem.file.getAbsolutePath();
494 logger.info(
495 "Launch transfer to " + host + " with file " + filename);
496 R66Future r66Future = new R66Future(true);
497 final String text;
498 if (submit) {
499 text = submitTransfer(specialId, host, filename, r66Future);
500 } else {
501 if (specialId != ILLEGALVALUE) {
502
503 cleanPreviousTransfer(specialId, host, filename, r66Future);
504 }
505 text = "Direct Transfer: ";
506 r66Future = new R66Future(true);
507 directTransfer(host, filename, r66Future, text);
508 }
509 r66Future.awaitOrInterruptible();
510 final R66Result r66result = r66Future.getResult();
511 if (r66Future.isSuccess()) {
512 finalStatus = true;
513 newSpecialId =
514 finalizeInSuccess(newSpecialId, host, text, r66result);
515 } else {
516 setError(getError() + 1);
517 ko++;
518 final DbTaskRunner runner;
519 if (r66result != null) {
520 String errMsg = "Unknown Error Message";
521 if (r66Future.getCause() != null) {
522 errMsg = r66Future.getCause().getMessage();
523 }
524 final boolean isConnectionImpossible =
525 r66result.getCode() == ErrorCode.ConnectionImpossible &&
526 !normalInfoAsWarn;
527 runner = r66result.getRunner();
528 if (runner != null) {
529 newSpecialId = koOnFoundRunner(host, text, runner, errMsg,
530 isConnectionImpossible);
531 } else {
532 ko = getKoOnNoRunner(ko, host, r66Future, text, r66result,
533 isConnectionImpossible);
534 }
535 } else {
536 logger.error(
537 text + Messages.getString(REQUEST_INFORMATION_FAILURE)
538
539 + REMOTE + host + REMOTE2, r66Future.getCause());
540 }
541 }
542 }
543 }
544 } catch (final Throwable e) {
545
546 logger.error("Error in SpooledDirectory", e);
547 finalStatus = false;
548 }
549 specialId = remoteHosts.size() > 1? ILLEGALVALUE : newSpecialId;
550 if (ko > 0) {
551
552 finalStatus = false;
553 }
554 finalizeValidFile(finalStatus, specialId);
555 }
556
557 private void directTransfer(final String host, final String filename,
558 final R66Future r66Future, final String text) {
559 final DirectTransfer transaction =
560 new DirectTransfer(r66Future, host, filename, ruleName, fileInfo,
561 isMD5, blocksize, ILLEGALVALUE,
562 networkTransaction);
563 if (!fileInfo.contains("-nofollow")) {
564 TransferArgs.forceAnalyzeFollow(transaction);
565 }
566
567 transaction.normalInfoAsWarn = normalInfoAsWarn;
568 logger.info("{}{}", text, host);
569 transaction.run();
570 }
571
572 private String submitTransfer(final long specialId, final String host,
573 final String filename,
574 final R66Future r66Future) {
575 final String text;
576 text = "Submit Transfer: ";
577 final SubmitTransfer transaction =
578 new SubmitTransfer(r66Future, host, filename, ruleName, fileInfo,
579 isMD5, blocksize, specialId, null);
580 if (!fileInfo.contains("-nofollow")) {
581 TransferArgs.forceAnalyzeFollow(transaction);
582 }
583 transaction.normalInfoAsWarn = normalInfoAsWarn;
584 logger.info("{}{}", text, host);
585 transaction.run();
586 return text;
587 }
588
589 private long koOnFoundRunner(final String host, final String text,
590 final DbTaskRunner runner, final String errMsg,
591 final boolean isConnectionImpossible) {
592 final long newSpecialId;
593 newSpecialId = runner.getSpecialId();
594 DbTaskRunner.removeNoDbSpecialId(newSpecialId);
595 if (isConnectionImpossible) {
596 if (logger.isInfoEnabled()) {
597 logger.info("{}{}{}{}{}{}{}{}", text,
598 Messages.getString(REQUEST_INFORMATION_FAILURE),
599
600 runner.toShortString(), REMOTE, host, "</REMOTE><REASON>",
601 errMsg, "</REASON>");
602 }
603 } else {
604 logger.error(text + Messages.getString(REQUEST_INFORMATION_FAILURE) +
605
606 runner.toShortString() + REMOTE + host +
607 "</REMOTE><REASON>" + errMsg + "</REASON>");
608 }
609 return newSpecialId;
610 }
611
612 private int getKoOnNoRunner(int ko, final String host,
613 final R66Future r66Future, final String text,
614 final R66Result r66result,
615 final boolean isConnectionImpossible) {
616 if (isConnectionImpossible) {
617 logger.info("{}{}{}{}{}", text,
618 Messages.getString(REQUEST_INFORMATION_FAILURE), REMOTE,
619 host, REMOTE2, r66Future.getCause());
620 } else {
621 if (r66result.getCode() == QueryRemotelyUnknown) {
622 logger.info("Transfer not found {}{}{}", REMOTE, host, REMOTE2);
623
624 ko--;
625 setError(getError() - 1);
626 } else {
627 logger.error(
628 text + Messages.getString(REQUEST_INFORMATION_FAILURE) + REMOTE +
629 host + REMOTE2, r66Future.getCause());
630 }
631 }
632 return ko;
633 }
634
635 private long finalizeInSuccess(long newSpecialId, final String host,
636 final String text,
637 final R66Result r66result) {
638 setSent(getSent() + 1);
639 final DbTaskRunner runner;
640 if (r66result != null) {
641 runner = r66result.getRunner();
642 if (runner != null) {
643 newSpecialId = runner.getSpecialId();
644 String status =
645 Messages.getString("RequestInformation.Success");
646 if (runner.getErrorInfo() == ErrorCode.Warning) {
647 status =
648 Messages.getString("RequestInformation.Warned");
649 }
650 if (normalInfoAsWarn) {
651 logger.warn(
652 text + " status: " + status + " " + runner.toShortString() +
653 " <REMOTE>" + host + REMOTE2 + " <FILEFINAL>" +
654 (r66result.getFile() != null?
655 r66result.getFile() + "</FILEFINAL>" : "no file"));
656 } else if (logger.isInfoEnabled()) {
657 logger.info(
658 "{} status: {} {} <REMOTE>{}</REMOTE> <FILEFINAL>{}",
659 text, status, runner.toShortString(), host,
660 (r66result.getFile() != null?
661 r66result.getFile() + "</FILEFINAL>" : "no file"));
662 }
663 if (nolog && !submit) {
664
665 try {
666 runner.delete();
667 } catch (final WaarpDatabaseException e) {
668 logger.warn(
669 "Cannot apply nolog to " + runner.toShortString() +
670 " : {}", e.getMessage());
671 }
672 }
673 DbTaskRunner.removeNoDbSpecialId(newSpecialId);
674 } else {
675 if (normalInfoAsWarn) {
676 logger.warn(text + Messages.getString("RequestInformation.Success")
677
678 + REMOTE + host + REMOTE2);
679 } else {
680 logger.info("{}{}{}{}{}", text,
681 Messages.getString("RequestInformation.Success"),
682
683 REMOTE, host, REMOTE2);
684 }
685 }
686 } else {
687 if (normalInfoAsWarn) {
688 logger.warn(text + Messages.getString("RequestInformation.Success")
689
690 + REMOTE + host + REMOTE2);
691 } else {
692 logger.info("{}{}{}{}{}", text,
693 Messages.getString("RequestInformation.Success"),
694
695 REMOTE, host, REMOTE2);
696 }
697 }
698 return newSpecialId;
699 }
700
701 private void cleanPreviousTransfer(final long specialId, final String host,
702 final String filename,
703 final R66Future r66Future) {
704 final String text;
705 try {
706 final String srequester = Configuration.configuration.getHostId(host);
707 text =
708 "Request Transfer Cancelled: " + specialId + ' ' + filename + ' ';
709
710 logger.debug("Will try to cancel {}", specialId);
711 final RequestTransfer transaction2 =
712 new RequestTransfer(r66Future, specialId, host, srequester, true,
713 false, false, networkTransaction);
714 transaction2.normalInfoAsWarn = normalInfoAsWarn;
715 logger.warn(text + host);
716 transaction2.run();
717
718 r66Future.awaitOrInterruptible();
719 } catch (final WaarpDatabaseException e) {
720 if (admin.getSession() != null) {
721 admin.getSession().checkConnectionNoException();
722 }
723 logger.warn(Messages.getString("RequestTransfer.5") + host + " : {}",
724 e.getMessage());
725 }
726 }
727
728 private long checkReuseUniqueHost(final FileItem fileItem, long specialId) {
729 if (isReuse() && remoteHosts.size() == 1) {
730
731
732 if (!submit) {
733
734 setValid(fileItem);
735 } else {
736
737 String host = remoteHosts.get(0);
738 if (host == null) {
739 return specialId;
740 }
741 host = host.trim();
742 if (!host.isEmpty()) {
743 final String filename = fileItem.file.getAbsolutePath();
744 final String text =
745 "Request Transfer to be cancelled: " + fileItem.specialId +
746 ' ' + filename + ' ';
747 try {
748 final R66Future r66Future = new R66Future(true);
749 final String srequester =
750 Configuration.configuration.getHostId(host);
751
752 final RequestTransfer transaction =
753 new RequestTransfer(r66Future, fileItem.specialId, host,
754 srequester, true, false, false,
755 networkTransaction);
756 transaction.normalInfoAsWarn = normalInfoAsWarn;
757 logger.info("{}{}", text, host);
758
759 transaction.run();
760 r66Future.awaitOrInterruptible();
761
762 setValid(fileItem);
763 specialId = fileItem.specialId;
764 } catch (final WaarpDatabaseException e) {
765 if (admin.getSession() != null) {
766 admin.getSession().checkConnectionNoException();
767 }
768 logger.warn(
769 Messages.getString("RequestTransfer.5") + host + " : {}",
770 e.getMessage());
771 }
772 }
773 }
774 }
775 return specialId;
776 }
777 }
778
779
780
781
782 public static class Arguments {
783 private String name;
784 private final List<String> remoteHosts = new ArrayList<String>();
785 private final List<String> localDirectory = new ArrayList<String>();
786 private String rule;
787 private String fileInfo = NO_INFO_ARGS;
788 private boolean isMd5;
789 private int block = 0x10000;
790 private String statusFile;
791 private String stopFile;
792 private String regex;
793 private long elapsed = 1000;
794 private long elapsedWaarp = 5000;
795 private boolean toSubmit = true;
796 private boolean noLog;
797 private boolean recursive;
798 private final List<String> waarpHosts = new ArrayList<String>();
799 private boolean isParallel = true;
800 private int limitParallel;
801 private long minimalSize;
802 private boolean logWarn = true;
803 private boolean ignoreAlreadyUsed = false;
804
805 public final String getName() {
806 return name;
807 }
808
809 public final void setName(final String name) {
810 this.name = name;
811 }
812
813 public final List<String> getRemoteHosts() {
814 return remoteHosts;
815 }
816
817 public final List<String> getLocalDirectory() {
818 return localDirectory;
819 }
820
821 public final String getRule() {
822 return rule;
823 }
824
825 public final void setRule(final String rule) {
826 this.rule = rule;
827 }
828
829 public final String getFileInfo() {
830 return fileInfo;
831 }
832
833 public final void setFileInfo(final String fileInfo) {
834 this.fileInfo = fileInfo;
835 }
836
837 public final boolean isMd5() {
838 return isMd5;
839 }
840
841 public final void setMd5(final boolean md5) {
842 this.isMd5 = md5;
843 }
844
845 public final int getBlock() {
846 return block;
847 }
848
849 public final void setBlock(final int block) {
850 this.block = block;
851 }
852
853 public final String getStatusFile() {
854 return statusFile;
855 }
856
857 public final void setStatusFile(final String statusFile) {
858 this.statusFile = statusFile;
859 }
860
861 public final String getStopFile() {
862 return stopFile;
863 }
864
865 public final void setStopFile(final String stopFile) {
866 this.stopFile = stopFile;
867 }
868
869 public final String getRegex() {
870 return regex;
871 }
872
873 public final void setRegex(final String regex) {
874 this.regex = regex;
875 }
876
877 public final long getElapsed() {
878 return elapsed;
879 }
880
881 public final void setElapsed(final long elapsed) {
882 this.elapsed = elapsed;
883 }
884
885 public final long getElapsedWaarp() {
886 return elapsedWaarp;
887 }
888
889 public final void setElapsedWaarp(final long elapsedWaarp) {
890 this.elapsedWaarp = elapsedWaarp;
891 }
892
893 public final boolean isToSubmit() {
894 return toSubmit;
895 }
896
897 public final void setToSubmit(final boolean toSubmit) {
898 this.toSubmit = toSubmit;
899 }
900
901 public final boolean isNoLog() {
902 return noLog;
903 }
904
905 public final void setNoLog(final boolean noLog) {
906 this.noLog = noLog;
907 }
908
909 public final boolean isRecursive() {
910 return recursive;
911 }
912
913 public final void setRecursive(final boolean recursive) {
914 this.recursive = recursive;
915 }
916
917 public final List<String> getWaarpHosts() {
918 return waarpHosts;
919 }
920
921 public final boolean isParallel() {
922 return isParallel;
923 }
924
925 public final void setParallel(final boolean parallel) {
926 this.isParallel = parallel;
927 }
928
929 public final int getLimitParallel() {
930 return limitParallel;
931 }
932
933 public final void setLimitParallel(final int limitParallel) {
934 this.limitParallel = limitParallel;
935 }
936
937 public final long getMinimalSize() {
938 return minimalSize;
939 }
940
941 public final void setMinimalSize(final long minimalSize) {
942 this.minimalSize = minimalSize;
943 }
944
945 public final boolean isLogWarn() {
946 return logWarn;
947 }
948
949 public final void setLogWarn(final boolean logWarn) {
950 this.logWarn = logWarn;
951 }
952
953 public final boolean isIgnoreAlreadyUsed() {
954 return ignoreAlreadyUsed;
955 }
956
957 public final void setIgnoreAlreadyUsed(final boolean ignoreAlreadyUsed) {
958 this.ignoreAlreadyUsed = ignoreAlreadyUsed;
959 }
960 }
961
962 protected static final List<Arguments> arguments = new ArrayList<Arguments>();
963 private static final String XML_ROOT = "/config/";
964 private static final String XML_SPOOLEDDAEMON = "spooleddaemon";
965 private static final String XML_STOPFILE = "stopfile";
966 private static final String XML_SPOOLED = "spooled";
967 private static final String XML_NAME = "name";
968 private static final String XML_TO = "to";
969 private static final String XML_RULE = "rule";
970 private static final String XML_STATUSFILE = "statusfile";
971 private static final String XML_DIRECTORY = "directory";
972 private static final String XML_REGEX = "regex";
973 private static final String XML_RECURSIVE = "recursive";
974 private static final String XML_ELAPSE = "elapse";
975 private static final String XML_SUBMIT = "submit";
976 private static final String XML_PARALLEL = "parallel";
977 private static final String XML_LIMIT_PARALLEL = "limitParallel";
978 private static final String XML_INFO = "info";
979 private static final String XML_MD_5 = "md5";
980 private static final String XML_BLOCK = "block";
981 private static final String XML_NOLOG = "nolog";
982 private static final String XML_WAARP = "waarp";
983 private static final String XML_ELAPSE_WAARP = "elapseWaarp";
984 private static final String XML_MINIMAL_SIZE = "minimalSize";
985 private static final String XML_LOG_WARN = "logWarn";
986 private static final String XML_IGNORED_ALREADY_USED = "ignoreAlreadyUsed";
987
988 private static final XmlDecl[] subSpooled = {
989 new XmlDecl(XmlType.STRING, XML_NAME),
990 new XmlDecl(XML_TO, XmlType.STRING, XML_TO, true),
991 new XmlDecl(XmlType.STRING, XML_RULE),
992 new XmlDecl(XmlType.STRING, XML_STATUSFILE),
993 new XmlDecl(XML_DIRECTORY, XmlType.STRING, XML_DIRECTORY, true),
994 new XmlDecl(XmlType.STRING, XML_REGEX),
995 new XmlDecl(XmlType.BOOLEAN, XML_RECURSIVE),
996 new XmlDecl(XmlType.LONG, XML_ELAPSE),
997 new XmlDecl(XmlType.BOOLEAN, XML_SUBMIT),
998 new XmlDecl(XmlType.BOOLEAN, XML_PARALLEL),
999 new XmlDecl(XmlType.INTEGER, XML_LIMIT_PARALLEL),
1000 new XmlDecl(XmlType.STRING, XML_INFO),
1001 new XmlDecl(XmlType.BOOLEAN, XML_MD_5),
1002 new XmlDecl(XmlType.INTEGER, XML_BLOCK),
1003 new XmlDecl(XmlType.BOOLEAN, XML_NOLOG),
1004 new XmlDecl(XML_WAARP, XmlType.STRING, XML_WAARP, true),
1005 new XmlDecl(XmlType.LONG, XML_ELAPSE_WAARP),
1006 new XmlDecl(XmlType.BOOLEAN, XML_IGNORED_ALREADY_USED),
1007 new XmlDecl(XmlType.LONG, XML_MINIMAL_SIZE)
1008 };
1009 private static final XmlDecl[] spooled = {
1010 new XmlDecl(XmlType.STRING, XML_STOPFILE),
1011 new XmlDecl(XmlType.BOOLEAN, XML_LOG_WARN),
1012 new XmlDecl(XML_SPOOLED, XmlType.XVAL, XML_SPOOLED, subSpooled, true)
1013 };
1014 private static final XmlDecl[] configSpooled = {
1015 new XmlDecl(XML_SPOOLEDDAEMON, XmlType.XVAL, XML_ROOT + XML_SPOOLEDDAEMON,
1016 spooled, false)
1017 };
1018
1019 @SuppressWarnings("unchecked")
1020 protected static boolean getParamsFromConfigFile(final String filename) {
1021 if (logger == null) {
1022 logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
1023 }
1024 final Document document;
1025
1026 try {
1027 document = XmlUtil.getNewSaxReader().read(filename);
1028 } catch (final DocumentException e) {
1029 logger.error(Messages.getString("FileBasedConfiguration.CannotReadXml") +
1030 filename + ": {}", e.getMessage());
1031 return false;
1032 }
1033 if (document == null) {
1034 logger.error(Messages.getString("FileBasedConfiguration.CannotReadXml") +
1035 filename);
1036 return false;
1037 }
1038 final XmlValue[] configuration = XmlUtil.read(document, configSpooled);
1039 final XmlHash hashConfig = new XmlHash(configuration);
1040 XmlValue value = hashConfig.get(XML_STOPFILE);
1041 final String stopfile;
1042 if (value == null || value.isEmpty()) {
1043 return false;
1044 }
1045 stopfile = value.getString();
1046 value = hashConfig.get(XML_LOG_WARN);
1047 boolean logWarn = true;
1048 if (value != null && !value.isEmpty()) {
1049 logWarn = value.getBoolean();
1050 }
1051 value = hashConfig.get(XML_SPOOLED);
1052 if (value != null && value.getList() != null) {
1053 for (final XmlValue[] xml : (Iterable<XmlValue[]>) value.getList()) {
1054 final Arguments arg = new Arguments();
1055 arg.setStopFile(stopfile);
1056 arg.setLogWarn(logWarn);
1057 final XmlHash subHash = new XmlHash(xml);
1058 value = subHash.get(XML_NAME);
1059 if (value != null && !value.isEmpty()) {
1060 arg.setName(value.getString());
1061 }
1062 value = subHash.get(XML_TO);
1063 if (value != null && value.getList() != null) {
1064 for (final String to : (Iterable<String>) value.getList()) {
1065 if (to.trim().isEmpty()) {
1066 continue;
1067 }
1068 arg.getRemoteHosts().add(to.trim());
1069 }
1070 if (arg.getRemoteHosts().isEmpty()) {
1071 logger.warn("to directive is empty but must not");
1072 continue;
1073 }
1074 } else {
1075 logger.warn("to directive is empty but must not");
1076 continue;
1077 }
1078 value = subHash.get(XML_RULE);
1079 if (value != null && !value.isEmpty()) {
1080 arg.setRule(value.getString());
1081 } else {
1082 logger.warn("rule directive is empty but must not");
1083 continue;
1084 }
1085 value = subHash.get(XML_STATUSFILE);
1086 if (value != null && !value.isEmpty()) {
1087 arg.setStatusFile(value.getString());
1088 } else {
1089 logger.warn("statusfile directive is empty but must not");
1090 continue;
1091 }
1092 value = subHash.get(XML_DIRECTORY);
1093 if (value != null && value.getList() != null) {
1094 for (final String dir : (Iterable<String>) value.getList()) {
1095 if (dir.trim().isEmpty()) {
1096 continue;
1097 }
1098 arg.getLocalDirectory().add(dir.trim());
1099 }
1100 if (arg.getLocalDirectory().isEmpty()) {
1101 logger.warn("directory directive is empty but must not");
1102 continue;
1103 }
1104 } else {
1105 logger.warn("directory directive is empty but must not");
1106 continue;
1107 }
1108 value = subHash.get(XML_REGEX);
1109 if (value != null && !value.isEmpty()) {
1110 arg.setRegex(value.getString());
1111 }
1112 value = subHash.get(XML_RECURSIVE);
1113 if (value != null && !value.isEmpty()) {
1114 arg.setRecursive(value.getBoolean());
1115 }
1116 value = subHash.get(XML_ELAPSE);
1117 if (value != null && !value.isEmpty()) {
1118 arg.setElapsed(value.getLong());
1119 }
1120 value = subHash.get(XML_SUBMIT);
1121 if (value != null && !value.isEmpty()) {
1122 arg.setToSubmit(value.getBoolean());
1123 }
1124 value = subHash.get(XML_PARALLEL);
1125 if (value != null && !value.isEmpty()) {
1126 arg.setParallel(value.getBoolean());
1127 }
1128 value = subHash.get(XML_LIMIT_PARALLEL);
1129 if (value != null && !value.isEmpty()) {
1130 arg.setLimitParallel(value.getInteger());
1131 }
1132 value = subHash.get(XML_INFO);
1133 if (value != null && !value.isEmpty()) {
1134 arg.setFileInfo(value.getString());
1135 }
1136 value = subHash.get(XML_MD_5);
1137 if (value != null && !value.isEmpty()) {
1138 arg.setMd5(value.getBoolean());
1139 }
1140 value = subHash.get(XML_BLOCK);
1141 if (value != null && !value.isEmpty()) {
1142 arg.setBlock(value.getInteger());
1143 }
1144 value = subHash.get(XML_NOLOG);
1145 if (value != null && !value.isEmpty()) {
1146 arg.setNoLog(value.getBoolean());
1147 }
1148 value = subHash.get(XML_WAARP);
1149 if (value != null && value.getList() != null) {
1150 for (final String host : (Iterable<String>) value.getList()) {
1151 if (host.trim().isEmpty()) {
1152 continue;
1153 }
1154 arg.getWaarpHosts().add(host.trim());
1155 }
1156 }
1157 value = subHash.get(XML_ELAPSE_WAARP);
1158 if (value != null && !value.isEmpty()) {
1159 arg.setElapsedWaarp(value.getLong());
1160 }
1161 value = subHash.get(XML_MINIMAL_SIZE);
1162 if (value != null && !value.isEmpty()) {
1163 arg.setMinimalSize(value.getLong());
1164 }
1165 value = subHash.get(XML_IGNORED_ALREADY_USED);
1166 if (value != null && !value.isEmpty()) {
1167 arg.setIgnoreAlreadyUsed(value.getBoolean());
1168 }
1169 arguments.add(arg);
1170 }
1171 }
1172 hashConfig.clear();
1173 return !arguments.isEmpty();
1174 }
1175
1176
1177
1178
1179
1180
1181
1182
1183 protected static boolean getParams(final String[] args) {
1184 if (logger == null) {
1185 logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
1186 }
1187 infoArgs = Messages.getString("SpooledDirectoryTransfer.0");
1188 if (args.length < 1) {
1189 logger.error(infoArgs);
1190 return false;
1191 }
1192 if (!FileBasedConfiguration.setClientConfigurationFromXml(
1193 Configuration.configuration, args[0])) {
1194 logger.error(
1195 Messages.getString("Configuration.NeedCorrectConfig"));
1196 return false;
1197 }
1198
1199 if (!getParamsFromConfigFile(args[0])) {
1200 if (args.length < 11) {
1201 logger.error(infoArgs);
1202 return false;
1203 }
1204
1205 final Arguments arg = new Arguments();
1206 arg.setBlock(Configuration.configuration.getBlockSize());
1207 int i = 1;
1208 try {
1209 for (i = 1; i < args.length; i++) {
1210 if ("-to".equalsIgnoreCase(args[i])) {
1211 i++;
1212 final String[] rhosts = args[i].split(",");
1213 for (String string : rhosts) {
1214 string = string.trim();
1215 if (string.isEmpty()) {
1216 continue;
1217 }
1218 if (Configuration.configuration.getAliases()
1219 .containsKey(string)) {
1220 string = Configuration.configuration.getAliases().get(string);
1221 }
1222 arg.getRemoteHosts().add(string);
1223 }
1224 } else if ("-name".equalsIgnoreCase(args[i])) {
1225 i++;
1226 arg.setName(args[i]);
1227 } else if ("-directory".equalsIgnoreCase(args[i])) {
1228 i++;
1229 final String[] dir = args[i].split(",");
1230 for (final String string : dir) {
1231 if (string.trim().isEmpty()) {
1232 continue;
1233 }
1234 arg.getLocalDirectory().add(string.trim());
1235 }
1236 } else if ("-rule".equalsIgnoreCase(args[i])) {
1237 i++;
1238 arg.setRule(args[i]);
1239 } else if ("-statusfile".equalsIgnoreCase(args[i])) {
1240 i++;
1241 arg.setStatusFile(args[i]);
1242 } else if ("-stopfile".equalsIgnoreCase(args[i])) {
1243 i++;
1244 arg.setStopFile(args[i]);
1245 } else if ("-info".equalsIgnoreCase(args[i])) {
1246 i++;
1247 arg.setFileInfo(args[i]);
1248 } else if ("-md5".equalsIgnoreCase(args[i])) {
1249 arg.setMd5(true);
1250 } else if ("-block".equalsIgnoreCase(args[i])) {
1251 i++;
1252 arg.setBlock(Integer.parseInt(args[i]));
1253 if (arg.getBlock() < 100) {
1254 logger.error(Messages.getString("AbstractTransfer.1") +
1255 arg.getBlock());
1256 return false;
1257 }
1258 } else if ("-nolog".equalsIgnoreCase(args[i])) {
1259 arg.setNoLog(true);
1260 } else if ("-submit".equalsIgnoreCase(args[i])) {
1261 arg.setToSubmit(true);
1262 } else if ("-direct".equalsIgnoreCase(args[i])) {
1263 arg.setToSubmit(false);
1264 } else if ("-recursive".equalsIgnoreCase(args[i])) {
1265 arg.setRecursive(true);
1266 } else if ("-logWarn".equalsIgnoreCase(args[i])) {
1267 arg.setLogWarn(true);
1268 } else if ("-notlogWarn".equalsIgnoreCase(args[i])) {
1269 arg.setLogWarn(false);
1270 } else if ("-regex".equalsIgnoreCase(args[i])) {
1271 i++;
1272 arg.setRegex(args[i]);
1273 } else if ("-waarp".equalsIgnoreCase(args[i])) {
1274 i++;
1275 final String[] host = args[i].split(",");
1276 for (final String string : host) {
1277 if (string.trim().isEmpty()) {
1278 continue;
1279 }
1280 arg.getWaarpHosts().add(string.trim());
1281 }
1282 } else if ("-elapse".equalsIgnoreCase(args[i])) {
1283 i++;
1284 arg.setElapsed(Long.parseLong(args[i]));
1285 } else if ("-elapseWaarp".equalsIgnoreCase(args[i])) {
1286 i++;
1287 arg.setElapsedWaarp(Long.parseLong(args[i]));
1288 } else if ("-minimalSize".equalsIgnoreCase(args[i])) {
1289 i++;
1290 arg.setMinimalSize(Long.parseLong(args[i]));
1291 } else if ("-limitParallel".equalsIgnoreCase(args[i])) {
1292 i++;
1293 arg.setLimitParallel(Integer.parseInt(args[i]));
1294 } else if ("-parallel".equalsIgnoreCase(args[i])) {
1295 arg.setParallel(true);
1296 } else if ("-sequential".equalsIgnoreCase(args[i])) {
1297 arg.setParallel(false);
1298 } else if ("-ignoreAlreadyUsed".equalsIgnoreCase(args[i])) {
1299 arg.setIgnoreAlreadyUsed(true);
1300 }
1301 }
1302 } catch (final NumberFormatException e) {
1303 logger.error(
1304 Messages.getString("AbstractTransfer.20") + i);
1305 return false;
1306 }
1307 if (arg.getFileInfo() == null) {
1308 arg.setFileInfo(NO_INFO_ARGS);
1309 }
1310 if (arg.getName() == null) {
1311 arg.setName(Configuration.configuration.getHostId() + " : " +
1312 arg.getLocalDirectory());
1313 }
1314 if (!arg.getRemoteHosts().isEmpty() && arg.getRule() != null &&
1315 !arg.getLocalDirectory().isEmpty() && arg.getStatusFile() != null &&
1316 arg.getStopFile() != null) {
1317 arguments.add(arg);
1318 return true;
1319 }
1320 logger.error(Messages.getString("SpooledDirectoryTransfer.56") +
1321
1322 infoArgs);
1323 return false;
1324 }
1325 return !arguments.isEmpty();
1326 }
1327
1328 public static void main(final String[] args) {
1329 WaarpLoggerFactory.setDefaultFactoryIfNotSame(
1330 new WaarpSlf4JLoggerFactory(null));
1331 if (logger == null) {
1332 logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
1333 }
1334 initialize(args, true);
1335 }
1336
1337 public static final List<SpooledDirectoryTransfer> list =
1338 new ArrayList<SpooledDirectoryTransfer>();
1339 public static NetworkTransaction networkTransactionStatic;
1340 public static ExecutorService executorService;
1341
1342
1343
1344
1345
1346
1347
1348
1349 public static boolean initialize(final String[] args,
1350 final boolean normalStart) {
1351 if (logger == null) {
1352 logger = WaarpLoggerFactory.getLogger(SpooledDirectoryTransfer.class);
1353 }
1354 arguments.clear();
1355 if (!getParams(args)) {
1356 logger.error(Messages.getString("Configuration.WrongInit"));
1357 if (admin != null) {
1358 admin.close();
1359 }
1360 if (normalStart) {
1361 WaarpSystemUtil.systemExit(2);
1362 }
1363 return false;
1364 }
1365
1366 Configuration.configuration.pipelineInit();
1367 networkTransactionStatic = new NetworkTransaction();
1368 try {
1369 executorService = Executors.newCachedThreadPool(
1370 new WaarpThreadFactory("SpooledDirectoryDaemon"));
1371 for (final Arguments arg : arguments) {
1372 final R66Future future = new R66Future(true);
1373 final SpooledDirectoryTransfer spooled =
1374 new SpooledDirectoryTransfer(future, arg, networkTransactionStatic);
1375 executorService.submit(spooled);
1376 list.add(spooled);
1377 }
1378 arguments.clear();
1379 Thread.sleep(1000);
1380 executorService.shutdown();
1381 Configuration.configuration.launchStatistics();
1382 if (normalStart) {
1383 while (!executorService.awaitTermination(
1384 Configuration.configuration.getTimeoutCon(),
1385 TimeUnit.MILLISECONDS)) {
1386 Thread.sleep(Configuration.configuration.getTimeoutCon());
1387 }
1388 for (final SpooledDirectoryTransfer spooledDirectoryTransfer : list) {
1389 logger.warn(Messages.getString("SpooledDirectoryTransfer.58") +
1390 spooledDirectoryTransfer.name + ": " +
1391 spooledDirectoryTransfer.getSent() + " success, " +
1392 spooledDirectoryTransfer.getError() + Messages.getString(
1393 "SpooledDirectoryTransfer.60"));
1394 }
1395 list.clear();
1396 }
1397 return true;
1398 } catch (final Throwable e) {
1399 logger.error("Exception", e);
1400 return false;
1401 } finally {
1402 if (normalStart) {
1403 WaarpShutdownHook.shutdownWillStart();
1404 networkTransactionStatic.closeAll();
1405 WaarpSystemUtil.systemExit(0);
1406 }
1407 }
1408 }
1409
1410
1411
1412
1413 public final long getSent() {
1414 return sent;
1415 }
1416
1417
1418
1419
1420 private void setSent(final long sent) {
1421 this.sent = sent;
1422 }
1423
1424
1425
1426
1427 public final long getError() {
1428 return error;
1429 }
1430
1431
1432
1433
1434 private void setError(final long error) {
1435 this.error = error;
1436 }
1437
1438 }