1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.ftp.core.control;
21
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelException;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelHandler;
26 import io.netty.channel.ChannelHandlerContext;
27 import io.netty.channel.SimpleChannelInboundHandler;
28 import io.netty.handler.ssl.SslHandler;
29 import io.netty.util.concurrent.Future;
30 import io.netty.util.concurrent.GenericFutureListener;
31 import org.waarp.common.command.ReplyCode;
32 import org.waarp.common.command.exception.CommandAbstractException;
33 import org.waarp.common.command.exception.Reply421Exception;
34 import org.waarp.common.command.exception.Reply503Exception;
35 import org.waarp.common.crypto.ssl.WaarpSslUtility;
36 import org.waarp.common.logging.WaarpLogger;
37 import org.waarp.common.logging.WaarpLoggerFactory;
38 import org.waarp.common.utility.WaarpNettyUtil;
39 import org.waarp.ftp.core.command.AbstractCommand;
40 import org.waarp.ftp.core.command.FtpCommandCode;
41 import org.waarp.ftp.core.command.access.USER;
42 import org.waarp.ftp.core.command.internal.ConnectionCommand;
43 import org.waarp.ftp.core.command.internal.IncorrectCommand;
44 import org.waarp.ftp.core.control.ftps.FtpsInitializer;
45 import org.waarp.ftp.core.data.FtpTransferControl;
46 import org.waarp.ftp.core.exception.FtpNoConnectionException;
47 import org.waarp.ftp.core.session.FtpSession;
48 import org.waarp.ftp.core.utils.FtpChannelUtils;
49
50 import java.io.IOException;
51 import java.net.BindException;
52 import java.net.ConnectException;
53 import java.nio.channels.ClosedChannelException;
54 import java.util.concurrent.RejectedExecutionException;
55
56
57
58
59
60
61 public class NetworkHandler extends SimpleChannelInboundHandler<String> {
62 private static final String INTERNAL_ERROR_DISCONNECT =
63 "Internal error: disconnect";
64
65
66
67
68 private static final WaarpLogger logger =
69 WaarpLoggerFactory.getLogger(NetworkHandler.class);
70
71
72
73
74 private final BusinessHandler businessHandler;
75
76
77
78
79 private final FtpSession session;
80
81
82
83
84 private Channel controlChannel;
85
86
87
88 private ChannelHandlerContext ctx;
89
90
91
92
93
94
95 public NetworkHandler(final FtpSession session) {
96 this.session = session;
97 businessHandler = session.getBusinessHandler();
98 businessHandler.setNetworkHandler(this);
99 }
100
101
102
103
104 public final BusinessHandler getBusinessHandler() {
105 return businessHandler;
106 }
107
108
109
110
111 public final FtpSession getFtpSession() {
112 return session;
113 }
114
115
116
117
118 public final Channel getControlChannel() {
119 return controlChannel;
120 }
121
122
123
124
125 @Override
126 public void channelInactive(final ChannelHandlerContext ctx)
127 throws Exception {
128 if (session == null || session.getDataConn() == null ||
129 session.getDataConn().getFtpTransferControl() == null) {
130 super.channelInactive(ctx);
131 return;
132 }
133
134
135 int limit = 200;
136 while (session.getDataConn().getFtpTransferControl()
137 .isFtpTransferExecuting()) {
138 Thread.sleep(WaarpNettyUtil.MINIMAL_DELAY_MS);
139 limit--;
140 if (limit <= 0) {
141 logger.warn("Waiting for transfer finished but 2s is not enough");
142 break;
143 }
144 }
145 businessHandler.executeChannelClosed();
146
147 businessHandler.clear();
148 session.clear();
149 super.channelInactive(ctx);
150 }
151
152
153
154
155 @Override
156 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
157 this.ctx = ctx;
158 final Channel channel = ctx.channel();
159 controlChannel = channel;
160 session.setControlConnected();
161 FtpChannelUtils.addCommandChannel(channel, session.getConfiguration());
162 if (isStillAlive(ctx)) {
163
164 final AbstractCommand command = new ConnectionCommand(getFtpSession());
165 session.setNextCommand(command);
166
167 businessHandler.executeChannelConnected(channel);
168
169 messageRunAnswer(ctx);
170 getFtpSession().setReady(true);
171 }
172 }
173
174
175
176
177
178
179
180
181 private boolean isStillAlive(final ChannelHandlerContext ctx) {
182 if (session.getConfiguration().isShutdown()) {
183 session.setExitErrorCode("Service is going down: disconnect");
184 writeFinalAnswer(ctx);
185 return false;
186 }
187 return true;
188 }
189
190
191
192
193
194
195 @Override
196 public void exceptionCaught(final ChannelHandlerContext ctx,
197 final Throwable cause) {
198 this.ctx = ctx;
199 final Channel channel = ctx.channel();
200 if (session == null) {
201
202 logger.warn("NO SESSION", cause);
203 return;
204 }
205 if (cause instanceof ConnectException) {
206 final ConnectException e2 = (ConnectException) cause;
207 logger.warn("Connection impossible since {} with Channel {}",
208 e2.getMessage(), channel);
209 } else if (cause instanceof ChannelException) {
210 final ChannelException e2 = (ChannelException) cause;
211 logger.warn(
212 "Connection (example: timeout) impossible since {} with Channel {}",
213 e2.getMessage(), channel);
214 } else if (cause instanceof ClosedChannelException) {
215 logger.debug("Connection closed before end");
216 session.setExitErrorCode(INTERNAL_ERROR_DISCONNECT);
217 if (channel.isActive()) {
218 writeFinalAnswer(ctx);
219 }
220 return;
221 } else if (cause instanceof CommandAbstractException) {
222
223 final CommandAbstractException e2 = (CommandAbstractException) cause;
224 logger.warn("Command Error Reply {}", e2.getMessage());
225 session.setReplyCode(e2);
226 businessHandler.afterRunCommandKo(e2);
227 if (channel.isActive()) {
228 writeFinalAnswer(ctx);
229 }
230 return;
231 } else if (cause instanceof NullPointerException) {
232 final NullPointerException e2 = (NullPointerException) cause;
233 logger.warn("Null pointer Exception: " + ctx.channel(), e2);
234 try {
235 session.setExitErrorCode(INTERNAL_ERROR_DISCONNECT);
236 if (businessHandler != null && session.getDataConn() != null) {
237 businessHandler.exceptionLocalCaught(cause);
238 if (channel.isActive()) {
239 writeFinalAnswer(ctx);
240 }
241 }
242 } catch (final NullPointerException ignored) {
243
244 }
245 return;
246 } else if (cause instanceof BindException) {
247 final BindException e2 = (BindException) cause;
248 logger.warn("Connection aborted since {} with Channel {}",
249 e2.getMessage(), channel);
250 logger.debug("DEBUG", cause);
251 } else if (cause instanceof IOException) {
252 final IOException e2 = (IOException) cause;
253 logger.warn("Connection aborted since {} with Channel {}",
254 e2.getMessage(), channel);
255 logger.debug("DEBUG", cause);
256 } else if (cause instanceof RejectedExecutionException) {
257 logger.debug("Rejected execution (shutdown) from {}", channel);
258 return;
259 } else {
260 logger.warn("Unexpected exception from Outband Ref Channel: " + channel +
261 " Exception: " + cause.getMessage(), cause);
262 }
263 session.setExitErrorCode(INTERNAL_ERROR_DISCONNECT);
264 businessHandler.exceptionLocalCaught(cause);
265 if (channel.isActive()) {
266 writeFinalAnswer(ctx);
267 }
268 }
269
270
271
272
273 @Override
274 public void channelRead0(final ChannelHandlerContext ctx, final String e) {
275 this.ctx = ctx;
276 if (isStillAlive(ctx)) {
277
278 if (!session.isReady()) {
279 session.setReplyCode(
280 ReplyCode.REPLY_421_SERVICE_NOT_AVAILABLE_CLOSING_CONTROL_CONNECTION,
281 null);
282 businessHandler.afterRunCommandKo(
283 new Reply421Exception(session.getReplyCode().getMesg()));
284 writeIntermediateAnswer(ctx);
285 return;
286 }
287 AbstractCommand command = FtpCommandCode.getFromLine(getFtpSession(), e);
288 logger.debug("RECVMSG: {} CMD: {} {}", command.getCode(), e,
289 command.getCommand());
290
291 if (!FtpCommandCode.isSpecialCommand(command.getCode())) {
292
293
294
295 final FtpTransferControl control =
296 session.getDataConn().getFtpTransferControl();
297 final boolean notFinished = control.waitFtpTransferExecuting();
298 if (notFinished) {
299 session.setReplyCode(ReplyCode.REPLY_503_BAD_SEQUENCE_OF_COMMANDS,
300 "Previous transfer command is not finished yet");
301 businessHandler.afterRunCommandKo(
302 new Reply503Exception(session.getReplyCode().getMesg()));
303 writeIntermediateAnswer(ctx);
304 return;
305 }
306 }
307
308 session.setReplyCode(ReplyCode.REPLY_200_COMMAND_OKAY, null);
309
310 if (FtpCommandCode.isSslOrAuthCommand(command.getCode())) {
311 session.setNextCommand(command);
312 messageRunAnswer(ctx);
313 return;
314 }
315 if (session.getCurrentCommand().isNextCommandValid(command)) {
316 logger.debug("Previous: {} Next: {}",
317 session.getCurrentCommand().getCode(), command.getCode());
318 session.setNextCommand(command);
319 messageRunAnswer(ctx);
320 } else {
321 if (!session.getAuth().isIdentified()) {
322 session.setReplyCode(ReplyCode.REPLY_530_NOT_LOGGED_IN, null);
323 session.setNextCommand(new USER());
324 writeFinalAnswer(ctx);
325 return;
326 }
327 command = new IncorrectCommand();
328 command.setArgs(getFtpSession(), e, null,
329 FtpCommandCode.IncorrectSequence);
330 session.setNextCommand(command);
331 messageRunAnswer(ctx);
332 }
333 }
334 }
335
336
337
338
339
340
341
342
343 private boolean writeFinalAnswer(final ChannelHandlerContext ctx) {
344 if (session.getReplyCode() ==
345 ReplyCode.REPLY_421_SERVICE_NOT_AVAILABLE_CLOSING_CONTROL_CONNECTION ||
346 session.getReplyCode() ==
347 ReplyCode.REPLY_221_CLOSING_CONTROL_CONNECTION) {
348 session.getDataConn().getFtpTransferControl().clear();
349 writeIntermediateAnswer(ctx).addListener(WaarpSslUtility.SSLCLOSE);
350 return true;
351 }
352 writeIntermediateAnswer(ctx);
353 session.setCurrentCommandFinished();
354 return false;
355 }
356
357
358
359
360
361
362
363 public final ChannelFuture writeIntermediateAnswer(
364 final ChannelHandlerContext ctx) {
365 logger.debug("Answer: {}", session.getAnswer());
366 return ctx.writeAndFlush(session.getAnswer());
367 }
368
369
370
371
372
373
374
375 public final ChannelFuture writeIntermediateAnswer() {
376 return writeIntermediateAnswer(ctx);
377 }
378
379
380
381
382
383
384
385 protected void callForSnmp(final String error1, final String error2) {
386
387 }
388
389
390
391
392 private void messageRunAnswer(final ChannelHandlerContext ctx) {
393 boolean error = false;
394 logger.debug("Code: {}", session.getCurrentCommand().getCode());
395 try {
396 businessHandler.beforeRunCommand();
397 final AbstractCommand command = session.getCurrentCommand();
398 logger.debug("Run {}", command.getCommand());
399 command.exec();
400 businessHandler.afterRunCommandOk();
401 } catch (final CommandAbstractException e) {
402 logger.debug("Command in error", e);
403 error = true;
404 session.setReplyCode(e);
405 businessHandler.afterRunCommandKo(e);
406 }
407 logger.debug("Code: {} [{}]", session.getCurrentCommand().getCode(),
408 session.getReplyCode());
409 if (error) {
410 if (session.getCurrentCommand().getCode() !=
411 FtpCommandCode.INTERNALSHUTDOWN) {
412 writeFinalAnswer(ctx);
413 }
414
415 if (session.getDataConn().isActive()) {
416 logger.debug("Closing DataChannel while command is in error");
417 try {
418 session.getDataConn().getCurrentDataChannel().close();
419 } catch (final FtpNoConnectionException e) {
420
421 }
422 }
423 return;
424 }
425 if (session.getCurrentCommand().getCode() == FtpCommandCode.AUTH ||
426 session.getCurrentCommand().getCode() == FtpCommandCode.CCC) {
427 controlChannel.config().setAutoRead(false);
428 final ChannelFuture future = writeIntermediateAnswer(ctx);
429 session.setCurrentCommandFinished();
430 if (session.getCurrentCommand().getCode() == FtpCommandCode.AUTH) {
431 logger.debug("SSL to be added to pipeline");
432 ChannelHandler sslHandler = ctx.pipeline().first();
433 if (sslHandler instanceof SslHandler) {
434 logger.debug("Already got a SslHandler");
435 } else {
436 logger.debug("Add Explicitely SSL support to Command");
437
438 sslHandler =
439 FtpsInitializer.waarpSslContextFactory.createHandlerServer(
440 FtpsInitializer.waarpSslContextFactory.needClientAuthentication(),
441 false, ctx.channel());
442 session.prepareSsl();
443 WaarpSslUtility.addSslHandler(future, ctx.pipeline(), sslHandler,
444 new GenericFutureListener<Future<? super Channel>>() {
445 @Override
446 public final void operationComplete(
447 final Future<? super Channel> future) {
448 if (!future.isSuccess()) {
449 final String error2 =
450 future.cause() != null?
451 future.cause()
452 .getMessage() :
453 "During Handshake";
454 logger.error(
455 "Cannot finalize Ssl Command channel {}",
456 error2, future.cause());
457 callForSnmp(
458 "SSL Connection Error",
459 error2);
460 session.setSsl(false);
461 ctx.close();
462 } else {
463 logger.debug(
464 "End of initialization of SSL and command channel: {}",
465 ctx.channel());
466 session.setSsl(true);
467 }
468 }
469 });
470 }
471 } else if (session.getCurrentCommand().getCode() == FtpCommandCode.CCC) {
472 logger.debug("SSL to be removed from pipeline");
473
474 session.prepareSsl();
475 WaarpSslUtility.removingSslHandler(future, controlChannel, false);
476 }
477 } else if (session.getCurrentCommand().getCode() !=
478 FtpCommandCode.INTERNALSHUTDOWN) {
479 writeFinalAnswer(ctx);
480 }
481 }
482 }