HttpMonitoringExporterClient.java

/*
 * This file is part of Waarp Project (named also Waarp or GG).
 *
 *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
 *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
 * individual contributors.
 *
 *  All Waarp Project is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along with
 * Waarp . If not, see <http://www.gnu.org/licenses/>.
 */

package org.waarp.openr66.protocol.monitoring;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.internal.SocketUtils;
import org.joda.time.DateTime;
import org.waarp.common.future.WaarpFuture;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.WaarpNettyUtil;
import org.waarp.common.utility.WaarpStringUtils;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.http.restv2.utils.JsonUtils;
import org.waarp.openr66.protocol.networkhandler.ssl.NetworkSslServerInitializer;

import javax.net.ssl.SSLException;
import java.io.Closeable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;

import static org.waarp.openr66.protocol.configuration.Configuration.*;

/**
 * HttpMonitoringExporterClient used by the MonitorExporterTransfers
 */
public class HttpMonitoringExporterClient implements Closeable {
  private static final WaarpLogger logger =
      WaarpLoggerFactory.getLogger(HttpMonitoringExporterClient.class);
  public static final String HTTPS = "https";
  public static final String BASIC = "Basic ";
  public static final String BEARER = "Bearer ";
  public static final String API_KEY = "ApiKey ";

  private final URI finalUri;
  private final String host;
  private final boolean keepConnection;
  private final Bootstrap bootstrap;
  private int port;
  private final String basicAuthent;
  private final String token;
  private final String apiKey;
  private Channel remoteRestChannel = null;

  private WaarpFuture futurePost = null;

  /**
   * Note that only one among (basicAuthent, token, apikey) is allowed and
   * will be taken into account.
   *
   * @param remoteBaseUrl as 'http://myhost.com:8080' or 'https://myhost.com:8443'
   * @param basicAuthent Basic Authent in Base64 format to connect to
   *     REST API if any (Basic authentication from 'username:paswwd')
   *     (nullable)
   * @param token access token (Bearer Token authorization
   *     by Header) (nullable)
   * @param apiKey API Key (Base64 of 'apiId:apiKey') (ApiKey authorization
   *     by Header) (nullable)
   * @param endpoint as '/waarpr66monitor' or simply '/'
   * @param keepConnection True to keep the connexion opened, False to release the connexion each time
   * @param group the EventLoopGroup to use
   */
  public HttpMonitoringExporterClient(final String remoteBaseUrl,
                                      final String basicAuthent,
                                      final String token, final String apiKey,
                                      final String endpoint,
                                      final boolean keepConnection,
                                      final EventLoopGroup group) {
    this.keepConnection = keepConnection;
    this.token = token;
    this.basicAuthent = basicAuthent;
    this.apiKey = apiKey;
    final String uri;
    if (remoteBaseUrl.endsWith("/")) {
      uri = remoteBaseUrl + endpoint;
    } else {
      uri = remoteBaseUrl + "/" + endpoint;
    }

    try {
      finalUri = new URI(uri);
    } catch (final URISyntaxException e) {
      logger.error("URI syntax error: {}", e.getMessage());
      throw new IllegalArgumentException(e);
    }
    final String scheme =
        finalUri.getScheme() == null? "http" : finalUri.getScheme();
    host = finalUri.getHost() == null? "127.0.0.1" : finalUri.getHost();
    port = finalUri.getPort();
    if (port == -1) {
      if ("http".equalsIgnoreCase(scheme)) {
        port = 80;
      } else if (HTTPS.equalsIgnoreCase(scheme)) {
        port = 443;
      }
    }

    if (!"http".equalsIgnoreCase(scheme) && !HTTPS.equalsIgnoreCase(scheme)) {
      logger.error("Only HTTP(S) is supported.");
      throw new IllegalArgumentException("Only HTTP(S) is supported.");
    }

    final boolean ssl = HTTPS.equalsIgnoreCase(scheme);
    final SslContext sslCtx;
    if (ssl) {
      try {
        sslCtx = SslContextBuilder.forClient().keyManager(
            NetworkSslServerInitializer.getWaarpSecureKeyStore()
                                       .getKeyManagerFactory()).trustManager(
            InsecureTrustManagerFactory.INSTANCE).build();
      } catch (final SSLException e) {
        logger.error("SslContext error", e);
        throw new IllegalArgumentException(e);
      }
    } else {
      sslCtx = null;
    }

    // Configure the client.
    bootstrap = new Bootstrap();
    WaarpNettyUtil.setBootstrap(bootstrap, group,
                                (int) Configuration.configuration.getTimeoutCon(),
                                configuration.getBlockSize() + 64, true);
    bootstrap.handler(
        new HttpMonitoringExporterClientInitializer(sslCtx, this));
  }

  /**
   * @param monitoredTransfers the Json objet to push as POST
   * @param start the DateTime for the 'from' interval
   * @param stop the DateTime for the 'to' interval
   * @param serverId the serverId that is sending this monitoring information
   *
   * @return True if the POST succeeded
   */
  public final boolean post(final ObjectNode monitoredTransfers,
                            final DateTime start, final DateTime stop,
                            final String serverId) {
    logger.debug("Start Post from {} to {} as {}", start, stop, serverId);
    if (keepConnection && remoteRestChannel != null &&
        !remoteRestChannel.isActive()) {
      remoteRestChannel.close();
      remoteRestChannel = null;
    }
    if (remoteRestChannel == null) {
      final ChannelFuture future =
          bootstrap.connect(SocketUtils.socketAddress(host, port));
      try {
        remoteRestChannel = future.sync().channel();
      } catch (final InterruptedException e) {//NOSONAR
        logger.error(e);
        return false;
      }
    }
    futurePost = new WaarpFuture(true);
    // Prepare Body
    final String body = JsonUtils.nodeToString(monitoredTransfers);
    final int length;
    final byte[] bbody;
    try {
      bbody = body.getBytes(WaarpStringUtils.UTF_8);
      length = body.length();
    } catch (final UnsupportedEncodingException e) {
      logger.error(e.getMessage());
      return false;
    }
    final ByteBuf buf = Unpooled.wrappedBuffer(bbody);
    final HttpHeaders headers = new DefaultHttpHeaders(true);
    // Header set
    headers.set(HttpHeaderNames.HOST, host);
    if (keepConnection) {
      headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    } else {
      headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
    }
    if (basicAuthent != null) {
      headers.set(HttpHeaderNames.AUTHORIZATION, BASIC + basicAuthent);
    } else if (token != null) {
      headers.set(HttpHeaderNames.AUTHORIZATION, BEARER + token);
    } else if (apiKey != null) {
      headers.set(HttpHeaderNames.AUTHORIZATION, API_KEY + apiKey);
    }
    headers.set(MonitorExporterTransfers.HEADER_WAARP_ID, serverId);
    headers.set(MonitorExporterTransfers.HEADER_WAARP_START,
                start == null? "" : start.toString());
    headers.set(MonitorExporterTransfers.HEADER_WAARP_STOP, stop.toString());
    headers.set(HttpHeaderNames.CONTENT_LENGTH, length);
    headers.set(HttpHeaderNames.CONTENT_TYPE,
                HttpHeaderValues.APPLICATION_JSON);
    final HttpRequest request =
        new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
                               finalUri.toASCIIString(), headers);
    // Body set
    logger.debug("Request from {} to {} as {} length {}", start, stop, serverId,
                 length);
    logger.debug("{} {} {} {}", request.method(), request.uri(),
                 request.protocolVersion(), request.headers());
    remoteRestChannel.write(request);
    remoteRestChannel.write(buf);
    remoteRestChannel.writeAndFlush(DefaultLastHttpContent.EMPTY_LAST_CONTENT)
                     .awaitUninterruptibly();
    logger.debug("Ending Post from {} to {} as {}", start, stop, serverId);

    if (!keepConnection) {
      try {
        logger.debug("Wait for Close connection");
        remoteRestChannel.closeFuture().sync();
        remoteRestChannel = null;
      } catch (final InterruptedException e) {//NOSONAR
        logger.error(e);
        // ignore
      }
    }
    futurePost.awaitOrInterruptible();
    final boolean result = futurePost.isSuccess();
    logger.info("End Post from {} to {} as {} with {}", start, stop, serverId,
                result);
    return result;
  }

  public final boolean isKeepConnection() {
    return keepConnection;
  }

  public final void setStatus(final boolean ok) {
    if (ok) {
      futurePost.setSuccess();
    } else {
      futurePost.cancel();
    }
  }

  @Override
  public final void close() throws IOException {
    if (remoteRestChannel != null && !remoteRestChannel.isActive()) {
      remoteRestChannel.close();
      remoteRestChannel = null;
    }
  }
}