/*
 * This file is part of Waarp Project (named also Waarp or GG).
 *
 *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
 *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
 * individual contributors.
 *
 *  All Waarp Project is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along with
 * Waarp . If not, see <http://www.gnu.org/licenses/>.
 */
20  package org.waarp.common.cpu;
21  
22  import io.netty.handler.traffic.AbstractTrafficShapingHandler;
23  import org.waarp.common.logging.SysErrLogger;
24  import org.waarp.common.logging.WaarpLogger;
25  import org.waarp.common.logging.WaarpLoggerFactory;
26  import org.waarp.common.utility.ThreadLocalRandom;
27  
28  import java.util.LinkedList;
29  import java.util.concurrent.ScheduledThreadPoolExecutor;
30  import java.util.concurrent.TimeUnit;
31  
32  /**
33   * Abstract class for Constraint Limit Handler for Waarp project
34   */
35  public abstract class WaarpConstraintLimitHandler implements Runnable {
36    /**
37     * Internal Logger
38     */
39    private static final WaarpLogger logger =
40        WaarpLoggerFactory.getLogger(WaarpConstraintLimitHandler.class);
41  
42    private static final String NOALERT = "noAlert";
43    public static final long LOWBANDWIDTH_DEFAULT = 1048576;
44    public String lastAlert = NOALERT;
45    private boolean constraintInactive = true;
46    private boolean useCpuLimits;
47    private boolean useBandwidthLimit;
48  
49    private final ThreadLocalRandom random = ThreadLocalRandom.current();
50    private CpuManagementInterface cpuManagement;
51    private double cpuLimit = 1.0; // was 0.8
52    private int channelLimit; // was 1000
53    private boolean isServer;
54    private double lastLA;
55    private long lastTime;
56  
57    // Dynamic throttling
58    private long waitForNetOp = 1000;
59    private long timeoutCon = 10000;
60    private double highCpuLimit; // was 0.8
61    private double lowCpuLimit; // was 0.5
62    private double percentageDecreaseRatio = 0.25;
63    private long delay = 1000;
64    private long limitLowBandwidth = LOWBANDWIDTH_DEFAULT;
65    private AbstractTrafficShapingHandler handler;
66    private ScheduledThreadPoolExecutor executor;
67  
68    private static class CurLimits {
69      long read;
70      long write;
71  
72      private CurLimits(final long read, final long write) {
73        this.read = read;
74        this.write = write;
75      }
76    }
77  
78    private final LinkedList<CurLimits> curLimits = new LinkedList<CurLimits>();
79    private int nbSinceLastDecrease;
80    private static final int PAYLOAD = 5;
81    // 5 seconds of payload when new high cpu
82  
83    /**
84     * Empty constructor
85     */
86    public WaarpConstraintLimitHandler() {
87      // Do nothing except setup standard value for inactivity
88      if (cpuManagement == null) {
89        cpuManagement = new CpuManagementNoInfo();
90      }
91    }
92  
93    /**
94     * This constructor enables only throttling bandwidth with cpu usage
95     *
96     * @param waitForNetOp 1000 ms as wait for a network operation
97     * @param timeOutCon 10000 ms as timeout limit
98     * @param useJdkCpuLimit True to use JDK Cpu native or False for
99     *     JavaSysMon
100    * @param lowcpuLimit for proactive cpu limitation (throttling
101    *     bandwidth)
102    *     (0<= x < 1 & highcpulimit)
103    * @param highcpuLimit for proactive cpu limitation (throttling
104    *     bandwidth)
105    *     (0<= x <= 1) 0 meaning no
106    *     throttle activated
107    * @param percentageDecrease for proactive cpu limitation,
108    *     throttling
109    *     bandwidth reduction (0 < x < 1) as 0.25
110    *     for 25% of reduction
111    * @param handler the GlobalTrafficShapingHandler associated (null
112    *     to have
113    *     no proactive cpu
114    *     limitation)
115    * @param delay the delay between 2 tests for proactive cpu
116    *     limitation
117    * @param limitLowBandwidth the minimal bandwidth (read or write) to
118    *     apply
119    *     when decreasing bandwidth (low
120    *     limit = 4096)
121    */
122   public WaarpConstraintLimitHandler(final long waitForNetOp,
123                                      final long timeOutCon,
124                                      final boolean useJdkCpuLimit,
125                                      final double lowcpuLimit,
126                                      final double highcpuLimit,
127                                      final double percentageDecrease,
128                                      final AbstractTrafficShapingHandler handler,
129                                      final long delay,
130                                      final long limitLowBandwidth) {
131     this(waitForNetOp, timeOutCon, true, useJdkCpuLimit, 0, 0, lowcpuLimit,
132          highcpuLimit, percentageDecrease, handler, delay, limitLowBandwidth);
133   }
134 
135   /**
136    * This constructor enables only Connection check ability
137    *
138    * @param useCpuLimit True to enable cpuLimit on connection check
139    * @param useJdKCpuLimit True to use JDK Cpu native or False for
140    *     JavaSysMon
141    * @param cpulimit high cpu limit (0<= x < 1) to refuse new
142    *     connections
143    * @param channellimit number of connection limit (0<= x)
144    */
145   public WaarpConstraintLimitHandler(final long waitForNetOp,
146                                      final long timeOutCon,
147                                      final boolean useCpuLimit,
148                                      final boolean useJdKCpuLimit,
149                                      final double cpulimit,
150                                      final int channellimit) {
151     this(waitForNetOp, timeOutCon, useCpuLimit, useJdKCpuLimit, cpulimit,
152          channellimit, 0, 0, 0.01, null, 1000000, LOWBANDWIDTH_DEFAULT);
153   }
154 
155   /**
156    * This constructor enables both Connection check ability and throttling
157    * bandwidth with cpu usage
158    *
159    * @param waitForNetOp2 1000 ms as wait for a network operation
160    * @param timeOutCon2 10000 ms as timeout limit
161    * @param useCpuLimit True to enable cpuLimit on connection check
162    * @param useJdKCpuLimit True to use JDK Cpu native or False for
163    *     JavaSysMon
164    * @param cpulimit high cpu limit (0<= x < 1) to refuse new
165    *     connections
166    * @param channellimit number of connection limit (0<= x)
167    * @param lowcpuLimit for proactive cpu limitation (throttling
168    *     bandwidth)
169    *     (0<= x < 1 & highcpulimit)
170    * @param highcpuLimit for proactive cpu limitation (throttling
171    *     bandwidth)
172    *     (0<= x <= 1) 0 meaning no
173    *     throttle activated
174    * @param percentageDecrease for proactive cpu limitation,
175    *     throttling
176    *     bandwidth reduction (0 < x < 1) as 0.25
177    *     for 25% of reduction
178    * @param handler the GlobalTrafficShapingHandler associated (null
179    *     to have
180    *     no proactive cpu
181    *     limitation)
182    * @param delay the delay between 2 tests for proactive cpu
183    *     limitation
184    * @param limitLowBandwidth the minimal bandwidth (read or write) to
185    *     apply
186    *     when decreasing bandwidth (low
187    *     limit = 4096)
188    */
189   public WaarpConstraintLimitHandler(final long waitForNetOp2,
190                                      final long timeOutCon2,
191                                      final boolean useCpuLimit,
192                                      final boolean useJdKCpuLimit,
193                                      final double cpulimit,
194                                      final int channellimit,
195                                      final double lowcpuLimit,
196                                      final double highcpuLimit,
197                                      final double percentageDecrease,
198                                      final AbstractTrafficShapingHandler handler,
199                                      final long delay,
200                                      final long limitLowBandwidth) {
201     useCpuLimits = useCpuLimit;
202     waitForNetOp = waitForNetOp2;
203     timeoutCon = timeOutCon2;
204     lowCpuLimit = lowcpuLimit;
205     highCpuLimit = highcpuLimit;
206     this.limitLowBandwidth = limitLowBandwidth;
207     if (this.limitLowBandwidth < LOWBANDWIDTH_DEFAULT) {
208       this.limitLowBandwidth = LOWBANDWIDTH_DEFAULT;
209     }
210     this.delay = delay;
211     if (lowCpuLimit <= 0) {
212       lowCpuLimit = highCpuLimit / 2;
213     }
214     percentageDecreaseRatio = percentageDecrease;
215     if (percentageDecreaseRatio <= 0) {
216       percentageDecreaseRatio = 0.01;
217     } else if (percentageDecreaseRatio >= 1) {
218       percentageDecreaseRatio /= 100;
219     }
220     if (delay < waitForNetOp >> 1) {
221       this.delay = waitForNetOp;
222     }
223     this.handler = handler;
224     if (useCpuLimits || highCpuLimit > 0) {
225       if (useJdKCpuLimit) {
226         try {
227           cpuManagement = new CpuManagement();
228           constraintInactive = false;
229         } catch (final UnsupportedOperationException e) {
230           cpuManagement = new CpuManagementNoInfo();
231           constraintInactive = true;
232         }
233       } else {
234         cpuManagement = new CpuManagementSysmon();
235         constraintInactive = false;
236       }
237     } else {
238       // no test at all
239       constraintInactive = true;
240       cpuManagement = new CpuManagementNoInfo();
241     }
242     useBandwidthLimit = highcpuLimit > 0;
243     cpuLimit = cpulimit;
244     channelLimit = channellimit;
245     lastTime = System.currentTimeMillis();
246     if (this.handler != null && !constraintInactive && !useBandwidthLimit) {
247       executor = new ScheduledThreadPoolExecutor(1);
248       executor.scheduleWithFixedDelay(this, this.delay, this.delay,
249                                       TimeUnit.MILLISECONDS);
250     }
251   }
252 
253   /**
254    * Release the resources
255    */
256   public void release() {
257     if (executor != null) {
258       executor.shutdownNow();
259     }
260   }
261 
262   /**
263    * To explicitly set this handler as server mode
264    *
265    * @param isServer
266    */
267   public void setServer(final boolean isServer) {
268     this.isServer = isServer;
269   }
270 
271   private double getLastLA() {
272     final long newTime = System.currentTimeMillis();
273     // first check if last test was done too shortly
274     // If last test was wrong, then redo the test
275     if (newTime - lastTime < waitForNetOp >> 1 && lastLA <= cpuLimit) {
276       // last test was OK, so Continue
277       return lastLA;
278     }
279     lastTime = newTime;
280     lastLA = cpuManagement.getLoadAverage();
281     return lastLA;
282   }
283 
284   /**
285    * @return True if one of the limit is exceeded. Always False if not a
286    *     server mode
287    */
288   public boolean checkConstraints() {
289     if (!isServer) {
290       return false;
291     }
292     if (useCpuLimits && cpuLimit < 1 && cpuLimit > 0) {
293       getLastLA();
294       if (lastLA <= cpuLimit) {
295         lastAlert = NOALERT;
296         return false;
297       }
298       lastAlert = "CPU Constraint: " + lastLA + " > " + cpuLimit;
299       logger.info(lastAlert);
300       return true;
301     }
302     if (channelLimit > 0) {
303       final int nb = getNumberLocalChannel();
304       if (channelLimit < nb) {
305         lastAlert = "LocalNetwork Constraint: " + nb + " > " + channelLimit;
306         logger.info(lastAlert);
307         return true;
308       }
309     }
310     lastAlert = NOALERT;
311     return false;
312   }
313 
314   /**
315    * @return the current number of active Local Channel
316    */
317   protected abstract int getNumberLocalChannel();
318 
319   /**
320    * Same as checkConstraints except that the thread will sleep some time
321    * proportionally to the current Load (if
322    * CPU related)
323    *
324    * @param step the current step in retry
325    *
326    * @return True if one of the limit is exceeded. Always False if not a
327    *     server mode
328    */
329   public boolean checkConstraintsSleep(final int step) {
330     if (!isServer) {
331       return false;
332     }
333     long delayNew = waitForNetOp >> 1;
334     if (useCpuLimits && cpuLimit < 1 && cpuLimit > 0) {
335       final long newTime = System.currentTimeMillis();
336       // first check if last test was done too shortly
337       if (newTime - lastTime < delayNew) {
338         // If last test was wrong, then wait a bit then redo the test
339         if (lastLA > cpuLimit) {
340           final double sleep =
341               lastLA * delayNew * (step + 1) * random.nextFloat();
342           final long shorttime = ((long) sleep / 10) * 10;
343           if (shorttime >= 10) {
344             try {
345               Thread.sleep(shorttime);
346             } catch (final InterruptedException ignore) {//NOSONAR
347               SysErrLogger.FAKE_LOGGER.ignoreLog(ignore);
348             }
349           }
350         } else {
351           // last test was OK, so Continue
352           lastAlert = NOALERT;
353           return false;
354         }
355       }
356     }
357     if (checkConstraints()) {
358       delayNew = getSleepTime() * (step + 1);
359       try {
360         Thread.sleep(delayNew);
361       } catch (final InterruptedException ignore) {//NOSONAR
362         SysErrLogger.FAKE_LOGGER.ignoreLog(ignore);
363       }
364       return true;
365     } else {
366       lastAlert = NOALERT;
367       return false;
368     }
369   }
370 
371   /**
372    * @return a time below TIMEOUTCON with a random
373    */
374   public long getSleepTime() {
375     return (((long) (timeoutCon * random.nextFloat()) + 5000) / 10) * 10;
376   }
377 
378   /**
379    * @return the cpuLimit
380    */
381   public double getCpuLimit() {
382     return cpuLimit;
383   }
384 
385   /**
386    * @param cpuLimit the cpuLimit to set
387    */
388   public void setCpuLimit(final double cpuLimit) {
389     this.cpuLimit = cpuLimit;
390   }
391 
392   /**
393    * @return the channelLimit
394    */
395   public int getChannelLimit() {
396     return channelLimit;
397   }
398 
399   /**
400    * @param channelLimit the channelLimit to set
401    */
402   public void setChannelLimit(final int channelLimit) {
403     this.channelLimit = channelLimit;
404   }
405 
406   /**
407    * Get the current setting on Read Limit (supposed to be not the value in
408    * the
409    * handler but in the
410    * configuration)
411    *
412    * @return the current setting on Read Limit
413    */
414   protected abstract long getReadLimit();
415 
416   /**
417    * Get the current setting on Write Limit (supposed to be not the value in
418    * the
419    * handler but in the
420    * configuration)
421    *
422    * @return the current setting on Write Limit
423    */
424   protected abstract long getWriteLimit();
425 
426   /**
427    * Set the handler
428    *
429    * @param handler
430    */
431   public void setHandler(final AbstractTrafficShapingHandler handler) {
432     this.handler = handler;
433     if (!constraintInactive && this.handler != null && useBandwidthLimit) {
434       if (executor != null) {
435         executor.shutdownNow();
436       }
437       logger.debug("Activate Throttle bandwidth according to CPU usage");
438       executor = new ScheduledThreadPoolExecutor(1);
439       executor.scheduleWithFixedDelay(this, delay, delay,
440                                       TimeUnit.MILLISECONDS);
441     } else {
442       if (executor != null) {
443         executor.shutdownNow();
444         executor = null;
445       }
446     }
447   }
448 
449   /**
450    * Check every delay if the current cpu usage needs to relax or to
451    * constraint the bandwidth
452    */
453   @Override
454   public void run() {
455     if (constraintInactive) {
456       return;
457     }
458     final double curLA = getLastLA();
459     if (!useBandwidthLimit) {
460       return;
461     }
462     if (curLA > highCpuLimit) {
463       final CurLimits curlimit;
464       if (curLimits.isEmpty()) {
465         // get current limit setting
466         curlimit = new CurLimits(getReadLimit(), getWriteLimit());
467         if (curlimit.read == 0) {
468           // take the current bandwidth
469           curlimit.read = handler.trafficCounter().lastReadThroughput();
470           if (curlimit.read < limitLowBandwidth) {
471             curlimit.read = 0;
472           }
473         }
474         if (curlimit.write == 0) {
475           // take the current bandwidth
476           curlimit.write = handler.trafficCounter().lastWriteThroughput();
477           if (curlimit.write < limitLowBandwidth) {
478             curlimit.write = 0;
479           }
480         }
481       } else {
482         curlimit = curLimits.getLast();
483       }
484       long newread = (long) (curlimit.read * (1 - percentageDecreaseRatio));
485       if (newread < limitLowBandwidth) {
486         newread = limitLowBandwidth;
487       }
488       long newwrite = (long) (curlimit.write * (1 - percentageDecreaseRatio));
489       if (newwrite < limitLowBandwidth) {
490         newwrite = limitLowBandwidth;
491       }
492       final CurLimits newlimit = new CurLimits(newread, newwrite);
493       if (curLimits.isEmpty() || curlimit.read != newread ||
494           curlimit.write != newwrite) {
495         // Not same limit so add this limit
496         curLimits.add(newlimit);
497         logger.info("Set new low limit since CPU = {} {}:{}", curLA, newwrite,
498                     newread);
499         handler.configure(newlimit.write, newlimit.read);
500         nbSinceLastDecrease += PAYLOAD;
501       }
502     } else if (curLA < lowCpuLimit) {
503       if (curLimits.isEmpty()) {
504         // nothing to do
505         return;
506       }
507       if (nbSinceLastDecrease > 0) {
508         nbSinceLastDecrease--;
509         // wait a bit more in case
510         return;
511       }
512       nbSinceLastDecrease = 0;
513       curLimits.pollLast();
514       final CurLimits newlimit;
515       if (curLimits.isEmpty()) {
516         // reset to default limits
517         final long newread = getReadLimit();
518         final long newwrite = getWriteLimit();
519         logger.info("Restore limit since CPU = {} {}:{}", curLA, newwrite,
520                     newread);
521         handler.configure(newwrite, newread);
522       } else {
523         // set next upper values
524         newlimit = curLimits.getLast();
525         final long newread = newlimit.read;
526         final long newwrite = newlimit.write;
527         logger.info("Set new upper limit since CPU = {} {}:{}", curLA, newwrite,
528                     newread);
529         handler.configure(newwrite, newread);
530         // give extra payload to prevent a brutal return to normal
531         nbSinceLastDecrease = PAYLOAD;
532       }
533     }
534   }
535 }