1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.common.filemonitor;
21
22 import com.fasterxml.jackson.annotation.JsonTypeInfo;
23 import com.fasterxml.jackson.core.JsonGenerationException;
24 import com.fasterxml.jackson.core.JsonParseException;
25 import com.fasterxml.jackson.core.type.TypeReference;
26 import com.fasterxml.jackson.databind.JsonMappingException;
27 import io.netty.util.HashedWheelTimer;
28 import io.netty.util.Timeout;
29 import io.netty.util.Timer;
30 import io.netty.util.TimerTask;
31 import org.waarp.common.database.DbConstant;
32 import org.waarp.common.digest.FilesystemBasedDigest;
33 import org.waarp.common.digest.FilesystemBasedDigest.DigestAlgo;
34 import org.waarp.common.file.AbstractDir;
35 import org.waarp.common.future.WaarpFuture;
36 import org.waarp.common.json.JsonHandler;
37 import org.waarp.common.logging.SysErrLogger;
38 import org.waarp.common.logging.WaarpLogger;
39 import org.waarp.common.logging.WaarpLoggerFactory;
40 import org.waarp.common.utility.WaarpThreadFactory;
41
42 import java.io.File;
43 import java.io.FileFilter;
44 import java.io.IOException;
45 import java.util.ArrayList;
46 import java.util.Arrays;
47 import java.util.Calendar;
48 import java.util.Date;
49 import java.util.GregorianCalendar;
50 import java.util.HashMap;
51 import java.util.LinkedList;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Map.Entry;
55 import java.util.Set;
56 import java.util.concurrent.ConcurrentHashMap;
57 import java.util.concurrent.ConcurrentLinkedQueue;
58 import java.util.concurrent.ExecutionException;
59 import java.util.concurrent.ExecutorService;
60 import java.util.concurrent.Executors;
61 import java.util.concurrent.Future;
62 import java.util.concurrent.TimeUnit;
63 import java.util.concurrent.atomic.AtomicLong;
64
65
66
67
68
69
70
71 public class FileMonitor {
72
73
74
75 protected static volatile WaarpLogger logger;
76 protected static final DigestAlgo defaultDigestAlgo = DigestAlgo.MD5;
77 protected static final long MINIMAL_DELAY = 100;
78 protected static final long DEFAULT_DELAY = 1000;
79 protected static final long DEFAULT_CHECK_DELAY = 300000;
80 private static final TypeReference<HashMap<String, FileItem>> typeReference =
81 new TypeReference<HashMap<String, FileItem>>() {
82 };
83
84 protected WaarpFuture future;
85 protected WaarpFuture internalfuture;
86 protected boolean stopped;
87 protected final String name;
88 protected final File statusFile;
89 protected final File stopFile;
90 protected final List<File> directories = new ArrayList<File>();
91 protected final DigestAlgo digest;
92 protected long elapseTime = DEFAULT_DELAY;
93 protected long elapseWaarpTime = -1;
94 protected long checkDelay = DEFAULT_CHECK_DELAY;
95 protected Timer timer;
96 protected Timer timerWaarp;
97
98 protected final boolean scanSubDir;
99
100 protected boolean ignoreAlreadyUsed = false;
101
102 protected boolean initialized;
103 protected File checkFile;
104
105 protected final ConcurrentHashMap<String, FileItem> fileItems =
106 new ConcurrentHashMap<String, FileItem>();
107 protected final ConcurrentHashMap<String, FileItem> lastFileItems =
108 new ConcurrentHashMap<String, FileItem>();
109
110 protected FileFilter filter = new FileFilter() {
111 @Override
112 public final boolean accept(final File pathname) {
113 return pathname.isFile();
114 }
115 };
116 protected final FileMonitorCommandRunnableFuture commandValidFile;
117 protected FileMonitorCommandFactory commandValidFileFactory;
118 protected ExecutorService executor;
119 protected int fixedThreadPool;
120 protected final FileMonitorCommandRunnableFuture commandRemovedFile;
121 protected FileMonitorCommandRunnableFuture commandCheckIteration;
122
123 protected final ConcurrentLinkedQueue<FileItem> toUse =
124 new ConcurrentLinkedQueue<FileItem>();
125 protected final ConcurrentLinkedQueue<Future<?>> results =
126 new ConcurrentLinkedQueue<Future<?>>();
127
128 protected final AtomicLong globalok = new AtomicLong(0);
129 protected final AtomicLong globalerror = new AtomicLong(0);
130 protected final AtomicLong todayok = new AtomicLong(0);
131 protected final AtomicLong todayerror = new AtomicLong(0);
132 protected Date nextDay;
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 public FileMonitor(final String name, final File statusFile,
162 final File stopFile, final File directory,
163 final DigestAlgo digest, final long elapseTime,
164 final FileFilter filter, final boolean scanSubdir,
165 final FileMonitorCommandRunnableFuture commandValidFile,
166 final FileMonitorCommandRunnableFuture commandRemovedFile,
167 final FileMonitorCommandRunnableFuture commandCheckIteration) {
168 if (logger == null) {
169 logger = WaarpLoggerFactory.getLogger(FileMonitor.class);
170 }
171 this.name = name;
172 this.statusFile = statusFile;
173 this.stopFile = stopFile;
174 directories.add(directory);
175 scanSubDir = scanSubdir;
176 if (digest == null) {
177 this.digest = defaultDigestAlgo;
178 } else {
179 this.digest = digest;
180 }
181 if (elapseTime >= MINIMAL_DELAY) {
182 this.elapseTime = (elapseTime / 10) * 10;
183 }
184 if (filter != null) {
185 this.filter = filter;
186 }
187 this.commandValidFile = commandValidFile;
188 this.commandRemovedFile = commandRemovedFile;
189 this.commandCheckIteration = commandCheckIteration;
190 if (statusFile != null) {
191 checkFile = new File(statusFile.getAbsolutePath() + ".chk");
192 }
193 reloadStatus();
194 setNextDay();
195 }
196
197 protected final void setNextDay() {
198 final Calendar c = new GregorianCalendar();
199 c.set(Calendar.HOUR_OF_DAY, 0);
200 c.set(Calendar.MINUTE, 0);
201 c.set(Calendar.SECOND, 0);
202 c.set(Calendar.MILLISECOND, 0);
203 c.add(Calendar.DAY_OF_MONTH, 1);
204 nextDay = c.getTime();
205 }
206
207
208
209
210
211
212
213 public final void setCommandCheckIteration(
214 final FileMonitorCommandRunnableFuture commandCheckIteration) {
215 this.commandCheckIteration = commandCheckIteration;
216 }
217
218
219
220
221
222
223
224
225 public final void setCommandValidFileFactory(
226 final FileMonitorCommandFactory factory, final int fixedPool) {
227 commandValidFileFactory = factory;
228 fixedThreadPool = fixedPool;
229 }
230
231
232
233
234 public final long getElapseWaarpTime() {
235 return elapseWaarpTime;
236 }
237
238
239
240
241
242
243
244
245 public final void setElapseWaarpTime(final long elapseWaarpTime) {
246 if (elapseWaarpTime >= DEFAULT_DELAY) {
247 this.elapseWaarpTime = (elapseWaarpTime / 10) * 10;
248 }
249 }
250
251
252
253
254 public final boolean isIgnoreAlreadyUsed() {
255 return ignoreAlreadyUsed;
256 }
257
258
259
260
261
262 public final void setIgnoreAlreadyUsed(final boolean ignoreAlreadyUsed) {
263 this.ignoreAlreadyUsed = ignoreAlreadyUsed;
264 }
265
266
267
268
269
270 public final void setCheckDelay(final long checkDelay) {
271 this.checkDelay = checkDelay;
272 }
273
274
275
276
277
278
279 public final void addDirectory(final File directory) {
280 synchronized (directories) {
281 if (!directories.contains(directory)) {
282 directories.add(directory);
283 }
284 }
285 }
286
287
288
289
290
291
292 public final void removeDirectory(final File directory) {
293 synchronized (directories) {
294 directories.remove(directory);
295 }
296 }
297
298 protected final void setThreadName() {
299 Thread.currentThread().setName("FileMonitor_" + name);
300 }
301
302 private boolean testChkFile() {
303 if (checkFile.exists()) {
304 deleteChkFile();
305 final long time = elapseTime * 10;
306 logger.warn(
307 "Waiting to check if another Monitor is running with the same configuration: " +
308 time / 1000 + 's');
309 try {
310 Thread.sleep(time);
311 } catch (final InterruptedException e) {
312 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
313 }
314 return checkFile.exists();
315 }
316 return false;
317 }
318
319 private void createChkFile() {
320 try {
321 if (!checkFile.createNewFile()) {
322 logger.info("Cannot create Check File");
323 }
324 } catch (final IOException ignored) {
325 SysErrLogger.FAKE_LOGGER.ignoreLog(ignored);
326 }
327 }
328
329 private void deleteChkFile() {
330 if (!checkFile.delete()) {
331 logger.info("Cannot delete Check File");
332 }
333 }
334
335 protected final void reloadStatus() {
336 if (statusFile == null) {
337 return;
338 }
339 if (!statusFile.exists()) {
340 initialized = true;
341 return;
342 }
343 if (testChkFile()) {
344
345 logger.warn(
346 "Error: One other monitor is probably running using the same status file: " +
347 statusFile);
348 return;
349 }
350 synchronized (directories) {
351 try {
352 final HashMap<String, FileItem> newHashMap =
353 JsonHandler.mapper.readValue(statusFile, typeReference);
354 fileItems.putAll(newHashMap);
355 initialized = true;
356 } catch (final JsonParseException ignored) {
357 SysErrLogger.FAKE_LOGGER.ignoreLog(ignored);
358 } catch (final JsonMappingException ignored) {
359 SysErrLogger.FAKE_LOGGER.ignoreLog(ignored);
360 } catch (final IOException ignored) {
361 SysErrLogger.FAKE_LOGGER.ignoreLog(ignored);
362 }
363 }
364 }
365
366
367
368
369 public final boolean initialized() {
370 return initialized;
371 }
372
373 protected final void saveStatus() {
374 if (statusFile == null) {
375 return;
376 }
377 synchronized (directories) {
378 try {
379 JsonHandler.mapper.writeValue(statusFile, fileItems);
380 createChkFile();
381 } catch (final JsonGenerationException ignored) {
382 SysErrLogger.FAKE_LOGGER.ignoreLog(ignored);
383 } catch (final JsonMappingException ignored) {
384 SysErrLogger.FAKE_LOGGER.ignoreLog(ignored);
385 } catch (final IOException ignored) {
386 SysErrLogger.FAKE_LOGGER.ignoreLog(ignored);
387 }
388 }
389 }
390
391
392
393
394
395
396 public final long getCurrentHistoryNb() {
397 synchronized (directories) {
398 if (fileItems != null) {
399 return fileItems.size();
400 }
401 }
402 return -1;
403 }
404
405
406
407
408 public final void setNextAsFullStatus() {
409 lastFileItems.clear();
410 }
411
412
413
414
415 public final String getStatus() {
416 Set<String> removedFileItems = null;
417 final ConcurrentHashMap<String, FileItem> newFileItems =
418 new ConcurrentHashMap<String, FileItem>();
419 synchronized (directories) {
420 if (!lastFileItems.isEmpty()) {
421 removedFileItems = ((Map<String, FileItem>) lastFileItems).keySet();
422 removedFileItems.removeAll(
423 ((Map<String, FileItem>) fileItems).keySet());
424 for (final Entry<String, FileItem> key : fileItems.entrySet()) {
425 if (!key.getValue().isStrictlySame(lastFileItems.get(key.getKey()))) {
426 newFileItems.put(key.getKey(), key.getValue());
427 }
428 }
429 } else {
430 for (final Entry<String, FileItem> key : fileItems.entrySet()) {
431 newFileItems.put(key.getKey(), key.getValue());
432 }
433 }
434 final FileMonitorInformation fileMonitorInformation =
435 new FileMonitorInformation(name, newFileItems, removedFileItems,
436 directories, stopFile, statusFile,
437 elapseTime, scanSubDir, globalok,
438 globalerror, todayok, todayerror);
439 for (final Entry<String, FileItem> key : fileItems.entrySet()) {
440 final FileItem clone = key.getValue().clone();
441 lastFileItems.put(key.getKey(), clone);
442 }
443 createChkFile();
444 final String status = JsonHandler.writeAsString(fileMonitorInformation);
445 if (removedFileItems != null) {
446 removedFileItems.clear();
447 }
448 newFileItems.clear();
449 return status;
450 }
451 }
452
453
454
455
456 public final long getElapseTime() {
457 return elapseTime;
458 }
459
460
461
462
463 public final void setElapseTime(final long elapseTime) {
464 this.elapseTime = elapseTime;
465 }
466
467
468
469
470 public final void setFilter(final FileFilter filter) {
471 this.filter = filter;
472 }
473
474 public final void start() {
475 if (timer == null) {
476 timer = new HashedWheelTimer(
477 new WaarpThreadFactory("TimerFileMonitor_" + name), 100,
478 TimeUnit.MILLISECONDS, 8);
479 future = new WaarpFuture(true);
480 internalfuture = new WaarpFuture(true);
481 if (commandValidFileFactory != null && executor == null) {
482 if (fixedThreadPool > 1) {
483 executor = Executors.newFixedThreadPool(fixedThreadPool,
484 new WaarpThreadFactory(
485 "FileMonitorRunner_" +
486 name));
487 } else if (fixedThreadPool == 0) {
488 executor = Executors.newCachedThreadPool(
489 new WaarpThreadFactory("FileMonitorRunner_" + name));
490 }
491 }
492 timer.newTimeout(new FileMonitorTimerTask(this), elapseTime,
493 TimeUnit.MILLISECONDS);
494 }
495 if (elapseWaarpTime >= DEFAULT_DELAY && timerWaarp == null &&
496 commandCheckIteration != null) {
497 timerWaarp = new HashedWheelTimer(
498 new WaarpThreadFactory("TimerFileMonitorWaarp_" + name), 100,
499 TimeUnit.MILLISECONDS, 8);
500 timerWaarp.newTimeout(
501 new FileMonitorTimerInformationTask(commandCheckIteration),
502 elapseWaarpTime, TimeUnit.MILLISECONDS);
503 }
504 }
505
506 public final void stop() {
507 initialized = false;
508 stopped = true;
509 if (timerWaarp != null) {
510 timerWaarp.stop();
511 }
512 if (internalfuture != null) {
513 internalfuture.awaitOrInterruptible(elapseTime * 2);
514 internalfuture.setSuccess();
515 }
516 if (timer != null) {
517 timer.stop();
518 }
519 timer = null;
520 timerWaarp = null;
521 if (executor != null) {
522 executor.shutdown();
523 executor = null;
524 }
525 deleteChkFile();
526 if (future != null) {
527 future.setSuccess();
528 }
529 }
530
531
532
533
534 public final File peek() {
535 final FileItem item = toUse.peek();
536 if (item == null) {
537 return null;
538 }
539 return item.file;
540 }
541
542
543
544
545 public final File poll() {
546 final FileItem item = toUse.poll();
547 if (item == null) {
548 return null;
549 }
550 return item.file;
551 }
552
553
554
555
556 public final void waitForStopFile() {
557 internalfuture.awaitOrInterruptible();
558 stop();
559 }
560
561 private boolean checkStop() {
562 if (stopped || stopFile.exists()) {
563 logger.warn(
564 "STOPPING the FileMonitor {} since condition is fullfilled: stop file found ({}): " +
565 stopFile.exists(), name, stopFile);
566 internalfuture.setSuccess();
567 return true;
568 }
569 return false;
570 }
571
572
573
574
575
576
577 protected final boolean checkFiles() {
578 setThreadName();
579 boolean fileItemsChanged = false;
580 if (checkStop()) {
581 return false;
582 }
583 synchronized (directories) {
584 for (final File directory : directories) {
585 logger.info("Scan: {}", directory);
586 fileItemsChanged = checkOneDir(fileItemsChanged, directory);
587 }
588 }
589 setThreadName();
590 boolean error = false;
591
592 for (final Future<?> futureResult : results) {
593 createChkFile();
594 try {
595 futureResult.get();
596 } catch (final InterruptedException e) {
597 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
598 logger.info("Interruption so exit");
599 error = true;
600 } catch (final ExecutionException e) {
601 logger.error("Exception during execution: {}", e.getMessage());
602 error = true;
603 } catch (final Throwable e) {
604 logger.error("Exception during execution: {}", e.getMessage());
605 error = true;
606 }
607 }
608 logger.debug("Scan over");
609 results.clear();
610 if (error) {
611
612
613 return false;
614 }
615
616 final List<FileItem> todel = new LinkedList<FileItem>();
617 synchronized (directories) {
618 for (final FileItem item : fileItems.values()) {
619 if (item.file != null && item.file.isFile()) {
620 continue;
621 }
622 todel.add(item);
623 }
624
625 for (final FileItem fileItem : todel) {
626 final String newName =
627 AbstractDir.normalizePath(fileItem.file.getAbsolutePath());
628 fileItems.remove(newName);
629 toUse.remove(fileItem);
630 if (commandRemovedFile != null) {
631 commandRemovedFile.run(fileItem);
632 }
633 fileItem.file = null;
634 fileItem.hash = null;
635 fileItemsChanged = true;
636 }
637 }
638 if (fileItemsChanged) {
639 saveStatus();
640 } else {
641 createChkFile();
642 }
643 if (checkStop()) {
644 return false;
645 }
646 logger.debug("Finishing step");
647
648 if (commandCheckIteration != null && timerWaarp == null) {
649 commandCheckIteration.run(null);
650 }
651 return true;
652 }
653
654 private void setIfAlreadyUsed(final FileItem fileItem, final boolean valid) {
655 if (!ignoreAlreadyUsed && fileItem.specialId != DbConstant.ILLEGALVALUE &&
656 fileItem.used) {
657 switch (fileItem.status) {
658 case START:
659 fileItem.status = Status.CHANGING;
660 break;
661 case CHANGING:
662 if (valid) {
663 fileItem.status = Status.VALID;
664 }
665 break;
666 case VALID:
667 if (valid) {
668 fileItem.status = Status.RESTART;
669 }
670 break;
671 case DONE:
672 if (valid) {
673 fileItem.status = Status.START;
674 }
675 break;
676 case RESTART:
677 break;
678 }
679 } else {
680 switch (fileItem.status) {
681 case START:
682 fileItem.status = Status.CHANGING;
683 break;
684 case CHANGING:
685 if (valid) {
686 fileItem.status = Status.VALID;
687 }
688 break;
689 case VALID:
690 case DONE:
691 case RESTART:
692 break;
693 }
694 }
695 }
696
697
698
699
700
701
702
703 protected final boolean checkOneDir(boolean fileItemsChanged,
704 final File directory) {
705 try {
706 File[] files = directory.listFiles(filter);
707 for (final File file : files) {
708 if (checkStop()) {
709 return false;
710 }
711 if (file.isDirectory()) {
712 continue;
713 }
714 final String newName =
715 AbstractDir.normalizePath(file.getAbsolutePath());
716 final FileItem fileItem;
717 synchronized (directories) {
718 fileItem = fileItems.get(newName);
719 if (fileItem == null) {
720
721 fileItems.put(newName, new FileItem(file));
722 fileItemsChanged = true;
723 continue;
724 }
725 if (fileItem.used && ignoreAlreadyUsed) {
726
727 continue;
728 }
729 }
730 logger.debug("File check: {}", fileItem);
731 final long size = fileItem.file.length();
732 if (size != fileItem.size) {
733 fileItemsChanged = isFileItemsChangedOnSize(fileItem, size);
734 continue;
735 }
736 final long lastTimeModified = fileItem.file.lastModified();
737 if (lastTimeModified != fileItem.lastTime) {
738 fileItemsChanged =
739 isFileItemsChangedOnLastTimeModified(fileItem, lastTimeModified);
740 continue;
741 }
742
743 try {
744 final byte[] hash =
745 FilesystemBasedDigest.getHash(fileItem.file, true, digest);
746 if (hash == null || fileItem.hash == null) {
747
748 fileItemsChanged = isFileItemsChangedOnHash(fileItem, hash);
749 continue;
750 }
751 if (!Arrays.equals(hash, fileItem.hash)) {
752
753 fileItemsChanged = isFileItemsChangedOnHash(fileItem, hash);
754 continue;
755 } else {
756 setIfAlreadyUsed(fileItem, fileItem.status != Status.DONE);
757 }
758 if (checkStop()) {
759 return false;
760 }
761 boolean toIgnore = false;
762 if (!ignoreAlreadyUsed && fileItem.used &&
763 fileItem.specialId != DbConstant.ILLEGALVALUE) {
764 if (fileItem.status != Status.RESTART) {
765 logger.debug("File Ignore check: {}", fileItem);
766 toIgnore = true;
767 }
768 }
769 logger.debug("File Run check: {}", fileItem);
770
771 fileItem.timeUsed = System.currentTimeMillis();
772 if (commandValidFileFactory != null) {
773 final FileMonitorCommandRunnableFuture torun =
774 commandValidFileFactory.create(fileItem);
775 if (!torun.checkFileItemBusiness(fileItem)) {
776 logger.debug("File Ignore Business check: {}", fileItem);
777 continue;
778 }
779 if (toIgnore) {
780 continue;
781 }
782 if (executor != null) {
783 final Future<?> torunFuture = executor.submit(torun);
784 results.add(torunFuture);
785 } else {
786 torun.run(fileItem);
787 }
788 } else if (commandValidFile != null) {
789 if (!commandValidFile.checkFileItemBusiness(fileItem)) {
790 logger.debug("File Ignore Business check: {}", fileItem);
791 continue;
792 }
793 if (toIgnore) {
794 continue;
795 }
796 commandValidFile.run(fileItem);
797 } else {
798 if (toIgnore) {
799 continue;
800 }
801 toUse.add(fileItem);
802 }
803 fileItemsChanged = true;
804 } catch (final Throwable e) {
805 setThreadName();
806 logger.error("Error during final file check: {}", e.getMessage());
807 }
808 }
809 if (scanSubDir) {
810 files = directory.listFiles();
811 for (final File file : files) {
812 if (checkStop()) {
813 return false;
814 }
815 if (file.isDirectory()) {
816 fileItemsChanged = checkOneDir(fileItemsChanged, file);
817 }
818 }
819 }
820 } catch (final Throwable e) {
821 setThreadName();
822 logger.error("Issue during Directory and File Checking: {}",
823 e.getMessage());
824
825 }
826 return fileItemsChanged;
827 }
828
829 private boolean isFileItemsChangedOnHash(final FileItem fileItem,
830 final byte[] hash) {
831 fileItem.hash = hash;
832 fileItem.status = Status.CHANGING;
833 logger.debug("File Hash check: {}", fileItem);
834 return true;
835 }
836
837 private boolean isFileItemsChangedOnLastTimeModified(final FileItem fileItem,
838 final long lastTimeModified) {
839
840 fileItem.lastTime = lastTimeModified;
841 if (!ignoreAlreadyUsed && fileItem.used) {
842 fileItem.hash = null;
843 }
844 fileItem.status = Status.CHANGING;
845 logger.debug("File Change check: {}({})", fileItem, lastTimeModified);
846 return true;
847 }
848
849 private boolean isFileItemsChangedOnSize(final FileItem fileItem,
850 final long size) {
851
852 fileItem.size = size;
853 fileItem.status = Status.CHANGING;
854 logger.debug("File Size check: {}({})", fileItem, size);
855 return true;
856 }
857
858
859
860
861 protected static class FileMonitorTimerTask implements TimerTask {
862 protected final FileMonitor fileMonitor;
863
864
865
866
867 protected FileMonitorTimerTask(final FileMonitor fileMonitor) {
868 if (logger == null) {
869 logger = WaarpLoggerFactory.getLogger(FileMonitor.class);
870 }
871 this.fileMonitor = fileMonitor;
872 }
873
874 @Override
875 public void run(final Timeout timeout) {
876 try {
877 if (fileMonitor.checkFiles()) {
878 fileMonitor.setThreadName();
879 if (fileMonitor.timer != null) {
880 try {
881 fileMonitor.timer.newTimeout(this, fileMonitor.elapseTime,
882 TimeUnit.MILLISECONDS);
883 } catch (final Throwable e) {
884 logger.error("Error while pushing next filemonitor step: {}",
885 e.getMessage());
886
887 fileMonitor.internalfuture.setSuccess();
888 }
889 } else {
890 logger.warn("No Timer found");
891 fileMonitor.internalfuture.setSuccess();
892 }
893 } else {
894 fileMonitor.setThreadName();
895 logger.warn("Stop file found");
896 fileMonitor.deleteChkFile();
897 fileMonitor.internalfuture.setSuccess();
898 }
899 } catch (final Throwable e) {
900 fileMonitor.setThreadName();
901 logger.error("Issue during Directory and File Checking: {}",
902 e.getMessage());
903 fileMonitor.internalfuture.setSuccess();
904 }
905 }
906
907 }
908
909
910
911
912
913 protected class FileMonitorTimerInformationTask implements TimerTask {
914 protected final FileMonitorCommandRunnableFuture informationMonitorCommand;
915
916
917
918
919 protected FileMonitorTimerInformationTask(
920 final FileMonitorCommandRunnableFuture informationMonitorCommand) {
921 if (logger == null) {
922 logger = WaarpLoggerFactory.getLogger(FileMonitor.class);
923 }
924 this.informationMonitorCommand = informationMonitorCommand;
925 }
926
927 @Override
928 public void run(final Timeout timeout) {
929 try {
930 Thread.currentThread().setName("FileMonitorInformation_" + name);
931 if (!checkStop()) {
932 informationMonitorCommand.run(null);
933 if (timerWaarp != null && !checkStop()) {
934 try {
935 timerWaarp.newTimeout(this, elapseWaarpTime,
936 TimeUnit.MILLISECONDS);
937 } catch (final Throwable e) {
938
939 logger.error("Error during nex filemonitor information step: {}",
940 e.getMessage());
941 internalfuture.setSuccess();
942 }
943 } else {
944 if (timerWaarp != null) {
945 logger.warn("Stop file found");
946 } else {
947 logger.warn("No Timer found");
948 }
949 internalfuture.setSuccess();
950 }
951 } else {
952 logger.warn("Stop file found");
953 internalfuture.setSuccess();
954 }
955 } catch (final Throwable e) {
956
957 Thread.currentThread().setName("FileMonitorInformation_" + name);
958 logger.error("Error during nex filemonitor information step: {}",
959 e.getMessage());
960 internalfuture.setSuccess();
961 }
962 }
963 }
964
965
966
967
968 @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
969 public static class FileMonitorInformation {
970 public String name;
971 public ConcurrentHashMap<String, FileItem> fileItems;
972 public Set<String> removedFileItems;
973 public List<File> directories;
974 public File stopFile;
975 public File statusFile;
976 public long elapseTime;
977 public boolean scanSubDir;
978 public AtomicLong globalok;
979 public AtomicLong globalerror;
980 public AtomicLong todayok;
981 public AtomicLong todayerror;
982
983 public FileMonitorInformation() {
984
985 }
986
987 protected FileMonitorInformation(final String name,
988 final ConcurrentHashMap<String, FileItem> fileItems,
989 final Set<String> removedFileItems,
990 final List<File> directories,
991 final File stopFile, final File statusFile,
992 final long elapseTime,
993 final boolean scanSubDir,
994 final AtomicLong globalok,
995 final AtomicLong globalerror,
996 final AtomicLong todayok,
997 final AtomicLong todayerror) {
998 this.name = name;
999 this.fileItems = fileItems;
1000 this.removedFileItems = removedFileItems;
1001 this.directories = directories;
1002 this.stopFile = stopFile;
1003 this.statusFile = statusFile;
1004 this.elapseTime = elapseTime;
1005 this.scanSubDir = scanSubDir;
1006 this.globalok = globalok;
1007 this.globalerror = globalerror;
1008 this.todayok = todayok;
1009 this.todayerror = todayerror;
1010 }
1011
1012 }
1013
1014 public enum Status {
1015 START, CHANGING, VALID, DONE, RESTART
1016 }
1017
1018
1019
1020
1021 public static class FileItem implements Cloneable {
1022 public File file;
1023 public long size;
1024 public byte[] hash;
1025 public long lastTime = Long.MIN_VALUE;
1026 public long timeUsed = Long.MIN_VALUE;
1027 public boolean used;
1028 public Status status = Status.START;
1029 public long specialId = DbConstant.ILLEGALVALUE;
1030
1031 public FileItem() {
1032
1033 }
1034
1035
1036
1037
1038 protected FileItem(final File file) {
1039 this.file = file;
1040 }
1041
1042 @Override
1043 public final int hashCode() {
1044 return file.hashCode();
1045 }
1046
1047 @Override
1048 public final boolean equals(final Object obj) {
1049
1050 return obj instanceof FileItem && file.equals(((FileItem) obj).file);
1051 }
1052
1053
1054
1055
1056
1057
1058
1059 public final boolean isStrictlySame(final FileItem item) {
1060 return item != null &&
1061 file.getAbsolutePath().equals(item.file.getAbsolutePath()) &&
1062 file.length() == item.size && lastTime == item.lastTime &&
1063 timeUsed == item.timeUsed && used == item.used &&
1064 status.equals(item.status) &&
1065 (hash != null? Arrays.equals(hash, item.hash) : item.hash == null);
1066 }
1067
1068 @Override
1069 public final String toString() {
1070 return file.getAbsolutePath() + " : " + size + " : " + specialId + " : " +
1071 used + " : " + status + " : " + lastTime + " : " + timeUsed;
1072 }
1073
1074 @Override
1075 public final FileItem clone() {
1076 final FileItem clone = new FileItem(file);
1077 clone.hash = hash;
1078 clone.lastTime = lastTime;
1079 clone.timeUsed = timeUsed;
1080 clone.used = used;
1081 clone.specialId = specialId;
1082 clone.status = status;
1083 clone.size = size;
1084 return clone;
1085 }
1086 }
1087
1088 public static void main(final String[] args) {
1089 if (logger == null) {
1090 logger = WaarpLoggerFactory.getLogger(FileMonitor.class);
1091 }
1092 if (args.length < 3) {
1093 SysErrLogger.FAKE_LOGGER.syserr(
1094 "Need a statusfile, a stopfile and a directory to test");
1095 return;
1096 }
1097 final File file = new File(args[0]);
1098 if (file.exists() && !file.isFile()) {
1099 SysErrLogger.FAKE_LOGGER.syserr("Not a correct status file");
1100 return;
1101 }
1102 final File stopfile = new File(args[1]);
1103 if (file.exists() && !file.isFile()) {
1104 SysErrLogger.FAKE_LOGGER.syserr("Not a correct stop file");
1105 return;
1106 }
1107 final File dir = new File(args[2]);
1108 if (!dir.isDirectory()) {
1109 SysErrLogger.FAKE_LOGGER.syserr("Not a directory");
1110 return;
1111 }
1112 final FileMonitorCommandRunnableFuture filemonitor =
1113 new FileMonitorCommandRunnableFuture() {
1114 @Override
1115 public void run(final FileItem file) {
1116 SysErrLogger.FAKE_LOGGER.syserr(
1117 "File New: " + file.file.getAbsolutePath());
1118 finalizeValidFile(true, 0);
1119 }
1120 };
1121 final FileMonitor monitor =
1122 new FileMonitor("test", file, stopfile, dir, null, 0,
1123 new RegexFileFilter(
1124 RegexFileFilter.REGEX_XML_EXTENSION), false,
1125 filemonitor, new FileMonitorCommandRunnableFuture() {
1126 @Override
1127 public void run(final FileItem file) {
1128 SysErrLogger.FAKE_LOGGER.syserr(
1129 "File Del: " + file.file.getAbsolutePath());
1130 }
1131 }, new FileMonitorCommandRunnableFuture() {
1132 @Override
1133 public void run(final FileItem unused) {
1134 SysErrLogger.FAKE_LOGGER.syserr("Check done");
1135 }
1136 });
1137 filemonitor.setMonitor(monitor);
1138 monitor.start();
1139 monitor.waitForStopFile();
1140 }
1141 }