1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.waarp.openr66.protocol.monitoring;
22
23 import com.fasterxml.jackson.databind.JsonNode;
24 import com.fasterxml.jackson.databind.node.ArrayNode;
25 import com.fasterxml.jackson.databind.node.ObjectNode;
26 import io.netty.channel.EventLoopGroup;
27 import org.apache.http.HttpHost;
28 import org.joda.time.DateTime;
29 import org.waarp.common.database.exception.WaarpDatabaseException;
30 import org.waarp.common.exception.InvalidArgumentException;
31 import org.waarp.common.json.JsonHandler;
32 import org.waarp.common.logging.WaarpLogger;
33 import org.waarp.common.logging.WaarpLoggerFactory;
34 import org.waarp.common.utility.ParametersChecker;
35 import org.waarp.openr66.client.TransferArgs;
36 import org.waarp.openr66.dao.DAOFactory;
37 import org.waarp.openr66.dao.Filter;
38 import org.waarp.openr66.dao.TransferDAO;
39 import org.waarp.openr66.dao.exception.DAOConnectionException;
40 import org.waarp.openr66.database.data.DbHostConfiguration;
41 import org.waarp.openr66.database.data.DbTaskRunner;
42 import org.waarp.openr66.pojo.Transfer;
43 import org.waarp.openr66.protocol.configuration.Configuration;
44 import org.waarp.openr66.protocol.http.restv2.converters.TransferConverter;
45
46 import javax.ws.rs.InternalServerErrorException;
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.URI;
50 import java.net.URISyntaxException;
51 import java.sql.Timestamp;
52 import java.util.ArrayList;
53 import java.util.List;
54
55 import static org.waarp.openr66.dao.database.DBTransferDAO.*;
56 import static org.waarp.openr66.protocol.http.restv2.RestConstants.*;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 public class MonitorExporterTransfers extends Thread implements Closeable {
110 private static final WaarpLogger logger =
111 WaarpLoggerFactory.getLogger(MonitorExporterTransfers.class);
112
113 public static final boolean MONITOR_KEEP_CONNECTION_DEFAULT = true;
114 public static final boolean MONITOR_INTERVAL_INCLUDED_DEFAULT = true;
115 public static final boolean MONITOR_LONG_AS_STRING_DEFAULT = false;
116
117 public static final String HEADER_WAARP_ID = "X-WAARP-ID";
118 public static final String HEADER_WAARP_START = "X-WAARP-START";
119 public static final String HEADER_WAARP_STOP = "X-WAARP-STOP";
120
121 public static final String SPECIAL_ID = "specialId";
122 public static final String FOLLOW_ID = "followId";
123 public static final String UNIQUE_ID = "uniqueId";
124 public static final String HOST_ID = "hostId";
125 public static final String ORIGINAL_SIZE = "originalSize";
126 public static final String RESULTS = "results";
127 public static final String WAARP_MONITOR = "waarpMonitor";
128 public static final String FROM_DATE_TIME = "from";
129 public static final String TO_DATE_TIME = "to";
130 public static final String INDEX_NAME = "index";
131
132 private final boolean intervalMonitoringIncluded;
133 private final boolean transformLongAsString;
134 private final boolean asApiRest;
135 private final HttpMonitoringExporterClient httpMonitoringExporterClient;
136 private final ElasticsearchMonitoringExporterClient
137 elasticsearchMonitoringExporterClient;
138 private final DbHostConfiguration hostConfiguration;
139
140 private DateTime lastDateTime;
141 private Timestamp lastTimestamp;
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163 public MonitorExporterTransfers(final String remoteBaseUrl,
164 final String endpoint,
165 final String basicAuthent, final String token,
166 final String apiKey,
167 final boolean keepConnection,
168 final boolean intervalMonitoringIncluded,
169 final boolean transformLongAsString,
170 final EventLoopGroup group) {
171 try {
172 ParametersChecker.checkSanityString(remoteBaseUrl, endpoint);
173 } catch (final InvalidArgumentException e) {
174 throw new IllegalArgumentException(e);
175 }
176 if (ParametersChecker.isEmpty(remoteBaseUrl)) {
177 throw new IllegalArgumentException("RemoteBaseUrl cannot be null");
178 }
179 this.intervalMonitoringIncluded = intervalMonitoringIncluded;
180 this.transformLongAsString = transformLongAsString;
181 this.asApiRest = true;
182 this.elasticsearchMonitoringExporterClient = null;
183 this.httpMonitoringExporterClient =
184 new HttpMonitoringExporterClient(remoteBaseUrl, basicAuthent, token,
185 apiKey, endpoint, keepConnection,
186 group);
187 DbHostConfiguration temp = null;
188 try {
189 temp = new DbHostConfiguration(Configuration.configuration.getHostId());
190 } catch (final WaarpDatabaseException e) {
191 logger.error(e.getMessage());
192 }
193 if (temp == null) {
194 DbHostConfiguration.getLastDateTimeMonitoring(
195 Configuration.configuration.getHostId());
196 try {
197 temp = new DbHostConfiguration(Configuration.configuration.getHostId());
198 } catch (final WaarpDatabaseException e) {
199 logger.error(e.getMessage());
200 }
201 }
202 this.hostConfiguration = temp;
203 lastDateTime = hostConfiguration.getLastDateTimeMonitoring();
204 if (lastDateTime != null) {
205 lastTimestamp = new Timestamp(lastDateTime.getMillis());
206 }
207 }
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245 public MonitorExporterTransfers(final String remoteBaseUrl,
246 final String prefix, final String index,
247 final String username, final String pwd,
248 final String token, final String apiKey,
249 final boolean intervalMonitoringIncluded,
250 final boolean transformLongAsString,
251 final boolean compression) {
252 try {
253 ParametersChecker.checkSanityString(remoteBaseUrl, index);
254 } catch (final InvalidArgumentException e) {
255 throw new IllegalArgumentException(e);
256 }
257 if (ParametersChecker.isEmpty(remoteBaseUrl, index)) {
258 throw new IllegalArgumentException(
259 "RemoteBaseUrl or Index cannot be null");
260 }
261 this.intervalMonitoringIncluded = intervalMonitoringIncluded;
262 this.transformLongAsString = transformLongAsString;
263 this.asApiRest = false;
264 this.httpMonitoringExporterClient = null;
265 final String[] urls = remoteBaseUrl.split(",");
266 final ArrayList<HttpHost> httpHostArray =
267 new ArrayList<HttpHost>(urls.length);
268 for (final String url : urls) {
269 try {
270 final URI finalUri = new URI(url);
271 final String host =
272 finalUri.getHost() == null? "127.0.0.1" : finalUri.getHost();
273 final int port = finalUri.getPort();
274 final String scheme =
275 finalUri.getScheme() == null? "http" : finalUri.getScheme();
276 logger.info("Elasticsearch from {} Host: {} on port {} using {}", url,
277 host, port, scheme);
278 httpHostArray.add(new HttpHost(host, port, scheme));
279 } catch (final URISyntaxException e) {
280 logger.error("URI syntax error: {}", e.getMessage());
281 throw new IllegalArgumentException(e);
282 }
283 }
284 this.elasticsearchMonitoringExporterClient =
285 ElasticsearchMonitoringExporterClientBuilder.getFactory()
286 .createElasticsearchClient(
287 username, pwd, token,
288 apiKey, prefix, index,
289 compression,
290 httpHostArray.toArray(
291 new HttpHost[0]));
292 DbHostConfiguration temp = null;
293 try {
294 temp = new DbHostConfiguration(Configuration.configuration.getHostId());
295 } catch (final WaarpDatabaseException e) {
296 logger.error(e.getMessage());
297 }
298 if (temp == null) {
299 DbHostConfiguration.getLastDateTimeMonitoring(
300 Configuration.configuration.getHostId());
301 try {
302 temp = new DbHostConfiguration(Configuration.configuration.getHostId());
303 } catch (final WaarpDatabaseException e) {
304 logger.error(e.getMessage());
305 }
306 }
307 this.hostConfiguration = temp;
308 lastDateTime = hostConfiguration.getLastDateTimeMonitoring();
309 if (lastDateTime != null) {
310 lastTimestamp = new Timestamp(lastDateTime.getMillis());
311 }
312 }
313
314 @Override
315 public void run() {
316 final DateTime now = new DateTime();
317 final Timestamp timestamp = new Timestamp(now.getMillis());
318 logger.info("Start from {} to {}", lastDateTime, now);
319 final TransferConverter.Order order = TransferConverter.Order.ascId;
320 final List<Filter> filters = new ArrayList<Filter>(2);
321 filters.add(DbTaskRunner.getOwnerFilter());
322 if (lastTimestamp != null) {
323 filters.add(new Filter(TRANSFER_STOP_FIELD, Filter.BETWEEN, lastTimestamp,
324 timestamp));
325 } else {
326 filters.add(new Filter(TRANSFER_STOP_FIELD, "<=", timestamp));
327 }
328 TransferDAO transferDAO = null;
329 List<Transfer> transferList;
330 try {
331 transferDAO = DAO_FACTORY.getTransferDAO();
332 transferList = transferDAO.find(filters, order.column, order.ascend);
333 logger.debug("Get List {}", transferList.size());
334 } catch (final DAOConnectionException e) {
335 logger.error(e.getMessage());
336 throw new InternalServerErrorException(e);
337 } finally {
338 DAOFactory.closeDAO(transferDAO);
339 }
340 if (transferList.isEmpty()) {
341 logger.info("No Transfer from {} to {}", lastDateTime, now);
342 lastDateTime = now;
343 lastTimestamp = timestamp;
344 hostConfiguration.updateLastDateTimeMonitoring(lastDateTime);
345 return;
346 }
347 logger.debug("Create Json");
348
349 final ObjectNode monitoredTransfers = JsonHandler.createObjectNode();
350 final ArrayNode resultList = monitoredTransfers.putArray(RESULTS);
351 final String owner = Configuration.configuration.getHostId();
352 for (final Transfer transfer : transferList) {
353 final ObjectNode item = TransferConverter.transferToNode(transfer);
354 final long specialId = item.get(TransferFields.TRANSFER_ID).asLong();
355 final String transferInfo =
356 item.get(TransferFields.TRANSFER_INFO).asText();
357 final ObjectNode root = JsonHandler.getFromString(transferInfo);
358 long followId = Long.MIN_VALUE;
359 long originalSize = -1;
360 if (root != null) {
361 JsonNode node = root.get(TransferArgs.FOLLOW_JSON_KEY);
362 if (node != null) {
363 followId = node.asLong();
364 }
365 node = root.get(DbTaskRunner.JSON_ORIGINALSIZE);
366 if (node != null) {
367 originalSize = node.asLong();
368 }
369 }
370 if (transformLongAsString) {
371 item.put(SPECIAL_ID, Long.toString(specialId));
372 item.put(FOLLOW_ID, Long.toString(followId));
373 item.put(ORIGINAL_SIZE, Long.toString(originalSize));
374 } else {
375 item.put(SPECIAL_ID, specialId);
376 item.put(FOLLOW_ID, followId);
377 item.put(ORIGINAL_SIZE, originalSize);
378 }
379 final String uniqueId =
380 owner + "." + item.get(TransferFields.REQUESTER).asText() + "." +
381 item.get(TransferFields.REQUESTED).asText() + "." + specialId;
382 item.put(UNIQUE_ID, uniqueId);
383 item.put(HOST_ID, owner);
384 item.remove(TransferFields.TRANSFER_ID);
385 if (intervalMonitoringIncluded) {
386 final ObjectNode waarpMonitor = item.putObject(WAARP_MONITOR);
387 waarpMonitor.put(FROM_DATE_TIME,
388 lastDateTime != null? lastDateTime.toString() : "");
389 waarpMonitor.put(TO_DATE_TIME, now.toString());
390 waarpMonitor.put(INDEX_NAME, owner.toLowerCase());
391 }
392 resultList.add(item);
393 }
394 final int size = resultList.size();
395 logger.debug("Create Json {}", size);
396 transferList.clear();
397 if (asApiRest) {
398 if (httpMonitoringExporterClient.post(monitoredTransfers, lastDateTime,
399 now,
400 Configuration.configuration.getHostId())) {
401 logger.info("Transferred from {} to {} = {}", lastDateTime, now, size);
402 lastDateTime = now;
403 lastTimestamp = timestamp;
404 hostConfiguration.updateLastDateTimeMonitoring(lastDateTime);
405 } else {
406 logger.error("Not Transferred from {} to {} = {}", lastDateTime, now,
407 size);
408 }
409 } else if (elasticsearchMonitoringExporterClient.post(monitoredTransfers,
410 lastDateTime, now,
411 Configuration.configuration.getHostId())) {
412 logger.info("ES Transferred from {} to {} = {}", lastDateTime, now, size);
413 lastDateTime = now;
414 lastTimestamp = timestamp;
415 hostConfiguration.updateLastDateTimeMonitoring(lastDateTime);
416 } else {
417 logger.error("ES Not Transferred from {} to {} = {}", lastDateTime, now,
418 size);
419 }
420 }
421
422 @Override
423 public void close() throws IOException {
424 if (httpMonitoringExporterClient != null) {
425 httpMonitoringExporterClient.close();
426 }
427 if (elasticsearchMonitoringExporterClient != null) {
428 elasticsearchMonitoringExporterClient.close();
429 }
430 }
431 }