1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.openr66.commander;
21
22 import org.waarp.common.database.data.AbstractDbData.UpdatedInfo;
23 import org.waarp.common.database.exception.WaarpDatabaseException;
24 import org.waarp.common.database.exception.WaarpDatabaseNoConnectionException;
25 import org.waarp.common.database.exception.WaarpDatabaseSqlException;
26 import org.waarp.common.logging.WaarpLogger;
27 import org.waarp.common.logging.WaarpLoggerFactory;
28 import org.waarp.common.utility.WaarpThreadFactory;
29 import org.waarp.openr66.database.data.DbTaskRunner;
30 import org.waarp.openr66.protocol.configuration.Configuration;
31 import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
32
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.RejectedExecutionHandler;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.SynchronousQueue;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41
42
43
44
45
46 public class InternalRunner {
47
48
49
50 private static final WaarpLogger logger =
51 WaarpLoggerFactory.getLogger(InternalRunner.class);
52
53 private final ScheduledExecutorService scheduledExecutorService;
54 private ScheduledFuture<?> scheduledFuture;
55 private CommanderInterface commander;
56 private boolean isRunning = true;
57 private final ThreadPoolExecutor threadPoolExecutor;
58 private final NetworkTransaction networkTransaction;
59
60
61
62
63
64
65
66 public InternalRunner()
67 throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
68 commander = new Commander(this, true);
69
70 scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
71 new WaarpThreadFactory("InternalRunner", false));
72 isRunning = true;
73 final BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>();
74 threadPoolExecutor = new ThreadPoolExecutor(
75 Configuration.configuration.getRunnerThread() / 2,
76 Configuration.configuration.getRunnerThread() * 2, 1000,
77 TimeUnit.MILLISECONDS, workQueue,
78 new WaarpThreadFactory("ClientRunner"), new RejectedExecutionHandler() {
79 @Override
80 public final void rejectedExecution(final Runnable runnable,
81 final ThreadPoolExecutor threadPoolExecutor) {
82 logger.debug("Task rescheduled");
83 }
84 });
85 scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(commander,
86 Configuration.configuration.getDelayCommander(),
87 Configuration.configuration.getDelayCommander(),
88 TimeUnit.MILLISECONDS);
89 networkTransaction = new NetworkTransaction();
90 }
91
92 public final NetworkTransaction getNetworkTransaction() {
93 return networkTransaction;
94 }
95
96 public final int allowedToSubmit() {
97 final int active = threadPoolExecutor.getActiveCount();
98 if ((isRunning || !Configuration.configuration.isShutdown()) &&
99 (active < Configuration.configuration.getRunnerThread())) {
100 return Configuration.configuration.getRunnerThread() - active;
101 }
102 return 0;
103 }
104
105
106
107
108
109
110
111
112 public final boolean submitTaskRunner(final DbTaskRunner taskRunner) {
113 if (isRunning || !Configuration.configuration.isShutdown()) {
114 logger.debug("Will run {}", taskRunner);
115 final ClientRunner runner =
116 new ClientRunner(networkTransaction, taskRunner, null);
117 if (taskRunner.isSendThrough() && (taskRunner.isRescheduledTransfer() ||
118 taskRunner.isPreTaskStarting())) {
119 runner.setSendThroughMode();
120 taskRunner.checkThroughMode();
121 }
122 try {
123 taskRunner.changeUpdatedInfo(UpdatedInfo.RUNNING);
124 taskRunner.update();
125 } catch (final WaarpDatabaseException e) {
126 logger.error("Error in Commander: {}", e.getMessage());
127 return false;
128 }
129
130 threadPoolExecutor.execute(runner);
131 return true;
132 }
133 return false;
134 }
135
136
137
138
139 public final void prepareStopInternalRunner() {
140 isRunning = false;
141 scheduledFuture.cancel(false);
142 if (commander != null) {
143 commander.finalizeCommander();
144 }
145 scheduledExecutorService.shutdown();
146 threadPoolExecutor.shutdown();
147 }
148
149
150
151
152
153 public final void stopInternalRunner() {
154 isRunning = false;
155 logger.info("Stopping Commander and Runner Tasks");
156 scheduledFuture.cancel(true);
157 if (commander != null) {
158 commander.finalizeCommander();
159 }
160 scheduledExecutorService.shutdownNow();
161 threadPoolExecutor.shutdownNow();
162 networkTransaction.closeAll(false);
163 }
164
165 public final int nbInternalRunner() {
166 return threadPoolExecutor.getActiveCount();
167 }
168
169 public final void reloadInternalRunner()
170 throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
171 scheduledFuture.cancel(false);
172 if (commander != null) {
173 commander.finalizeCommander();
174 }
175 commander = new Commander(this);
176 scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(commander,
177 2 *
178 Configuration.configuration.getDelayCommander(),
179 Configuration.configuration.getDelayCommander(),
180 TimeUnit.MILLISECONDS);
181 }
182 }