1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.waarp.ftp.core.data.handler;
19
20 import java.io.IOException;
21 import java.net.BindException;
22 import java.net.ConnectException;
23 import java.nio.channels.CancelledKeyException;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.NotYetConnectedException;
26
27 import io.netty.buffer.ByteBuf;
28 import io.netty.buffer.Unpooled;
29 import io.netty.channel.Channel;
30 import io.netty.channel.ChannelException;
31 import io.netty.channel.ChannelFuture;
32 import io.netty.channel.ChannelHandlerContext;
33 import io.netty.channel.ChannelPipeline;
34 import io.netty.channel.SimpleChannelInboundHandler;
35
36 import org.waarp.common.crypto.ssl.WaarpSslUtility;
37 import org.waarp.common.exception.FileTransferException;
38 import org.waarp.common.exception.InvalidArgumentException;
39 import org.waarp.common.file.DataBlock;
40 import org.waarp.common.logging.WaarpLogger;
41 import org.waarp.common.logging.WaarpLoggerFactory;
42 import org.waarp.common.utility.WaarpStringUtils;
43 import org.waarp.ftp.core.config.FtpConfiguration;
44 import org.waarp.ftp.core.config.FtpInternalConfiguration;
45 import org.waarp.ftp.core.control.NetworkHandler;
46 import org.waarp.ftp.core.data.FtpTransfer;
47 import org.waarp.ftp.core.data.FtpTransferControl;
48 import org.waarp.ftp.core.exception.FtpNoConnectionException;
49 import org.waarp.ftp.core.exception.FtpNoFileException;
50 import org.waarp.ftp.core.exception.FtpNoTransferException;
51 import org.waarp.ftp.core.session.FtpSession;
52 import org.waarp.ftp.core.utils.FtpChannelUtils;
53
54
55
56
57
58
59
60 public class DataNetworkHandler extends SimpleChannelInboundHandler<DataBlock> {
61
62
63
64 private static final WaarpLogger logger = WaarpLoggerFactory
65 .getLogger(DataNetworkHandler.class);
66
67
68
69
70 private DataBusinessHandler dataBusinessHandler = null;
71
72
73
74
75 protected final FtpConfiguration configuration;
76
77
78
79
80 private final boolean isActive;
81
82
83
84
85 protected FtpSession session = null;
86
87
88
89
90 private Channel dataChannel = null;
91
92
93
94
95 private ChannelPipeline channelPipeline = null;
96
97
98
99
100 private volatile FtpTransfer ftpTransfer = null;
101
102
103
104
105
106
107
108
/**
 * Creates the handler and registers it with the given business handler.
 *
 * @param configuration server configuration kept for later session lookups
 * @param handler business handler for this data connection; it is invoked
 *        immediately, so it must not be {@code null}
 * @param active value stored in the {@code isActive} flag
 */
public DataNetworkHandler(FtpConfiguration configuration,
        DataBusinessHandler handler, boolean active) {
    this.configuration = configuration;
    this.dataBusinessHandler = handler;
    // Give the business handler a back-reference to this network handler.
    handler.setDataNetworkHandler(this);
    this.isActive = active;
}
117
118
119
120
121
122 public DataBusinessHandler getDataBusinessHandler()
123 throws FtpNoConnectionException {
124 if (dataBusinessHandler == null) {
125 throw new FtpNoConnectionException("No Data Connection active");
126 }
127 return dataBusinessHandler;
128 }
129
130
131
132
133 public FtpSession getFtpSession() {
134 return session;
135 }
136
137
138
139
140
141 public NetworkHandler getNetworkHandler() {
142 return session.getBusinessHandler().getNetworkHandler();
143 }
144
145
146
147
148
149
150 @Override
151 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
152 logger.debug("Data Channel closed with a session ? "+(session !=null));
153 if (session != null) {
154 if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
155 session.getDataConn().getFtpTransferControl().setPreEndOfTransfer();
156 } else {
157 session.getDataConn().getFtpTransferControl().setTransferAbortedFromInternal(true);
158 }
159 session.getDataConn().unbindPassive();
160 try {
161 getDataBusinessHandler().executeChannelClosed();
162
163 getDataBusinessHandler().clear();
164 } catch (FtpNoConnectionException e1) {
165 }
166 dataBusinessHandler = null;
167 channelPipeline = null;
168 dataChannel = null;
169 }
170 super.channelInactive(ctx);
171 }
172
173 protected void setSession(Channel channel) {
174
175 for (int i = 0; i < FtpInternalConfiguration.RETRYNB; i++) {
176 session = configuration.getFtpSession(channel, isActive);
177 if (session == null) {
178 logger.warn("Session not found at try " + i);
179 try {
180 Thread.sleep(FtpInternalConfiguration.RETRYINMS);
181 } catch (InterruptedException e1) {
182 break;
183 }
184 } else {
185 break;
186 }
187 }
188 if (session == null) {
189
190 logger.error("Session not found!");
191 WaarpSslUtility.closingSslChannel(channel);
192
193
194 return;
195 }
196 }
197
198
199
200
201
202 @Override
203 public void channelActive(ChannelHandlerContext ctx) throws Exception {
204 Channel channel = ctx.channel();
205 channel.config().setAutoRead(false);
206 if (session == null) {
207 setSession(channel);
208 }
209 logger.debug("Data Channel opened as "+channel);
210 if (session == null) {
211 logger.debug("DataChannel immediately closed since no session is assigned");
212 WaarpSslUtility.closingSslChannel(ctx.channel());
213 return;
214 }
215 channelPipeline = ctx.pipeline();
216 dataChannel = channel;
217 dataBusinessHandler.setFtpSession(getFtpSession());
218 FtpChannelUtils.addDataChannel(channel, session.getConfiguration());
219 logger.debug("DataChannel connected: " + session.getReplyCode());
220 if (session.getReplyCode().getCode() >= 400) {
221
222 switch (session.getCurrentCommand().getCode()) {
223 case RETR:
224 case APPE:
225 case STOR:
226 case STOU:
227
228 logger.debug("DataChannel immediately closed since " + session.getCurrentCommand().getCode()
229 + " is not ok at startup");
230 WaarpSslUtility.closingSslChannel(ctx.channel());
231 return;
232 default:
233 break;
234 }
235 }
236 if (isStillAlive()) {
237 setCorrectCodec();
238 unlockModeCodec();
239 session.getDataConn().getFtpTransferControl().setOpenedDataChannel(channel, this);
240 logger.debug("DataChannel fully configured");
241 } else {
242
243 logger.debug("Connected but no more alive so will disconnect");
244 session.getDataConn().getFtpTransferControl().setOpenedDataChannel(null, this);
245 return;
246 }
247 }
248
249
250
251
252 public void setCorrectCodec() {
253 FtpDataModeCodec modeCodec = (FtpDataModeCodec) channelPipeline
254 .get(FtpDataInitializer.CODEC_MODE);
255 FtpDataTypeCodec typeCodec = (FtpDataTypeCodec) channelPipeline
256 .get(FtpDataInitializer.CODEC_TYPE);
257 FtpDataStructureCodec structureCodec = (FtpDataStructureCodec) channelPipeline
258 .get(FtpDataInitializer.CODEC_STRUCTURE);
259 if (modeCodec == null || typeCodec == null || structureCodec == null) {
260 return;
261 }
262 modeCodec.setMode(session.getDataConn().getMode());
263 modeCodec.setStructure(session.getDataConn().getStructure());
264 typeCodec.setFullType(session.getDataConn().getType(), session
265 .getDataConn().getSubType());
266 structureCodec.setStructure(session.getDataConn().getStructure());
267 logger.debug("codec setup");
268 }
269
270
271
272
273
274 public void unlockModeCodec() {
275 FtpDataModeCodec modeCodec = (FtpDataModeCodec) channelPipeline
276 .get(FtpDataInitializer.CODEC_MODE);
277 modeCodec.setCodecReady();
278 }
279
280
281
282
283
284 @Override
285 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
286 if (session == null) {
287 logger.debug("Error without any session active {}", cause);
288 return;
289 }
290 Throwable e1 = cause;
291 if (e1 instanceof ConnectException) {
292 ConnectException e2 = (ConnectException) e1;
293 logger.warn("Connection impossible since {}", e2.getMessage());
294 } else if (e1 instanceof ChannelException) {
295 ChannelException e2 = (ChannelException) e1;
296 logger.warn("Connection (example: timeout) impossible since {}", e2
297 .getMessage());
298 } else if (e1 instanceof ClosedChannelException) {
299 logger.debug("Connection closed before end");
300 } else if (e1 instanceof InvalidArgumentException) {
301 InvalidArgumentException e2 = (InvalidArgumentException) e1;
302 logger.warn("Bad configuration in Codec in {}", e2.getMessage());
303 } else if (e1 instanceof NullPointerException) {
304 NullPointerException e2 = (NullPointerException) e1;
305 logger.warn("Null pointer Exception", e2);
306 try {
307 if (dataBusinessHandler != null) {
308 dataBusinessHandler.exceptionLocalCaught(e1);
309 if (session.getDataConn() != null) {
310 if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
311 session.getDataConn().getFtpTransferControl()
312 .setTransferAbortedFromInternal(true);
313 }
314 }
315 }
316 } catch (NullPointerException e3) {
317 }
318 return;
319 } else if (e1 instanceof CancelledKeyException) {
320 CancelledKeyException e2 = (CancelledKeyException) e1;
321 logger.warn("Connection aborted since {}", e2.getMessage());
322
323
324 return;
325 } else if (e1 instanceof IOException) {
326 IOException e2 = (IOException) e1;
327 logger.warn("Connection aborted since {}", e2.getMessage());
328 } else if (e1 instanceof NotYetConnectedException) {
329 NotYetConnectedException e2 = (NotYetConnectedException) e1;
330 logger.debug("Ignore this exception {}", e2.getMessage());
331 return;
332 } else if (e1 instanceof BindException) {
333 BindException e2 = (BindException) e1;
334 logger.warn("Address already in use {}", e2.getMessage());
335 } else if (e1 instanceof ConnectException) {
336 ConnectException e2 = (ConnectException) e1;
337 logger.warn("Timeout occurs {}", e2.getMessage());
338 } else {
339 logger.warn("Unexpected exception from Outband: {}", e1.getMessage(), e1);
340 }
341 if (dataBusinessHandler != null) {
342 dataBusinessHandler.exceptionLocalCaught(e1);
343 }
344 if (session.getDataConn().checkCorrectChannel(ctx.channel())) {
345 session.getDataConn().getFtpTransferControl()
346 .setTransferAbortedFromInternal(true);
347 }
348 }
349
350 public void setFtpTransfer(FtpTransfer ftpTransfer) {
351 this.ftpTransfer = ftpTransfer;
352 }
353
354
355
356
357 @Override
358 public void channelRead0(ChannelHandlerContext ctx, DataBlock dataBlock) {
359 if (ftpTransfer == null) {
360 try {
361 ftpTransfer = session.getDataConn().getFtpTransferControl().getExecutingFtpTransfer();
362 } catch (FtpNoTransferException e) {
363 logger.debug(e);
364 session.getDataConn().getFtpTransferControl()
365 .setTransferAbortedFromInternal(true);
366 }
367 if (ftpTransfer == null) {
368 logger.debug("No ExecutionFtpTransfer found");
369 session.getDataConn().getFtpTransferControl()
370 .setTransferAbortedFromInternal(true);
371 return;
372 }
373 }
374 try {
375 if (isStillAlive()) {
376 try {
377 ftpTransfer.getFtpFile().writeDataBlock(dataBlock);
378 } catch (FtpNoFileException e1) {
379 logger.debug(e1);
380 session.getDataConn().getFtpTransferControl()
381 .setTransferAbortedFromInternal(true);
382 return;
383 } catch (FileTransferException e1) {
384 logger.debug(e1);
385 session.getDataConn().getFtpTransferControl()
386 .setTransferAbortedFromInternal(true);
387 }
388 } else {
389
390 session.getDataConn().getFtpTransferControl()
391 .setTransferAbortedFromInternal(true);
392 WaarpSslUtility.closingSslChannel(ctx.channel());
393 }
394 } finally {
395 dataBlock.getBlock().release();
396 }
397 }
398
399
400
401
402
403
404
405 public boolean writeMessage(String message) {
406 DataBlock dataBlock = new DataBlock();
407 dataBlock.setEOF(true);
408 ByteBuf buffer = Unpooled.wrappedBuffer(message.getBytes(WaarpStringUtils.UTF8));
409 dataBlock.setBlock(buffer);
410 ChannelFuture future;
411 logger.debug("Will write: " + buffer.toString(WaarpStringUtils.UTF8));
412 try {
413 future = dataChannel.writeAndFlush(dataBlock);
414 future.await(FtpConfiguration.getDATATIMEOUTCON());
415 } catch (InterruptedException e) {
416 logger.debug("Interrupted", e);
417 return false;
418 }
419 logger.debug("Write result: " + future.isSuccess(), future.cause());
420 return future.isSuccess();
421 }
422
423
424
425
426
427
428 private boolean isStillAlive() {
429 if (session.getConfiguration().isShutdown()) {
430 session.setExitErrorCode("Service is going down: disconnect");
431 return false;
432 }
433 return true;
434 }
435 }