View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.openr66.protocol.localhandler;
21  
22  import io.netty.channel.Channel;
23  import io.netty.handler.traffic.ChannelTrafficShapingHandler;
24  import org.waarp.common.database.DbSession;
25  import org.waarp.common.guid.IntegerUuid;
26  import org.waarp.common.logging.WaarpLogger;
27  import org.waarp.common.logging.WaarpLoggerFactory;
28  import org.waarp.openr66.client.RecvThroughHandler;
29  import org.waarp.openr66.commander.ClientRunner;
30  import org.waarp.openr66.context.ErrorCode;
31  import org.waarp.openr66.context.R66FiniteDualStates;
32  import org.waarp.openr66.context.R66Result;
33  import org.waarp.openr66.context.R66Session;
34  import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
35  import org.waarp.openr66.protocol.configuration.Configuration;
36  import org.waarp.openr66.protocol.configuration.PartnerConfiguration;
37  import org.waarp.openr66.protocol.exception.OpenR66Exception;
38  import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
39  import org.waarp.openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
40  import org.waarp.openr66.protocol.networkhandler.NetworkChannelReference;
41  import org.waarp.openr66.protocol.networkhandler.NetworkServerHandler;
42  import org.waarp.openr66.protocol.networkhandler.NetworkServerInitializer;
43  import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
44  import org.waarp.openr66.protocol.utils.R66Future;
45  import org.waarp.openr66.protocol.utils.R66Versions;
46  
47  import static org.waarp.common.database.DbConstant.*;
48  
/**
 * Reference to one local channel, identified by its local id, holding both
 * the local side (server actions handler, session, futures) and the network
 * channel it is attached to.
 */
53  public class LocalChannelReference {
54    /**
55     * Internal Logger
56     */
57    private static final WaarpLogger logger =
58        WaarpLoggerFactory.getLogger(LocalChannelReference.class);
59  
60    /**
61     * Network Channel Ref
62     */
63    private final NetworkChannelReference networkChannelRef;
64    /**
65     * Traffic handler associated if any
66     */
67    private final ChannelTrafficShapingHandler cts;
68  
69    /**
70     * Network Server Handler
71     */
72    private final NetworkServerHandler networkServerHandler;
73  
74    /**
75     * Server Actions handler
76     */
77    private final TransferActions serverHandler = new TransferActions();
78  
79    /**
80     * Local Id
81     */
82    private final Integer localId;
83  
84    /**
85     * Remote Id
86     */
87    private Integer remoteId;
88  
89    /**
90     * Requested_requester_specialId
91     */
92    private String requestId;
93    /**
94     * Future on Global Request
95     */
96    private final R66Future futureRequest;
97  
98    /**
99     * Future on Valid Starting Request
100    */
101   private final R66Future futureValidRequest = new R66Future(true);
102 
103   /**
104    * Future on Transfer if any
105    */
106   private R66Future futureEndTransfer = new R66Future(true);
107 
108   /**
109    * Future on Connection
110    */
111   private final R66Future futureConnection = new R66Future(true);
112 
113   /**
114    * Future on Startup
115    */
116   private final R66Future futureStartup = new R66Future(true);
117 
118   /**
119    * Session
120    */
121   private R66Session session;
122 
123   /**
124    * Last error message
125    */
126   private String errorMessage = "NoError";
127 
128   /**
129    * Last error code
130    */
131   private ErrorCode code = ErrorCode.Unknown;
132 
133   /**
134    * RecvThroughHandler
135    */
136   private RecvThroughHandler recvThroughHandler;
137 
138   private boolean isSendThroughMode;
139   /**
140    * Thread for ClientRunner if any
141    */
142   private ClientRunner clientRunner;
143 
  /**
   * Hash computed during the transfer, kept so that it can be checked once
   * more when the whole transfer is over
   */
147   private String hashComputeDuringTransfer;
148   /**
149    * If partial hash, no global hash validation can be done
150    */
151   private boolean partialHash;
152 
153   /**
154    * PartnerConfiguration
155    */
156   private PartnerConfiguration partner;
157 
158   /**
159    * @param networkChannelRef
160    * @param remoteId
161    * @param futureRequest
162    *
163    * @throws OpenR66ProtocolRemoteShutdownException
164    */
165   public LocalChannelReference(final NetworkChannelReference networkChannelRef,
166                                final Integer remoteId,
167                                final R66Future futureRequest)
168       throws OpenR66ProtocolRemoteShutdownException {
169     this.networkChannelRef = networkChannelRef;
170     networkServerHandler =
171         (NetworkServerHandler) this.networkChannelRef.channel().pipeline().get(
172             NetworkServerInitializer.NETWORK_HANDLER);
173     localId = new IntegerUuid().getInt();
174     this.remoteId = remoteId;
175     if (futureRequest == null) {
176       this.futureRequest = new R66Future(true);
177     } else {
178       if (futureRequest.isDone()) {
179         futureRequest.reset();
180       }
181       this.futureRequest = futureRequest;
182     }
183     cts = (ChannelTrafficShapingHandler) networkChannelRef.channel().pipeline()
184                                                           .get(
185                                                               NetworkServerInitializer.LIMITCHANNEL);
186     LocalServerHandler.channelActive(serverHandler);
187     serverHandler.setLocalChannelReference(this);
188     networkChannelRef.add(this);
189   }
190 
191   /**
192    * Special empty LCR constructor
193    */
194   public LocalChannelReference() {
195     networkChannelRef = null;
196     networkServerHandler = null;
197     localId = 0;
198     futureRequest = new R66Future(true);
199     cts = null;
200     serverHandler.localChannelReference = this;
201   }
202 
203   /**
204    * Close the localChannelReference
205    */
206   public final void close() {
207     LocalServerHandler.channelInactive(serverHandler);
208     if (networkChannelRef != null) {
209       networkChannelRef.remove(this);
210     }
211     final LocalTransaction lt =
212         Configuration.configuration.getLocalTransaction();
213     if (lt != null) {
214       lt.remove(this);
215     }
216   }
217 
218   /**
219    * @return the networkChannelRef
220    */
221   public final Channel getNetworkChannel() {
222     return networkChannelRef.channel();
223   }
224 
225   /**
226    * @return the id
227    */
228   public final Integer getLocalId() {
229     return localId;
230   }
231 
232   /**
233    * @return the remoteId
234    */
235   public final Integer getRemoteId() {
236     return remoteId;
237   }
238 
239   /**
240    * @return the ChannelTrafficShapingHandler
241    */
242   public final ChannelTrafficShapingHandler getChannelTrafficShapingHandler() {
243     return cts;
244   }
245 
246   /**
247    * @return the networkChannelObject
248    */
249   public final NetworkChannelReference getNetworkChannelObject() {
250     return networkChannelRef;
251   }
252 
253   /**
254    * @return the networkServerHandler
255    */
256   public final NetworkServerHandler getNetworkServerHandler() {
257     return networkServerHandler;
258   }
259 
260   /**
261    * @return the serverHandler
262    */
263   public final TransferActions getServerHandler() {
264     return serverHandler;
265   }
266 
267   /**
268    * @return the actual dbSession
269    */
270   public final DbSession getDbSession() {
271     if (networkServerHandler != null) {
272       return networkServerHandler.getDbSession();
273     }
274     logger.info("SHOULD NOT BE");
275     return admin.getSession();
276   }
277 
  /**
   * Replaces the remote id associated with this reference.
   *
   * @param remoteId the remoteId to set
   */
  public final void setRemoteId(final Integer remoteId) {
    this.remoteId = remoteId;
  }
284 
285   /**
286    * @return the session
287    */
288   public final R66Session getSession() {
289     return session;
290   }
291 
  /**
   * Attaches a session to this reference.
   *
   * @param session the session to set
   */
  public final void setSession(final R66Session session) {
    this.session = session;
  }
298 
299   /**
300    * @return the current errorMessage
301    */
302   public final String getErrorMessage() {
303     return errorMessage;
304   }
305 
  /**
   * Records the last error message together with its error code.
   *
   * @param errorMessage the errorMessage to set
   * @param code the error code to set along with the message
   */
  public final void setErrorMessage(final String errorMessage,
                                    final ErrorCode code) {
    this.errorMessage = errorMessage;
    this.code = code;
  }
314 
315   /**
316    * @return the code
317    */
318   public final ErrorCode getCurrentCode() {
319     return code;
320   }
321 
322   /**
323    * Validate or not the Startup (before connection)
324    *
325    * @param validate
326    */
327   public final void validateStartup(final boolean validate) {
328     if (futureStartup.isDone()) {
329       return;
330     }
331     if (validate) {
332       futureStartup.setSuccess();
333     } else {
334       futureStartup.cancel();
335     }
336   }
337 
338   /**
339    * @return the futureValidateStartup
340    */
341   public final R66Future getFutureValidateStartup() {
342     if (!futureStartup.awaitOrInterruptible()) {
343       validateStartup(false);
344       return futureStartup;
345     }
346     return futureStartup;
347   }
348 
349   /**
350    * @return True if the connection is validated (in OK or KO status)
351    */
352   public final boolean isConnectionValidate() {
353     return futureConnection.isDone();
354   }
355 
356   /**
357    * Validate or Invalidate the connection (authentication)
358    *
359    * @param validate
360    */
361   public final void validateConnection(final boolean validate,
362                                        final R66Result result) {
363     if (futureConnection.isDone()) {
364       logger.debug("LocalChannelReference already validated: {}",
365                    futureConnection.isSuccess());
366       return;
367     }
368     logger.debug("Validation of connection {}", validate);
369     if (validate) {
370       futureConnection.setResult(result);
371       futureConnection.setSuccess();
372     } else {
373       futureConnection.setResult(result);
374       setErrorMessage(result.getMessage(), result.getCode());
375       futureConnection.cancel();
376     }
377   }
378 
379   /**
380    * @return the futureValidateConnection
381    */
382   public final R66Future getFutureValidateConnection() {
383     final R66Result result;
384     final Channel channel = networkChannelRef.channel();
385     if (channel != null && channel.isActive()) {
386       if (!futureConnection.awaitOrInterruptible()) {
387         if (futureConnection.isDone()) {
388           return futureConnection;
389         } else {
390           logger.warn("Cannot get Connection due to out of Time: {}", this);
391           result = new R66Result(
392               new OpenR66ProtocolNoConnectionException("Out of time"), session,
393               false, ErrorCode.ConnectionImpossible, null);
394           validateConnection(false, result);
395           return futureConnection;
396         }
397       } else {
398         return futureConnection;
399       }
400     }
401     if (futureConnection.isDone()) {
402       return futureConnection;
403     }
404 
405     logger.info("Cannot get Connection due to out of Time: {}", this);
406     result =
407         new R66Result(new OpenR66ProtocolNoConnectionException("Out of time"),
408                       session, false, ErrorCode.ConnectionImpossible, null);
409     validateConnection(false, result);
410     return futureConnection;
411   }
412 
413   /**
414    * Validate the End of a Transfer
415    *
416    * @param finalValue
417    */
418   public final void validateEndTransfer(final R66Result finalValue) {
419     if (!futureEndTransfer.isDone()) {
420       futureEndTransfer.setResult(finalValue);
421       futureEndTransfer.setSuccess();
422     } else {
423       logger.debug("Could not validate since Already validated: {} {}",
424                    futureEndTransfer.isSuccess(), finalValue);
425       if (!futureEndTransfer.getResult().isAnswered()) {
426         futureEndTransfer.getResult().setAnswered(finalValue.isAnswered());
427       }
428     }
429   }
430 
431   /**
432    * @return the futureEndTransfer
433    */
434   public final R66Future getFutureEndTransfer() {
435     return futureEndTransfer;
436   }
437 
438   /**
439    * Special waiter for Send Through method. It reset the EndTransfer future.
440    *
441    * @throws OpenR66Exception
442    */
443   public final void waitReadyForSendThrough() throws OpenR66Exception {
444     logger.debug("Wait for End of Prepare Transfer");
445     futureEndTransfer.awaitOrInterruptible();
446     if (futureEndTransfer.isSuccess()) {
447       // reset since transfer will start now
448       futureEndTransfer = new R66Future(true);
449     } else {
450       if (futureEndTransfer.getResult() != null &&
451           futureEndTransfer.getResult().getException() != null) {
452         throw futureEndTransfer.getResult().getException();
453       } else if (futureEndTransfer.getCause() != null) {
454         throw new OpenR66RunnerErrorException(futureEndTransfer.getCause());
455       } else {
456         throw new OpenR66RunnerErrorException("Unknown reason");
457       }
458     }
459   }
460 
461   /**
462    * @return the futureValidRequest
463    */
464   public final R66Future getFutureValidRequest() {
465     return futureValidRequest;
466   }
467 
468   /**
469    * @return the futureRequest
470    */
471   public final R66Future getFutureRequest() {
472     return futureRequest;
473   }
474 
475   /**
476    * Invalidate the current request
477    *
478    * @param finalvalue
479    */
480   public final void invalidateRequest(final R66Result finalvalue) {
481     R66Result finalValue = finalvalue;
482     if (finalValue == null) {
483       finalValue =
484           new R66Result(session, false, ErrorCode.Unknown, session.getRunner());
485     }
486     if (logger.isDebugEnabled()) {
487       logger.debug(
488           "FST: " + futureStartup.isDone() + ":" + futureStartup.isSuccess() +
489           " FCT: " + futureConnection.isDone() + ':' +
490           futureConnection.isSuccess() + " FET: " + futureEndTransfer.isDone() +
491           ':' + futureEndTransfer.isSuccess() + " FVR: " +
492           futureValidRequest.isDone() + ':' + futureValidRequest.isSuccess() +
493           " FR: " + futureRequest.isDone() + ':' + futureRequest.isSuccess() +
494           ' ' + finalValue.getMessage());
495     }
496     if (!futureStartup.isDone()) {
497       futureStartup.setResult(finalValue);
498       if (finalValue.getException() != null) {
499         futureStartup.setFailure(finalValue.getException());
500       } else {
501         futureStartup.cancel();
502       }
503     }
504     if (!futureConnection.isDone()) {
505       futureConnection.setResult(finalValue);
506       if (finalValue.getException() != null) {
507         futureConnection.setFailure(finalValue.getException());
508       } else {
509         futureConnection.cancel();
510       }
511     }
512     if (!futureEndTransfer.isDone()) {
513       futureEndTransfer.setResult(finalValue);
514       if (finalValue.getException() != null) {
515         futureEndTransfer.setFailure(finalValue.getException());
516       } else {
517         futureEndTransfer.cancel();
518       }
519     }
520     if (!futureValidRequest.isDone()) {
521       futureValidRequest.setResult(finalValue);
522       if (finalValue.getException() != null) {
523         futureValidRequest.setFailure(finalValue.getException());
524       } else {
525         futureValidRequest.cancel();
526       }
527     }
528     if (logger.isTraceEnabled()) {
529       logger.trace("Invalidate Request",
530                    new Exception("DEBUG Trace for Invalidation"));
531     }
532     if (finalValue.getCode() != ErrorCode.ServerOverloaded) {
533       if (!futureRequest.isDone()) {
534         setErrorMessage(finalValue.getMessage(), finalValue.getCode());
535         futureRequest.setResult(finalValue);
536         if (finalValue.getException() != null) {
537           futureRequest.setFailure(finalValue.getException());
538         } else {
539           futureRequest.cancel();
540         }
541       } else {
542         logger.debug("Could not invalidate since Already finished: {}",
543                      futureEndTransfer.getResult());
544       }
545     } else {
546       setErrorMessage(finalValue.getMessage(), finalValue.getCode());
547       logger.info("Server Overloaded");
548     }
549     if (session != null) {
550       if (session.isSender()) {
551         NetworkTransaction.stopRetrieve(this);
552       }
553     }
554   }
555 
556   /**
557    * Validate the current Request
558    *
559    * @param finalValue
560    */
561   public final void validateRequest(final R66Result finalValue) {
562     setErrorMessage("NoError", null);
563     if (!futureEndTransfer.isDone()) {
564       logger.debug("Will validate EndTransfer");
565       validateEndTransfer(finalValue);
566     }
567     if (!futureValidRequest.isDone()) {
568       futureValidRequest.setResult(finalValue);
569       futureValidRequest.setSuccess();
570     }
571     logger.debug("Validate Request");
572     if (!futureRequest.isDone()) {
573       if (finalValue.getOther() == null &&
574           session.getBusinessObject() != null &&
575           session.getBusinessObject().getInfo(session) != null) {
576         finalValue.setOther(session.getBusinessObject().getInfo(session));
577       }
578       futureRequest.setResult(finalValue);
579       futureRequest.setSuccess();
580     } else {
581       logger.info("Already validated: {} {}", futureRequest.isSuccess(),
582                   finalValue);
583       if (!futureRequest.getResult().isAnswered()) {
584         futureRequest.getResult().setAnswered(finalValue.isAnswered());
585       }
586     }
587   }
588 
589   private long getMinLimit(final long a, final long b) {
590     long res = a;
591     if (a <= 0) {
592       res = b;
593     } else if (b > 0 && b < a) {
594       res = b;
595     }
596     return res;
597   }
598 
599   public final void setChannelLimit(final boolean isSender, final long limit) {
600     final ChannelTrafficShapingHandler limitHandler =
601         (ChannelTrafficShapingHandler) networkChannelRef.channel().pipeline()
602                                                         .get(
603                                                             NetworkServerInitializer.LIMITCHANNEL);
604     if (isSender) {
605       limitHandler.setWriteLimit(limit);
606       logger.info("Will write at {} Bytes/sec", limit);
607     } else {
608       limitHandler.setReadLimit(limit);
609       logger.info("Will read at {} Bytes/sec", limit);
610     }
611   }
612 
613   public final long getChannelLimit(final boolean isSender) {
614     final long global;
615     final long channel;
616     if (isSender) {
617       global = Configuration.configuration.getServerGlobalWriteLimit();
618       channel = Configuration.configuration.getServerChannelWriteLimit();
619     } else {
620       global = Configuration.configuration.getServerGlobalReadLimit();
621       channel = Configuration.configuration.getServerChannelReadLimit();
622     }
623     return getMinLimit(global, channel);
624   }
625 
626   @Override
627   public final String toString() {
628     return "LCR: L: " + localId + " R: " + remoteId + " Startup[" +
629            futureStartup + "] Conn[" + futureConnection +
630            "] ValidRequestRequest[" + futureValidRequest + "] EndTransfer[" +
631            (futureEndTransfer != null? futureEndTransfer : "noEndTransfer") +
632            "] Request[" + (futureRequest != null? futureRequest : "noRequest") +
633            ']';
634   }
635 
636   /**
637    * @return the recvThroughHandler
638    */
639   public final RecvThroughHandler getRecvThroughHandler() {
640     return recvThroughHandler;
641   }
642 
643   /**
644    * @return True if in RecvThrough Mode
645    */
646   public final boolean isRecvThroughMode() {
647     return recvThroughHandler != null;
648   }
649 
  /**
   * Sets the handler used in RecvThrough mode; a non null handler makes
   * {@link #isRecvThroughMode()} return True.
   *
   * @param recvThroughHandler the recvThroughHandler to set
   */
  public final void setRecvThroughHandler(
      final RecvThroughHandler recvThroughHandler) {
    this.recvThroughHandler = recvThroughHandler;
  }
657 
658   /**
659    * @return True if in SendThrough Mode
660    */
661   public final boolean isSendThroughMode() {
662     return isSendThroughMode;
663   }
664 
  /**
   * Sets or unsets the SendThrough Mode flag.
   *
   * @param isSendThroughMode the isSendThroughMode to set
   */
  public final void setSendThroughMode(final boolean isSendThroughMode) {
    this.isSendThroughMode = isSendThroughMode;
  }
671 
672   /**
673    * @return the clientRunner
674    */
675   public final ClientRunner getClientRunner() {
676     return clientRunner;
677   }
678 
  /**
   * Associates a ClientRunner with this reference.
   *
   * @param clientRunner the clientRunner to set
   */
  public final void setClientRunner(final ClientRunner clientRunner) {
    this.clientRunner = clientRunner;
  }
685 
686   /**
687    * Shortcut to set a new state in Session
688    *
689    * @param desiredState
690    */
691   public final void sessionNewState(final R66FiniteDualStates desiredState) {
692     if (session != null) {
693       session.newState(desiredState);
694     }
695   }
696 
697   /**
698    * @return the current state or TEST if no session exists
699    */
700   public final R66FiniteDualStates getSessionState() {
701     if (session != null) {
702       return session.getState();
703     }
704     return R66FiniteDualStates.TEST;
705   }
706 
707   /**
708    * @return the hashComputeDuringTransfer
709    */
710   public final String getHashComputeDuringTransfer() {
711     return hashComputeDuringTransfer;
712   }
713 
  /**
   * Stores the hash computed during the transfer.
   *
   * @param hashComputeDuringTransfer the hashComputeDuringTransfer to
   *     set
   */
  public final void setHashComputeDuringTransfer(
      final String hashComputeDuringTransfer) {
    this.hashComputeDuringTransfer = hashComputeDuringTransfer;
  }
722 
723   public final void setPartialHash() {
724     partialHash = true;
725   }
726 
727   public final boolean isPartialHash() {
728     return partialHash;
729   }
730 
731   /**
732    * @return the partner
733    */
734   public final PartnerConfiguration getPartner() {
735     return partner;
736   }
737 
738   /**
739    * @param hostId the partner to set
740    */
741   public final void setPartner(final String hostId) {
742     logger.debug("host: {}", hostId);
743     partner = Configuration.configuration.getVersions().get(hostId);
744     if (partner == null) {
745       partner =
746           new PartnerConfiguration(hostId, R66Versions.V2_4_12.getVersion());
747     }
748     logger.debug("DEBUG {}", partner);
749   }
750 
751   /**
752    * @return the requestId
753    */
754   public final String getRequestId() {
755     return requestId;
756   }
757 
  /**
   * Stores the request id associated with this reference.
   *
   * @param requestId the requestId to set
   */
  public final void setRequestId(final String requestId) {
    this.requestId = requestId;
  }
764 
765 }