DipManager.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.vitam.dip;
import com.fasterxml.jackson.databind.JsonNode;
import fr.gouv.vitam.access.external.client.AccessExternalClient;
import fr.gouv.vitam.access.external.client.AdminExternalClient;
import fr.gouv.vitam.common.GlobalDataRest;
import fr.gouv.vitam.common.client.VitamContext;
import fr.gouv.vitam.common.error.VitamError;
import fr.gouv.vitam.common.exception.InvalidParseOperationException;
import fr.gouv.vitam.common.exception.VitamClientException;
import fr.gouv.vitam.common.model.RequestResponse;
import fr.gouv.vitam.common.model.RequestResponseOK;
import fr.gouv.vitam.common.stream.StreamUtils;
import org.apache.commons.io.FileUtils;
import org.waarp.common.logging.SysErrLogger;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.WaarpThreadFactory;
import org.waarp.vitam.common.OperationCheck;
import org.waarp.vitam.dip.DipRequest.DIPStep;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static java.nio.file.StandardCopyOption.*;
/**
* DipManager is the central logic for DIP management between Waarp and
* Vitam
*/
public class DipManager implements Runnable {
/**
* Prefix of File Information for DIP_FAILED
*/
public static final String DIP_FAILED = "DIP_FAILED";
/**
* Prefix of File Information for DIP
*/
public static final String DIP = "DIP";
protected static final String ERROR_MESSAGE = "{}\n\t{}";
/**
* Internal Logger
*/
private static final WaarpLogger logger =
WaarpLoggerFactory.getLogger(DipManager.class);
private static final String ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR =
"Issue since Select produces an error";
private DipRequest dipRequest;
private AdminExternalClient adminExternalClient;
private AccessExternalClient client;
private DipRequestFactory dipRequestFactory;
DipManager() {
// Empty
}
private DipManager(final DipRequest dipRequest,
final AdminExternalClient adminExternalClient,
final AccessExternalClient client,
final DipRequestFactory dipRequestFactory) {
this.dipRequest = dipRequest;
this.adminExternalClient = adminExternalClient;
this.client = client;
this.dipRequestFactory = dipRequestFactory;
}
/**
* Get all existing DipRequest and try to continue according to their
* status
*
* @param dipRequestFactory
* @param client
* @param adminExternalClient
* @param dipMonitor
*/
void retryAllExistingFiles(final DipRequestFactory dipRequestFactory,
final AccessExternalClient client,
final AdminExternalClient adminExternalClient,
final DipMonitor dipMonitor) {
List<DipRequest> dipRequests = dipRequestFactory.getExistingDips();
if (dipRequests.isEmpty()) {
return;
}
ExecutorService executorService = Executors
.newFixedThreadPool(dipRequests.size(),
new WaarpThreadFactory("DipManager"));
for (DipRequest dipRequest : dipRequests) {
if (dipMonitor.isShutdown()) {
return;
}
DipManager task = new DipManager(dipRequest, adminExternalClient, client,
dipRequestFactory);
executorService.execute(task);
}
try {
Thread.sleep(dipMonitor.getElapseTime());
} catch (InterruptedException e) {//NOSONAR
SysErrLogger.FAKE_LOGGER.ignoreLog(e);
}
executorService.shutdown();
while (!executorService.isTerminated()) {
try {
executorService.awaitTermination(dipMonitor.getElapseTime(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {//NOSONAR
SysErrLogger.FAKE_LOGGER.ignoreLog(e);
}
}
executorService.shutdownNow();
}
@Override
public void run() {
logger.warn("Will run {}", dipRequest);
try {
while (runStep(dipRequestFactory, client, adminExternalClient,
dipRequest)) {
// Executing next step
if (dipRequest.getStep() == null) {
// END
break;
}
logger.debug("Will rerun {}", dipRequest);
}
} catch (InvalidParseOperationException e) {
// very bad
logger.error("Very bad since cannot save DipRequest", e);
}
}
/**
* Rune next step for this DipRequest
*
* @param dipRequestFactory
* @param client
* @param adminExternalClient
* @param dipRequest
*
* @return true if it is possible to run again the next step for this
* DipRequest
*
* @throws InvalidParseOperationException
*/
private boolean runStep(final DipRequestFactory dipRequestFactory,
final AccessExternalClient client,
final AdminExternalClient adminExternalClient,
final DipRequest dipRequest)
throws InvalidParseOperationException {
DIPStep step = dipRequest.getStep();
logger.debug("Step is {} from {}", step, dipRequest);
switch (step) {
case STARTUP:
// Ignore: request not ready for the manager
break;
case RETRY_SELECT:
// restart from Select
logger.info("Start from Select: {}", dipRequest);
select(dipRequestFactory, dipRequest, client);
break;
case RETRY_DIP:
// restart from DIP
logger.info("From DIP: {}", dipRequest);
getDip(dipRequestFactory, dipRequest, client, adminExternalClient,
dipRequest.getVitamContext());
break;
case RETRY_DIP_FORWARD:
// Write back the content of the DIP through Waarp
logger.info("From DIP_FORWARD: {}", dipRequest);
File targetFile = dipRequest.getDipFile(dipRequestFactory);
sendDipFile(dipRequestFactory, dipRequest, targetFile);
break;
case ERROR:
logger.info("From Error: {}", dipRequest);
sendErrorBack(dipRequestFactory, dipRequest);
break;
case END:
// To be deleted
logger.info("End of DIP: {}", dipRequest);
toDelete(dipRequestFactory, dipRequest);
break;
default:
throw new IllegalStateException("Unexpected value: " + step);
}
DIPStep newStep = dipRequest.getStep();
return newStep != DIPStep.END && newStep != step;
}
/**
* Try to launch first step of DIP (step 1)
*
* @param dipRequestFactory
* @param dipRequest
* @param client
*
* @return 0 if OK, 1 if Warning, 2 if error
*/
int select(final DipRequestFactory dipRequestFactory,
final DipRequest dipRequest, final AccessExternalClient client) {
try {
// Inform Vitam of an Ingest to proceed locally
dipRequest.setStep(DIPStep.RETRY_SELECT, 0, dipRequestFactory);
VitamContext vitamContext = dipRequest.getVitamContext();
JsonNode jsonNode = dipRequest.getSelectJson();
RequestResponse requestResponse =
client.exportDIP(vitamContext, jsonNode);
if (!requestResponse.isOk()) {
String requestIdNew =
requestResponse.getHeaderString(GlobalDataRest.X_REQUEST_ID);
if (requestIdNew == null || requestIdNew.isEmpty()) {
requestIdNew = "FAKE_REQUEST_ID";
}
dipRequest.setRequestId(requestIdNew);
Status status = Status.fromStatusCode(requestResponse.getStatus());
switch (status) {
case SERVICE_UNAVAILABLE:
// Should retry later on
logger.warn(ERROR_MESSAGE, "Issue since service unavailable",
requestResponse);
dipRequest.setStep(DIPStep.RETRY_SELECT, 0, dipRequestFactory);
// Next step is RETRY_SELECT
return 1;
default:
// Very Bad: inform back of error
logger.error(ERROR_MESSAGE, ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR,
requestResponse);
dipRequest.setStep(DIPStep.ERROR, status.getStatusCode(),
dipRequestFactory);
// Will inform back of error which could not be fixed when reloaded
}
// Next step is ERROR
return 2;
}
// Select sent and accepted
RequestResponseOK responseOK = (RequestResponseOK) requestResponse;
dipRequest.setFromRequestResponse(responseOK);
// Now will start DIP pooling
dipRequest
.setStep(DIPStep.RETRY_DIP, DIPStep.RETRY_DIP.getStatusMonitor(),
dipRequestFactory);
return 0;
} catch (InvalidParseOperationException e) {
logger.error("FATAL: Issue since backup of request produces an error", e);
} catch (VitamClientException e) {
logger.error(ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR, e);
// Should retry select from the beginning
try {
// FIXME this does not take into account various cases since Vitam masks the real reason
dipRequest.setStep(DIPStep.ERROR, 500, dipRequestFactory);
// Will inform back of error which could not be fixed when reloaded
// Ignore: dipRequest.setStep(DIPStep.RETRY_SELECT, 0, dipRequestFactory);
} catch (InvalidParseOperationException ex) {
// very bad
logger.error("FATAL: Very bad since cannot save DipRequest", ex);
}
}
return 2;
}
/**
* Get the DIP (step 2)
*
* @param dipRequestFactory
* @param dipRequest
* @param client
* @param adminExternalClient
* @param vitamContext
*
* @return True if OK
*
* @throws InvalidParseOperationException
*/
boolean getDip(final DipRequestFactory dipRequestFactory,
final DipRequest dipRequest, final AccessExternalClient client,
final AdminExternalClient adminExternalClient,
final VitamContext vitamContext)
throws InvalidParseOperationException {
Response response = null;
try {
dipRequest.setStep(DIPStep.RETRY_DIP, 0, dipRequestFactory);
OperationCheck operationCheck = new OperationCheck(adminExternalClient);
if (operationCheck.checkAvailabilityAtr(dipRequest.getTenantId(),
dipRequest.getRequestId())) {
response = client.getDIPById(vitamContext, dipRequest.getRequestId());
Status status = Status.fromStatusCode(response.getStatus());
switch (status) {
case OK:
case ACCEPTED:
sendDIP(dipRequestFactory, dipRequest, response);
return true;
case SERVICE_UNAVAILABLE:
case NOT_FOUND:
// Should retry later on
logger.debug("Service or DIP unavailable yet\n\t{}",
status.getReasonPhrase());
return false;
default:
// Very Bad: inform back of error
logger.error(ERROR_MESSAGE, ISSUE_SINCE_SELECT_PRODUCES_AN_ERROR,
status.getReasonPhrase());
dipRequest.setStep(DIPStep.ERROR, response.getStatus(),
dipRequestFactory);
}
}
} catch (VitamClientException e) {
logger.warn("Issue since access client produces an error", e);
} finally {
// Shall read all InputStream
StreamUtils.consumeAnyEntityAndClose(response);
}
return false;
}
/**
* Send the DIP back to the Waarp Partner, directly from step 2 (DIP
* retrieve) (step 3)
*
* @param dipRequestFactory
* @param dipRequest
* @param response
*
* @throws InvalidParseOperationException
*/
private void sendDIP(final DipRequestFactory dipRequestFactory,
final DipRequest dipRequest, final Response response)
throws InvalidParseOperationException {
try (final InputStream inputStream = response
.readEntity(InputStream.class)) {
// Write file to be forwarded
File targetFile = dipRequest.getDipFile(dipRequestFactory);
Path target = targetFile.toPath();
Files.copy(inputStream, target, REPLACE_EXISTING);
// Write back the content of the DIP through Waarp
sendDipFile(dipRequestFactory, dipRequest, targetFile);
} catch (IOException e) {
logger
.error("File must be writable or InputStream error during close", e);
dipRequest
.setStep(DIPStep.ERROR, Status.INTERNAL_SERVER_ERROR.getStatusCode(),
dipRequestFactory);
}
}
/**
* Step to send DIP before finished (step 3)
*
* @param dipRequestFactory
* @param dipRequest
* @param targetFile
*
* @throws InvalidParseOperationException
*/
private void sendDipFile(final DipRequestFactory dipRequestFactory,
final DipRequest dipRequest, final File targetFile)
throws InvalidParseOperationException {
dipRequest.setStep(DIPStep.RETRY_DIP_FORWARD, 0, dipRequestFactory);
if (!dipRequestFactory.getManagerToWaarp(dipRequest)
.sendBackInformation(dipRequestFactory, dipRequest,
targetFile.getAbsolutePath(),
DIP)) {
// ATR already there but not sent, so retry
dipRequest.setStep(DIPStep.RETRY_DIP_FORWARD, 0, dipRequestFactory);
} else {
toDelete(dipRequestFactory, dipRequest);
}
}
/**
* Finalize DipRequest, whatever Done or in Error (final step 4 in case
* of Done)
*
* @param dipRequestFactory
* @param dipRequest
*
* @throws InvalidParseOperationException
*/
private void toDelete(final DipRequestFactory dipRequestFactory,
final DipRequest dipRequest)
throws InvalidParseOperationException {
// Ensure it will not be reloaded
dipRequest.setStep(DIPStep.END, 0, dipRequestFactory);
if (!dipRequestFactory.removeDipRequest(dipRequest)) {
logger.error("Issue while cleaning this DipRequest: {}", dipRequest);
} else {
logger.info("End of DipRequest: {}", dipRequest);
}
}
/**
* If in Error, will send back the status of the operation to the Waarp
* Partner before ending.
*
* @param dipRequestFactory
* @param dipRequest
*
* @throws InvalidParseOperationException
*/
private void sendErrorBack(final DipRequestFactory dipRequestFactory,
final DipRequest dipRequest)
throws InvalidParseOperationException {
logger.warn("Error to feedback since status not ok to restart: {}",
dipRequest);
// Feedback through Waarp
File file = dipRequest.getErrorFile(dipRequestFactory);
if (!file.canRead()) {
// Create a pseudo one
Status status = Status.fromStatusCode(dipRequest.getStatus());
VitamError error = getErrorEntity(status, status.getReasonPhrase(),
"Internal error while processing the DIP");
String err = error.toString();
try {
FileUtils.write(file, err, StandardCharsets.UTF_8);
} catch (IOException e) {
// very bad
logger.error("Very bad since cannot save pseudo DIP", e);
return;
}
}
if (dipRequestFactory.getManagerToWaarp(dipRequest)
.sendBackInformation(dipRequestFactory, dipRequest,
file.getAbsolutePath(),
DIP_FAILED)) {
// Very end of this IngestRequest
toDelete(dipRequestFactory, dipRequest);
}
// else Since not sent, will retry later on: keep as is
}
private VitamError getErrorEntity(Status status, String msg,
String description) {
return new VitamError(status.name()).setHttpCode(status.getStatusCode())
.setContext("access")
.setState("code_vitam").setMessage(msg)
.setDescription(description);
}
}