001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.beans.ConstructorProperties;
022import java.lang.management.ManagementFactory;
023import java.lang.ref.ReferenceQueue;
024import java.lang.ref.WeakReference;
025import java.nio.Buffer;
026import java.time.Duration;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.IntSummaryStatistics;
030import java.util.Map;
031import java.util.Optional;
032import java.util.Set;
033import java.util.SortedMap;
034import java.util.TreeMap;
035import java.util.WeakHashMap;
036import java.util.concurrent.ArrayBlockingQueue;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiFunction;
042import java.util.function.Supplier;
043import java.util.logging.Level;
044import java.util.logging.Logger;
045import java.util.stream.Collectors;
046import javax.management.InstanceAlreadyExistsException;
047import javax.management.MBeanRegistrationException;
048import javax.management.MBeanServer;
049import javax.management.MalformedObjectNameException;
050import javax.management.NotCompliantMBeanException;
051import javax.management.ObjectName;
052import org.jgrapes.core.Components;
053import org.jgrapes.core.Components.Timer;
054import org.jgrapes.io.IOSubchannel;
055import org.jgrapes.io.events.Output;
056
057/**
058 * A queue based buffer pool. Using buffers from a pool is an important
059 * feature for limiting the computational resources for an {@link IOSubchannel}.
060 * A producer of {@link Output} events that simply creates its own buffers
061 * may produce and enqueue a large number of events that are not consumed
062 * as fast as they are produced. 
063 * 
064 * Using a buffer pool with a typical size of two synchronizes the 
065 * producer and the consumers of events nicely. The producer
066 * (thread) holds one buffer and fills it, the consumer (thread) holds 
067 * the other buffer and works with its content. If the producer finishes
068 * before the consumer, it has to stop until the consumer has processed 
069 * previous event and releases the buffer. The consumer can continue
070 * without delay, because the data has already been prepared and enqueued
071 * as the next event.
072 * 
073 * One of the biggest problems when using a pool can be to identify 
074 * leaking buffers, i.e. buffers that are not properly returned to the pool.
075 * This implementation therefore tracks all created buffers 
076 * (with a small overhead) and logs a warning if a buffer is no longer
077 * used (referenced) but has not been returned to the pool. If the
078 * log level for {@link ManagedBufferPool} is set to {@link Level#FINE},
079 * the warning also includes a stack trace of the call to {@link #acquire()}
080 * that handed out the buffer. Providing this information in addition 
081 * obviously requires a larger overhead and is therefore limited to the
082 * finer log levels.
083 *
084 * @param <W> the type of the wrapped (managed) buffer
085 * @param <T> the type of the content buffer that is wrapped
086 */
087@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.NcssCount",
088    "PMD.EmptyCatchBlock", "PMD.CouplingBetweenObjects" })
089public class ManagedBufferPool<W extends ManagedBuffer<T>, T extends Buffer>
090        implements BufferCollector<W> {
091
    /** The logger used by this pool; named after {@link ManagedBufferPool}. */
    @SuppressWarnings("PMD.FieldNamingConventions")
    protected final Logger logger
        = Logger.getLogger(ManagedBufferPool.class.getName());

    /** Drain delay (ms) used when no positive pool specific delay is set. */
    private static long defaultDrainDelay = 1500;
    /** Time (ms) that {@link #acquire()} waits before logging at FINE. */
    private static long acquireWarningLimit = 1000;

    /** The pool's name, initialized from {@code Components.objectName}. */
    private String name = Components.objectName(this);
    /** Converts a raw buffer into a managed buffer bound to this pool. */
    private BiFunction<T, BufferCollector<W>, W> wrapper;
    /** Supplies the raw buffers that get wrapped. */
    private Supplier<T> bufferFactory;
    /** Weakly tracks every buffer created, used to detect orphans. */
    private BufferMonitor bufferMonitor;
    /** The buffers that are currently available for acquisition. */
    private BlockingQueue<W> queue;
    /** Capacity of the most recently created buffer; -1 while unknown. */
    private int bufferSize = -1;
    /** Number of recollected buffers kept in the queue (lower threshold). */
    private int preservedBufs;
    /** Upper limit checked by {@link #acquire()} before creating buffers. */
    private int maximumBufs;
    /** Number of buffers created and not yet removed. */
    private AtomicInteger createdBufs;
    /** Pool specific drain delay (ms); non-positive selects the default. */
    private long drainDelay = -1;
    /** The currently scheduled drain timer, if any. */
    private final AtomicReference<Timer> idleTimer
        = new AtomicReference<>(null);
111
112    /**
113     * Sets the default delay after which buffers are removed from
114     * the pool. The default value is 1500ms.
115     * 
116     * @param delay the delay in ms
117     */
118    public static void setDefaultDrainDelay(long delay) {
119        defaultDrainDelay = delay;
120    }
121
122    /**
123     * Returns the default drain delay.
124     * 
125     * @return the delay
126     */
127    public static long defaultDrainDelay() {
128        return defaultDrainDelay;
129    }
130
131    /**
132     * Create a pool that contains a varying number of (wrapped) buffers.
133     * The pool is initially empty. When buffers are requested and none 
134     * are left in the pool, new buffers are created up to the given 
135     * upper limit. Recollected buffers are put in the pool until it holds
136     * the number specified by the lower threshold. Any additional 
137     * recollected buffers are discarded. 
138     * 
139     * @param wrapper the function that converts buffers to managed buffers
140     * @param bufferFactory a function that creates a new buffer
141     * @param lowerThreshold the number of buffers kept in the pool
142     * @param upperLimit the maximum number of buffers
143     */
144    public ManagedBufferPool(BiFunction<T, BufferCollector<W>, W> wrapper,
145            Supplier<T> bufferFactory, int lowerThreshold, int upperLimit) {
146        this.wrapper = wrapper;
147        this.bufferFactory = bufferFactory;
148        preservedBufs = lowerThreshold;
149        maximumBufs = upperLimit;
150        createdBufs = new AtomicInteger();
151        queue = new ArrayBlockingQueue<>(lowerThreshold);
152        bufferMonitor = new BufferMonitor(upperLimit);
153        MBeanView.addPool(this);
154    }
155
156    /**
157     * Create a pool that keeps up to the given number of (wrapped) buffers
158     * in the pool and also uses that number as upper limit.
159     * 
160     * @param wrapper the function that converts buffers to managed buffers
161     * @param bufferFactory a function that creates a new buffer
162     * @param buffers the number of buffers
163     */
164    public ManagedBufferPool(BiFunction<T, BufferCollector<W>, W> wrapper,
165            Supplier<T> bufferFactory, int buffers) {
166        this(wrapper, bufferFactory, buffers, buffers);
167    }
168
169    /**
170     * Sets a name for this pool (to be used in status reports).
171     * 
172     * @param name the name
173     * @return the object for easy chaining
174     */
175    public ManagedBufferPool<W, T> setName(String name) {
176        this.name = name;
177        return this;
178    }
179
180    /**
181     * Returns the name of this pool.
182     * 
183     * @return the name
184     */
185    public String name() {
186        return name;
187    }
188
189    /**
190     * Sets the delay after which buffers are removed from
191     * the pool.
192     * 
193     * @param delay the delay
194     * @return the object for easy chaining
195     */
196    public ManagedBufferPool<W, T> setDrainDelay(long delay) {
197        this.drainDelay = delay;
198        return this;
199    }
200
201    private W createBuffer() {
202        createdBufs.incrementAndGet();
203        W buffer = wrapper.apply(this.bufferFactory.get(), this);
204        bufferMonitor.put(buffer, new BufferProperties());
205        bufferSize = buffer.capacity();
206        return buffer;
207    }
208
209    /**
210     * Removes the buffer from the pool.
211     * 
212     * @param buffer the buffer to remove
213     */
214    @SuppressWarnings("PMD.GuardLogStatement")
215    private void removeBuffer(W buffer) {
216        createdBufs.decrementAndGet();
217        if (bufferMonitor.remove(buffer) == null) {
218            if (logger.isLoggable(Level.FINE)) {
219                logger.log(Level.WARNING,
220                    "Attempt to remove unknown buffer from pool.",
221                    new Throwable());
222            } else {
223                logger.warning("Attempt to remove unknown buffer from pool.");
224            }
225        }
226    }
227
228    /**
229     * Returns the size of the buffers managed by this pool.
230     * 
231     * @return the buffer size
232     */
233    public int bufferSize() {
234        if (bufferSize < 0) {
235            createBuffer().unlockBuffer();
236        }
237        return bufferSize;
238    }
239
240    /**
241     * Acquires a managed buffer from the pool. If the pool is empty,
242     * waits for a buffer to become available. The acquired buffer has 
243     * a lock count of one.
244     * 
245     * @return the acquired buffer
246     * @throws InterruptedException if the current thread is interrupted
247     */
248    @SuppressWarnings("PMD.GuardLogStatement")
249    public W acquire() throws InterruptedException {
250        // Stop draining, because we obviously need this kind of buffers
251        Optional.ofNullable(idleTimer.getAndSet(null)).ifPresent(
252            timer -> timer.cancel());
253        if (createdBufs.get() < maximumBufs) {
254            // Haven't reached maximum, so if no buffer is queued, create one.
255            W buffer = queue.poll();
256            if (buffer != null) {
257                buffer.lockBuffer();
258                return buffer;
259            }
260            return createBuffer();
261        }
262        // Wait for buffer to become available.
263        if (logger.isLoggable(Level.FINE)) {
264            // If configured, log message after waiting some time.
265            W buffer = queue.poll(acquireWarningLimit, TimeUnit.MILLISECONDS);
266            if (buffer != null) {
267                buffer.lockBuffer();
268                return buffer;
269            }
270            logger.log(Level.FINE,
271                Thread.currentThread().getName() + " waiting > "
272                    + acquireWarningLimit + "ms for buffer, while executing:",
273                new Throwable());
274        }
275        W buffer = queue.take();
276        buffer.lockBuffer();
277        return buffer;
278    }
279
280    /**
281     * Re-adds the buffer to the pool. The buffer is cleared.
282     *
283     * @param buffer the buffer
284     * @see org.jgrapes.io.util.BufferCollector#recollect(org.jgrapes.io.util.ManagedBuffer)
285     */
286    @Override
287    public void recollect(W buffer) {
288        if (queue.size() < preservedBufs) {
289            long effectiveDrainDelay
290                = drainDelay > 0 ? drainDelay : defaultDrainDelay;
291            if (effectiveDrainDelay > 0) {
292                // Enqueue
293                buffer.clear();
294                queue.add(buffer);
295                Timer old = idleTimer.getAndSet(Components.schedule(this::drain,
296                    Duration.ofMillis(effectiveDrainDelay)));
297                if (old != null) {
298                    old.cancel();
299                }
300                return;
301            }
302        }
303        // Discard
304        removeBuffer(buffer);
305    }
306
307    @SuppressWarnings({ "PMD.UnusedFormalParameter",
308        "PMD.UnusedPrivateMethod" })
309    private void drain(Timer timer) {
310        idleTimer.set(null);
311        while (true) {
312            W buffer = queue.poll();
313            if (buffer == null) {
314                break;
315            }
316            removeBuffer(buffer);
317        }
318    }
319
320    /*
321     * (non-Javadoc)
322     * 
323     * @see java.lang.Object#toString()
324     */
325    @Override
326    public String toString() {
327        StringBuilder builder = new StringBuilder(50);
328        builder.append("ManagedBufferPool [");
329        if (queue != null) {
330            builder.append("queue=").append(queue);
331        }
332        builder.append(']');
333        return builder.toString();
334    }
335
336    /**
337     * Buffer properties.
338     */
339    private class BufferProperties {
340
341        private final StackTraceElement[] createdBy;
342
343        /**
344         * Instantiates new buffer properties.
345         */
346        public BufferProperties() {
347            if (logger.isLoggable(Level.FINE)) {
348                createdBy = Thread.currentThread().getStackTrace();
349            } else {
350                createdBy = new StackTraceElement[0];
351            }
352        }
353
354        /**
355         * Returns where the buffer was created.
356         *
357         * @return the stack trace element[]
358         */
359        @SuppressWarnings("PMD.MethodReturnsInternalArray")
360        public StackTraceElement[] createdBy() {
361            return createdBy;
362        }
363    }
364
365    /**
366     * This is basically a WeakHashMap. We cannot use WeakHashMap
367     * because there is no "hook" into the collection of orphaned
368     * references, which is what we want here.
369     */
370    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
371    private class BufferMonitor {
372
373        private final Entry<W>[] data;
374        private int indexMask;
375        private final ReferenceQueue<W> orphanedEntries
376            = new ReferenceQueue<>();
377
378        /**
379         * An Entry.
380         *
381         * @param <B> the generic type
382         */
383        private class Entry<B extends ManagedBuffer<?>> extends WeakReference<B>
384                implements Map.Entry<B, BufferProperties> {
385            /* default */ final int index;
386            /* default */ BufferProperties props;
387            /* default */ Entry<B> next;
388
389            /**
390             * Instantiates a new entry.
391             *
392             * @param buffer the buffer
393             * @param props the props
394             * @param queue the queue
395             * @param index the index
396             * @param next the next
397             */
398            /* default */ Entry(B buffer, BufferProperties props,
399                    ReferenceQueue<B> queue, int index, Entry<B> next) {
400                super(buffer, queue);
401                this.index = index;
402                this.props = props;
403                this.next = next;
404            }
405
406            @Override
407            public B getKey() {
408                return get();
409            }
410
411            @Override
412            public BufferProperties getValue() {
413                return props;
414            }
415
416            @Override
417            public BufferProperties setValue(BufferProperties props) {
418                return this.props = props;
419            }
420        }
421
422        /**
423         * @param data
424         */
425        @SuppressWarnings("unchecked")
426        public BufferMonitor(int maxBuffers) {
427            int lists = 1;
428            while (lists < maxBuffers) {
429                lists <<= 1;
430                indexMask = (indexMask << 1) + 1;
431            }
432            data = new Entry[lists];
433        }
434
435        /**
436         * Put an entry in the map.
437         *
438         * @param buffer the buffer
439         * @param properties the properties
440         * @return the buffer properties
441         */
442        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
443        public BufferProperties put(W buffer, BufferProperties properties) {
444            check();
445            int index = buffer.hashCode() & indexMask;
446            synchronized (data) {
447                Entry<W> entry = data[index];
448                Entry<W> prev = null;
449                while (true) {
450                    if (entry == null) {
451                        // Not found, create new.
452                        entry = new Entry<>(buffer, properties,
453                            orphanedEntries, index, null);
454                        if (prev == null) {
455                            data[index] = entry; // Is first.
456                        } else {
457                            prev.next = entry; // Is next (last).
458                        }
459                        return properties;
460                    }
461                    if (entry.getKey() == buffer) { // NOPMD
462                        // Found, update.
463                        BufferProperties old = entry.getValue();
464                        entry.setValue(properties);
465                        return old;
466                    }
467                    prev = entry;
468                    entry = entry.next;
469                }
470            }
471        }
472
473        /**
474         * Returns the properties for the given buffer.
475         *
476         * @param buffer the buffer
477         * @return the buffer properties
478         */
479        @SuppressWarnings("unused")
480        public BufferProperties get(ManagedBuffer<?> buffer) {
481            check();
482            int index = buffer.hashCode() & indexMask;
483            synchronized (data) {
484                Entry<W> entry = data[index];
485                while (entry != null) {
486                    if (entry.getKey() == buffer) {
487                        return entry.getValue();
488                    }
489                    entry = entry.next;
490                }
491                return null;
492            }
493        }
494
495        /**
496         * Removes the given buffer.
497         *
498         * @param buffer the buffer
499         * @return the buffer properties
500         */
501        public BufferProperties remove(ManagedBuffer<?> buffer) {
502            check();
503            int index = buffer.hashCode() & indexMask;
504            synchronized (data) {
505                Entry<W> entry = data[index];
506                Entry<W> prev = null;
507                while (entry != null) {
508                    if (entry.getKey() == buffer) {
509                        if (prev == null) {
510                            data[index] = entry.next; // Was first.
511                        } else {
512                            prev.next = entry.next;
513                        }
514                        return entry.getValue();
515                    }
516                    prev = entry;
517                    entry = entry.next;
518                }
519                return null;
520            }
521        }
522
523        @SuppressWarnings("PMD.CompareObjectsWithEquals")
524        private BufferProperties remove(Entry<W> toBeRemoved) {
525            synchronized (data) {
526                Entry<W> entry = data[toBeRemoved.index];
527                Entry<W> prev = null;
528                while (entry != null) {
529                    if (entry == toBeRemoved) {
530                        if (prev == null) {
531                            data[toBeRemoved.index] = entry.next; // Was first.
532                        } else {
533                            prev.next = entry.next;
534                        }
535                        return entry.getValue();
536                    }
537                    prev = entry;
538                    entry = entry.next;
539                }
540                return null;
541            }
542        }
543
544        private void check() {
545            while (true) {
546                @SuppressWarnings("unchecked")
547                Entry<W> entry = (Entry<W>) orphanedEntries.poll();
548                if (entry == null) {
549                    return;
550                }
551                // Managed buffer has not been properly recollected, fix.
552                BufferProperties props = remove(entry);
553                if (props == null) {
554                    return;
555                }
556                createdBufs.decrementAndGet();
557                // Create warning
558                if (logger.isLoggable(Level.WARNING)) {
559                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
560                    final StringBuilder msg = new StringBuilder(
561                        "Orphaned buffer from pool ");
562                    msg.append(name());
563                    StackTraceElement[] trace = props.createdBy();
564                    if (trace != null) {
565                        msg.append(", created");
566                        for (StackTraceElement e : trace) {
567                            msg.append(System.lineSeparator()).append("\tat ")
568                                .append(e.toString());
569                        }
570                    }
571                    logger.warning(msg.toString());
572                }
573            }
574        }
575    }
576
577    /**
578     * An MBean interface for getting information about the managed
579     * buffer pools. Note that created buffer pools are tracked using
580     * weak references. Therefore, the MBean may report more pools than
581     * are really in use. 
582     */
583    public interface ManagedBufferPoolMXBean {
584
585        /**
586         * Information about a single managed pool.
587         */
588        @SuppressWarnings("PMD.DataClass")
589        class PoolInfo {
590            private final int created;
591            private final int pooled;
592            private final int preserved;
593            private final int maximum;
594            private final int bufferSize;
595
596            /**
597             * Instantiates a new pool info.
598             *
599             * @param created the created
600             * @param pooled the pooled
601             * @param preserved the preserved
602             * @param maximum the maximum
603             * @param bufferSize the buffer size
604             */
605            @ConstructorProperties({ "created", "pooled",
606                "preserved", "maximum", "bufferSize" })
607            public PoolInfo(int created, int pooled,
608                    int preserved, int maximum, int bufferSize) {
609                this.created = created;
610                this.pooled = pooled;
611                this.preserved = preserved;
612                this.maximum = maximum;
613                this.bufferSize = bufferSize;
614            }
615
616            /**
617             * The number of buffers created by this pool.
618             * 
619             * @return the value
620             */
621            public int getCreated() {
622                return created;
623            }
624
625            /**
626             * The number of buffers pooled (ready to be acquired).
627             * 
628             * @return the value
629             */
630            public int getPooled() {
631                return pooled;
632            }
633
634            /**
635             * The number of buffers preserved.
636             * 
637             * @return the value
638             */
639            public int getPreserved() {
640                return preserved;
641            }
642
643            /**
644             * The maximum number of buffers created by this pool.
645             * 
646             * @return the value
647             */
648            public int getMaximum() {
649                return maximum;
650            }
651
652            /**
653             * The size of the buffers in items.
654             * 
655             * @return the buffer size
656             */
657            public int getBufferSize() {
658                return bufferSize;
659            }
660        }
661
662        /**
663         * Three views on the existing pool.
664         */
665        class PoolInfos {
666            private final SortedMap<String, PoolInfo> allPools;
667            private final SortedMap<String, PoolInfo> nonEmptyPools;
668            private final SortedMap<String, PoolInfo> usedPools;
669
670            /**
671             * Instantiates a new pool infos.
672             *
673             * @param pools the pools
674             */
675            public PoolInfos(Set<ManagedBufferPool<?, ?>> pools) {
676                allPools = new TreeMap<>();
677                nonEmptyPools = new TreeMap<>();
678                usedPools = new TreeMap<>();
679
680                @SuppressWarnings("PMD.UseConcurrentHashMap")
681                Map<String, Integer> dupsNext = new HashMap<>();
682                for (ManagedBufferPool<?, ?> mbp : pools) {
683                    String key = mbp.name();
684                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
685                    PoolInfo infos = new PoolInfo(
686                        mbp.createdBufs.get(), mbp.queue.size(),
687                        mbp.preservedBufs, mbp.maximumBufs,
688                        mbp.bufferSize());
689                    if (allPools.containsKey(key)
690                        || dupsNext.containsKey(key)) {
691                        if (allPools.containsKey(key)) {
692                            // Found first duplicate, rename
693                            allPools.put(key + "#1", allPools.get(key));
694                            allPools.remove(key);
695                            dupsNext.put(key, 2);
696                        }
697                        allPools.put(key + "#"
698                            + dupsNext.put(key, dupsNext.get(key) + 1), infos);
699                    } else {
700                        allPools.put(key, infos);
701                    }
702                }
703                for (Map.Entry<String, PoolInfo> e : allPools.entrySet()) {
704                    PoolInfo infos = e.getValue();
705                    if (infos.getPooled() > 0) {
706                        nonEmptyPools.put(e.getKey(), infos);
707                    }
708                    if (infos.getCreated() > 0) {
709                        usedPools.put(e.getKey(), infos);
710                    }
711                }
712            }
713
714            /**
715             * All pools.
716             *
717             * @return the all pools
718             */
719            public SortedMap<String, PoolInfo> getAllPools() {
720                return allPools;
721            }
722
723            /**
724             * Pools that have at least managed buffer enqueued
725             * (ready to be acquired).
726             *
727             * @return the non empty pools
728             */
729            public SortedMap<String, PoolInfo> getNonEmptyPools() {
730                return nonEmptyPools;
731            }
732
733            /**
734             * Pools that have at least one associated buffer
735             * (in pool or in use).
736             *
737             * @return the used pools
738             */
739            public SortedMap<String, PoolInfo> getUsedPools() {
740                return usedPools;
741            }
742        }
743
        /**
         * Sets the default drain delay.
         * 
         * @param millis the drain delay in milliseconds
         */
        void setDefaultDrainDelay(long millis);

        /**
         * Returns the default drain delay in milliseconds.
         * 
         * @return the value
         */
        long getDefaultDrainDelay();

        /**
         * Sets the acquire warning limit.
         * 
         * @param millis the limit in milliseconds
         */
        void setAcquireWarningLimit(long millis);

        /**
         * Returns the acquire warning limit in milliseconds.
         * 
         * @return the value
         */
        long getAcquireWarningLimit();

        /**
         * Information about the pools.
         * 
         * @return the views on the pools
         */
        PoolInfos getPoolInfos();

        /**
         * Summary information about the pooled buffers
         * (one sample per pool).
         * 
         * @return the values
         */
        IntSummaryStatistics getPooledPerPoolStatistics();

        /**
         * Summary information about the created buffers
         * (one sample per pool).
         * 
         * @return the values
         */
        IntSummaryStatistics getCreatedPerPoolStatistics();
792    }
793
794    /**
795     * The MBean view
796     */
797    private static final class MBeanView implements ManagedBufferPoolMXBean {
798
799        private static Set<ManagedBufferPool<?, ?>> allPools
800            = Collections.synchronizedSet(
801                Collections.newSetFromMap(
802                    new WeakHashMap<ManagedBufferPool<?, ?>, Boolean>()));
803
804        /**
805         * Adds the pool.
806         *
807         * @param pool the pool
808         */
809        public static void addPool(ManagedBufferPool<?, ?> pool) {
810            allPools.add(pool);
811        }
812
813        @Override
814        public void setDefaultDrainDelay(long millis) {
815            ManagedBufferPool.setDefaultDrainDelay(millis);
816        }
817
818        @Override
819        public long getDefaultDrainDelay() {
820            return defaultDrainDelay();
821        }
822
823        @Override
824        public void setAcquireWarningLimit(long millis) {
825            acquireWarningLimit = millis;
826        }
827
828        @Override
829        public long getAcquireWarningLimit() {
830            return acquireWarningLimit;
831        }
832
833        @Override
834        public PoolInfos getPoolInfos() {
835            return new PoolInfos(allPools);
836        }
837
838        @Override
839        public IntSummaryStatistics getPooledPerPoolStatistics() {
840            return allPools.stream().collect(
841                Collectors.summarizingInt(mbp -> mbp.queue.size()));
842        }
843
844        @Override
845        public IntSummaryStatistics getCreatedPerPoolStatistics() {
846            return allPools.stream().collect(
847                Collectors.summarizingInt(mbp -> mbp.createdBufs.get()));
848        }
849    }
850
851    static {
852        try {
853            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
854            ObjectName mxbeanName = new ObjectName("org.jgrapes.io:type="
855                + ManagedBufferPool.class.getSimpleName() + "s");
856            mbs.registerMBean(new MBeanView(), mxbeanName);
857        } catch (MalformedObjectNameException | InstanceAlreadyExistsException
858                | MBeanRegistrationException | NotCompliantMBeanException e) {
859            // Does not happen
860        }
861    }
862}