001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io.util; 020 021import java.beans.ConstructorProperties; 022import java.lang.management.ManagementFactory; 023import java.lang.ref.ReferenceQueue; 024import java.lang.ref.WeakReference; 025import java.nio.Buffer; 026import java.time.Duration; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.IntSummaryStatistics; 030import java.util.Map; 031import java.util.Optional; 032import java.util.Set; 033import java.util.SortedMap; 034import java.util.TreeMap; 035import java.util.WeakHashMap; 036import java.util.concurrent.ArrayBlockingQueue; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.concurrent.atomic.AtomicReference; 041import java.util.function.BiFunction; 042import java.util.function.Supplier; 043import java.util.logging.Level; 044import java.util.logging.Logger; 045import java.util.stream.Collectors; 046import javax.management.InstanceAlreadyExistsException; 047import javax.management.MBeanRegistrationException; 048import javax.management.MBeanServer; 049import javax.management.MalformedObjectNameException; 050import javax.management.NotCompliantMBeanException; 051import javax.management.ObjectName; 052import org.jgrapes.core.Components; 053import org.jgrapes.core.Components.Timer; 054import org.jgrapes.io.IOSubchannel; 055import org.jgrapes.io.events.Output; 056 057/** 058 * A queue based buffer pool. Using buffers from a pool is an important 059 * feature for limiting the computational resources for an {@link IOSubchannel}. 060 * A producer of {@link Output} events that simply creates its own buffers 061 * may produce and enqueue a large number of events that are not consumed 062 * as fast as they are produced. 063 * 064 * Using a buffer pool with a typical size of two synchronizes the 065 * producer and the consumers of events nicely. The producer 066 * (thread) holds one buffer and fills it, the consumer (thread) holds 067 * the other buffer and works with its content. If the producer finishes 068 * before the consumer, it has to stop until the consumer has processed 069 * previous event and releases the buffer. The consumer can continue 070 * without delay, because the data has already been prepared and enqueued 071 * as the next event. 072 * 073 * One of the biggest problems when using a pool can be to identify 074 * leaking buffers, i.e. buffers that are not properly returned to the pool. 075 * This implementation therefore tracks all created buffers 076 * (with a small overhead) and logs a warning if a buffer is no longer 077 * used (referenced) but has not been returned to the pool. If the 078 * log level for {@link ManagedBufferPool} is set to {@link Level#FINE}, 079 * the warning also includes a stack trace of the call to {@link #acquire()} 080 * that handed out the buffer. Providing this information in addition 081 * obviously requires a larger overhead and is therefore limited to the 082 * finer log levels. 083 * 084 * @param <W> the type of the wrapped (managed) buffer 085 * @param <T> the type of the content buffer that is wrapped 086 */ 087@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.NcssCount", 088 "PMD.EmptyCatchBlock" }) 089public class ManagedBufferPool<W extends ManagedBuffer<T>, T extends Buffer> 090 implements BufferCollector<W> { 091 092 @SuppressWarnings("PMD.FieldNamingConventions") 093 protected final Logger logger 094 = Logger.getLogger(ManagedBufferPool.class.getName()); 095 096 private static long defaultDrainDelay = 1500; 097 private static long acquireWarningLimit = 1000; 098 099 private String name = Components.objectName(this); 100 private BiFunction<T, BufferCollector<W>, W> wrapper; 101 private Supplier<T> bufferFactory; 102 private BufferMonitor bufferMonitor; 103 private BlockingQueue<W> queue; 104 private int bufferSize = -1; 105 private int preservedBufs; 106 private int maximumBufs; 107 private AtomicInteger createdBufs; 108 private long drainDelay = -1; 109 private final AtomicReference<Timer> idleTimer 110 = new AtomicReference<>(null); 111 112 /** 113 * Sets the default delay after which buffers are removed from 114 * the pool. The default value is 1500ms. 115 * 116 * @param delay the delay in ms 117 */ 118 public static void setDefaultDrainDelay(long delay) { 119 defaultDrainDelay = delay; 120 } 121 122 /** 123 * Returns the default drain delay. 124 * 125 * @return the delay 126 */ 127 public static long defaultDrainDelay() { 128 return defaultDrainDelay; 129 } 130 131 /** 132 * Create a pool that contains a varying number of (wrapped) buffers. 133 * The pool is initially empty. When buffers are requested and none 134 * are left in the pool, new buffers are created up to the given 135 * upper limit. Recollected buffers are put in the pool until it holds 136 * the number specified by the lower threshold. Any additional 137 * recollected buffers are discarded. 138 * 139 * @param wrapper the function that converts buffers to managed buffers 140 * @param bufferFactory a function that creates a new buffer 141 * @param lowerThreshold the number of buffers kept in the pool 142 * @param upperLimit the maximum number of buffers 143 */ 144 public ManagedBufferPool(BiFunction<T, BufferCollector<W>, W> wrapper, 145 Supplier<T> bufferFactory, int lowerThreshold, int upperLimit) { 146 this.wrapper = wrapper; 147 this.bufferFactory = bufferFactory; 148 preservedBufs = lowerThreshold; 149 maximumBufs = upperLimit; 150 createdBufs = new AtomicInteger(); 151 queue = new ArrayBlockingQueue<>(lowerThreshold); 152 bufferMonitor = new BufferMonitor(upperLimit); 153 MBeanView.addPool(this); 154 } 155 156 /** 157 * Create a pool that keeps up to the given number of (wrapped) buffers 158 * in the pool and also uses that number as upper limit. 159 * 160 * @param wrapper the function that converts buffers to managed buffers 161 * @param bufferFactory a function that creates a new buffer 162 * @param buffers the number of buffers 163 */ 164 public ManagedBufferPool(BiFunction<T, BufferCollector<W>, W> wrapper, 165 Supplier<T> bufferFactory, int buffers) { 166 this(wrapper, bufferFactory, buffers, buffers); 167 } 168 169 /** 170 * Sets a name for this pool (to be used in status reports). 171 * 172 * @param name the name 173 * @return the object for easy chaining 174 */ 175 public ManagedBufferPool<W, T> setName(String name) { 176 this.name = name; 177 return this; 178 } 179 180 /** 181 * Returns the name of this pool. 182 * 183 * @return the name 184 */ 185 public String name() { 186 return name; 187 } 188 189 /** 190 * Sets the delay after which buffers are removed from 191 * the pool. 192 * 193 * @param delay the delay 194 * @return the object for easy chaining 195 */ 196 public ManagedBufferPool<W, T> setDrainDelay(long delay) { 197 this.drainDelay = delay; 198 return this; 199 } 200 201 private W createBuffer() { 202 createdBufs.incrementAndGet(); 203 W buffer = wrapper.apply(this.bufferFactory.get(), this); 204 bufferMonitor.put(buffer, new BufferProperties()); 205 bufferSize = buffer.capacity(); 206 return buffer; 207 } 208 209 /** 210 * Removes the buffer from the pool. 211 * 212 * @param buffer the buffer to remove 213 */ 214 private void removeBuffer(W buffer) { 215 createdBufs.decrementAndGet(); 216 if (bufferMonitor.remove(buffer) == null) { 217 if (logger.isLoggable(Level.FINE)) { 218 logger.log(Level.WARNING, 219 "Attempt to remove unknown buffer from pool.", 220 new Throwable()); 221 } else { 222 logger.warning("Attempt to remove unknown buffer from pool."); 223 } 224 } 225 } 226 227 /** 228 * Returns the size of the buffers managed by this pool. 229 * 230 * @return the buffer size 231 */ 232 public int bufferSize() { 233 if (bufferSize < 0) { 234 createBuffer().unlockBuffer(); 235 } 236 return bufferSize; 237 } 238 239 /** 240 * Acquires a managed buffer from the pool. If the pool is empty, 241 * waits for a buffer to become available. The acquired buffer has 242 * a lock count of one. 243 * 244 * @return the acquired buffer 245 * @throws InterruptedException if the current thread is interrupted 246 */ 247 @SuppressWarnings("PMD.GuardLogStatement") 248 public W acquire() throws InterruptedException { 249 // Stop draining, because we obviously need this kind of buffers 250 Optional.ofNullable(idleTimer.getAndSet(null)).ifPresent( 251 timer -> timer.cancel()); 252 if (createdBufs.get() < maximumBufs) { 253 // Haven't reached maximum, so if no buffer is queued, create one. 254 W buffer = queue.poll(); 255 if (buffer != null) { 256 buffer.lockBuffer(); 257 return buffer; 258 } 259 return createBuffer(); 260 } 261 // Wait for buffer to become available. 262 if (logger.isLoggable(Level.FINE)) { 263 // If configured, log message after waiting some time. 264 W buffer = queue.poll(acquireWarningLimit, TimeUnit.MILLISECONDS); 265 if (buffer != null) { 266 buffer.lockBuffer(); 267 return buffer; 268 } 269 logger.log(Level.FINE, 270 Thread.currentThread().getName() + " waiting > " 271 + acquireWarningLimit + "ms for buffer, while executing:", 272 new Throwable()); 273 } 274 W buffer = queue.take(); 275 buffer.lockBuffer(); 276 return buffer; 277 } 278 279 /** 280 * Re-adds the buffer to the pool. The buffer is cleared. 281 * 282 * @param buffer the buffer 283 * @see org.jgrapes.io.util.BufferCollector#recollect(org.jgrapes.io.util.ManagedBuffer) 284 */ 285 @Override 286 public void recollect(W buffer) { 287 if (queue.size() < preservedBufs) { 288 long effectiveDrainDelay 289 = drainDelay > 0 ? drainDelay : defaultDrainDelay; 290 if (effectiveDrainDelay > 0) { 291 // Enqueue 292 buffer.clear(); 293 queue.add(buffer); 294 Timer old = idleTimer.getAndSet(Components.schedule(this::drain, 295 Duration.ofMillis(effectiveDrainDelay))); 296 if (old != null) { 297 old.cancel(); 298 } 299 return; 300 } 301 } 302 // Discard 303 removeBuffer(buffer); 304 } 305 306 @SuppressWarnings("PMD.UnusedFormalParameter") 307 private void drain(Timer timer) { 308 idleTimer.set(null); 309 while (true) { 310 W buffer = queue.poll(); 311 if (buffer == null) { 312 break; 313 } 314 removeBuffer(buffer); 315 } 316 } 317 318 /* 319 * (non-Javadoc) 320 * 321 * @see java.lang.Object#toString() 322 */ 323 @Override 324 public String toString() { 325 StringBuilder builder = new StringBuilder(50); 326 builder.append("ManagedBufferPool ["); 327 if (queue != null) { 328 builder.append("queue="); 329 builder.append(queue); 330 } 331 builder.append(']'); 332 return builder.toString(); 333 } 334 335 /** 336 * Buffer properties. 337 */ 338 private class BufferProperties { 339 340 private final StackTraceElement[] createdBy; 341 342 /** 343 * Instantiates new buffer properties. 344 */ 345 public BufferProperties() { 346 if (logger.isLoggable(Level.FINE)) { 347 createdBy = Thread.currentThread().getStackTrace(); 348 } else { 349 createdBy = new StackTraceElement[0]; 350 } 351 } 352 353 /** 354 * Returns where the buffer was created. 355 * 356 * @return the stack trace element[] 357 */ 358 @SuppressWarnings("PMD.MethodReturnsInternalArray") 359 public StackTraceElement[] createdBy() { 360 return createdBy; 361 } 362 } 363 364 /** 365 * This is basically a WeakHashMap. We cannot use WeakHashMap 366 * because there is no "hook" into the collection of orphaned 367 * references, which is what we want here. 368 */ 369 @SuppressWarnings("PMD.DataflowAnomalyAnalysis") 370 private class BufferMonitor { 371 372 private Entry<W>[] data; 373 private int indexMask; 374 private final ReferenceQueue<W> orphanedEntries 375 = new ReferenceQueue<>(); 376 377 /** 378 * An Entry. 379 * 380 * @param <B> the generic type 381 */ 382 private class Entry<B extends ManagedBuffer<?>> extends WeakReference<B> 383 implements Map.Entry<B, BufferProperties> { 384 /* default */ final int index; 385 /* default */ BufferProperties props; 386 /* default */ Entry<B> next; 387 388 /** 389 * Instantiates a new entry. 390 * 391 * @param buffer the buffer 392 * @param props the props 393 * @param queue the queue 394 * @param index the index 395 * @param next the next 396 */ 397 /* default */ Entry(B buffer, BufferProperties props, 398 ReferenceQueue<B> queue, int index, Entry<B> next) { 399 super(buffer, queue); 400 this.index = index; 401 this.props = props; 402 this.next = next; 403 } 404 405 @Override 406 public B getKey() { 407 return get(); 408 } 409 410 @Override 411 public BufferProperties getValue() { 412 return props; 413 } 414 415 @Override 416 public BufferProperties setValue(BufferProperties props) { 417 return this.props = props; 418 } 419 } 420 421 /** 422 * @param data 423 */ 424 @SuppressWarnings("unchecked") 425 public BufferMonitor(int maxBuffers) { 426 int lists = 1; 427 while (lists < maxBuffers) { 428 lists <<= 1; 429 indexMask = (indexMask << 1) + 1; 430 } 431 data = new Entry[lists]; 432 } 433 434 /** 435 * Put an entry in the map. 436 * 437 * @param buffer the buffer 438 * @param properties the properties 439 * @return the buffer properties 440 */ 441 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 442 public BufferProperties put(W buffer, BufferProperties properties) { 443 check(); 444 int index = buffer.hashCode() & indexMask; 445 synchronized (data) { 446 Entry<W> entry = data[index]; 447 Entry<W> prev = null; 448 while (true) { 449 if (entry == null) { 450 // Not found, create new. 451 entry = new Entry<>(buffer, properties, 452 orphanedEntries, index, null); 453 if (prev == null) { 454 data[index] = entry; // Is first. 455 } else { 456 prev.next = entry; // Is next (last). 457 } 458 return properties; 459 } 460 if (entry.getKey() == buffer) { 461 // Found, update. 462 BufferProperties old = entry.getValue(); 463 entry.setValue(properties); 464 return old; 465 } 466 prev = entry; 467 entry = entry.next; 468 } 469 } 470 } 471 472 /** 473 * Returns the properties for the given buffer. 474 * 475 * @param buffer the buffer 476 * @return the buffer properties 477 */ 478 @SuppressWarnings("unused") 479 public BufferProperties get(ManagedBuffer<?> buffer) { 480 check(); 481 int index = buffer.hashCode() & indexMask; 482 synchronized (data) { 483 Entry<W> entry = data[index]; 484 while (entry != null) { 485 if (entry.getKey() == buffer) { 486 return entry.getValue(); 487 } 488 entry = entry.next; 489 } 490 return null; 491 } 492 } 493 494 /** 495 * Removes the given buffer. 496 * 497 * @param buffer the buffer 498 * @return the buffer properties 499 */ 500 public BufferProperties remove(ManagedBuffer<?> buffer) { 501 check(); 502 int index = buffer.hashCode() & indexMask; 503 synchronized (data) { 504 Entry<W> entry = data[index]; 505 Entry<W> prev = null; 506 while (entry != null) { 507 if (entry.getKey() == buffer) { 508 if (prev == null) { 509 data[index] = entry.next; // Was first. 510 } else { 511 prev.next = entry.next; 512 } 513 return entry.getValue(); 514 } 515 prev = entry; 516 entry = entry.next; 517 } 518 return null; 519 } 520 } 521 522 @SuppressWarnings("PMD.CompareObjectsWithEquals") 523 private BufferProperties remove(Entry<W> toBeRemoved) { 524 synchronized (data) { 525 Entry<W> entry = data[toBeRemoved.index]; 526 Entry<W> prev = null; 527 while (entry != null) { 528 if (entry == toBeRemoved) { 529 if (prev == null) { 530 data[toBeRemoved.index] = entry.next; // Was first. 531 } else { 532 prev.next = entry.next; 533 } 534 return entry.getValue(); 535 } 536 prev = entry; 537 entry = entry.next; 538 } 539 return null; 540 } 541 } 542 543 private void check() { 544 while (true) { 545 @SuppressWarnings("unchecked") 546 Entry<W> entry = (Entry<W>) orphanedEntries.poll(); 547 if (entry == null) { 548 return; 549 } 550 // Managed buffer has not been properly recollected, fix. 551 BufferProperties props = remove(entry); 552 if (props == null) { 553 return; 554 } 555 createdBufs.decrementAndGet(); 556 // Create warning 557 if (logger.isLoggable(Level.WARNING)) { 558 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 559 final StringBuilder msg = new StringBuilder( 560 "Orphaned buffer from pool "); 561 msg.append(name()); 562 StackTraceElement[] trace = props.createdBy(); 563 if (trace != null) { 564 msg.append(", created"); 565 for (StackTraceElement e : trace) { 566 msg.append(System.lineSeparator()); 567 msg.append("\tat "); 568 msg.append(e.toString()); 569 } 570 } 571 logger.warning(msg.toString()); 572 } 573 } 574 } 575 } 576 577 /** 578 * An MBean interface for getting information about the managed 579 * buffer pools. Note that created buffer pools are tracked using 580 * weak references. Therefore, the MBean may report more pools than 581 * are really in use. 582 */ 583 public interface ManagedBufferPoolMXBean { 584 585 /** 586 * Information about a single managed pool. 587 */ 588 @SuppressWarnings("PMD.DataClass") 589 class PoolInfo { 590 private final int created; 591 private final int pooled; 592 private final int preserved; 593 private final int maximum; 594 private final int bufferSize; 595 596 /** 597 * Instantiates a new pool info. 598 * 599 * @param created the created 600 * @param pooled the pooled 601 * @param preserved the preserved 602 * @param maximum the maximum 603 * @param bufferSize the buffer size 604 */ 605 @ConstructorProperties({ "created", "pooled", 606 "preserved", "maximum", "bufferSize" }) 607 public PoolInfo(int created, int pooled, 608 int preserved, int maximum, int bufferSize) { 609 this.created = created; 610 this.pooled = pooled; 611 this.preserved = preserved; 612 this.maximum = maximum; 613 this.bufferSize = bufferSize; 614 } 615 616 /** 617 * The number of buffers created by this pool. 618 * 619 * @return the value 620 */ 621 public int getCreated() { 622 return created; 623 } 624 625 /** 626 * The number of buffers pooled (ready to be acquired). 627 * 628 * @return the value 629 */ 630 public int getPooled() { 631 return pooled; 632 } 633 634 /** 635 * The number of buffers preserved. 636 * 637 * @return the value 638 */ 639 public int getPreserved() { 640 return preserved; 641 } 642 643 /** 644 * The maximum number of buffers created by this pool. 645 * 646 * @return the value 647 */ 648 public int getMaximum() { 649 return maximum; 650 } 651 652 /** 653 * The size of the buffers in items. 654 * 655 * @return the buffer size 656 */ 657 public int getBufferSize() { 658 return bufferSize; 659 } 660 } 661 662 /** 663 * Three views on the existing pool. 664 */ 665 class PoolInfos { 666 private final SortedMap<String, PoolInfo> allPools; 667 private final SortedMap<String, PoolInfo> nonEmptyPools; 668 private final SortedMap<String, PoolInfo> usedPools; 669 670 /** 671 * Instantiates a new pool infos. 672 * 673 * @param pools the pools 674 */ 675 public PoolInfos(Set<ManagedBufferPool<?, ?>> pools) { 676 allPools = new TreeMap<>(); 677 nonEmptyPools = new TreeMap<>(); 678 usedPools = new TreeMap<>(); 679 680 @SuppressWarnings("PMD.UseConcurrentHashMap") 681 Map<String, Integer> dupsNext = new HashMap<>(); 682 for (ManagedBufferPool<?, ?> mbp : pools) { 683 String key = mbp.name(); 684 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 685 PoolInfo infos = new PoolInfo( 686 mbp.createdBufs.get(), mbp.queue.size(), 687 mbp.preservedBufs, mbp.maximumBufs, 688 mbp.bufferSize()); 689 if (allPools.containsKey(key) 690 || dupsNext.containsKey(key)) { 691 if (allPools.containsKey(key)) { 692 // Found first duplicate, rename 693 allPools.put(key + "#1", allPools.get(key)); 694 allPools.remove(key); 695 dupsNext.put(key, 2); 696 } 697 allPools.put(key + "#" 698 + (dupsNext.put(key, dupsNext.get(key) + 1)), 699 infos); 700 } else { 701 allPools.put(key, infos); 702 } 703 } 704 for (Map.Entry<String, PoolInfo> e : allPools.entrySet()) { 705 PoolInfo infos = e.getValue(); 706 if (infos.getPooled() > 0) { 707 nonEmptyPools.put(e.getKey(), infos); 708 } 709 if (infos.getCreated() > 0) { 710 usedPools.put(e.getKey(), infos); 711 } 712 } 713 } 714 715 /** 716 * All pools. 717 * 718 * @return the all pools 719 */ 720 public SortedMap<String, PoolInfo> getAllPools() { 721 return allPools; 722 } 723 724 /** 725 * Pools that have at least managed buffer enqueued 726 * (ready to be acquired). 727 * 728 * @return the non empty pools 729 */ 730 public SortedMap<String, PoolInfo> getNonEmptyPools() { 731 return nonEmptyPools; 732 } 733 734 /** 735 * Pools that have at least one associated buffer 736 * (in pool or in use). 737 * 738 * @return the used pools 739 */ 740 public SortedMap<String, PoolInfo> getUsedPools() { 741 return usedPools; 742 } 743 } 744 745 /** 746 * Set the default drain delay. 747 * 748 * @param millis the drain delay in milli seconds 749 */ 750 void setDefaultDrainDelay(long millis); 751 752 /** 753 * Returns the drain delay in milli seconds. 754 * 755 * @return the value 756 */ 757 long getDefaultDrainDelay(); 758 759 /** 760 * Set the acquire warning limit. 761 * 762 * @param millis the limit 763 */ 764 void setAcquireWarningLimit(long millis); 765 766 /** 767 * Returns the acquire warning limit. 768 * 769 * @return the value 770 */ 771 long getAcquireWarningLimit(); 772 773 /** 774 * Informations about the pools. 775 * 776 * @return the map 777 */ 778 PoolInfos getPoolInfos(); 779 780 /** 781 * Summary information about the pooled buffers. 782 * 783 * @return the values 784 */ 785 IntSummaryStatistics getPooledPerPoolStatistics(); 786 787 /** 788 * Summary information about the created buffers. 789 * 790 * @return the values 791 */ 792 IntSummaryStatistics getCreatedPerPoolStatistics(); 793 } 794 795 /** 796 * The MBean view 797 */ 798 private static class MBeanView implements ManagedBufferPoolMXBean { 799 800 private static Set<ManagedBufferPool<?, ?>> allPools 801 = Collections.synchronizedSet( 802 Collections.newSetFromMap( 803 new WeakHashMap<ManagedBufferPool<?, ?>, Boolean>())); 804 805 /** 806 * Adds the pool. 807 * 808 * @param pool the pool 809 */ 810 public static void addPool(ManagedBufferPool<?, ?> pool) { 811 allPools.add(pool); 812 } 813 814 @Override 815 public void setDefaultDrainDelay(long millis) { 816 ManagedBufferPool.setDefaultDrainDelay(millis); 817 } 818 819 @Override 820 public long getDefaultDrainDelay() { 821 return ManagedBufferPool.defaultDrainDelay(); 822 } 823 824 @Override 825 public void setAcquireWarningLimit(long millis) { 826 ManagedBufferPool.acquireWarningLimit = millis; 827 } 828 829 @Override 830 public long getAcquireWarningLimit() { 831 return ManagedBufferPool.acquireWarningLimit; 832 } 833 834 @Override 835 public PoolInfos getPoolInfos() { 836 return new PoolInfos(allPools); 837 } 838 839 @Override 840 public IntSummaryStatistics getPooledPerPoolStatistics() { 841 return allPools.stream().collect( 842 Collectors.summarizingInt(mbp -> mbp.queue.size())); 843 } 844 845 @Override 846 public IntSummaryStatistics getCreatedPerPoolStatistics() { 847 return allPools.stream().collect( 848 Collectors.summarizingInt(mbp -> mbp.createdBufs.get())); 849 } 850 } 851 852 static { 853 try { 854 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); 855 ObjectName mxbeanName = new ObjectName("org.jgrapes.io:type=" 856 + ManagedBufferPool.class.getSimpleName() + "s"); 857 mbs.registerMBean(new MBeanView(), mxbeanName); 858 } catch (MalformedObjectNameException | InstanceAlreadyExistsException 859 | MBeanRegistrationException | NotCompliantMBeanException e) { 860 // Does not happen 861 } 862 } 863}