1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.ftp.core.data.handler;
21
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelException;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.SimpleChannelInboundHandler;
28 import org.waarp.common.crypto.ssl.WaarpSslUtility;
29 import org.waarp.common.exception.FileTransferException;
30 import org.waarp.common.exception.InvalidArgumentException;
31 import org.waarp.common.file.DataBlock;
32 import org.waarp.common.logging.SysErrLogger;
33 import org.waarp.common.logging.WaarpLogger;
34 import org.waarp.common.logging.WaarpLoggerFactory;
35 import org.waarp.common.utility.WaarpNettyUtil;
36 import org.waarp.common.utility.WaarpStringUtils;
37 import org.waarp.ftp.core.config.FtpConfiguration;
38 import org.waarp.ftp.core.config.FtpInternalConfiguration;
39 import org.waarp.ftp.core.control.NetworkHandler;
40 import org.waarp.ftp.core.data.FtpTransfer;
41 import org.waarp.ftp.core.data.FtpTransferControl;
42 import org.waarp.ftp.core.exception.FtpNoConnectionException;
43 import org.waarp.ftp.core.exception.FtpNoFileException;
44 import org.waarp.ftp.core.exception.FtpNoTransferException;
45 import org.waarp.ftp.core.session.FtpSession;
46 import org.waarp.ftp.core.utils.FtpChannelUtils;
47
48 import java.io.IOException;
49 import java.net.BindException;
50 import java.net.ConnectException;
51 import java.nio.channels.CancelledKeyException;
52 import java.nio.channels.ClosedChannelException;
53 import java.nio.channels.NotYetConnectedException;
54
55
56
57
58 public class DataNetworkHandler extends SimpleChannelInboundHandler<DataBlock> {
59
60
61
62 private static final WaarpLogger logger =
63 WaarpLoggerFactory.getLogger(DataNetworkHandler.class);
64
65
66
67
68 private DataBusinessHandler dataBusinessHandler;
69
70
71
72
73 protected final FtpConfiguration configuration;
74
75
76
77
78 private final boolean isActive;
79
80
81
82
83 protected FtpSession session;
84
85
86
87
88 private Channel dataChannel;
89
90
91
92
93 private ChannelPipeline channelPipeline;
94
95
96
97
98 private FtpTransfer ftpTransfer;
99
100
101
102
103
104
105
106
/**
 * Creates the handler and registers it with its business handler.
 *
 * @param configuration the FTP configuration, used later to look up the
 *     session attached to the data channel
 * @param handler the business handler for this data channel; it receives a
 *     back-reference to this network handler
 * @param active flag stored as-is and passed to the configuration when the
 *     session is looked up
 */
public DataNetworkHandler(final FtpConfiguration configuration,
                          final DataBusinessHandler handler,
                          final boolean active) {
  this.configuration = configuration;
  this.isActive = active;
  this.dataBusinessHandler = handler;
  handler.setDataNetworkHandler(this);
}
115
116
117
118
119
120
121 public DataBusinessHandler getDataBusinessHandler()
122 throws FtpNoConnectionException {
123 if (dataBusinessHandler == null) {
124 throw new FtpNoConnectionException("No Data Connection active");
125 }
126 return dataBusinessHandler;
127 }
128
129
130
131
132 public final FtpSession getFtpSession() {
133 return session;
134 }
135
136
137
138
139 public final NetworkHandler getNetworkHandler() {
140 return session.getBusinessHandler().getNetworkHandler();
141 }
142
143
144
145
146
147
148 @Override
149 public void channelInactive(final ChannelHandlerContext ctx)
150 throws Exception {
151 logger.debug("Data Channel closed with a session ? {}", session != null);
152 if (session != null) {
153 if (!session.getDataConn().checkCorrectChannel(ctx.channel())) {
154 for (int i = 0; i < FtpInternalConfiguration.RETRYNB * 10; i++) {
155 Thread.sleep(FtpInternalConfiguration.RETRYINMS);
156 if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
157 break;
158 }
159 }
160 }
161 if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
162 session.getDataConn().getFtpTransferControl().setPreEndOfTransfer();
163 } else {
164 session.getDataConn().getFtpTransferControl()
165 .setTransferAbortedFromInternal(true);
166 }
167 try {
168 getDataBusinessHandler().executeChannelClosed();
169
170 getDataBusinessHandler().clear();
171 } catch (final FtpNoConnectionException ignored) {
172
173 }
174 session.getDataConn().unbindData();
175 dataBusinessHandler = null;
176 channelPipeline = null;
177 dataChannel = null;
178 }
179 super.channelInactive(ctx);
180 }
181
182 protected final void setSession(final Channel channel) {
183 if (session != null) {
184 return;
185 }
186
187 session = configuration.getFtpSession(channel, isActive);
188 if (session == null) {
189
190 logger.error("Session not found for {}!", isActive? "Active" : "Passive");
191 WaarpSslUtility.closingSslChannel(channel);
192
193
194 }
195 }
196
197
198
199
200 @Override
201 public void channelActive(final ChannelHandlerContext ctx) {
202 final Channel channel = ctx.channel();
203 channel.config().setAutoRead(false);
204 if (session == null) {
205 setSession(channel);
206 }
207 logger.debug("Data Channel opened as {}", channel);
208 if (session == null) {
209 logger.debug(
210 "DataChannel immediately closed since no session is assigned");
211 WaarpSslUtility.closingSslChannel(ctx.channel());
212 return;
213 }
214 channelPipeline = ctx.pipeline();
215 dataChannel = channel;
216 dataBusinessHandler.setFtpSession(getFtpSession());
217 FtpChannelUtils.addDataChannel(channel, session.getConfiguration());
218 logger.debug("DataChannel connected: {}", session.getReplyCode());
219 if (session.getCurrentCommand() != null && session.getReplyCode() != null &&
220 session.getReplyCode().getCode() >= 400) {
221
222 switch (session.getCurrentCommand().getCode()) {
223 case RETR:
224 case APPE:
225 case STOR:
226 case STOU:
227
228 logger.info(
229 "DataChannel immediately closed since {} is not ok at startup",
230 session.getCurrentCommand().getCode());
231 WaarpSslUtility.closingSslChannel(ctx.channel());
232 session.getDataConn().getFtpTransferControl()
233 .setOpenedDataChannel(null, this);
234 return;
235 default:
236 break;
237 }
238 }
239 if (isStillAlive()) {
240 try {
241 setCorrectCodec();
242 } catch (final FtpNoConnectionException e) {
243 logger.error(e.getMessage());
244 }
245 unlockModeCodec();
246 session.getDataConn().getFtpTransferControl()
247 .setOpenedDataChannel(channel, this);
248 logger.debug("DataChannel fully configured");
249 } else {
250
251 logger.info("Connected but no more alive so will disconnect");
252 session.getDataConn().getFtpTransferControl()
253 .setOpenedDataChannel(null, this);
254 }
255 }
256
257
258
259
260
261 public final void setCorrectCodec() throws FtpNoConnectionException {
262 if (channelPipeline == null) {
263 logger.error("No channelPipeline defined while session is ", session);
264 throw new FtpNoConnectionException("No Data socket opened");
265 }
266 final FtpDataModeCodec modeCodec =
267 (FtpDataModeCodec) channelPipeline.get(FtpDataInitializer.CODEC_MODE);
268 final FtpDataTypeCodec typeCodec =
269 (FtpDataTypeCodec) channelPipeline.get(FtpDataInitializer.CODEC_TYPE);
270 final FtpDataStructureCodec structureCodec =
271 (FtpDataStructureCodec) channelPipeline.get(
272 FtpDataInitializer.CODEC_STRUCTURE);
273 if (modeCodec == null || typeCodec == null || structureCodec == null) {
274 return;
275 }
276 modeCodec.setMode(session.getDataConn().getMode());
277 modeCodec.setStructure(session.getDataConn().getStructure());
278 typeCodec.setFullType(session.getDataConn().getType(),
279 session.getDataConn().getSubType());
280 structureCodec.setStructure(session.getDataConn().getStructure());
281 logger.debug("codec setup");
282 }
283
284
285
286
287 public final void unlockModeCodec() {
288 final FtpDataModeCodec modeCodec =
289 (FtpDataModeCodec) channelPipeline.get(FtpDataInitializer.CODEC_MODE);
290 modeCodec.setCodecReady();
291 }
292
293
294
295
296
297 @Override
298 public void exceptionCaught(final ChannelHandlerContext ctx,
299 final Throwable cause) {
300 if (session == null) {
301 logger.info("Error without any session active {}", cause);
302 return;
303 }
304 if (cause instanceof ConnectException) {
305 final ConnectException e2 = (ConnectException) cause;
306 logger.warn("Connection impossible since {}", e2.getMessage());
307 } else if (cause instanceof ChannelException) {
308 final ChannelException e2 = (ChannelException) cause;
309 logger.warn("Connection (example: timeout) impossible since {}",
310 e2.getMessage());
311 } else if (cause instanceof ClosedChannelException) {
312 logger.debug("Connection closed before end");
313 } else if (cause instanceof InvalidArgumentException) {
314 final InvalidArgumentException e2 = (InvalidArgumentException) cause;
315 logger.warn("Bad configuration in Codec in {}", e2.getMessage());
316 } else if (cause instanceof NullPointerException) {
317 final NullPointerException e2 = (NullPointerException) cause;
318 logger.warn("Null pointer Exception", e2);
319 try {
320 if (dataBusinessHandler != null) {
321 dataBusinessHandler.exceptionLocalCaught(cause);
322 if (session.getDataConn() != null &&
323 session.getDataConn().checkCorrectChannel(ctx.channel())) {
324 session.getDataConn().getFtpTransferControl()
325 .setTransferAbortedFromInternal(true);
326 }
327 }
328 } catch (final NullPointerException ignored) {
329
330 }
331 return;
332 } else if (cause instanceof CancelledKeyException) {
333 final CancelledKeyException e2 = (CancelledKeyException) cause;
334 logger.warn("Connection aborted since {}", e2.getMessage());
335
336
337 return;
338 } else if (cause instanceof IOException) {
339 final IOException e2 = (IOException) cause;
340 logger.warn("Connection aborted since {}", e2.getMessage());
341 } else if (cause instanceof NotYetConnectedException) {
342 final NotYetConnectedException e2 = (NotYetConnectedException) cause;
343 logger.debug("Ignore this exception {}", e2.getMessage());
344 return;
345 } else if (cause instanceof BindException) {
346 final BindException e2 = (BindException) cause;
347 logger.warn("Address already in use {}", e2.getMessage());
348 } else {
349 logger.warn("Unexpected exception from Outband: {}", cause.getMessage(),
350 cause);
351 }
352 if (dataBusinessHandler != null) {
353 dataBusinessHandler.exceptionLocalCaught(cause);
354 }
355 if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
356 session.getDataConn().getFtpTransferControl()
357 .setTransferAbortedFromInternal(true);
358 }
359 }
360
361 public final synchronized void setFtpTransfer(final FtpTransfer ftpTransfer) {
362 this.ftpTransfer = ftpTransfer;
363 }
364
365
366
367
368 @Override
369 public synchronized void channelRead0(final ChannelHandlerContext ctx,
370 final DataBlock dataBlock) {
371 if (ftpTransfer == null) {
372 for (int i = 0; i < 20; i++) {
373 try {
374 ftpTransfer = session.getDataConn().getFtpTransferControl()
375 .getExecutingFtpTransfer();
376 if (ftpTransfer != null) {
377 break;
378 }
379 } catch (final FtpNoTransferException e) {
380 try {
381 Thread.sleep(WaarpNettyUtil.SIMPLE_DELAY_MS);
382 } catch (final InterruptedException e1) {
383 SysErrLogger.FAKE_LOGGER.ignoreLog(e1);
384 break;
385 }
386 }
387 }
388 if (ftpTransfer == null) {
389 logger.info("No ExecutionFtpTransfer found");
390 session.getDataConn().getFtpTransferControl()
391 .setTransferAbortedFromInternal(true);
392 return;
393 }
394 }
395 if (isStillAlive()) {
396 try {
397 ftpTransfer.getFtpFile().writeDataBlock(dataBlock);
398 } catch (final FtpNoFileException e1) {
399 logger.warn(e1);
400 session.getDataConn().getFtpTransferControl()
401 .setTransferAbortedFromInternal(true);
402 } catch (final FileTransferException e1) {
403 logger.warn("File Transfer Exception: {}", e1.getMessage());
404 session.getDataConn().getFtpTransferControl()
405 .setTransferAbortedFromInternal(true);
406 }
407 } else {
408
409 session.getDataConn().getFtpTransferControl()
410 .setTransferAbortedFromInternal(true);
411 WaarpSslUtility.closingSslChannel(ctx.channel());
412 }
413 }
414
415
416
417
418
419
420
421
422 public final boolean writeMessage(final String message) {
423 final DataBlock dataBlock = new DataBlock();
424 dataBlock.setEOF(true);
425 dataBlock.setBlock(message.getBytes(WaarpStringUtils.UTF8));
426 final ChannelFuture future;
427 if (logger.isDebugEnabled()) {
428 logger.debug("Will write: {}", message);
429 }
430 future = dataChannel.writeAndFlush(dataBlock);
431 WaarpNettyUtil.awaitOrInterrupted(future);
432 logger.debug("Write result: {} {}", future.isSuccess(), future.cause());
433 return future.isSuccess();
434 }
435
436
437
438
439
440
441
442
443 private boolean isStillAlive() {
444 if (session.getConfiguration().isShutdown()) {
445 session.setExitErrorCode("Service is going down: disconnect");
446 return false;
447 }
448 return true;
449 }
450 }