View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.waarp.openr66.protocol.monitoring;
22  
23  import com.fasterxml.jackson.databind.JsonNode;
24  import com.fasterxml.jackson.databind.node.ArrayNode;
25  import com.fasterxml.jackson.databind.node.ObjectNode;
26  import io.netty.channel.EventLoopGroup;
27  import org.apache.http.HttpHost;
28  import org.joda.time.DateTime;
29  import org.waarp.common.database.exception.WaarpDatabaseException;
30  import org.waarp.common.exception.InvalidArgumentException;
31  import org.waarp.common.json.JsonHandler;
32  import org.waarp.common.logging.WaarpLogger;
33  import org.waarp.common.logging.WaarpLoggerFactory;
34  import org.waarp.common.utility.ParametersChecker;
35  import org.waarp.openr66.client.TransferArgs;
36  import org.waarp.openr66.dao.DAOFactory;
37  import org.waarp.openr66.dao.Filter;
38  import org.waarp.openr66.dao.TransferDAO;
39  import org.waarp.openr66.dao.exception.DAOConnectionException;
40  import org.waarp.openr66.database.data.DbHostConfiguration;
41  import org.waarp.openr66.database.data.DbTaskRunner;
42  import org.waarp.openr66.pojo.Transfer;
43  import org.waarp.openr66.protocol.configuration.Configuration;
44  import org.waarp.openr66.protocol.http.restv2.converters.TransferConverter;
45  
46  import javax.ws.rs.InternalServerErrorException;
47  import java.io.Closeable;
48  import java.io.IOException;
49  import java.net.URI;
50  import java.net.URISyntaxException;
51  import java.sql.Timestamp;
52  import java.util.ArrayList;
53  import java.util.List;
54  
55  import static org.waarp.openr66.dao.database.DBTransferDAO.*;
56  import static org.waarp.openr66.protocol.http.restv2.RestConstants.*;
57  
58  /**
59   * The Monitor exports Transfers into a Json Format to a remote API REST
60   * or an Elasticsearch server (if JRE >= 8)
61   * in order to allow to monitor multiple Waarp Servers from one central
62   * monitoring, such as using Elasticsearch with Kibana/Grafana through RESP
63   * API from Logstash engine or equivalent, or your own REST API server
64   * or a Elasticsearch server.<br>
65   * <br>
66   * Json format is:
67   * <pre>{@code
68   *  {
69   *    "results": [                            # Array of Transfer information
70   *      {
71   *        "specialId": 12345,                     # Id as Long (-2^63 to 2^63 - 1)
72   *        "uniqueId": "owner.requester.requested.specialId", # Unique global Id
73   *        "hostId": "R66Owner",                   # R66 Owner (Server name)
74   *        "globalStep": "step",                   # Global Current Step
75   *        "globalLastStep": "laststep",           # Global Last Step previous Current
76   *        "step": 1,                              # Current Step in Global Current Step
77   *        "rank": 123,                            # Current Rank in transfer step
78   *        "status": "status",                     # Current status
79   *        "stepStatus": "stepstatus",             # Status of previous Step
80   *        "originalFilename": "originalFilename", # Original Filename
81   *        "originalSize": 123456,                 # Original file size
82   *        "filename": "filename",                 # Resolved local filename
83   *        "ruleName": "ruleName",                 # Rule name
84   *        "blockSize": 123,                       # Block size during transfer
85   *        "fileInfo": "fileInfo",                 # File information, containing associated file transfer information
86   *        "followId": 123456,                     # Follow Id as Long (-2^63 to 2^63 - 1)
87   *        "transferInfo": "transferInfo as Json", # Transfer internal information as Json String
88   *        "start": "2021-03-28T11:55:15Z",        # Start date time of the transfer operation
89   *        "stop": "2021-03-28T11:58:32Z",         # Current last date time event of the transfer operation
90   *        "requested": "requested",               # Requested R66 hostname
91   *        "requester": "requester",               # Requester R66 hostname
92   *        "retrieve": true,                       # True if the request is a Pull, False if it is a Push
93   *        "errorCode": "errorCode",               # Code of error as one char
94   *        "errorMessage": "errorMessage",         # String message of current Error
95   *        "waarpMonitor": {                       # Extra information for indexing if necessary
96   *          "from": "2021-03-28T11:58:15Z",       # filter from (could be empty if none)
97   *          "to": "2021-03-28T11:59:15Z",         # filter to
98   *          "index": "r66owner"                   # R66 Hostname lowercase
99   *        }
100  *      },
101  *      ...
102  *    ]
103  *  }
104  * }</pre>
105  * And the header of the HTTP request will contain:<br>
106  * X-WAARP-ID (as the host Id), X-WAARP-START (as the waarpMonitor.from),
107  * X-WAARP-STOP  (as the waarpMonitor.to)
108  */
109 public class MonitorExporterTransfers extends Thread implements Closeable {
110   private static final WaarpLogger logger =
111       WaarpLoggerFactory.getLogger(MonitorExporterTransfers.class);
112 
113   public static final boolean MONITOR_KEEP_CONNECTION_DEFAULT = true;
114   public static final boolean MONITOR_INTERVAL_INCLUDED_DEFAULT = true;
115   public static final boolean MONITOR_LONG_AS_STRING_DEFAULT = false;
116 
117   public static final String HEADER_WAARP_ID = "X-WAARP-ID";
118   public static final String HEADER_WAARP_START = "X-WAARP-START";
119   public static final String HEADER_WAARP_STOP = "X-WAARP-STOP";
120 
121   public static final String SPECIAL_ID = "specialId";
122   public static final String FOLLOW_ID = "followId";
123   public static final String UNIQUE_ID = "uniqueId";
124   public static final String HOST_ID = "hostId";
125   public static final String ORIGINAL_SIZE = "originalSize";
126   public static final String RESULTS = "results";
127   public static final String WAARP_MONITOR = "waarpMonitor";
128   public static final String FROM_DATE_TIME = "from";
129   public static final String TO_DATE_TIME = "to";
130   public static final String INDEX_NAME = "index";
131 
132   private final boolean intervalMonitoringIncluded;
133   private final boolean transformLongAsString;
134   private final boolean asApiRest;
135   private final HttpMonitoringExporterClient httpMonitoringExporterClient;
136   private final ElasticsearchMonitoringExporterClient
137       elasticsearchMonitoringExporterClient;
138   private final DbHostConfiguration hostConfiguration;
139 
140   private DateTime lastDateTime;
141   private Timestamp lastTimestamp;
142 
143   /**
144    * Note that only one among (basicAuthent, token, apikey) is allowed and
145    * will be taken into account.
146    *
147    * @param remoteBaseUrl as 'http://myhost.com:8080' or 'https://myhost.com:8443'
148    * @param endpoint as '/waarpr66monitor' or simply '/'
149    * @param basicAuthent Basic Authent in Base64 format to connect to
150    *     REST API if any (Basic authentication from 'username:paswwd')
151    *     (nullable)
152    * @param token access token (Bearer Token authorization
153    *     by Header) (nullable)
154    * @param apiKey API Key (Base64 of 'apiId:apiKey') (ApiKey authorization
155    *     by Header) (nullable)
156    * @param keepConnection True to keep the connexion opened, False to release the connexion each time
157    * @param intervalMonitoringIncluded True to include the interval information within 'waarpMonitor' field
158    * @param transformLongAsString True to transform Long as String (ELK)
159    * @param group the EventLoopGroup to use for HttpMonitoringExporterClient
160    *
161    * @throws IllegalArgumentException if the setup is in error
162    */
163   public MonitorExporterTransfers(final String remoteBaseUrl,
164                                   final String endpoint,
165                                   final String basicAuthent, final String token,
166                                   final String apiKey,
167                                   final boolean keepConnection,
168                                   final boolean intervalMonitoringIncluded,
169                                   final boolean transformLongAsString,
170                                   final EventLoopGroup group) {
171     try {
172       ParametersChecker.checkSanityString(remoteBaseUrl, endpoint);
173     } catch (final InvalidArgumentException e) {
174       throw new IllegalArgumentException(e);
175     }
176     if (ParametersChecker.isEmpty(remoteBaseUrl)) {
177       throw new IllegalArgumentException("RemoteBaseUrl cannot be null");
178     }
179     this.intervalMonitoringIncluded = intervalMonitoringIncluded;
180     this.transformLongAsString = transformLongAsString;
181     this.asApiRest = true;
182     this.elasticsearchMonitoringExporterClient = null;
183     this.httpMonitoringExporterClient =
184         new HttpMonitoringExporterClient(remoteBaseUrl, basicAuthent, token,
185                                          apiKey, endpoint, keepConnection,
186                                          group);
187     DbHostConfiguration temp = null;
188     try {
189       temp = new DbHostConfiguration(Configuration.configuration.getHostId());
190     } catch (final WaarpDatabaseException e) {//NOSONAR
191       logger.error(e.getMessage());
192     }
193     if (temp == null) {
194       DbHostConfiguration.getLastDateTimeMonitoring(
195           Configuration.configuration.getHostId());
196       try {
197         temp = new DbHostConfiguration(Configuration.configuration.getHostId());
198       } catch (final WaarpDatabaseException e) {//NOSONAR
199         logger.error(e.getMessage());
200       }
201     }
202     this.hostConfiguration = temp;
203     lastDateTime = hostConfiguration.getLastDateTimeMonitoring();//NOSONAR
204     if (lastDateTime != null) {
205       lastTimestamp = new Timestamp(lastDateTime.getMillis());
206     }
207   }
208 
209   /**
210    * The index can be a combination of a fixed name and extra dynamic
211    * information:<br>
212    * <ul>
213    *   <li>%%WAARPHOST%% to be replaced by R66 host name</li>
214    *   <li>%%DATETIME%% to be replaced by date in format YYYY.MM.dd.HH.mm</li>
215    *   <li>%%DATEHOUR%% to be replaced by date in format YYYY.MM.dd.HH</li>
216    *   <li>%%DATE%% to be replaced by date in format YYYY.MM.dd</li>
217    *   <li>%%YEARMONTH%% to be replaced by date in format YYYY.MM</li>
218    *   <li>%%YEAR%% to be replaced by date in format YYYY</li>
219    * </ul>
220    * <br>DATE is about current last-time check.<br>
221    * So 'waarpr66-%%WAARPHOST%%-%%DATE%%' will give for instance
222    * 'waarpr66-hosta-2021-06-21' as index name.<br>
223    * Note that only one among (username/pwd, token, apikey) is allowed and
224    * will be taken into account.
225    *
226    * @param remoteBaseUrl as 'http://myelastic.com:9200' or 'https://myelastic.com:9201'
227    * @param prefix as '/prefix' or null if none
228    * @param index as 'waarpr66monitor' as the index name within
229    *     Elasticsearch, including extra dynamic information
230    * @param username username to connect to Elasticsearch if any (Basic
231    *     authentication) (nullable)
232    * @param pwd password to connect to Elasticsearch if any (Basic
233    *     authentication) (nullable)
234    * @param token access token (Bearer Token authorization
235    *     by Header) (nullable)
236    * @param apiKey API Key (Base64 of 'apiId:apiKey') (ApiKey authorization
237    *     by Header) (nullable)
238    * @param intervalMonitoringIncluded True to include the interval information within 'waarpMonitor' field
239    * @param transformLongAsString True to transform Long as String (ELK)
240    * @param compression True to compress REST exchanges between the client
241    *     and the Elasticsearch server
242    *
243    * @throws IllegalArgumentException if the setup is in error
244    */
245   public MonitorExporterTransfers(final String remoteBaseUrl,
246                                   final String prefix, final String index,
247                                   final String username, final String pwd,
248                                   final String token, final String apiKey,
249                                   final boolean intervalMonitoringIncluded,
250                                   final boolean transformLongAsString,
251                                   final boolean compression) {
252     try {
253       ParametersChecker.checkSanityString(remoteBaseUrl, index);
254     } catch (final InvalidArgumentException e) {
255       throw new IllegalArgumentException(e);
256     }
257     if (ParametersChecker.isEmpty(remoteBaseUrl, index)) {
258       throw new IllegalArgumentException(
259           "RemoteBaseUrl or Index cannot be null");
260     }
261     this.intervalMonitoringIncluded = intervalMonitoringIncluded;
262     this.transformLongAsString = transformLongAsString;
263     this.asApiRest = false;
264     this.httpMonitoringExporterClient = null;
265     final String[] urls = remoteBaseUrl.split(",");
266     final ArrayList<HttpHost> httpHostArray =
267         new ArrayList<HttpHost>(urls.length);
268     for (final String url : urls) {
269       try {
270         final URI finalUri = new URI(url);
271         final String host =
272             finalUri.getHost() == null? "127.0.0.1" : finalUri.getHost();
273         final int port = finalUri.getPort();
274         final String scheme =
275             finalUri.getScheme() == null? "http" : finalUri.getScheme();
276         logger.info("Elasticsearch from {} Host: {} on port {} using {}", url,
277                     host, port, scheme);
278         httpHostArray.add(new HttpHost(host, port, scheme));
279       } catch (final URISyntaxException e) {
280         logger.error("URI syntax error: {}", e.getMessage());
281         throw new IllegalArgumentException(e);
282       }
283     }
284     this.elasticsearchMonitoringExporterClient =
285         ElasticsearchMonitoringExporterClientBuilder.getFactory()
286                                                     .createElasticsearchClient(
287                                                         username, pwd, token,
288                                                         apiKey, prefix, index,
289                                                         compression,
290                                                         httpHostArray.toArray(
291                                                             new HttpHost[0]));
292     DbHostConfiguration temp = null;
293     try {
294       temp = new DbHostConfiguration(Configuration.configuration.getHostId());
295     } catch (final WaarpDatabaseException e) {//NOSONAR
296       logger.error(e.getMessage());
297     }
298     if (temp == null) {
299       DbHostConfiguration.getLastDateTimeMonitoring(
300           Configuration.configuration.getHostId());
301       try {
302         temp = new DbHostConfiguration(Configuration.configuration.getHostId());
303       } catch (final WaarpDatabaseException e) {//NOSONAR
304         logger.error(e.getMessage());
305       }
306     }
307     this.hostConfiguration = temp;
308     lastDateTime = hostConfiguration.getLastDateTimeMonitoring();//NOSONAR
309     if (lastDateTime != null) {
310       lastTimestamp = new Timestamp(lastDateTime.getMillis());
311     }
312   }
313 
314   @Override
315   public void run() {
316     final DateTime now = new DateTime();
317     final Timestamp timestamp = new Timestamp(now.getMillis());
318     logger.info("Start from {} to {}", lastDateTime, now);
319     final TransferConverter.Order order = TransferConverter.Order.ascId;
320     final List<Filter> filters = new ArrayList<Filter>(2);
321     filters.add(DbTaskRunner.getOwnerFilter());
322     if (lastTimestamp != null) {
323       filters.add(new Filter(TRANSFER_STOP_FIELD, Filter.BETWEEN, lastTimestamp,
324                              timestamp));
325     } else {
326       filters.add(new Filter(TRANSFER_STOP_FIELD, "<=", timestamp));
327     }
328     TransferDAO transferDAO = null;
329     List<Transfer> transferList;
330     try {
331       transferDAO = DAO_FACTORY.getTransferDAO();
332       transferList = transferDAO.find(filters, order.column, order.ascend);
333       logger.debug("Get List {}", transferList.size());
334     } catch (final DAOConnectionException e) {
335       logger.error(e.getMessage());
336       throw new InternalServerErrorException(e);
337     } finally {
338       DAOFactory.closeDAO(transferDAO);
339     }
340     if (transferList.isEmpty()) {
341       logger.info("No Transfer from {} to {}", lastDateTime, now);
342       lastDateTime = now;
343       lastTimestamp = timestamp;
344       hostConfiguration.updateLastDateTimeMonitoring(lastDateTime);
345       return;
346     }
347     logger.debug("Create Json");
348 
349     final ObjectNode monitoredTransfers = JsonHandler.createObjectNode();
350     final ArrayNode resultList = monitoredTransfers.putArray(RESULTS);
351     final String owner = Configuration.configuration.getHostId();
352     for (final Transfer transfer : transferList) {
353       final ObjectNode item = TransferConverter.transferToNode(transfer);
354       final long specialId = item.get(TransferFields.TRANSFER_ID).asLong();
355       final String transferInfo =
356           item.get(TransferFields.TRANSFER_INFO).asText();
357       final ObjectNode root = JsonHandler.getFromString(transferInfo);
358       long followId = Long.MIN_VALUE;
359       long originalSize = -1;
360       if (root != null) {
361         JsonNode node = root.get(TransferArgs.FOLLOW_JSON_KEY);
362         if (node != null) {
363           followId = node.asLong();
364         }
365         node = root.get(DbTaskRunner.JSON_ORIGINALSIZE);
366         if (node != null) {
367           originalSize = node.asLong();
368         }
369       }
370       if (transformLongAsString) {
371         item.put(SPECIAL_ID, Long.toString(specialId));
372         item.put(FOLLOW_ID, Long.toString(followId));
373         item.put(ORIGINAL_SIZE, Long.toString(originalSize));
374       } else {
375         item.put(SPECIAL_ID, specialId);
376         item.put(FOLLOW_ID, followId);
377         item.put(ORIGINAL_SIZE, originalSize);
378       }
379       final String uniqueId =
380           owner + "." + item.get(TransferFields.REQUESTER).asText() + "." +
381           item.get(TransferFields.REQUESTED).asText() + "." + specialId;
382       item.put(UNIQUE_ID, uniqueId);
383       item.put(HOST_ID, owner);
384       item.remove(TransferFields.TRANSFER_ID);
385       if (intervalMonitoringIncluded) {
386         final ObjectNode waarpMonitor = item.putObject(WAARP_MONITOR);
387         waarpMonitor.put(FROM_DATE_TIME,
388                          lastDateTime != null? lastDateTime.toString() : "");
389         waarpMonitor.put(TO_DATE_TIME, now.toString());
390         waarpMonitor.put(INDEX_NAME, owner.toLowerCase());
391       }
392       resultList.add(item);
393     }
394     final int size = resultList.size();
395     logger.debug("Create Json {}", size);
396     transferList.clear();
397     if (asApiRest) {
398       if (httpMonitoringExporterClient.post(monitoredTransfers, lastDateTime,
399                                             now,
400                                             Configuration.configuration.getHostId())) {
401         logger.info("Transferred from {} to {} = {}", lastDateTime, now, size);
402         lastDateTime = now;
403         lastTimestamp = timestamp;
404         hostConfiguration.updateLastDateTimeMonitoring(lastDateTime);
405       } else {
406         logger.error("Not Transferred from {} to {} = {}", lastDateTime, now,
407                      size);
408       }
409     } else if (elasticsearchMonitoringExporterClient.post(monitoredTransfers,
410                                                           lastDateTime, now,
411                                                           Configuration.configuration.getHostId())) {
412       logger.info("ES Transferred from {} to {} = {}", lastDateTime, now, size);
413       lastDateTime = now;
414       lastTimestamp = timestamp;
415       hostConfiguration.updateLastDateTimeMonitoring(lastDateTime);
416     } else {
417       logger.error("ES Not Transferred from {} to {} = {}", lastDateTime, now,
418                    size);
419     }
420   }
421 
422   @Override
423   public void close() throws IOException {
424     if (httpMonitoringExporterClient != null) {
425       httpMonitoringExporterClient.close();
426     }
427     if (elasticsearchMonitoringExporterClient != null) {
428       elasticsearchMonitoringExporterClient.close();
429     }
430   }
431 }