1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.openr66.protocol.localhandler;
21
22 import io.netty.channel.Channel;
23 import io.netty.handler.traffic.ChannelTrafficShapingHandler;
24 import org.waarp.common.database.DbSession;
25 import org.waarp.common.guid.IntegerUuid;
26 import org.waarp.common.logging.WaarpLogger;
27 import org.waarp.common.logging.WaarpLoggerFactory;
28 import org.waarp.openr66.client.RecvThroughHandler;
29 import org.waarp.openr66.commander.ClientRunner;
30 import org.waarp.openr66.context.ErrorCode;
31 import org.waarp.openr66.context.R66FiniteDualStates;
32 import org.waarp.openr66.context.R66Result;
33 import org.waarp.openr66.context.R66Session;
34 import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
35 import org.waarp.openr66.protocol.configuration.Configuration;
36 import org.waarp.openr66.protocol.configuration.PartnerConfiguration;
37 import org.waarp.openr66.protocol.exception.OpenR66Exception;
38 import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
39 import org.waarp.openr66.protocol.exception.OpenR66ProtocolRemoteShutdownException;
40 import org.waarp.openr66.protocol.networkhandler.NetworkChannelReference;
41 import org.waarp.openr66.protocol.networkhandler.NetworkServerHandler;
42 import org.waarp.openr66.protocol.networkhandler.NetworkServerInitializer;
43 import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
44 import org.waarp.openr66.protocol.utils.R66Future;
45 import org.waarp.openr66.protocol.utils.R66Versions;
46
47 import static org.waarp.common.database.DbConstant.*;
48
49
50
51
52
53 public class LocalChannelReference {
54
55
56
57 private static final WaarpLogger logger =
58 WaarpLoggerFactory.getLogger(LocalChannelReference.class);
59
60
61
62
63 private final NetworkChannelReference networkChannelRef;
64
65
66
67 private final ChannelTrafficShapingHandler cts;
68
69
70
71
72 private final NetworkServerHandler networkServerHandler;
73
74
75
76
77 private final TransferActions serverHandler = new TransferActions();
78
79
80
81
82 private final Integer localId;
83
84
85
86
87 private Integer remoteId;
88
89
90
91
92 private String requestId;
93
94
95
96 private final R66Future futureRequest;
97
98
99
100
101 private final R66Future futureValidRequest = new R66Future(true);
102
103
104
105
106 private R66Future futureEndTransfer = new R66Future(true);
107
108
109
110
111 private final R66Future futureConnection = new R66Future(true);
112
113
114
115
116 private final R66Future futureStartup = new R66Future(true);
117
118
119
120
121 private R66Session session;
122
123
124
125
126 private String errorMessage = "NoError";
127
128
129
130
131 private ErrorCode code = ErrorCode.Unknown;
132
133
134
135
136 private RecvThroughHandler recvThroughHandler;
137
138 private boolean isSendThroughMode;
139
140
141
142 private ClientRunner clientRunner;
143
144
145
146
147 private String hashComputeDuringTransfer;
148
149
150
151 private boolean partialHash;
152
153
154
155
156 private PartnerConfiguration partner;
157
158
159
160
161
162
163
164
165 public LocalChannelReference(final NetworkChannelReference networkChannelRef,
166 final Integer remoteId,
167 final R66Future futureRequest)
168 throws OpenR66ProtocolRemoteShutdownException {
169 this.networkChannelRef = networkChannelRef;
170 networkServerHandler =
171 (NetworkServerHandler) this.networkChannelRef.channel().pipeline().get(
172 NetworkServerInitializer.NETWORK_HANDLER);
173 localId = new IntegerUuid().getInt();
174 this.remoteId = remoteId;
175 if (futureRequest == null) {
176 this.futureRequest = new R66Future(true);
177 } else {
178 if (futureRequest.isDone()) {
179 futureRequest.reset();
180 }
181 this.futureRequest = futureRequest;
182 }
183 cts = (ChannelTrafficShapingHandler) networkChannelRef.channel().pipeline()
184 .get(
185 NetworkServerInitializer.LIMITCHANNEL);
186 LocalServerHandler.channelActive(serverHandler);
187 serverHandler.setLocalChannelReference(this);
188 networkChannelRef.add(this);
189 }
190
191
192
193
194 public LocalChannelReference() {
195 networkChannelRef = null;
196 networkServerHandler = null;
197 localId = 0;
198 futureRequest = new R66Future(true);
199 cts = null;
200 serverHandler.localChannelReference = this;
201 }
202
203
204
205
206 public final void close() {
207 LocalServerHandler.channelInactive(serverHandler);
208 if (networkChannelRef != null) {
209 networkChannelRef.remove(this);
210 }
211 final LocalTransaction lt =
212 Configuration.configuration.getLocalTransaction();
213 if (lt != null) {
214 lt.remove(this);
215 }
216 }
217
218
219
220
221 public final Channel getNetworkChannel() {
222 return networkChannelRef.channel();
223 }
224
225
226
227
228 public final Integer getLocalId() {
229 return localId;
230 }
231
232
233
234
235 public final Integer getRemoteId() {
236 return remoteId;
237 }
238
239
240
241
242 public final ChannelTrafficShapingHandler getChannelTrafficShapingHandler() {
243 return cts;
244 }
245
246
247
248
249 public final NetworkChannelReference getNetworkChannelObject() {
250 return networkChannelRef;
251 }
252
253
254
255
256 public final NetworkServerHandler getNetworkServerHandler() {
257 return networkServerHandler;
258 }
259
260
261
262
263 public final TransferActions getServerHandler() {
264 return serverHandler;
265 }
266
267
268
269
270 public final DbSession getDbSession() {
271 if (networkServerHandler != null) {
272 return networkServerHandler.getDbSession();
273 }
274 logger.info("SHOULD NOT BE");
275 return admin.getSession();
276 }
277
278
279
280
281 public final void setRemoteId(final Integer remoteId) {
282 this.remoteId = remoteId;
283 }
284
285
286
287
288 public final R66Session getSession() {
289 return session;
290 }
291
292
293
294
295 public final void setSession(final R66Session session) {
296 this.session = session;
297 }
298
299
300
301
302 public final String getErrorMessage() {
303 return errorMessage;
304 }
305
306
307
308
309 public final void setErrorMessage(final String errorMessage,
310 final ErrorCode code) {
311 this.errorMessage = errorMessage;
312 this.code = code;
313 }
314
315
316
317
318 public final ErrorCode getCurrentCode() {
319 return code;
320 }
321
322
323
324
325
326
327 public final void validateStartup(final boolean validate) {
328 if (futureStartup.isDone()) {
329 return;
330 }
331 if (validate) {
332 futureStartup.setSuccess();
333 } else {
334 futureStartup.cancel();
335 }
336 }
337
338
339
340
341 public final R66Future getFutureValidateStartup() {
342 if (!futureStartup.awaitOrInterruptible()) {
343 validateStartup(false);
344 return futureStartup;
345 }
346 return futureStartup;
347 }
348
349
350
351
352 public final boolean isConnectionValidate() {
353 return futureConnection.isDone();
354 }
355
356
357
358
359
360
361 public final void validateConnection(final boolean validate,
362 final R66Result result) {
363 if (futureConnection.isDone()) {
364 logger.debug("LocalChannelReference already validated: {}",
365 futureConnection.isSuccess());
366 return;
367 }
368 logger.debug("Validation of connection {}", validate);
369 if (validate) {
370 futureConnection.setResult(result);
371 futureConnection.setSuccess();
372 } else {
373 futureConnection.setResult(result);
374 setErrorMessage(result.getMessage(), result.getCode());
375 futureConnection.cancel();
376 }
377 }
378
379
380
381
382 public final R66Future getFutureValidateConnection() {
383 final R66Result result;
384 final Channel channel = networkChannelRef.channel();
385 if (channel != null && channel.isActive()) {
386 if (!futureConnection.awaitOrInterruptible()) {
387 if (futureConnection.isDone()) {
388 return futureConnection;
389 } else {
390 logger.warn("Cannot get Connection due to out of Time: {}", this);
391 result = new R66Result(
392 new OpenR66ProtocolNoConnectionException("Out of time"), session,
393 false, ErrorCode.ConnectionImpossible, null);
394 validateConnection(false, result);
395 return futureConnection;
396 }
397 } else {
398 return futureConnection;
399 }
400 }
401 if (futureConnection.isDone()) {
402 return futureConnection;
403 }
404
405 logger.info("Cannot get Connection due to out of Time: {}", this);
406 result =
407 new R66Result(new OpenR66ProtocolNoConnectionException("Out of time"),
408 session, false, ErrorCode.ConnectionImpossible, null);
409 validateConnection(false, result);
410 return futureConnection;
411 }
412
413
414
415
416
417
418 public final void validateEndTransfer(final R66Result finalValue) {
419 if (!futureEndTransfer.isDone()) {
420 futureEndTransfer.setResult(finalValue);
421 futureEndTransfer.setSuccess();
422 } else {
423 logger.debug("Could not validate since Already validated: {} {}",
424 futureEndTransfer.isSuccess(), finalValue);
425 if (!futureEndTransfer.getResult().isAnswered()) {
426 futureEndTransfer.getResult().setAnswered(finalValue.isAnswered());
427 }
428 }
429 }
430
431
432
433
434 public final R66Future getFutureEndTransfer() {
435 return futureEndTransfer;
436 }
437
438
439
440
441
442
443 public final void waitReadyForSendThrough() throws OpenR66Exception {
444 logger.debug("Wait for End of Prepare Transfer");
445 futureEndTransfer.awaitOrInterruptible();
446 if (futureEndTransfer.isSuccess()) {
447
448 futureEndTransfer = new R66Future(true);
449 } else {
450 if (futureEndTransfer.getResult() != null &&
451 futureEndTransfer.getResult().getException() != null) {
452 throw futureEndTransfer.getResult().getException();
453 } else if (futureEndTransfer.getCause() != null) {
454 throw new OpenR66RunnerErrorException(futureEndTransfer.getCause());
455 } else {
456 throw new OpenR66RunnerErrorException("Unknown reason");
457 }
458 }
459 }
460
461
462
463
464 public final R66Future getFutureValidRequest() {
465 return futureValidRequest;
466 }
467
468
469
470
471 public final R66Future getFutureRequest() {
472 return futureRequest;
473 }
474
475
476
477
478
479
480 public final void invalidateRequest(final R66Result finalvalue) {
481 R66Result finalValue = finalvalue;
482 if (finalValue == null) {
483 finalValue =
484 new R66Result(session, false, ErrorCode.Unknown, session.getRunner());
485 }
486 if (logger.isDebugEnabled()) {
487 logger.debug(
488 "FST: " + futureStartup.isDone() + ":" + futureStartup.isSuccess() +
489 " FCT: " + futureConnection.isDone() + ':' +
490 futureConnection.isSuccess() + " FET: " + futureEndTransfer.isDone() +
491 ':' + futureEndTransfer.isSuccess() + " FVR: " +
492 futureValidRequest.isDone() + ':' + futureValidRequest.isSuccess() +
493 " FR: " + futureRequest.isDone() + ':' + futureRequest.isSuccess() +
494 ' ' + finalValue.getMessage());
495 }
496 if (!futureStartup.isDone()) {
497 futureStartup.setResult(finalValue);
498 if (finalValue.getException() != null) {
499 futureStartup.setFailure(finalValue.getException());
500 } else {
501 futureStartup.cancel();
502 }
503 }
504 if (!futureConnection.isDone()) {
505 futureConnection.setResult(finalValue);
506 if (finalValue.getException() != null) {
507 futureConnection.setFailure(finalValue.getException());
508 } else {
509 futureConnection.cancel();
510 }
511 }
512 if (!futureEndTransfer.isDone()) {
513 futureEndTransfer.setResult(finalValue);
514 if (finalValue.getException() != null) {
515 futureEndTransfer.setFailure(finalValue.getException());
516 } else {
517 futureEndTransfer.cancel();
518 }
519 }
520 if (!futureValidRequest.isDone()) {
521 futureValidRequest.setResult(finalValue);
522 if (finalValue.getException() != null) {
523 futureValidRequest.setFailure(finalValue.getException());
524 } else {
525 futureValidRequest.cancel();
526 }
527 }
528 if (logger.isTraceEnabled()) {
529 logger.trace("Invalidate Request",
530 new Exception("DEBUG Trace for Invalidation"));
531 }
532 if (finalValue.getCode() != ErrorCode.ServerOverloaded) {
533 if (!futureRequest.isDone()) {
534 setErrorMessage(finalValue.getMessage(), finalValue.getCode());
535 futureRequest.setResult(finalValue);
536 if (finalValue.getException() != null) {
537 futureRequest.setFailure(finalValue.getException());
538 } else {
539 futureRequest.cancel();
540 }
541 } else {
542 logger.debug("Could not invalidate since Already finished: {}",
543 futureEndTransfer.getResult());
544 }
545 } else {
546 setErrorMessage(finalValue.getMessage(), finalValue.getCode());
547 logger.info("Server Overloaded");
548 }
549 if (session != null) {
550 if (session.isSender()) {
551 NetworkTransaction.stopRetrieve(this);
552 }
553 }
554 }
555
556
557
558
559
560
561 public final void validateRequest(final R66Result finalValue) {
562 setErrorMessage("NoError", null);
563 if (!futureEndTransfer.isDone()) {
564 logger.debug("Will validate EndTransfer");
565 validateEndTransfer(finalValue);
566 }
567 if (!futureValidRequest.isDone()) {
568 futureValidRequest.setResult(finalValue);
569 futureValidRequest.setSuccess();
570 }
571 logger.debug("Validate Request");
572 if (!futureRequest.isDone()) {
573 if (finalValue.getOther() == null &&
574 session.getBusinessObject() != null &&
575 session.getBusinessObject().getInfo(session) != null) {
576 finalValue.setOther(session.getBusinessObject().getInfo(session));
577 }
578 futureRequest.setResult(finalValue);
579 futureRequest.setSuccess();
580 } else {
581 logger.info("Already validated: {} {}", futureRequest.isSuccess(),
582 finalValue);
583 if (!futureRequest.getResult().isAnswered()) {
584 futureRequest.getResult().setAnswered(finalValue.isAnswered());
585 }
586 }
587 }
588
589 private long getMinLimit(final long a, final long b) {
590 long res = a;
591 if (a <= 0) {
592 res = b;
593 } else if (b > 0 && b < a) {
594 res = b;
595 }
596 return res;
597 }
598
599 public final void setChannelLimit(final boolean isSender, final long limit) {
600 final ChannelTrafficShapingHandler limitHandler =
601 (ChannelTrafficShapingHandler) networkChannelRef.channel().pipeline()
602 .get(
603 NetworkServerInitializer.LIMITCHANNEL);
604 if (isSender) {
605 limitHandler.setWriteLimit(limit);
606 logger.info("Will write at {} Bytes/sec", limit);
607 } else {
608 limitHandler.setReadLimit(limit);
609 logger.info("Will read at {} Bytes/sec", limit);
610 }
611 }
612
613 public final long getChannelLimit(final boolean isSender) {
614 final long global;
615 final long channel;
616 if (isSender) {
617 global = Configuration.configuration.getServerGlobalWriteLimit();
618 channel = Configuration.configuration.getServerChannelWriteLimit();
619 } else {
620 global = Configuration.configuration.getServerGlobalReadLimit();
621 channel = Configuration.configuration.getServerChannelReadLimit();
622 }
623 return getMinLimit(global, channel);
624 }
625
626 @Override
627 public final String toString() {
628 return "LCR: L: " + localId + " R: " + remoteId + " Startup[" +
629 futureStartup + "] Conn[" + futureConnection +
630 "] ValidRequestRequest[" + futureValidRequest + "] EndTransfer[" +
631 (futureEndTransfer != null? futureEndTransfer : "noEndTransfer") +
632 "] Request[" + (futureRequest != null? futureRequest : "noRequest") +
633 ']';
634 }
635
636
637
638
639 public final RecvThroughHandler getRecvThroughHandler() {
640 return recvThroughHandler;
641 }
642
643
644
645
646 public final boolean isRecvThroughMode() {
647 return recvThroughHandler != null;
648 }
649
650
651
652
653 public final void setRecvThroughHandler(
654 final RecvThroughHandler recvThroughHandler) {
655 this.recvThroughHandler = recvThroughHandler;
656 }
657
658
659
660
661 public final boolean isSendThroughMode() {
662 return isSendThroughMode;
663 }
664
665
666
667
668 public final void setSendThroughMode(final boolean isSendThroughMode) {
669 this.isSendThroughMode = isSendThroughMode;
670 }
671
672
673
674
675 public final ClientRunner getClientRunner() {
676 return clientRunner;
677 }
678
679
680
681
682 public final void setClientRunner(final ClientRunner clientRunner) {
683 this.clientRunner = clientRunner;
684 }
685
686
687
688
689
690
691 public final void sessionNewState(final R66FiniteDualStates desiredState) {
692 if (session != null) {
693 session.newState(desiredState);
694 }
695 }
696
697
698
699
700 public final R66FiniteDualStates getSessionState() {
701 if (session != null) {
702 return session.getState();
703 }
704 return R66FiniteDualStates.TEST;
705 }
706
707
708
709
710 public final String getHashComputeDuringTransfer() {
711 return hashComputeDuringTransfer;
712 }
713
714
715
716
717
718 public final void setHashComputeDuringTransfer(
719 final String hashComputeDuringTransfer) {
720 this.hashComputeDuringTransfer = hashComputeDuringTransfer;
721 }
722
723 public final void setPartialHash() {
724 partialHash = true;
725 }
726
727 public final boolean isPartialHash() {
728 return partialHash;
729 }
730
731
732
733
734 public final PartnerConfiguration getPartner() {
735 return partner;
736 }
737
738
739
740
741 public final void setPartner(final String hostId) {
742 logger.debug("host: {}", hostId);
743 partner = Configuration.configuration.getVersions().get(hostId);
744 if (partner == null) {
745 partner =
746 new PartnerConfiguration(hostId, R66Versions.V2_4_12.getVersion());
747 }
748 logger.debug("DEBUG {}", partner);
749 }
750
751
752
753
754 public final String getRequestId() {
755 return requestId;
756 }
757
758
759
760
761 public final void setRequestId(final String requestId) {
762 this.requestId = requestId;
763 }
764
765 }