1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.common.cpu;
21
22 import io.netty.handler.traffic.AbstractTrafficShapingHandler;
23 import org.waarp.common.logging.SysErrLogger;
24 import org.waarp.common.logging.WaarpLogger;
25 import org.waarp.common.logging.WaarpLoggerFactory;
26 import org.waarp.common.utility.ThreadLocalRandom;
27
28 import java.util.LinkedList;
29 import java.util.concurrent.ScheduledThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31
32
33
34
35 public abstract class WaarpConstraintLimitHandler implements Runnable {
36
37
38
39 private static final WaarpLogger logger =
40 WaarpLoggerFactory.getLogger(WaarpConstraintLimitHandler.class);
41
42 private static final String NOALERT = "noAlert";
43 public static final long LOWBANDWIDTH_DEFAULT = 1048576;
44 public String lastAlert = NOALERT;
45 private boolean constraintInactive = true;
46 private boolean useCpuLimits;
47 private boolean useBandwidthLimit;
48
49 private final ThreadLocalRandom random = ThreadLocalRandom.current();
50 private CpuManagementInterface cpuManagement;
51 private double cpuLimit = 1.0;
52 private int channelLimit;
53 private boolean isServer;
54 private double lastLA;
55 private long lastTime;
56
57
58 private long waitForNetOp = 1000;
59 private long timeoutCon = 10000;
60 private double highCpuLimit;
61 private double lowCpuLimit;
62 private double percentageDecreaseRatio = 0.25;
63 private long delay = 1000;
64 private long limitLowBandwidth = LOWBANDWIDTH_DEFAULT;
65 private AbstractTrafficShapingHandler handler;
66 private ScheduledThreadPoolExecutor executor;
67
68 private static class CurLimits {
69 long read;
70 long write;
71
72 private CurLimits(final long read, final long write) {
73 this.read = read;
74 this.write = write;
75 }
76 }
77
78 private final LinkedList<CurLimits> curLimits = new LinkedList<CurLimits>();
79 private int nbSinceLastDecrease;
80 private static final int PAYLOAD = 5;
81
82
83
84
85
/**
 * Builds a handler with every default value: no CPU information is
 * available ({@link CpuManagementNoInfo}) and the constraint stays
 * inactive.
 */
public WaarpConstraintLimitHandler() {
  // cpuManagement has no initializer, so it is always unset at this point
  cpuManagement = new CpuManagementNoInfo();
}
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
 * Bandwidth oriented constructor: delegates to the full constructor with
 * the CPU limit flag set to true, a CPU limit of 0 and a channel limit of
 * 0, every other argument being passed unchanged.
 *
 * @param waitForNetOp passed as is to the full constructor
 * @param timeOutCon passed as is to the full constructor
 * @param useJdkCpuLimit passed as is to the full constructor
 * @param lowcpuLimit passed as is to the full constructor
 * @param highcpuLimit passed as is to the full constructor
 * @param percentageDecrease passed as is to the full constructor
 * @param handler passed as is to the full constructor
 * @param delay passed as is to the full constructor
 * @param limitLowBandwidth passed as is to the full constructor
 */
public WaarpConstraintLimitHandler(final long waitForNetOp,
                                   final long timeOutCon,
                                   final boolean useJdkCpuLimit,
                                   final double lowcpuLimit,
                                   final double highcpuLimit,
                                   final double percentageDecrease,
                                   final AbstractTrafficShapingHandler handler,
                                   final long delay,
                                   final long limitLowBandwidth) {
  this(waitForNetOp,
       timeOutCon,
       true,
       useJdkCpuLimit,
       0,
       0,
       lowcpuLimit,
       highcpuLimit,
       percentageDecrease,
       handler,
       delay,
       limitLowBandwidth);
}
134
135
136
137
138
139
140
141
142
143
144
/**
 * CPU and channel oriented constructor: delegates to the full constructor
 * with no low/high CPU limits (0), a decrease ratio of 0.01, no handler, a
 * delay of 1000000 and the default low bandwidth floor.
 *
 * @param waitForNetOp passed as is to the full constructor
 * @param timeOutCon passed as is to the full constructor
 * @param useCpuLimit passed as is to the full constructor
 * @param useJdKCpuLimit passed as is to the full constructor
 * @param cpulimit passed as is to the full constructor
 * @param channellimit passed as is to the full constructor
 */
public WaarpConstraintLimitHandler(final long waitForNetOp,
                                   final long timeOutCon,
                                   final boolean useCpuLimit,
                                   final boolean useJdKCpuLimit,
                                   final double cpulimit,
                                   final int channellimit) {
  this(waitForNetOp,
       timeOutCon,
       useCpuLimit,
       useJdKCpuLimit,
       cpulimit,
       channellimit,
       0,
       0,
       0.01,
       null,
       1000000,
       LOWBANDWIDTH_DEFAULT);
}
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/**
 * Full constructor.
 * <p>
 * The arguments are normalized as follows:
 * <ul>
 * <li>a low bandwidth floor under {@link #LOWBANDWIDTH_DEFAULT} is raised
 * to that default;</li>
 * <li>a low CPU limit of 0 or less is replaced by half of the high CPU
 * limit;</li>
 * <li>a decrease ratio of 0 or less becomes 0.01, a ratio of 1 or more is
 * divided by 100;</li>
 * <li>a delay lower than half of the network wait time is replaced by the
 * network wait time.</li>
 * </ul>
 * A CPU measure is set up when {@code useCpuLimit} is true or when
 * {@code highcpuLimit} is positive; otherwise the constraint is inactive.
 * Bandwidth throttling is used when {@code highcpuLimit} is positive, and
 * the periodic task is started immediately when a handler is also given
 * and the constraint is active.
 *
 * @param waitForNetOp2 network wait time in ms; half of it is the
 *     minimal delay between two load measures
 * @param timeOutCon2 value (ms) used to compute the sleep time
 * @param useCpuLimit true to compare the load with {@code cpulimit}
 * @param useJdKCpuLimit true to use {@link CpuManagement}, false to use
 *     {@link CpuManagementSysmon}
 * @param cpulimit load limit, only enforced when strictly between 0 and 1
 * @param channellimit maximum number of local channels, 0 or less to
 *     disable this check
 * @param lowcpuLimit load under which the limits are restored
 * @param highcpuLimit load above which the limits are decreased; 0 or less
 *     disables bandwidth throttling
 * @param percentageDecrease ratio of decrease applied to the limits
 * @param handler the traffic shaping handler to reconfigure, may be null
 * @param delay delay in ms between two executions of {@link #run()}
 * @param limitLowBandwidth floor under which limits are never decreased
 */
public WaarpConstraintLimitHandler(final long waitForNetOp2,
                                   final long timeOutCon2,
                                   final boolean useCpuLimit,
                                   final boolean useJdKCpuLimit,
                                   final double cpulimit,
                                   final int channellimit,
                                   final double lowcpuLimit,
                                   final double highcpuLimit,
                                   final double percentageDecrease,
                                   final AbstractTrafficShapingHandler handler,
                                   final long delay,
                                   final long limitLowBandwidth) {
  useCpuLimits = useCpuLimit;
  waitForNetOp = waitForNetOp2;
  timeoutCon = timeOutCon2;
  lowCpuLimit = lowcpuLimit;
  highCpuLimit = highcpuLimit;
  this.limitLowBandwidth = limitLowBandwidth;
  if (this.limitLowBandwidth < LOWBANDWIDTH_DEFAULT) {
    this.limitLowBandwidth = LOWBANDWIDTH_DEFAULT;
  }
  this.delay = delay;
  if (lowCpuLimit <= 0) {
    lowCpuLimit = highCpuLimit / 2;
  }
  percentageDecreaseRatio = percentageDecrease;
  if (percentageDecreaseRatio <= 0) {
    percentageDecreaseRatio = 0.01;
  } else if (percentageDecreaseRatio >= 1) {
    // Value given as a percentage: convert it to a ratio
    percentageDecreaseRatio /= 100;
  }
  if (delay < waitForNetOp >> 1) {
    this.delay = waitForNetOp;
  }
  this.handler = handler;
  if (useCpuLimits || highCpuLimit > 0) {
    if (useJdKCpuLimit) {
      try {
        cpuManagement = new CpuManagement();
        constraintInactive = false;
      } catch (final UnsupportedOperationException e) {
        // No load average available from the JDK: deactivate the constraint
        cpuManagement = new CpuManagementNoInfo();
        constraintInactive = true;
      }
    } else {
      cpuManagement = new CpuManagementSysmon();
      constraintInactive = false;
    }
  } else {
    // No CPU based constraint requested
    constraintInactive = true;
    cpuManagement = new CpuManagementNoInfo();
  }
  useBandwidthLimit = highcpuLimit > 0;
  cpuLimit = cpulimit;
  channelLimit = channellimit;
  lastTime = System.currentTimeMillis();
  // Same condition as setHandler(): run() does nothing useful unless the
  // bandwidth limit is in use, so the task is only scheduled in that case.
  if (this.handler != null && !constraintInactive && useBandwidthLimit) {
    executor = new ScheduledThreadPoolExecutor(1);
    executor.scheduleWithFixedDelay(this, this.delay, this.delay,
                                    TimeUnit.MILLISECONDS);
  }
}
252
253
254
255
256 public void release() {
257 if (executor != null) {
258 executor.shutdownNow();
259 }
260 }
261
262
263
264
265
266
267 public void setServer(final boolean isServer) {
268 this.isServer = isServer;
269 }
270
271 private double getLastLA() {
272 final long newTime = System.currentTimeMillis();
273
274
275 if (newTime - lastTime < waitForNetOp >> 1 && lastLA <= cpuLimit) {
276
277 return lastLA;
278 }
279 lastTime = newTime;
280 lastLA = cpuManagement.getLoadAverage();
281 return lastLA;
282 }
283
284
285
286
287
288 public boolean checkConstraints() {
289 if (!isServer) {
290 return false;
291 }
292 if (useCpuLimits && cpuLimit < 1 && cpuLimit > 0) {
293 getLastLA();
294 if (lastLA <= cpuLimit) {
295 lastAlert = NOALERT;
296 return false;
297 }
298 lastAlert = "CPU Constraint: " + lastLA + " > " + cpuLimit;
299 logger.info(lastAlert);
300 return true;
301 }
302 if (channelLimit > 0) {
303 final int nb = getNumberLocalChannel();
304 if (channelLimit < nb) {
305 lastAlert = "LocalNetwork Constraint: " + nb + " > " + channelLimit;
306 logger.info(lastAlert);
307 return true;
308 }
309 }
310 lastAlert = NOALERT;
311 return false;
312 }
313
314
315
316
317 protected abstract int getNumberLocalChannel();
318
319
320
321
322
323
324
325
326
327
328
329 public boolean checkConstraintsSleep(final int step) {
330 if (!isServer) {
331 return false;
332 }
333 long delayNew = waitForNetOp >> 1;
334 if (useCpuLimits && cpuLimit < 1 && cpuLimit > 0) {
335 final long newTime = System.currentTimeMillis();
336
337 if (newTime - lastTime < delayNew) {
338
339 if (lastLA > cpuLimit) {
340 final double sleep =
341 lastLA * delayNew * (step + 1) * random.nextFloat();
342 final long shorttime = ((long) sleep / 10) * 10;
343 if (shorttime >= 10) {
344 try {
345 Thread.sleep(shorttime);
346 } catch (final InterruptedException ignore) {
347 SysErrLogger.FAKE_LOGGER.ignoreLog(ignore);
348 }
349 }
350 } else {
351
352 lastAlert = NOALERT;
353 return false;
354 }
355 }
356 }
357 if (checkConstraints()) {
358 delayNew = getSleepTime() * (step + 1);
359 try {
360 Thread.sleep(delayNew);
361 } catch (final InterruptedException ignore) {
362 SysErrLogger.FAKE_LOGGER.ignoreLog(ignore);
363 }
364 return true;
365 } else {
366 lastAlert = NOALERT;
367 return false;
368 }
369 }
370
371
372
373
374 public long getSleepTime() {
375 return (((long) (timeoutCon * random.nextFloat()) + 5000) / 10) * 10;
376 }
377
378
379
380
381 public double getCpuLimit() {
382 return cpuLimit;
383 }
384
385
386
387
388 public void setCpuLimit(final double cpuLimit) {
389 this.cpuLimit = cpuLimit;
390 }
391
392
393
394
395 public int getChannelLimit() {
396 return channelLimit;
397 }
398
399
400
401
402 public void setChannelLimit(final int channelLimit) {
403 this.channelLimit = channelLimit;
404 }
405
406
407
408
409
410
411
412
413
414 protected abstract long getReadLimit();
415
416
417
418
419
420
421
422
423
424 protected abstract long getWriteLimit();
425
426
427
428
429
430
431 public void setHandler(final AbstractTrafficShapingHandler handler) {
432 this.handler = handler;
433 if (!constraintInactive && this.handler != null && useBandwidthLimit) {
434 if (executor != null) {
435 executor.shutdownNow();
436 }
437 logger.debug("Activate Throttle bandwidth according to CPU usage");
438 executor = new ScheduledThreadPoolExecutor(1);
439 executor.scheduleWithFixedDelay(this, delay, delay,
440 TimeUnit.MILLISECONDS);
441 } else {
442 if (executor != null) {
443 executor.shutdownNow();
444 executor = null;
445 }
446 }
447 }
448
449
450
451
452
453 @Override
454 public void run() {
455 if (constraintInactive) {
456 return;
457 }
458 final double curLA = getLastLA();
459 if (!useBandwidthLimit) {
460 return;
461 }
462 if (curLA > highCpuLimit) {
463 final CurLimits curlimit;
464 if (curLimits.isEmpty()) {
465
466 curlimit = new CurLimits(getReadLimit(), getWriteLimit());
467 if (curlimit.read == 0) {
468
469 curlimit.read = handler.trafficCounter().lastReadThroughput();
470 if (curlimit.read < limitLowBandwidth) {
471 curlimit.read = 0;
472 }
473 }
474 if (curlimit.write == 0) {
475
476 curlimit.write = handler.trafficCounter().lastWriteThroughput();
477 if (curlimit.write < limitLowBandwidth) {
478 curlimit.write = 0;
479 }
480 }
481 } else {
482 curlimit = curLimits.getLast();
483 }
484 long newread = (long) (curlimit.read * (1 - percentageDecreaseRatio));
485 if (newread < limitLowBandwidth) {
486 newread = limitLowBandwidth;
487 }
488 long newwrite = (long) (curlimit.write * (1 - percentageDecreaseRatio));
489 if (newwrite < limitLowBandwidth) {
490 newwrite = limitLowBandwidth;
491 }
492 final CurLimits newlimit = new CurLimits(newread, newwrite);
493 if (curLimits.isEmpty() || curlimit.read != newread ||
494 curlimit.write != newwrite) {
495
496 curLimits.add(newlimit);
497 logger.info("Set new low limit since CPU = {} {}:{}", curLA, newwrite,
498 newread);
499 handler.configure(newlimit.write, newlimit.read);
500 nbSinceLastDecrease += PAYLOAD;
501 }
502 } else if (curLA < lowCpuLimit) {
503 if (curLimits.isEmpty()) {
504
505 return;
506 }
507 if (nbSinceLastDecrease > 0) {
508 nbSinceLastDecrease--;
509
510 return;
511 }
512 nbSinceLastDecrease = 0;
513 curLimits.pollLast();
514 final CurLimits newlimit;
515 if (curLimits.isEmpty()) {
516
517 final long newread = getReadLimit();
518 final long newwrite = getWriteLimit();
519 logger.info("Restore limit since CPU = {} {}:{}", curLA, newwrite,
520 newread);
521 handler.configure(newwrite, newread);
522 } else {
523
524 newlimit = curLimits.getLast();
525 final long newread = newlimit.read;
526 final long newwrite = newlimit.write;
527 logger.info("Set new upper limit since CPU = {} {}:{}", curLA, newwrite,
528 newread);
529 handler.configure(newwrite, newread);
530
531 nbSinceLastDecrease = PAYLOAD;
532 }
533 }
534 }
535 }