1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.ftp.core.config;
21
22 import io.netty.bootstrap.Bootstrap;
23 import io.netty.bootstrap.ServerBootstrap;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.EventLoopGroup;
28 import io.netty.channel.group.ChannelGroup;
29 import io.netty.channel.group.DefaultChannelGroup;
30 import io.netty.channel.nio.NioEventLoopGroup;
31 import io.netty.handler.traffic.ChannelTrafficShapingHandler;
32 import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler;
33 import io.netty.util.concurrent.EventExecutorGroup;
34 import org.waarp.common.command.exception.Reply425Exception;
35 import org.waarp.common.crypto.ssl.WaarpSslUtility;
36 import org.waarp.common.logging.WaarpLogger;
37 import org.waarp.common.logging.WaarpLoggerFactory;
38 import org.waarp.common.utility.DetectionUtils;
39 import org.waarp.common.utility.WaarpNettyUtil;
40 import org.waarp.common.utility.WaarpShutdownHook;
41 import org.waarp.common.utility.WaarpThreadFactory;
42 import org.waarp.ftp.core.control.FtpInitializer;
43 import org.waarp.ftp.core.control.ftps.FtpsInitializer;
44 import org.waarp.ftp.core.data.handler.FtpDataInitializer;
45 import org.waarp.ftp.core.data.handler.ftps.FtpsDataInitializer;
46 import org.waarp.ftp.core.exception.FtpNoConnectionException;
47 import org.waarp.ftp.core.session.FtpSession;
48 import org.waarp.ftp.core.session.FtpSessionReference;
49 import org.waarp.ftp.core.utils.FtpChannelUtils;
50 import org.waarp.ftp.core.utils.FtpShutdownHook;
51
52 import java.net.InetAddress;
53 import java.net.InetSocketAddress;
54 import java.util.concurrent.ConcurrentHashMap;
55 import java.util.concurrent.ExecutorService;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.ScheduledExecutorService;
58 import java.util.concurrent.atomic.AtomicInteger;
59
60
61
62
63 public class FtpInternalConfiguration {
64
65
66
67
68 private static final WaarpLogger logger =
69 WaarpLoggerFactory.getLogger(FtpInternalConfiguration.class);
70
71
72
73
74
75 public static final long RETRYINMS = 10;
76
77
78
79
80 public static final int RETRYNB = 10;
81
82
83
84
85 static Boolean isUnix;
86
87
88
89
90
91
92 private ChannelGroup commandChannelGroup;
93
94
95
96
97 private final EventLoopGroup execServer;
98
99
100
101
102 private final EventLoopGroup execWorker;
103
104
105
106
107
108 private ChannelGroup dataChannelGroup;
109
110
111
112
113 private final EventLoopGroup execCommandEvent;
114
115
116
117
118 private final EventLoopGroup execDataEvent;
119
120
121
122
123 private final EventLoopGroup execDataServer;
124
125
126
127
128 private final EventLoopGroup execDataWorker;
129
130
131
132
133 private final FtpSessionReference ftpSessionReference =
134 new FtpSessionReference();
135
136
137
138
139 private Bootstrap activeBootstrap;
140
141
142
143
144 private ServerBootstrap passiveBootstrap;
145
146
147
148
149 private final ScheduledExecutorService executorService =
150 Executors.newScheduledThreadPool(2,
151 new WaarpThreadFactory("TimerTrafficFtp",
152 false));
153
154
155
156
157 private FtpGlobalTrafficShapingHandler globalTrafficShapingHandler;
158
159
160
161
162 private boolean usingNativeSsl;
163
164
165
166
167 private boolean acceptAuthProt;
168
169
170
171 private Bootstrap activeSslBootstrap;
172
173
174
175
176 private ServerBootstrap passiveSslBootstrap;
177
178
179
180
181 public static class BindAddress {
182
183
184
185 public final Channel parent;
186
187
188
189
190 public final AtomicInteger nbBind = new AtomicInteger();
191
192
193
194
195
196
197 public BindAddress(final Channel channel) {
198 parent = channel;
199 nbBind.set(0);
200 }
201 }
202
203
204
205
206 private final ConcurrentHashMap<InetSocketAddress, BindAddress>
207 hashBindPassiveDataConn =
208 new ConcurrentHashMap<InetSocketAddress, BindAddress>();
209
210
211
212
213 private final FtpConfiguration configuration;
214
215
216
217
218
219
220 public FtpInternalConfiguration(final FtpConfiguration configuration) {
221 this.configuration = configuration;
222 isUnix = !DetectionUtils.isWindows();
223 configuration.getShutdownConfiguration().timeout =
224 configuration.getTimeoutCon();
225 new FtpShutdownHook(configuration.getShutdownConfiguration(),
226 configuration);
227 execCommandEvent = new NioEventLoopGroup(configuration.getClientThread(),
228 new WaarpThreadFactory("Command"));
229 execDataEvent = new NioEventLoopGroup(configuration.getClientThread(),
230 new WaarpThreadFactory("Data"));
231 execServer = new NioEventLoopGroup(configuration.getServerThread(),
232 new WaarpThreadFactory("CommandServer"));
233 execWorker = new NioEventLoopGroup(configuration.getClientThread(),
234 new WaarpThreadFactory("CommandWorker"));
235 execDataServer = new NioEventLoopGroup(configuration.getServerThread(),
236 new WaarpThreadFactory(
237 "DataServer"));
238 execDataWorker = new NioEventLoopGroup(configuration.getClientThread() * 2,
239 new WaarpThreadFactory(
240 "DataWorker"));
241 }
242
243
244
245
246
247
248 public final void serverStartup() throws FtpNoConnectionException {
249 logger.debug("Start groups");
250
251 commandChannelGroup =
252 new DefaultChannelGroup(configuration.fromClass.getName(),
253 execWorker.next());
254
255 dataChannelGroup =
256 new DefaultChannelGroup(configuration.fromClass.getName() + ".data",
257 execWorker.next());
258
259 logger.debug("Start data connections");
260
261 passiveBootstrap = new ServerBootstrap();
262 WaarpNettyUtil.setServerBootstrap(passiveBootstrap, execDataServer,
263 execDataWorker,
264 (int) configuration.getTimeoutCon(),
265 configuration.getBlocksize() + 1024,
266 true);
267 if (usingNativeSsl) {
268 passiveBootstrap.childHandler(
269 new FtpsDataInitializer(configuration.dataBusinessHandler,
270 configuration, false));
271 } else {
272 passiveBootstrap.childHandler(
273 new FtpDataInitializer(configuration.dataBusinessHandler,
274 configuration, false));
275 }
276 if (acceptAuthProt) {
277 passiveSslBootstrap = new ServerBootstrap();
278 WaarpNettyUtil.setServerBootstrap(passiveSslBootstrap, execDataServer,
279 execDataWorker,
280 (int) configuration.getTimeoutCon(),
281 configuration.getBlocksize() + 1024,
282 true);
283 passiveSslBootstrap.childHandler(
284 new FtpsDataInitializer(configuration.dataBusinessHandler,
285 configuration, false));
286 } else {
287 passiveSslBootstrap = passiveBootstrap;
288 }
289
290
291 activeBootstrap = new Bootstrap();
292 WaarpNettyUtil.setBootstrap(activeBootstrap, execDataWorker,
293 (int) configuration.getTimeoutCon(),
294 configuration.getBlocksize() + 1024, true);
295 if (usingNativeSsl) {
296 activeBootstrap.handler(
297 new FtpsDataInitializer(configuration.dataBusinessHandler,
298 configuration, true));
299 } else {
300 activeBootstrap.handler(
301 new FtpDataInitializer(configuration.dataBusinessHandler,
302 configuration, true));
303 }
304 if (acceptAuthProt) {
305 activeSslBootstrap = new Bootstrap();
306 WaarpNettyUtil.setBootstrap(activeSslBootstrap, execDataWorker,
307 (int) configuration.getTimeoutCon(),
308 configuration.getBlocksize() + 1024, true);
309 activeSslBootstrap.handler(
310 new FtpsDataInitializer(configuration.dataBusinessHandler,
311 configuration, true));
312 } else {
313 activeSslBootstrap = activeBootstrap;
314 }
315
316 logger.debug("Start command connections {}", configuration.getServerPort());
317
318
319
320
321 final ServerBootstrap serverBootstrap = new ServerBootstrap();
322 WaarpNettyUtil.setServerBootstrap(serverBootstrap, execServer, execWorker,
323 (int) configuration.getTimeoutCon(),
324 configuration.getBlocksize(), true);
325 if (usingNativeSsl) {
326 serverBootstrap.childHandler(
327 new FtpsInitializer(configuration.businessHandler, configuration));
328 } else {
329 serverBootstrap.childHandler(
330 new FtpInitializer(configuration.businessHandler, configuration));
331 }
332 final InetSocketAddress socketAddress =
333 new InetSocketAddress(configuration.getServerPort());
334 ChannelFuture future = serverBootstrap.bind(socketAddress);
335 try {
336 future = future.sync();
337 } catch (final InterruptedException e) {
338 logger.error("Cannot start command conections: {}", e.getMessage());
339 throw new FtpNoConnectionException("Can't initiate the FTP server", e);
340 }
341 if (!future.isSuccess()) {
342 logger.error("Cannot start command conections");
343 throw new FtpNoConnectionException("Can't initiate the FTP server");
344 }
345 FtpChannelUtils.addCommandChannel(future.channel(), configuration);
346
347
348 configuration.getShutdownConfiguration().timeout =
349 configuration.getTimeoutCon();
350 WaarpShutdownHook.addShutdownHook();
351
352 globalTrafficShapingHandler =
353 new FtpGlobalTrafficShapingHandler(executorService,
354 configuration.getServerGlobalWriteLimit(),
355 configuration.getServerGlobalReadLimit(),
356 configuration.getServerChannelWriteLimit(),
357 configuration.getServerChannelReadLimit(),
358 configuration.getDelayLimit());
359 }
360
361
362
363
364 public final ExecutorService getWorker() {
365 return execWorker;
366 }
367
368
369
370
371
372
373
374
375 public final void setNewFtpSession(final InetAddress ipOnly,
376 final InetSocketAddress fullIp,
377 final FtpSession session) {
378 ftpSessionReference.setNewFtpSession(ipOnly, fullIp, session);
379 }
380
381
382
383
384
385
386
387
388 public final FtpSession getFtpSession(final Channel channel) {
389 return ftpSessionReference.getPassiveFtpSession(channel);
390 }
391
392
393
394
395
396
397
398 public final void delFtpSession(final InetAddress ipOnly,
399 final InetSocketAddress fullIp) {
400 ftpSessionReference.delFtpSession(ipOnly, fullIp);
401 }
402
403
404
405
406 public final int getNumberSessions() {
407 return ftpSessionReference.sessionsNumber();
408 }
409
410
411
412
413
414
415 public final FtpSession findPassiveFtpSession(final Channel channel) {
416 return ftpSessionReference.findPassive(channel);
417 }
418
419
420
421
422
423
424
425
426
427 public final void bindPassive(final InetSocketAddress address,
428 final boolean ssl) throws Reply425Exception {
429 configuration.bindLock();
430 try {
431 BindAddress bindAddress = hashBindPassiveDataConn.get(address);
432 if (bindAddress == null) {
433 logger.debug("Bind really to {}", address);
434 final Channel parentChannel;
435 try {
436 final ChannelFuture future;
437 if (ssl) {
438 future = passiveSslBootstrap.bind(address);
439 } else {
440 future = passiveBootstrap.bind(address);
441 }
442 if (future.await(configuration.getTimeoutCon())) {
443 parentChannel = future.sync().channel();
444 } else {
445 logger.warn("Cannot open passive connection due to Timeout");
446 throw new Reply425Exception(
447 "Cannot open a Passive Connection due to Timeout");
448 }
449 } catch (final ChannelException e) {
450 logger.warn("Cannot open passive connection {}", e.getMessage());
451 throw new Reply425Exception("Cannot open a Passive Connection");
452 } catch (final InterruptedException e) {
453 logger.warn("Cannot open passive connection {}", e.getMessage());
454 throw new Reply425Exception("Cannot open a Passive Connection");
455 }
456 bindAddress = new BindAddress(parentChannel);
457 FtpChannelUtils.addDataChannel(parentChannel, configuration);
458 hashBindPassiveDataConn.put(address, bindAddress);
459 }
460 bindAddress.nbBind.getAndIncrement();
461 logger.debug("Bind number to {} is {}", address, bindAddress.nbBind);
462 } finally {
463 configuration.bindUnlock();
464 }
465 }
466
467
468
469
470
471
472
473
474
475
476
477
478 public final void unbindPassive(final InetSocketAddress address) {
479 configuration.bindLock();
480 try {
481 final BindAddress bindAddress = hashBindPassiveDataConn.get(address);
482 if (bindAddress != null) {
483 bindAddress.nbBind.getAndDecrement();
484 logger.debug("Bind number to {} left is {}", address,
485 bindAddress.nbBind);
486 if (bindAddress.nbBind.get() == 0) {
487 final ChannelFuture future =
488 WaarpSslUtility.closingSslChannel(bindAddress.parent);
489 hashBindPassiveDataConn.remove(address);
490 future.awaitUninterruptibly();
491 }
492 } else {
493 logger.warn("No Bind to {}", address);
494 }
495 } finally {
496 configuration.bindUnlock();
497 }
498 }
499
500
501
502
503 public final int getNbBindedPassive() {
504 return hashBindPassiveDataConn.size();
505 }
506
507
508
509
510
511
512 public final EventExecutorGroup getExecutor() {
513 return execCommandEvent;
514 }
515
516
517
518
519
520
521 public final EventExecutorGroup getDataExecutor() {
522 return execDataEvent;
523 }
524
525
526
527
528
529
530 public final Bootstrap getActiveBootstrap(final boolean ssl) {
531 if (ssl) {
532 return activeSslBootstrap;
533 } else {
534 return activeBootstrap;
535 }
536 }
537
538
539
540
541 public final ChannelGroup getCommandChannelGroup() {
542 return commandChannelGroup;
543 }
544
545
546
547
548 public final ChannelGroup getDataChannelGroup() {
549 return dataChannelGroup;
550 }
551
552
553
554
555 public final FtpGlobalTrafficShapingHandler getGlobalTrafficShapingHandler() {
556 return globalTrafficShapingHandler;
557 }
558
559
560
561
562 public final ChannelTrafficShapingHandler newChannelTrafficShapingHandler() {
563 if (configuration.getServerChannelWriteLimit() == 0 &&
564 configuration.getServerChannelReadLimit() == 0) {
565 return null;
566 }
567 if (globalTrafficShapingHandler instanceof GlobalChannelTrafficShapingHandler) {
568 return null;
569 }
570 return new FtpChannelTrafficShapingHandler(
571 configuration.getServerChannelWriteLimit(),
572 configuration.getServerChannelReadLimit(),
573 configuration.getDelayLimit());
574 }
575
576 public final void releaseResources() {
577 WaarpSslUtility.forceCloseAllSslChannels();
578 execWorker.shutdownGracefully();
579 execDataWorker.shutdownGracefully();
580 execServer.shutdownGracefully();
581 execDataServer.shutdownGracefully();
582 if (globalTrafficShapingHandler != null) {
583 globalTrafficShapingHandler.release();
584 }
585 executorService.shutdown();
586 }
587
588 public final boolean isAcceptAuthProt() {
589 return acceptAuthProt;
590 }
591
592
593
594
595 public final boolean isUsingNativeSsl() {
596 return usingNativeSsl;
597 }
598
599
600
601
602 public final void setUsingNativeSsl(final boolean usingNativeSsl) {
603 this.usingNativeSsl = usingNativeSsl;
604 }
605
606
607
608
609 public final void setAcceptAuthProt(final boolean acceptAuthProt) {
610 this.acceptAuthProt = acceptAuthProt;
611 }
612
613 }