1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.commandexec.server;
21
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import org.apache.commons.exec.CommandLine;
26 import org.apache.commons.exec.DefaultExecutor;
27 import org.apache.commons.exec.ExecuteException;
28 import org.apache.commons.exec.ExecuteWatchdog;
29 import org.apache.commons.exec.PumpStreamHandler;
30 import org.waarp.commandexec.utils.LocalExecDefaultResult;
31 import org.waarp.common.crypto.ssl.WaarpSslUtility;
32 import org.waarp.common.file.FileUtils;
33 import org.waarp.common.logging.SysErrLogger;
34 import org.waarp.common.logging.WaarpLogger;
35 import org.waarp.common.logging.WaarpLoggerFactory;
36 import org.waarp.common.utility.WaarpNettyUtil;
37 import org.waarp.common.utility.WaarpStringUtils;
38
39 import java.io.ByteArrayOutputStream;
40 import java.io.File;
41 import java.io.IOException;
42 import java.io.UnsupportedEncodingException;
43 import java.nio.channels.CancelledKeyException;
44 import java.nio.channels.ClosedChannelException;
45 import java.util.Map;
46 import java.util.Map.Entry;
47 import java.util.Timer;
48 import java.util.TimerTask;
49 import java.util.concurrent.RejectedExecutionException;
50
51
52
53
54 public class LocalExecServerHandler
55 extends SimpleChannelInboundHandler<String> {
56 private static final String EXCEPTION_WHILE_ANSWERED =
57 "Exception while answered: ";
58 private static final String EXEC_IN_ERROR_WITH = " Exec in error with ";
59 private static final String EXCEPTION = "Exception: ";
60
61 private long delay = LocalExecDefaultResult.MAXWAITPROCESS;
62 protected final LocalExecServerInitializer factory;
63 protected static boolean isShutdown;
64
65
66
67
68 private static final WaarpLogger logger =
69 WaarpLoggerFactory.getLogger(LocalExecServerHandler.class);
70
71 protected boolean answered;
72
73
74
75
76
77
78
79
80 public static boolean isShutdown(final Channel channel) {
81 if (isShutdown) {
82 channel.writeAndFlush(
83 LocalExecDefaultResult.ConnectionRefused.getStatus() + " " +
84 LocalExecDefaultResult.ConnectionRefused.getResult() + '\n');
85 WaarpNettyUtil.awaitOrInterrupted(
86 channel.writeAndFlush(LocalExecDefaultResult.ENDOFCOMMAND + '\n'),
87 30000);
88 WaarpSslUtility.closingSslChannel(channel);
89 return true;
90 }
91 return false;
92 }
93
94 public static void junitSetNotShutdown() {
95 isShutdown = false;
96 }
97
98
99
100
101
102
103
104 private static void printStackTrace(final Thread thread,
105 final StackTraceElement[] stacks) {
106 SysErrLogger.FAKE_LOGGER.syserrNoLn(thread + " : ");
107 for (int i = 0; i < stacks.length - 1; i++) {
108 SysErrLogger.FAKE_LOGGER.syserrNoLn(stacks[i] + " ");
109 }
110 if (stacks.length > 0) {
111 SysErrLogger.FAKE_LOGGER.syserr(stacks[stacks.length - 1]);
112 } else {
113 SysErrLogger.FAKE_LOGGER.syserr();
114 }
115 }
116
117
118
119
120 private static class GGLEThreadShutdown extends Thread {
121 static final long DELAY = 3000;
122 final LocalExecServerInitializer factory;
123
124 private GGLEThreadShutdown(final LocalExecServerInitializer factory) {
125 this.factory = factory;
126 }
127
128 @Override
129 public void run() {
130 final Timer timer;
131 timer = new Timer(true);
132 final GGLETimerTask ggleTimerTask = new GGLETimerTask();
133 timer.schedule(ggleTimerTask, DELAY);
134 factory.releaseResources();
135
136 }
137
138 }
139
140
141
142
143 private static class GGLETimerTask extends TimerTask {
144
145
146
147 private static final WaarpLogger logger =
148 WaarpLoggerFactory.getLogger(GGLETimerTask.class);
149
150 @Override
151 public void run() {
152 logger.error("System will force EXIT");
153 final Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
154 for (final Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
155 try {
156 printStackTrace(entry.getKey(), entry.getValue());
157 } catch (final ArrayIndexOutOfBoundsException e) {
158
159 }
160 }
161
162 }
163 }
164
165
166
167
168
169
/**
 * Builds a handler bound to its initializer.
 *
 * @param factory the initializer this handler registers channels with and
 *     whose resources are released on shutdown
 * @param newdelay the default delay applied to commands that do not start
 *     with a numeric delay of their own
 */
public LocalExecServerHandler(final LocalExecServerInitializer factory,
                              final long newdelay) {
  this.delay = newdelay;
  this.factory = factory;
}
175
176 @Override
177 public void channelActive(final ChannelHandlerContext ctx) {
178 if (isShutdown(ctx.channel())) {
179 answered = true;
180 return;
181 }
182 answered = false;
183 factory.addChannel(ctx.channel());
184 }
185
186
187
188
189
190
191
192 public final void setNewDelay(final long newdelay) {
193 delay = newdelay;
194 }
195
196 @Override
197 protected void channelRead0(final ChannelHandlerContext ctx,
198 final String msg) {
199 answered = false;
200
201
202 String response;
203 response = LocalExecDefaultResult.NoStatus.getStatus() + " " +
204 LocalExecDefaultResult.NoStatus.getResult();
205 ExecuteWatchdog watchdog = null;
206 try {
207 if (msg.length() == 0) {
208
209 response = LocalExecDefaultResult.NoCommand.getStatus() + " " +
210 LocalExecDefaultResult.NoCommand.getResult();
211 } else {
212 final String[] args = msg.split(" ");
213 int cpt = 0;
214 long tempDelay;
215 try {
216 tempDelay = Long.parseLong(args[0]);
217 cpt++;
218 } catch (final NumberFormatException e) {
219 tempDelay = delay;
220 }
221 if (tempDelay < 0) {
222
223 isShutdown = true;
224 logger.warn("Shutdown order received");
225 response = LocalExecDefaultResult.ShutdownOnGoing.getStatus() + " " +
226 LocalExecDefaultResult.ShutdownOnGoing.getResult();
227 final Thread thread = new GGLEThreadShutdown(factory);
228 thread.start();
229 return;
230 }
231 final String binary = args[cpt++];
232 final File exec = new File(binary);
233 if (exec.isAbsolute()) {
234
235 if (!exec.canExecute()) {
236 logger.error("Exec command is not executable: " + msg);
237 response = LocalExecDefaultResult.NotExecutable.getStatus() + " " +
238 LocalExecDefaultResult.NotExecutable.getResult();
239 return;
240 }
241 }
242
243 final CommandLine commandLine = new CommandLine(binary);
244 for (; cpt < args.length; cpt++) {
245 commandLine.addArgument(args[cpt]);
246 }
247 final DefaultExecutor defaultExecutor = new DefaultExecutor();
248 final ByteArrayOutputStream outputStream;
249 outputStream = new ByteArrayOutputStream();
250 final PumpStreamHandler pumpStreamHandler =
251 new PumpStreamHandler(outputStream);
252 defaultExecutor.setStreamHandler(pumpStreamHandler);
253 final int[] correctValues = { 0, 1 };
254 defaultExecutor.setExitValues(correctValues);
255 if (tempDelay > 0) {
256
257 watchdog = new ExecuteWatchdog(tempDelay);
258 defaultExecutor.setWatchdog(watchdog);
259 }
260 int status = -1;
261 try {
262
263 status = defaultExecutor.execute(commandLine);
264 } catch (final ExecuteException e) {
265 if (e.getExitValue() == -559038737) {
266
267 try {
268 Thread.sleep(LocalExecDefaultResult.RETRYINMS);
269 } catch (final InterruptedException e1) {
270 SysErrLogger.FAKE_LOGGER.ignoreLog(e1);
271 }
272 try {
273 status = defaultExecutor.execute(commandLine);
274 } catch (final ExecuteException e1) {
275 try {
276 pumpStreamHandler.stop();
277 } catch (final IOException ignored) {
278
279 }
280 logger.error(EXCEPTION + e.getMessage() + EXEC_IN_ERROR_WITH +
281 commandLine);
282 response = LocalExecDefaultResult.BadExecution.getStatus() + " " +
283 LocalExecDefaultResult.BadExecution.getResult();
284 FileUtils.close(outputStream);
285 return;
286 } catch (final IOException e1) {
287 try {
288 pumpStreamHandler.stop();
289 } catch (final IOException ignored) {
290
291 }
292 logger.error(EXCEPTION + e.getMessage() + EXEC_IN_ERROR_WITH +
293 commandLine);
294 response = LocalExecDefaultResult.BadExecution.getStatus() + " " +
295 LocalExecDefaultResult.BadExecution.getResult();
296 FileUtils.close(outputStream);
297 return;
298 }
299 } else {
300 try {
301 pumpStreamHandler.stop();
302 } catch (final IOException ignored) {
303
304 }
305 logger.error(
306 EXCEPTION + e.getMessage() + EXEC_IN_ERROR_WITH + commandLine);
307 response = LocalExecDefaultResult.BadExecution.getStatus() + " " +
308 LocalExecDefaultResult.BadExecution.getResult();
309 FileUtils.close(outputStream);
310 return;
311 }
312 } catch (final IOException e) {
313 try {
314 pumpStreamHandler.stop();
315 } catch (final IOException ignored) {
316
317 }
318 logger.error(
319 EXCEPTION + e.getMessage() + EXEC_IN_ERROR_WITH + commandLine);
320 response = LocalExecDefaultResult.BadExecution.getStatus() + " " +
321 LocalExecDefaultResult.BadExecution.getResult();
322 FileUtils.close(outputStream);
323 return;
324 }
325 try {
326 pumpStreamHandler.stop();
327 } catch (final IOException ignored) {
328
329 }
330 if (defaultExecutor.isFailure(status) && watchdog != null &&
331 watchdog.killedProcess()) {
332
333 logger.error("Exec is in Time Out");
334 response = LocalExecDefaultResult.TimeOutExecution.getStatus() + " " +
335 LocalExecDefaultResult.TimeOutExecution.getResult();
336 FileUtils.close(outputStream);
337 } else {
338 try {
339 response = status + " " +
340 outputStream.toString(WaarpStringUtils.UTF8.name());
341 } catch (final UnsupportedEncodingException e) {
342 response = status + " " + outputStream;
343 }
344 FileUtils.close(outputStream);
345 }
346 }
347 } finally {
348
349
350
351 ctx.channel().writeAndFlush(response + '\n');
352 answered = true;
353 if (watchdog != null) {
354 watchdog.stop();
355 }
356 logger.info("End of Command: {}:{}", msg, response);
357 ctx.channel().writeAndFlush(LocalExecDefaultResult.ENDOFCOMMAND + '\n');
358 }
359 }
360
361 @Override
362 public void exceptionCaught(final ChannelHandlerContext ctx,
363 final Throwable cause) {
364 if (!answered) {
365 logger.error("Unexpected exception from Outband while not answered.",
366 cause);
367 }
368
369
370
371 if (cause instanceof CancelledKeyException) {
372
373 } else if (cause instanceof ClosedChannelException) {
374
375 } else if (cause instanceof NullPointerException) {
376 if (ctx.channel().isActive()) {
377 if (answered) {
378 logger.debug(EXCEPTION_WHILE_ANSWERED, cause);
379 }
380 WaarpSslUtility.closingSslChannel(ctx.channel());
381 }
382 } else if (cause instanceof IOException) {
383 if (ctx.channel().isActive()) {
384 if (answered) {
385 logger.debug(EXCEPTION_WHILE_ANSWERED, cause);
386 }
387 WaarpSslUtility.closingSslChannel(ctx.channel());
388 }
389 } else if (cause instanceof RejectedExecutionException) {
390 if (ctx.channel().isActive()) {
391 if (answered) {
392 logger.debug(EXCEPTION_WHILE_ANSWERED, cause);
393 }
394 WaarpSslUtility.closingSslChannel(ctx.channel());
395 }
396 }
397 }
398 }