View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.openr66.commander;
21  
22  import org.waarp.common.command.exception.CommandAbstractException;
23  import org.waarp.common.database.data.AbstractDbData;
24  import org.waarp.common.database.data.AbstractDbData.UpdatedInfo;
25  import org.waarp.common.database.exception.WaarpDatabaseException;
26  import org.waarp.common.logging.SysErrLogger;
27  import org.waarp.common.logging.WaarpLogger;
28  import org.waarp.common.logging.WaarpLoggerFactory;
29  import org.waarp.openr66.client.RecvThroughHandler;
30  import org.waarp.openr66.context.ErrorCode;
31  import org.waarp.openr66.context.R66FiniteDualStates;
32  import org.waarp.openr66.context.R66Result;
33  import org.waarp.openr66.context.R66Session;
34  import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
35  import org.waarp.openr66.database.DbConstantR66;
36  import org.waarp.openr66.database.data.DbHostAuth;
37  import org.waarp.openr66.database.data.DbTaskRunner;
38  import org.waarp.openr66.database.data.DbTaskRunner.TASKSTEP;
39  import org.waarp.openr66.protocol.configuration.Configuration;
40  import org.waarp.openr66.protocol.configuration.Messages;
41  import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
42  import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoSslException;
43  import org.waarp.openr66.protocol.exception.OpenR66ProtocolNotAuthenticatedException;
44  import org.waarp.openr66.protocol.exception.OpenR66ProtocolNotYetConnectionException;
45  import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
46  import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
47  import org.waarp.openr66.protocol.localhandler.packet.RequestPacket;
48  import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
49  import org.waarp.openr66.protocol.utils.ChannelUtils;
50  import org.waarp.openr66.protocol.utils.R66Future;
51  import org.waarp.openr66.protocol.utils.TransferUtils;
52  
53  import java.io.File;
54  import java.net.SocketAddress;
55  import java.util.concurrent.ConcurrentHashMap;
56  import java.util.concurrent.ConcurrentLinkedQueue;
57  
58  /**
59   * Client Runner from a TaskRunner
60   */
61  public class ClientRunner extends Thread {
62    private static final String CANNOT_CONNECT_TO_SERVER =
63        "Cannot connect to server ";
64  
65    private static final String REQUEST_INFORMATION_FAILURE =
66        "RequestInformation.Failure";
67  
68    private static final String REQUEST_INFORMATION_SUCCESS =
69        "RequestInformation.Success";
70  
71    private static final String TRANSFER_STATUS = "Transfer.Status";
72  
73    /**
74     * Internal Logger
75     */
76    private static final WaarpLogger logger =
77        WaarpLoggerFactory.getLogger(ClientRunner.class);
78  
79    private static final ConcurrentHashMap<String, Integer>
80        taskRunnerRetryHashMap = new ConcurrentHashMap<String, Integer>();
81  
82    private static RecvThroughHandler staticRecvHandlerJunit = null;
83  
84    public static ConcurrentLinkedQueue<ClientRunner> activeRunners;
85  
86    private final NetworkTransaction networkTransaction;
87  
88    private final DbTaskRunner taskRunner;
89  
90    private final R66Future futureRequest;
91  
92    private RecvThroughHandler handler = null;
93  
94    private boolean isSendThroughMode;
95  
96    private LocalChannelReference localChannelReference;
97  
98    private final String nameTask;
99  
100   private boolean limitRetryConnection = true;
101 
102   public static void setRecvHandlerJunit(final RecvThroughHandler handler) {
103     staticRecvHandlerJunit = handler;
104   }
105 
106   public static boolean isRecvHandlerJunit() {
107     return staticRecvHandlerJunit != null;
108   }
109 
110   public ClientRunner(final NetworkTransaction networkTransaction,
111                       final DbTaskRunner taskRunner,
112                       final R66Future futureRequest) {
113     this.networkTransaction = networkTransaction;
114     this.taskRunner = taskRunner;
115     this.futureRequest = futureRequest;
116     setDaemon(true);
117     nameTask = "Client_Runner_" + taskRunner.getKey();
118     setName(nameTask);
119     if (staticRecvHandlerJunit != null) {
120       this.handler = staticRecvHandlerJunit;
121     }
122   }
123 
124   public static String hashStatus() {
125     return "ClientRunner: [taskRunnerRetryHashMap: " +
126            taskRunnerRetryHashMap.size() + " activeRunners: " +
127            (activeRunners != null? activeRunners.size() :
128                Configuration.configuration.getInternalRunner()
129                                           .nbInternalRunner()) + "] ";
130   }
131 
132   /**
133    * @return the networkTransaction
134    */
135   public final NetworkTransaction getNetworkTransaction() {
136     return networkTransaction;
137   }
138 
139   /**
140    * @return the taskRunner
141    */
142   public final DbTaskRunner getTaskRunner() {
143     return taskRunner;
144   }
145 
146   /**
147    * @return the localChannelReference
148    */
149   public final LocalChannelReference getLocalChannelReference() {
150     return localChannelReference;
151   }
152 
153   @Override
154   public void run() {
155     if (Configuration.configuration.isShutdown() || Thread.interrupted()) {
156       taskRunner.changeUpdatedInfo(UpdatedInfo.TOSUBMIT);
157       taskRunner.forceSaveStatus();
158       return;
159     }
160     boolean status = false;
161     try {
162       if (activeRunners != null) {
163         activeRunners.add(this);
164       }
165       // fix for SelfRequest
166       if (taskRunner.isSelfRequest()) {
167         taskRunner.setSenderByRequestToValidate(false);
168       }
169       // Try to check if file still exists in send not self not through mode
170       if (taskRunner.isSender() && !taskRunner.isSelfRequest() &&
171           !taskRunner.isSendThrough()) {
172         try {
173           final R66Session session = new R66Session(false);
174           session.setReady(true);
175           final boolean ssl = Configuration.configuration.isUseSSL();
176           session.getAuth().specialNoSessionAuth(ssl,
177                                                  Configuration.configuration.getHostId(
178                                                      ssl));
179           final DbTaskRunner reloaded =
180               new DbTaskRunner(session, taskRunner.getRule(),
181                                taskRunner.getSpecialId(),
182                                taskRunner.getRequester(),
183                                taskRunner.getRequested());
184           reloaded.setSender(taskRunner.isSender());
185           session.setRunner(reloaded);
186           session.setBlockSize(reloaded.getBlocksize());
187           final File file = new File(reloaded.getFullFilePath());
188           if (!file.isFile()) {
189             logger.warn("File not found: {}", file.getAbsolutePath());
190             // File does no more exist => error
191             reloaded.changeUpdatedInfo(UpdatedInfo.INERROR);
192             reloaded.setErrorExecutionStatus(ErrorCode.FileNotFound);
193             logger.error("Runner Error: {} {}",
194                          ErrorCode.FileNotFound.getMesg(),
195                          taskRunner.toShortString());
196             reloaded.setErrorTask();
197             reloaded.update();
198             return;
199           }
200           status = true;
201         } catch (final CommandAbstractException e) {
202           if (Configuration.configuration.isShutdown()) {
203             // ignore since shutdown
204             logger.warn(e.getMessage());
205           } else {
206             // Wrong path? Ignore
207             logger.warn(e);
208           }
209         } catch (final OpenR66RunnerErrorException e) {
210           if (Configuration.configuration.isShutdown()) {
211             // ignore since shutdown
212             logger.warn(e.getMessage());
213           } else {
214             // Wrong run error? Ignore
215             logger.warn(e);
216           }
217         } catch (final WaarpDatabaseException e) {
218           if (Configuration.configuration.isShutdown()) {
219             // ignore since shutdown
220             logger.warn(e.getMessage());
221           } else {
222             // Wrong dbtask? Ignore
223             logger.warn(e);
224           }
225         } catch (final OpenR66ProtocolNoSslException e) {
226           if (Configuration.configuration.isShutdown()) {
227             // ignore since shutdown
228             logger.warn(e.getMessage());
229           } else {
230             // Wrong ssl? Ignore
231             logger.warn(e);
232           }
233         }
234       } else {
235         status = true;
236       }
237       if (Configuration.configuration.isShutdown() || Thread.interrupted() ||
238           !status) {
239         taskRunner.changeUpdatedInfo(UpdatedInfo.TOSUBMIT);
240         taskRunner.forceSaveStatus();
241         return;
242       }
243       final R66Future transfer;
244       try {
245         transfer = runTransfer();
246       } catch (final OpenR66RunnerErrorException e) {
247         logger.error("Runner Error: {} {}", e.getMessage(),
248                      taskRunner.toShortString());
249         return;
250       } catch (final OpenR66ProtocolNoConnectionException e) {
251         logger.error("No connection Error {}", e.getMessage());
252         if (localChannelReference != null) {
253           localChannelReference.setErrorMessage(
254               ErrorCode.ConnectionImpossible.getMesg(),
255               ErrorCode.ConnectionImpossible);
256         }
257         taskRunner.setErrorTask();
258         try {
259           taskRunner.forceSaveStatus();
260           taskRunner.run();
261           taskRunner.saveStatus();
262         } catch (final OpenR66RunnerErrorException e1) {
263           changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.ConnectionImpossible,
264                             true);
265         }
266         return;
267       } catch (final OpenR66ProtocolPacketException e) {
268         logger.error("Protocol Error", e);
269         return;
270       } catch (final OpenR66ProtocolNotYetConnectionException e) {
271         logger.warn("No connection warning {}", e.getMessage());
272         return;
273       }
274       final R66Result result = transfer.getResult();
275       if (result != null) {
276         if (result.getCode() == ErrorCode.QueryAlreadyFinished) {
277           logger.warn(Messages.getString(TRANSFER_STATUS) +
278                       (transfer.isSuccess()?
279                           Messages.getString(REQUEST_INFORMATION_SUCCESS) :
280                           Messages.getString(REQUEST_INFORMATION_FAILURE)) +
281                       "     " + ErrorCode.QueryAlreadyFinished.getMesg() + ':' +
282                       result.toString());
283         } else {
284           if (transfer.isSuccess()) {
285             logger.info("{}{}     {}", Messages.getString(TRANSFER_STATUS),
286                         Messages.getString(REQUEST_INFORMATION_SUCCESS),
287                         result);
288           } else {
289             logger.error(Messages.getString(TRANSFER_STATUS) +
290                          Messages.getString(REQUEST_INFORMATION_FAILURE) +
291                          "     " + result.toString());
292           }
293         }
294       } else {
295         if (transfer.isSuccess()) {
296           logger.warn(Messages.getString(TRANSFER_STATUS) +
297                       Messages.getString(REQUEST_INFORMATION_SUCCESS) +
298                       "     no result");
299         } else {
300           logger.error(Messages.getString(TRANSFER_STATUS) +
301                        Messages.getString(REQUEST_INFORMATION_FAILURE) +
302                        "     no result");
303         }
304       }
305     } finally {
306       if (activeRunners != null) {
307         activeRunners.remove(this);
308       }
309       setName("Finished_" + nameTask);
310     }
311   }
312 
313   /**
314    * @param runner
315    * @param limit
316    *
317    * @return True if the task was run less than limit, else False
318    */
319   public final boolean incrementTaskRunnerTry(final DbTaskRunner runner,
320                                               final int limit) {
321     if (!isLimitRetryConnection()) {
322       return true;
323     }
324     final String key = runner.getKey();
325     Integer tries = taskRunnerRetryHashMap.get(key);
326     logger.debug("try to find integer: {}", tries);
327     if (tries == null) {
328       tries = 1;
329     } else {
330       tries += 1;
331     }
332     logger.debug("Check: {} vs {}: {}", tries, limit, limit <= tries);
333     if (limit <= tries || Thread.interrupted()) {
334       taskRunnerRetryHashMap.remove(key);
335       return false;
336     } else {
337       taskRunnerRetryHashMap.put(key, tries);
338       return true;
339     }
340   }
341 
342   /**
343    * True transfer run (can be called directly to enable exception outside any
344    * executors)
345    *
346    * @return The R66Future of the transfer operation
347    *
348    * @throws OpenR66RunnerErrorException
349    * @throws OpenR66ProtocolNoConnectionException
350    * @throws OpenR66ProtocolPacketException
351    * @throws OpenR66ProtocolNotYetConnectionException
352    */
353   public final R66Future runTransfer()
354       throws OpenR66RunnerErrorException, OpenR66ProtocolNoConnectionException,
355              OpenR66ProtocolPacketException,
356              OpenR66ProtocolNotYetConnectionException {
357     logger.debug("Start attempt Transfer");
358     localChannelReference = initRequest();
359     localChannelReference.getFutureValidRequest().awaitOrInterruptible(
360         Configuration.configuration.getTimeoutCon());
361     if (localChannelReference.getFutureValidRequest().isSuccess()) {
362       return finishTransfer(localChannelReference);
363     } else if (
364         localChannelReference.getFutureValidRequest().getResult() != null &&
365         localChannelReference.getFutureValidRequest().getResult().getCode() ==
366         ErrorCode.ServerOverloaded) {
367       return tryAgainTransferOnOverloaded(true, localChannelReference);
368     } else {
369       return finishTransfer(localChannelReference);
370     }
371   }
372 
373   /**
374    * In case an overloaded signal is returned by the requested
375    *
376    * @param retry if True, it will retry in case of overloaded remote
377    *     server, else it just stops
378    * @param localChannelReference
379    *
380    * @return The R66Future of the transfer operation
381    *
382    * @throws OpenR66RunnerErrorException
383    * @throws OpenR66ProtocolNoConnectionException
384    * @throws OpenR66ProtocolPacketException
385    * @throws OpenR66ProtocolNotYetConnectionException
386    */
387   public final R66Future tryAgainTransferOnOverloaded(final boolean retry,
388                                                       final LocalChannelReference localChannelReference)
389       throws OpenR66RunnerErrorException, OpenR66ProtocolNoConnectionException,
390              OpenR66ProtocolPacketException,
391              OpenR66ProtocolNotYetConnectionException {
392     if (this.localChannelReference == null) {
393       this.localChannelReference = localChannelReference;
394     }
395     final boolean incRetry =
396         incrementTaskRunnerTry(taskRunner, Configuration.RETRYNB);
397     logger.debug("tryAgainTransferOnOverloaded: {}:{}", retry, incRetry);
398     switch (taskRunner.getUpdatedInfo()) {
399       case DONE:
400       case INERROR:
401       case INTERRUPTED:
402         break;
403       default:
404         changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.ServerOverloaded,
405                           true);
406     }
407     // redo if possible
408     if (retry && incRetry) {
409       try {
410         Thread.sleep(Configuration.configuration.getConstraintLimitHandler()
411                                                 .getSleepTime());
412       } catch (final InterruptedException e) {//NOSONAR
413         if (localChannelReference == null) {
414           taskRunner.setLocalChannelReference(new LocalChannelReference());
415         }
416         taskRunner.getLocalChannelReference()
417                   .setErrorMessage(ErrorCode.ConnectionImpossible.getMesg(),
418                                    ErrorCode.ConnectionImpossible);
419         taskRunner.setErrorTask();
420         taskRunner.run();
421         taskRunner.saveStatus();
422         throw new OpenR66ProtocolNoConnectionException(
423             "End of retry on ServerOverloaded due to interruption");
424       }
425       return runTransfer();
426     } else {
427       if (localChannelReference == null) {
428         taskRunner.setLocalChannelReference(new LocalChannelReference());
429       }
430       taskRunner.getLocalChannelReference()
431                 .setErrorMessage(ErrorCode.ConnectionImpossible.getMesg(),
432                                  ErrorCode.ConnectionImpossible);
433       taskRunner.setErrorTask();
434       taskRunner.run();
435       taskRunner.saveStatus();
436       throw new OpenR66ProtocolNoConnectionException(
437           "End of retry on ServerOverloaded");
438     }
439   }
440 
441   /**
442    * Finish the transfer (called at the end of runTransfer)
443    *
444    * @param localChannelReference
445    *
446    * @return The R66Future of the transfer operation
447    */
448   public final R66Future finishTransfer(
449       final LocalChannelReference localChannelReference) {
450     if (this.localChannelReference == null) {
451       this.localChannelReference = localChannelReference;
452     }
453     final R66Future transfer = localChannelReference.getFutureRequest();
454     transfer.awaitOrInterruptible();
455     taskRunnerRetryHashMap.remove(taskRunner.getKey());
456     logger.info("Request done with {}",
457                 transfer.isSuccess()? "success" : "error");
458     localChannelReference.close();
459     // now reload TaskRunner if it still exists (light client can forget it)
460     final boolean isSender = taskRunner.isSender();
461     if (transfer.isSuccess()) {
462       try {
463         taskRunner.select();
464       } catch (final WaarpDatabaseException e) {
465         logger.debug("Not a problem but cannot find at the end the task", e);
466         taskRunner.setFrom(transfer.getRunner());
467       }
468       taskRunner.setSender(isSender);
469       changeUpdatedInfo(UpdatedInfo.DONE, ErrorCode.CompleteOk, false);
470     } else {
471       try {
472         taskRunner.select();
473       } catch (final WaarpDatabaseException e) {
474         logger.debug("Not a problem but cannot find at the end the task");
475         taskRunner.setFrom(transfer.getRunner());
476       }
477       taskRunner.setSender(isSender);
478       // Case when we were interrupted
479       if (transfer.getResult() == null) {
480         switch (taskRunner.getUpdatedInfo()) {
481           case DONE:
482             final R66Result ok =
483                 new R66Result(null, true, ErrorCode.CompleteOk, taskRunner);
484             transfer.setResult(ok);
485             transfer.setSuccess();
486             changeUpdatedInfo(UpdatedInfo.DONE, ErrorCode.CompleteOk, false);
487             break;
488           case INERROR:
489           case INTERRUPTED:
490           default:
491             final R66Result error =
492                 new R66Result(null, true, ErrorCode.Internal, taskRunner);
493             transfer.setResult(error);
494             transfer.cancel();
495             changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.Internal, false);
496         }
497         return transfer;
498       }
499       if (transfer.getResult().getCode() == ErrorCode.QueryAlreadyFinished) {
500         // check if post task to execute
501         logger.warn("WARN QueryAlreadyFinished:     " + transfer + "     " +
502                     taskRunner.toShortString());
503         try {
504           TransferUtils.finalizeTaskWithNoSession(taskRunner,
505                                                   localChannelReference);
506         } catch (final OpenR66RunnerErrorException e) {
507           taskRunner.changeUpdatedInfo(UpdatedInfo.INERROR);
508           taskRunner.forceSaveStatus();
509         }
510       } else {
511         switch (taskRunner.getUpdatedInfo()) {
512           case DONE:
513           case INERROR:
514           case INTERRUPTED:
515           case TOSUBMIT:
516             break;
517           default:
518             changeUpdatedInfo(UpdatedInfo.INERROR,
519                               transfer.getResult().getCode(), false);
520         }
521       }
522     }
523     return transfer;
524   }
525 
526   /**
527    * Initialize the request
528    *
529    * @return the localChannelReference holding the transfer request
530    *
531    * @throws OpenR66ProtocolNoConnectionException
532    * @throws OpenR66ProtocolPacketException
533    * @throws OpenR66ProtocolNotYetConnectionException
534    */
535   public final LocalChannelReference initRequest()
536       throws OpenR66ProtocolNoConnectionException,
537              OpenR66ProtocolPacketException,
538              OpenR66ProtocolNotYetConnectionException {
539     changeUpdatedInfo(UpdatedInfo.RUNNING, ErrorCode.Running, true);
540     final long id = taskRunner.getSpecialId();
541     final String tid;
542     if (id == DbConstantR66.ILLEGALVALUE) {
543       tid = "Runner_" + taskRunner.getRuleId() + '_' + taskRunner.getMode() +
544             "_NEWTRANSFER";
545     } else {
546       tid = "Runner_" + taskRunner.getRuleId() + '_' + taskRunner.getMode() +
547             '_' + id;
548     }
549     setName(tid);
550     logger.debug("Will run {}", taskRunner);
551     boolean restartPost = false;
552     if (taskRunner.getGloballaststep() == TASKSTEP.POSTTASK.ordinal()) {
553       // Send a validation to requested
554       if (!taskRunner.isRequestOnRequested()) {
555         // restart
556         restartPost = true;
557       }
558     }
559     if (taskRunner.isRequestOnRequested()) {
560       // Don't have to restart a task for itself (or should use requester)
561       logger.warn("Requested host cannot initiate itself the request");
562       changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.LoopSelfRequestedHost,
563                         true);
564       throw new OpenR66ProtocolNoConnectionException(
565           "Requested host cannot initiate itself the request");
566     }
567     final DbHostAuth host;
568     try {
569       host = new DbHostAuth(taskRunner.getRequested());
570     } catch (final WaarpDatabaseException e) {
571       logger.error(
572           "Requested host cannot be found: " + taskRunner.getRequested());
573       changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.NotKnownHost, true);
574       throw new OpenR66ProtocolNoConnectionException(
575           "Requested host cannot be found " + taskRunner.getRequested());
576     }
577     if (host.isClient()) {
578       logger.warn("Cannot initiate a connection with a client: {}", host);
579       changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.ConnectionImpossible,
580                         true);
581       throw new OpenR66ProtocolNoConnectionException(
582           "Cannot connect to client " + host);
583     }
584     final SocketAddress socketAddress = host.getSocketAddress();
585     final boolean isSSL = host.isSsl();
586 
587     final LocalChannelReference localChannelReferenceTemp;
588     try {
589       localChannelReferenceTemp =
590           networkTransaction.createConnectionWithRetryWithAuthenticationException(
591               socketAddress, isSSL, futureRequest);
592     } catch (final OpenR66ProtocolNotAuthenticatedException e1) {
593       changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.BadAuthent, true);
594       taskRunner.setLocalChannelReference(new LocalChannelReference());
595       throw new OpenR66ProtocolNoConnectionException(
596           CANNOT_CONNECT_TO_SERVER + host +
597           " cannot be authenticated so stop retry here", e1);
598     }
599     taskRunner.setLocalChannelReference(localChannelReferenceTemp);
600     if (localChannelReferenceTemp == null) {
601       // propose to redo
602       String retry;
603       if (incrementTaskRunnerTry(taskRunner, Configuration.RETRYNB)) {
604 
605         logger.debug("Will retry since Cannot connect to {}", host);
606         retry = " but will retry";
607         // now wait
608         try {
609           Thread.sleep(Configuration.configuration.getDelayRetry());
610         } catch (final InterruptedException e) {//NOSONAR
611           SysErrLogger.FAKE_LOGGER.ignoreLog(e);
612           logger.info(
613               "Will not retry since an interruption occurs while connection to {}",
614               host);
615           retry = " and retries gets an interruption so stop here";
616           changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.ConnectionImpossible,
617                             true);
618           taskRunner.setLocalChannelReference(new LocalChannelReference());
619           throw new OpenR66ProtocolNoConnectionException(
620               CANNOT_CONNECT_TO_SERVER + host + retry);
621         }
622         changeUpdatedInfo(UpdatedInfo.TOSUBMIT, ErrorCode.ConnectionImpossible,
623                           true);
624         throw new OpenR66ProtocolNotYetConnectionException(
625             CANNOT_CONNECT_TO_SERVER + host + retry);
626       } else {
627         logger.info(
628             "Will not retry since limit of connection attemtps is reached for {}",
629             host);
630         retry = " and retries reach step limit so stop here";
631         changeUpdatedInfo(UpdatedInfo.INERROR, ErrorCode.ConnectionImpossible,
632                           true);
633         taskRunner.setLocalChannelReference(new LocalChannelReference());
634         throw new OpenR66ProtocolNoConnectionException(
635             CANNOT_CONNECT_TO_SERVER + host + retry);
636       }
637     }
638     if (handler != null) {
639       localChannelReferenceTemp.setRecvThroughHandler(handler);
640     }
641     localChannelReferenceTemp.setSendThroughMode(isSendThroughMode);
642     if (restartPost) {
643       final RequestPacket request = taskRunner.getRequest();
644       logger.debug("Will send request {} ", request);
645       localChannelReferenceTemp.setClientRunner(this);
646       localChannelReferenceTemp.sessionNewState(R66FiniteDualStates.REQUESTR);
647       try {
648         ChannelUtils.writeAbstractLocalPacket(localChannelReferenceTemp,
649                                               request, false);
650       } catch (final OpenR66ProtocolPacketException e) {
651         // propose to redo
652         logger.warn("Cannot transfer request to " + host);
653         changeUpdatedInfo(UpdatedInfo.INTERRUPTED, ErrorCode.Internal, true);
654         localChannelReferenceTemp.close();
655         throw e;
656       }
657       logger.debug("Wait for request to {}", host);
658       return localChannelReferenceTemp;
659     }
660     // If Requester is NOT Sender, and if TransferTask then decrease now if
661     // possible the rank
662     if (!taskRunner.isSender() &&
663         taskRunner.getGloballaststep() == TASKSTEP.TRANSFERTASK.ordinal()) {
664       logger.debug(
665           "Requester is not Sender so decrease if possible the rank {}",
666           taskRunner);
667       taskRunner.restartRank();
668       taskRunner.forceSaveStatus();
669       logger.info("Requester is not Sender so new rank is {} {}",
670                   taskRunner.getRank(), taskRunner);
671     }
672     final RequestPacket request = taskRunner.getRequest();
673     request.setLimit(
674         localChannelReferenceTemp.getChannelLimit(taskRunner.isSender()));
675     localChannelReferenceTemp.setClientRunner(this);
676     logger.debug("Will send request {} {}", request, localChannelReferenceTemp);
677     localChannelReferenceTemp.sessionNewState(R66FiniteDualStates.REQUESTR);
678     try {
679       ChannelUtils.writeAbstractLocalPacket(localChannelReferenceTemp, request,
680                                             false);
681     } catch (final OpenR66ProtocolPacketException e) {
682       // propose to redo
683       logger.warn("Cannot transfer request to " + host);
684       changeUpdatedInfo(UpdatedInfo.INTERRUPTED, ErrorCode.Internal, true);
685       localChannelReferenceTemp.close();
686       throw e;
687     }
688     logger.debug("Wait for request to {} {} {}", host,
689                  localChannelReferenceTemp, request);
690     return localChannelReferenceTemp;
691   }
692 
693   /**
694    * Change the UpdatedInfo of the current runner
695    *
696    * @param info
697    */
698   public final void changeUpdatedInfo(final AbstractDbData.UpdatedInfo info,
699                                       final ErrorCode code,
700                                       final boolean force) {
701     taskRunner.changeUpdatedInfo(info);
702     taskRunner.setErrorExecutionStatus(code);
703     if (force) {
704       taskRunner.forceSaveStatus();
705     } else {
706       try {
707         taskRunner.saveStatus();
708       } catch (final OpenR66RunnerErrorException ignored) {
709         // nothing
710       }
711     }
712   }
713 
714   /**
715    * @param handler the handler to set
716    */
717   public final void setRecvThroughHandler(final RecvThroughHandler handler) {
718     this.handler = handler;
719   }
720 
721   public final void setSendThroughMode() {
722     isSendThroughMode = true;
723   }
724 
725   public final boolean getSendThroughMode() {
726     return isSendThroughMode;
727   }
728 
729   public final boolean isLimitRetryConnection() {
730     return limitRetryConnection;
731   }
732 
733   public final void setLimitRetryConnection(
734       final boolean limitRetryConnection) {
735     this.limitRetryConnection = limitRetryConnection;
736   }
737 }