View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.openr66.client;
21  
22  import io.netty.channel.ChannelFuture;
23  import org.waarp.common.database.exception.WaarpDatabaseException;
24  import org.waarp.common.file.DataBlock;
25  import org.waarp.common.logging.WaarpLoggerFactory;
26  import org.waarp.openr66.commander.ClientRunner;
27  import org.waarp.openr66.context.ErrorCode;
28  import org.waarp.openr66.context.R66FiniteDualStates;
29  import org.waarp.openr66.context.R66Result;
30  import org.waarp.openr66.context.R66Session;
31  import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
32  import org.waarp.openr66.database.data.DbRule;
33  import org.waarp.openr66.database.data.DbTaskRunner;
34  import org.waarp.openr66.protocol.configuration.Configuration;
35  import org.waarp.openr66.protocol.configuration.PartnerConfiguration;
36  import org.waarp.openr66.protocol.exception.OpenR66DatabaseGlobalException;
37  import org.waarp.openr66.protocol.exception.OpenR66Exception;
38  import org.waarp.openr66.protocol.exception.OpenR66ProtocolNoConnectionException;
39  import org.waarp.openr66.protocol.exception.OpenR66ProtocolNotYetConnectionException;
40  import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
41  import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
42  import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
43  import org.waarp.openr66.protocol.localhandler.RetrieveRunner;
44  import org.waarp.openr66.protocol.localhandler.packet.EndRequestPacket;
45  import org.waarp.openr66.protocol.localhandler.packet.ErrorPacket;
46  import org.waarp.openr66.protocol.localhandler.packet.RequestPacket;
47  import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
48  import org.waarp.openr66.protocol.utils.ChannelUtils;
49  import org.waarp.openr66.protocol.utils.R66Future;
50  
51  /**
52   * Class for Send Through client
53   * <p>
54   * This class does not included the real file transfer since it is up to the
55   * business project to implement how
56   * to read new data to be sent to the remote host. If an error occurs, no
57   * transfer log is kept.
58   * <p>
59   * 1) Configuration must have been loaded<br>
60   * <br>
61   * 2) Pipeline and NetworkTransaction must have been initiated:<br>
62   * <tt>     Configuration.configuration.pipelineInit();</tt><br>
63   * <tt>     NetworkTransaction networkTransaction = new
64   * NetworkTransaction();</tt><br>
65   * <br>
66   * 3) Prepare the request of transfer:<br>
67   * <tt>     R66Future futureReq = new R66Future(true);</tt><br>
68   * <tt>     SendThroughClient transaction = new SendThroughClient(futureReq,...);</tt><br>
69   * <tt>     if (! transaction.initiateRequest()) { error }</tt><br>
70   * <br>
71   * 4) Once initiateRequest() gives true, you are ready to send the data in
72   * through mode like:<br>
73   * <tt>     byte[] data = readOrGetInSomeWayData();</tt><br>
74   * <tt>     DataBlock block = transaction.transformToDataBlock(data);</tt><br>
75   * <tt>     futureWrite = transaction.writeWhenPossible(block);</tt><br>
76   * <br>
77   * 5) Once you have finished, so this is the last block, you have to do the
78   * following:<br>
79   * If the last block is not empty:<br>
80   * <tt>     DataBlock block = transaction.transformToDataBlock(data);</tt><br>
81   * <tt>     block.setEOF(true);</tt><br>
82   * Or if the last block is empty:<br>
83   * <tt>     DataBlock block = transaction.transformToDataBlock(null);</tt><br>
84   * Then <br>
85   * <tt>     futureWrite = transaction.writeWhenPossible(block);</tt><br>
86   * <tt>     futureWrite.awaitUninterruptibly();</tt><br>
87   * <br>
88   * 6) If everything is in success:<br>
89   * <tt>     transaction.finalizeRequest();</tt><br>
90   * <br>
91   * And now wait for the transfer to finish:<br>
92   * <tt>     futureReq.awaitUninterruptibly();</tt><br>
93   * <tt>     R66Result result = futureReq.getResult();</tt><br>
94   * <br>
95   * 7) If there is the need to re-do, just re-execute the steps from 3 to 6.<br>
96   * Don't forget at the very end to finish the global structure (steps 3 to 6 no
97   * more executed):<br>
98   * <tt>     networkTransaction.closeAll();</tt><br>
99   * <br>
100  * 8) In case of errors during steps 4 or 5 (and only those), call the
101  * following:<br>
102  * <tr>
103  * transaction.transferInError(openR66Exception);
104  * </tr>
105  * <br>
106  * <br>
107  *
108  * @see TestSendThroughClient Class as example of usage in test part
109  */
110 public abstract class SendThroughClient extends AbstractTransfer {
111   protected final NetworkTransaction networkTransaction;
112   protected LocalChannelReference localChannelReference;
113   protected DbTaskRunner taskRunner;
114 
115   /**
116    * @param future
117    * @param remoteHost
118    * @param filename
119    * @param rulename
120    * @param fileinfo
121    * @param isMD5
122    * @param blocksize
123    * @param networkTransaction
124    * @param id
125    */
126   protected SendThroughClient(final R66Future future, final String remoteHost,
127                               final String filename, final String rulename,
128                               final String fileinfo, final boolean isMD5,
129                               final int blocksize, final long id,
130                               final NetworkTransaction networkTransaction) {
131     super(SendThroughClient.class, future, filename, rulename, fileinfo, isMD5,
132           remoteHost, blocksize, id, null);
133     this.networkTransaction = networkTransaction;
134   }
135 
136   /**
137    * DO NOT CALL THIS!
138    */
139   @Override
140   public void run() {
141     logger.error("DO NOT call this method for this class");
142   }
143 
144   /**
145    * Prior to call this method, the pipeline and NetworkTransaction must have
146    * been initialized. It is the
147    * responsibility of the caller to finish all network resources. Note that
148    * this is only the first part of the
149    * execution for this client.
150    *
151    * @return True if the initiate of the request is OK, else False
152    */
153   public boolean initiateRequest() {
154     if (logger == null) {
155       logger = WaarpLoggerFactory.getLogger(SendThroughClient.class);
156     }
157     final DbRule rule;
158     try {
159       rule = new DbRule(transferArgs.getRulename());
160     } catch (final WaarpDatabaseException e) {
161       logger.error("Cannot get Rule: " + transferArgs.getRulename() + ": {}",
162                    e.getMessage());
163       future.setResult(
164           new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
165                         ErrorCode.Internal, null));
166       future.setFailure(e);
167       return false;
168     }
169     int mode = rule.getMode();
170     if (transferArgs.isMD5()) {
171       mode = RequestPacket.getModeMD5(mode);
172     }
173     final String sep =
174         PartnerConfiguration.getSeparator(transferArgs.getRemoteHost());
175     final RequestPacket request =
176         new RequestPacket(transferArgs.getRulename(), mode,
177                           transferArgs.getFilename(),
178                           transferArgs.getBlockSize(), 0, transferArgs.getId(),
179                           transferArgs.getTransferInfo(), -1, sep);
180     // Not isRecv since it is the requester, so send => isSender is true
181     final boolean isSender = true;
182     try {
183       try {
184         // no starttime since immediate
185         taskRunner = new DbTaskRunner(rule, isSender, request,
186                                       transferArgs.getRemoteHost(), null);
187       } catch (final WaarpDatabaseException e) {
188         logger.error("Cannot get task: {}", e.getMessage());
189         future.setResult(
190             new R66Result(new OpenR66DatabaseGlobalException(e), null, true,
191                           ErrorCode.Internal, null));
192         future.setFailure(e);
193         return false;
194       }
195       final ClientRunner runner =
196           new ClientRunner(networkTransaction, taskRunner, future);
197       runner.setSendThroughMode();
198       OpenR66ProtocolNotYetConnectionException exc = null;
199       for (int i = 0; i < Configuration.RETRYNB; i++) {
200         try {
201           localChannelReference = runner.initRequest();
202           exc = null;
203           break;
204         } catch (final OpenR66ProtocolNoConnectionException e) {
205           logger.error("Cannot Connect", e);
206           future.setResult(
207               new R66Result(e, null, true, ErrorCode.ConnectionImpossible,
208                             taskRunner));
209           finalizeInErrorTransferRequest(runner, taskRunner,
210                                          ErrorCode.ConnectionImpossible);
211           future.setFailure(e);
212           return false;
213         } catch (final OpenR66ProtocolPacketException e) {
214           logger.error("Bad Protocol", e);
215           future.setResult(new R66Result(e, null, true, ErrorCode.TransferError,
216                                          taskRunner));
217           future.setFailure(e);
218           return false;
219         } catch (final OpenR66ProtocolNotYetConnectionException e) {
220           logger.debug("Not Yet Connected", e);
221           exc = e;
222         }
223       }
224       if (exc != null) {
225         taskRunner.setLocalChannelReference(new LocalChannelReference());
226         logger.error("Cannot Connect", exc);
227         future.setResult(
228             new R66Result(exc, null, true, ErrorCode.ConnectionImpossible,
229                           taskRunner));
230         future.setFailure(exc);
231         return false;
232       }
233       try {
234         localChannelReference.waitReadyForSendThrough();
235       } catch (final OpenR66Exception e) {
236         logger.error("Cannot Transfer", e);
237         future.setResult(
238             new R66Result(e, null, true, ErrorCode.Internal, taskRunner));
239         future.setFailure(e);
240         return false;
241       }
242       // now start the send from external data
243       return true;
244     } finally {
245       if (taskRunner != null && (future.isFailed() || nolog)) {
246         try {
247           taskRunner.delete();
248         } catch (final WaarpDatabaseException ignored) {
249           // nothing
250         }
251       }
252     }
253   }
254 
255   /**
256    * Finalize the request
257    */
258   public void finalizeRequest() {
259     try {
260       try {
261         ChannelUtils.writeEndTransfer(localChannelReference);
262       } catch (final OpenR66ProtocolPacketException e) {
263         // An error occurs!
264         try {
265           localChannelReference.getSession().setFinalizeTransfer(false,
266                                                                  new R66Result(
267                                                                      e,
268                                                                      localChannelReference.getSession(),
269                                                                      false,
270                                                                      ErrorCode.Internal,
271                                                                      taskRunner));
272         } catch (final OpenR66RunnerErrorException e1) {
273           transferInError(e1);
274           return;
275         } catch (final OpenR66ProtocolSystemException e1) {
276           transferInError(e1);
277           return;
278         }
279       }
280       localChannelReference.getFutureEndTransfer().awaitOrInterruptible();
281       logger.debug("Await future End Transfer done: {}",
282                    localChannelReference.getFutureEndTransfer().isSuccess());
283       if (localChannelReference.getFutureEndTransfer().isSuccess()) {
284         // send a validation
285         localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
286         final EndRequestPacket validPacket =
287             new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
288         final R66Session session = localChannelReference.getSession();
289         if (session != null && session.getExtendedProtocol() &&
290             session.getBusinessObject() != null &&
291             session.getBusinessObject().getInfo(session) != null) {
292           validPacket.setOptional(session.getBusinessObject().getInfo(session));
293         }
294         try {
295           ChannelUtils.writeAbstractLocalPacket(localChannelReference,
296                                                 validPacket, false);
297         } catch (final OpenR66ProtocolPacketException ignored) {
298           // nothing
299         }
300         if (!localChannelReference.getFutureRequest().awaitOrInterruptible()) {
301           // valid it however
302           localChannelReference.validateRequest(
303               localChannelReference.getFutureEndTransfer().getResult());
304         }
305         if (taskRunner != null && taskRunner.isRequestOnRequested()) {
306           localChannelReference.close();
307         }
308       } else {
309         transferInError(null);
310       }
311     } finally {
312       if (taskRunner != null) {
313         if (future.isDone() && !future.isSuccess() || nolog) {
314           try {
315             taskRunner.delete();
316           } catch (final WaarpDatabaseException ignored) {
317             // nothing
318           }
319         }
320       }
321     }
322   }
323 
324   /**
325    * To be used in case of error after a correct initiate of the request
326    *
327    * @param e
328    */
329   public void transferInError(final OpenR66Exception e) {
330     if (localChannelReference != null) {
331       if (!localChannelReference.getFutureEndTransfer().getResult()
332                                 .isAnswered()) {
333         final R66Result result =
334             new R66Result(e, localChannelReference.getSession(), true,
335                           ErrorCode.TransferError, taskRunner);
336         logger.error("Transfer in error", e);
337         localChannelReference.sessionNewState(R66FiniteDualStates.ERROR);
338         final ErrorPacket error = new ErrorPacket("Transfer in error",
339                                                   ErrorCode.TransferError.getCode(),
340                                                   ErrorPacket.FORWARDCLOSECODE);
341         try {
342           ChannelUtils.writeAbstractLocalPacket(localChannelReference, error,
343                                                 false);
344         } catch (final OpenR66ProtocolPacketException ignored) {
345           // nothing
346         }
347         localChannelReference.invalidateRequest(result);
348       }
349       localChannelReference.close();
350     }
351   }
352 
353   /**
354    * Write the next block when the channel is ready to prevent OOM
355    *
356    * @param block
357    *
358    * @return the ChannelFuture on the write operation
359    *
360    * @throws OpenR66ProtocolPacketException
361    */
362   public ChannelFuture writeWhenPossible(final DataBlock block)
363       throws OpenR66ProtocolPacketException {
364     return RetrieveRunner.writeWhenPossible(block, localChannelReference, null,
365                                             null);
366   }
367 
368   /**
369    * Utility method for send through mode
370    *
371    * @param data the data byte, if null it is the last block
372    * @param length length of data
373    *
374    * @return the DataBlock associated to the data
375    */
376   public DataBlock transformToDataBlock(final byte[] data, final int length) {
377     final DataBlock block = new DataBlock();
378     if (data == null) {
379       // last block
380       block.setEOF(true);
381     } else {
382       block.setBlock(data, length);
383     }
384     return block;
385   }
386 }