1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.openr66.context;
21
22 import org.waarp.common.command.exception.CommandAbstractException;
23 import org.waarp.common.database.data.AbstractDbData.UpdatedInfo;
24 import org.waarp.common.digest.FilesystemBasedDigest;
25 import org.waarp.common.exception.IllegalFiniteStateException;
26 import org.waarp.common.exception.NoRestartException;
27 import org.waarp.common.file.SessionInterface;
28 import org.waarp.common.file.filesystembased.FilesystemBasedFileParameterImpl;
29 import org.waarp.common.logging.SysErrLogger;
30 import org.waarp.common.logging.WaarpLogger;
31 import org.waarp.common.logging.WaarpLoggerFactory;
32 import org.waarp.common.state.MachineState;
33 import org.waarp.compress.WaarpZstdCodec;
34 import org.waarp.openr66.context.authentication.R66Auth;
35 import org.waarp.openr66.context.filesystem.R66Dir;
36 import org.waarp.openr66.context.filesystem.R66File;
37 import org.waarp.openr66.context.filesystem.R66Restart;
38 import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
39 import org.waarp.openr66.database.data.DbTaskRunner;
40 import org.waarp.openr66.database.data.DbTaskRunner.TASKSTEP;
41 import org.waarp.openr66.protocol.configuration.Configuration;
42 import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
43 import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
44 import org.waarp.openr66.protocol.localhandler.packet.RequestPacket;
45 import org.waarp.openr66.protocol.localhandler.packet.compression.ZstdCompressionCodecDataPacket;
46 import org.waarp.openr66.protocol.utils.FileUtils;
47
48 import java.io.File;
49 import java.lang.ref.SoftReference;
50 import java.net.InetSocketAddress;
51 import java.net.SocketAddress;
52 import java.security.NoSuchAlgorithmException;
53 import java.util.HashMap;
54 import java.util.concurrent.atomic.AtomicInteger;
55
56
57
58
59 public class R66Session implements SessionInterface {
60
61
62
63 private static final WaarpLogger logger =
64 WaarpLoggerFactory.getLogger(R66Session.class);
65 private static final String FILE_IS_IN_THROUGH_MODE =
66 "File is in through mode: {}";
67 private static final String FILE_CANNOT_BE_WRITE = "File cannot be write";
68
69
70
71
72 private static final ZstdCompressionCodecDataPacket codec =
73 Configuration.configuration.isCompressionAvailable()?
74 new ZstdCompressionCodecDataPacket() : null;
75
76
77
78
79 private int blockSize = Configuration.configuration.getBlockSize();
80
81
82
83 private LocalChannelReference localChannelReference;
84
85
86
87 private final R66Auth auth;
88
89
90
91 private SocketAddress raddress;
92
93
94
95 private SocketAddress laddress;
96
97
98
99
100 private final R66Dir dir;
101
102
103
104 private R66File file;
105
106
107
108 private boolean isReady;
109
110
111
112 private final AtomicInteger numOfError = new AtomicInteger(0);
113
114
115
116
117 private final R66Restart restart;
118
119
120
121
122 private DbTaskRunner runner;
123
124 private String status = "NoStatus";
125
126
127
128
129 private final MachineState<R66FiniteDualStates> state;
130 private Exception traceState = null;
131
132
133
134 private R66BusinessInterface businessObject;
135
136
137
138 private final boolean extendedProtocol =
139 Configuration.configuration.isExtendedProtocol();
140
141 private final HashMap<String, R66Dir> dirsFromSession =
142 new HashMap<String, R66Dir>();
143 private static SoftReference<byte[]> reusableBufferStatic = null;
144 private static SoftReference<byte[]> reusableDataPacketBufferStatic = null;
145 private static SoftReference<byte[]> reusableCompressionBufferStatic = null;
146 private SoftReference<byte[]> reusableBuffer = null;
147 private SoftReference<byte[]> reusableDataPacketBuffer = null;
148 private SoftReference<byte[]> reusableCompressionBuffer = null;
149 private FilesystemBasedDigest digestBlock = null;
150 private boolean isSender;
151 private boolean isCompressionEnabled;
152 private int compressionMaxSize =
153 WaarpZstdCodec.getMaxCompressedSize(blockSize);
154
155
156
157
158 public static ZstdCompressionCodecDataPacket getCodec() {
159 return codec;
160 }
161
162
163
164
165 public R66Session() {
166 isReady = false;
167 auth = new R66Auth(this);
168 dir = new R66Dir(this);
169 restart = new R66Restart(this);
170 state = R66FiniteDualStates.newSessionMachineState();
171 isCompressionEnabled = Configuration.configuration.isCompressionAvailable();
172 synchronized (logger) {
173 reusableBuffer = reusableBufferStatic;
174 reusableBufferStatic = null;
175 reusableDataPacketBuffer = reusableDataPacketBufferStatic;
176 reusableDataPacketBufferStatic = null;
177 }
178 }
179
180
181
182
183 public R66Session(final boolean noBuffer) {
184 isReady = false;
185 auth = new R66Auth(this);
186 dir = new R66Dir(this);
187 restart = new R66Restart(this);
188 state = R66FiniteDualStates.newSessionMachineState();
189 isCompressionEnabled = Configuration.configuration.isCompressionAvailable();
190 }
191
192
193
194
195 public final boolean getExtendedProtocol() {
196 return extendedProtocol;
197 }
198
199
200
201
202 public final R66BusinessInterface getBusinessObject() {
203 return businessObject;
204 }
205
206
207
208
209 public final void setBusinessObject(
210 final R66BusinessInterface businessObject) {
211 this.businessObject = businessObject;
212 }
213
214
215
216
217
218
219
220
221 public final void newState(final R66FiniteDualStates desiredstate) {
222 try {
223 state.setCurrent(desiredstate);
224 if (logger.isDebugEnabled()) {
225 traceState = new Exception("Trace for debugging");
226 }
227 } catch (final IllegalFiniteStateException e) {
228 logger.warn("Should not changed of State: {} {}", this, e.getMessage(),
229 e);
230 if (logger.isDebugEnabled()) {
231 logger.warn("Previous condition of state: {}", this, traceState);
232 traceState = new Exception("Trace for debugging");
233 }
234 state.setDryCurrent(desiredstate);
235 }
236 }
237
238 public final void setErrorState() {
239 try {
240 state.setCurrent(R66FiniteDualStates.ERROR);
241 } catch (final IllegalFiniteStateException e) {
242 logger.error("Couldn't pass to error state. This should not happen");
243 }
244 }
245
246
247
248
249 public final R66FiniteDualStates getState() {
250 return state.getCurrent();
251 }
252
253
254
255
256
257
258 public final void setStatus(final int stat) {
259 if (logger.isDebugEnabled()) {
260 final StackTraceElement elt = Thread.currentThread().getStackTrace()[2];
261 status =
262 '(' + elt.getFileName() + ':' + elt.getLineNumber() + "):" + stat;
263 } else {
264 status = ":" + stat;
265 }
266 }
267
268 @Override
269 public final void clear() {
270 partialClear();
271 if (dir != null) {
272 dir.clear();
273 }
274 if (auth != null) {
275 auth.clear();
276 }
277 if (runner != null) {
278 runner.clear();
279 }
280 if (state != null) {
281 try {
282 state.setCurrent(R66FiniteDualStates.CLOSEDCHANNEL);
283 } catch (final IllegalFiniteStateException ignored) {
284
285 }
286 R66FiniteDualStates.endSessionMachineSate(state);
287 }
288 }
289
290 public final void partialClear() {
291
292 if (runner != null && !runner.isFinished() && !runner.continueTransfer()) {
293 if (localChannelReference != null) {
294 if (!localChannelReference.getFutureRequest().isDone()) {
295 final R66Result result = new R66Result(
296 new OpenR66RunnerErrorException("Close before ending"), this,
297 true, ErrorCode.Disconnection,
298 runner);
299 result.setRunner(runner);
300 try {
301 setFinalizeTransfer(false, result);
302 } catch (final OpenR66RunnerErrorException ignored) {
303
304 } catch (final OpenR66ProtocolSystemException ignored) {
305
306 }
307 }
308 }
309 }
310
311 isReady = false;
312 if (businessObject != null) {
313 businessObject.releaseResources(this);
314 businessObject = null;
315 }
316 digestBlock = null;
317 if (reusableBuffer != null && reusableBufferStatic == null) {
318 reusableBufferStatic = reusableBuffer;
319 }
320 if (reusableDataPacketBuffer != null &&
321 reusableDataPacketBufferStatic == null) {
322 reusableDataPacketBufferStatic = reusableDataPacketBuffer;
323 }
324 if (reusableCompressionBuffer != null &&
325 reusableCompressionBufferStatic == null) {
326 reusableCompressionBufferStatic = reusableCompressionBuffer;
327 }
328 reusableBuffer = null;
329 reusableDataPacketBuffer = null;
330 reusableCompressionBuffer = null;
331 }
332
333 @Override
334 public final R66Auth getAuth() {
335 return auth;
336 }
337
338 @Override
339 public final int getBlockSize() {
340 return blockSize;
341 }
342
343
344
345
346 public final void setBlockSize(final int blocksize) {
347 blockSize = blocksize;
348 compressionMaxSize = WaarpZstdCodec.getMaxCompressedSize(blockSize);
349 }
350
351 private SoftReference<byte[]> getBuffer(SoftReference<byte[]> softReference,
352 final int length) {
353 if (softReference == null) {
354 softReference = new SoftReference<byte[]>(new byte[length]);
355 return softReference;
356 }
357 final byte[] buffer = softReference.get();
358 if (buffer != null && buffer.length >= length) {
359 return softReference;
360 }
361 softReference.clear();
362 softReference = new SoftReference<byte[]>(new byte[length]);
363 return softReference;
364 }
365
366
367
368
369
370
371 public final byte[] getReusableBuffer(final int length) {
372 reusableBuffer = getBuffer(reusableBuffer, length);
373 return reusableBuffer.get();
374 }
375
376
377
378
379
380
381 public final byte[] getReusableDataPacketBuffer(final int length) {
382 reusableDataPacketBuffer = getBuffer(reusableDataPacketBuffer, length);
383 return reusableDataPacketBuffer.get();
384 }
385
386
387
388
389
390
391 public final byte[] getSessionReusableCompressionBuffer(final int length) {
392 reusableCompressionBuffer = getBuffer(reusableCompressionBuffer,
393 WaarpZstdCodec.getMaxCompressedSize(
394 length));
395 return reusableCompressionBuffer.get();
396 }
397
398
399
400
401 public final boolean isCompressionEnabled() {
402 return isCompressionEnabled;
403 }
404
405
406
407
408 public final void setCompressionEnabled(final boolean compressionEnabled) {
409 logger.debug("Compression enabled? {} => {}", isCompressionEnabled,
410 compressionEnabled);
411 isCompressionEnabled = compressionEnabled;
412 if (isCompressionEnabled && reusableCompressionBuffer == null) {
413 synchronized (logger) {
414 reusableCompressionBuffer = reusableCompressionBufferStatic;
415 reusableCompressionBufferStatic = null;
416 }
417 } else if (!isCompressionEnabled && reusableCompressionBuffer != null) {
418 synchronized (logger) {
419 reusableDataPacketBufferStatic = reusableCompressionBuffer;
420 reusableCompressionBuffer = null;
421 }
422 }
423 }
424
425
426
427
428 public final int getCompressionMaxSize() {
429 return compressionMaxSize;
430 }
431
432
433
434
435 public final void initializeDigest() {
436 if (digestBlock == null && RequestPacket.isMD5Mode(getRunner().getMode())) {
437 try {
438 digestBlock = new FilesystemBasedDigest(
439 localChannelReference.getPartner().getDigestAlgo());
440 } catch (final NoSuchAlgorithmException e) {
441 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
442 }
443 }
444 }
445
446
447
448
449 public final FilesystemBasedDigest getDigestBlock() {
450 return digestBlock;
451 }
452
453 @Override
454 public final R66Dir getDir() {
455 return dir;
456 }
457
458 @Override
459 public final FilesystemBasedFileParameterImpl getFileParameter() {
460 return Configuration.getFileParameter();
461 }
462
463 @Override
464 public final R66Restart getRestart() {
465 return restart;
466 }
467
468
469
470
471 public final boolean isAuthenticated() {
472 if (auth == null) {
473 return false;
474 }
475 return auth.isIdentified();
476 }
477
478
479
480
481 public final boolean isReady() {
482 return isReady;
483 }
484
485
486
487
488 public final void setReady(final boolean isReady) {
489 this.isReady = isReady;
490 }
491
492
493
494
495 public final DbTaskRunner getRunner() {
496 return runner;
497 }
498
499
500
501
502 public final void setLocalChannelReference(
503 final LocalChannelReference localChannelReference) {
504 this.localChannelReference = localChannelReference;
505 this.localChannelReference.setSession(this);
506 if (this.localChannelReference.getNetworkChannel() != null) {
507 raddress = this.localChannelReference.getNetworkChannel().remoteAddress();
508 laddress = this.localChannelReference.getNetworkChannel().localAddress();
509 } else {
510 raddress = laddress = new InetSocketAddress(0);
511 }
512 }
513
514
515
516
517 public final SocketAddress getRemoteAddress() {
518 return raddress;
519 }
520
521
522
523
524 public final SocketAddress getLocalAddress() {
525 return laddress;
526 }
527
528
529
530
531 public final LocalChannelReference getLocalChannelReference() {
532 return localChannelReference;
533 }
534
535
536
537
538
539
540
541 public final void setNoSessionRunner(final DbTaskRunner runner,
542 final LocalChannelReference localChannelReference) {
543 this.runner = runner;
544
545 auth.specialNoSessionAuth(false, Configuration.configuration.getHostId());
546 try {
547 file = (R66File) dir.setFile(this.runner.getFilename(), false);
548 } catch (final CommandAbstractException ignored) {
549
550 }
551 this.localChannelReference = localChannelReference;
552 if (this.localChannelReference == null) {
553 if (this.runner.getLocalChannelReference() != null) {
554 this.localChannelReference = this.runner.getLocalChannelReference();
555 } else {
556 this.localChannelReference = new LocalChannelReference();
557 }
558 this.localChannelReference.setErrorMessage(
559 this.runner.getErrorInfo().getMesg(), this.runner.getErrorInfo());
560 }
561 runner.setLocalChannelReference(this.localChannelReference);
562 this.localChannelReference.setSession(this);
563 }
564
565
566
567
568
569
570 public final void setFileBeforePreRunner()
571 throws OpenR66RunnerErrorException {
572
573 try {
574 file = FileUtils.getFile(logger, this, runner.getOriginalFilename(),
575 runner.isPreTaskStarting(), isSender,
576 runner.isSendThrough(), file);
577 } catch (final OpenR66RunnerErrorException e) {
578 runner.setErrorExecutionStatus(ErrorCode.FileNotFound);
579 throw e;
580 }
581 if (isSender && !runner.isSendThrough()) {
582
583 try {
584 runner.setOriginalFilename(file.getFile());
585 runner.setFilename(file.getFile());
586 logger.debug("Old size: {} => {}", runner.getOriginalSize(),
587 file.length());
588 if (runner.getOriginalSize() <= 0) {
589 final long originalSize = file.length();
590 if (originalSize > 0) {
591 runner.setOriginalSize(originalSize);
592 }
593 }
594 } catch (final CommandAbstractException e) {
595 throw new OpenR66RunnerErrorException(e);
596 }
597 }
598 }
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613 public final void setFileAfterPreRunnerReceiver(final boolean createFile)
614 throws OpenR66RunnerErrorException, CommandAbstractException {
615 if (businessObject != null) {
616 businessObject.checkAtChangeFilename(this);
617 }
618
619 if (runner.getRank() > 0) {
620
621 try {
622 file = (R66File) dir.setFile(runner.getFilename(), true);
623 if (runner.isRecvThrough()) {
624
625 logger.debug(FILE_IS_IN_THROUGH_MODE, file);
626 } else if (!file.canWrite()) {
627 throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
628 }
629 } catch (final CommandAbstractException e) {
630 throw new OpenR66RunnerErrorException(e);
631 }
632 } else {
633
634 if (createFile) {
635 file = null;
636 String newfilename = runner.getOriginalFilename();
637 if (newfilename.charAt(1) == ':') {
638
639 newfilename = newfilename.substring(2);
640 }
641 newfilename = R66File.getBasename(newfilename);
642 try {
643 file = dir.setUniqueFile(runner.getSpecialId(), newfilename);
644 runner.setFilename(file.getBasename());
645 } catch (final CommandAbstractException e) {
646 runner.deleteTempFile();
647 throw e;
648 }
649 try {
650 if (runner.isRecvThrough()) {
651
652 logger.debug(FILE_IS_IN_THROUGH_MODE, file);
653 runner.deleteTempFile();
654 } else if (!file.canWrite()) {
655 runner.deleteTempFile();
656 throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
657 }
658 } catch (final CommandAbstractException e) {
659 runner.deleteTempFile();
660 throw new OpenR66RunnerErrorException(e);
661 }
662 } else {
663 throw new OpenR66RunnerErrorException("No file created");
664 }
665 }
666
667 try {
668 if (runner.isFileMoved()) {
669 runner.setFileMoved(file.getFile(), true);
670 } else {
671 runner.setFilename(file.getFile());
672 }
673 } catch (final CommandAbstractException e) {
674 runner.deleteTempFile();
675 throw new OpenR66RunnerErrorException(e);
676 }
677 }
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692 public final void setFileAfterPreRunner(final boolean createFile)
693 throws OpenR66RunnerErrorException, CommandAbstractException {
694 if (businessObject != null) {
695 businessObject.checkAtChangeFilename(this);
696 }
697
698 if (isSender != runner.isSender()) {
699 logger.warn("Not same SenderSide {} {}", isSender, runner.isSender());
700 }
701 if (isSender) {
702 try {
703 if (file == null) {
704 try {
705 file = (R66File) dir.setFile(runner.getFilename(), false);
706 } catch (final CommandAbstractException e) {
707
708
709 file = dir.setFileNoCheck(runner.getFilename());
710 }
711 }
712 if (runner.isSendThrough()) {
713
714 logger.debug(FILE_IS_IN_THROUGH_MODE, file);
715 } else if (!file.canRead()) {
716
717
718 final R66File newFile = new R66File(this, dir, runner.getFilename());
719 if (!newFile.canRead()) {
720 runner.setErrorExecutionStatus(ErrorCode.FileNotFound);
721 throw new OpenR66RunnerErrorException(
722 "File cannot be read: " + file.getTrueFile().getAbsolutePath() +
723 " to " + newFile.getTrueFile().getAbsolutePath());
724 }
725 }
726 } catch (final CommandAbstractException e) {
727 throw new OpenR66RunnerErrorException(e);
728 }
729 } else {
730
731 if (runner.getRank() > 0) {
732
733 try {
734 file = (R66File) dir.setFile(runner.getFilename(), true);
735 if (runner.isRecvThrough()) {
736
737 logger.debug(FILE_IS_IN_THROUGH_MODE, file);
738 } else if (!file.canWrite()) {
739 throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
740 }
741 } catch (final CommandAbstractException e) {
742 throw new OpenR66RunnerErrorException(e);
743 }
744 } else {
745
746 if (createFile) {
747 file = null;
748 String newfilename = runner.getOriginalFilename();
749 if (newfilename.charAt(1) == ':') {
750
751 newfilename = newfilename.substring(2);
752 }
753 newfilename = R66File.getBasename(newfilename);
754 try {
755 file = dir.setUniqueFile(runner.getSpecialId(), newfilename);
756 runner.setFilename(file.getBasename());
757 } catch (final CommandAbstractException e) {
758 runner.deleteTempFile();
759 throw e;
760 }
761 try {
762 if (runner.isRecvThrough()) {
763
764 logger.debug(FILE_IS_IN_THROUGH_MODE, file);
765 runner.deleteTempFile();
766 } else if (!file.canWrite()) {
767 runner.deleteTempFile();
768 throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
769 }
770 } catch (final CommandAbstractException e) {
771 runner.deleteTempFile();
772 throw new OpenR66RunnerErrorException(e);
773 }
774 } else {
775 throw new OpenR66RunnerErrorException("No file created");
776 }
777 }
778 }
779
780 try {
781 if (runner.isFileMoved()) {
782 runner.setFileMoved(file.getFile(), true);
783 } else {
784 runner.setFilename(file.getFile());
785 }
786 } catch (final CommandAbstractException e) {
787 runner.deleteTempFile();
788 throw new OpenR66RunnerErrorException(e);
789 }
790
791 if (isSender && file != null) {
792 logger.debug("could change size: {} => {}", runner.getOriginalSize(),
793 file.length());
794 if (runner.getOriginalSize() < 0) {
795 final long originalSize = file.length();
796 if (originalSize > 0) {
797 runner.setOriginalSize(originalSize);
798 }
799 }
800 }
801 }
802
803
804
805
806
807
808
809
810 public final void setBadRunner(final DbTaskRunner runner,
811 final ErrorCode code) {
812 this.runner = runner;
813 if (code == ErrorCode.QueryAlreadyFinished) {
814 if (this.runner.isSender()) {
815
816 try {
817 dir.changeDirectory(this.runner.getRule().getSendPath());
818 } catch (final CommandAbstractException ignored) {
819
820 }
821 } else {
822
823 try {
824 dir.changeDirectory(this.runner.getRule().getWorkPath());
825 } catch (final CommandAbstractException ignored) {
826
827 }
828 }
829 if (businessObject != null) {
830 businessObject.checkAtError(this);
831 }
832 this.runner.setPostTask();
833 try {
834 setFileAfterPreRunner(false);
835 } catch (final OpenR66RunnerErrorException ignored) {
836
837 } catch (final CommandAbstractException ignored) {
838
839 }
840 }
841 }
842
843
844
845
846
847
848
849
850
851
852 public final void setRunner(final DbTaskRunner runner)
853 throws OpenR66RunnerErrorException {
854 this.runner = runner;
855 if (localChannelReference != null) {
856 this.runner.setLocalChannelReference(localChannelReference);
857 }
858 this.isSender = runner.isSender();
859 logger.debug("Runner to set: {} {}", runner.shallIgnoreSave(), runner);
860 this.runner.checkThroughMode();
861 if (businessObject != null) {
862 businessObject.checkAtStartup(this);
863 }
864 if (this.runner.isSender()) {
865 if (runner.isSendThrough()) {
866
867
868 try {
869 dir.changeDirectory(this.runner.getRule().getSendPath());
870 } catch (final CommandAbstractException e) {
871
872 }
873 } else {
874
875 try {
876 dir.changeDirectory(this.runner.getRule().getSendPath());
877 } catch (final CommandAbstractException e) {
878 throw new OpenR66RunnerErrorException(e);
879 }
880 }
881 } else {
882 if (runner.isRecvThrough()) {
883
884
885 try {
886 dir.changeDirectory(this.runner.getRule().getWorkPath());
887 } catch (final CommandAbstractException ignored) {
888
889 }
890 } else {
891
892 try {
893 dir.changeDirectory(this.runner.getRule().getWorkPath());
894 } catch (final CommandAbstractException e) {
895 throw new OpenR66RunnerErrorException(e);
896 }
897 }
898 }
899 logger.debug("Dir is: {}", dir.getFullPath());
900 }
901
902
903
904
905
906
907
908
909
910 public final void startup(final boolean checkNotExternal)
911 throws OpenR66RunnerErrorException {
912 setRestartMarker();
913 logger.debug("GlobalLastStep: {} vs {}:{}", runner.getGloballaststep(),
914 TASKSTEP.NOTASK.ordinal(), TASKSTEP.PRETASK.ordinal());
915 initializeTransfer(checkNotExternal);
916
917 try {
918 setFileAfterPreRunner(true);
919 } catch (final CommandAbstractException e2) {
920
921 file = null;
922 }
923 logger.debug("GlobalLastStep: {}", runner.getGloballaststep());
924 if (runner.getGloballaststep() == TASKSTEP.TRANSFERTASK.ordinal()) {
925 if (businessObject != null) {
926 businessObject.checkAfterPreCommand(this);
927 }
928 if (!runner.isSender()) {
929 if (initializeReceiver()) {
930 return;
931 }
932 } else {
933 initializeSender();
934 }
935 }
936 runner.saveStatus();
937 logger.debug("Final init: {} {}", runner, file != null);
938 }
939
940 private void initializeSender() throws OpenR66RunnerErrorException {
941 if (file != null) {
942 try {
943 localChannelReference.getFutureRequest().setFilesize(file.length());
944 } catch (final CommandAbstractException ignored) {
945
946 }
947 try {
948 file.restartMarker(restart);
949 } catch (final CommandAbstractException e) {
950 runner.deleteTempFile();
951 throw new OpenR66RunnerErrorException(e);
952 }
953 }
954 }
955
956 private boolean initializeReceiver() throws OpenR66RunnerErrorException {
957
958 if (runner.isRecvThrough()) {
959
960 } else {
961 long length = 0;
962 if (file != null) {
963 try {
964 length = file.length();
965 } catch (final CommandAbstractException ignored) {
966
967 }
968 }
969 final long needed = runner.getOriginalSize() - length;
970 long available = 0;
971 String targetDir = null;
972 try {
973 available = dir.getFreeSpace();
974 targetDir = dir.getPwd();
975 } catch (final CommandAbstractException ignored) {
976
977 }
978 if (file != null) {
979 final File truefile = file.getTrueFile().getParentFile();
980 available = truefile.getFreeSpace();
981 targetDir = truefile.getPath();
982 }
983 logger.debug("Check available space: {} >? {}(+{})", available, needed,
984 length);
985
986 if (available > 0 && needed > available) {
987
988 runner.setErrorExecutionStatus(ErrorCode.Internal);
989 throw new OpenR66RunnerErrorException(
990 "File cannot be written due to unsufficient space available: " +
991 targetDir + " need " + needed + " more while available is " +
992 available);
993 }
994 if (file == null) {
995 runner.saveStatus();
996 logger.info("Final PARTIAL init: {}", runner);
997 return true;
998 }
999 checkPosition(length);
1000 }
1001 return false;
1002 }
1003
1004 private void checkPosition(final long length)
1005 throws OpenR66RunnerErrorException {
1006
1007 try {
1008 final long oldPosition = restart.getPosition();
1009 restart.setSet(true);
1010 if (oldPosition > length) {
1011 int newRank = (int) (length / runner.getBlocksize()) -
1012 Configuration.getRankRestart();
1013 if (newRank <= 0) {
1014 newRank = 1;
1015 }
1016 logger.info("OldPos: {}:{} curLength: {}:{}", oldPosition,
1017 runner.getRank(), length, newRank);
1018 logger.warn("Decreased Rank Restart for {} at " + newRank, runner);
1019 runner.setTransferTask(newRank);
1020 restart.restartMarker(runner.getBlocksize() * runner.getRank());
1021 }
1022 try {
1023 file.restartMarker(restart);
1024 } catch (final CommandAbstractException e) {
1025 runner.deleteTempFile();
1026 throw new OpenR66RunnerErrorException(e);
1027 }
1028 } catch (final NoRestartException e) {
1029
1030 }
1031 }
1032
1033 private void initializeTransfer(final boolean checkNotExternal)
1034 throws OpenR66RunnerErrorException {
1035 if (runner.isSelfRequest() && runner.getRule().isSendMode()) {
1036
1037
1038 runner.setInitialTask();
1039 }
1040 if (runner.getGloballaststep() == TASKSTEP.NOTASK.ordinal() ||
1041 runner.getGloballaststep() == TASKSTEP.PRETASK.ordinal()) {
1042 setFileBeforePreRunner();
1043 if (runner.isSender() && !runner.isSendThrough() && file != null &&
1044 checkNotExternal) {
1045 String path = null;
1046 try {
1047 path = file.getFile();
1048 } catch (final CommandAbstractException ignored) {
1049
1050 }
1051 if (file.isExternal() ||
1052 path != null && !dir.isPathInCurrentDir(path)) {
1053
1054 logger.error(
1055 "File cannot be found in the current output directory: {} not in {}",
1056 file, dir);
1057 runner.setErrorExecutionStatus(ErrorCode.FileNotAllowed);
1058 throw new OpenR66RunnerErrorException(
1059 "File cannot be found in the current output directory");
1060 }
1061 }
1062 runner.setPreTask();
1063 runner.run();
1064 if (runner.isSender() && !runner.isSendThrough()) {
1065 if (file != null) {
1066 try {
1067 final long originalSize = file.length();
1068 if (originalSize > 0) {
1069 runner.setOriginalSize(originalSize);
1070 }
1071 } catch (final CommandAbstractException e) {
1072
1073 }
1074 }
1075 }
1076 runner.setTransferTask(runner.getRank());
1077 } else {
1078 runner.reset();
1079 runner.changeUpdatedInfo(UpdatedInfo.RUNNING);
1080 }
1081 }
1082
1083 private void setRestartMarker() {
1084 if (runner.getRank() > 0) {
1085 logger.debug("restart at {} {}", runner.getRank(), runner);
1086 logger.debug("restart at {} {}", runner.getRank(), dir);
1087 runner.setTransferTask(runner.getRank());
1088 restart.restartMarker(runner.getBlocksize() * runner.getRank());
1089 } else {
1090 restart.restartMarker(0);
1091 }
1092 }
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103 public final void renameReceiverFile(final String newFilename)
1104 throws OpenR66RunnerErrorException {
1105 if (runner == null) {
1106 return;
1107 }
1108
1109 if (runner.getRank() > 0) {
1110 logger.error(
1111 "Renaming file is not correct since transfer does not start from first block");
1112
1113 throw new OpenR66RunnerErrorException(
1114 "Renaming file not correct since transfer already started");
1115 }
1116 if (!runner.isRecvThrough()) {
1117 runner.deleteTempFile();
1118 }
1119
1120 runner.setOriginalFilename(newFilename);
1121 try {
1122 setFileAfterPreRunnerReceiver(true);
1123 } catch (final CommandAbstractException e) {
1124 throw new OpenR66RunnerErrorException(e);
1125 }
1126 }
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139 public final void setFinalizeTransfer(final boolean status,
1140 final R66Result finalValue)
1141 throws OpenR66RunnerErrorException, OpenR66ProtocolSystemException {
1142 logger.debug("{}:{}:{}", status, finalValue, runner);
1143 if (runner == null) {
1144 if (localChannelReference != null) {
1145 if (status) {
1146 localChannelReference.validateRequest(finalValue);
1147 } else {
1148 localChannelReference.invalidateRequest(finalValue);
1149 }
1150 }
1151 if (businessObject != null) {
1152 if (status) {
1153 businessObject.checkAfterTransfer(this);
1154 } else {
1155 businessObject.checkAtError(this);
1156 }
1157 }
1158 return;
1159 }
1160 if (businessObject != null) {
1161 if (status) {
1162 businessObject.checkAfterTransfer(this);
1163 } else {
1164 businessObject.checkAtError(this);
1165 }
1166 }
1167 if (runner.isAllDone()) {
1168 if (logger.isDebugEnabled()) {
1169 logger.debug("Transfer already done but {} on {} {}", status, file,
1170 runner.toShortString(),
1171 new OpenR66RunnerErrorException(finalValue.toString()));
1172 }
1173
1174
1175
1176
1177 return;
1178 }
1179 if (runner.isInError()) {
1180 if (logger.isDebugEnabled()) {
1181 logger.debug("Transfer already done in error but {} on {} {}", status,
1182 file, runner.toShortString(),
1183 new OpenR66RunnerErrorException(finalValue.toString()));
1184 }
1185
1186
1187
1188
1189 return;
1190 }
1191 if (localChannelReference.getFutureRequest().isDone()) {
1192 if (logger.isDebugEnabled()) {
1193 logger.debug("Request already done but {} on {} {}" + status, file,
1194 runner.toShortString(),
1195 new OpenR66RunnerErrorException(finalValue.toString()));
1196 }
1197
1198 return;
1199 }
1200 if (!status) {
1201 runner.deleteTempFile();
1202 runner.setErrorExecutionStatus(finalValue.getCode());
1203 }
1204 if (status) {
1205 runner.finishTransferTask(ErrorCode.TransferOk);
1206 } else {
1207 runner.finishTransferTask(finalValue.getCode());
1208 }
1209 logger.debug("Transfer {} on {} and {}", status, file, runner);
1210 if (!runner.ready()) {
1211
1212 final OpenR66RunnerErrorException runnerErrorException;
1213 if (!status && finalValue.getException() != null) {
1214 runnerErrorException = new OpenR66RunnerErrorException(
1215 "Pre task in error (or even before)", finalValue.getException());
1216 } else {
1217 runnerErrorException = new OpenR66RunnerErrorException(
1218 "Pre task in error (or even before)");
1219 }
1220 finalValue.setException(runnerErrorException);
1221 logger.info("Pre task in error (or even before) : {}",
1222 runnerErrorException.getMessage());
1223 if (Configuration.configuration.isExecuteErrorBeforeTransferAllowed()) {
1224 runner.finalizeTransfer(localChannelReference, file, finalValue,
1225 status);
1226 }
1227 runner.saveStatus();
1228 localChannelReference.invalidateRequest(finalValue);
1229 throw runnerErrorException;
1230 }
1231 try {
1232 if (file != null) {
1233 file.closeFile();
1234 }
1235 } catch (final CommandAbstractException e1) {
1236 R66Result result = finalValue;
1237 if (status) {
1238 result = new R66Result(new OpenR66RunnerErrorException(e1), this, false,
1239 ErrorCode.Internal, runner);
1240 }
1241 localChannelReference.invalidateRequest(result);
1242 throw (OpenR66RunnerErrorException) result.getException();
1243 }
1244 runner.finalizeTransfer(localChannelReference, file, finalValue, status);
1245 if (businessObject != null) {
1246 businessObject.checkAfterPost(this);
1247 }
1248 }
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258 public final void tryFinalizeRequest(final R66Result errorValue)
1259 throws OpenR66RunnerErrorException, OpenR66ProtocolSystemException {
1260 if (getLocalChannelReference() == null) {
1261 return;
1262 }
1263 if (getLocalChannelReference().getFutureRequest().isDone()) {
1264 return;
1265 }
1266 if (runner == null) {
1267 localChannelReference.invalidateRequest(errorValue);
1268 return;
1269 }
1270
1271 if (runner.getStatus() == ErrorCode.CompleteOk) {
1272 runner.setAllDone();
1273 runner.forceSaveStatus();
1274 localChannelReference.validateRequest(
1275 new R66Result(this, true, ErrorCode.CompleteOk, runner));
1276 } else if (runner.getStatus() == ErrorCode.TransferOk &&
1277 (!runner.isSender() ||
1278 errorValue.getCode() == ErrorCode.QueryAlreadyFinished)) {
1279
1280 try {
1281 setFinalizeTransfer(true,
1282 new R66Result(this, true, ErrorCode.CompleteOk,
1283 runner));
1284 localChannelReference.validateRequest(
1285 localChannelReference.getFutureEndTransfer().getResult());
1286 } catch (final OpenR66ProtocolSystemException e) {
1287 logger.error("Cannot validate runner: {}", runner.toShortString());
1288 runner.changeUpdatedInfo(UpdatedInfo.INERROR);
1289 runner.setErrorExecutionStatus(errorValue.getCode());
1290 runner.forceSaveStatus();
1291 setFinalizeTransfer(false, errorValue);
1292 } catch (final OpenR66RunnerErrorException e) {
1293 logger.error("Cannot validate runner: {}", runner.toShortString());
1294 runner.changeUpdatedInfo(UpdatedInfo.INERROR);
1295 runner.setErrorExecutionStatus(errorValue.getCode());
1296 runner.forceSaveStatus();
1297 setFinalizeTransfer(false, errorValue);
1298 }
1299 } else {
1300
1301 logger.error(
1302 "Runner {} will be shutdown while in status {} and future status will be {}",
1303 runner.getSpecialId(), runner.getStatus().getMesg(),
1304 errorValue.getCode().getMesg());
1305 setFinalizeTransfer(false, errorValue);
1306 }
1307 }
1308
1309
1310
1311
1312 public final R66File getFile() {
1313 return file;
1314 }
1315
1316
1317
1318
1319 public final boolean addError() {
1320 final int value = numOfError.incrementAndGet();
1321 return value < Configuration.RETRYNB;
1322 }
1323
1324 @Override
1325 public final String toString() {
1326 return "Session: FS[" + state.getCurrent() + "] " + status + " " +
1327 (auth != null? auth.toString() : "no Auth") + " " +
1328 (dir != null? dir.toString() : "no Dir") + " " +
1329 (file != null? file.toString() : "no File") + " " +
1330 (runner != null? runner.toShortString() : "no Runner");
1331 }
1332
1333 @Override
1334 public final String getUniqueExtension() {
1335 return Configuration.EXT_R66;
1336 }
1337
1338
1339
1340
1341 public final HashMap<String, R66Dir> getDirsFromSession() {
1342 return dirsFromSession;
1343 }
1344
1345
1346
1347
1348
1349 public final boolean isSender() {
1350 return isSender;
1351 }
1352 }