View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.openr66.commander;
21  
22  import org.waarp.common.database.DbPreparedStatement;
23  import org.waarp.common.database.data.AbstractDbData;
24  import org.waarp.common.database.data.AbstractDbData.UpdatedInfo;
25  import org.waarp.common.database.exception.WaarpDatabaseException;
26  import org.waarp.common.database.exception.WaarpDatabaseNoConnectionException;
27  import org.waarp.common.database.exception.WaarpDatabaseNoDataException;
28  import org.waarp.common.database.exception.WaarpDatabaseSqlException;
29  import org.waarp.common.logging.WaarpLogger;
30  import org.waarp.common.logging.WaarpLoggerFactory;
31  import org.waarp.common.utility.WaarpShutdownHook;
32  import org.waarp.openr66.database.data.DbConfiguration;
33  import org.waarp.openr66.database.data.DbHostAuth;
34  import org.waarp.openr66.database.data.DbHostConfiguration;
35  import org.waarp.openr66.database.data.DbMultipleMonitor;
36  import org.waarp.openr66.database.data.DbRule;
37  import org.waarp.openr66.database.data.DbTaskRunner;
38  import org.waarp.openr66.protocol.configuration.Configuration;
39  import org.waarp.openr66.protocol.networkhandler.NetworkTransaction;
40  
41  import static org.waarp.openr66.database.DbConstantR66.*;
42  
43  /**
44   * Commander is responsible to read from database updated data from time to time
45   * in order to achieve new
46   * runner or new configuration updates.
47   */
48  public class Commander implements CommanderInterface {
49    private static final String DATABASE_ERROR_CANNOT_EXECUTE_COMMANDER =
50        "Database Error: Cannot execute Commander";
51  
52    private static final String CONFIG = "Config {}";
53  
54    private static final String DATABASE_SQL_ERROR_CANNOT_EXECUTE_COMMANDER =
55        "Database SQL Error: Cannot execute Commander";
56  
57    private static final String
58        DATABASE_NO_CONNECTION_ERROR_CANNOT_EXECUTE_COMMANDER =
59        "Database No Connection Error: Cannot execute Commander";
60  
61    /**
62     * Internal Logger
63     */
64    private static final WaarpLogger logger =
65        WaarpLoggerFactory.getLogger(Commander.class);
66  
67    public static final int LIMIT_MAX_SUBMIT = 50000;
68    public static final int LIMIT_SUBMIT = 1000;
69  
70    private InternalRunner internalRunner;
71    private DbPreparedStatement preparedStatementLock;
72    private long totalRuns = 0;
73  
74    /**
75     * Prepare requests that will be executed from time to time
76     *
77     * @param runner
78     *
79     * @throws WaarpDatabaseNoConnectionException
80     * @throws WaarpDatabaseSqlException
81     */
82    public Commander(final InternalRunner runner)
83        throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
84      internalConstructor(runner);
85    }
86  
87    /**
88     * Prepare requests that will be executed from time to time
89     *
90     * @param runner
91     * @param fromStartup True if call from startup of the server
92     *
93     * @throws WaarpDatabaseNoConnectionException
94     * @throws WaarpDatabaseSqlException
95     */
96    public Commander(final InternalRunner runner, final boolean fromStartup)
97        throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
98      internalConstructor(runner);
99      if (fromStartup) {
100       // Change RUNNING or INTERRUPTED to TOSUBMIT since they should be ready
101       DbTaskRunner.resetToSubmit(admin.getSession());
102     }
103   }
104 
105   private void internalConstructor(final InternalRunner runner)
106       throws WaarpDatabaseNoConnectionException, WaarpDatabaseSqlException {
107     try {
108       if (Configuration.configuration.getMultipleMonitors() > 1) {
109         preparedStatementLock = DbMultipleMonitor.getUpdatedPrepareStament(
110             noCommitAdmin.getSession());
111       } else {
112         preparedStatementLock = null;
113       }
114       // Clean tasks (CompleteOK and ALLDONE => DONE)
115       DbTaskRunner.changeFinishedToDone();
116       internalRunner = runner;
117     } finally {
118       if (internalRunner == null) {
119         // An error occurs
120         if (preparedStatementLock != null) {
121           preparedStatementLock.realClose();
122         }
123       } else {
124         if (preparedStatementLock != null) {
125           noCommitAdmin.getSession()
126                        .addLongTermPreparedStatement(preparedStatementLock);
127         }
128       }
129     }
130   }
131 
132   /**
133    * Finalize internal data
134    */
135   @Override
136   public final void finalizeCommander() {
137     if (preparedStatementLock != null) {
138       try {
139         noCommitAdmin.getSession().commit();
140       } catch (final WaarpDatabaseSqlException ignored) {
141         // nothing
142       } catch (final WaarpDatabaseNoConnectionException ignored) {
143         // nothing
144       }
145       preparedStatementLock.realClose();
146       noCommitAdmin.getSession()
147                    .removeLongTermPreparedStatements(preparedStatementLock);
148       // DbConstant.noCommitAdmin.session.removeLongTermPreparedStatements()
149     }
150     // DbConstant.admin.session.removeLongTermPreparedStatements()
151   }
152 
153   @Override
154   public void run() {
155     Thread.currentThread().setName("OpenR66Commander");
156     if (Configuration.configuration.isShutdown()) {
157       // Stop
158       return;
159     }
160     if (admin.getSession() != null && admin.getSession().isDisActive()) {
161       admin.getSession().checkConnectionNoException();
162     }
163     // each time it is runned, it parses all database for updates
164     DbMultipleMonitor multipleMonitor = null;
165     // Open a lock to prevent other "HA" monitors to retrieve access as Commander
166     try {
167       try {
168         if (preparedStatementLock != null) {
169           preparedStatementLock.executeQuery();
170           preparedStatementLock.getNext();
171           multipleMonitor =
172               DbMultipleMonitor.getFromStatement(preparedStatementLock);
173         }
174       } catch (final WaarpDatabaseNoConnectionException e) {
175         logger.error(
176             DATABASE_NO_CONNECTION_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
177             e.getMessage());
178         try {
179           noCommitAdmin.getDbModel()
180                        .validConnection(noCommitAdmin.getSession());
181         } catch (final WaarpDatabaseNoConnectionException ignored) {
182           // nothing
183         }
184         return;
185       } catch (final WaarpDatabaseSqlException e) {
186         logger.error(DATABASE_SQL_ERROR_CANNOT_EXECUTE_COMMANDER, e);
187         try {
188           noCommitAdmin.getDbModel()
189                        .validConnection(noCommitAdmin.getSession());
190         } catch (final WaarpDatabaseNoConnectionException ignored) {
191           // nothing
192         }
193         return;
194       }
195       logger.debug("Before {}", multipleMonitor);
196       boolean shallReturnInCaseError = true;
197       try {
198         // First check Configuration
199         checkConfiguration(multipleMonitor);
200         // check HostConfiguration
201         shallReturnInCaseError = false;
202         checkHostConfiguration(multipleMonitor);
203       } catch (final WaarpDatabaseNoConnectionException e) {
204         try {
205           admin.getDbModel().validConnection(admin.getSession());
206         } catch (final WaarpDatabaseNoConnectionException ignored) {
207           // nothing
208         }
209         logger.error(
210             DATABASE_NO_CONNECTION_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
211             e.getMessage());
212         return;
213       } catch (final WaarpDatabaseSqlException e) {
214         try {
215           admin.getDbModel().validConnection(admin.getSession());
216         } catch (final WaarpDatabaseNoConnectionException ignored) {
217           // nothing
218         }
219         logger.error(DATABASE_SQL_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
220                      e.getMessage());
221         // XXX no return since table might not be initialized
222         if (shallReturnInCaseError) {
223           return;
224         }
225       } catch (final WaarpDatabaseException e) {
226         try {
227           admin.getDbModel().validConnection(admin.getSession());
228         } catch (final WaarpDatabaseNoConnectionException ignored) {
229           // nothing
230         }
231         logger.error(DATABASE_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
232                      e.getMessage());
233         // XXX no return since table might not be initialized
234         if (shallReturnInCaseError) {
235           return;
236         }
237       }
238       // Do not fusion with previous cases since last one could be in error but
239       // still continue
240       try {
241         // ConsistencyCheck HostAuthent
242         checkHostAuthent(multipleMonitor);
243         // Check Rules
244         checkRule(multipleMonitor);
245         if (WaarpShutdownHook.isShutdownStarting()) {
246           // no more task to submit
247           return;
248         }
249 
250         // Lauch Transfer ready to be submited
251         logger.debug("start runner");
252         checkTaskRunner(multipleMonitor);
253       } catch (final WaarpDatabaseNoConnectionException e) {
254         try {
255           admin.getDbModel().validConnection(admin.getSession());
256         } catch (final WaarpDatabaseNoConnectionException ignored) {
257           // nothing
258         }
259         logger.error(
260             DATABASE_NO_CONNECTION_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
261             e.getMessage());
262         return;
263       } catch (final WaarpDatabaseSqlException e) {
264         try {
265           admin.getDbModel().validConnection(admin.getSession());
266         } catch (final WaarpDatabaseNoConnectionException ignored) {
267           // nothing
268         }
269         logger.error(DATABASE_SQL_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
270                      e.getMessage());
271         return;
272       } catch (final WaarpDatabaseNoDataException e) {
273         try {
274           admin.getDbModel().validConnection(admin.getSession());
275         } catch (final WaarpDatabaseNoConnectionException ignored) {
276           // nothing
277         }
278         logger.error(DATABASE_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
279                      e.getMessage());
280         return;
281       } catch (final WaarpDatabaseException e) {
282         try {
283           admin.getDbModel().validConnection(admin.getSession());
284         } catch (final WaarpDatabaseNoConnectionException ignored) {
285           // nothing
286         }
287         logger.error(DATABASE_ERROR_CANNOT_EXECUTE_COMMANDER + ": {}",
288                      e.getMessage());
289         return;
290       }
291       logger.debug("end commander");
292     } finally {
293       if (multipleMonitor != null) {
294         try {
295           // Now update and Commit so releasing the lock
296           logger.debug("Update {}", multipleMonitor);
297           multipleMonitor.update();
298           noCommitAdmin.getSession().commit();
299         } catch (final WaarpDatabaseException e) {
300           try {
301             noCommitAdmin.getDbModel()
302                          .validConnection(noCommitAdmin.getSession());
303           } catch (final WaarpDatabaseNoConnectionException ignored) {
304             // nothing
305           }
306         }
307       }
308     }
309   }
310 
311   private void checkConfiguration(final DbMultipleMonitor multipleMonitor)
312       throws WaarpDatabaseException {
313     if (Configuration.configuration.isShutdown()) {
314       // Stop
315       return;
316     }
317     final DbConfiguration[] configurations =
318         DbConfiguration.getUpdatedPrepareStament();
319     int i = 0;
320     while (i < configurations.length) {
321       // should be only one...
322       final DbConfiguration configuration = configurations[i];
323       if (configuration.isOwnConfiguration()) {
324         configuration.updateConfiguration();
325       }
326       if (multipleMonitor != null) {
327         // update the configuration in HA mode
328         if (multipleMonitor.checkUpdateConfig()) {
329           configuration.changeUpdatedInfo(
330               AbstractDbData.UpdatedInfo.NOTUPDATED);
331           configuration.update();
332           logger.debug(CONFIG, multipleMonitor);
333         } else {
334           configuration.update();
335           logger.debug(CONFIG, multipleMonitor);
336         }
337       } else {
338         configuration.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
339         configuration.update();
340       }
341       i++;
342     }
343   }
344 
345   private void checkHostConfiguration(final DbMultipleMonitor multipleMonitor)
346       throws WaarpDatabaseException {
347     if (Configuration.configuration.isShutdown()) {
348       // Stop
349       return;
350     }
351     final DbHostConfiguration[] configurations =
352         DbHostConfiguration.getUpdatedPrepareStament();
353     int i = 0;
354     while (i < configurations.length) {
355       // should be only one...
356       final DbHostConfiguration configuration = configurations[i];
357       if (configuration.isOwnConfiguration()) {
358         configuration.updateConfiguration();
359       }
360       if (multipleMonitor != null) {
361         // update the configuration in HA mode
362         if (multipleMonitor.checkUpdateConfig()) {
363           configuration.changeUpdatedInfo(
364               AbstractDbData.UpdatedInfo.NOTUPDATED);
365           configuration.update();
366           logger.debug(CONFIG, multipleMonitor);
367         } else {
368           configuration.update();
369           logger.debug(CONFIG, multipleMonitor);
370         }
371       } else {
372         configuration.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
373         configuration.update();
374       }
375       i++;
376     }
377   }
378 
379   private void checkHostAuthent(final DbMultipleMonitor multipleMonitor)
380       throws WaarpDatabaseException {
381     if (Configuration.configuration.isShutdown()) {
382       // Stop
383       return;
384     }
385     final DbHostAuth[] auths = DbHostAuth.getUpdatedPreparedStatement();
386     int i = 0;
387     boolean mm = false;
388     boolean lastUpdate = false;
389     while (i < auths.length) {
390       // Maybe multiple
391       final DbHostAuth hostAuth = auths[i];
392       if (multipleMonitor != null) {
393         if (!mm) {
394           // not already set from a previous hostAuth
395           mm = true;
396           lastUpdate = multipleMonitor.checkUpdateHost();
397         } // else already set so no action on multipleMonitor
398 
399         // Update the Host configuration in HA mode
400         if (lastUpdate) {
401           hostAuth.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
402         } else {
403           // Nothing to do except validate
404         }
405         hostAuth.update();
406         logger.debug("Host {}", multipleMonitor);
407       } else {
408         // Nothing to do except validate
409         hostAuth.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
410         hostAuth.update();
411       }
412       i++;
413     }
414   }
415 
416   private void checkRule(final DbMultipleMonitor multipleMonitor)
417       throws WaarpDatabaseException {
418     if (Configuration.configuration.isShutdown()) {
419       // Stop
420       return;
421     }
422     final DbRule[] rules = DbRule.getUpdatedPrepareStament();
423     int i = 0;
424     boolean mm = false;
425     boolean lastUpdate = false;
426     while (i < rules.length) {
427       final DbRule rule = rules[i];
428       if (multipleMonitor != null) {
429         if (!mm) {
430           // not already set from a previous hostAuth
431           mm = true;
432           lastUpdate = multipleMonitor.checkUpdateRule();
433         } // else already set so no action on multipleMonitor
434         // Update the Rules in HA mode
435         if (lastUpdate) {
436           rule.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
437         } else {
438           // Nothing to do except validate
439         }
440         rule.update();
441         logger.debug("Rule {}", multipleMonitor);
442       } else {
443         // Nothing to do except validate
444         rule.changeUpdatedInfo(AbstractDbData.UpdatedInfo.NOTUPDATED);
445         rule.update();
446       }
447       i++;
448     }
449   }
450 
451   private void checkTaskRunner(final DbMultipleMonitor multipleMonitor)
452       throws WaarpDatabaseException {
453     if (Configuration.configuration.isShutdown()) {
454       // Stop
455       return;
456     }
457     // No specific HA mode since the other servers will wait for the commit on Lock
458     final int maxRunnable =
459         Math.min(Configuration.configuration.getRunnerThread(),
460                  internalRunner.allowedToSubmit());
461     if (maxRunnable > 0) {
462       final DbTaskRunner[] tasks =
463           DbTaskRunner.getSelectFromInfoPrepareStatement(UpdatedInfo.TOSUBMIT,
464                                                          true, maxRunnable);
465       logger.info("TaskRunner to launch: {} (launched: {}, active: {}) {}",
466                   tasks.length, totalRuns, internalRunner.nbInternalRunner(),
467                   NetworkTransaction.hashStatus());
468       int i = 0;
469       while (i < tasks.length) {
470         if (WaarpShutdownHook.isShutdownStarting()) {
471           logger.info("Will not start transfers, server is in shutdown.");
472           return;
473         }
474         final DbTaskRunner taskRunner = tasks[i];
475         i++;
476 
477         logger.debug("get a task: {}", taskRunner);
478         // Launch if possible this task
479         final String key =
480             taskRunner.getRequested() + ' ' + taskRunner.getRequester() + ' ' +
481             taskRunner.getSpecialId();
482         if (Configuration.configuration.getLocalTransaction()
483                                        .getFromRequest(key) != null) {
484           // already running
485           continue;
486         }
487         if (taskRunner.isRequestOnRequested()) {
488           // cannot schedule a request where the host is the requested host
489           taskRunner.changeUpdatedInfo(UpdatedInfo.INTERRUPTED);
490           try {
491             taskRunner.update();
492           } catch (final WaarpDatabaseNoDataException e) {
493             logger.warn("Update failed, no transfer found");
494           }
495           continue;
496         }
497         // last check: number can have raised up since Commander checks
498         if (i > (tasks.length - 10) && internalRunner.nbInternalRunner() >=
499                                        Configuration.configuration.getRunnerThread()) {
500           break;
501         }
502         if (internalRunner.submitTaskRunner(taskRunner)) {
503           totalRuns++;
504         } else {
505           break;
506         }
507       }
508     }
509   }
510 }