LocalExecServerHandler.java

/*
 * This file is part of Waarp Project (named also Waarp or GG).
 *
 *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
 *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
 * individual contributors.
 *
 *  All Waarp Project is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along with
 * Waarp . If not, see <http://www.gnu.org/licenses/>.
 */
package org.waarp.commandexec.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.waarp.commandexec.utils.LocalExecDefaultResult;
import org.waarp.common.crypto.ssl.WaarpSslUtility;
import org.waarp.common.file.FileUtils;
import org.waarp.common.logging.SysErrLogger;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.WaarpNettyUtil;
import org.waarp.common.utility.WaarpStringUtils;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.RejectedExecutionException;

/**
 * Handles a server-side channel for LocalExec.
 */
public class LocalExecServerHandler
    extends SimpleChannelInboundHandler<String> {
  private static final String EXCEPTION_WHILE_ANSWERED =
      "Exception while answered: ";
  private static final String EXEC_IN_ERROR_WITH = " Exec in error with ";
  private static final String EXCEPTION = "Exception: ";
  // Fixed delay, but could change if necessary at construction
  private long delay = LocalExecDefaultResult.MAXWAITPROCESS;
  protected final LocalExecServerInitializer factory;
  protected static boolean isShutdown;

  /**
   * Internal Logger
   */
  private static final WaarpLogger logger =
      WaarpLoggerFactory.getLogger(LocalExecServerHandler.class);

  protected boolean answered;

  /**
   * Is the Local Exec Server going Shutdown
   *
   * @param channel associated channel
   *
   * @return True if in Shutdown
   */
  public static boolean isShutdown(final Channel channel) {
    if (isShutdown) {
      channel.writeAndFlush(
          LocalExecDefaultResult.ConnectionRefused.getStatus() + " " +
          LocalExecDefaultResult.ConnectionRefused.getResult() + '\n');
      WaarpNettyUtil.awaitOrInterrupted(
          channel.writeAndFlush(LocalExecDefaultResult.ENDOFCOMMAND + '\n'),
          30000);
      WaarpSslUtility.closingSslChannel(channel);
      return true;
    }
    return false;
  }

  public static void junitSetNotShutdown() {
    isShutdown = false;
  }

  /**
   * Print stack trace
   *
   * @param thread
   * @param stacks
   */
  private static void printStackTrace(final Thread thread,
                                      final StackTraceElement[] stacks) {
    SysErrLogger.FAKE_LOGGER.syserrNoLn(thread + " : ");
    for (int i = 0; i < stacks.length - 1; i++) {
      SysErrLogger.FAKE_LOGGER.syserrNoLn(stacks[i] + " ");
    }
    if (stacks.length > 0) {
      SysErrLogger.FAKE_LOGGER.syserr(stacks[stacks.length - 1]);
    } else {
      SysErrLogger.FAKE_LOGGER.syserr();
    }
  }

  /**
   * Shutdown thread
   */
  private static class GGLEThreadShutdown extends Thread {
    static final long DELAY = 3000;
    final LocalExecServerInitializer factory;

    private GGLEThreadShutdown(final LocalExecServerInitializer factory) {
      this.factory = factory;
    }

    @Override
    public void run() {
      final Timer timer;
      timer = new Timer(true);
      final GGLETimerTask ggleTimerTask = new GGLETimerTask();
      timer.schedule(ggleTimerTask, DELAY);
      factory.releaseResources();
      //FBGEXIT DetectionUtils.SystemExit(0)
    }

  }

  /**
   * TimerTask to terminate the server
   */
  private static class GGLETimerTask extends TimerTask {
    /**
     * Internal Logger
     */
    private static final WaarpLogger logger =
        WaarpLoggerFactory.getLogger(GGLETimerTask.class);

    @Override
    public void run() {
      logger.error("System will force EXIT");
      final Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
      for (final Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
        try {
          printStackTrace(entry.getKey(), entry.getValue());
        } catch (final ArrayIndexOutOfBoundsException e) {
          // ignore
        }
      }
      //FBGEXIT DetectionUtils.SystemExit(0)
    }
  }

  /**
   * Constructor with a specific delay
   *
   * @param newdelay
   */
  public LocalExecServerHandler(final LocalExecServerInitializer factory,
                                final long newdelay) {
    this.factory = factory;
    delay = newdelay;
  }

  @Override
  public void channelActive(final ChannelHandlerContext ctx) {
    if (isShutdown(ctx.channel())) {
      answered = true;
      return;
    }
    answered = false;
    factory.addChannel(ctx.channel());
  }

  /**
   * Change the delay to the specific value. Need to be called before any
   * receive message.
   *
   * @param newdelay
   */
  public final void setNewDelay(final long newdelay) {
    delay = newdelay;
  }

  @Override
  protected void channelRead0(final ChannelHandlerContext ctx,
                              final String msg) {
    answered = false;

    // Generate and write a response.
    String response;
    response = LocalExecDefaultResult.NoStatus.getStatus() + " " +
               LocalExecDefaultResult.NoStatus.getResult();
    ExecuteWatchdog watchdog = null;
    try {
      if (msg.length() == 0) {
        // No command
        response = LocalExecDefaultResult.NoCommand.getStatus() + " " +
                   LocalExecDefaultResult.NoCommand.getResult();
      } else {
        final String[] args = msg.split(" ");
        int cpt = 0;
        long tempDelay;
        try {
          tempDelay = Long.parseLong(args[0]);
          cpt++;
        } catch (final NumberFormatException e) {
          tempDelay = delay;
        }
        if (tempDelay < 0) {
          // Shutdown Order
          isShutdown = true;
          logger.warn("Shutdown order received");
          response = LocalExecDefaultResult.ShutdownOnGoing.getStatus() + " " +
                     LocalExecDefaultResult.ShutdownOnGoing.getResult();
          final Thread thread = new GGLEThreadShutdown(factory);
          thread.start();
          return;
        }
        final String binary = args[cpt++];
        final File exec = new File(binary);
        if (exec.isAbsolute()) {
          // If true file, is it executable
          if (!exec.canExecute()) {
            logger.error("Exec command is not executable: " + msg);
            response = LocalExecDefaultResult.NotExecutable.getStatus() + " " +
                       LocalExecDefaultResult.NotExecutable.getResult();
            return;
          }
        }
        // Create command with parameters
        final CommandLine commandLine = new CommandLine(binary);
        for (; cpt < args.length; cpt++) {
          commandLine.addArgument(args[cpt]);
        }
        final DefaultExecutor defaultExecutor = new DefaultExecutor();
        final ByteArrayOutputStream outputStream;
        outputStream = new ByteArrayOutputStream();
        final PumpStreamHandler pumpStreamHandler =
            new PumpStreamHandler(outputStream);
        defaultExecutor.setStreamHandler(pumpStreamHandler);
        final int[] correctValues = { 0, 1 };
        defaultExecutor.setExitValues(correctValues);
        if (tempDelay > 0) {
          // If delay (max time), then setup Watchdog
          watchdog = new ExecuteWatchdog(tempDelay);
          defaultExecutor.setWatchdog(watchdog);
        }
        int status = -1;
        try {
          // Execute the command
          status = defaultExecutor.execute(commandLine);//NOSONAR
        } catch (final ExecuteException e) {
          if (e.getExitValue() == -559038737) {
            // Cannot run immediately so retry once
            try {
              Thread.sleep(LocalExecDefaultResult.RETRYINMS);
            } catch (final InterruptedException e1) {//NOSONAR
              SysErrLogger.FAKE_LOGGER.ignoreLog(e1);
            }
            try {
              status = defaultExecutor.execute(commandLine);//NOSONAR
            } catch (final ExecuteException e1) {
              try {
                pumpStreamHandler.stop();
              } catch (final IOException ignored) {
                // nothing
              }
              logger.error(EXCEPTION + e.getMessage() + EXEC_IN_ERROR_WITH +
                           commandLine);
              response = LocalExecDefaultResult.BadExecution.getStatus() + " " +
                         LocalExecDefaultResult.BadExecution.getResult();
              FileUtils.close(outputStream);
              return;
            } catch (final IOException e1) {
              try {
                pumpStreamHandler.stop();
              } catch (final IOException ignored) {
                // nothing
              }
              logger.error(EXCEPTION + e.getMessage() + EXEC_IN_ERROR_WITH +
                           commandLine);
              response = LocalExecDefaultResult.BadExecution.getStatus() + " " +
                         LocalExecDefaultResult.BadExecution.getResult();
              FileUtils.close(outputStream);
              return;
            }
          } else {
            try {
              pumpStreamHandler.stop();
            } catch (final IOException ignored) {
              // nothing
            }
            logger.error(
                EXCEPTION + e.getMessage() + EXEC_IN_ERROR_WITH + commandLine);
            response = LocalExecDefaultResult.BadExecution.getStatus() + " " +
                       LocalExecDefaultResult.BadExecution.getResult();
            FileUtils.close(outputStream);
            return;
          }
        } catch (final IOException e) {
          try {
            pumpStreamHandler.stop();
          } catch (final IOException ignored) {
            // nothing
          }
          logger.error(
              EXCEPTION + e.getMessage() + EXEC_IN_ERROR_WITH + commandLine);
          response = LocalExecDefaultResult.BadExecution.getStatus() + " " +
                     LocalExecDefaultResult.BadExecution.getResult();
          FileUtils.close(outputStream);
          return;
        }
        try {
          pumpStreamHandler.stop();
        } catch (final IOException ignored) {
          // nothing
        }
        if (defaultExecutor.isFailure(status) && watchdog != null &&
            watchdog.killedProcess()) {
          // kill by the watchdoc (time out)
          logger.error("Exec is in Time Out");
          response = LocalExecDefaultResult.TimeOutExecution.getStatus() + " " +
                     LocalExecDefaultResult.TimeOutExecution.getResult();
          FileUtils.close(outputStream);
        } else {
          try {
            response = status + " " +
                       outputStream.toString(WaarpStringUtils.UTF8.name());
          } catch (final UnsupportedEncodingException e) {
            response = status + " " + outputStream;
          }
          FileUtils.close(outputStream);
        }
      }
    } finally {
      // We do not need to write a ByteBuf here.
      // We know the encoder inserted at LocalExecInitializer will do the
      // conversion.
      ctx.channel().writeAndFlush(response + '\n');
      answered = true;
      if (watchdog != null) {
        watchdog.stop();
      }
      logger.info("End of Command: {}:{}", msg, response);
      ctx.channel().writeAndFlush(LocalExecDefaultResult.ENDOFCOMMAND + '\n');
    }
  }

  @Override
  public void exceptionCaught(final ChannelHandlerContext ctx,
                              final Throwable cause) {
    if (!answered) {
      logger.error("Unexpected exception from Outband while not answered.",
                   cause);
    }
    // Look if Nothing to do since execution will stop later on and
    // an error will occur on client side
    // since no message arrived before close (or partially)
    if (cause instanceof CancelledKeyException) {
      // nothing
    } else if (cause instanceof ClosedChannelException) {
      // nothing
    } else if (cause instanceof NullPointerException) {
      if (ctx.channel().isActive()) {
        if (answered) {
          logger.debug(EXCEPTION_WHILE_ANSWERED, cause);
        }
        WaarpSslUtility.closingSslChannel(ctx.channel());
      }
    } else if (cause instanceof IOException) {
      if (ctx.channel().isActive()) {
        if (answered) {
          logger.debug(EXCEPTION_WHILE_ANSWERED, cause);
        }
        WaarpSslUtility.closingSslChannel(ctx.channel());
      }
    } else if (cause instanceof RejectedExecutionException) {
      if (ctx.channel().isActive()) {
        if (answered) {
          logger.debug(EXCEPTION_WHILE_ANSWERED, cause);
        }
        WaarpSslUtility.closingSslChannel(ctx.channel());
      }
    }
  }
}