View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.openr66.context;
21  
22  import org.waarp.common.command.exception.CommandAbstractException;
23  import org.waarp.common.database.data.AbstractDbData.UpdatedInfo;
24  import org.waarp.common.digest.FilesystemBasedDigest;
25  import org.waarp.common.exception.IllegalFiniteStateException;
26  import org.waarp.common.exception.NoRestartException;
27  import org.waarp.common.file.SessionInterface;
28  import org.waarp.common.file.filesystembased.FilesystemBasedFileParameterImpl;
29  import org.waarp.common.logging.SysErrLogger;
30  import org.waarp.common.logging.WaarpLogger;
31  import org.waarp.common.logging.WaarpLoggerFactory;
32  import org.waarp.common.state.MachineState;
33  import org.waarp.compress.WaarpZstdCodec;
34  import org.waarp.openr66.context.authentication.R66Auth;
35  import org.waarp.openr66.context.filesystem.R66Dir;
36  import org.waarp.openr66.context.filesystem.R66File;
37  import org.waarp.openr66.context.filesystem.R66Restart;
38  import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
39  import org.waarp.openr66.database.data.DbTaskRunner;
40  import org.waarp.openr66.database.data.DbTaskRunner.TASKSTEP;
41  import org.waarp.openr66.protocol.configuration.Configuration;
42  import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
43  import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
44  import org.waarp.openr66.protocol.localhandler.packet.RequestPacket;
45  import org.waarp.openr66.protocol.localhandler.packet.compression.ZstdCompressionCodecDataPacket;
46  import org.waarp.openr66.protocol.utils.FileUtils;
47  
48  import java.io.File;
49  import java.lang.ref.SoftReference;
50  import java.net.InetSocketAddress;
51  import java.net.SocketAddress;
52  import java.security.NoSuchAlgorithmException;
53  import java.util.HashMap;
54  import java.util.concurrent.atomic.AtomicInteger;
55  
56  /**
57   * The global object session in OpenR66, a session by local channel
58   */
59  public class R66Session implements SessionInterface {
60    /**
61     * Internal Logger
62     */
63    private static final WaarpLogger logger =
64        WaarpLoggerFactory.getLogger(R66Session.class);
65    private static final String FILE_IS_IN_THROUGH_MODE =
66        "File is in through mode: {}";
67    private static final String FILE_CANNOT_BE_WRITE = "File cannot be write";
68  
69    /**
70     * Global Codec for packet compression
71     */
72    private static final ZstdCompressionCodecDataPacket codec =
73        Configuration.configuration.isCompressionAvailable()?
74            new ZstdCompressionCodecDataPacket() : null;
75  
76    /**
77     * Block size used during file transfer
78     */
79    private int blockSize = Configuration.configuration.getBlockSize();
80    /**
81     * The local channel reference
82     */
83    private LocalChannelReference localChannelReference;
84    /**
85     * Authentication
86     */
87    private final R66Auth auth;
88    /**
89     * Remote Address
90     */
91    private SocketAddress raddress;
92    /**
93     * Local Address
94     */
95    private SocketAddress laddress;
96  
97    /**
98     * Current directory
99     */
100   private final R66Dir dir;
101   /**
102    * Current file
103    */
104   private R66File file;
105   /**
106    * Does this session is Ready to serve a request
107    */
108   private boolean isReady;
109   /**
110    * Used to prevent deny of service
111    */
112   private final AtomicInteger numOfError = new AtomicInteger(0);
113 
114   /**
115    * Current Restart information
116    */
117   private final R66Restart restart;
118 
119   /**
120    * DbTaskRunner
121    */
122   private DbTaskRunner runner;
123 
124   private String status = "NoStatus";
125 
126   /**
127    * The Finite Machine State
128    */
129   private final MachineState<R66FiniteDualStates> state;
130   private Exception traceState = null;
131   /**
132    * Business Object if used
133    */
134   private R66BusinessInterface businessObject;
135   /**
136    * Extended protocol or not
137    */
138   private final boolean extendedProtocol =
139       Configuration.configuration.isExtendedProtocol();
140 
  // Directories keyed by name for this session.
  // NOTE(review): not referenced in the visible part of this file - confirm
  // where it is populated and read.
  private final HashMap<String, R66Dir> dirsFromSession =
      new HashMap<String, R66Dir>();
  // Class-wide parking slots for reusable buffers: a cleared session hands its
  // buffers back here (partialClear) and a new session takes them over
  // (constructor, setCompressionEnabled). Access is expected under the
  // "logger" monitor.
  private static SoftReference<byte[]> reusableBufferStatic = null;
  private static SoftReference<byte[]> reusableDataPacketBufferStatic = null;
  private static SoftReference<byte[]> reusableCompressionBufferStatic = null;
  // Per-session buffers, softly referenced; (re)allocated on demand by the
  // getReusable*Buffer methods through getBuffer.
  private SoftReference<byte[]> reusableBuffer = null;
  private SoftReference<byte[]> reusableDataPacketBuffer = null;
  private SoftReference<byte[]> reusableCompressionBuffer = null;
  // Per block digest, created lazily by initializeDigest and dropped by
  // partialClear.
  private FilesystemBasedDigest digestBlock = null;
  // Copy of runner.isSender(), stored when setRunner is called.
  private boolean isSender;
  // Initialized from the global configuration, changeable through
  // setCompressionEnabled.
  private boolean isCompressionEnabled;
  // Maximum compressed size for one block; recomputed by setBlockSize.
  private int compressionMaxSize =
      WaarpZstdCodec.getMaxCompressedSize(blockSize);
154 
155   /**
156    * @return the compression codec
157    */
158   public static ZstdCompressionCodecDataPacket getCodec() {
159     return codec;
160   }
161 
162   /**
163    * Create the session
164    */
165   public R66Session() {
166     isReady = false;
167     auth = new R66Auth(this);
168     dir = new R66Dir(this);
169     restart = new R66Restart(this);
170     state = R66FiniteDualStates.newSessionMachineState();
171     isCompressionEnabled = Configuration.configuration.isCompressionAvailable();
172     synchronized (logger) {
173       reusableBuffer = reusableBufferStatic;
174       reusableBufferStatic = null;
175       reusableDataPacketBuffer = reusableDataPacketBufferStatic;
176       reusableDataPacketBufferStatic = null;
177     }
178   }
179 
180   /**
181    * Create the session without Buffers
182    */
183   public R66Session(final boolean noBuffer) {
184     isReady = false;
185     auth = new R66Auth(this);
186     dir = new R66Dir(this);
187     restart = new R66Restart(this);
188     state = R66FiniteDualStates.newSessionMachineState();
189     isCompressionEnabled = Configuration.configuration.isCompressionAvailable();
190   }
191 
192   /**
193    * @return extendedProtocol
194    */
195   public final boolean getExtendedProtocol() {
196     return extendedProtocol;
197   }
198 
199   /**
200    * @return the businessObject
201    */
202   public final R66BusinessInterface getBusinessObject() {
203     return businessObject;
204   }
205 
206   /**
207    * @param businessObject the businessObject to set
208    */
209   public final void setBusinessObject(
210       final R66BusinessInterface businessObject) {
211     this.businessObject = businessObject;
212   }
213 
214   /**
215    * Propose a new State
216    *
217    * @param desiredstate
218    *
219    * @throws IllegalFiniteStateException if the new status if not ok
220    */
221   public final void newState(final R66FiniteDualStates desiredstate) {
222     try {
223       state.setCurrent(desiredstate);
224       if (logger.isDebugEnabled()) {
225         traceState = new Exception("Trace for debugging");
226       }
227     } catch (final IllegalFiniteStateException e) {
228       logger.warn("Should not changed of State: {} {}", this, e.getMessage(),
229                   e);
230       if (logger.isDebugEnabled()) {
231         logger.warn("Previous condition of state: {}", this, traceState);
232         traceState = new Exception("Trace for debugging");
233       }
234       state.setDryCurrent(desiredstate);
235     }
236   }
237 
238   public final void setErrorState() {
239     try {
240       state.setCurrent(R66FiniteDualStates.ERROR);
241     } catch (final IllegalFiniteStateException e) {
242       logger.error("Couldn't pass to error state. This should not happen");
243     }
244   }
245 
246   /**
247    * @return the current state in the finite state machine
248    */
249   public final R66FiniteDualStates getState() {
250     return state.getCurrent();
251   }
252 
253   /**
254    * Debugging purpose (trace)
255    *
256    * @param stat
257    */
258   public final void setStatus(final int stat) {
259     if (logger.isDebugEnabled()) {
260       final StackTraceElement elt = Thread.currentThread().getStackTrace()[2];
261       status =
262           '(' + elt.getFileName() + ':' + elt.getLineNumber() + "):" + stat;
263     } else {
264       status = ":" + stat;
265     }
266   }
267 
268   @Override
269   public final void clear() {
270     partialClear();
271     if (dir != null) {
272       dir.clear();
273     }
274     if (auth != null) {
275       auth.clear();
276     }
277     if (runner != null) {
278       runner.clear();
279     }
280     if (state != null) {
281       try {
282         state.setCurrent(R66FiniteDualStates.CLOSEDCHANNEL);
283       } catch (final IllegalFiniteStateException ignored) {
284         // nothing
285       }
286       R66FiniteDualStates.endSessionMachineSate(state);
287     }
288   }
289 
290   public final void partialClear() {
291     // First check if a transfer was on going
292     if (runner != null && !runner.isFinished() && !runner.continueTransfer()) {
293       if (localChannelReference != null) {
294         if (!localChannelReference.getFutureRequest().isDone()) {
295           final R66Result result = new R66Result(
296               new OpenR66RunnerErrorException("Close before ending"), this,
297               true, ErrorCode.Disconnection,
298               runner);// True since called from closed
299           result.setRunner(runner);
300           try {
301             setFinalizeTransfer(false, result);
302           } catch (final OpenR66RunnerErrorException ignored) {
303             // nothing
304           } catch (final OpenR66ProtocolSystemException ignored) {
305             // nothing
306           }
307         }
308       }
309     }
310     // No clean of file since it can be used after channel is closed
311     isReady = false;
312     if (businessObject != null) {
313       businessObject.releaseResources(this);
314       businessObject = null;
315     }
316     digestBlock = null;
317     if (reusableBuffer != null && reusableBufferStatic == null) {
318       reusableBufferStatic = reusableBuffer;
319     }
320     if (reusableDataPacketBuffer != null &&
321         reusableDataPacketBufferStatic == null) {
322       reusableDataPacketBufferStatic = reusableDataPacketBuffer;
323     }
324     if (reusableCompressionBuffer != null &&
325         reusableCompressionBufferStatic == null) {
326       reusableCompressionBufferStatic = reusableCompressionBuffer;
327     }
328     reusableBuffer = null;
329     reusableDataPacketBuffer = null;
330     reusableCompressionBuffer = null;
331   }
332 
333   @Override
334   public final R66Auth getAuth() {
335     return auth;
336   }
337 
338   @Override
339   public final int getBlockSize() {
340     return blockSize;
341   }
342 
343   /**
344    * @param blocksize the blocksize to set
345    */
346   public final void setBlockSize(final int blocksize) {
347     blockSize = blocksize;
348     compressionMaxSize = WaarpZstdCodec.getMaxCompressedSize(blockSize);
349   }
350 
351   private SoftReference<byte[]> getBuffer(SoftReference<byte[]> softReference,
352                                           final int length) {
353     if (softReference == null) {
354       softReference = new SoftReference<byte[]>(new byte[length]);
355       return softReference;
356     }
357     final byte[] buffer = softReference.get();
358     if (buffer != null && buffer.length >= length) {
359       return softReference;
360     }
361     softReference.clear();
362     softReference = new SoftReference<byte[]>(new byte[length]);
363     return softReference;
364   }
365 
366   /**
367    * @param length the target size
368    *
369    * @return the reusable buffer for sending per Session
370    */
371   public final byte[] getReusableBuffer(final int length) {
372     reusableBuffer = getBuffer(reusableBuffer, length);
373     return reusableBuffer.get();
374   }
375 
376   /**
377    * @param length the target size
378    *
379    * @return the reusable buffer for received DataPacket per Session
380    */
381   public final byte[] getReusableDataPacketBuffer(final int length) {
382     reusableDataPacketBuffer = getBuffer(reusableDataPacketBuffer, length);
383     return reusableDataPacketBuffer.get();
384   }
385 
386   /**
387    * @param length the original size
388    *
389    * @return the possible reusable compression buffer per Session
390    */
391   public final byte[] getSessionReusableCompressionBuffer(final int length) {
392     reusableCompressionBuffer = getBuffer(reusableCompressionBuffer,
393                                           WaarpZstdCodec.getMaxCompressedSize(
394                                               length));
395     return reusableCompressionBuffer.get();
396   }
397 
398   /**
399    * @return True if compression is enabled in this session
400    */
401   public final boolean isCompressionEnabled() {
402     return isCompressionEnabled;
403   }
404 
405   /**
406    * @param compressionEnabled True if compression is enabled in this session
407    */
408   public final void setCompressionEnabled(final boolean compressionEnabled) {
409     logger.debug("Compression enabled? {} => {}", isCompressionEnabled,
410                  compressionEnabled);
411     isCompressionEnabled = compressionEnabled;
412     if (isCompressionEnabled && reusableCompressionBuffer == null) {
413       synchronized (logger) {
414         reusableCompressionBuffer = reusableCompressionBufferStatic;
415         reusableCompressionBufferStatic = null;
416       }
417     } else if (!isCompressionEnabled && reusableCompressionBuffer != null) {
418       synchronized (logger) {
419         reusableDataPacketBufferStatic = reusableCompressionBuffer;
420         reusableCompressionBuffer = null;
421       }
422     }
423   }
424 
425   /**
426    * @return the current max compression size according to block size
427    */
428   public final int getCompressionMaxSize() {
429     return compressionMaxSize;
430   }
431 
432   /**
433    * Initialize per block digest
434    */
435   public final void initializeDigest() {
436     if (digestBlock == null && RequestPacket.isMD5Mode(getRunner().getMode())) {
437       try {
438         digestBlock = new FilesystemBasedDigest(
439             localChannelReference.getPartner().getDigestAlgo());
440       } catch (final NoSuchAlgorithmException e) {
441         SysErrLogger.FAKE_LOGGER.ignoreLog(e);
442       }
443     }
444   }
445 
446   /**
447    * @return the digest used per block
448    */
449   public final FilesystemBasedDigest getDigestBlock() {
450     return digestBlock;
451   }
452 
453   @Override
454   public final R66Dir getDir() {
455     return dir;
456   }
457 
458   @Override
459   public final FilesystemBasedFileParameterImpl getFileParameter() {
460     return Configuration.getFileParameter();
461   }
462 
463   @Override
464   public final R66Restart getRestart() {
465     return restart;
466   }
467 
468   /**
469    * @return True if the connection is currently authenticated
470    */
471   public final boolean isAuthenticated() {
472     if (auth == null) {
473       return false;
474     }
475     return auth.isIdentified();
476   }
477 
478   /**
479    * @return True if the Channel is ready to accept transfer
480    */
481   public final boolean isReady() {
482     return isReady;
483   }
484 
485   /**
486    * @param isReady the isReady for transfer to set
487    */
488   public final void setReady(final boolean isReady) {
489     this.isReady = isReady;
490   }
491 
492   /**
493    * @return the runner
494    */
495   public final DbTaskRunner getRunner() {
496     return runner;
497   }
498 
499   /**
500    * @param localChannelReference the localChannelReference to set
501    */
502   public final void setLocalChannelReference(
503       final LocalChannelReference localChannelReference) {
504     this.localChannelReference = localChannelReference;
505     this.localChannelReference.setSession(this);
506     if (this.localChannelReference.getNetworkChannel() != null) {
507       raddress = this.localChannelReference.getNetworkChannel().remoteAddress();
508       laddress = this.localChannelReference.getNetworkChannel().localAddress();
509     } else {
510       raddress = laddress = new InetSocketAddress(0);
511     }
512   }
513 
514   /**
515    * @return the remote SocketAddress
516    */
517   public final SocketAddress getRemoteAddress() {
518     return raddress;
519   }
520 
521   /**
522    * @return the local SocketAddress
523    */
524   public final SocketAddress getLocalAddress() {
525     return laddress;
526   }
527 
528   /**
529    * @return the localChannelReference
530    */
531   public final LocalChannelReference getLocalChannelReference() {
532     return localChannelReference;
533   }
534 
535   /**
536    * To be called in case of No Session not from a valid LocalChannelHandler
537    *
538    * @param runner
539    * @param localChannelReference
540    */
541   public final void setNoSessionRunner(final DbTaskRunner runner,
542                                        final LocalChannelReference localChannelReference) {
543     this.runner = runner;
544     // Warning: the file is not correctly setup
545     auth.specialNoSessionAuth(false, Configuration.configuration.getHostId());
546     try {
547       file = (R66File) dir.setFile(this.runner.getFilename(), false);
548     } catch (final CommandAbstractException ignored) {
549       // nothing
550     }
551     this.localChannelReference = localChannelReference;
552     if (this.localChannelReference == null) {
553       if (this.runner.getLocalChannelReference() != null) {
554         this.localChannelReference = this.runner.getLocalChannelReference();
555       } else {
556         this.localChannelReference = new LocalChannelReference();
557       }
558       this.localChannelReference.setErrorMessage(
559           this.runner.getErrorInfo().getMesg(), this.runner.getErrorInfo());
560     }
561     runner.setLocalChannelReference(this.localChannelReference);
562     this.localChannelReference.setSession(this);
563   }
564 
565   /**
566    * Set the File from the runner before PRE operation are done
567    *
568    * @throws OpenR66RunnerErrorException
569    */
570   public final void setFileBeforePreRunner()
571       throws OpenR66RunnerErrorException {
572     // check first if the next step is the PRE task from beginning
573     try {
574       file = FileUtils.getFile(logger, this, runner.getOriginalFilename(),
575                                runner.isPreTaskStarting(), isSender,
576                                runner.isSendThrough(), file);
577     } catch (final OpenR66RunnerErrorException e) {
578       runner.setErrorExecutionStatus(ErrorCode.FileNotFound);
579       throw e;
580     }
581     if (isSender && !runner.isSendThrough()) {
582       // possibly resolved filename
583       try {
584         runner.setOriginalFilename(file.getFile());
585         runner.setFilename(file.getFile());
586         logger.debug("Old size: {} => {}", runner.getOriginalSize(),
587                      file.length());
588         if (runner.getOriginalSize() <= 0) {
589           final long originalSize = file.length();
590           if (originalSize > 0) {
591             runner.setOriginalSize(originalSize);
592           }
593         }
594       } catch (final CommandAbstractException e) {
595         throw new OpenR66RunnerErrorException(e);
596       }
597     }
598   }
599 
600   /**
601    * Set the File from the runner once PRE operation are done, only for Receiver
602    *
603    * @param createFile When True, the file can be newly created if
604    *     needed.
605    *     If False, no new file will be
606    *     created, thus having an Exception.
607    *
608    * @throws OpenR66RunnerErrorException
609    * @throws CommandAbstractException only when new received created
610    *     file
611    *     cannot be created
612    */
613   public final void setFileAfterPreRunnerReceiver(final boolean createFile)
614       throws OpenR66RunnerErrorException, CommandAbstractException {
615     if (businessObject != null) {
616       businessObject.checkAtChangeFilename(this);
617     }
618     // File should not exist except if restart
619     if (runner.getRank() > 0) {
620       // Filename should be get back from runner load from database
621       try {
622         file = (R66File) dir.setFile(runner.getFilename(), true);
623         if (runner.isRecvThrough()) {
624           // no test on file since it does not really exist
625           logger.debug(FILE_IS_IN_THROUGH_MODE, file);
626         } else if (!file.canWrite()) {
627           throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
628         }
629       } catch (final CommandAbstractException e) {
630         throw new OpenR66RunnerErrorException(e);
631       }
632     } else {
633       // New FILENAME if necessary and store it
634       if (createFile) {
635         file = null;
636         String newfilename = runner.getOriginalFilename();
637         if (newfilename.charAt(1) == ':') {
638           // Windows path
639           newfilename = newfilename.substring(2);
640         }
641         newfilename = R66File.getBasename(newfilename);
642         try {
643           file = dir.setUniqueFile(runner.getSpecialId(), newfilename);
644           runner.setFilename(file.getBasename());
645         } catch (final CommandAbstractException e) {
646           runner.deleteTempFile();
647           throw e;
648         }
649         try {
650           if (runner.isRecvThrough()) {
651             // no test on file since it does not really exist
652             logger.debug(FILE_IS_IN_THROUGH_MODE, file);
653             runner.deleteTempFile();
654           } else if (!file.canWrite()) {
655             runner.deleteTempFile();
656             throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
657           }
658         } catch (final CommandAbstractException e) {
659           runner.deleteTempFile();
660           throw new OpenR66RunnerErrorException(e);
661         }
662       } else {
663         throw new OpenR66RunnerErrorException("No file created");
664       }
665     }
666     // Store TRUEFILENAME
667     try {
668       if (runner.isFileMoved()) {
669         runner.setFileMoved(file.getFile(), true);
670       } else {
671         runner.setFilename(file.getFile());
672       }
673     } catch (final CommandAbstractException e) {
674       runner.deleteTempFile();
675       throw new OpenR66RunnerErrorException(e);
676     }
677   }
678 
679   /**
680    * Set the File from the runner once PRE operation are done
681    *
682    * @param createFile When True, the file can be newly created if
683    *     needed.
684    *     If False, no new file will be
685    *     created, thus having an Exception.
686    *
687    * @throws OpenR66RunnerErrorException
688    * @throws CommandAbstractException only when new received created
689    *     file
690    *     cannot be created
691    */
692   public final void setFileAfterPreRunner(final boolean createFile)
693       throws OpenR66RunnerErrorException, CommandAbstractException {
694     if (businessObject != null) {
695       businessObject.checkAtChangeFilename(this);
696     }
697     // Now create the associated file
698     if (isSender != runner.isSender()) {
699       logger.warn("Not same SenderSide {} {}", isSender, runner.isSender());
700     }
701     if (isSender) {
702       try {
703         if (file == null) {
704           try {
705             file = (R66File) dir.setFile(runner.getFilename(), false);
706           } catch (final CommandAbstractException e) {
707             // file is not under normal base directory, so is external
708             // File must already exist but can be using special code ('*?')
709             file = dir.setFileNoCheck(runner.getFilename());
710           }
711         }
712         if (runner.isSendThrough()) {
713           // no test on file since it does not really exist
714           logger.debug(FILE_IS_IN_THROUGH_MODE, file);
715         } else if (!file.canRead()) {
716           // file is not under normal base directory, so is external
717           // File must already exist but cannot used special code ('*?')
718           final R66File newFile = new R66File(this, dir, runner.getFilename());
719           if (!newFile.canRead()) {
720             runner.setErrorExecutionStatus(ErrorCode.FileNotFound);
721             throw new OpenR66RunnerErrorException(
722                 "File cannot be read: " + file.getTrueFile().getAbsolutePath() +
723                 " to " + newFile.getTrueFile().getAbsolutePath());
724           }
725         }
726       } catch (final CommandAbstractException e) {
727         throw new OpenR66RunnerErrorException(e);
728       }
729     } else {
730       // File should not exist except if restart
731       if (runner.getRank() > 0) {
732         // Filename should be get back from runner load from database
733         try {
734           file = (R66File) dir.setFile(runner.getFilename(), true);
735           if (runner.isRecvThrough()) {
736             // no test on file since it does not really exist
737             logger.debug(FILE_IS_IN_THROUGH_MODE, file);
738           } else if (!file.canWrite()) {
739             throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
740           }
741         } catch (final CommandAbstractException e) {
742           throw new OpenR66RunnerErrorException(e);
743         }
744       } else {
745         // New FILENAME if necessary and store it
746         if (createFile) {
747           file = null;
748           String newfilename = runner.getOriginalFilename();
749           if (newfilename.charAt(1) == ':') {
750             // Windows path
751             newfilename = newfilename.substring(2);
752           }
753           newfilename = R66File.getBasename(newfilename);
754           try {
755             file = dir.setUniqueFile(runner.getSpecialId(), newfilename);
756             runner.setFilename(file.getBasename());
757           } catch (final CommandAbstractException e) {
758             runner.deleteTempFile();
759             throw e;
760           }
761           try {
762             if (runner.isRecvThrough()) {
763               // no test on file since it does not really exist
764               logger.debug(FILE_IS_IN_THROUGH_MODE, file);
765               runner.deleteTempFile();
766             } else if (!file.canWrite()) {
767               runner.deleteTempFile();
768               throw new OpenR66RunnerErrorException(FILE_CANNOT_BE_WRITE);
769             }
770           } catch (final CommandAbstractException e) {
771             runner.deleteTempFile();
772             throw new OpenR66RunnerErrorException(e);
773           }
774         } else {
775           throw new OpenR66RunnerErrorException("No file created");
776         }
777       }
778     }
779     // Store TRUEFILENAME
780     try {
781       if (runner.isFileMoved()) {
782         runner.setFileMoved(file.getFile(), true);
783       } else {
784         runner.setFilename(file.getFile());
785       }
786     } catch (final CommandAbstractException e) {
787       runner.deleteTempFile();
788       throw new OpenR66RunnerErrorException(e);
789     }
790     // check fileSize
791     if (isSender && file != null) {
792       logger.debug("could change size: {} => {}", runner.getOriginalSize(),
793                    file.length());
794       if (runner.getOriginalSize() < 0) {
795         final long originalSize = file.length();
796         if (originalSize > 0) {
797           runner.setOriginalSize(originalSize);
798         }
799       }
800     }
801   }
802 
803   /**
804    * To be used when a request comes with a bad code so it cannot be set
805    * normally
806    *
807    * @param runner
808    * @param code
809    */
810   public final void setBadRunner(final DbTaskRunner runner,
811                                  final ErrorCode code) {
812     this.runner = runner;
813     if (code == ErrorCode.QueryAlreadyFinished) {
814       if (this.runner.isSender()) {
815         // Change dir
816         try {
817           dir.changeDirectory(this.runner.getRule().getSendPath());
818         } catch (final CommandAbstractException ignored) {
819           // nothing
820         }
821       } else {
822         // Change dir
823         try {
824           dir.changeDirectory(this.runner.getRule().getWorkPath());
825         } catch (final CommandAbstractException ignored) {
826           // nothing
827         }
828       }
829       if (businessObject != null) {
830         businessObject.checkAtError(this);
831       }
832       this.runner.setPostTask();
833       try {
834         setFileAfterPreRunner(false);
835       } catch (final OpenR66RunnerErrorException ignored) {
836         // nothing
837       } catch (final CommandAbstractException ignored) {
838         // nothing
839       }
840     }
841   }
842 
843   /**
844    * Set the runner, and setup the directory first.
845    * <p>
846    * This call should be followed by a startup() call.
847    *
848    * @param runner the runner to set
849    *
850    * @throws OpenR66RunnerErrorException
851    */
852   public final void setRunner(final DbTaskRunner runner)
853       throws OpenR66RunnerErrorException {
854     this.runner = runner;
855     if (localChannelReference != null) {
856       this.runner.setLocalChannelReference(localChannelReference);
857     }
858     this.isSender = runner.isSender();
859     logger.debug("Runner to set: {} {}", runner.shallIgnoreSave(), runner);
860     this.runner.checkThroughMode();
861     if (businessObject != null) {
862       businessObject.checkAtStartup(this);
863     }
864     if (this.runner.isSender()) {
865       if (runner.isSendThrough()) {
866         // May not change dir as needed
867         // Change dir
868         try {
869           dir.changeDirectory(this.runner.getRule().getSendPath());
870         } catch (final CommandAbstractException e) {
871           // ignore
872         }
873       } else {
874         // Change dir
875         try {
876           dir.changeDirectory(this.runner.getRule().getSendPath());
877         } catch (final CommandAbstractException e) {
878           throw new OpenR66RunnerErrorException(e);
879         }
880       }
881     } else {
882       if (runner.isRecvThrough()) {
883         // May not change dir as needed
884         // Change dir
885         try {
886           dir.changeDirectory(this.runner.getRule().getWorkPath());
887         } catch (final CommandAbstractException ignored) {
888           // nothing
889         }
890       } else {
891         // Change dir
892         try {
893           dir.changeDirectory(this.runner.getRule().getWorkPath());
894         } catch (final CommandAbstractException e) {
895           throw new OpenR66RunnerErrorException(e);
896         }
897       }
898     }
899     logger.debug("Dir is: {}", dir.getFullPath());
900   }
901 
902   /**
903    * START from the PreTask if necessary, and prepare the file
904    *
905    * @param checkNotExternal if True, the file as Sender should not be
906    *     external to current directory
907    *
908    * @throws OpenR66RunnerErrorException
909    */
910   public final void startup(final boolean checkNotExternal)
911       throws OpenR66RunnerErrorException {
912     setRestartMarker();
913     logger.debug("GlobalLastStep: {} vs {}:{}", runner.getGloballaststep(),
914                  TASKSTEP.NOTASK.ordinal(), TASKSTEP.PRETASK.ordinal());
915     initializeTransfer(checkNotExternal);
916     // Now create the associated file
917     try {
918       setFileAfterPreRunner(true);
919     } catch (final CommandAbstractException e2) {
920       // generated due to a possible wildcard not ready
921       file = null;
922     }
923     logger.debug("GlobalLastStep: {}", runner.getGloballaststep());
924     if (runner.getGloballaststep() == TASKSTEP.TRANSFERTASK.ordinal()) {
925       if (businessObject != null) {
926         businessObject.checkAfterPreCommand(this);
927       }
928       if (!runner.isSender()) {
929         if (initializeReceiver()) {
930           return;
931         }
932       } else {
933         initializeSender();
934       }
935     }
936     runner.saveStatus();
937     logger.debug("Final init: {} {}", runner, file != null);
938   }
939 
940   private void initializeSender() throws OpenR66RunnerErrorException {
941     if (file != null) {
942       try {
943         localChannelReference.getFutureRequest().setFilesize(file.length());
944       } catch (final CommandAbstractException ignored) {
945         // nothing
946       }
947       try {
948         file.restartMarker(restart);
949       } catch (final CommandAbstractException e) {
950         runner.deleteTempFile();
951         throw new OpenR66RunnerErrorException(e);
952       }
953     }
954   }
955 
956   private boolean initializeReceiver() throws OpenR66RunnerErrorException {
957     // Check file length according to rank
958     if (runner.isRecvThrough()) {
959       // no size can be checked
960     } else {
961       long length = 0;
962       if (file != null) {
963         try {
964           length = file.length();
965         } catch (final CommandAbstractException ignored) {
966           // nothing
967         }
968       }
969       final long needed = runner.getOriginalSize() - length;
970       long available = 0;
971       String targetDir = null;
972       try {
973         available = dir.getFreeSpace();
974         targetDir = dir.getPwd();
975       } catch (final CommandAbstractException ignored) {
976         // nothing
977       }
978       if (file != null) {
979         final File truefile = file.getTrueFile().getParentFile();
980         available = truefile.getFreeSpace();
981         targetDir = truefile.getPath();
982       }
983       logger.debug("Check available space: {} >? {}(+{})", available, needed,
984                    length);
985       // Available > 0 since some system returns 0 (wrong size)
986       if (available > 0 && needed > available) {
987         // not enough space
988         runner.setErrorExecutionStatus(ErrorCode.Internal);
989         throw new OpenR66RunnerErrorException(
990             "File cannot be written due to unsufficient space available: " +
991             targetDir + " need " + needed + " more while available is " +
992             available);
993       }
994       if (file == null) {
995         runner.saveStatus();
996         logger.info("Final PARTIAL init: {}", runner);
997         return true;
998       }
999       checkPosition(length);
1000     }
1001     return false;
1002   }
1003 
1004   private void checkPosition(final long length)
1005       throws OpenR66RunnerErrorException {
1006     // First check available space
1007     try {
1008       final long oldPosition = restart.getPosition();
1009       restart.setSet(true);
1010       if (oldPosition > length) {
1011         int newRank = (int) (length / runner.getBlocksize()) -
1012                       Configuration.getRankRestart();
1013         if (newRank <= 0) {
1014           newRank = 1;
1015         }
1016         logger.info("OldPos: {}:{} curLength: {}:{}", oldPosition,
1017                     runner.getRank(), length, newRank);
1018         logger.warn("Decreased Rank Restart for {} at " + newRank, runner);
1019         runner.setTransferTask(newRank);
1020         restart.restartMarker(runner.getBlocksize() * runner.getRank());
1021       }
1022       try {
1023         file.restartMarker(restart);
1024       } catch (final CommandAbstractException e) {
1025         runner.deleteTempFile();
1026         throw new OpenR66RunnerErrorException(e);
1027       }
1028     } catch (final NoRestartException e) {
1029       // length is not to be changed
1030     }
1031   }
1032 
1033   private void initializeTransfer(final boolean checkNotExternal)
1034       throws OpenR66RunnerErrorException {
1035     if (runner.isSelfRequest() && runner.getRule().isSendMode()) {
1036       // It might be the sender and initiator side, already changed by
1037       // receiver, reset globalLastStep and globalStep
1038       runner.setInitialTask();
1039     }
1040     if (runner.getGloballaststep() == TASKSTEP.NOTASK.ordinal() ||
1041         runner.getGloballaststep() == TASKSTEP.PRETASK.ordinal()) {
1042       setFileBeforePreRunner();
1043       if (runner.isSender() && !runner.isSendThrough() && file != null &&
1044           checkNotExternal) {
1045         String path = null;
1046         try {
1047           path = file.getFile();
1048         } catch (final CommandAbstractException ignored) {
1049           // nothing
1050         }
1051         if (file.isExternal() ||
1052             path != null && !dir.isPathInCurrentDir(path)) {
1053           // should not be
1054           logger.error(
1055               "File cannot be found in the current output directory: {} not in {}",
1056               file, dir);
1057           runner.setErrorExecutionStatus(ErrorCode.FileNotAllowed);
1058           throw new OpenR66RunnerErrorException(
1059               "File cannot be found in the current output directory");
1060         }
1061       }
1062       runner.setPreTask();
1063       runner.run();
1064       if (runner.isSender() && !runner.isSendThrough()) {
1065         if (file != null) {
1066           try {
1067             final long originalSize = file.length();
1068             if (originalSize > 0) {
1069               runner.setOriginalSize(originalSize);
1070             }
1071           } catch (final CommandAbstractException e) {
1072             // ignore
1073           }
1074         }
1075       }
1076       runner.setTransferTask(runner.getRank());
1077     } else {
1078       runner.reset();
1079       runner.changeUpdatedInfo(UpdatedInfo.RUNNING);
1080     }
1081   }
1082 
1083   private void setRestartMarker() {
1084     if (runner.getRank() > 0) {
1085       logger.debug("restart at {} {}", runner.getRank(), runner);
1086       logger.debug("restart at {} {}", runner.getRank(), dir);
1087       runner.setTransferTask(runner.getRank());
1088       restart.restartMarker(runner.getBlocksize() * runner.getRank());
1089     } else {
1090       restart.restartMarker(0);
1091     }
1092   }
1093 
1094   /**
1095    * Rename the current receive file from the very beginning since the sender
1096    * has a post action that changes its
1097    * name
1098    *
1099    * @param newFilename
1100    *
1101    * @throws OpenR66RunnerErrorException
1102    */
1103   public final void renameReceiverFile(final String newFilename)
1104       throws OpenR66RunnerErrorException {
1105     if (runner == null) {
1106       return;
1107     }
1108     // First delete the temporary file if needed
1109     if (runner.getRank() > 0) {
1110       logger.error(
1111           "Renaming file is not correct since transfer does not start from first block");
1112       // Not correct
1113       throw new OpenR66RunnerErrorException(
1114           "Renaming file not correct since transfer already started");
1115     }
1116     if (!runner.isRecvThrough()) {
1117       runner.deleteTempFile();
1118     }
1119     // Now rename it
1120     runner.setOriginalFilename(newFilename);
1121     try {
1122       setFileAfterPreRunnerReceiver(true);
1123     } catch (final CommandAbstractException e) {
1124       throw new OpenR66RunnerErrorException(e);
1125     }
1126   }
1127 
1128   /**
1129    * Finalize the transfer step by running the error or post operation
1130    * according
1131    * to the status.
1132    *
1133    * @param status
1134    * @param finalValue
1135    *
1136    * @throws OpenR66RunnerErrorException
1137    * @throws OpenR66ProtocolSystemException
1138    */
1139   public final void setFinalizeTransfer(final boolean status,
1140                                         final R66Result finalValue)
1141       throws OpenR66RunnerErrorException, OpenR66ProtocolSystemException {
1142     logger.debug("{}:{}:{}", status, finalValue, runner);
1143     if (runner == null) {
1144       if (localChannelReference != null) {
1145         if (status) {
1146           localChannelReference.validateRequest(finalValue);
1147         } else {
1148           localChannelReference.invalidateRequest(finalValue);
1149         }
1150       }
1151       if (businessObject != null) {
1152         if (status) {
1153           businessObject.checkAfterTransfer(this);
1154         } else {
1155           businessObject.checkAtError(this);
1156         }
1157       }
1158       return;
1159     }
1160     if (businessObject != null) {
1161       if (status) {
1162         businessObject.checkAfterTransfer(this);
1163       } else {
1164         businessObject.checkAtError(this);
1165       }
1166     }
1167     if (runner.isAllDone()) {
1168       if (logger.isDebugEnabled()) {
1169         logger.debug("Transfer already done but {} on {} {}", status, file,
1170                      runner.toShortString(),
1171                      new OpenR66RunnerErrorException(finalValue.toString()));
1172       }
1173       // FIXME ??
1174       /*
1175        * if (! status) runner.finalizeTransfer(localChannelReference, file, finalValue, status)
1176        */
1177       return;
1178     }
1179     if (runner.isInError()) {
1180       if (logger.isDebugEnabled()) {
1181         logger.debug("Transfer already done in error but {} on {} {}", status,
1182                      file, runner.toShortString(),
1183                      new OpenR66RunnerErrorException(finalValue.toString()));
1184       }
1185       // FIXME ??
1186       /*
1187        * if (! status) runner.finalizeTransfer(localChannelReference, file, finalValue, status)
1188        */
1189       return;
1190     }
1191     if (localChannelReference.getFutureRequest().isDone()) {
1192       if (logger.isDebugEnabled()) {
1193         logger.debug("Request already done but {} on {} {}" + status, file,
1194                      runner.toShortString(),
1195                      new OpenR66RunnerErrorException(finalValue.toString()));
1196       }
1197       // Already finished once so do nothing more
1198       return;
1199     }
1200     if (!status) {
1201       runner.deleteTempFile();
1202       runner.setErrorExecutionStatus(finalValue.getCode());
1203     }
1204     if (status) {
1205       runner.finishTransferTask(ErrorCode.TransferOk);
1206     } else {
1207       runner.finishTransferTask(finalValue.getCode());
1208     }
1209     logger.debug("Transfer {} on {} and {}", status, file, runner);
1210     if (!runner.ready()) {
1211       // Pre task in error (or even before)
1212       final OpenR66RunnerErrorException runnerErrorException;
1213       if (!status && finalValue.getException() != null) {
1214         runnerErrorException = new OpenR66RunnerErrorException(
1215             "Pre task in error (or even before)", finalValue.getException());
1216       } else {
1217         runnerErrorException = new OpenR66RunnerErrorException(
1218             "Pre task in error (or even before)");
1219       }
1220       finalValue.setException(runnerErrorException);
1221       logger.info("Pre task in error (or even before) : {}",
1222                   runnerErrorException.getMessage());
1223       if (Configuration.configuration.isExecuteErrorBeforeTransferAllowed()) {
1224         runner.finalizeTransfer(localChannelReference, file, finalValue,
1225                                 status);
1226       }
1227       runner.saveStatus();
1228       localChannelReference.invalidateRequest(finalValue);
1229       throw runnerErrorException;
1230     }
1231     try {
1232       if (file != null) {
1233         file.closeFile();
1234       }
1235     } catch (final CommandAbstractException e1) {
1236       R66Result result = finalValue;
1237       if (status) {
1238         result = new R66Result(new OpenR66RunnerErrorException(e1), this, false,
1239                                ErrorCode.Internal, runner);
1240       }
1241       localChannelReference.invalidateRequest(result);
1242       throw (OpenR66RunnerErrorException) result.getException();
1243     }
1244     runner.finalizeTransfer(localChannelReference, file, finalValue, status);
1245     if (businessObject != null) {
1246       businessObject.checkAfterPost(this);
1247     }
1248   }
1249 
1250   /**
1251    * Try to finalize the request if possible
1252    *
1253    * @param errorValue
1254    *
1255    * @throws OpenR66RunnerErrorException
1256    * @throws OpenR66ProtocolSystemException
1257    */
1258   public final void tryFinalizeRequest(final R66Result errorValue)
1259       throws OpenR66RunnerErrorException, OpenR66ProtocolSystemException {
1260     if (getLocalChannelReference() == null) {
1261       return;
1262     }
1263     if (getLocalChannelReference().getFutureRequest().isDone()) {
1264       return;
1265     }
1266     if (runner == null) {
1267       localChannelReference.invalidateRequest(errorValue);
1268       return;
1269     }
1270     // do the real end
1271     if (runner.getStatus() == ErrorCode.CompleteOk) {
1272       runner.setAllDone();
1273       runner.forceSaveStatus();
1274       localChannelReference.validateRequest(
1275           new R66Result(this, true, ErrorCode.CompleteOk, runner));
1276     } else if (runner.getStatus() == ErrorCode.TransferOk &&
1277                (!runner.isSender() ||
1278                 errorValue.getCode() == ErrorCode.QueryAlreadyFinished)) {
1279       // Try to finalize it
1280       try {
1281         setFinalizeTransfer(true,
1282                             new R66Result(this, true, ErrorCode.CompleteOk,
1283                                           runner));
1284         localChannelReference.validateRequest(
1285             localChannelReference.getFutureEndTransfer().getResult());
1286       } catch (final OpenR66ProtocolSystemException e) {
1287         logger.error("Cannot validate runner:     {}", runner.toShortString());
1288         runner.changeUpdatedInfo(UpdatedInfo.INERROR);
1289         runner.setErrorExecutionStatus(errorValue.getCode());
1290         runner.forceSaveStatus();
1291         setFinalizeTransfer(false, errorValue);
1292       } catch (final OpenR66RunnerErrorException e) {
1293         logger.error("Cannot validate runner:     {}", runner.toShortString());
1294         runner.changeUpdatedInfo(UpdatedInfo.INERROR);
1295         runner.setErrorExecutionStatus(errorValue.getCode());
1296         runner.forceSaveStatus();
1297         setFinalizeTransfer(false, errorValue);
1298       }
1299     } else {
1300       // invalidate Request
1301       logger.error(
1302           "Runner {} will be shutdown while in status {} and future status will be {}",
1303           runner.getSpecialId(), runner.getStatus().getMesg(),
1304           errorValue.getCode().getMesg());
1305       setFinalizeTransfer(false, errorValue);
1306     }
1307   }
1308 
1309   /**
1310    * @return the file
1311    */
1312   public final R66File getFile() {
1313     return file;
1314   }
1315 
1316   /**
1317    * @return True if the number of Error is still acceptable
1318    */
1319   public final boolean addError() {
1320     final int value = numOfError.incrementAndGet();
1321     return value < Configuration.RETRYNB;
1322   }
1323 
1324   @Override
1325   public final String toString() {
1326     return "Session: FS[" + state.getCurrent() + "] " + status + "  " +
1327            (auth != null? auth.toString() : "no Auth") + "     " +
1328            (dir != null? dir.toString() : "no Dir") + "     " +
1329            (file != null? file.toString() : "no File") + "     " +
1330            (runner != null? runner.toShortString() : "no Runner");
1331   }
1332 
1333   @Override
1334   public final String getUniqueExtension() {
1335     return Configuration.EXT_R66;
1336   }
1337 
1338   /**
1339    * @return the dirsFromSession
1340    */
1341   public final HashMap<String, R66Dir> getDirsFromSession() {
1342     return dirsFromSession;
1343   }
1344 
1345   /**
1346    * @return True if according to session, it is the sender side (to byPass
1347    *     send to itself issue)
1348    */
1349   public final boolean isSender() {
1350     return isSender;
1351   }
1352 }