001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.beans.ConstructorProperties;
022import java.lang.management.ManagementFactory;
023import java.lang.ref.ReferenceQueue;
024import java.lang.ref.WeakReference;
025import java.nio.Buffer;
026import java.time.Duration;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.IntSummaryStatistics;
030import java.util.Map;
031import java.util.Optional;
032import java.util.Set;
033import java.util.SortedMap;
034import java.util.TreeMap;
035import java.util.WeakHashMap;
036import java.util.concurrent.ArrayBlockingQueue;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiFunction;
042import java.util.function.Supplier;
043import java.util.logging.Level;
044import java.util.logging.Logger;
045import java.util.stream.Collectors;
046import javax.management.InstanceAlreadyExistsException;
047import javax.management.MBeanRegistrationException;
048import javax.management.MBeanServer;
049import javax.management.MalformedObjectNameException;
050import javax.management.NotCompliantMBeanException;
051import javax.management.ObjectName;
052import org.jgrapes.core.Components;
053import org.jgrapes.core.Components.Timer;
054import org.jgrapes.io.IOSubchannel;
055import org.jgrapes.io.events.Output;
056
057/**
058 * A queue based buffer pool. Using buffers from a pool is an important
059 * feature for limiting the computational resources for an {@link IOSubchannel}.
060 * A producer of {@link Output} events that simply creates its own buffers
061 * may produce and enqueue a large number of events that are not consumed
062 * as fast as they are produced. 
063 * 
064 * Using a buffer pool with a typical size of two synchronizes the 
065 * producer and the consumers of events nicely. The producer
066 * (thread) holds one buffer and fills it, the consumer (thread) holds 
067 * the other buffer and works with its content. If the producer finishes
068 * before the consumer, it has to stop until the consumer has processed 
069 * previous event and releases the buffer. The consumer can continue
070 * without delay, because the data has already been prepared and enqueued
071 * as the next event.
072 * 
073 * One of the biggest problems when using a pool can be to identify 
074 * leaking buffers, i.e. buffers that are not properly returned to the pool.
075 * This implementation therefore tracks all created buffers 
076 * (with a small overhead) and logs a warning if a buffer is no longer
077 * used (referenced) but has not been returned to the pool. If the
078 * log level for {@link ManagedBufferPool} is set to {@link Level#FINE},
079 * the warning also includes a stack trace of the call to {@link #acquire()}
080 * that handed out the buffer. Providing this information in addition 
081 * obviously requires a larger overhead and is therefore limited to the
082 * finer log levels.
083 *
084 * @param <W> the type of the wrapped (managed) buffer
085 * @param <T> the type of the content buffer that is wrapped
086 */
087@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.NcssCount",
088    "PMD.EmptyCatchBlock" })
089public class ManagedBufferPool<W extends ManagedBuffer<T>, T extends Buffer>
090        implements BufferCollector<W> {
091
092    @SuppressWarnings("PMD.FieldNamingConventions")
093    protected final Logger logger
094        = Logger.getLogger(ManagedBufferPool.class.getName());
095
096    private static long defaultDrainDelay = 1500;
097    private static long acquireWarningLimit = 1000;
098
099    private String name = Components.objectName(this);
100    private BiFunction<T, BufferCollector<W>, W> wrapper;
101    private Supplier<T> bufferFactory;
102    private BufferMonitor bufferMonitor;
103    private BlockingQueue<W> queue;
104    private int bufferSize = -1;
105    private int preservedBufs;
106    private int maximumBufs;
107    private AtomicInteger createdBufs;
108    private long drainDelay = -1;
109    private final AtomicReference<Timer> idleTimer
110        = new AtomicReference<>(null);
111
112    /**
113     * Sets the default delay after which buffers are removed from
114     * the pool. The default value is 1500ms.
115     * 
116     * @param delay the delay in ms
117     */
118    public static void setDefaultDrainDelay(long delay) {
119        defaultDrainDelay = delay;
120    }
121
122    /**
123     * Returns the default drain delay.
124     * 
125     * @return the delay
126     */
127    public static long defaultDrainDelay() {
128        return defaultDrainDelay;
129    }
130
131    /**
132     * Create a pool that contains a varying number of (wrapped) buffers.
133     * The pool is initially empty. When buffers are requested and none 
134     * are left in the pool, new buffers are created up to the given 
135     * upper limit. Recollected buffers are put in the pool until it holds
136     * the number specified by the lower threshold. Any additional 
137     * recollected buffers are discarded. 
138     * 
139     * @param wrapper the function that converts buffers to managed buffers
140     * @param bufferFactory a function that creates a new buffer
141     * @param lowerThreshold the number of buffers kept in the pool
142     * @param upperLimit the maximum number of buffers
143     */
144    public ManagedBufferPool(BiFunction<T, BufferCollector<W>, W> wrapper,
145            Supplier<T> bufferFactory, int lowerThreshold, int upperLimit) {
146        this.wrapper = wrapper;
147        this.bufferFactory = bufferFactory;
148        preservedBufs = lowerThreshold;
149        maximumBufs = upperLimit;
150        createdBufs = new AtomicInteger();
151        queue = new ArrayBlockingQueue<>(lowerThreshold);
152        bufferMonitor = new BufferMonitor(upperLimit);
153        MBeanView.addPool(this);
154    }
155
156    /**
157     * Create a pool that keeps up to the given number of (wrapped) buffers
158     * in the pool and also uses that number as upper limit.
159     * 
160     * @param wrapper the function that converts buffers to managed buffers
161     * @param bufferFactory a function that creates a new buffer
162     * @param buffers the number of buffers
163     */
164    public ManagedBufferPool(BiFunction<T, BufferCollector<W>, W> wrapper,
165            Supplier<T> bufferFactory, int buffers) {
166        this(wrapper, bufferFactory, buffers, buffers);
167    }
168
169    /**
170     * Sets a name for this pool (to be used in status reports).
171     * 
172     * @param name the name
173     * @return the object for easy chaining
174     */
175    public ManagedBufferPool<W, T> setName(String name) {
176        this.name = name;
177        return this;
178    }
179
180    /**
181     * Returns the name of this pool.
182     * 
183     * @return the name
184     */
185    public String name() {
186        return name;
187    }
188
189    /**
190     * Sets the delay after which buffers are removed from
191     * the pool.
192     * 
193     * @param delay the delay
194     * @return the object for easy chaining
195     */
196    public ManagedBufferPool<W, T> setDrainDelay(long delay) {
197        this.drainDelay = delay;
198        return this;
199    }
200
201    private W createBuffer() {
202        createdBufs.incrementAndGet();
203        W buffer = wrapper.apply(this.bufferFactory.get(), this);
204        bufferMonitor.put(buffer, new BufferProperties());
205        bufferSize = buffer.capacity();
206        return buffer;
207    }
208
209    /**
210     * Removes the buffer from the pool.
211     * 
212     * @param buffer the buffer to remove
213     */
214    private void removeBuffer(W buffer) {
215        createdBufs.decrementAndGet();
216        if (bufferMonitor.remove(buffer) == null) {
217            if (logger.isLoggable(Level.FINE)) {
218                logger.log(Level.WARNING,
219                    "Attempt to remove unknown buffer from pool.",
220                    new Throwable());
221            } else {
222                logger.warning("Attempt to remove unknown buffer from pool.");
223            }
224        }
225    }
226
227    /**
228     * Returns the size of the buffers managed by this pool.
229     * 
230     * @return the buffer size
231     */
232    public int bufferSize() {
233        if (bufferSize < 0) {
234            createBuffer().unlockBuffer();
235        }
236        return bufferSize;
237    }
238
239    /**
240     * Acquires a managed buffer from the pool. If the pool is empty,
241     * waits for a buffer to become available. The acquired buffer has 
242     * a lock count of one.
243     * 
244     * @return the acquired buffer
245     * @throws InterruptedException if the current thread is interrupted
246     */
247    @SuppressWarnings("PMD.GuardLogStatement")
248    public W acquire() throws InterruptedException {
249        // Stop draining, because we obviously need this kind of buffers
250        Optional.ofNullable(idleTimer.getAndSet(null)).ifPresent(
251            timer -> timer.cancel());
252        if (createdBufs.get() < maximumBufs) {
253            // Haven't reached maximum, so if no buffer is queued, create one.
254            W buffer = queue.poll();
255            if (buffer != null) {
256                buffer.lockBuffer();
257                return buffer;
258            }
259            return createBuffer();
260        }
261        // Wait for buffer to become available.
262        if (logger.isLoggable(Level.FINE)) {
263            // If configured, log message after waiting some time.
264            W buffer = queue.poll(acquireWarningLimit, TimeUnit.MILLISECONDS);
265            if (buffer != null) {
266                buffer.lockBuffer();
267                return buffer;
268            }
269            logger.log(Level.FINE,
270                Thread.currentThread().getName() + " waiting > "
271                    + acquireWarningLimit + "ms for buffer, while executing:",
272                new Throwable());
273        }
274        W buffer = queue.take();
275        buffer.lockBuffer();
276        return buffer;
277    }
278
279    /**
280     * Re-adds the buffer to the pool. The buffer is cleared.
281     *
282     * @param buffer the buffer
283     * @see org.jgrapes.io.util.BufferCollector#recollect(org.jgrapes.io.util.ManagedBuffer)
284     */
285    @Override
286    public void recollect(W buffer) {
287        if (queue.size() < preservedBufs) {
288            long effectiveDrainDelay
289                = drainDelay > 0 ? drainDelay : defaultDrainDelay;
290            if (effectiveDrainDelay > 0) {
291                // Enqueue
292                buffer.clear();
293                queue.add(buffer);
294                Timer old = idleTimer.getAndSet(Components.schedule(this::drain,
295                    Duration.ofMillis(effectiveDrainDelay)));
296                if (old != null) {
297                    old.cancel();
298                }
299                return;
300            }
301        }
302        // Discard
303        removeBuffer(buffer);
304    }
305
306    @SuppressWarnings("PMD.UnusedFormalParameter")
307    private void drain(Timer timer) {
308        idleTimer.set(null);
309        while (true) {
310            W buffer = queue.poll();
311            if (buffer == null) {
312                break;
313            }
314            removeBuffer(buffer);
315        }
316    }
317
318    /*
319     * (non-Javadoc)
320     * 
321     * @see java.lang.Object#toString()
322     */
323    @Override
324    public String toString() {
325        StringBuilder builder = new StringBuilder(50);
326        builder.append("ManagedBufferPool [");
327        if (queue != null) {
328            builder.append("queue=");
329            builder.append(queue);
330        }
331        builder.append(']');
332        return builder.toString();
333    }
334
335    /**
336     * Buffer properties.
337     */
338    private class BufferProperties {
339
340        private final StackTraceElement[] createdBy;
341
342        /**
343         * Instantiates new buffer properties.
344         */
345        public BufferProperties() {
346            if (logger.isLoggable(Level.FINE)) {
347                createdBy = Thread.currentThread().getStackTrace();
348            } else {
349                createdBy = new StackTraceElement[0];
350            }
351        }
352
353        /**
354         * Returns where the buffer was created.
355         *
356         * @return the stack trace element[]
357         */
358        @SuppressWarnings("PMD.MethodReturnsInternalArray")
359        public StackTraceElement[] createdBy() {
360            return createdBy;
361        }
362    }
363
364    /**
365     * This is basically a WeakHashMap. We cannot use WeakHashMap
366     * because there is no "hook" into the collection of orphaned
367     * references, which is what we want here.
368     */
369    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
370    private class BufferMonitor {
371
372        private Entry<W>[] data;
373        private int indexMask;
374        private final ReferenceQueue<W> orphanedEntries
375            = new ReferenceQueue<>();
376
377        /**
378         * An Entry.
379         *
380         * @param <B> the generic type
381         */
382        private class Entry<B extends ManagedBuffer<?>> extends WeakReference<B>
383                implements Map.Entry<B, BufferProperties> {
384            /* default */ final int index;
385            /* default */ BufferProperties props;
386            /* default */ Entry<B> next;
387
388            /**
389             * Instantiates a new entry.
390             *
391             * @param buffer the buffer
392             * @param props the props
393             * @param queue the queue
394             * @param index the index
395             * @param next the next
396             */
397            /* default */ Entry(B buffer, BufferProperties props,
398                    ReferenceQueue<B> queue, int index, Entry<B> next) {
399                super(buffer, queue);
400                this.index = index;
401                this.props = props;
402                this.next = next;
403            }
404
405            @Override
406            public B getKey() {
407                return get();
408            }
409
410            @Override
411            public BufferProperties getValue() {
412                return props;
413            }
414
415            @Override
416            public BufferProperties setValue(BufferProperties props) {
417                return this.props = props;
418            }
419        }
420
421        /**
422         * @param data
423         */
424        @SuppressWarnings("unchecked")
425        public BufferMonitor(int maxBuffers) {
426            int lists = 1;
427            while (lists < maxBuffers) {
428                lists <<= 1;
429                indexMask = (indexMask << 1) + 1;
430            }
431            data = new Entry[lists];
432        }
433
434        /**
435         * Put an entry in the map.
436         *
437         * @param buffer the buffer
438         * @param properties the properties
439         * @return the buffer properties
440         */
441        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
442        public BufferProperties put(W buffer, BufferProperties properties) {
443            check();
444            int index = buffer.hashCode() & indexMask;
445            synchronized (data) {
446                Entry<W> entry = data[index];
447                Entry<W> prev = null;
448                while (true) {
449                    if (entry == null) {
450                        // Not found, create new.
451                        entry = new Entry<>(buffer, properties,
452                            orphanedEntries, index, null);
453                        if (prev == null) {
454                            data[index] = entry; // Is first.
455                        } else {
456                            prev.next = entry; // Is next (last).
457                        }
458                        return properties;
459                    }
460                    if (entry.getKey() == buffer) {
461                        // Found, update.
462                        BufferProperties old = entry.getValue();
463                        entry.setValue(properties);
464                        return old;
465                    }
466                    prev = entry;
467                    entry = entry.next;
468                }
469            }
470        }
471
472        /**
473         * Returns the properties for the given buffer.
474         *
475         * @param buffer the buffer
476         * @return the buffer properties
477         */
478        @SuppressWarnings("unused")
479        public BufferProperties get(ManagedBuffer<?> buffer) {
480            check();
481            int index = buffer.hashCode() & indexMask;
482            synchronized (data) {
483                Entry<W> entry = data[index];
484                while (entry != null) {
485                    if (entry.getKey() == buffer) {
486                        return entry.getValue();
487                    }
488                    entry = entry.next;
489                }
490                return null;
491            }
492        }
493
494        /**
495         * Removes the given buffer.
496         *
497         * @param buffer the buffer
498         * @return the buffer properties
499         */
500        public BufferProperties remove(ManagedBuffer<?> buffer) {
501            check();
502            int index = buffer.hashCode() & indexMask;
503            synchronized (data) {
504                Entry<W> entry = data[index];
505                Entry<W> prev = null;
506                while (entry != null) {
507                    if (entry.getKey() == buffer) {
508                        if (prev == null) {
509                            data[index] = entry.next; // Was first.
510                        } else {
511                            prev.next = entry.next;
512                        }
513                        return entry.getValue();
514                    }
515                    prev = entry;
516                    entry = entry.next;
517                }
518                return null;
519            }
520        }
521
522        @SuppressWarnings("PMD.CompareObjectsWithEquals")
523        private BufferProperties remove(Entry<W> toBeRemoved) {
524            synchronized (data) {
525                Entry<W> entry = data[toBeRemoved.index];
526                Entry<W> prev = null;
527                while (entry != null) {
528                    if (entry == toBeRemoved) {
529                        if (prev == null) {
530                            data[toBeRemoved.index] = entry.next; // Was first.
531                        } else {
532                            prev.next = entry.next;
533                        }
534                        return entry.getValue();
535                    }
536                    prev = entry;
537                    entry = entry.next;
538                }
539                return null;
540            }
541        }
542
543        private void check() {
544            while (true) {
545                @SuppressWarnings("unchecked")
546                Entry<W> entry = (Entry<W>) orphanedEntries.poll();
547                if (entry == null) {
548                    return;
549                }
550                // Managed buffer has not been properly recollected, fix.
551                BufferProperties props = remove(entry);
552                if (props == null) {
553                    return;
554                }
555                createdBufs.decrementAndGet();
556                // Create warning
557                if (logger.isLoggable(Level.WARNING)) {
558                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
559                    final StringBuilder msg = new StringBuilder(
560                        "Orphaned buffer from pool ");
561                    msg.append(name());
562                    StackTraceElement[] trace = props.createdBy();
563                    if (trace != null) {
564                        msg.append(", created");
565                        for (StackTraceElement e : trace) {
566                            msg.append(System.lineSeparator());
567                            msg.append("\tat ");
568                            msg.append(e.toString());
569                        }
570                    }
571                    logger.warning(msg.toString());
572                }
573            }
574        }
575    }
576
577    /**
578     * An MBean interface for getting information about the managed
579     * buffer pools. Note that created buffer pools are tracked using
580     * weak references. Therefore, the MBean may report more pools than
581     * are really in use. 
582     */
583    public interface ManagedBufferPoolMXBean {
584
585        /**
586         * Information about a single managed pool.
587         */
588        @SuppressWarnings("PMD.DataClass")
589        class PoolInfo {
590            private final int created;
591            private final int pooled;
592            private final int preserved;
593            private final int maximum;
594            private final int bufferSize;
595
596            /**
597             * Instantiates a new pool info.
598             *
599             * @param created the created
600             * @param pooled the pooled
601             * @param preserved the preserved
602             * @param maximum the maximum
603             * @param bufferSize the buffer size
604             */
605            @ConstructorProperties({ "created", "pooled",
606                "preserved", "maximum", "bufferSize" })
607            public PoolInfo(int created, int pooled,
608                    int preserved, int maximum, int bufferSize) {
609                this.created = created;
610                this.pooled = pooled;
611                this.preserved = preserved;
612                this.maximum = maximum;
613                this.bufferSize = bufferSize;
614            }
615
616            /**
617             * The number of buffers created by this pool.
618             * 
619             * @return the value
620             */
621            public int getCreated() {
622                return created;
623            }
624
625            /**
626             * The number of buffers pooled (ready to be acquired).
627             * 
628             * @return the value
629             */
630            public int getPooled() {
631                return pooled;
632            }
633
634            /**
635             * The number of buffers preserved.
636             * 
637             * @return the value
638             */
639            public int getPreserved() {
640                return preserved;
641            }
642
643            /**
644             * The maximum number of buffers created by this pool.
645             * 
646             * @return the value
647             */
648            public int getMaximum() {
649                return maximum;
650            }
651
652            /**
653             * The size of the buffers in items.
654             * 
655             * @return the buffer size
656             */
657            public int getBufferSize() {
658                return bufferSize;
659            }
660        }
661
662        /**
663         * Three views on the existing pool.
664         */
665        class PoolInfos {
666            private final SortedMap<String, PoolInfo> allPools;
667            private final SortedMap<String, PoolInfo> nonEmptyPools;
668            private final SortedMap<String, PoolInfo> usedPools;
669
670            /**
671             * Instantiates a new pool infos.
672             *
673             * @param pools the pools
674             */
675            public PoolInfos(Set<ManagedBufferPool<?, ?>> pools) {
676                allPools = new TreeMap<>();
677                nonEmptyPools = new TreeMap<>();
678                usedPools = new TreeMap<>();
679
680                @SuppressWarnings("PMD.UseConcurrentHashMap")
681                Map<String, Integer> dupsNext = new HashMap<>();
682                for (ManagedBufferPool<?, ?> mbp : pools) {
683                    String key = mbp.name();
684                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
685                    PoolInfo infos = new PoolInfo(
686                        mbp.createdBufs.get(), mbp.queue.size(),
687                        mbp.preservedBufs, mbp.maximumBufs,
688                        mbp.bufferSize());
689                    if (allPools.containsKey(key)
690                        || dupsNext.containsKey(key)) {
691                        if (allPools.containsKey(key)) {
692                            // Found first duplicate, rename
693                            allPools.put(key + "#1", allPools.get(key));
694                            allPools.remove(key);
695                            dupsNext.put(key, 2);
696                        }
697                        allPools.put(key + "#"
698                            + (dupsNext.put(key, dupsNext.get(key) + 1)),
699                            infos);
700                    } else {
701                        allPools.put(key, infos);
702                    }
703                }
704                for (Map.Entry<String, PoolInfo> e : allPools.entrySet()) {
705                    PoolInfo infos = e.getValue();
706                    if (infos.getPooled() > 0) {
707                        nonEmptyPools.put(e.getKey(), infos);
708                    }
709                    if (infos.getCreated() > 0) {
710                        usedPools.put(e.getKey(), infos);
711                    }
712                }
713            }
714
715            /**
716             * All pools.
717             *
718             * @return the all pools
719             */
720            public SortedMap<String, PoolInfo> getAllPools() {
721                return allPools;
722            }
723
724            /**
725             * Pools that have at least managed buffer enqueued
726             * (ready to be acquired).
727             *
728             * @return the non empty pools
729             */
730            public SortedMap<String, PoolInfo> getNonEmptyPools() {
731                return nonEmptyPools;
732            }
733
734            /**
735             * Pools that have at least one associated buffer
736             * (in pool or in use).
737             *
738             * @return the used pools
739             */
740            public SortedMap<String, PoolInfo> getUsedPools() {
741                return usedPools;
742            }
743        }
744
745        /**
746         * Set the default drain delay.
747         * 
748         * @param millis the drain delay in milli seconds
749         */
750        void setDefaultDrainDelay(long millis);
751
752        /**
753         * Returns the drain delay in milli seconds.
754         * 
755         * @return the value
756         */
757        long getDefaultDrainDelay();
758
759        /**
760         * Set the acquire warning limit.
761         * 
762         * @param millis the limit
763         */
764        void setAcquireWarningLimit(long millis);
765
766        /**
767         * Returns the acquire warning limit.
768         * 
769         * @return the value
770         */
771        long getAcquireWarningLimit();
772
773        /**
774         * Informations about the pools.
775         * 
776         * @return the map
777         */
778        PoolInfos getPoolInfos();
779
780        /**
781         * Summary information about the pooled buffers.
782         * 
783         * @return the values
784         */
785        IntSummaryStatistics getPooledPerPoolStatistics();
786
787        /**
788         * Summary information about the created buffers.
789         * 
790         * @return the values
791         */
792        IntSummaryStatistics getCreatedPerPoolStatistics();
793    }
794
795    /**
796     * The MBean view
797     */
798    private static class MBeanView implements ManagedBufferPoolMXBean {
799
800        private static Set<ManagedBufferPool<?, ?>> allPools
801            = Collections.synchronizedSet(
802                Collections.newSetFromMap(
803                    new WeakHashMap<ManagedBufferPool<?, ?>, Boolean>()));
804
805        /**
806         * Adds the pool.
807         *
808         * @param pool the pool
809         */
810        public static void addPool(ManagedBufferPool<?, ?> pool) {
811            allPools.add(pool);
812        }
813
814        @Override
815        public void setDefaultDrainDelay(long millis) {
816            ManagedBufferPool.setDefaultDrainDelay(millis);
817        }
818
819        @Override
820        public long getDefaultDrainDelay() {
821            return ManagedBufferPool.defaultDrainDelay();
822        }
823
824        @Override
825        public void setAcquireWarningLimit(long millis) {
826            ManagedBufferPool.acquireWarningLimit = millis;
827        }
828
829        @Override
830        public long getAcquireWarningLimit() {
831            return ManagedBufferPool.acquireWarningLimit;
832        }
833
834        @Override
835        public PoolInfos getPoolInfos() {
836            return new PoolInfos(allPools);
837        }
838
839        @Override
840        public IntSummaryStatistics getPooledPerPoolStatistics() {
841            return allPools.stream().collect(
842                Collectors.summarizingInt(mbp -> mbp.queue.size()));
843        }
844
845        @Override
846        public IntSummaryStatistics getCreatedPerPoolStatistics() {
847            return allPools.stream().collect(
848                Collectors.summarizingInt(mbp -> mbp.createdBufs.get()));
849        }
850    }
851
852    static {
853        try {
854            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
855            ObjectName mxbeanName = new ObjectName("org.jgrapes.io:type="
856                + ManagedBufferPool.class.getSimpleName() + "s");
857            mbs.registerMBean(new MBeanView(), mxbeanName);
858        } catch (MalformedObjectNameException | InstanceAlreadyExistsException
859                | MBeanRegistrationException | NotCompliantMBeanException e) {
860            // Does not happen
861        }
862    }
863}