View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.openr66.commander;
21  
22  import org.waarp.common.database.data.AbstractDbData.UpdatedInfo;
23  import org.waarp.common.database.exception.WaarpDatabaseException;
24  import org.waarp.common.database.exception.WaarpDatabaseNoConnectionException;
25  import org.waarp.common.database.exception.WaarpDatabaseSqlException;
26  import org.waarp.common.logging.WaarpLogger;
27  import org.waarp.common.logging.WaarpLoggerFactory;
28  import org.waarp.common.utility.WaarpThreadFactory;
29  import org.waarp.openr66.database.data.DbTaskRunner;
30  import org.waarp.openr66.protocol.configuration.Configuration;
31  import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
32  
33  import java.util.concurrent.BlockingQueue;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.RejectedExecutionHandler;
36  import java.util.concurrent.ScheduledExecutorService;
37  import java.util.concurrent.ScheduledFuture;
38  import java.util.concurrent.SynchronousQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  /**
43   * This class launch and control the Commander and enable TaskRunner job
44   * submissions
45   */
46  public class InternalRunner {
47    /**
48     * Internal Logger
49     */
50    private static final WaarpLogger logger =
51        WaarpLoggerFactory.getLogger(InternalRunner.class);
52  
53    private final ScheduledExecutorService scheduledExecutorService;
54    private ScheduledFuture<?> scheduledFuture;
55    private CommanderInterface commander;
56    private boolean isRunning = true;
57    private final ThreadPoolExecutor threadPoolExecutor;
58    private final NetworkTransaction networkTransaction;
59  
60    /**
61     * Create the structure to enable submission by database
62     *
63     * @throws WaarpDatabaseNoConnectionException
64     * @throws WaarpDatabaseSqlException
65     */
66    public InternalRunner()
67        throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
68      commander = new Commander(this, true);
69      // This is not daemon intentionally
70      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
71          new WaarpThreadFactory("InternalRunner", false));
72      isRunning = true;
73      final BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>();
74      threadPoolExecutor = new ThreadPoolExecutor(
75          Configuration.configuration.getRunnerThread() / 2,
76          Configuration.configuration.getRunnerThread() * 2, 1000,
77          TimeUnit.MILLISECONDS, workQueue,
78          new WaarpThreadFactory("ClientRunner"), new RejectedExecutionHandler() {
79        @Override
80        public final void rejectedExecution(final Runnable runnable,
81                                            final ThreadPoolExecutor threadPoolExecutor) {
82          logger.debug("Task rescheduled");
83        }
84      });
85      scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(commander,
86                                                                        Configuration.configuration.getDelayCommander(),
87                                                                        Configuration.configuration.getDelayCommander(),
88                                                                        TimeUnit.MILLISECONDS);
89      networkTransaction = new NetworkTransaction();
90    }
91  
92    public final NetworkTransaction getNetworkTransaction() {
93      return networkTransaction;
94    }
95  
96    public final int allowedToSubmit() {
97      final int active = threadPoolExecutor.getActiveCount();
98      if ((isRunning || !Configuration.configuration.isShutdown()) &&
99          (active < Configuration.configuration.getRunnerThread())) {
100       return Configuration.configuration.getRunnerThread() - active;
101     }
102     return 0;
103   }
104 
105   /**
106    * Submit a task
107    *
108    * @param taskRunner
109    *
110    * @return True if launched, False if not since exceeding capacity
111    */
112   public final boolean submitTaskRunner(final DbTaskRunner taskRunner) {
113     if (isRunning || !Configuration.configuration.isShutdown()) {
114       logger.debug("Will run {}", taskRunner);
115       final ClientRunner runner =
116           new ClientRunner(networkTransaction, taskRunner, null);
117       if (taskRunner.isSendThrough() && (taskRunner.isRescheduledTransfer() ||
118                                          taskRunner.isPreTaskStarting())) {
119         runner.setSendThroughMode();
120         taskRunner.checkThroughMode();
121       }
122       try {
123         taskRunner.changeUpdatedInfo(UpdatedInfo.RUNNING);
124         taskRunner.update();
125       } catch (final WaarpDatabaseException e) {
126         logger.error("Error in Commander: {}", e.getMessage());
127         return false;
128       }
129       // create the client, connect and run
130       threadPoolExecutor.execute(runner);
131       return true;
132     }
133     return false;
134   }
135 
136   /**
137    * First step while shutting down the service
138    */
139   public final void prepareStopInternalRunner() {
140     isRunning = false;
141     scheduledFuture.cancel(false);
142     if (commander != null) {
143       commander.finalizeCommander();
144     }
145     scheduledExecutorService.shutdown();
146     threadPoolExecutor.shutdown();
147   }
148 
149   /**
150    * This should be called when the server is shutting down, after stopping
151    * active requests if possible.
152    */
153   public final void stopInternalRunner() {
154     isRunning = false;
155     logger.info("Stopping Commander and Runner Tasks");
156     scheduledFuture.cancel(true);
157     if (commander != null) {
158       commander.finalizeCommander();
159     }
160     scheduledExecutorService.shutdownNow();
161     threadPoolExecutor.shutdownNow();
162     networkTransaction.closeAll(false);
163   }
164 
165   public final int nbInternalRunner() {
166     return threadPoolExecutor.getActiveCount();
167   }
168 
169   public final void reloadInternalRunner()
170       throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
171     scheduledFuture.cancel(false);
172     if (commander != null) {
173       commander.finalizeCommander();
174     }
175     commander = new Commander(this);
176     scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(commander,
177                                                                       2 *
178                                                                       Configuration.configuration.getDelayCommander(),
179                                                                       Configuration.configuration.getDelayCommander(),
180                                                                       TimeUnit.MILLISECONDS);
181   }
182 }