View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.waarp.openr66.elasticsearch;
22  
23  import co.elastic.clients.elasticsearch.ElasticsearchClient;
24  import co.elastic.clients.elasticsearch.core.BulkRequest;
25  import co.elastic.clients.elasticsearch.core.BulkResponse;
26  import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
27  import co.elastic.clients.elasticsearch.core.bulk.BulkOperation.Builder;
28  import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
29  import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
30  import co.elastic.clients.json.jackson.JacksonJsonpMapper;
31  import co.elastic.clients.transport.ElasticsearchTransport;
32  import co.elastic.clients.transport.rest_client.RestClientTransport;
33  import com.fasterxml.jackson.databind.JsonNode;
34  import com.fasterxml.jackson.databind.node.ArrayNode;
35  import com.fasterxml.jackson.databind.node.ObjectNode;
36  import org.apache.http.Header;
37  import org.apache.http.HttpHost;
38  import org.apache.http.auth.AuthScope;
39  import org.apache.http.auth.UsernamePasswordCredentials;
40  import org.apache.http.client.CredentialsProvider;
41  import org.apache.http.impl.client.BasicCredentialsProvider;
42  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
43  import org.apache.http.message.BasicHeader;
44  import org.apache.http.ssl.SSLContextBuilder;
45  import org.apache.http.ssl.SSLContexts;
46  import org.elasticsearch.client.RestClient;
47  import org.elasticsearch.client.RestClientBuilder;
48  import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
49  import org.joda.time.DateTime;
50  import org.waarp.common.logging.SysErrLogger;
51  import org.waarp.common.logging.WaarpLogger;
52  import org.waarp.common.logging.WaarpLoggerFactory;
53  import org.waarp.common.utility.ParametersChecker;
54  import org.waarp.openr66.protocol.monitoring.ElasticsearchMonitoringExporterClient;
55  import org.waarp.openr66.protocol.networkhandler.ssl.NetworkSslServerInitializer;
56  
57  import javax.net.ssl.SSLContext;
58  import java.io.IOException;
59  import java.security.KeyManagementException;
60  import java.security.KeyStoreException;
61  import java.security.NoSuchAlgorithmException;
62  import java.security.UnrecoverableKeyException;
63  import java.util.ArrayList;
64  import java.util.Iterator;
65  import java.util.List;
66  
67  import static org.waarp.openr66.protocol.monitoring.ElasticsearchMonitoringExporterClientBuilder.*;
68  import static org.waarp.openr66.protocol.monitoring.MonitorExporterTransfers.*;
69  
70  /**
71   * Elasticsearch client for Waarp
72   */
73  public class ElasticsearchMonitoringExporterClientImpl
74      implements ElasticsearchMonitoringExporterClient {
75    private static final WaarpLogger logger = WaarpLoggerFactory.getLogger(
76        ElasticsearchMonitoringExporterClientImpl.class);
77  
78    protected final String index;
79    protected final RestClientBuilder builder;
80  
81    protected ElasticsearchTransport transport = null;
82    protected ElasticsearchClient client = null;
83  
84    /**
85     * Note that only one among (username/pwd, token, apikey) is allowed and
86     * will be taken into account.
87     *
88     * @param username username to connect to Elasticsearch if any (Basic
89     *     authentication) (nullable)
90     * @param pwd password to connect to Elasticsearch if any (Basic
91     *     authentication) (nullable)
92     * @param token access token (Bearer Token authorization
93     *     by Header) (nullable)
94     * @param apiKey API Key (Base64 of 'apiId:apiKey') (ApiKey authorization
95     *     by Header) (nullable)
96     * @param prefix as '/prefix' or null if none
97     * @param index as 'waarpr66monitor' as the index name within
98     *     Elasticsearch, including extra dynamic information
99     * @param compression True to compress REST exchanges between the client
100    *     and the Elasticsearch server
101    * @param httpHosts array of HttpHost
102    */
103   public ElasticsearchMonitoringExporterClientImpl(final String username,
104                                                    final String pwd,
105                                                    final String token,
106                                                    final String apiKey,
107                                                    final String prefix,
108                                                    final String index,
109                                                    final boolean compression,
110                                                    final HttpHost... httpHosts) {
111     this.index = index;
112     builder = RestClient.builder(httpHosts).setCompressionEnabled(compression);
113     if (ParametersChecker.isNotEmpty(prefix)) {
114       builder.setPathPrefix(prefix);
115     }
116     int headerLen = 1;
117     if (ParametersChecker.isNotEmpty(apiKey, token)) {
118       headerLen = 2;
119     }
120     final Header[] defaultHeaders = new Header[headerLen];
121     headerLen = 0;
122     if (ParametersChecker.isNotEmpty(token)) {
123       defaultHeaders[headerLen] =
124           new BasicHeader("Authorization", "Bearer " + token);
125       headerLen++;
126     } else if (ParametersChecker.isNotEmpty(apiKey)) {
127       defaultHeaders[headerLen] =
128           new BasicHeader("Authorization", "ApiKey " + apiKey);
129       headerLen++;
130     }
131     if (headerLen > 0) {
132       builder.setDefaultHeaders(defaultHeaders);
133     }
134     boolean tls = false;
135     for (final HttpHost httpHost : httpHosts) {
136       tls |= httpHost.getSchemeName().equalsIgnoreCase("https");
137     }
138     final SSLContext sslContext;
139     if (tls) {
140       try {
141         final SSLContextBuilder sslBuilder = SSLContexts.custom()
142                                                         .loadKeyMaterial(
143                                                             NetworkSslServerInitializer.getWaarpSecureKeyStore()
144                                                                                        .getKeyStore(),
145                                                             NetworkSslServerInitializer.getWaarpSecureKeyStore()
146                                                                                        .getKeyStorePassword())
147                                                         .loadTrustMaterial(
148                                                             NetworkSslServerInitializer.getWaarpSecureKeyStore()
149                                                                                        .getKeyTrustStore(),
150                                                             null);
151         sslContext = sslBuilder.build();
152       } catch (final NoSuchAlgorithmException | KeyStoreException |
153                      UnrecoverableKeyException | KeyManagementException e) {
154         logger.error(e.getMessage());
155         throw new IllegalArgumentException(e);
156       }
157     } else {
158       sslContext = null;
159     }
160     if (ParametersChecker.isNotEmpty(username, pwd)) {
161       final CredentialsProvider credentialsProvider =
162           new BasicCredentialsProvider();
163       credentialsProvider.setCredentials(AuthScope.ANY,
164                                          new UsernamePasswordCredentials(
165                                              username, pwd));
166       builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
167         @Override
168         public HttpAsyncClientBuilder customizeHttpClient(
169             final HttpAsyncClientBuilder httpClientBuilder) {
170           if (sslContext != null) {
171             return httpClientBuilder.setDefaultCredentialsProvider(
172                 credentialsProvider).setSSLContext(sslContext);
173           } else {
174             return httpClientBuilder.setDefaultCredentialsProvider(
175                 credentialsProvider);
176           }
177         }
178       });
179     }
180     logger.info("Elasticsearch client: user {} pwd {} token {} apikey {} " +
181                 "prefix {} index {}", username, pwd, token, apiKey, prefix,
182                 index);
183     createClient();
184   }
185 
186   protected void createClient() {
187     if (client == null) {
188       final RestClient restClient = builder.build();
189       transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
190       client = new ElasticsearchClient(transport);
191     }
192   }
193 
194   @Override
195   public final boolean post(final ObjectNode monitoredTransfers,
196                             final DateTime start, final DateTime stop,
197                             final String serverId) {
198     createClient();
199     final String finalIndex = index.replace(ELASTIC_WAARPHOST, serverId)
200                                    .replaceAll(ELASTIC_DATETIME,
201                                                stop.toString(FORMAT_DATETIME))
202                                    .replaceAll(ELASTIC_DATEHOUR,
203                                                stop.toString(FORMAT_DATEHOUR))
204                                    .replaceAll(ELASTIC_DATE,
205                                                stop.toString(FORMAT_DATE))
206                                    .replaceAll(ELASTIC_YEAR_MONTH,
207                                                stop.toString(FORMAT_YEAR_MONTH))
208                                    .replaceAll(ELASTIC_YEAR,
209                                                stop.toString(FORMAT_YEAR))
210                                    .toLowerCase();
211     logger.debug("Will post to {}", finalIndex);
212     final BulkRequest.Builder bulkRequestBuilder =
213         new BulkRequest.Builder().index(finalIndex);
214     final ArrayNode arrayNode = (ArrayNode) monitoredTransfers.get(RESULTS);
215     final Iterator<JsonNode> iterator = arrayNode.elements();
216     final List<BulkOperation> operations = new ArrayList<>();
217     while (iterator.hasNext()) {
218       final ObjectNode node = (ObjectNode) iterator.next();
219       final IndexOperation.Builder<ObjectNode> indexBuilder =
220           new IndexOperation.Builder();
221       indexBuilder.index(finalIndex);
222       indexBuilder.id(node.get(UNIQUE_ID).asText());
223       indexBuilder.document(node);
224       operations.add(new Builder().index(indexBuilder.build()).build());
225     }
226     final BulkResponse bulkResponse;
227     try {
228       bulkResponse =
229           client.bulk(bulkRequestBuilder.operations(operations).build());
230     } catch (final IOException e) {
231       logger.error(e.getMessage());
232       return false;
233     }
234     logger.debug("ES failure? {} {}", bulkResponse.errors());
235     if (logger.isDebugEnabled() && bulkResponse.errors()) {
236       final List<BulkResponseItem> list = bulkResponse.items();
237       for (BulkResponseItem item : list) {
238         assert item.error() != null;
239         logger.debug("ES item: {}", item.error().reason());
240       }
241     }
242     return !bulkResponse.errors();
243   }
244 
245   @Override
246   public final void close() {
247     try {
248       transport.close();
249     } catch (final IOException e) {
250       SysErrLogger.FAKE_LOGGER.ignoreLog(e);
251     }
252     transport = null;
253     client = null;
254   }
255 }