View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.openr66.protocol.utils;
21  
22  import io.netty.channel.ChannelFuture;
23  import io.netty.util.concurrent.Future;
24  import io.netty.util.concurrent.GenericFutureListener;
25  import org.waarp.common.database.DbAdmin;
26  import org.waarp.common.digest.FilesystemBasedDigest;
27  import org.waarp.common.digest.FilesystemBasedDigest.DigestAlgo;
28  import org.waarp.common.file.DataBlock;
29  import org.waarp.common.logging.SysErrLogger;
30  import org.waarp.common.logging.WaarpLogger;
31  import org.waarp.common.logging.WaarpLoggerFactory;
32  import org.waarp.common.utility.WaarpNettyUtil;
33  import org.waarp.common.utility.WaarpShutdownHook;
34  import org.waarp.common.utility.WaarpSystemUtil;
35  import org.waarp.openr66.context.R66FiniteDualStates;
36  import org.waarp.openr66.context.R66Session;
37  import org.waarp.openr66.context.task.localexec.LocalExecClient;
38  import org.waarp.openr66.database.data.DbTaskRunner;
39  import org.waarp.openr66.protocol.configuration.Configuration;
40  import org.waarp.openr66.protocol.configuration.Messages;
41  import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
42  import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
43  import org.waarp.openr66.protocol.localhandler.packet.AbstractLocalPacket;
44  import org.waarp.openr66.protocol.localhandler.packet.DataPacket;
45  import org.waarp.openr66.protocol.localhandler.packet.EndTransferPacket;
46  import org.waarp.openr66.protocol.localhandler.packet.ErrorPacket;
47  import org.waarp.openr66.protocol.localhandler.packet.LocalPacketFactory;
48  import org.waarp.openr66.protocol.localhandler.packet.RequestPacket;
49  import org.waarp.openr66.protocol.networkhandler.NetworkChannelReference;
50  import org.waarp.openr66.protocol.networkhandler.NetworkServerHandler;
51  import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
52  import org.waarp.openr66.protocol.networkhandler.packet.NetworkPacket;
53  
54  import java.lang.management.ManagementFactory;
55  
56  import static org.waarp.openr66.database.DbConstantR66.*;
57  
58  /**
59   * Channel Utils
60   */
61  public class ChannelUtils extends Thread {
62    /**
63     * Internal Logger
64     */
65    private static final WaarpLogger logger =
66        WaarpLoggerFactory.getLogger(ChannelUtils.class);
67  
68    public static final Integer NOCHANNEL = Integer.MIN_VALUE;
69  
70    /**
71     * Terminate all registered channels
72     *
73     * @return the number of previously registered network channels
74     */
75    private static int terminateCommandChannels() {
76      if (Configuration.configuration.getServerChannelGroup() == null) {
77        return 0;
78      }
79      final int result =
80          Configuration.configuration.getServerChannelGroup().size();
81      logger.info("ServerChannelGroup: {}", result);
82      Configuration.configuration.getServerChannelGroup().close();
83      return result;
84    }
85  
86    /**
87     * Terminate all registered connected client channels
88     *
89     * @return the number of previously registered network connected client
90     *     channels
91     */
92    private static int terminateClientChannels() {
93      if (Configuration.configuration.getServerConnectedChannelGroup() == null) {
94        return 0;
95      }
96      final int result =
97          Configuration.configuration.getServerConnectedChannelGroup().size();
98      logger.info("ServerConnectedChannelGroup: {}", result);
99      Configuration.configuration.getServerConnectedChannelGroup().close();
100     return result;
101   }
102 
103   /**
104    * Terminate all registered Http channels
105    *
106    * @return the number of previously registered http network channels
107    */
108   private static int terminateHttpChannels() {
109     if (Configuration.configuration.getHttpChannelGroup() == null) {
110       return 0;
111     }
112     final int result = Configuration.configuration.getHttpChannelGroup().size();
113     logger.debug("HttpChannelGroup: {}", result);
114     Configuration.configuration.getHttpChannelGroup().close();
115     return result;
116   }
117 
118   /**
119    * Return the current number of network connections
120    *
121    * @param configuration
122    *
123    * @return the current number of network connections
124    */
125   public static int nbCommandChannels(final Configuration configuration) {
126     int nb = 0;
127     if (Configuration.configuration.getServerConnectedChannelGroup() != null) {
128       nb += configuration.getServerConnectedChannelGroup().size();
129     }
130     if (configuration.getHttpChannelGroup() != null) {
131       nb += configuration.getHttpChannelGroup().size();
132     }
133     return nb;
134   }
135 
136   /**
137    * @param localChannelReference
138    * @param digestGlobal
139    * @param block
140    *
141    * @return the ChannelFuture of this write operation
142    *
143    * @throws OpenR66ProtocolPacketException
144    */
145   public static ChannelFuture writeBackDataBlock(
146       final LocalChannelReference localChannelReference,
147       final FilesystemBasedDigest digestGlobal, final DataBlock block,
148       final FilesystemBasedDigest digestBlock)
149       throws OpenR66ProtocolPacketException {
150     byte[] md5 = {};
151     final DbTaskRunner runner = localChannelReference.getSession().getRunner();
152     final byte[] dataBlock = block.getByteBlock();
153     final int length = block.getByteCount();
154     if (digestBlock != null) {
155       if (digestGlobal != null) {
156         digestGlobal.Update(dataBlock, 0, length);
157       }
158       digestBlock.Update(dataBlock, 0, length);
159       md5 = digestBlock.Final();
160     } else if (RequestPacket.isSendThroughMode(runner.getMode()) &&
161                RequestPacket.isMD5Mode(runner.getMode())) {
162       final DigestAlgo algo =
163           localChannelReference.getPartner().getDigestAlgo();
164       md5 = FileUtils.getHash(dataBlock, length, algo, digestGlobal);
165     } else if (digestGlobal != null) {
166       digestGlobal.Update(dataBlock, 0, length);
167     }
168     if (runner.getRank() % 100 == 1 ||
169         localChannelReference.getSessionState() != R66FiniteDualStates.DATAS) {
170       localChannelReference.sessionNewState(R66FiniteDualStates.DATAS);
171     }
172     final DataPacket data =
173         new DataPacket(runner.getRank(), dataBlock, length, md5);
174     if (localChannelReference.getSession().isCompressionEnabled()) {
175       R66Session.getCodec().compress(data, localChannelReference.getSession());
176     }
177     if (logger.isDebugEnabled()) {
178       logger.debug("DIGEST {} for {} to {} bytes at rank{} using {} at rank {}",
179                    FilesystemBasedDigest.getHex(data.getKey()), length,
180                    data.getLengthPacket(), data.getPacketRank(),
181                    localChannelReference.getPartner().getDigestAlgo(),
182                    runner.getRank());
183     }
184     final ChannelFuture future =
185         writeAbstractLocalPacket(localChannelReference, data, false);
186     runner.incrementRank();
187     return future;
188   }
189 
190   /**
191    * Write the EndTransfer
192    *
193    * @param localChannelReference
194    *
195    * @throws OpenR66ProtocolPacketException
196    */
197   public static void writeEndTransfer(
198       final LocalChannelReference localChannelReference)
199       throws OpenR66ProtocolPacketException {
200     final EndTransferPacket packet =
201         new EndTransferPacket(LocalPacketFactory.REQUESTPACKET);
202     localChannelReference.sessionNewState(R66FiniteDualStates.ENDTRANSFERS);
203     writeAbstractLocalPacket(localChannelReference, packet, false);
204   }
205 
206   /**
207    * Write the EndTransfer plus Global Hash
208    *
209    * @param localChannelReference
210    * @param hash
211    *
212    * @throws OpenR66ProtocolPacketException
213    */
214   public static void writeEndTransfer(
215       final LocalChannelReference localChannelReference, final String hash)
216       throws OpenR66ProtocolPacketException {
217     final EndTransferPacket packet =
218         new EndTransferPacket(LocalPacketFactory.REQUESTPACKET, hash);
219     localChannelReference.sessionNewState(R66FiniteDualStates.ENDTRANSFERS);
220     writeAbstractLocalPacket(localChannelReference, packet, false);
221   }
222 
223   /**
224    * Write an AbstractLocalPacket to the network Channel
225    *
226    * @param localChannelReference
227    * @param packet
228    * @param wait
229    *
230    * @return the ChannelFuture on write operation
231    *
232    * @throws OpenR66ProtocolPacketException
233    */
234   public static ChannelFuture writeAbstractLocalPacket(
235       final LocalChannelReference localChannelReference,
236       final AbstractLocalPacket packet, final boolean wait)
237       throws OpenR66ProtocolPacketException {
238     final NetworkPacket networkPacket;
239     try {
240       networkPacket = new NetworkPacket(localChannelReference.getLocalId(),
241                                         localChannelReference.getRemoteId(),
242                                         packet, localChannelReference);
243     } catch (final OpenR66ProtocolPacketException e) {
244       logger.error(Messages.getString("ChannelUtils.6") + packet,
245                    //$NON-NLS-1$
246                    e);
247       throw e;
248     }
249     final boolean addListener = packet instanceof ErrorPacket &&
250                                 ((ErrorPacket) packet).getCode() ==
251                                 ErrorPacket.FORWARDCLOSECODE;
252     final ChannelFuture future =
253         localChannelReference.getNetworkChannel().writeAndFlush(networkPacket);
254     if (addListener) {
255       future.addListener(new GenericFutureListener<Future<? super Void>>() {
256 
257         @Override
258         public void operationComplete(final Future<? super Void> future) {
259           localChannelReference.close();
260         }
261       });
262     }
263     final NetworkServerHandler nsh =
264         localChannelReference.getNetworkServerHandler();
265     if (nsh != null) {
266       nsh.resetKeepAlive();
267     }
268     final NetworkChannelReference ncr =
269         localChannelReference.getNetworkChannelObject();
270     if (ncr != null) {
271       ncr.use();
272     }
273     if (wait) {
274       WaarpNettyUtil.awaitOrInterrupted(future);
275     }
276     return future;
277   }
278 
279   /**
280    * Exit global ChannelFactory
281    */
282   public static void exit() {
283     logger.info("Current launched threads before exit: {}",
284                 ManagementFactory.getThreadMXBean().getThreadCount());
285     if (Configuration.configuration.getConstraintLimitHandler() != null) {
286       Configuration.configuration.getConstraintLimitHandler().release();
287     }
288     // First try to StopAll
289     if (admin != null) {
290       TransferUtils.stopSelectedTransfers(admin.getSession(), 0, null, null,
291                                           null, null, null, null, null, null,
292                                           null, true, true, true);
293     }
294     Configuration.configuration.setShutdown(true);
295     Configuration.configuration.prepareServerStop();
296     long delay = Configuration.configuration.getTimeoutCon();
297     // Inform others that shutdown
298     if (Configuration.configuration.getLocalTransaction() != null) {
299       final int nb = Configuration.configuration.getLocalTransaction()
300                                                 .getNumberLocalChannel();
301       Configuration.configuration.getLocalTransaction().shutdownLocalChannels();
302       if (nb == 1) {
303         delay /= 3;
304       }
305     }
306     logger.info("Unbind server network services");
307     Configuration.configuration.unbindServer();
308     logger.info("Exit Shutdown Command");
309     terminateCommandChannels();
310     logger.warn(
311         Messages.getString("ChannelUtils.7") + delay + " ms"); //$NON-NLS-1$
312     try {
313       Thread.sleep(delay);
314     } catch (final InterruptedException e) {//NOSONAR
315       SysErrLogger.FAKE_LOGGER.ignoreLog(e);
316     }
317     NetworkTransaction.stopAllEndRetrieve();
318     if (Configuration.configuration.getLocalTransaction() != null) {
319       Configuration.configuration.getLocalTransaction()
320                                  .debugPrintActiveLocalChannels();
321     }
322     if (Configuration.configuration.getGlobalTrafficShapingHandler() != null) {
323       Configuration.configuration.getGlobalTrafficShapingHandler().release();
324     }
325     logger.info("Exit Shutdown Http");
326     terminateHttpChannels();
327     logger.info("Exit Shutdown Local");
328     if (Configuration.configuration.getLocalTransaction() != null) {
329       Configuration.configuration.getLocalTransaction().closeAll();
330     }
331     logger.info("Exit Shutdown LocalExec");
332     if (Configuration.configuration.isUseLocalExec()) {
333       LocalExecClient.releaseResources();
334     }
335     logger.info("Exit Shutdown Connected Client");
336     terminateClientChannels();
337     logger.info("Exit Shutdown ServerStop");
338     Configuration.configuration.serverStop();
339     logger.info("Exit Shutdown Db Connection");
340     DbAdmin.closeAllConnection();
341     logger.warn(Messages.getString("ChannelUtils.15")); //$NON-NLS-1$
342     SysErrLogger.FAKE_LOGGER.syserr(
343         Messages.getString("ChannelUtils.15")); //$NON-NLS-1$
344     WaarpSystemUtil.stopLogger(false);
345   }
346 
347   /**
348    * This function is the top function to be called when the server is to be
349    * shutdown.
350    */
351   @Override
352   public void run() {
353     logger.info("Should restart? {}", WaarpShutdownHook.isRestart());
354     WaarpShutdownHook.terminate(false);
355   }
356 
357   /**
358    * Start Shutdown
359    */
360   public static void startShutdown() {
361     if (WaarpShutdownHook.isInShutdown()) {
362       return;
363     }
364     final Thread thread = new Thread(new ChannelUtils(), "R66 Shutdown Thread");
365     thread.setDaemon(false);
366     thread.start();
367   }
368 }