ElasticsearchMonitoringExporterClientImpl.java

/*
 * This file is part of Waarp Project (named also Waarp or GG).
 *
 *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
 *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
 * individual contributors.
 *
 *  All Waarp Project is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along with
 * Waarp . If not, see <http://www.gnu.org/licenses/>.
 */

package org.waarp.openr66.elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation.Builder;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.joda.time.DateTime;
import org.waarp.common.logging.SysErrLogger;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.ParametersChecker;
import org.waarp.openr66.protocol.monitoring.ElasticsearchMonitoringExporterClient;
import org.waarp.openr66.protocol.networkhandler.ssl.NetworkSslServerInitializer;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static org.waarp.openr66.protocol.monitoring.ElasticsearchMonitoringExporterClientBuilder.*;
import static org.waarp.openr66.protocol.monitoring.MonitorExporterTransfers.*;

/**
 * Elasticsearch client for Waarp
 */
public class ElasticsearchMonitoringExporterClientImpl
    implements ElasticsearchMonitoringExporterClient {
  private static final WaarpLogger logger = WaarpLoggerFactory.getLogger(
      ElasticsearchMonitoringExporterClientImpl.class);

  protected final String index;
  protected final RestClientBuilder builder;

  protected ElasticsearchTransport transport = null;
  protected ElasticsearchClient client = null;

  /**
   * Note that only one among (username/pwd, token, apikey) is allowed and
   * will be taken into account.
   *
   * @param username username to connect to Elasticsearch if any (Basic
   *     authentication) (nullable)
   * @param pwd password to connect to Elasticsearch if any (Basic
   *     authentication) (nullable)
   * @param token access token (Bearer Token authorization
   *     by Header) (nullable)
   * @param apiKey API Key (Base64 of 'apiId:apiKey') (ApiKey authorization
   *     by Header) (nullable)
   * @param prefix as '/prefix' or null if none
   * @param index as 'waarpr66monitor' as the index name within
   *     Elasticsearch, including extra dynamic information
   * @param compression True to compress REST exchanges between the client
   *     and the Elasticsearch server
   * @param httpHosts array of HttpHost
   */
  public ElasticsearchMonitoringExporterClientImpl(final String username,
                                                   final String pwd,
                                                   final String token,
                                                   final String apiKey,
                                                   final String prefix,
                                                   final String index,
                                                   final boolean compression,
                                                   final HttpHost... httpHosts) {
    this.index = index;
    builder = RestClient.builder(httpHosts).setCompressionEnabled(compression);
    if (ParametersChecker.isNotEmpty(prefix)) {
      builder.setPathPrefix(prefix);
    }
    int headerLen = 1;
    if (ParametersChecker.isNotEmpty(apiKey, token)) {
      headerLen = 2;
    }
    final Header[] defaultHeaders = new Header[headerLen];
    headerLen = 0;
    if (ParametersChecker.isNotEmpty(token)) {
      defaultHeaders[headerLen] =
          new BasicHeader("Authorization", "Bearer " + token);
      headerLen++;
    } else if (ParametersChecker.isNotEmpty(apiKey)) {
      defaultHeaders[headerLen] =
          new BasicHeader("Authorization", "ApiKey " + apiKey);
      headerLen++;
    }
    if (headerLen > 0) {
      builder.setDefaultHeaders(defaultHeaders);
    }
    boolean tls = false;
    for (final HttpHost httpHost : httpHosts) {
      tls |= httpHost.getSchemeName().equalsIgnoreCase("https");
    }
    final SSLContext sslContext;
    if (tls) {
      try {
        final SSLContextBuilder sslBuilder = SSLContexts.custom()
                                                        .loadKeyMaterial(
                                                            NetworkSslServerInitializer.getWaarpSecureKeyStore()
                                                                                       .getKeyStore(),
                                                            NetworkSslServerInitializer.getWaarpSecureKeyStore()
                                                                                       .getKeyStorePassword())
                                                        .loadTrustMaterial(
                                                            NetworkSslServerInitializer.getWaarpSecureKeyStore()
                                                                                       .getKeyTrustStore(),
                                                            null);
        sslContext = sslBuilder.build();
      } catch (final NoSuchAlgorithmException | KeyStoreException |
                     UnrecoverableKeyException | KeyManagementException e) {
        logger.error(e.getMessage());
        throw new IllegalArgumentException(e);
      }
    } else {
      sslContext = null;
    }
    if (ParametersChecker.isNotEmpty(username, pwd)) {
      final CredentialsProvider credentialsProvider =
          new BasicCredentialsProvider();
      credentialsProvider.setCredentials(AuthScope.ANY,
                                         new UsernamePasswordCredentials(
                                             username, pwd));
      builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
            final HttpAsyncClientBuilder httpClientBuilder) {
          if (sslContext != null) {
            return httpClientBuilder.setDefaultCredentialsProvider(
                credentialsProvider).setSSLContext(sslContext);
          } else {
            return httpClientBuilder.setDefaultCredentialsProvider(
                credentialsProvider);
          }
        }
      });
    }
    logger.info("Elasticsearch client: user {} pwd {} token {} apikey {} " +
                "prefix {} index {}", username, pwd, token, apiKey, prefix,
                index);
    createClient();
  }

  protected void createClient() {
    if (client == null) {
      final RestClient restClient = builder.build();
      transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
      client = new ElasticsearchClient(transport);
    }
  }

  @Override
  public final boolean post(final ObjectNode monitoredTransfers,
                            final DateTime start, final DateTime stop,
                            final String serverId) {
    createClient();
    final String finalIndex = index.replace(ELASTIC_WAARPHOST, serverId)
                                   .replaceAll(ELASTIC_DATETIME,
                                               stop.toString(FORMAT_DATETIME))
                                   .replaceAll(ELASTIC_DATEHOUR,
                                               stop.toString(FORMAT_DATEHOUR))
                                   .replaceAll(ELASTIC_DATE,
                                               stop.toString(FORMAT_DATE))
                                   .replaceAll(ELASTIC_YEAR_MONTH,
                                               stop.toString(FORMAT_YEAR_MONTH))
                                   .replaceAll(ELASTIC_YEAR,
                                               stop.toString(FORMAT_YEAR))
                                   .toLowerCase();
    logger.debug("Will post to {}", finalIndex);
    final BulkRequest.Builder bulkRequestBuilder =
        new BulkRequest.Builder().index(finalIndex);
    final ArrayNode arrayNode = (ArrayNode) monitoredTransfers.get(RESULTS);
    final Iterator<JsonNode> iterator = arrayNode.elements();
    final List<BulkOperation> operations = new ArrayList<>();
    while (iterator.hasNext()) {
      final ObjectNode node = (ObjectNode) iterator.next();
      final IndexOperation.Builder<ObjectNode> indexBuilder =
          new IndexOperation.Builder();
      indexBuilder.index(finalIndex);
      indexBuilder.id(node.get(UNIQUE_ID).asText());
      indexBuilder.document(node);
      operations.add(new Builder().index(indexBuilder.build()).build());
    }
    final BulkResponse bulkResponse;
    try {
      bulkResponse =
          client.bulk(bulkRequestBuilder.operations(operations).build());
    } catch (final IOException e) {
      logger.error(e.getMessage());
      return false;
    }
    logger.debug("ES failure? {} {}", bulkResponse.errors());
    if (logger.isDebugEnabled() && bulkResponse.errors()) {
      final List<BulkResponseItem> list = bulkResponse.items();
      for (BulkResponseItem item : list) {
        assert item.error() != null;
        logger.debug("ES item: {}", item.error().reason());
      }
    }
    return !bulkResponse.errors();
  }

  @Override
  public final void close() {
    try {
      transport.close();
    } catch (final IOException e) {
      SysErrLogger.FAKE_LOGGER.ignoreLog(e);
    }
    transport = null;
    client = null;
  }
}