1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.waarp.vitam.ingest;
22
23 import fr.gouv.vitam.access.external.client.AdminExternalClient;
24 import fr.gouv.vitam.common.GlobalDataRest;
25 import fr.gouv.vitam.common.LocalDateUtil;
26 import fr.gouv.vitam.common.PropertiesUtils;
27 import fr.gouv.vitam.common.client.VitamContext;
28 import fr.gouv.vitam.common.exception.InvalidParseOperationException;
29 import fr.gouv.vitam.common.exception.VitamClientException;
30 import fr.gouv.vitam.common.external.client.IngestCollection;
31 import fr.gouv.vitam.common.i18n.VitamLogbookMessages;
32 import fr.gouv.vitam.common.model.LocalFile;
33 import fr.gouv.vitam.common.model.RequestResponse;
34 import fr.gouv.vitam.common.model.RequestResponseOK;
35 import fr.gouv.vitam.common.model.StatusCode;
36 import fr.gouv.vitam.common.stream.StreamUtils;
37 import fr.gouv.vitam.ingest.external.api.exception.IngestExternalException;
38 import fr.gouv.vitam.ingest.external.client.IngestExternalClient;
39 import org.apache.commons.io.FileUtils;
40 import org.waarp.common.logging.SysErrLogger;
41 import org.waarp.common.logging.WaarpLogger;
42 import org.waarp.common.logging.WaarpLoggerFactory;
43 import org.waarp.common.utility.WaarpThreadFactory;
44 import org.waarp.vitam.common.OperationCheck;
45
46 import javax.ws.rs.core.Response;
47 import javax.ws.rs.core.Response.Status;
48 import java.io.BufferedReader;
49 import java.io.File;
50 import java.io.IOException;
51 import java.io.InputStream;
52 import java.io.InputStreamReader;
53 import java.nio.charset.StandardCharsets;
54 import java.nio.file.Files;
55 import java.nio.file.Path;
56 import java.time.LocalDateTime;
57 import java.util.List;
58 import java.util.concurrent.ExecutorService;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.TimeUnit;
61
62 import static java.nio.file.StandardCopyOption.*;
63 import static org.waarp.vitam.ingest.IngestRequest.*;
64
65
66
67
68
69 public class IngestManager implements Runnable {
70
71
72
73 public static final String ATR_FAILED = "ATR_FAILED";
74
75
76
77 public static final String ATR = "ATR";
78
79
80
81 public static final String INGEST_ID = "INGEST_ID";
82 protected static final String ERROR_MESSAGE = "{}\n\t{}";
83
84
85
86 private static final WaarpLogger logger =
87 WaarpLoggerFactory.getLogger(IngestManager.class);
88 private static final String INGEST_INT_UPLOAD = "STP_UPLOAD_SIP";
89 private static final String ATR_KO_DEFAULT_XML = "ATR_KO_DEFAULT.xml";
90 private static final String DATE = "#MADATE#";
91 private static final String MESSAGE_IDENTIFIER = "#MESSAGE_IDENTIFIER#";
92 private static final String ARCHIVAL_AGENCY = "#ARCHIVAL_AGENCY#";
93 private static final String TRANSFERRING_AGENCY = "#TRANSFERRING_AGENCY#";
94 private static final String COMMENT = "#COMMENT#";
95 private static final String EVENT_TYPE = "#EVENT_TYPE#";
96 private static final String EVENT_TYPE_CODE = "#EVENT_TYPE_CODE#";
97 private static final String EVENT_DATE_TIME = "#EVENT_DATE_TIME#";
98 private static final String OUTCOME = "#OUTCOME#";
99 private static final String OUTCOME_DETAIL = "#OUTCOME_DETAIL#";
100 private static final String OUTCOME_DETAIL_MESSAGE =
101 "#OUTCOME_DETAIL_MESSAGE#";
102 private static final String ISSUE_SINCE_INGEST_PACKET_PRODUCES_AN_ERROR =
103 "Issue since ingest packet produces an error";
104
105 private IngestRequest ingestRequest;
106 private AdminExternalClient adminExternalClient;
107 private IngestExternalClient client;
108 private IngestRequestFactory ingestRequestFactory;
109
110 IngestManager() {
111
112 }
113
114 private IngestManager(final IngestRequest ingestRequest,
115 final AdminExternalClient adminExternalClient,
116 final IngestExternalClient client,
117 final IngestRequestFactory ingestRequestFactory) {
118 this.ingestRequest = ingestRequest;
119 this.adminExternalClient = adminExternalClient;
120 this.client = client;
121 this.ingestRequestFactory = ingestRequestFactory;
122 }
123
124
125
126
127
128
129
130
131
132
133
134
135
136 private static String buildAtrInternal(String messageIdentifier,
137 String archivalAgency,
138 String transferringAgency,
139 String eventType, String addedMessage,
140 StatusCode code,
141 LocalDateTime eventDateTime) {
142 String xmlDefault;
143 try {
144 xmlDefault = readInputStream(
145 PropertiesUtils.getResourceAsStream(ATR_KO_DEFAULT_XML));
146 } catch (final IOException e) {
147
148 xmlDefault = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
149 "<ArchiveTransferReply xmlns:xlink=\"http://www.w3.org/1999/xlink\"\n" +
150 " xmlns:pr=\"info:lc/xmlns/premis-v2\"\n" +
151 " xmlns=\"fr:gouv:culture:archivesdefrance:seda:v2.1\"\n" +
152 " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n" +
153 " xsi:schemaLocation=\"fr:gouv:culture:archivesdefrance:seda:v2.1 seda-2.1-main.xsd\">\n" +
154 " <Comment>#COMMENT#</Comment>\n" +
155 " <Date>#MADATE#</Date>\n" +
156 " <MessageIdentifier>#MESSAGE_IDENTIFIER#</MessageIdentifier>\n" +
157 " \n" + " <CodeListVersions>\n" +
158 " <ReplyCodeListVersion>ReplyCodeListVersion0</ReplyCodeListVersion>\n" +
159 " <MessageDigestAlgorithmCodeListVersion>MessageDigestAlgorithmCodeListVersion0</MessageDigestAlgorithmCodeListVersion>\n" +
160 " <FileFormatCodeListVersion>FileFormatCodeListVersion0</FileFormatCodeListVersion>\n" +
161 " </CodeListVersions>\n" + "\n" +
162 " <ReplyCode>#OUTCOME#</ReplyCode>\n" +
163 " <Operation>\n" + " <Event>\n" +
164 " <EventTypeCode>#EVENT_TYPE_CODE#</EventTypeCode>\n" +
165 " <EventType>#EVENT_TYPE#</EventType>\n" +
166 " <EventDateTime>#EVENT_DATE_TIME#</EventDateTime>\n" +
167 " <Outcome>#OUTCOME#</Outcome>\n" +
168 " <OutcomeDetail>#OUTCOME_DETAIL#</OutcomeDetail>\n" +
169 " <OutcomeDetailMessage>#OUTCOME_DETAIL_MESSAGE#</OutcomeDetailMessage>\n" +
170 " </Event>\n" + " </Operation>\n" + "\n" +
171 " <MessageRequestIdentifier>Unknown</MessageRequestIdentifier>\n" +
172 " <ArchivalAgency>\n" +
173 " <Identifier>#ARCHIVAL_AGENCY#</Identifier>\n" +
174 " </ArchivalAgency>\n" + " <TransferringAgency>\n" +
175 " <Identifier>#TRANSFERRING_AGENCY#</Identifier>\n" +
176 " </TransferringAgency>\n" + "</ArchiveTransferReply>\n";
177 }
178 String detail = VitamLogbookMessages.getCodeOp(eventType, code);
179 if (addedMessage != null) {
180 detail += addedMessage;
181 }
182 String event = VitamLogbookMessages.getLabelOp(eventType);
183 return xmlDefault.replace(DATE, LocalDateUtil.now().toString())
184 .replace(MESSAGE_IDENTIFIER, messageIdentifier)
185 .replace(ARCHIVAL_AGENCY, archivalAgency)
186 .replace(TRANSFERRING_AGENCY, transferringAgency)
187 .replace(COMMENT, detail)
188 .replace(EVENT_TYPE_CODE, eventType)
189 .replace(EVENT_TYPE, event)
190 .replace(EVENT_DATE_TIME, eventDateTime.toString())
191 .replaceAll(OUTCOME, code.name())
192 .replace(OUTCOME_DETAIL, eventType + "." + code.name())
193 .replace(OUTCOME_DETAIL_MESSAGE, detail);
194 }
195
196
197
198
199
200
201
202
203
204
205 private static String readInputStreamLimited(InputStream input, int limit)
206 throws IOException {
207 final StringBuilder builder = new StringBuilder();
208 try (final InputStreamReader reader = new InputStreamReader(input)) {
209 try (final BufferedReader buffered = new BufferedReader(reader)) {
210 String line;
211 while ((line = buffered.readLine()) != null) {
212 builder.append(line).append('\n');
213 if (builder.length() >= limit) {
214 break;
215 }
216 }
217 }
218 }
219 return builder.toString();
220 }
221
222
223
224
225
226
227
228
229
230
231 private static String readInputStream(InputStream input) throws IOException {
232 return readInputStreamLimited(input, Integer.MAX_VALUE);
233 }
234
235
236
237
238
239
240
241
242
243
244 void retryAllExistingFiles(final IngestRequestFactory ingestRequestFactory,
245 final IngestExternalClient client,
246 final AdminExternalClient adminExternalClient,
247 final IngestMonitor ingestMonitor) {
248 List<IngestRequest> ingestRequests =
249 ingestRequestFactory.getExistingIngests();
250 if (ingestRequests.isEmpty()) {
251 return;
252 }
253 ExecutorService executorService = Executors
254 .newFixedThreadPool(ingestRequests.size(),
255 new WaarpThreadFactory("IngestManager"));
256 for (IngestRequest ingestRequest : ingestRequests) {
257 if (ingestMonitor.isShutdown()) {
258 return;
259 }
260 IngestManager task =
261 new IngestManager(ingestRequest, adminExternalClient, client,
262 ingestRequestFactory);
263 executorService.execute(task);
264 }
265 try {
266 Thread.sleep(ingestMonitor.getElapseTime());
267 } catch (InterruptedException e) {
268 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
269 }
270 executorService.shutdown();
271 while (!executorService.isTerminated()) {
272 try {
273 executorService.awaitTermination(ingestMonitor.getElapseTime(),
274 TimeUnit.MILLISECONDS);
275 } catch (InterruptedException e) {
276 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
277 }
278 }
279 executorService.shutdownNow();
280 }
281
282 @Override
283 public void run() {
284 logger.warn("Will run {}", ingestRequest);
285 try {
286 while (runStep(ingestRequestFactory, client, adminExternalClient,
287 ingestRequest)) {
288
289 if (ingestRequest.getStep() == null) {
290
291 break;
292 }
293 logger.debug("Will rerun {}", ingestRequest);
294 }
295 } catch (InvalidParseOperationException e) {
296
297 logger.error("Very bad since cannot save IngestRequest", e);
298 }
299 }
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314 private boolean runStep(final IngestRequestFactory ingestRequestFactory,
315 final IngestExternalClient client,
316 final AdminExternalClient adminExternalClient,
317 final IngestRequest ingestRequest)
318 throws InvalidParseOperationException {
319 IngestStep step = ingestRequest.getStep();
320 logger.debug("Step is {} from {}", step, ingestRequest);
321 switch (step) {
322 case STARTUP:
323
324 break;
325 case RETRY_INGEST:
326
327 logger.info("Start from Ingest: {}", ingestRequest);
328 ingestLocally(ingestRequestFactory, ingestRequest, client);
329 break;
330 case RETRY_INGEST_ID:
331
332 logger.info("From Ingest Id: {}", ingestRequest);
333 sendBackId(ingestRequestFactory, ingestRequest);
334 break;
335 case RETRY_ATR:
336
337 logger.info("From ATR: {}", ingestRequest);
338 getStatusOfATR(ingestRequestFactory, ingestRequest, client,
339 adminExternalClient, ingestRequest.getVitamContext());
340 break;
341 case RETRY_ATR_FORWARD:
342
343 logger.info("From ATR_FORWARD: {}", ingestRequest);
344 File targetFile = ingestRequest.getAtrFile(ingestRequestFactory);
345 sendATRFile(ingestRequestFactory, ingestRequest, targetFile);
346 break;
347 case ERROR:
348 logger.info("From Error: {}", ingestRequest);
349 sendErrorBack(ingestRequestFactory, ingestRequest);
350 break;
351 case END:
352
353 logger.info("End of Ingest: {}", ingestRequest);
354 toDelete(ingestRequestFactory, ingestRequest);
355 break;
356 default:
357 throw new IllegalStateException("Unexpected value: " + step);
358 }
359 IngestStep newStep = ingestRequest.getStep();
360 return newStep != IngestStep.END && newStep != step;
361 }
362
363
364
365
366
367
368
369
370
371
372 int ingestLocally(final IngestRequestFactory ingestRequestFactory,
373 final IngestRequest ingestRequest,
374 final IngestExternalClient client) {
375 try {
376
377 ingestRequest.setStep(IngestStep.RETRY_INGEST, 0, ingestRequestFactory);
378 VitamContext vitamContext = ingestRequest.getVitamContext();
379 LocalFile localFile = ingestRequest.getLocalFile();
380 RequestResponse requestResponse = client
381 .ingestLocal(vitamContext, localFile, ingestRequest.getContextId(),
382 ingestRequest.getAction());
383 if (!requestResponse.isOk()) {
384 String requestIdNew =
385 requestResponse.getHeaderString(GlobalDataRest.X_REQUEST_ID);
386 if (requestIdNew == null || requestIdNew.isEmpty()) {
387 requestIdNew = "FAKE_REQUEST_ID";
388 }
389 ingestRequest.setRequestId(requestIdNew);
390 Status status = Status.fromStatusCode(requestResponse.getStatus());
391 switch (status) {
392 case SERVICE_UNAVAILABLE:
393
394 logger.warn(ERROR_MESSAGE, "Issue since service or ATR unavailable",
395 requestResponse);
396 ingestRequest
397 .setStep(IngestStep.RETRY_INGEST, 0, ingestRequestFactory);
398
399 return 1;
400 default:
401
402 logger.error(ERROR_MESSAGE,
403 ISSUE_SINCE_INGEST_PACKET_PRODUCES_AN_ERROR,
404 requestResponse);
405 ingestRequest.setStep(IngestStep.ERROR, status.getStatusCode(),
406 ingestRequestFactory);
407
408 }
409
410 return 2;
411 }
412
413 RequestResponseOK responseOK = (RequestResponseOK) requestResponse;
414 ingestRequest.setFromRequestResponse(responseOK);
415
416
417 return sendBackId(ingestRequestFactory, ingestRequest)? 0 : 1;
418 } catch (InvalidParseOperationException e) {
419 logger.error("FATAL: Issue since backup of request produces an error", e);
420 } catch (IngestExternalException e) {
421 logger.error(ISSUE_SINCE_INGEST_PACKET_PRODUCES_AN_ERROR, e);
422
423 try {
424 ingestRequest.setStep(IngestStep.RETRY_INGEST, 0, ingestRequestFactory);
425 } catch (InvalidParseOperationException ex) {
426
427 logger.error("FATAL: Very bad since cannot save IngestRequest", ex);
428 }
429 }
430 return 2;
431 }
432
433
434
435
436
437
438
439
440
441
442
443
444 private boolean sendBackId(final IngestRequestFactory ingestRequestFactory,
445 final IngestRequest ingestRequest)
446 throws InvalidParseOperationException {
447 File idMessage = new File("/tmp/" + ingestRequest.getRequestId() + ".xml");
448 try {
449 String atr = buildAtrInternal(ingestRequest.getRequestId(),
450 "ArchivalAgencyToBeDefined",
451 "TransferringAgencyToBeDefined",
452 INGEST_INT_UPLOAD, "(Accepted by Vitam)",
453 StatusCode.STARTED, LocalDateUtil.now());
454 try {
455 FileUtils.write(idMessage, atr, StandardCharsets.UTF_8);
456 } catch (IOException e) {
457
458 logger.error("Very bad since cannot save pseudo ATR", e);
459 ingestRequest
460 .setStep(IngestStep.RETRY_INGEST_ID, 0, ingestRequestFactory);
461 return false;
462 }
463 ingestRequest
464 .setStep(IngestStep.RETRY_INGEST_ID, 0, ingestRequestFactory);
465 if (ingestRequestFactory.getManagerToWaarp(ingestRequest)
466 .sendBackInformation(ingestRequestFactory,
467 ingestRequest,
468 idMessage.getAbsolutePath(),
469 INGEST_ID)) {
470
471 if (ingestRequest.isCheckAtr()) {
472 ingestRequest.setStep(IngestStep.RETRY_ATR, 0, ingestRequestFactory);
473 } else {
474
475 toDelete(ingestRequestFactory, ingestRequest);
476 }
477 return true;
478 } else {
479
480 return false;
481 }
482 } finally {
483 try {
484 Files.delete(idMessage.toPath());
485 } catch (IOException e) {
486 logger.debug("Temporary file not deleted {}",
487 idMessage.getAbsolutePath());
488 }
489 }
490 }
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505 boolean getStatusOfATR(final IngestRequestFactory ingestRequestFactory,
506 final IngestRequest ingestRequest,
507 final IngestExternalClient client,
508 final AdminExternalClient adminExternalClient,
509 final VitamContext vitamContext)
510 throws InvalidParseOperationException {
511 Response response = null;
512 try {
513
514 ingestRequest.setStep(IngestStep.RETRY_ATR, 0, ingestRequestFactory);
515 OperationCheckperationCheck">OperationCheck operationCheck = new OperationCheck(adminExternalClient);
516 if (operationCheck.checkAvailabilityAtr(ingestRequest.getTenantId(),
517 ingestRequest.getRequestId())) {
518 response = client
519 .downloadObjectAsync(vitamContext, ingestRequest.getRequestId(),
520 IngestCollection.ARCHIVETRANSFERREPLY);
521 Status status = Status.fromStatusCode(response.getStatus());
522 switch (status) {
523 case OK:
524 sendATR(ingestRequestFactory, ingestRequest, response);
525 return true;
526 case SERVICE_UNAVAILABLE:
527 case NOT_FOUND:
528
529 logger.debug("Service or ATR unavailable yet\n\t{}",
530 status.getReasonPhrase());
531 return false;
532 default:
533
534 logger.error(ERROR_MESSAGE,
535 ISSUE_SINCE_INGEST_PACKET_PRODUCES_AN_ERROR,
536 status.getReasonPhrase());
537 ingestRequest.setStep(IngestStep.ERROR, response.getStatus(),
538 ingestRequestFactory);
539 }
540 }
541 } catch (VitamClientException e) {
542 logger.warn("Issue since ingest client produces an error", e);
543
544 ingestRequest.setStep(IngestStep.ERROR, 500, ingestRequestFactory);
545 } finally {
546
547 StreamUtils.consumeAnyEntityAndClose(response);
548 }
549 return false;
550 }
551
552
553
554
555
556
557
558
559
560
561
562 private void sendATR(final IngestRequestFactory ingestRequestFactory,
563 final IngestRequest ingestRequest,
564 final Response response)
565 throws InvalidParseOperationException {
566 try (final InputStream inputStream = response
567 .readEntity(InputStream.class)) {
568
569 File targetFile = ingestRequest.getAtrFile(ingestRequestFactory);
570 Path target = targetFile.toPath();
571 Files.copy(inputStream, target, REPLACE_EXISTING);
572
573 sendATRFile(ingestRequestFactory, ingestRequest, targetFile);
574 } catch (IOException e) {
575 logger
576 .error("File must be writable or InputStream error during close", e);
577 ingestRequest.setStep(IngestStep.ERROR,
578 Status.INTERNAL_SERVER_ERROR.getStatusCode(),
579 ingestRequestFactory);
580 }
581 }
582
583
584
585
586
587
588
589
590
591
592 private void sendATRFile(final IngestRequestFactory ingestRequestFactory,
593 final IngestRequest ingestRequest,
594 final File targetFile)
595 throws InvalidParseOperationException {
596 ingestRequest
597 .setStep(IngestStep.RETRY_ATR_FORWARD, 0, ingestRequestFactory);
598 if (!ingestRequestFactory.getManagerToWaarp(ingestRequest)
599 .sendBackInformation(ingestRequestFactory,
600 ingestRequest,
601 targetFile.getAbsolutePath(),
602 ATR)) {
603
604 ingestRequest
605 .setStep(IngestStep.RETRY_ATR_FORWARD, 0, ingestRequestFactory);
606 } else {
607 toDelete(ingestRequestFactory, ingestRequest);
608 }
609 }
610
611
612
613
614
615
616
617
618
619
620 private void toDelete(final IngestRequestFactory ingestRequestFactory,
621 final IngestRequest ingestRequest)
622 throws InvalidParseOperationException {
623
624 ingestRequest.setStep(IngestStep.END, 0, ingestRequestFactory);
625 if (!ingestRequestFactory.removeIngestRequest(ingestRequest)) {
626 logger
627 .error("Issue while cleaning this IngestRequest: {}", ingestRequest);
628 } else {
629 logger.info("End of IngestRequest: {}", ingestRequest);
630 }
631 }
632
633
634
635
636
637
638
639
640
641
642 private void sendErrorBack(final IngestRequestFactory ingestRequestFactory,
643 final IngestRequest ingestRequest)
644 throws InvalidParseOperationException {
645 logger.warn("Error to feedback since status not ok to restart: {}",
646 ingestRequest);
647
648 File file = ingestRequest.getAtrFile(ingestRequestFactory);
649 if (!file.canRead()) {
650
651 String atr = buildAtrInternal(ingestRequest.getRequestId(),
652 "ArchivalAgencyToBeDefined",
653 "TransferringAgencyToBeDefined",
654 INGEST_INT_UPLOAD,
655 "(Issue during Ingest Step [" +
656 ingestRequest.getStatus() +
657 "] while Waarp accessed to Vitam)",
658 StatusCode.FATAL, LocalDateUtil.now());
659 try {
660 FileUtils.write(file, atr, StandardCharsets.UTF_8);
661 } catch (IOException e) {
662
663 logger.error("Very bad since cannot save pseudo ATR", e);
664 return;
665 }
666 }
667 if (ingestRequestFactory.getManagerToWaarp(ingestRequest)
668 .sendBackInformation(ingestRequestFactory,
669 ingestRequest,
670 file.getAbsolutePath(),
671 ATR_FAILED)) {
672
673 toDelete(ingestRequestFactory, ingestRequest);
674 }
675
676 }
677
678 }