/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016-2018 Michael N. Lipp
 * 
 * This program is free software; you can redistribute it and/or modify it 
 * under the terms of the GNU Affero General Public License as published by 
 * the Free Software Foundation; either version 3 of the License, or 
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but 
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
 * for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License along 
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.io.util;

import java.beans.ConstructorProperties;
import java.lang.management.ManagementFactory;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.Buffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.WeakHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import org.jgrapes.core.Components;
import org.jgrapes.core.Components.Timer;
import org.jgrapes.io.IOSubchannel;
import org.jgrapes.io.events.Output;

/**
 * A queue based buffer pool. Using buffers from a pool is an important
 * feature for limiting the computational resources for an {@link IOSubchannel}.
 * A producer of {@link Output} events that simply creates its own buffers
 * may produce and enqueue a large number of events that are not consumed
 * as fast as they are produced.
 * 
 * Using a buffer pool with a typical size of two synchronizes the
 * producer and the consumers of events nicely. The producer
 * (thread) holds one buffer and fills it, the consumer (thread) holds
 * the other buffer and works with its content. If the producer finishes
 * before the consumer, it has to stop until the consumer has processed
 * the previous event and releases the buffer. The consumer can continue
 * without delay, because the data has already been prepared and enqueued
 * as the next event.
 * 
 * One of the biggest problems when using a pool can be to identify 
 * leaking buffers, i.e. buffers that are not properly returned to the pool.
 * This implementation therefore tracks all created buffers 
 * (with a small overhead) and logs a warning if a buffer is no longer
 * used (referenced) but has not been returned to the pool. If the
 * log level for {@link ManagedBufferPool} is set to {@link Level#FINE},
 * the warning also includes a stack trace of the call to {@link #acquire()}
 * that handed out the buffer. Providing this information in addition 
 * obviously requires a larger overhead and is therefore limited to the
 * finer log levels.
 *
 * @param <W> the type of the wrapped (managed) buffer
 * @param <T> the type of the content buffer that is wrapped
 */
@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.NcssCount",
    "PMD.CouplingBetweenObjects" })
public class ManagedBufferPool<W extends ManagedBuffer<T>, T extends Buffer>
        implements BufferCollector<W> {

    @SuppressWarnings("PMD.FieldNamingConventions")
    protected final Logger logger
        = Logger.getLogger(ManagedBufferPool.class.getName());

    // Both settings may be changed through the MBean (i.e. from another
    // thread) while pools are in use, hence volatile.
    private static volatile long defaultDrainDelay = 1500;
    private static volatile long acquireWarningLimit = 1000;

    private String name = Components.objectName(this);
    private final BiFunction<T, BufferCollector<W>, W> wrapper;
    private final Supplier<T> bufferFactory;
    private final BufferMonitor bufferMonitor;
    private final BlockingQueue<W> queue;
    private int bufferSize = -1;
    private final int preservedBufs;
    private final int maximumBufs;
    private final AtomicInteger createdBufs;
    private long drainDelay = -1;
    private final AtomicReference<Timer> idleTimer
        = new AtomicReference<>(null);

    /**
     * Sets the default delay after which buffers are removed from
     * the pool. The default value is 1500ms.
     * 
     * @param delay the delay in ms
     */
    public static void setDefaultDrainDelay(long delay) {
        defaultDrainDelay = delay;
    }

    /**
     * Returns the default drain delay.
     * 
     * @return the delay
     */
    public static long defaultDrainDelay() {
        return defaultDrainDelay;
    }

    /**
     * Create a pool that contains a varying number of (wrapped) buffers.
     * The pool is initially empty. When buffers are requested and none 
     * are left in the pool, new buffers are created up to the given 
     * upper limit. Recollected buffers are put in the pool until it holds
     * the number specified by the lower threshold. Any additional 
     * recollected buffers are discarded. 
     * 
     * @param wrapper the function that converts buffers to managed buffers
     * @param bufferFactory a function that creates a new buffer
     * @param lowerThreshold the number of buffers kept in the pool
     * @param upperLimit the maximum number of buffers
     * @throws IllegalArgumentException if the lower threshold is less
     * than one (it is used as capacity of the backing 
     * {@link ArrayBlockingQueue})
     */
    public ManagedBufferPool(BiFunction<T, BufferCollector<W>, W> wrapper,
            Supplier<T> bufferFactory, int lowerThreshold, int upperLimit) {
        this.wrapper = wrapper;
        this.bufferFactory = bufferFactory;
        preservedBufs = lowerThreshold;
        maximumBufs = upperLimit;
        createdBufs = new AtomicInteger();
        queue = new ArrayBlockingQueue<>(lowerThreshold);
        bufferMonitor = new BufferMonitor(upperLimit);
        MBeanView.addPool(this);
    }

    /**
     * Create a pool that keeps up to the given number of (wrapped) buffers
     * in the pool and also uses that number as upper limit.
     * 
     * @param wrapper the function that converts buffers to managed buffers
     * @param bufferFactory a function that creates a new buffer
     * @param buffers the number of buffers
     */
    public ManagedBufferPool(BiFunction<T, BufferCollector<W>, W> wrapper,
            Supplier<T> bufferFactory, int buffers) {
        this(wrapper, bufferFactory, buffers, buffers);
    }

    /**
     * Sets a name for this pool (to be used in status reports).
     * 
     * @param name the name
     * @return the object for easy chaining
     */
    public ManagedBufferPool<W, T> setName(String name) {
        this.name = name;
        return this;
    }

    /**
     * Returns the name of this pool.
     * 
     * @return the name
     */
    public String name() {
        return name;
    }

    /**
     * Sets the delay after which buffers are removed from
     * the pool. Values less than or equal to zero cause the
     * default drain delay to be used.
     * 
     * @param delay the delay in ms
     * @return the object for easy chaining
     */
    public ManagedBufferPool<W, T> setDrainDelay(long delay) {
        this.drainDelay = delay;
        return this;
    }

    /**
     * Creates a new managed buffer, registers it with the buffer
     * monitor and records its capacity as the pool's buffer size.
     *
     * @return the new buffer
     */
    private W createBuffer() {
        createdBufs.incrementAndGet();
        W buffer = wrapper.apply(this.bufferFactory.get(), this);
        bufferMonitor.put(buffer, new BufferProperties());
        bufferSize = buffer.capacity();
        return buffer;
    }

    /**
     * Removes the buffer from the pool. The number of created buffers
     * is decremented in any case; if the buffer is not known to the
     * buffer monitor, a warning is logged (with a stack trace if 
     * the log level is {@link Level#FINE} or finer).
     * 
     * @param buffer the buffer to remove
     */
    @SuppressWarnings("PMD.GuardLogStatement")
    private void removeBuffer(W buffer) {
        createdBufs.decrementAndGet();
        if (bufferMonitor.remove(buffer) == null) {
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.WARNING,
                    "Attempt to remove unknown buffer from pool.",
                    new Throwable());
            } else {
                logger.warning("Attempt to remove unknown buffer from pool.");
            }
        }
    }

    /**
     * Returns the size of the buffers managed by this pool. If no 
     * buffer has been created yet, a buffer is created (and immediately
     * unlocked again) in order to find out its capacity.
     * 
     * @return the buffer size
     */
    public int bufferSize() {
        if (bufferSize < 0) {
            createBuffer().unlockBuffer();
        }
        return bufferSize;
    }

    /**
     * Acquires a managed buffer from the pool. If the pool is empty,
     * waits for a buffer to become available. The acquired buffer has 
     * a lock count of one.
     * 
     * Note that checking the upper limit and creating a new buffer
     * are not executed atomically.
     * 
     * @return the acquired buffer
     * @throws InterruptedException if the current thread is interrupted
     */
    @SuppressWarnings("PMD.GuardLogStatement")
    public W acquire() throws InterruptedException {
        // Stop draining, because we obviously need this kind of buffers
        Optional.ofNullable(idleTimer.getAndSet(null)).ifPresent(
            timer -> timer.cancel());
        if (createdBufs.get() < maximumBufs) {
            // Haven't reached maximum, so if no buffer is queued, create one.
            W buffer = queue.poll();
            if (buffer != null) {
                buffer.lockBuffer();
                return buffer;
            }
            return createBuffer();
        }
        // Wait for buffer to become available.
        if (logger.isLoggable(Level.FINE)) {
            // If configured, log message after waiting some time.
            // Read the limit once, so that the value used for waiting
            // and the value reported are the same.
            long warningLimit = acquireWarningLimit;
            W buffer = queue.poll(warningLimit, TimeUnit.MILLISECONDS);
            if (buffer != null) {
                buffer.lockBuffer();
                return buffer;
            }
            logger.log(Level.FINE,
                Thread.currentThread().getName() + " waiting > "
                    + warningLimit + "ms for buffer, while executing:",
                new Throwable());
        }
        W buffer = queue.take();
        buffer.lockBuffer();
        return buffer;
    }

    /**
     * Re-adds the buffer to the pool. The buffer is cleared. If the
     * pool already holds the number of buffers to be preserved (or
     * no positive drain delay is in effect), the buffer is discarded
     * instead.
     *
     * @param buffer the buffer
     * @see org.jgrapes.io.util.BufferCollector#recollect(org.jgrapes.io.util.ManagedBuffer)
     */
    @Override
    public void recollect(W buffer) {
        if (queue.size() < preservedBufs) {
            long effectiveDrainDelay
                = drainDelay > 0 ? drainDelay : defaultDrainDelay;
            if (effectiveDrainDelay > 0) {
                buffer.clear();
                // The size check above and the insertion are not atomic.
                // Use offer (instead of add) so that losing the race
                // against another thread results in discarding the 
                // buffer instead of an IllegalStateException.
                if (queue.offer(buffer)) {
                    Timer old = idleTimer.getAndSet(
                        Components.schedule(this::drain,
                            Duration.ofMillis(effectiveDrainDelay)));
                    if (old != null) {
                        old.cancel();
                    }
                    return;
                }
            }
        }
        // Discard
        removeBuffer(buffer);
    }

    /**
     * Invoked by the idle timer. Removes all buffers that are 
     * currently queued from the pool.
     *
     * @param timer the timer that triggered the invocation (unused)
     */
    @SuppressWarnings({ "PMD.UnusedFormalParameter",
        "PMD.UnusedPrivateMethod" })
    private void drain(Timer timer) {
        idleTimer.set(null);
        while (true) {
            W buffer = queue.poll();
            if (buffer == null) {
                break;
            }
            removeBuffer(buffer);
        }
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder(50);
        builder.append("ManagedBufferPool [");
        if (queue != null) {
            builder.append("queue=").append(queue);
        }
        builder.append(']');
        return builder.toString();
    }

    /**
     * Buffer properties. Records the stack trace of the creation
     * of the properties if the log level is {@link Level#FINE} or finer.
     */
    private class BufferProperties {

        private final StackTraceElement[] createdBy;

        /**
         * Instantiates new buffer properties.
         */
        public BufferProperties() {
            if (logger.isLoggable(Level.FINE)) {
                createdBy = Thread.currentThread().getStackTrace();
            } else {
                createdBy = new StackTraceElement[0];
            }
        }

        /**
         * Returns where the buffer was created. The array is
         * empty if the information has not been recorded.
         *
         * @return the stack trace element[]
         */
        @SuppressWarnings("PMD.MethodReturnsInternalArray")
        public StackTraceElement[] createdBy() {
            return createdBy;
        }
    }

    /**
     * This is basically a WeakHashMap. We cannot use WeakHashMap
     * because there is no "hook" into the collection of orphaned
     * references, which is what we want here.
     * 
     * Buffers are compared by identity. Consequently, the identity
     * hash code is used for selecting the list that holds a buffer's
     * entry.
     */
    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
    private class BufferMonitor {

        private final Entry<W>[] data;
        private final int indexMask;
        private final ReferenceQueue<W> orphanedEntries
            = new ReferenceQueue<>();

        /**
         * An Entry.
         *
         * @param <B> the generic type
         */
        private class Entry<B extends ManagedBuffer<?>> extends WeakReference<B>
                implements Map.Entry<B, BufferProperties> {
            /* default */ final int index;
            /* default */ BufferProperties props;
            /* default */ Entry<B> next;

            /**
             * Instantiates a new entry.
             *
             * @param buffer the buffer
             * @param props the props
             * @param queue the queue
             * @param index the index
             * @param next the next
             */
            /* default */ Entry(B buffer, BufferProperties props,
                    ReferenceQueue<B> queue, int index, Entry<B> next) {
                super(buffer, queue);
                this.index = index;
                this.props = props;
                this.next = next;
            }

            @Override
            public B getKey() {
                return get();
            }

            @Override
            public BufferProperties getValue() {
                return props;
            }

            /**
             * Replaces the properties.
             *
             * @param props the new properties
             * @return the previous properties, as required by 
             * {@link Map.Entry#setValue(Object)}
             */
            @Override
            public BufferProperties setValue(BufferProperties props) {
                BufferProperties old = this.props;
                this.props = props;
                return old;
            }
        }

        /**
         * Creates a new monitor. The number of lists used for 
         * holding the entries is the smallest power of two that
         * is greater than or equal to the given number of buffers 
         * (and at least one).
         * 
         * @param maxBuffers the expected maximum number of buffers
         */
        @SuppressWarnings("unchecked")
        public BufferMonitor(int maxBuffers) {
            int lists = 1;
            while (lists < maxBuffers) {
                lists <<= 1;
            }
            // lists is a power of two, so this sets all lower bits.
            indexMask = lists - 1;
            data = new Entry[lists];
        }

        /**
         * Returns the index of the list responsible for the buffer.
         *
         * @param buffer the buffer
         * @return the index
         */
        private int indexOf(ManagedBuffer<?> buffer) {
            return System.identityHashCode(buffer) & indexMask;
        }

        /**
         * Put an entry in the map.
         *
         * @param buffer the buffer
         * @param properties the properties
         * @return the previous properties if the buffer was already 
         * known, else the properties passed in
         */
        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
        public BufferProperties put(W buffer, BufferProperties properties) {
            check();
            int index = indexOf(buffer);
            synchronized (data) {
                Entry<W> entry = data[index];
                Entry<W> prev = null;
                while (true) {
                    if (entry == null) {
                        // Not found, create new.
                        entry = new Entry<>(buffer, properties,
                            orphanedEntries, index, null);
                        if (prev == null) {
                            data[index] = entry; // Is first.
                        } else {
                            prev.next = entry; // Is next (last).
                        }
                        return properties;
                    }
                    if (entry.getKey() == buffer) { // NOPMD
                        // Found, update.
                        return entry.setValue(properties);
                    }
                    prev = entry;
                    entry = entry.next;
                }
            }
        }

        /**
         * Returns the properties for the given buffer.
         *
         * @param buffer the buffer
         * @return the buffer properties or {@code null} if the 
         * buffer is unknown
         */
        @SuppressWarnings("unused")
        public BufferProperties get(ManagedBuffer<?> buffer) {
            check();
            int index = indexOf(buffer);
            synchronized (data) {
                Entry<W> entry = data[index];
                while (entry != null) {
                    if (entry.getKey() == buffer) {
                        return entry.getValue();
                    }
                    entry = entry.next;
                }
                return null;
            }
        }

        /**
         * Removes the given buffer.
         *
         * @param buffer the buffer
         * @return the buffer properties or {@code null} if the 
         * buffer is unknown
         */
        public BufferProperties remove(ManagedBuffer<?> buffer) {
            check();
            int index = indexOf(buffer);
            synchronized (data) {
                Entry<W> entry = data[index];
                Entry<W> prev = null;
                while (entry != null) {
                    if (entry.getKey() == buffer) {
                        if (prev == null) {
                            data[index] = entry.next; // Was first.
                        } else {
                            prev.next = entry.next;
                        }
                        return entry.getValue();
                    }
                    prev = entry;
                    entry = entry.next;
                }
                return null;
            }
        }

        /**
         * Removes the given entry (which usually has lost its key).
         *
         * @param toBeRemoved the entry to be removed
         * @return the buffer properties or {@code null} if the 
         * entry was not found
         */
        @SuppressWarnings("PMD.CompareObjectsWithEquals")
        private BufferProperties remove(Entry<W> toBeRemoved) {
            synchronized (data) {
                Entry<W> entry = data[toBeRemoved.index];
                Entry<W> prev = null;
                while (entry != null) {
                    if (entry == toBeRemoved) {
                        if (prev == null) {
                            data[toBeRemoved.index] = entry.next; // Was first.
                        } else {
                            prev.next = entry.next;
                        }
                        return entry.getValue();
                    }
                    prev = entry;
                    entry = entry.next;
                }
                return null;
            }
        }

        /**
         * Processes all entries whose buffers have been garbage 
         * collected without having been removed from the monitor
         * before. Each such entry is removed, the number of created 
         * buffers is decremented and a warning is logged.
         */
        private void check() {
            while (true) {
                @SuppressWarnings("unchecked")
                Entry<W> entry = (Entry<W>) orphanedEntries.poll();
                if (entry == null) {
                    return;
                }
                // Managed buffer has not been properly recollected, fix.
                BufferProperties props = remove(entry);
                if (props == null) {
                    // Already gone, but there may be more orphans.
                    continue;
                }
                createdBufs.decrementAndGet();
                // Create warning
                if (logger.isLoggable(Level.WARNING)) {
                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
                    final StringBuilder msg = new StringBuilder(
                        "Orphaned buffer from pool ");
                    msg.append(name());
                    StackTraceElement[] trace = props.createdBy();
                    // The trace is empty unless recorded (log level FINE).
                    if (trace != null && trace.length > 0) {
                        msg.append(", created");
                        for (StackTraceElement e : trace) {
                            msg.append(System.lineSeparator()).append("\tat ")
                                .append(e.toString());
                        }
                    }
                    logger.warning(msg.toString());
                }
            }
        }
    }

    /**
     * An MBean interface for getting information about the managed
     * buffer pools. Note that created buffer pools are tracked using
     * weak references. Therefore, the MBean may report more pools than
     * are really in use. 
     */
    public interface ManagedBufferPoolMXBean {

        /**
         * Information about a single managed pool.
         */
        @SuppressWarnings("PMD.DataClass")
        class PoolInfo {
            private final int created;
            private final int pooled;
            private final int preserved;
            private final int maximum;
            private final int bufferSize;

            /**
             * Instantiates a new pool info.
             *
             * @param created the created
             * @param pooled the pooled
             * @param preserved the preserved
             * @param maximum the maximum
             * @param bufferSize the buffer size
             */
            @ConstructorProperties({ "created", "pooled",
                "preserved", "maximum", "bufferSize" })
            public PoolInfo(int created, int pooled,
                    int preserved, int maximum, int bufferSize) {
                this.created = created;
                this.pooled = pooled;
                this.preserved = preserved;
                this.maximum = maximum;
                this.bufferSize = bufferSize;
            }

            /**
             * The number of buffers created by this pool.
             * 
             * @return the value
             */
            public int getCreated() {
                return created;
            }

            /**
             * The number of buffers pooled (ready to be acquired).
             * 
             * @return the value
             */
            public int getPooled() {
                return pooled;
            }

            /**
             * The number of buffers preserved.
             * 
             * @return the value
             */
            public int getPreserved() {
                return preserved;
            }

            /**
             * The maximum number of buffers created by this pool.
             * 
             * @return the value
             */
            public int getMaximum() {
                return maximum;
            }

            /**
             * The size of the buffers in items.
             * 
             * @return the buffer size
             */
            public int getBufferSize() {
                return bufferSize;
            }
        }

        /**
         * Three views on the existing pool.
         */
        class PoolInfos {
            private final SortedMap<String, PoolInfo> allPools;
            private final SortedMap<String, PoolInfo> nonEmptyPools;
            private final SortedMap<String, PoolInfo> usedPools;

            /**
             * Instantiates a new pool infos. Pools with identical
             * names are made distinguishable by appending "#1", "#2"
             * etc. to the name.
             *
             * @param pools the pools
             */
            public PoolInfos(Set<ManagedBufferPool<?, ?>> pools) {
                allPools = new TreeMap<>();
                nonEmptyPools = new TreeMap<>();
                usedPools = new TreeMap<>();

                // The set may be modified concurrently (it may be a 
                // synchronized set, which requires holding its lock while 
                // iterating). Work on a snapshot, so that the lock isn't 
                // held while the pools are queried.
                List<ManagedBufferPool<?, ?>> snapshot;
                synchronized (pools) {
                    snapshot = new ArrayList<>(pools);
                }

                @SuppressWarnings("PMD.UseConcurrentHashMap")
                Map<String, Integer> dupsNext = new HashMap<>();
                for (ManagedBufferPool<?, ?> mbp : snapshot) {
                    String key = mbp.name();
                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
                    PoolInfo infos = new PoolInfo(
                        mbp.createdBufs.get(), mbp.queue.size(),
                        mbp.preservedBufs, mbp.maximumBufs,
                        mbp.bufferSize());
                    if (allPools.containsKey(key)
                        || dupsNext.containsKey(key)) {
                        if (allPools.containsKey(key)) {
                            // Found first duplicate, rename
                            allPools.put(key + "#1", allPools.get(key));
                            allPools.remove(key);
                            dupsNext.put(key, 2);
                        }
                        // Map#put returns the previous value, i.e. the
                        // number to be used for this duplicate.
                        allPools.put(key + "#"
                            + dupsNext.put(key, dupsNext.get(key) + 1), infos);
                    } else {
                        allPools.put(key, infos);
                    }
                }
                for (Map.Entry<String, PoolInfo> e : allPools.entrySet()) {
                    PoolInfo infos = e.getValue();
                    if (infos.getPooled() > 0) {
                        nonEmptyPools.put(e.getKey(), infos);
                    }
                    if (infos.getCreated() > 0) {
                        usedPools.put(e.getKey(), infos);
                    }
                }
            }

            /**
             * All pools.
             *
             * @return the all pools
             */
            public SortedMap<String, PoolInfo> getAllPools() {
                return allPools;
            }

            /**
             * Pools that have at least one managed buffer enqueued
             * (ready to be acquired).
             *
             * @return the non empty pools
             */
            public SortedMap<String, PoolInfo> getNonEmptyPools() {
                return nonEmptyPools;
            }

            /**
             * Pools that have at least one associated buffer
             * (in pool or in use).
             *
             * @return the used pools
             */
            public SortedMap<String, PoolInfo> getUsedPools() {
                return usedPools;
            }
        }

        /**
         * Set the default drain delay.
         * 
         * @param millis the drain delay in milli seconds
         */
        void setDefaultDrainDelay(long millis);

        /**
         * Returns the drain delay in milli seconds.
         * 
         * @return the value
         */
        long getDefaultDrainDelay();

        /**
         * Set the acquire warning limit.
         * 
         * @param millis the limit
         */
        void setAcquireWarningLimit(long millis);

        /**
         * Returns the acquire warning limit.
         * 
         * @return the value
         */
        long getAcquireWarningLimit();

        /**
         * Information about the pools.
         * 
         * @return the map
         */
        PoolInfos getPoolInfos();

        /**
         * Summary information about the pooled buffers.
         * 
         * @return the values
         */
        IntSummaryStatistics getPooledPerPoolStatistics();

        /**
         * Summary information about the created buffers.
         * 
         * @return the values
         */
        IntSummaryStatistics getCreatedPerPoolStatistics();
    }

    /**
     * The MBean view
     */
    private static final class MBeanView implements ManagedBufferPoolMXBean {

        private static final Set<ManagedBufferPool<?, ?>> allPools
            = Collections.synchronizedSet(
                Collections.newSetFromMap(new WeakHashMap<>()));

        /**
         * Adds the pool.
         *
         * @param pool the pool
         */
        public static void addPool(ManagedBufferPool<?, ?> pool) {
            allPools.add(pool);
        }

        /**
         * Returns a snapshot of the known pools. Streaming or 
         * iterating over a synchronized set requires holding its 
         * lock; the copy allows processing without holding it.
         *
         * @return the pools
         */
        private static List<ManagedBufferPool<?, ?>> poolsSnapshot() {
            synchronized (allPools) {
                return new ArrayList<>(allPools);
            }
        }

        @Override
        public void setDefaultDrainDelay(long millis) {
            ManagedBufferPool.setDefaultDrainDelay(millis);
        }

        @Override
        public long getDefaultDrainDelay() {
            return defaultDrainDelay();
        }

        @Override
        public void setAcquireWarningLimit(long millis) {
            acquireWarningLimit = millis;
        }

        @Override
        public long getAcquireWarningLimit() {
            return acquireWarningLimit;
        }

        @Override
        public PoolInfos getPoolInfos() {
            return new PoolInfos(allPools);
        }

        @Override
        public IntSummaryStatistics getPooledPerPoolStatistics() {
            return poolsSnapshot().stream().collect(
                Collectors.summarizingInt(mbp -> mbp.queue.size()));
        }

        @Override
        public IntSummaryStatistics getCreatedPerPoolStatistics() {
            return poolsSnapshot().stream().collect(
                Collectors.summarizingInt(mbp -> mbp.createdBufs.get()));
        }
    }

    static {
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName mxbeanName = new ObjectName("org.jgrapes.io:type="
                + ManagedBufferPool.class.getSimpleName() + "s");
            mbs.registerMBean(new MBeanView(), mxbeanName);
        } catch (MalformedObjectNameException | InstanceAlreadyExistsException
                | MBeanRegistrationException | NotCompliantMBeanException e) {
            // The MBean is for monitoring only, so failing to register
            // it must not prevent the class from being used. Report 
            // the problem nevertheless.
            Logger.getLogger(ManagedBufferPool.class.getName()).log(
                Level.WARNING, "Cannot register MBean for managed buffer"
                    + " pools: " + e.getMessage(),
                e);
        }
    }
}