1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.waarp.ftp.core.control;
19
20 import java.io.IOException;
21 import java.net.ConnectException;
22 import java.nio.channels.ClosedChannelException;
23 import java.util.concurrent.RejectedExecutionException;
24
25 import io.netty.channel.Channel;
26 import io.netty.channel.ChannelException;
27 import io.netty.channel.ChannelFuture;
28 import io.netty.channel.ChannelHandler;
29 import io.netty.channel.ChannelHandlerContext;
30 import io.netty.channel.SimpleChannelInboundHandler;
31 import io.netty.handler.ssl.SslHandler;
32 import io.netty.util.concurrent.Future;
33 import io.netty.util.concurrent.GenericFutureListener;
34
35 import org.waarp.common.command.ReplyCode;
36 import org.waarp.common.command.exception.CommandAbstractException;
37 import org.waarp.common.command.exception.Reply421Exception;
38 import org.waarp.common.command.exception.Reply503Exception;
39 import org.waarp.common.crypto.ssl.WaarpSslUtility;
40 import org.waarp.common.logging.WaarpLogger;
41 import org.waarp.common.logging.WaarpLoggerFactory;
42 import org.waarp.ftp.core.command.AbstractCommand;
43 import org.waarp.ftp.core.command.FtpCommandCode;
44 import org.waarp.ftp.core.command.access.USER;
45 import org.waarp.ftp.core.command.internal.ConnectionCommand;
46 import org.waarp.ftp.core.command.internal.IncorrectCommand;
47 import org.waarp.ftp.core.control.ftps.FtpsInitializer;
48 import org.waarp.ftp.core.data.FtpTransferControl;
49 import org.waarp.ftp.core.exception.FtpNoConnectionException;
50 import org.waarp.ftp.core.session.FtpSession;
51 import org.waarp.ftp.core.utils.FtpChannelUtils;
52
53
54
55
56
57
58
59
60 public class NetworkHandler extends SimpleChannelInboundHandler<String> {
61
62
63
64 private static final WaarpLogger logger = WaarpLoggerFactory.getLogger(NetworkHandler.class);
65
66
67
68
69 private final BusinessHandler businessHandler;
70
71
72
73
74 private final FtpSession session;
75
76
77
78
79 private Channel controlChannel = null;
80
81
82
83 private volatile ChannelHandlerContext ctx;
84
85
86
87
88
89
90 public NetworkHandler(FtpSession session) {
91 super();
92 this.session = session;
93 businessHandler = session.getBusinessHandler();
94 businessHandler.setNetworkHandler(this);
95 }
96
97
98
99
100 public BusinessHandler getBusinessHandler() {
101 return businessHandler;
102 }
103
104
105
106
107 public FtpSession getFtpSession() {
108 return session;
109 }
110
111
112
113
114
115 public Channel getControlChannel() {
116 return controlChannel;
117 }
118
119
120
121
122
123 @Override
124 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
125 if (session == null || session.getDataConn() == null ||
126 session.getDataConn().getFtpTransferControl() == null) {
127 super.channelInactive(ctx);
128 return;
129 }
130
131
132 int limit = 100;
133 while (session.getDataConn().getFtpTransferControl()
134 .isFtpTransferExecuting()) {
135 Thread.sleep(10);
136 limit--;
137 if (limit <= 0) {
138 logger.warn("Waiting for transfer finished but 1s is not enough");
139 break;
140 }
141 }
142 businessHandler.executeChannelClosed();
143
144 businessHandler.clear();
145 session.clear();
146 super.channelInactive(ctx);
147 }
148
149
150
151
152
153 @Override
154 public void channelActive(ChannelHandlerContext ctx) throws Exception {
155 this.ctx = ctx;
156 Channel channel = ctx.channel();
157 controlChannel = channel;
158 session.setControlConnected();
159 FtpChannelUtils.addCommandChannel(channel, session.getConfiguration());
160 if (isStillAlive(ctx)) {
161
162 AbstractCommand command = new ConnectionCommand(getFtpSession());
163 session.setNextCommand(command);
164
165 businessHandler.executeChannelConnected(channel);
166
167 messageRunAnswer(ctx);
168 getFtpSession().setReady(true);
169 }
170 }
171
172
173
174
175
176
177 private boolean isStillAlive(ChannelHandlerContext ctx) {
178 if (session.getConfiguration().isShutdown()) {
179 session.setExitErrorCode("Service is going down: disconnect");
180 writeFinalAnswer(ctx);
181 return false;
182 }
183 return true;
184 }
185
186
187
188
189
190
191 @Override
192 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
193 this.ctx = ctx;
194 Throwable e1 = cause;
195 Channel channel = ctx.channel();
196 if (session == null) {
197
198 logger.warn("NO SESSION", e1);
199 return;
200 }
201 if (e1 instanceof ConnectException) {
202 ConnectException e2 = (ConnectException) e1;
203 logger.warn("Connection impossible since {} with Channel {}", e2
204 .getMessage(), channel);
205 } else if (e1 instanceof ChannelException) {
206 ChannelException e2 = (ChannelException) e1;
207 logger
208 .warn(
209 "Connection (example: timeout) impossible since {} with Channel {}",
210 e2.getMessage(), channel);
211 } else if (e1 instanceof ClosedChannelException) {
212 logger.debug("Connection closed before end");
213 session.setExitErrorCode("Internal error: disconnect");
214 if (channel.isActive()) {
215 writeFinalAnswer(ctx);
216 }
217 return;
218 } else if (e1 instanceof CommandAbstractException) {
219
220 CommandAbstractException e2 = (CommandAbstractException) e1;
221 logger.warn("Command Error Reply {}", e2.getMessage());
222 session.setReplyCode(e2);
223 businessHandler.afterRunCommandKo(e2);
224 if (channel.isActive()) {
225 writeFinalAnswer(ctx);
226 }
227 return;
228 } else if (e1 instanceof NullPointerException) {
229 NullPointerException e2 = (NullPointerException) e1;
230 logger.warn("Null pointer Exception: " + ctx.channel().toString(), e2);
231 try {
232 if (session != null) {
233 session.setExitErrorCode("Internal error: disconnect");
234 if (businessHandler != null &&
235 session.getDataConn() != null) {
236 businessHandler.exceptionLocalCaught(e1);
237 if (channel.isActive()) {
238 writeFinalAnswer(ctx);
239 }
240 }
241 }
242 } catch (NullPointerException e3) {
243 }
244 return;
245 } else if (e1 instanceof IOException) {
246 IOException e2 = (IOException) e1;
247 logger.warn("Connection aborted since {} with Channel {}", e2
248 .getMessage(), channel);
249 logger.warn(e1);
250 } else if (e1 instanceof RejectedExecutionException) {
251 logger.debug("Rejected execution (shutdown) from {}", channel);
252 return;
253 } else {
254 logger.warn("Unexpected exception from Outband Ref Channel: " +
255 channel.toString() + " Exception: " + e1.getMessage(), e1);
256 }
257 session.setExitErrorCode("Internal error: disconnect");
258 businessHandler.exceptionLocalCaught(e1);
259 if (channel.isActive()) {
260 writeFinalAnswer(ctx);
261 }
262 }
263
264
265
266
267
268 @Override
269 public void channelRead0(ChannelHandlerContext ctx, String e) {
270 this.ctx = ctx;
271 if (isStillAlive(ctx)) {
272
273 if (!session.isReady()) {
274 session.setReplyCode(ReplyCode.REPLY_421_SERVICE_NOT_AVAILABLE_CLOSING_CONTROL_CONNECTION, null);
275 businessHandler.afterRunCommandKo(new Reply421Exception(session.getReplyCode().getMesg()));
276 writeIntermediateAnswer(ctx);
277 return;
278 }
279 String message = e;
280 AbstractCommand command = FtpCommandCode.getFromLine(getFtpSession(), message);
281 logger.debug("RECVMSG: {} CMD: {} " + command.getCode(), message, command.getCommand());
282
283 if (!FtpCommandCode.isSpecialCommand(command.getCode())) {
284
285
286
287 FtpTransferControl control = session.getDataConn().getFtpTransferControl();
288 boolean notFinished = control.waitFtpTransferExecuting();
289 if (notFinished) {
290 session.setReplyCode(
291 ReplyCode.REPLY_503_BAD_SEQUENCE_OF_COMMANDS,
292 "Previous transfer command is not finished yet");
293 businessHandler.afterRunCommandKo(
294 new Reply503Exception(session.getReplyCode().getMesg()));
295 writeIntermediateAnswer(ctx);
296 return;
297 }
298 }
299
300 session.setReplyCode(ReplyCode.REPLY_200_COMMAND_OKAY, null);
301
302 if (FtpCommandCode.isSslOrAuthCommand(command.getCode())) {
303 session.setNextCommand(command);
304 messageRunAnswer(ctx);
305 return;
306 }
307 if (session.getCurrentCommand().isNextCommandValid(command)) {
308 logger.debug("Previous: " + session.getCurrentCommand().getCode() +
309 " Next: " + command.getCode());
310 session.setNextCommand(command);
311 messageRunAnswer(ctx);
312 } else {
313 if (!session.getAuth().isIdentified()) {
314 session.setReplyCode(ReplyCode.REPLY_530_NOT_LOGGED_IN, null);
315 session.setNextCommand(new USER());
316 writeFinalAnswer(ctx);
317 return;
318 }
319 command = new IncorrectCommand();
320 command.setArgs(getFtpSession(), message, null,
321 FtpCommandCode.IncorrectSequence);
322 session.setNextCommand(command);
323 messageRunAnswer(ctx);
324 }
325 }
326 }
327
328
329
330
331
332
333 private boolean writeFinalAnswer(ChannelHandlerContext ctx) {
334 if (session.getReplyCode() == ReplyCode.REPLY_421_SERVICE_NOT_AVAILABLE_CLOSING_CONTROL_CONNECTION
335 ||
336 session.getReplyCode() == ReplyCode.REPLY_221_CLOSING_CONTROL_CONNECTION) {
337 session.getDataConn().getFtpTransferControl().clear();
338 writeIntermediateAnswer(ctx).addListener(WaarpSslUtility.SSLCLOSE);
339 return true;
340 }
341 writeIntermediateAnswer(ctx);
342 session.setCurrentCommandFinished();
343 return false;
344 }
345
346
347
348
349
350
351 public ChannelFuture writeIntermediateAnswer(ChannelHandlerContext ctx) {
352 logger.debug("Answer: " + session.getAnswer());
353 return ctx.writeAndFlush(session.getAnswer());
354 }
355
356
357
358
359
360
361 public ChannelFuture writeIntermediateAnswer() {
362 return writeIntermediateAnswer(ctx);
363 }
364
365
366
367
368
369
370
371 protected void callForSnmp(String error1, String error2) {
372
373 }
374
375
376
377
378 private void messageRunAnswer(final ChannelHandlerContext ctx) {
379 boolean error = false;
380 logger.debug("Code: " + session.getCurrentCommand().getCode());
381 try {
382 businessHandler.beforeRunCommand();
383 AbstractCommand command = session.getCurrentCommand();
384 logger.debug("Run {}", command.getCommand());
385 command.exec();
386 businessHandler.afterRunCommandOk();
387 } catch (CommandAbstractException e) {
388 logger.debug("Command in error", e);
389 error = true;
390 session.setReplyCode(e);
391 businessHandler.afterRunCommandKo(e);
392 }
393 logger.debug("Code: " + session.getCurrentCommand().getCode() +
394 " [" + session.getReplyCode() + "]");
395 if (error) {
396 if (session.getCurrentCommand().getCode() != FtpCommandCode.INTERNALSHUTDOWN) {
397 writeFinalAnswer(ctx);
398 }
399
400 if (session.getDataConn().isActive()) {
401 logger.debug("Closing DataChannel while command is in error");
402 try {
403 session.getDataConn().getCurrentDataChannel().close();
404 } catch (FtpNoConnectionException e) {
405
406 }
407 }
408 return;
409 }
410 if (session.getCurrentCommand().getCode() == FtpCommandCode.AUTH ||
411 session.getCurrentCommand().getCode() == FtpCommandCode.CCC) {
412 controlChannel.config().setAutoRead(false);
413 ChannelFuture future = writeIntermediateAnswer(ctx);
414 session.setCurrentCommandFinished();
415 if (session.getCurrentCommand().getCode() == FtpCommandCode.AUTH) {
416 logger.debug("SSL to be added to pipeline");
417 ChannelHandler sslHandler = ctx.pipeline().first();
418 if (sslHandler instanceof SslHandler) {
419 logger.debug("Already got a SslHandler");
420 } else {
421 logger.debug("Add Explicitely SSL support to Command");
422
423 sslHandler =
424 FtpsInitializer.waarpSslContextFactory.initInitializer(true,
425 FtpsInitializer.waarpSslContextFactory.needClientAuthentication());
426 session.prepareSsl();
427 WaarpSslUtility.addSslHandler(future, ctx.pipeline(), sslHandler,
428 new GenericFutureListener<Future<? super Channel>>() {
429 public void operationComplete(Future<? super Channel> future) throws Exception {
430 if (!future.isSuccess()) {
431 String error2 = future.cause() != null ?
432 future.cause().getMessage() : "During Handshake";
433 logger.error("Cannot finalize Ssl Command channel " + error2);
434 callForSnmp("SSL Connection Error", error2);
435 session.setSsl(false);
436 ctx.close();
437 } else {
438 logger.debug("End of initialization of SSL and command channel: "
439 + ctx.channel());
440 session.setSsl(true);
441 }
442 }
443 });
444 }
445 } else if (session.getCurrentCommand().getCode() == FtpCommandCode.CCC) {
446 logger.debug("SSL to be removed from pipeline");
447
448 session.prepareSsl();
449 WaarpSslUtility.removingSslHandler(future, controlChannel, false);
450 }
451 } else if (session.getCurrentCommand().getCode() != FtpCommandCode.INTERNALSHUTDOWN) {
452 writeFinalAnswer(ctx);
453 }
454 }
455 }