1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.openr66.protocol.localhandler;
21
22 import io.netty.channel.ChannelFuture;
23 import org.waarp.common.digest.FilesystemBasedDigest;
24 import org.waarp.common.file.DataBlock;
25 import org.waarp.common.logging.WaarpLogger;
26 import org.waarp.common.logging.WaarpLoggerFactory;
27 import org.waarp.openr66.context.ErrorCode;
28 import org.waarp.openr66.context.R66FiniteDualStates;
29 import org.waarp.openr66.context.R66Result;
30 import org.waarp.openr66.context.R66Session;
31 import org.waarp.openr66.context.filesystem.R66File;
32 import org.waarp.openr66.context.task.exception.OpenR66RunnerErrorException;
33 import org.waarp.openr66.database.data.DbTaskRunner.TASKSTEP;
34 import org.waarp.openr66.protocol.configuration.Configuration;
35 import org.waarp.openr66.protocol.exception.OpenR66Exception;
36 import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
37 import org.waarp.openr66.protocol.exception.OpenR66ProtocolSystemException;
38 import org.waarp.openr66.protocol.localhandler.packet.EndRequestPacket;
39 import org.waarp.openr66.protocol.localhandler.packet.ErrorPacket;
40 import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
41 import org.waarp.openr66.protocol.utils.ChannelUtils;
42
43 import java.util.concurrent.atomic.AtomicBoolean;
44
45
46
47
48 public class RetrieveRunner extends Thread {
49 private static final String END_RETRIEVE_IN_ERROR = "End Retrieve in Error";
50
51
52
53
54 private static final WaarpLogger logger =
55 WaarpLoggerFactory.getLogger(RetrieveRunner.class);
56
57 private final R66Session session;
58
59 private final LocalChannelReference localChannelReference;
60
61 private boolean done;
62
63 protected final AtomicBoolean running = new AtomicBoolean(true);
64 private final String nameThread;
65
66 protected RetrieveRunner() {
67
68 session = null;
69 localChannelReference = null;
70 nameThread = "RetrieveRunner: None";
71 setName(nameThread);
72 setDaemon(true);
73 }
74
75
76
77
78 public RetrieveRunner(final R66Session session) {
79 this.session = session;
80 localChannelReference = this.session.getLocalChannelReference();
81 nameThread = "RetrieveRunner: " + localChannelReference.getLocalId();
82 setName(nameThread);
83 setDaemon(true);
84 }
85
86
87
88
89 public final void stopRunner() {
90 running.set(false);
91 }
92
93 @Override
94 public void run() {
95 boolean requestValidDone = false;
96 setName(nameThread);
97 try {
98 try {
99 if (session.getRunner().getGloballaststep() ==
100 TASKSTEP.POSTTASK.ordinal()) {
101 logger.warn("Restart from POSTTASK: EndTransfer");
102
103 try {
104 ChannelUtils.writeEndTransfer(localChannelReference);
105 } catch (final OpenR66ProtocolPacketException e) {
106 transferInError(e);
107 logger.error(END_RETRIEVE_IN_ERROR);
108 return;
109 }
110 } else {
111 logger.debug("Start retrieve operation (send)");
112 final R66File r66File = session.getFile();
113 if (r66File == null) {
114 logger.error("R66File null : {}", r66File);
115 transferInError(
116 new OpenR66RunnerErrorException("R66File not setup"));
117 logger.info(END_RETRIEVE_IN_ERROR);
118 return;
119 } else {
120 r66File.retrieveBlocking(running);
121 }
122 }
123 } catch (final OpenR66RunnerErrorException e) {
124 transferInError(e);
125 logger.info(END_RETRIEVE_IN_ERROR);
126 return;
127 } catch (final OpenR66ProtocolSystemException e) {
128 transferInError(e);
129 logger.info(END_RETRIEVE_IN_ERROR);
130 return;
131 } catch (final Exception e) {
132 logger.info("TRACE for unknown Exception ", e);
133 transferInError(new OpenR66RunnerErrorException(e));
134 logger.info(END_RETRIEVE_IN_ERROR);
135 return;
136 }
137 localChannelReference.getFutureEndTransfer().awaitOrInterruptible();
138 logger.debug("Await future End Transfer done: {}",
139 localChannelReference.getFutureEndTransfer().isSuccess());
140 if (localChannelReference.getFutureEndTransfer().isDone() &&
141 localChannelReference.getFutureEndTransfer().isSuccess()) {
142
143 localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
144 final EndRequestPacket validPacket =
145 new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
146 if (session.getExtendedProtocol() &&
147 session.getBusinessObject() != null &&
148 session.getBusinessObject().getInfo(session) != null) {
149 validPacket.setOptional(session.getBusinessObject().getInfo(session));
150 }
151 try {
152 ChannelUtils.writeAbstractLocalPacket(localChannelReference,
153 validPacket, false);
154 requestValidDone = true;
155 } catch (final OpenR66ProtocolPacketException ignored) {
156
157 }
158 if (!localChannelReference.getFutureRequest().awaitOrInterruptible(
159 Configuration.configuration.getTimeoutCon()) ||
160 Thread.interrupted()) {
161
162 finalizeInternal();
163 }
164 if (session.getRunner() != null &&
165 session.getRunner().isRequestOnRequested()) {
166 localChannelReference.close();
167 }
168 done = true;
169 } else {
170 checkDoneNotAnswered();
171 if (!localChannelReference.getFutureRequest().isDone()) {
172 R66Result result =
173 localChannelReference.getFutureEndTransfer().getResult();
174 if (result == null) {
175 result = new R66Result(session, false, ErrorCode.TransferError,
176 session.getRunner());
177 }
178 localChannelReference.invalidateRequest(result);
179 }
180 done = true;
181 logger.info(END_RETRIEVE_IN_ERROR);
182 }
183 } finally {
184 try {
185 if (!done) {
186 finalizeRequestDone(requestValidDone);
187 }
188 NetworkTransaction.normalEndRetrieve(localChannelReference);
189 } finally {
190 setName("Finished_" + nameThread);
191 }
192 }
193 }
194
195 private void finalizeInternal() {
196 session.getRunner().setAllDone();
197 try {
198 session.getRunner().saveStatus();
199 } catch (final OpenR66RunnerErrorException e) {
200
201 }
202 localChannelReference.validateRequest(
203 localChannelReference.getFutureEndTransfer().getResult());
204 }
205
206 private boolean checkDoneNotAnswered() {
207 if (localChannelReference.getFutureEndTransfer().isDone()) {
208
209 if (!localChannelReference.getFutureEndTransfer().getResult()
210 .isAnswered()) {
211 localChannelReference.sessionNewState(R66FiniteDualStates.ERROR);
212 final ErrorPacket error =
213 new ErrorPacket(localChannelReference.getErrorMessage(),
214 localChannelReference.getFutureEndTransfer()
215 .getResult().getCode()
216 .getCode(),
217 ErrorPacket.FORWARDCLOSECODE);
218 try {
219 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error,
220 false);
221 } catch (final OpenR66ProtocolPacketException ignored) {
222
223 }
224 }
225 return true;
226 }
227 return false;
228 }
229
230 private void finalizeRequestDone(final boolean requestValidDone) {
231 if (localChannelReference.getFutureEndTransfer().isDone() &&
232 localChannelReference.getFutureEndTransfer().isSuccess()) {
233 if (!requestValidDone) {
234 localChannelReference.sessionNewState(R66FiniteDualStates.ENDREQUESTS);
235 final EndRequestPacket validPacket =
236 new EndRequestPacket(ErrorCode.CompleteOk.ordinal());
237 if (session.getExtendedProtocol() &&
238 session.getBusinessObject() != null &&
239 session.getBusinessObject().getInfo(session) != null) {
240 validPacket.setOptional(session.getBusinessObject().getInfo(session));
241 }
242 try {
243 ChannelUtils.writeAbstractLocalPacket(localChannelReference,
244 validPacket, false);
245 } catch (final OpenR66ProtocolPacketException ignored) {
246
247 }
248 }
249 finalizeInternal();
250 if (session.getRunner() != null &&
251 session.getRunner().isRequestOnRequested()) {
252 localChannelReference.close();
253 }
254 } else {
255 if (!checkDoneNotAnswered()) {
256 R66Result result =
257 localChannelReference.getFutureEndTransfer().getResult();
258 if (result == null) {
259 result = new R66Result(session, false, ErrorCode.TransferError,
260 session.getRunner());
261 }
262 localChannelReference.invalidateRequest(result);
263 }
264 }
265 }
266
267 private void transferInError(final OpenR66Exception e) {
268 final R66Result result =
269 new R66Result(e, session, true, ErrorCode.TransferError,
270 session.getRunner());
271 logger.error("Transfer in error", e);
272 session.newState(R66FiniteDualStates.ERROR);
273 final ErrorPacket error =
274 new ErrorPacket("Transfer in error", ErrorCode.TransferError.getCode(),
275 ErrorPacket.FORWARDCLOSECODE);
276 try {
277 ChannelUtils.writeAbstractLocalPacket(localChannelReference, error,
278 false);
279 } catch (final OpenR66ProtocolPacketException ignored) {
280
281 }
282 localChannelReference.invalidateRequest(result);
283 localChannelReference.close();
284 done = true;
285 }
286
287
288
289
290
291
292
293
294
295
296
297
298
299 public static ChannelFuture writeWhenPossible(final DataBlock block,
300 final LocalChannelReference localChannelReference,
301 final FilesystemBasedDigest digestGlobal,
302 final FilesystemBasedDigest digestBlock)
303 throws OpenR66ProtocolPacketException {
304 return ChannelUtils.writeBackDataBlock(localChannelReference, digestGlobal,
305 block, digestBlock);
306 }
307
308 public final int getLocalId() {
309 return localChannelReference.getLocalId();
310 }
311
312
313
314
315
316 public final void notStartRunner() {
317 transferInError(
318 new OpenR66RunnerErrorException("Cannot Start Runner: " + session));
319 stopRunner();
320 }
321
322 }