001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.beans.ConstructorProperties;
022import java.lang.management.ManagementFactory;
023import java.lang.ref.ReferenceQueue;
024import java.lang.ref.WeakReference;
025import java.nio.Buffer;
026import java.time.Duration;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.IntSummaryStatistics;
030import java.util.Map;
031import java.util.Optional;
032import java.util.Set;
033import java.util.SortedMap;
034import java.util.TreeMap;
035import java.util.WeakHashMap;
036import java.util.concurrent.ArrayBlockingQueue;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiFunction;
042import java.util.function.Supplier;
043import java.util.logging.Level;
044import java.util.logging.Logger;
045import java.util.stream.Collectors;
046
047import javax.management.InstanceAlreadyExistsException;
048import javax.management.MBeanRegistrationException;
049import javax.management.MBeanServer;
050import javax.management.MalformedObjectNameException;
051import javax.management.NotCompliantMBeanException;
052import javax.management.ObjectName;
053
054import org.jgrapes.core.Components;
055import org.jgrapes.core.Components.Timer;
056import org.jgrapes.io.IOSubchannel;
057import org.jgrapes.io.events.Output;
058
059/**
060 * A queue based buffer pool. Using buffers from a pool is an important
061 * feature for limiting the computational resources for an {@link IOSubchannel}.
062 * A producer of {@link Output} events that simply creates its own buffers
063 * may produce and enqueue a large number of events that are not consumed
064 * as fast as they are produced. 
065 * 
066 * Using a buffer pool with a typical size of two synchronizes the 
067 * producer and the consumers of events nicely. The producer
068 * (thread) holds one buffer and fills it, the consumer (thread) holds 
069 * the other buffer and works with its content. If the producer finishes
070 * before the consumer, it has to stop until the consumer has processed 
071 * previous event and releases the buffer. The consumer can continue
072 * without delay, because the data has already been prepared and enqueued
073 * as the next event.
074 * 
075 * One of the biggest problems when using a pool can be to identify 
076 * leaking buffers, i.e. buffers that are not properly returned to the pool.
077 * This implementation therefore tracks all created buffers 
078 * (with a small overhead) and logs a warning if a buffer is no longer
079 * used (referenced) but has not been returned to the pool. If the
080 * log level for {@link ManagedBufferPool} is set to {@link Level#FINE},
081 * the warning also includes a stack trace of the call to {@link #acquire()}
082 * that handed out the buffer. Providing this information in addition 
083 * obviously requires a larger overhead and is therefore limited to the
084 * finer log levels.
085 *
086 * @param <W> the type of the wrapped (managed) buffer
087 * @param <T> the type of the content buffer that is wrapped
088 */
089@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.NcssCount",
090    "PMD.EmptyCatchBlock" })
091public class ManagedBufferPool<W extends ManagedBuffer<T>, T extends Buffer>
092        implements BufferCollector<W> {
093
094    @SuppressWarnings("PMD.FieldNamingConventions")
095    protected static final Logger logger
096        = Logger.getLogger(ManagedBufferPool.class.getName());
097
098    private static long defaultDrainDelay = 1500;
099    private static long acquireWarningLimit = 1000;
100
101    private String name = Components.objectName(this);
102    private BiFunction<T, BufferCollector<W>, W> wrapper;
103    private Supplier<T> bufferFactory;
104    private BufferMonitor bufferMonitor;
105    private BlockingQueue<W> queue;
106    private int bufferSize = -1;
107    private int preservedBufs;
108    private int maximumBufs;
109    private AtomicInteger createdBufs;
110    private long drainDelay = -1;
111    private final AtomicReference<Timer> idleTimer
112        = new AtomicReference<>(null);
113
114    /**
115     * Sets the default delay after which buffers are removed from
116     * the pool. The default value is 1500ms.
117     * 
118     * @param delay the delay in ms
119     */
120    public static void setDefaultDrainDelay(long delay) {
121        defaultDrainDelay = delay;
122    }
123
124    /**
125     * Returns the default drain delay.
126     * 
127     * @return the delay
128     */
129    public static long defaultDrainDelay() {
130        return defaultDrainDelay;
131    }
132
133    /**
134     * Create a pool that contains a varying number of (wrapped) buffers.
135     * The pool is initially empty. When buffers are requested and none 
136     * are left in the pool, new buffers are created up to the given 
137     * upper limit. Recollected buffers are put in the pool until it holds
138     * the number specified by the lower threshold. Any additional 
139     * recollected buffers are discarded. 
140     * 
141     * @param wrapper the function that converts buffers to managed buffers
142     * @param bufferFactory a function that creates a new buffer
143     * @param lowerThreshold the number of buffers kept in the pool
144     * @param upperLimit the maximum number of buffers
145     */
146    public ManagedBufferPool(BiFunction<T, BufferCollector<W>, W> wrapper,
147            Supplier<T> bufferFactory, int lowerThreshold, int upperLimit) {
148        this.wrapper = wrapper;
149        this.bufferFactory = bufferFactory;
150        preservedBufs = lowerThreshold;
151        maximumBufs = upperLimit;
152        createdBufs = new AtomicInteger();
153        queue = new ArrayBlockingQueue<W>(lowerThreshold);
154        bufferMonitor = new BufferMonitor(upperLimit);
155        MBeanView.addPool(this);
156    }
157
158    /**
159     * Create a pool that keeps up to the given number of (wrapped) buffers
160     * in the pool and also uses that number as upper limit.
161     * 
162     * @param wrapper the function that converts buffers to managed buffers
163     * @param bufferFactory a function that creates a new buffer
164     * @param buffers the number of buffers
165     */
166    public ManagedBufferPool(BiFunction<T, BufferCollector<W>, W> wrapper,
167            Supplier<T> bufferFactory, int buffers) {
168        this(wrapper, bufferFactory, buffers, buffers);
169    }
170
171    /**
172     * Sets a name for this pool (to be used in status reports).
173     * 
174     * @param name the name
175     * @return the object for easy chaining
176     */
177    public ManagedBufferPool<W, T> setName(String name) {
178        this.name = name;
179        return this;
180    }
181
182    /**
183     * Returns the name of this pool.
184     * 
185     * @return the name
186     */
187    public String name() {
188        return name;
189    }
190
191    /**
192     * Sets the delay after which buffers are removed from
193     * the pool.
194     * 
195     * @param delay the delay
196     * @return the object for easy chaining
197     */
198    public ManagedBufferPool<W, T> setDrainDelay(long delay) {
199        this.drainDelay = delay;
200        return this;
201    }
202
203    private W createBuffer() {
204        createdBufs.incrementAndGet();
205        W buffer = wrapper.apply(this.bufferFactory.get(), this);
206        bufferMonitor.put(buffer, new BufferProperties());
207        bufferSize = buffer.capacity();
208        return buffer;
209    }
210
211    /**
212     * Removes the buffer from the pool.
213     * 
214     * @param buffer the buffer to remove
215     */
216    private void removeBuffer(W buffer) {
217        createdBufs.decrementAndGet();
218        if (bufferMonitor.remove(buffer) == null) {
219            if (logger.isLoggable(Level.FINE)) {
220                logger.log(Level.WARNING,
221                    "Attempt to remove unknown buffer from pool.",
222                    new Throwable());
223            } else {
224                logger.warning("Attempt to remove unknown buffer from pool.");
225            }
226        }
227    }
228
229    /**
230     * Returns the size of the buffers managed by this pool.
231     * 
232     * @return the buffer size
233     */
234    public int bufferSize() {
235        if (bufferSize < 0) {
236            createBuffer().unlockBuffer();
237        }
238        return bufferSize;
239    }
240
241    /**
242     * Acquires a managed buffer from the pool. If the pool is empty,
243     * waits for a buffer to become available. The acquired buffer has 
244     * a lock count of one.
245     * 
246     * @return the acquired buffer
247     * @throws InterruptedException if the current thread is interrupted
248     */
249    @SuppressWarnings("PMD.GuardLogStatement")
250    public W acquire() throws InterruptedException {
251        // Stop draining, because we obviously need this kind of buffers
252        Optional.ofNullable(idleTimer.getAndSet(null)).ifPresent(
253            timer -> timer.cancel());
254        if (createdBufs.get() < maximumBufs) {
255            // Haven't reached maximum, so if no buffer is queued, create one.
256            W buffer = queue.poll();
257            if (buffer != null) {
258                buffer.lockBuffer();
259                return buffer;
260            }
261            return createBuffer();
262        }
263        // Wait for buffer to become available.
264        if (logger.isLoggable(Level.FINE)) {
265            // If configured, log message after waiting some time.
266            W buffer = queue.poll(acquireWarningLimit, TimeUnit.MILLISECONDS);
267            if (buffer != null) {
268                buffer.lockBuffer();
269                return buffer;
270            }
271            logger.log(Level.FINE, new Throwable(),
272                () -> Thread.currentThread().getName() + " waiting > "
273                    + acquireWarningLimit + "ms for buffer, while executing:");
274        }
275        W buffer = queue.take();
276        buffer.lockBuffer();
277        return buffer;
278    }
279
280    /**
281     * Re-adds the buffer to the pool. The buffer is cleared.
282     *
283     * @param buffer the buffer
284     * @see org.jgrapes.io.util.BufferCollector#recollect(org.jgrapes.io.util.ManagedBuffer)
285     */
286    @Override
287    public void recollect(W buffer) {
288        if (queue.size() < preservedBufs) {
289            long effectiveDrainDelay
290                = drainDelay > 0 ? drainDelay : defaultDrainDelay;
291            if (effectiveDrainDelay > 0) {
292                // Enqueue
293                buffer.clear();
294                queue.add(buffer);
295                Timer old = idleTimer.getAndSet(Components.schedule(this::drain,
296                    Duration.ofMillis(effectiveDrainDelay)));
297                if (old != null) {
298                    old.cancel();
299                }
300                return;
301            }
302        }
303        // Discard
304        removeBuffer(buffer);
305    }
306
307    @SuppressWarnings("PMD.UnusedFormalParameter")
308    private void drain(Timer timer) {
309        idleTimer.set(null);
310        while (true) {
311            W buffer = queue.poll();
312            if (buffer == null) {
313                break;
314            }
315            removeBuffer(buffer);
316        }
317    }
318
319    /*
320     * (non-Javadoc)
321     * 
322     * @see java.lang.Object#toString()
323     */
324    @Override
325    public String toString() {
326        StringBuilder builder = new StringBuilder(50);
327        builder.append("ManagedBufferPool [");
328        if (queue != null) {
329            builder.append("queue=");
330            builder.append(queue);
331        }
332        builder.append(']');
333        return builder.toString();
334    }
335
336    /**
337     * Buffer properties.
338     */
339    private static class BufferProperties {
340
341        private StackTraceElement[] createdBy;
342
343        /**
344         * Instantiates new buffer properties.
345         */
346        public BufferProperties() {
347            if (logger.isLoggable(Level.FINE)) {
348                createdBy = Thread.currentThread().getStackTrace();
349            }
350        }
351
352        /**
353         * Returns where the buffer was created.
354         *
355         * @return the stack trace element[]
356         */
357        @SuppressWarnings("PMD.MethodReturnsInternalArray")
358        public StackTraceElement[] createdBy() {
359            return createdBy;
360        }
361    }
362
363    /**
364     * This is basically a WeakHashMap. We cannot use WeakHashMap
365     * because there is no "hook" into the collection of orphaned
366     * references, which is what we want here.
367     */
368    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
369    private class BufferMonitor {
370
371        private Entry<W>[] data;
372        private int indexMask;
373        private final ReferenceQueue<W> orphanedEntries
374            = new ReferenceQueue<>();
375
376        /**
377         * An Entry.
378         *
379         * @param <B> the generic type
380         */
381        private class Entry<B extends ManagedBuffer<?>> extends WeakReference<B>
382                implements Map.Entry<B, BufferProperties> {
383            /* default */ final int index;
384            /* default */ BufferProperties props;
385            /* default */ Entry<B> next;
386
387            /**
388             * Instantiates a new entry.
389             *
390             * @param buffer the buffer
391             * @param props the props
392             * @param queue the queue
393             * @param index the index
394             * @param next the next
395             */
396            /* default */ Entry(B buffer, BufferProperties props,
397                    ReferenceQueue<B> queue, int index, Entry<B> next) {
398                super(buffer, queue);
399                this.index = index;
400                this.props = props;
401                this.next = next;
402            }
403
404            @Override
405            public B getKey() {
406                return get();
407            }
408
409            @Override
410            public BufferProperties getValue() {
411                return props;
412            }
413
414            @Override
415            public BufferProperties setValue(BufferProperties props) {
416                return this.props = props;
417            }
418        }
419
420        /**
421         * @param data
422         */
423        @SuppressWarnings("unchecked")
424        public BufferMonitor(int maxBuffers) {
425            int lists = 1;
426            while (lists < maxBuffers) {
427                lists <<= 1;
428                indexMask = (indexMask << 1) + 1;
429            }
430            data = new Entry[lists];
431        }
432
433        /**
434         * Put an entry in the map.
435         *
436         * @param buffer the buffer
437         * @param properties the properties
438         * @return the buffer properties
439         */
440        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
441        public BufferProperties put(W buffer, BufferProperties properties) {
442            check();
443            int index = buffer.hashCode() & indexMask;
444            synchronized (data) {
445                Entry<W> entry = data[index];
446                Entry<W> prev = null;
447                while (true) {
448                    if (entry == null) {
449                        // Not found, create new.
450                        entry = new Entry<>(buffer, properties,
451                            orphanedEntries, index, null);
452                        if (prev == null) {
453                            data[index] = entry; // Is first.
454                        } else {
455                            prev.next = entry; // Is next (last).
456                        }
457                        return properties;
458                    }
459                    if (entry.getKey() == buffer) {
460                        // Found, update.
461                        BufferProperties old = entry.getValue();
462                        entry.setValue(properties);
463                        return old;
464                    }
465                    prev = entry;
466                    entry = entry.next;
467                }
468            }
469        }
470
471        /**
472         * Returns the properties for the given buffer.
473         *
474         * @param buffer the buffer
475         * @return the buffer properties
476         */
477        @SuppressWarnings("unused")
478        public BufferProperties get(ManagedBuffer<?> buffer) {
479            check();
480            int index = buffer.hashCode() & indexMask;
481            synchronized (data) {
482                Entry<W> entry = data[index];
483                while (entry != null) {
484                    if (entry.getKey() == buffer) {
485                        return entry.getValue();
486                    }
487                    entry = entry.next;
488                }
489                return null;
490            }
491        }
492
493        /**
494         * Removes the given buffer.
495         *
496         * @param buffer the buffer
497         * @return the buffer properties
498         */
499        public BufferProperties remove(ManagedBuffer<?> buffer) {
500            check();
501            int index = buffer.hashCode() & indexMask;
502            synchronized (data) {
503                Entry<W> entry = data[index];
504                Entry<W> prev = null;
505                while (entry != null) {
506                    if (entry.getKey() == buffer) {
507                        if (prev == null) {
508                            data[index] = entry.next; // Was first.
509                        } else {
510                            prev.next = entry.next;
511                        }
512                        return entry.getValue();
513                    }
514                    prev = entry;
515                    entry = entry.next;
516                }
517                return null;
518            }
519        }
520
521        @SuppressWarnings("PMD.CompareObjectsWithEquals")
522        private BufferProperties remove(Entry<W> toBeRemoved) {
523            synchronized (data) {
524                Entry<W> entry = data[toBeRemoved.index];
525                Entry<W> prev = null;
526                while (entry != null) {
527                    if (entry == toBeRemoved) {
528                        if (prev == null) {
529                            data[toBeRemoved.index] = entry.next; // Was first.
530                        } else {
531                            prev.next = entry.next;
532                        }
533                        return entry.getValue();
534                    }
535                    prev = entry;
536                    entry = entry.next;
537                }
538                return null;
539            }
540        }
541
542        private void check() {
543            while (true) {
544                @SuppressWarnings("unchecked")
545                Entry<W> entry = (Entry<W>) orphanedEntries.poll();
546                if (entry == null) {
547                    return;
548                }
549                // Managed buffer has not been properly recollected, fix.
550                BufferProperties props = remove(entry);
551                if (props == null) {
552                    return;
553                }
554                createdBufs.decrementAndGet();
555                // Create warning
556                if (logger.isLoggable(Level.WARNING)) {
557                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
558                    final StringBuilder msg = new StringBuilder(
559                        "Orphaned buffer from pool ");
560                    msg.append(name());
561                    StackTraceElement[] trace = props.createdBy();
562                    if (trace != null) {
563                        msg.append(", created");
564                        for (StackTraceElement e : trace) {
565                            msg.append(System.lineSeparator());
566                            msg.append("\tat ");
567                            msg.append(e.toString());
568                        }
569                    }
570                    logger.warning(msg.toString());
571                }
572            }
573        }
574    }
575
576    /**
577     * An MBean interface for getting information about the managed
578     * buffer pools. Note that created buffer pools are tracked using
579     * weak references. Therefore, the MBean may report more pools than
580     * are really in use. 
581     */
582    public interface ManagedBufferPoolMXBean {
583
584        /**
585         * Information about a single managed pool.
586         */
587        @SuppressWarnings("PMD.DataClass")
588        class PoolInfo {
589            private final int created;
590            private final int pooled;
591            private final int preserved;
592            private final int maximum;
593            private final int bufferSize;
594
595            /**
596             * Instantiates a new pool info.
597             *
598             * @param created the created
599             * @param pooled the pooled
600             * @param preserved the preserved
601             * @param maximum the maximum
602             * @param bufferSize the buffer size
603             */
604            @ConstructorProperties({ "created", "pooled",
605                "preserved", "maximum", "bufferSize" })
606            public PoolInfo(int created, int pooled,
607                    int preserved, int maximum, int bufferSize) {
608                this.created = created;
609                this.pooled = pooled;
610                this.preserved = preserved;
611                this.maximum = maximum;
612                this.bufferSize = bufferSize;
613            }
614
615            /**
616             * The number of buffers created by this pool.
617             * 
618             * @return the value
619             */
620            public int getCreated() {
621                return created;
622            }
623
624            /**
625             * The number of buffers pooled (ready to be acquired).
626             * 
627             * @return the value
628             */
629            public int getPooled() {
630                return pooled;
631            }
632
633            /**
634             * The number of buffers preserved.
635             * 
636             * @return the value
637             */
638            public int getPreserved() {
639                return preserved;
640            }
641
642            /**
643             * The maximum number of buffers created by this pool.
644             * 
645             * @return the value
646             */
647            public int getMaximum() {
648                return maximum;
649            }
650
651            /**
652             * The size of the buffers in items.
653             * 
654             * @return the buffer size
655             */
656            public int getBufferSize() {
657                return bufferSize;
658            }
659        }
660
661        /**
662         * Three views on the existing pool.
663         */
664        class PoolInfos {
665            private final SortedMap<String, PoolInfo> allPools;
666            private final SortedMap<String, PoolInfo> nonEmptyPools;
667            private final SortedMap<String, PoolInfo> usedPools;
668
669            /**
670             * Instantiates a new pool infos.
671             *
672             * @param pools the pools
673             */
674            public PoolInfos(Set<ManagedBufferPool<?, ?>> pools) {
675                allPools = new TreeMap<>();
676                nonEmptyPools = new TreeMap<>();
677                usedPools = new TreeMap<>();
678
679                @SuppressWarnings("PMD.UseConcurrentHashMap")
680                Map<String, Integer> dupsNext = new HashMap<>();
681                for (ManagedBufferPool<?, ?> mbp : pools) {
682                    String key = mbp.name();
683                    PoolInfo infos = new PoolInfo(
684                        mbp.createdBufs.get(), mbp.queue.size(),
685                        mbp.preservedBufs, mbp.maximumBufs,
686                        mbp.bufferSize());
687                    if (allPools.containsKey(key)
688                        || dupsNext.containsKey(key)) {
689                        if (allPools.containsKey(key)) {
690                            // Found first duplicate, rename
691                            allPools.put(key + "#1", allPools.get(key));
692                            allPools.remove(key);
693                            dupsNext.put(key, 2);
694                        }
695                        allPools.put(key + "#"
696                            + (dupsNext.put(key, dupsNext.get(key) + 1)),
697                            infos);
698                    } else {
699                        allPools.put(key, infos);
700                    }
701                }
702                for (Map.Entry<String, PoolInfo> e : allPools.entrySet()) {
703                    PoolInfo infos = e.getValue();
704                    if (infos.getPooled() > 0) {
705                        nonEmptyPools.put(e.getKey(), infos);
706                    }
707                    if (infos.getCreated() > 0) {
708                        usedPools.put(e.getKey(), infos);
709                    }
710                }
711            }
712
713            /**
714             * All pools.
715             *
716             * @return the all pools
717             */
718            public SortedMap<String, PoolInfo> getAllPools() {
719                return allPools;
720            }
721
722            /**
723             * Pools that have at least managed buffer enqueued
724             * (ready to be acquired).
725             *
726             * @return the non empty pools
727             */
728            public SortedMap<String, PoolInfo> getNonEmptyPools() {
729                return nonEmptyPools;
730            }
731
732            /**
733             * Pools that have at least one associated buffer
734             * (in pool or in use).
735             *
736             * @return the used pools
737             */
738            public SortedMap<String, PoolInfo> getUsedPools() {
739                return usedPools;
740            }
741        }
742
743        /**
744         * Set the default drain delay.
745         * 
746         * @param millis the drain delay in milli seconds
747         */
748        void setDefaultDrainDelay(long millis);
749
750        /**
751         * Returns the drain delay in milli seconds.
752         * 
753         * @return the value
754         */
755        long getDefaultDrainDelay();
756
757        /**
758         * Set the acquire warning limit.
759         * 
760         * @param millis the limit
761         */
762        void setAcquireWarningLimit(long millis);
763
764        /**
765         * Returns the acquire warning limit.
766         * 
767         * @return the value
768         */
769        long getAcquireWarningLimit();
770
771        /**
772         * Informations about the pools.
773         * 
774         * @return the map
775         */
776        PoolInfos getPoolInfos();
777
778        /**
779         * Summary information about the pooled buffers.
780         * 
781         * @return the values
782         */
783        IntSummaryStatistics getPooledPerPoolStatistics();
784
785        /**
786         * Summary information about the created buffers.
787         * 
788         * @return the values
789         */
790        IntSummaryStatistics getCreatedPerPoolStatistics();
791    }
792
793    /**
794     * The MBean view
795     */
796    private static class MBeanView implements ManagedBufferPoolMXBean {
797
798        private static Set<ManagedBufferPool<?, ?>> allPools
799            = Collections.synchronizedSet(
800                Collections.newSetFromMap(
801                    new WeakHashMap<ManagedBufferPool<?, ?>, Boolean>()));
802
803        /**
804         * Adds the pool.
805         *
806         * @param pool the pool
807         */
808        public static void addPool(ManagedBufferPool<?, ?> pool) {
809            allPools.add(pool);
810        }
811
812        @Override
813        public void setDefaultDrainDelay(long millis) {
814            ManagedBufferPool.setDefaultDrainDelay(millis);
815        }
816
817        @Override
818        public long getDefaultDrainDelay() {
819            return ManagedBufferPool.defaultDrainDelay();
820        }
821
822        @Override
823        public void setAcquireWarningLimit(long millis) {
824            ManagedBufferPool.acquireWarningLimit = millis;
825        }
826
827        @Override
828        public long getAcquireWarningLimit() {
829            return ManagedBufferPool.acquireWarningLimit;
830        }
831
832        @Override
833        public PoolInfos getPoolInfos() {
834            return new PoolInfos(allPools);
835        }
836
837        @Override
838        public IntSummaryStatistics getPooledPerPoolStatistics() {
839            return allPools.stream().collect(
840                Collectors.summarizingInt(mbp -> mbp.queue.size()));
841        }
842
843        @Override
844        public IntSummaryStatistics getCreatedPerPoolStatistics() {
845            return allPools.stream().collect(
846                Collectors.summarizingInt(mbp -> mbp.createdBufs.get()));
847        }
848    }
849
850    static {
851        try {
852            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
853            ObjectName mxbeanName = new ObjectName("org.jgrapes.io:type="
854                + ManagedBufferPool.class.getSimpleName() + "s");
855            mbs.registerMBean(new MBeanView(), mxbeanName);
856        } catch (MalformedObjectNameException | InstanceAlreadyExistsException
857                | MBeanRegistrationException | NotCompliantMBeanException e) {
858            // Does not happen
859        }
860    }
861}