View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.openr66.protocol.localhandler;
21  
22  import io.netty.channel.ChannelFuture;
23  import org.waarp.common.digest.FilesystemBasedDigest;
24  import org.waarp.common.file.DataBlock;
25  import org.waarp.common.logging.WaarpLogger;
26  import org.waarp.common.logging.WaarpLoggerFactory;
27  import org.waarp.openr66.context.ErrorCode;
28  import org.waarp.openr66.context.R66FiniteDualStates;
29  import org.waarp.openr66.context.R66Result;
30  import org.waarp.openr66.context.R66Session;
31  import org.waarp.openr66.context.filesystem.R66File;
32  import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
33  import org.waarp.openr66.database.data.DbTaskRunner.TASKSTEP;
34  import org.waarp.openr66.protocol.configuration.Configuration;
35  import org.waarp.openr66.protocol.exception.OpenR66Exception;
36  import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
37  import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
38  import org.waarp.openr66.protocol.localhandler.packet.EndRequestPacket;
39  import org.waarp.openr66.protocol.localhandler.packet.ErrorPacket;
40  import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
41  import org.waarp.openr66.protocol.utils.ChannelUtils;
42  
43  import java.util.concurrent.atomic.AtomicBoolean;
44  
45  /**
46   * Retrieve transfer runner
47   */
48  public class RetrieveRunner extends Thread {
49    private static final String END_RETRIEVE_IN_ERROR = "End Retrieve in Error";
50  
51    /**
52     * Internal Logger
53     */
54    private static final WaarpLogger logger =
55        WaarpLoggerFactory.getLogger(RetrieveRunner.class);
56  
57    private final R66Session session;
58  
59    private final LocalChannelReference localChannelReference;
60  
61    private boolean done;
62  
63    protected final AtomicBoolean running = new AtomicBoolean(true);
64    private final String nameThread;
65  
66    protected RetrieveRunner() {
67      // empty constructor
68      session = null;
69      localChannelReference = null;
70      nameThread = "RetrieveRunner: None";
71      setName(nameThread);
72      setDaemon(true);
73    }
74  
75    /**
76     * @param session
77     */
78    public RetrieveRunner(final R66Session session) {
79      this.session = session;
80      localChannelReference = this.session.getLocalChannelReference();
81      nameThread = "RetrieveRunner: " + localChannelReference.getLocalId();
82      setName(nameThread);
83      setDaemon(true);
84    }
85  
86    /**
87     * Try to stop the runner
88     */
89    public final void stopRunner() {
90      running.set(false);
91    }
92  
93    @Override
94    public void run() {
95      boolean requestValidDone = false;
96      setName(nameThread);
97      try {
98        try {
99          if (session.getRunner().getGloballaststep() ==
100             TASKSTEP.POSTTASK.ordinal()) {
101           logger.warn("Restart from POSTTASK: EndTransfer");
102           // restart from PostTask global step so just end now
103           try {
104             ChannelUtils.writeEndTransfer(localChannelReference);
105           } catch (final OpenR66ProtocolPacketException e) {
106             transferInError(e);
107             logger.error(END_RETRIEVE_IN_ERROR);
108             return;
109           }
110         } else {
111           logger.debug("Start retrieve operation (send)");
112           final R66File r66File = session.getFile();
113           if (r66File == null) {
114             logger.error("R66File null : {}", r66File);
115             transferInError(
116                 new OpenR66RunnerErrorException("R66File not setup"));
117             logger.info(END_RETRIEVE_IN_ERROR);
118             return;
119           } else {
120             r66File.retrieveBlocking(running);
121           }
122         }
123       } catch (final OpenR66RunnerErrorException e) {
124         transferInError(e);
125         logger.info(END_RETRIEVE_IN_ERROR);
126         return;
127       } catch (final OpenR66ProtocolSystemException e) {
128         transferInError(e);
129         logger.info(END_RETRIEVE_IN_ERROR);
130         return;
131       } catch (final Exception e) {
132         logger.info("TRACE for unknown Exception ", e);
133         transferInError(new OpenR66RunnerErrorException(e));
134         logger.info(END_RETRIEVE_IN_ERROR);
135         return;
136       }
137       localChannelReference.getFutureEndTransfer().awaitOrInterruptible();
138       logger.debug("Await future End Transfer done: {}",
139                    localChannelReference.getFutureEndTransfer().isSuccess());
140       if (localChannelReference.getFutureEndTransfer().isDone() &&
141           localChannelReference.getFutureEndTransfer().isSuccess()) {
142         // send a validation
143         localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
144         final EndRequestPacket validPacket =
145             new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
146         if (session.getExtendedProtocol() &&
147             session.getBusinessObject() != null &&
148             session.getBusinessObject().getInfo(session) != null) {
149           validPacket.setOptional(session.getBusinessObject().getInfo(session));
150         }
151         try {
152           ChannelUtils.writeAbstractLocalPacket(localChannelReference,
153                                                 validPacket, false);
154           requestValidDone = true;
155         } catch (final OpenR66ProtocolPacketException ignored) {
156           // nothing
157         }
158         if (!localChannelReference.getFutureRequest().awaitOrInterruptible(
159             Configuration.configuration.getTimeoutCon()) ||
160             Thread.interrupted()) {
161           // valid it however
162           finalizeInternal();
163         }
164         if (session.getRunner() != null &&
165             session.getRunner().isRequestOnRequested()) {
166           localChannelReference.close();
167         }
168         done = true;
169       } else {
170         checkDoneNotAnswered();
171         if (!localChannelReference.getFutureRequest().isDone()) {
172           R66Result result =
173               localChannelReference.getFutureEndTransfer().getResult();
174           if (result == null) {
175             result = new R66Result(session, false, ErrorCode.TransferError,
176                                    session.getRunner());
177           }
178           localChannelReference.invalidateRequest(result);
179         }
180         done = true;
181         logger.info(END_RETRIEVE_IN_ERROR);
182       }
183     } finally {
184       try {
185         if (!done) {
186           finalizeRequestDone(requestValidDone);
187         }
188         NetworkTransaction.normalEndRetrieve(localChannelReference);
189       } finally {
190         setName("Finished_" + nameThread);
191       }
192     }
193   }
194 
195   private void finalizeInternal() {
196     session.getRunner().setAllDone();
197     try {
198       session.getRunner().saveStatus();
199     } catch (final OpenR66RunnerErrorException e) {
200       // ignore
201     }
202     localChannelReference.validateRequest(
203         localChannelReference.getFutureEndTransfer().getResult());
204   }
205 
206   private boolean checkDoneNotAnswered() {
207     if (localChannelReference.getFutureEndTransfer().isDone()) {
208       // Done and Not Success => error
209       if (!localChannelReference.getFutureEndTransfer().getResult()
210                                 .isAnswered()) {
211         localChannelReference.sessionNewState(R66FiniteDualStates.ERROR);
212         final ErrorPacket error =
213             new ErrorPacket(localChannelReference.getErrorMessage(),
214                             localChannelReference.getFutureEndTransfer()
215                                                  .getResult().getCode()
216                                                  .getCode(),
217                             ErrorPacket.FORWARDCLOSECODE);
218         try {
219           ChannelUtils.writeAbstractLocalPacket(localChannelReference, error,
220                                                 false);
221         } catch (final OpenR66ProtocolPacketException ignored) {
222           // ignore
223         }
224       }
225       return true;
226     }
227     return false;
228   }
229 
230   private void finalizeRequestDone(final boolean requestValidDone) {
231     if (localChannelReference.getFutureEndTransfer().isDone() &&
232         localChannelReference.getFutureEndTransfer().isSuccess()) {
233       if (!requestValidDone) {
234         localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
235         final EndRequestPacket validPacket =
236             new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
237         if (session.getExtendedProtocol() &&
238             session.getBusinessObject() != null &&
239             session.getBusinessObject().getInfo(session) != null) {
240           validPacket.setOptional(session.getBusinessObject().getInfo(session));
241         }
242         try {
243           ChannelUtils.writeAbstractLocalPacket(localChannelReference,
244                                                 validPacket, false);
245         } catch (final OpenR66ProtocolPacketException ignored) {
246           // nothing
247         }
248       }
249       finalizeInternal();
250       if (session.getRunner() != null &&
251           session.getRunner().isRequestOnRequested()) {
252         localChannelReference.close();
253       }
254     } else {
255       if (!checkDoneNotAnswered()) {
256         R66Result result =
257             localChannelReference.getFutureEndTransfer().getResult();
258         if (result == null) {
259           result = new R66Result(session, false, ErrorCode.TransferError,
260                                  session.getRunner());
261         }
262         localChannelReference.invalidateRequest(result);
263       }
264     }
265   }
266 
267   private void transferInError(final OpenR66Exception e) {
268     final R66Result result =
269         new R66Result(e, session, true, ErrorCode.TransferError,
270                       session.getRunner());
271     logger.error("Transfer in error", e);
272     session.newState(R66FiniteDualStates.ERROR);
273     final ErrorPacket error =
274         new ErrorPacket("Transfer in error", ErrorCode.TransferError.getCode(),
275                         ErrorPacket.FORWARDCLOSECODE);
276     try {
277       ChannelUtils.writeAbstractLocalPacket(localChannelReference, error,
278                                             false);
279     } catch (final OpenR66ProtocolPacketException ignored) {
280       // ignore
281     }
282     localChannelReference.invalidateRequest(result);
283     localChannelReference.close();
284     done = true;
285   }
286 
287   /**
288    * Write the next block when the channel is ready to prevent OOM
289    *
290    * @param block
291    * @param localChannelReference
292    * @param digestGlobal
293    * @param digestBlock
294    *
295    * @return the ChannelFuture on the write operation
296    *
297    * @throws OpenR66ProtocolPacketException
298    */
299   public static ChannelFuture writeWhenPossible(final DataBlock block,
300                                                 final LocalChannelReference localChannelReference,
301                                                 final FilesystemBasedDigest digestGlobal,
302                                                 final FilesystemBasedDigest digestBlock)
303       throws OpenR66ProtocolPacketException {
304     return ChannelUtils.writeBackDataBlock(localChannelReference, digestGlobal,
305                                            block, digestBlock);
306   }
307 
308   public final int getLocalId() {
309     return localChannelReference.getLocalId();
310   }
311 
312 
313   /**
314    * When submit RetrieveRunner cannot be done since Executor is already stopped
315    */
316   public final void notStartRunner() {
317     transferInError(
318         new OpenR66RunnerErrorException("Cannot Start Runner: " + session));
319     stopRunner();
320   }
321 
322 }