1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.waarp.openr66.elasticsearch;
22
23 import co.elastic.clients.elasticsearch.ElasticsearchClient;
24 import co.elastic.clients.elasticsearch.core.BulkRequest;
25 import co.elastic.clients.elasticsearch.core.BulkResponse;
26 import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
27 import co.elastic.clients.elasticsearch.core.bulk.BulkOperation.Builder;
28 import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
29 import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
30 import co.elastic.clients.json.jackson.JacksonJsonpMapper;
31 import co.elastic.clients.transport.ElasticsearchTransport;
32 import co.elastic.clients.transport.rest_client.RestClientTransport;
33 import com.fasterxml.jackson.databind.JsonNode;
34 import com.fasterxml.jackson.databind.node.ArrayNode;
35 import com.fasterxml.jackson.databind.node.ObjectNode;
36 import org.apache.http.Header;
37 import org.apache.http.HttpHost;
38 import org.apache.http.auth.AuthScope;
39 import org.apache.http.auth.UsernamePasswordCredentials;
40 import org.apache.http.client.CredentialsProvider;
41 import org.apache.http.impl.client.BasicCredentialsProvider;
42 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
43 import org.apache.http.message.BasicHeader;
44 import org.apache.http.ssl.SSLContextBuilder;
45 import org.apache.http.ssl.SSLContexts;
46 import org.elasticsearch.client.RestClient;
47 import org.elasticsearch.client.RestClientBuilder;
48 import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
49 import org.joda.time.DateTime;
50 import org.waarp.common.logging.SysErrLogger;
51 import org.waarp.common.logging.WaarpLogger;
52 import org.waarp.common.logging.WaarpLoggerFactory;
53 import org.waarp.common.utility.ParametersChecker;
54 import org.waarp.openr66.protocol.monitoring.ElasticsearchMonitoringExporterClient;
55 import org.waarp.openr66.protocol.networkhandler.ssl.NetworkSslServerInitializer;
56
57 import javax.net.ssl.SSLContext;
58 import java.io.IOException;
59 import java.security.KeyManagementException;
60 import java.security.KeyStoreException;
61 import java.security.NoSuchAlgorithmException;
62 import java.security.UnrecoverableKeyException;
63 import java.util.ArrayList;
64 import java.util.Iterator;
65 import java.util.List;
66
67 import static org.waarp.openr66.protocol.monitoring.ElasticsearchMonitoringExporterClientBuilder.*;
68 import static org.waarp.openr66.protocol.monitoring.MonitorExporterTransfers.*;
69
70
71
72
73 public class ElasticsearchMonitoringExporterClientImpl
74 implements ElasticsearchMonitoringExporterClient {
75 private static final WaarpLogger logger = WaarpLoggerFactory.getLogger(
76 ElasticsearchMonitoringExporterClientImpl.class);
77
78 protected final String index;
79 protected final RestClientBuilder builder;
80
81 protected ElasticsearchTransport transport = null;
82 protected ElasticsearchClient client = null;
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 public ElasticsearchMonitoringExporterClientImpl(final String username,
104 final String pwd,
105 final String token,
106 final String apiKey,
107 final String prefix,
108 final String index,
109 final boolean compression,
110 final HttpHost... httpHosts) {
111 this.index = index;
112 builder = RestClient.builder(httpHosts).setCompressionEnabled(compression);
113 if (ParametersChecker.isNotEmpty(prefix)) {
114 builder.setPathPrefix(prefix);
115 }
116 int headerLen = 1;
117 if (ParametersChecker.isNotEmpty(apiKey, token)) {
118 headerLen = 2;
119 }
120 final Header[] defaultHeaders = new Header[headerLen];
121 headerLen = 0;
122 if (ParametersChecker.isNotEmpty(token)) {
123 defaultHeaders[headerLen] =
124 new BasicHeader("Authorization", "Bearer " + token);
125 headerLen++;
126 } else if (ParametersChecker.isNotEmpty(apiKey)) {
127 defaultHeaders[headerLen] =
128 new BasicHeader("Authorization", "ApiKey " + apiKey);
129 headerLen++;
130 }
131 if (headerLen > 0) {
132 builder.setDefaultHeaders(defaultHeaders);
133 }
134 boolean tls = false;
135 for (final HttpHost httpHost : httpHosts) {
136 tls |= httpHost.getSchemeName().equalsIgnoreCase("https");
137 }
138 final SSLContext sslContext;
139 if (tls) {
140 try {
141 final SSLContextBuilder sslBuilder = SSLContexts.custom()
142 .loadKeyMaterial(
143 NetworkSslServerInitializer.getWaarpSecureKeyStore()
144 .getKeyStore(),
145 NetworkSslServerInitializer.getWaarpSecureKeyStore()
146 .getKeyStorePassword())
147 .loadTrustMaterial(
148 NetworkSslServerInitializer.getWaarpSecureKeyStore()
149 .getKeyTrustStore(),
150 null);
151 sslContext = sslBuilder.build();
152 } catch (final NoSuchAlgorithmException | KeyStoreException |
153 UnrecoverableKeyException | KeyManagementException e) {
154 logger.error(e.getMessage());
155 throw new IllegalArgumentException(e);
156 }
157 } else {
158 sslContext = null;
159 }
160 if (ParametersChecker.isNotEmpty(username, pwd)) {
161 final CredentialsProvider credentialsProvider =
162 new BasicCredentialsProvider();
163 credentialsProvider.setCredentials(AuthScope.ANY,
164 new UsernamePasswordCredentials(
165 username, pwd));
166 builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
167 @Override
168 public HttpAsyncClientBuilder customizeHttpClient(
169 final HttpAsyncClientBuilder httpClientBuilder) {
170 if (sslContext != null) {
171 return httpClientBuilder.setDefaultCredentialsProvider(
172 credentialsProvider).setSSLContext(sslContext);
173 } else {
174 return httpClientBuilder.setDefaultCredentialsProvider(
175 credentialsProvider);
176 }
177 }
178 });
179 }
180 logger.info("Elasticsearch client: user {} pwd {} token {} apikey {} " +
181 "prefix {} index {}", username, pwd, token, apiKey, prefix,
182 index);
183 createClient();
184 }
185
186 protected void createClient() {
187 if (client == null) {
188 final RestClient restClient = builder.build();
189 transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
190 client = new ElasticsearchClient(transport);
191 }
192 }
193
194 @Override
195 public final boolean post(final ObjectNode monitoredTransfers,
196 final DateTime start, final DateTime stop,
197 final String serverId) {
198 createClient();
199 final String finalIndex = index.replace(ELASTIC_WAARPHOST, serverId)
200 .replaceAll(ELASTIC_DATETIME,
201 stop.toString(FORMAT_DATETIME))
202 .replaceAll(ELASTIC_DATEHOUR,
203 stop.toString(FORMAT_DATEHOUR))
204 .replaceAll(ELASTIC_DATE,
205 stop.toString(FORMAT_DATE))
206 .replaceAll(ELASTIC_YEAR_MONTH,
207 stop.toString(FORMAT_YEAR_MONTH))
208 .replaceAll(ELASTIC_YEAR,
209 stop.toString(FORMAT_YEAR))
210 .toLowerCase();
211 logger.debug("Will post to {}", finalIndex);
212 final BulkRequest.Builder bulkRequestBuilder =
213 new BulkRequest.Builder().index(finalIndex);
214 final ArrayNode arrayNode = (ArrayNode) monitoredTransfers.get(RESULTS);
215 final Iterator<JsonNode> iterator = arrayNode.elements();
216 final List<BulkOperation> operations = new ArrayList<>();
217 while (iterator.hasNext()) {
218 final ObjectNode node = (ObjectNode) iterator.next();
219 final IndexOperation.Builder<ObjectNode> indexBuilder =
220 new IndexOperation.Builder();
221 indexBuilder.index(finalIndex);
222 indexBuilder.id(node.get(UNIQUE_ID).asText());
223 indexBuilder.document(node);
224 operations.add(new Builder().index(indexBuilder.build()).build());
225 }
226 final BulkResponse bulkResponse;
227 try {
228 bulkResponse =
229 client.bulk(bulkRequestBuilder.operations(operations).build());
230 } catch (final IOException e) {
231 logger.error(e.getMessage());
232 return false;
233 }
234 logger.debug("ES failure? {} {}", bulkResponse.errors());
235 if (logger.isDebugEnabled() && bulkResponse.errors()) {
236 final List<BulkResponseItem> list = bulkResponse.items();
237 for (BulkResponseItem item : list) {
238 assert item.error() != null;
239 logger.debug("ES item: {}", item.error().reason());
240 }
241 }
242 return !bulkResponse.errors();
243 }
244
245 @Override
246 public final void close() {
247 try {
248 transport.close();
249 } catch (final IOException e) {
250 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
251 }
252 transport = null;
253 client = null;
254 }
255 }