View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.waarp.vitam.dip;
22  
23  import com.fasterxml.jackson.databind.JsonNode;
24  import fr.gouv.vitam.access.external.client.AccessExternalClient;
25  import fr.gouv.vitam.access.external.client.AdminExternalClient;
26  import fr.gouv.vitam.common.GlobalDataRest;
27  import fr.gouv.vitam.common.client.VitamContext;
28  import fr.gouv.vitam.common.error.VitamError;
29  import fr.gouv.vitam.common.exception.InvalidParseOperationException;
30  import fr.gouv.vitam.common.exception.VitamClientException;
31  import fr.gouv.vitam.common.model.RequestResponse;
32  import fr.gouv.vitam.common.model.RequestResponseOK;
33  import fr.gouv.vitam.common.stream.StreamUtils;
34  import org.apache.commons.io.FileUtils;
35  import org.waarp.common.logging.SysErrLogger;
36  import org.waarp.common.logging.WaarpLogger;
37  import org.waarp.common.logging.WaarpLoggerFactory;
38  import org.waarp.common.utility.WaarpThreadFactory;
39  import org.waarp.vitam.common.OperationCheck;
40  import org.waarp.vitam.dip.DipRequest.DIPStep;
41  
42  import javax.ws.rs.core.Response;
43  import javax.ws.rs.core.Response.Status;
44  import java.io.File;
45  import java.io.IOException;
46  import java.io.InputStream;
47  import java.nio.charset.StandardCharsets;
48  import java.nio.file.Files;
49  import java.nio.file.Path;
50  import java.util.List;
51  import java.util.concurrent.ExecutorService;
52  import java.util.concurrent.Executors;
53  import java.util.concurrent.TimeUnit;
54  
55  import static java.nio.file.StandardCopyOption.*;
56  
57  /**
58   * DipManager is the central logic for DIP management between Waarp and
59   * Vitam
60   */
61  public class DipManager implements Runnable {
62    /**
63     * Prefix of File Information for DIP_FAILED
64     */
65    public static final String DIP_FAILED = "DIP_FAILED";
66    /**
67     * Prefix of File Information for DIP
68     */
69    public static final String DIP = "DIP";
70    protected static final String ERROR_MESSAGE = "{}\n\t{}";
71    /**
72     * Internal Logger
73     */
74    private static final WaarpLogger logger =
75        WaarpLoggerFactory.getLogger(DipManager.class);
76    private static final String ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR =
77        "Issue since Select produces an error";
78  
79    private DipRequest dipRequest;
80    private AdminExternalClient adminExternalClient;
81    private AccessExternalClient client;
82    private DipRequestFactory dipRequestFactory;
83  
84    DipManager() {
85      // Empty
86    }
87  
88    private DipManager(final DipRequest dipRequest,
89                       final AdminExternalClient adminExternalClient,
90                       final AccessExternalClient client,
91                       final DipRequestFactory dipRequestFactory) {
92      this.dipRequest = dipRequest;
93      this.adminExternalClient = adminExternalClient;
94      this.client = client;
95      this.dipRequestFactory = dipRequestFactory;
96    }
97  
98    /**
99     * Get all existing DipRequest and try to continue according to their
100    * status
101    *
102    * @param dipRequestFactory
103    * @param client
104    * @param adminExternalClient
105    * @param dipMonitor
106    */
107   void retryAllExistingFiles(final DipRequestFactory dipRequestFactory,
108                              final AccessExternalClient client,
109                              final AdminExternalClient adminExternalClient,
110                              final DipMonitor dipMonitor) {
111     List<DipRequest> dipRequests = dipRequestFactory.getExistingDips();
112     if (dipRequests.isEmpty()) {
113       return;
114     }
115     ExecutorService executorService = Executors
116         .newFixedThreadPool(dipRequests.size(),
117                             new WaarpThreadFactory("DipManager"));
118     for (DipRequest dipRequest : dipRequests) {
119       if (dipMonitor.isShutdown()) {
120         return;
121       }
122       DipManager.html#DipManager">DipManager task = new DipManager(dipRequest, adminExternalClient, client,
123                                        dipRequestFactory);
124       executorService.execute(task);
125     }
126     try {
127       Thread.sleep(dipMonitor.getElapseTime());
128     } catch (InterruptedException e) {//NOSONAR
129       SysErrLogger.FAKE_LOGGER.ignoreLog(e);
130     }
131     executorService.shutdown();
132     while (!executorService.isTerminated()) {
133       try {
134         executorService.awaitTermination(dipMonitor.getElapseTime(),
135                                          TimeUnit.MILLISECONDS);
136       } catch (InterruptedException e) {//NOSONAR
137         SysErrLogger.FAKE_LOGGER.ignoreLog(e);
138       }
139     }
140     executorService.shutdownNow();
141   }
142 
143   @Override
144   public void run() {
145     logger.warn("Will run {}", dipRequest);
146     try {
147       while (runStep(dipRequestFactory, client, adminExternalClient,
148                      dipRequest)) {
149         // Executing next step
150         if (dipRequest.getStep() == null) {
151           // END
152           break;
153         }
154         logger.debug("Will rerun {}", dipRequest);
155       }
156     } catch (InvalidParseOperationException e) {
157       // very bad
158       logger.error("Very bad since cannot save DipRequest", e);
159     }
160   }
161 
162   /**
163    * Rune next step for this DipRequest
164    *
165    * @param dipRequestFactory
166    * @param client
167    * @param adminExternalClient
168    * @param dipRequest
169    *
170    * @return true if it is possible to run again the next step for this
171    *     DipRequest
172    *
173    * @throws InvalidParseOperationException
174    */
175   private boolean runStep(final DipRequestFactory dipRequestFactory,
176                           final AccessExternalClient client,
177                           final AdminExternalClient adminExternalClient,
178                           final DipRequest dipRequest)
179       throws InvalidParseOperationException {
180     DIPStep step = dipRequest.getStep();
181     logger.debug("Step is {} from {}", step, dipRequest);
182     switch (step) {
183       case STARTUP:
184         // Ignore: request not ready for the manager
185         break;
186       case RETRY_SELECT:
187         // restart from Select
188         logger.info("Start from Select: {}", dipRequest);
189         select(dipRequestFactory, dipRequest, client);
190         break;
191       case RETRY_DIP:
192         // restart from DIP
193         logger.info("From DIP: {}", dipRequest);
194         getDip(dipRequestFactory, dipRequest, client, adminExternalClient,
195                dipRequest.getVitamContext());
196         break;
197       case RETRY_DIP_FORWARD:
198         // Write back the content of the DIP through Waarp
199         logger.info("From DIP_FORWARD: {}", dipRequest);
200         File targetFile = dipRequest.getDipFile(dipRequestFactory);
201         sendDipFile(dipRequestFactory, dipRequest, targetFile);
202         break;
203       case ERROR:
204         logger.info("From Error: {}", dipRequest);
205         sendErrorBack(dipRequestFactory, dipRequest);
206         break;
207       case END:
208         // To be deleted
209         logger.info("End of DIP: {}", dipRequest);
210         toDelete(dipRequestFactory, dipRequest);
211         break;
212       default:
213         throw new IllegalStateException("Unexpected value: " + step);
214     }
215     DIPStep newStep = dipRequest.getStep();
216     return newStep != DIPStep.END && newStep != step;
217   }
218 
219   /**
220    * Try to launch first step of DIP (step 1)
221    *
222    * @param dipRequestFactory
223    * @param dipRequest
224    * @param client
225    *
226    * @return 0 if OK, 1 if Warning, 2 if error
227    */
228   int select(final DipRequestFactory dipRequestFactory,
229              final DipRequest dipRequest, final AccessExternalClient client) {
230     try {
231       // Inform Vitam of an Ingest to proceed locally
232       dipRequest.setStep(DIPStep.RETRY_SELECT, 0, dipRequestFactory);
233       VitamContext vitamContext = dipRequest.getVitamContext();
234       JsonNode jsonNode = dipRequest.getSelectJson();
235       RequestResponse requestResponse =
236           client.exportDIP(vitamContext, jsonNode);
237       if (!requestResponse.isOk()) {
238         String requestIdNew =
239             requestResponse.getHeaderString(GlobalDataRest.X_REQUEST_ID);
240         if (requestIdNew == null || requestIdNew.isEmpty()) {
241           requestIdNew = "FAKE_REQUEST_ID";
242         }
243         dipRequest.setRequestId(requestIdNew);
244         Status status = Status.fromStatusCode(requestResponse.getStatus());
245         switch (status) {
246           case SERVICE_UNAVAILABLE:
247             // Should retry later on
248             logger.warn(ERROR_MESSAGE, "Issue since service unavailable",
249                         requestResponse);
250             dipRequest.setStep(DIPStep.RETRY_SELECT, 0, dipRequestFactory);
251             // Next step is RETRY_SELECT
252             return 1;
253           default:
254             // Very Bad: inform back of error
255             logger.error(ERROR_MESSAGE, ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR,
256                          requestResponse);
257             dipRequest.setStep(DIPStep.ERROR, status.getStatusCode(),
258                                dipRequestFactory);
259             // Will inform back of error which could not be fixed when reloaded
260         }
261         // Next step is ERROR
262         return 2;
263       }
264       // Select sent and accepted
265       RequestResponseOK responseOK = (RequestResponseOK) requestResponse;
266       dipRequest.setFromRequestResponse(responseOK);
267 
268       // Now will start DIP pooling
269       dipRequest
270           .setStep(DIPStep.RETRY_DIP, DIPStep.RETRY_DIP.getStatusMonitor(),
271                    dipRequestFactory);
272       return 0;
273     } catch (InvalidParseOperationException e) {
274       logger.error("FATAL: Issue since backup of request produces an error", e);
275     } catch (VitamClientException e) {
276       logger.error(ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR, e);
277       // Should retry select from the beginning
278       try {
279         // FIXME this does not take into account various cases since Vitam masks the real reason
280         dipRequest.setStep(DIPStep.ERROR, 500, dipRequestFactory);
281         // Will inform back of error which could not be fixed when reloaded
282         // Ignore: dipRequest.setStep(DIPStep.RETRY_SELECT, 0, dipRequestFactory);
283       } catch (InvalidParseOperationException ex) {
284         // very bad
285         logger.error("FATAL: Very bad since cannot save DipRequest", ex);
286       }
287     }
288     return 2;
289   }
290 
291   /**
292    * Get the DIP (step 2)
293    *
294    * @param dipRequestFactory
295    * @param dipRequest
296    * @param client
297    * @param adminExternalClient
298    * @param vitamContext
299    *
300    * @return True if OK
301    *
302    * @throws InvalidParseOperationException
303    */
304   boolean getDip(final DipRequestFactory dipRequestFactory,
305                  final DipRequest dipRequest, final AccessExternalClient client,
306                  final AdminExternalClient adminExternalClient,
307                  final VitamContext vitamContext)
308       throws InvalidParseOperationException {
309     Response response = null;
310     try {
311       dipRequest.setStep(DIPStep.RETRY_DIP, 0, dipRequestFactory);
312       OperationCheckperationCheck">OperationCheck operationCheck = new OperationCheck(adminExternalClient);
313       if (operationCheck.checkAvailabilityAtr(dipRequest.getTenantId(),
314                                               dipRequest.getRequestId())) {
315         response = client.getDIPById(vitamContext, dipRequest.getRequestId());
316         Status status = Status.fromStatusCode(response.getStatus());
317         switch (status) {
318           case OK:
319           case ACCEPTED:
320             sendDIP(dipRequestFactory, dipRequest, response);
321             return true;
322           case SERVICE_UNAVAILABLE:
323           case NOT_FOUND:
324             // Should retry later on
325             logger.debug("Service or DIP unavailable yet\n\t{}",
326                          status.getReasonPhrase());
327             return false;
328           default:
329             // Very Bad: inform back of error
330             logger.error(ERROR_MESSAGE, ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR,
331                          status.getReasonPhrase());
332             dipRequest.setStep(DIPStep.ERROR, response.getStatus(),
333                                dipRequestFactory);
334         }
335       }
336     } catch (VitamClientException e) {
337       logger.warn("Issue since access client produces an error", e);
338     } finally {
339       // Shall read all InputStream
340       StreamUtils.consumeAnyEntityAndClose(response);
341     }
342     return false;
343   }
344 
345   /**
346    * Send the DIP back to the Waarp Partner, directly from step 2 (DIP
347    * retrieve) (step 3)
348    *
349    * @param dipRequestFactory
350    * @param dipRequest
351    * @param response
352    *
353    * @throws InvalidParseOperationException
354    */
355   private void sendDIP(final DipRequestFactory dipRequestFactory,
356                        final DipRequest dipRequest, final Response response)
357       throws InvalidParseOperationException {
358     try (final InputStream inputStream = response
359         .readEntity(InputStream.class)) {
360       // Write file to be forwarded
361       File targetFile = dipRequest.getDipFile(dipRequestFactory);
362       Path target = targetFile.toPath();
363       Files.copy(inputStream, target, REPLACE_EXISTING);
364       // Write back the content of the DIP through Waarp
365       sendDipFile(dipRequestFactory, dipRequest, targetFile);
366     } catch (IOException e) {
367       logger
368           .error("File must be writable or InputStream error during close", e);
369       dipRequest
370           .setStep(DIPStep.ERROR, Status.INTERNAL_SERVER_ERROR.getStatusCode(),
371                    dipRequestFactory);
372     }
373   }
374 
375   /**
376    * Step to send DIP before finished (step 3)
377    *
378    * @param dipRequestFactory
379    * @param dipRequest
380    * @param targetFile
381    *
382    * @throws InvalidParseOperationException
383    */
384   private void sendDipFile(final DipRequestFactory dipRequestFactory,
385                            final DipRequest dipRequest, final File targetFile)
386       throws InvalidParseOperationException {
387     dipRequest.setStep(DIPStep.RETRY_DIP_FORWARD, 0, dipRequestFactory);
388     if (!dipRequestFactory.getManagerToWaarp(dipRequest)
389                           .sendBackInformation(dipRequestFactory, dipRequest,
390                                                targetFile.getAbsolutePath(),
391                                                DIP)) {
392       // ATR already there but not sent, so retry
393       dipRequest.setStep(DIPStep.RETRY_DIP_FORWARD, 0, dipRequestFactory);
394     } else {
395       toDelete(dipRequestFactory, dipRequest);
396     }
397   }
398 
399   /**
400    * Finalize DipRequest, whatever Done or in Error (final step 4 in case
401    * of Done)
402    *
403    * @param dipRequestFactory
404    * @param dipRequest
405    *
406    * @throws InvalidParseOperationException
407    */
408   private void toDelete(final DipRequestFactory dipRequestFactory,
409                         final DipRequest dipRequest)
410       throws InvalidParseOperationException {
411     // Ensure it will not be reloaded
412     dipRequest.setStep(DIPStep.END, 0, dipRequestFactory);
413     if (!dipRequestFactory.removeDipRequest(dipRequest)) {
414       logger.error("Issue while cleaning this DipRequest: {}", dipRequest);
415     } else {
416       logger.info("End of DipRequest: {}", dipRequest);
417     }
418   }
419 
420   /**
421    * If in Error, will send back the status of the operation to the Waarp
422    * Partner before ending.
423    *
424    * @param dipRequestFactory
425    * @param dipRequest
426    *
427    * @throws InvalidParseOperationException
428    */
429   private void sendErrorBack(final DipRequestFactory dipRequestFactory,
430                              final DipRequest dipRequest)
431       throws InvalidParseOperationException {
432     logger.warn("Error to feedback since status not ok to restart: {}",
433                 dipRequest);
434     // Feedback through Waarp
435     File file = dipRequest.getErrorFile(dipRequestFactory);
436     if (!file.canRead()) {
437       // Create a pseudo one
438       Status status = Status.fromStatusCode(dipRequest.getStatus());
439       VitamError error = getErrorEntity(status, status.getReasonPhrase(),
440                                         "Internal error while processing the DIP");
441       String err = error.toString();
442       try {
443         FileUtils.write(file, err, StandardCharsets.UTF_8);
444       } catch (IOException e) {
445         // very bad
446         logger.error("Very bad since cannot save pseudo DIP", e);
447         return;
448       }
449     }
450     if (dipRequestFactory.getManagerToWaarp(dipRequest)
451                          .sendBackInformation(dipRequestFactory, dipRequest,
452                                               file.getAbsolutePath(),
453                                               DIP_FAILED)) {
454       // Very end of this IngestRequest
455       toDelete(dipRequestFactory, dipRequest);
456     }
457     // else Since not sent, will retry later on: keep as is
458   }
459 
460   private VitamError getErrorEntity(Status status, String msg,
461                                     String description) {
462     return new VitamError(status.name()).setHttpCode(status.getStatusCode())
463                                         .setContext("access")
464                                         .setState("code_vitam").setMessage(msg)
465                                         .setDescription(description);
466   }
467 }