1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.waarp.vitam.dip;
22
23 import com.fasterxml.jackson.databind.JsonNode;
24 import fr.gouv.vitam.access.external.client.AccessExternalClient;
25 import fr.gouv.vitam.access.external.client.AdminExternalClient;
26 import fr.gouv.vitam.common.GlobalDataRest;
27 import fr.gouv.vitam.common.client.VitamContext;
28 import fr.gouv.vitam.common.error.VitamError;
29 import fr.gouv.vitam.common.exception.InvalidParseOperationException;
30 import fr.gouv.vitam.common.exception.VitamClientException;
31 import fr.gouv.vitam.common.model.RequestResponse;
32 import fr.gouv.vitam.common.model.RequestResponseOK;
33 import fr.gouv.vitam.common.stream.StreamUtils;
34 import org.apache.commons.io.FileUtils;
35 import org.waarp.common.logging.SysErrLogger;
36 import org.waarp.common.logging.WaarpLogger;
37 import org.waarp.common.logging.WaarpLoggerFactory;
38 import org.waarp.common.utility.WaarpThreadFactory;
39 import org.waarp.vitam.common.OperationCheck;
40 import org.waarp.vitam.dip.DipRequest.DIPStep;
41
42 import javax.ws.rs.core.Response;
43 import javax.ws.rs.core.Response.Status;
44 import java.io.File;
45 import java.io.IOException;
46 import java.io.InputStream;
47 import java.nio.charset.StandardCharsets;
48 import java.nio.file.Files;
49 import java.nio.file.Path;
50 import java.util.List;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Executors;
53 import java.util.concurrent.TimeUnit;
54
55 import static java.nio.file.StandardCopyOption.*;
56
57
58
59
60
61 public class DipManager implements Runnable {
62
63
64
65 public static final String DIP_FAILED = "DIP_FAILED";
66
67
68
69 public static final String DIP = "DIP";
70 protected static final String ERROR_MESSAGE = "{}\n\t{}";
71
72
73
74 private static final WaarpLogger logger =
75 WaarpLoggerFactory.getLogger(DipManager.class);
76 private static final String ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR =
77 "Issue since Select produces an error";
78
79 private DipRequest dipRequest;
80 private AdminExternalClient adminExternalClient;
81 private AccessExternalClient client;
82 private DipRequestFactory dipRequestFactory;
83
84 DipManager() {
85
86 }
87
88 private DipManager(final DipRequest dipRequest,
89 final AdminExternalClient adminExternalClient,
90 final AccessExternalClient client,
91 final DipRequestFactory dipRequestFactory) {
92 this.dipRequest = dipRequest;
93 this.adminExternalClient = adminExternalClient;
94 this.client = client;
95 this.dipRequestFactory = dipRequestFactory;
96 }
97
98
99
100
101
102
103
104
105
106
107 void retryAllExistingFiles(final DipRequestFactory dipRequestFactory,
108 final AccessExternalClient client,
109 final AdminExternalClient adminExternalClient,
110 final DipMonitor dipMonitor) {
111 List<DipRequest> dipRequests = dipRequestFactory.getExistingDips();
112 if (dipRequests.isEmpty()) {
113 return;
114 }
115 ExecutorService executorService = Executors
116 .newFixedThreadPool(dipRequests.size(),
117 new WaarpThreadFactory("DipManager"));
118 for (DipRequest dipRequest : dipRequests) {
119 if (dipMonitor.isShutdown()) {
120 return;
121 }
122 DipManager.html#DipManager">DipManager task = new DipManager(dipRequest, adminExternalClient, client,
123 dipRequestFactory);
124 executorService.execute(task);
125 }
126 try {
127 Thread.sleep(dipMonitor.getElapseTime());
128 } catch (InterruptedException e) {
129 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
130 }
131 executorService.shutdown();
132 while (!executorService.isTerminated()) {
133 try {
134 executorService.awaitTermination(dipMonitor.getElapseTime(),
135 TimeUnit.MILLISECONDS);
136 } catch (InterruptedException e) {
137 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
138 }
139 }
140 executorService.shutdownNow();
141 }
142
143 @Override
144 public void run() {
145 logger.warn("Will run {}", dipRequest);
146 try {
147 while (runStep(dipRequestFactory, client, adminExternalClient,
148 dipRequest)) {
149
150 if (dipRequest.getStep() == null) {
151
152 break;
153 }
154 logger.debug("Will rerun {}", dipRequest);
155 }
156 } catch (InvalidParseOperationException e) {
157
158 logger.error("Very bad since cannot save DipRequest", e);
159 }
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175 private boolean runStep(final DipRequestFactory dipRequestFactory,
176 final AccessExternalClient client,
177 final AdminExternalClient adminExternalClient,
178 final DipRequest dipRequest)
179 throws InvalidParseOperationException {
180 DIPStep step = dipRequest.getStep();
181 logger.debug("Step is {} from {}", step, dipRequest);
182 switch (step) {
183 case STARTUP:
184
185 break;
186 case RETRY_SELECT:
187
188 logger.info("Start from Select: {}", dipRequest);
189 select(dipRequestFactory, dipRequest, client);
190 break;
191 case RETRY_DIP:
192
193 logger.info("From DIP: {}", dipRequest);
194 getDip(dipRequestFactory, dipRequest, client, adminExternalClient,
195 dipRequest.getVitamContext());
196 break;
197 case RETRY_DIP_FORWARD:
198
199 logger.info("From DIP_FORWARD: {}", dipRequest);
200 File targetFile = dipRequest.getDipFile(dipRequestFactory);
201 sendDipFile(dipRequestFactory, dipRequest, targetFile);
202 break;
203 case ERROR:
204 logger.info("From Error: {}", dipRequest);
205 sendErrorBack(dipRequestFactory, dipRequest);
206 break;
207 case END:
208
209 logger.info("End of DIP: {}", dipRequest);
210 toDelete(dipRequestFactory, dipRequest);
211 break;
212 default:
213 throw new IllegalStateException("Unexpected value: " + step);
214 }
215 DIPStep newStep = dipRequest.getStep();
216 return newStep != DIPStep.END && newStep != step;
217 }
218
219
220
221
222
223
224
225
226
227
228 int select(final DipRequestFactory dipRequestFactory,
229 final DipRequest dipRequest, final AccessExternalClient client) {
230 try {
231
232 dipRequest.setStep(DIPStep.RETRY_SELECT, 0, dipRequestFactory);
233 VitamContext vitamContext = dipRequest.getVitamContext();
234 JsonNode jsonNode = dipRequest.getSelectJson();
235 RequestResponse requestResponse =
236 client.exportDIP(vitamContext, jsonNode);
237 if (!requestResponse.isOk()) {
238 String requestIdNew =
239 requestResponse.getHeaderString(GlobalDataRest.X_REQUEST_ID);
240 if (requestIdNew == null || requestIdNew.isEmpty()) {
241 requestIdNew = "FAKE_REQUEST_ID";
242 }
243 dipRequest.setRequestId(requestIdNew);
244 Status status = Status.fromStatusCode(requestResponse.getStatus());
245 switch (status) {
246 case SERVICE_UNAVAILABLE:
247
248 logger.warn(ERROR_MESSAGE, "Issue since service unavailable",
249 requestResponse);
250 dipRequest.setStep(DIPStep.RETRY_SELECT, 0, dipRequestFactory);
251
252 return 1;
253 default:
254
255 logger.error(ERROR_MESSAGE, ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR,
256 requestResponse);
257 dipRequest.setStep(DIPStep.ERROR, status.getStatusCode(),
258 dipRequestFactory);
259
260 }
261
262 return 2;
263 }
264
265 RequestResponseOK responseOK = (RequestResponseOK) requestResponse;
266 dipRequest.setFromRequestResponse(responseOK);
267
268
269 dipRequest
270 .setStep(DIPStep.RETRY_DIP, DIPStep.RETRY_DIP.getStatusMonitor(),
271 dipRequestFactory);
272 return 0;
273 } catch (InvalidParseOperationException e) {
274 logger.error("FATAL: Issue since backup of request produces an error", e);
275 } catch (VitamClientException e) {
276 logger.error(ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR, e);
277
278 try {
279
280 dipRequest.setStep(DIPStep.ERROR, 500, dipRequestFactory);
281
282
283 } catch (InvalidParseOperationException ex) {
284
285 logger.error("FATAL: Very bad since cannot save DipRequest", ex);
286 }
287 }
288 return 2;
289 }
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304 boolean getDip(final DipRequestFactory dipRequestFactory,
305 final DipRequest dipRequest, final AccessExternalClient client,
306 final AdminExternalClient adminExternalClient,
307 final VitamContext vitamContext)
308 throws InvalidParseOperationException {
309 Response response = null;
310 try {
311 dipRequest.setStep(DIPStep.RETRY_DIP, 0, dipRequestFactory);
312 OperationCheckperationCheck">OperationCheck operationCheck = new OperationCheck(adminExternalClient);
313 if (operationCheck.checkAvailabilityAtr(dipRequest.getTenantId(),
314 dipRequest.getRequestId())) {
315 response = client.getDIPById(vitamContext, dipRequest.getRequestId());
316 Status status = Status.fromStatusCode(response.getStatus());
317 switch (status) {
318 case OK:
319 case ACCEPTED:
320 sendDIP(dipRequestFactory, dipRequest, response);
321 return true;
322 case SERVICE_UNAVAILABLE:
323 case NOT_FOUND:
324
325 logger.debug("Service or DIP unavailable yet\n\t{}",
326 status.getReasonPhrase());
327 return false;
328 default:
329
330 logger.error(ERROR_MESSAGE, ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR,
331 status.getReasonPhrase());
332 dipRequest.setStep(DIPStep.ERROR, response.getStatus(),
333 dipRequestFactory);
334 }
335 }
336 } catch (VitamClientException e) {
337 logger.warn("Issue since access client produces an error", e);
338 } finally {
339
340 StreamUtils.consumeAnyEntityAndClose(response);
341 }
342 return false;
343 }
344
345
346
347
348
349
350
351
352
353
354
355 private void sendDIP(final DipRequestFactory dipRequestFactory,
356 final DipRequest dipRequest, final Response response)
357 throws InvalidParseOperationException {
358 try (final InputStream inputStream = response
359 .readEntity(InputStream.class)) {
360
361 File targetFile = dipRequest.getDipFile(dipRequestFactory);
362 Path target = targetFile.toPath();
363 Files.copy(inputStream, target, REPLACE_EXISTING);
364
365 sendDipFile(dipRequestFactory, dipRequest, targetFile);
366 } catch (IOException e) {
367 logger
368 .error("File must be writable or InputStream error during close", e);
369 dipRequest
370 .setStep(DIPStep.ERROR, Status.INTERNAL_SERVER_ERROR.getStatusCode(),
371 dipRequestFactory);
372 }
373 }
374
375
376
377
378
379
380
381
382
383
384 private void sendDipFile(final DipRequestFactory dipRequestFactory,
385 final DipRequest dipRequest, final File targetFile)
386 throws InvalidParseOperationException {
387 dipRequest.setStep(DIPStep.RETRY_DIP_FORWARD, 0, dipRequestFactory);
388 if (!dipRequestFactory.getManagerToWaarp(dipRequest)
389 .sendBackInformation(dipRequestFactory, dipRequest,
390 targetFile.getAbsolutePath(),
391 DIP)) {
392
393 dipRequest.setStep(DIPStep.RETRY_DIP_FORWARD, 0, dipRequestFactory);
394 } else {
395 toDelete(dipRequestFactory, dipRequest);
396 }
397 }
398
399
400
401
402
403
404
405
406
407
408 private void toDelete(final DipRequestFactory dipRequestFactory,
409 final DipRequest dipRequest)
410 throws InvalidParseOperationException {
411
412 dipRequest.setStep(DIPStep.END, 0, dipRequestFactory);
413 if (!dipRequestFactory.removeDipRequest(dipRequest)) {
414 logger.error("Issue while cleaning this DipRequest: {}", dipRequest);
415 } else {
416 logger.info("End of DipRequest: {}", dipRequest);
417 }
418 }
419
420
421
422
423
424
425
426
427
428
429 private void sendErrorBack(final DipRequestFactory dipRequestFactory,
430 final DipRequest dipRequest)
431 throws InvalidParseOperationException {
432 logger.warn("Error to feedback since status not ok to restart: {}",
433 dipRequest);
434
435 File file = dipRequest.getErrorFile(dipRequestFactory);
436 if (!file.canRead()) {
437
438 Status status = Status.fromStatusCode(dipRequest.getStatus());
439 VitamError error = getErrorEntity(status, status.getReasonPhrase(),
440 "Internal error while processing the DIP");
441 String err = error.toString();
442 try {
443 FileUtils.write(file, err, StandardCharsets.UTF_8);
444 } catch (IOException e) {
445
446 logger.error("Very bad since cannot save pseudo DIP", e);
447 return;
448 }
449 }
450 if (dipRequestFactory.getManagerToWaarp(dipRequest)
451 .sendBackInformation(dipRequestFactory, dipRequest,
452 file.getAbsolutePath(),
453 DIP_FAILED)) {
454
455 toDelete(dipRequestFactory, dipRequest);
456 }
457
458 }
459
460 private VitamError getErrorEntity(Status status, String msg,
461 String description) {
462 return new VitamError(status.name()).setHttpCode(status.getStatusCode())
463 .setContext("access")
464 .setState("code_vitam").setMessage(msg)
465 .setDescription(description);
466 }
467 }