1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.openr66.protocol.utils;
21
22 import io.netty.channel.ChannelFuture;
23 import io.netty.util.concurrent.Future;
24 import io.netty.util.concurrent.GenericFutureListener;
25 import org.waarp.common.database.DbAdmin;
26 import org.waarp.common.digest.FilesystemBasedDigest;
27 import org.waarp.common.digest.FilesystemBasedDigest.DigestAlgo;
28 import org.waarp.common.file.DataBlock;
29 import org.waarp.common.logging.SysErrLogger;
30 import org.waarp.common.logging.WaarpLogger;
31 import org.waarp.common.logging.WaarpLoggerFactory;
32 import org.waarp.common.utility.WaarpNettyUtil;
33 import org.waarp.common.utility.WaarpShutdownHook;
34 import org.waarp.common.utility.WaarpSystemUtil;
35 import org.waarp.openr66.context.R66FiniteDualStates;
36 import org.waarp.openr66.context.R66Session;
37 import org.waarp.openr66.context.task.localexec.LocalExecClient;
38 import org.waarp.openr66.database.data.DbTaskRunner;
39 import org.waarp.openr66.protocol.configuration.Configuration;
40 import org.waarp.openr66.protocol.configuration.Messages;
41 import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
42 import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
43 import org.waarp.openr66.protocol.localhandler.packet.AbstractLocalPacket;
44 import org.waarp.openr66.protocol.localhandler.packet.DataPacket;
45 import org.waarp.openr66.protocol.localhandler.packet.EndTransferPacket;
46 import org.waarp.openr66.protocol.localhandler.packet.ErrorPacket;
47 import org.waarp.openr66.protocol.localhandler.packet.LocalPacketFactory;
48 import org.waarp.openr66.protocol.localhandler.packet.RequestPacket;
49 import org.waarp.openr66.protocol.networkhandler.NetworkChannelReference;
50 import org.waarp.openr66.protocol.networkhandler.NetworkServerHandler;
51 import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
52 import org.waarp.openr66.protocol.networkhandler.packet.NetworkPacket;
53
54 import java.lang.management.ManagementFactory;
55
56 import static org.waarp.openr66.database.DbConstantR66.*;
57
58
59
60
61 public class ChannelUtils extends Thread {
62
63
64
65 private static final WaarpLogger logger =
66 WaarpLoggerFactory.getLogger(ChannelUtils.class);
67
68 public static final Integer NOCHANNEL = Integer.MIN_VALUE;
69
70
71
72
73
74
75 private static int terminateCommandChannels() {
76 if (Configuration.configuration.getServerChannelGroup() == null) {
77 return 0;
78 }
79 final int result =
80 Configuration.configuration.getServerChannelGroup().size();
81 logger.info("ServerChannelGroup: {}", result);
82 Configuration.configuration.getServerChannelGroup().close();
83 return result;
84 }
85
86
87
88
89
90
91
92 private static int terminateClientChannels() {
93 if (Configuration.configuration.getServerConnectedChannelGroup() == null) {
94 return 0;
95 }
96 final int result =
97 Configuration.configuration.getServerConnectedChannelGroup().size();
98 logger.info("ServerConnectedChannelGroup: {}", result);
99 Configuration.configuration.getServerConnectedChannelGroup().close();
100 return result;
101 }
102
103
104
105
106
107
108 private static int terminateHttpChannels() {
109 if (Configuration.configuration.getHttpChannelGroup() == null) {
110 return 0;
111 }
112 final int result = Configuration.configuration.getHttpChannelGroup().size();
113 logger.debug("HttpChannelGroup: {}", result);
114 Configuration.configuration.getHttpChannelGroup().close();
115 return result;
116 }
117
118
119
120
121
122
123
124
125 public static int nbCommandChannels(final Configuration configuration) {
126 int nb = 0;
127 if (Configuration.configuration.getServerConnectedChannelGroup() != null) {
128 nb += configuration.getServerConnectedChannelGroup().size();
129 }
130 if (configuration.getHttpChannelGroup() != null) {
131 nb += configuration.getHttpChannelGroup().size();
132 }
133 return nb;
134 }
135
136
137
138
139
140
141
142
143
144
145 public static ChannelFuture writeBackDataBlock(
146 final LocalChannelReference localChannelReference,
147 final FilesystemBasedDigest digestGlobal, final DataBlock block,
148 final FilesystemBasedDigest digestBlock)
149 throws OpenR66ProtocolPacketException {
150 byte[] md5 = {};
151 final DbTaskRunner runner = localChannelReference.getSession().getRunner();
152 final byte[] dataBlock = block.getByteBlock();
153 final int length = block.getByteCount();
154 if (digestBlock != null) {
155 if (digestGlobal != null) {
156 digestGlobal.Update(dataBlock, 0, length);
157 }
158 digestBlock.Update(dataBlock, 0, length);
159 md5 = digestBlock.Final();
160 } else if (RequestPacket.isSendThroughMode(runner.getMode()) &&
161 RequestPacket.isMD5Mode(runner.getMode())) {
162 final DigestAlgo algo =
163 localChannelReference.getPartner().getDigestAlgo();
164 md5 = FileUtils.getHash(dataBlock, length, algo, digestGlobal);
165 } else if (digestGlobal != null) {
166 digestGlobal.Update(dataBlock, 0, length);
167 }
168 if (runner.getRank() % 100 == 1 ||
169 localChannelReference.getSessionState() != R66FiniteDualStates.DATAS) {
170 localChannelReference.sessionNewState(R66FiniteDualStates.DATAS);
171 }
172 final DataPacket data =
173 new DataPacket(runner.getRank(), dataBlock, length, md5);
174 if (localChannelReference.getSession().isCompressionEnabled()) {
175 R66Session.getCodec().compress(data, localChannelReference.getSession());
176 }
177 if (logger.isDebugEnabled()) {
178 logger.debug("DIGEST {} for {} to {} bytes at rank{} using {} at rank {}",
179 FilesystemBasedDigest.getHex(data.getKey()), length,
180 data.getLengthPacket(), data.getPacketRank(),
181 localChannelReference.getPartner().getDigestAlgo(),
182 runner.getRank());
183 }
184 final ChannelFuture future =
185 writeAbstractLocalPacket(localChannelReference, data, false);
186 runner.incrementRank();
187 return future;
188 }
189
190
191
192
193
194
195
196
197 public static void writeEndTransfer(
198 final LocalChannelReference localChannelReference)
199 throws OpenR66ProtocolPacketException {
200 final EndTransferPacket packet =
201 new EndTransferPacket(LocalPacketFactory.REQUESTPACKET);
202 localChannelReference.sessionNewState(R66FiniteDualStates.ENDTRANSFERS);
203 writeAbstractLocalPacket(localChannelReference, packet, false);
204 }
205
206
207
208
209
210
211
212
213
214 public static void writeEndTransfer(
215 final LocalChannelReference localChannelReference, final String hash)
216 throws OpenR66ProtocolPacketException {
217 final EndTransferPacket packet =
218 new EndTransferPacket(LocalPacketFactory.REQUESTPACKET, hash);
219 localChannelReference.sessionNewState(R66FiniteDualStates.ENDTRANSFERS);
220 writeAbstractLocalPacket(localChannelReference, packet, false);
221 }
222
223
224
225
226
227
228
229
230
231
232
233
234 public static ChannelFuture writeAbstractLocalPacket(
235 final LocalChannelReference localChannelReference,
236 final AbstractLocalPacket packet, final boolean wait)
237 throws OpenR66ProtocolPacketException {
238 final NetworkPacket networkPacket;
239 try {
240 networkPacket = new NetworkPacket(localChannelReference.getLocalId(),
241 localChannelReference.getRemoteId(),
242 packet, localChannelReference);
243 } catch (final OpenR66ProtocolPacketException e) {
244 logger.error(Messages.getString("ChannelUtils.6") + packet,
245
246 e);
247 throw e;
248 }
249 final boolean addListener = packet instanceof ErrorPacket &&
250 ((ErrorPacket) packet).getCode() ==
251 ErrorPacket.FORWARDCLOSECODE;
252 final ChannelFuture future =
253 localChannelReference.getNetworkChannel().writeAndFlush(networkPacket);
254 if (addListener) {
255 future.addListener(new GenericFutureListener<Future<? super Void>>() {
256
257 @Override
258 public void operationComplete(final Future<? super Void> future) {
259 localChannelReference.close();
260 }
261 });
262 }
263 final NetworkServerHandler nsh =
264 localChannelReference.getNetworkServerHandler();
265 if (nsh != null) {
266 nsh.resetKeepAlive();
267 }
268 final NetworkChannelReference ncr =
269 localChannelReference.getNetworkChannelObject();
270 if (ncr != null) {
271 ncr.use();
272 }
273 if (wait) {
274 WaarpNettyUtil.awaitOrInterrupted(future);
275 }
276 return future;
277 }
278
279
280
281
282 public static void exit() {
283 logger.info("Current launched threads before exit: {}",
284 ManagementFactory.getThreadMXBean().getThreadCount());
285 if (Configuration.configuration.getConstraintLimitHandler() != null) {
286 Configuration.configuration.getConstraintLimitHandler().release();
287 }
288
289 if (admin != null) {
290 TransferUtils.stopSelectedTransfers(admin.getSession(), 0, null, null,
291 null, null, null, null, null, null,
292 null, true, true, true);
293 }
294 Configuration.configuration.setShutdown(true);
295 Configuration.configuration.prepareServerStop();
296 long delay = Configuration.configuration.getTimeoutCon();
297
298 if (Configuration.configuration.getLocalTransaction() != null) {
299 final int nb = Configuration.configuration.getLocalTransaction()
300 .getNumberLocalChannel();
301 Configuration.configuration.getLocalTransaction().shutdownLocalChannels();
302 if (nb == 1) {
303 delay /= 3;
304 }
305 }
306 logger.info("Unbind server network services");
307 Configuration.configuration.unbindServer();
308 logger.info("Exit Shutdown Command");
309 terminateCommandChannels();
310 logger.warn(
311 Messages.getString("ChannelUtils.7") + delay + " ms");
312 try {
313 Thread.sleep(delay);
314 } catch (final InterruptedException e) {
315 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
316 }
317 NetworkTransaction.stopAllEndRetrieve();
318 if (Configuration.configuration.getLocalTransaction() != null) {
319 Configuration.configuration.getLocalTransaction()
320 .debugPrintActiveLocalChannels();
321 }
322 if (Configuration.configuration.getGlobalTrafficShapingHandler() != null) {
323 Configuration.configuration.getGlobalTrafficShapingHandler().release();
324 }
325 logger.info("Exit Shutdown Http");
326 terminateHttpChannels();
327 logger.info("Exit Shutdown Local");
328 if (Configuration.configuration.getLocalTransaction() != null) {
329 Configuration.configuration.getLocalTransaction().closeAll();
330 }
331 logger.info("Exit Shutdown LocalExec");
332 if (Configuration.configuration.isUseLocalExec()) {
333 LocalExecClient.releaseResources();
334 }
335 logger.info("Exit Shutdown Connected Client");
336 terminateClientChannels();
337 logger.info("Exit Shutdown ServerStop");
338 Configuration.configuration.serverStop();
339 logger.info("Exit Shutdown Db Connection");
340 DbAdmin.closeAllConnection();
341 logger.warn(Messages.getString("ChannelUtils.15"));
342 SysErrLogger.FAKE_LOGGER.syserr(
343 Messages.getString("ChannelUtils.15"));
344 WaarpSystemUtil.stopLogger(false);
345 }
346
347
348
349
350
351 @Override
352 public void run() {
353 logger.info("Should restart? {}", WaarpShutdownHook.isRestart());
354 WaarpShutdownHook.terminate(false);
355 }
356
357
358
359
360 public static void startShutdown() {
361 if (WaarpShutdownHook.isInShutdown()) {
362 return;
363 }
364 final Thread thread = new Thread(new ChannelUtils(), "R66 Shutdown Thread");
365 thread.setDaemon(false);
366 thread.start();
367 }
368 }