001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.lang.management.ManagementFactory;
023import java.lang.ref.WeakReference;
024import java.net.InetSocketAddress;
025import java.nio.channels.SelectionKey;
026import java.nio.channels.ServerSocketChannel;
027import java.nio.channels.SocketChannel;
028import java.util.ArrayList;
029import java.util.Collections;
030import java.util.Comparator;
031import java.util.HashSet;
032import java.util.IntSummaryStatistics;
033import java.util.List;
034import java.util.Optional;
035import java.util.Set;
036import java.util.SortedMap;
037import java.util.TreeMap;
038import java.util.stream.Collectors;
039
040import javax.management.InstanceAlreadyExistsException;
041import javax.management.MBeanRegistrationException;
042import javax.management.MBeanServer;
043import javax.management.MalformedObjectNameException;
044import javax.management.NotCompliantMBeanException;
045import javax.management.ObjectName;
046
047import org.jgrapes.core.Channel;
048import org.jgrapes.core.Components;
049import org.jgrapes.core.Manager;
050import org.jgrapes.core.Self;
051import org.jgrapes.core.annotation.Handler;
052import org.jgrapes.core.events.Error;
053import org.jgrapes.core.events.Start;
054import org.jgrapes.core.events.Stop;
055import org.jgrapes.io.NioHandler;
056import org.jgrapes.io.events.Close;
057import org.jgrapes.io.events.Closed;
058import org.jgrapes.io.events.IOError;
059import org.jgrapes.io.events.Input;
060import org.jgrapes.io.events.NioRegistration;
061import org.jgrapes.io.events.NioRegistration.Registration;
062import org.jgrapes.io.events.Output;
063import org.jgrapes.io.events.Purge;
064import org.jgrapes.io.util.AvailabilityListener;
065import org.jgrapes.io.util.LinkedIOSubchannel;
066import org.jgrapes.io.util.PermitsPool;
067import org.jgrapes.net.events.Accepted;
068import org.jgrapes.net.events.Ready;
069import org.jgrapes.util.events.ConfigurationUpdate;
070
071/**
072 * Provides a TCP server. The server binds to the given address. If the
073 * address is {@code null}, address and port are automatically assigned.
074 * The port may be overwritten by a configuration event
075 * (see {@link #onConfigurationUpdate(ConfigurationUpdate)}).
076 * 
077 * For each established connection, the server creates a new
078 * {@link LinkedIOSubchannel}. The servers basic operation is to
079 * fire {@link Input} (and {@link Closed}) events on the
080 * appropriate subchannel in response to data received from the
081 * network and to handle {@link Output} (and {@link Close}) events 
082 * on the subchannel and forward the information to the network
083 * connection.
084 * 
085 * The server supports limiting the number of concurrent connections
086 * with a {@link PermitsPool}. If such a pool is set as connection
087 * limiter (see {@link #setConnectionLimiter(PermitsPool)}), a
088 * permit is acquired for each new connection attempt. If no more
089 * permits are available, the server sends a {@link Purge} event on
090 * each channel that is purgeable for at least the time span
091 * set with {@link #setMinimalPurgeableTime(long)}. Purgeability 
092 * is derived from the end of record flag of {@link Output} events
093 * (see {@link #onOutput(Output, TcpChannelImpl)}. When using this feature, 
094 * make sure that connections are either short lived or the application
095 * level components support the {@link Purge} event. Else, it may become
096 * impossible to establish new connections.
097 */
098@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount",
099    "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals",
100    "PMD.ExcessiveClassLength" })
101public class TcpServer extends TcpConnectionManager implements NioHandler {
102
103    private InetSocketAddress serverAddress;
104    private ServerSocketChannel serverSocketChannel;
105    private boolean closing;
106    private int backlog;
107    private PermitsPool connLimiter;
108    private Registration registration;
109    private Purger purger;
110    private long minimumPurgeableTime;
111
112    /**
113     * The purger thread.
114     */
115    private class Purger extends Thread implements AvailabilityListener {
116
117        private boolean permitsAvailable = true;
118
119        /**
120         * Instantiates a new purger.
121         */
122        public Purger() {
123            setName(Components.simpleObjectName(this));
124            setDaemon(true);
125        }
126
127        @Override
128        public void availabilityChanged(PermitsPool pool, boolean available) {
129            if (registration == null) {
130                return;
131            }
132            synchronized (this) {
133                permitsAvailable = available;
134                registration.updateInterested(
135                    permitsAvailable ? SelectionKey.OP_ACCEPT : 0);
136                if (!permitsAvailable) {
137                    this.notifyAll();
138                }
139            }
140        }
141
142        @Override
143        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
144            "PMD.DataflowAnomalyAnalysis" })
145        public void run() {
146            if (connLimiter == null) {
147                return;
148            }
149            try {
150                connLimiter.addListener(this);
151                while (serverSocketChannel.isOpen()) {
152                    synchronized (this) {
153                        while (permitsAvailable) {
154                            wait();
155                        }
156                    }
157                    // Copy to avoid ConcurrentModificationException
158                    List<TcpChannelImpl> candidates;
159                    synchronized (channels) {
160                        candidates = new ArrayList<>(channels);
161                    }
162                    long purgeableSince
163                        = System.currentTimeMillis() - minimumPurgeableTime;
164                    candidates = candidates.stream()
165                        .filter(channel -> channel.isPurgeable()
166                            && channel.purgeableSince() < purgeableSince)
167                        .sorted(new Comparator<TcpChannelImpl>() {
168                            @Override
169                            @SuppressWarnings("PMD.ShortVariable")
170                            public int compare(TcpChannelImpl c1, TcpChannelImpl c2) {
171                                if (c1.purgeableSince() < c2
172                                    .purgeableSince()) {
173                                    return 1;
174                                }
175                                if (c1.purgeableSince() > c2
176                                    .purgeableSince()) {
177                                    return -1;
178                                }
179                                return 0;
180                            }
181                        })
182                        .collect(Collectors.toList());
183                    for (TcpChannelImpl channel : candidates) {
184                        // Sorting may have taken time...
185                        if (!channel.isPurgeable()) {
186                            continue;
187                        }
188                        channel.downPipeline().fire(new Purge(), channel);
189                        // Continue only as long as necessary
190                        if (permitsAvailable) {
191                            break;
192                        }
193                    }
194                    sleep(1000);
195                }
196            } catch (InterruptedException e) {
197                // Fall through
198            } finally {
199                connLimiter.removeListener(this);
200            }
201        }
202
203    }
204
205    /**
206     * Creates a new server, using itself as component channel. 
207     */
208    public TcpServer() {
209        this(Channel.SELF);
210    }
211
212    /**
213     * Creates a new server using the given channel.
214     * 
215     * @param componentChannel the component's channel
216     */
217    public TcpServer(Channel componentChannel) {
218        super(componentChannel);
219    }
220
221    /**
222     * Sets the address to bind to. If none is set, the address and port
223     * are assigned automatically.
224     * 
225     * @param serverAddress the address to bind to
226     * @return the TCP server for easy chaining
227     */
228    public TcpServer setServerAddress(InetSocketAddress serverAddress) {
229        this.serverAddress = serverAddress;
230        return this;
231    }
232
233    /**
234     * The component can be configured with events that include
235     * a path (see @link {@link ConfigurationUpdate#paths()})
236     * that matches this components path (see {@link Manager#componentPath()}).
237     * 
238     * The following properties are recognized:
239     * 
240     * `hostname`
241     * : If given, is used as first parameter for 
242     *   {@link InetSocketAddress#InetSocketAddress(String, int)}.
243     * 
244     * `port`
245     * : If given, is used as parameter for 
246     *   {@link InetSocketAddress#InetSocketAddress(String, int)} 
247     *   or {@link InetSocketAddress#InetSocketAddress(int)}, 
248     *   depending on whether a host name is specified. Defaults to "0".
249     *   
250     * `backlog`
251     * : See {@link #setBacklog(int)}.
252     * 
253     * `bufferSize`
254     * : See {@link #setBufferSize(int)}.
255     * 
256     * @param event the event
257     */
258    @Handler
259    @SuppressWarnings("PMD.ConfusingTernary")
260    public void onConfigurationUpdate(ConfigurationUpdate event) {
261        event.values(componentPath()).ifPresent(values -> {
262            String hostname = values.get("hostname");
263            if (hostname != null) {
264                setServerAddress(new InetSocketAddress(hostname,
265                    Integer.parseInt(values.getOrDefault("port", "0"))));
266            } else if (values.containsKey("port")) {
267                setServerAddress(new InetSocketAddress(
268                    Integer.parseInt(values.get("port"))));
269            }
270            Optional.ofNullable(values.get("backlog")).ifPresent(
271                value -> setBacklog(Integer.parseInt(value)));
272            Optional.ofNullable(values.get("bufferSize")).ifPresent(
273                value -> setBufferSize(Integer.parseInt(value)));
274        });
275    }
276
277    /**
278     * Returns the server address. Before starting, the address is the
279     * address set with {@link #setServerAddress(InetSocketAddress)}. After
280     * starting the address is obtained from the created socket.  
281     * 
282     * @return the serverAddress
283     */
284    public InetSocketAddress serverAddress() {
285        try {
286            return serverSocketChannel == null ? serverAddress
287                : (InetSocketAddress) serverSocketChannel.getLocalAddress();
288        } catch (IOException e) {
289            return serverAddress;
290        }
291    }
292
293    /**
294     * Sets the backlog size.
295     * 
296     * @param backlog the backlog to set
297     * @return the TCP server for easy chaining
298     */
299    public TcpServer setBacklog(int backlog) {
300        this.backlog = backlog;
301        return this;
302    }
303
304    /**
305     * Return the configured backlog size.
306     *
307     * @return the backlog
308     */
309    public int backlog() {
310        return backlog;
311    }
312
313    /**
314     * Sets a permit "pool". A new connection is created only if a permit
315     * can be obtained from the pool. 
316     * 
317     * A connection limiter must be set before starting the component.
318     * 
319     * @param connectionLimiter the connection pool to set
320     * @return the TCP server for easy chaining
321     */
322    public TcpServer setConnectionLimiter(PermitsPool connectionLimiter) {
323        this.connLimiter = connectionLimiter;
324        return this;
325    }
326
327    /**
328     * Returns the connection limiter.
329     *
330     * @return the connection Limiter
331     */
332    public PermitsPool getConnectionLimiter() {
333        return connLimiter;
334    }
335
336    /**
337     * Sets a minimal time that a connection must be purgeable (idle)
338     * before it may be purged.
339     *
340     * @param millis the millis
341     * @return the tcp server
342     */
343    public TcpServer setMinimalPurgeableTime(long millis) {
344        this.minimumPurgeableTime = millis;
345        return this;
346    }
347
348    /**
349     * Gets the minimal purgeable time.
350     *
351     * @return the minimal purgeable time
352     */
353    public long getMinimalPurgeableTime() {
354        return minimumPurgeableTime;
355    }
356
357    /**
358     * Starts the server.
359     * 
360     * @param event the start event
361     * @throws IOException if an I/O exception occurred
362     */
363    @Handler
364    public void onStart(Start event) throws IOException {
365        closing = false;
366        serverSocketChannel = ServerSocketChannel.open();
367        serverSocketChannel.bind(serverAddress, backlog);
368        MBeanView.addServer(this);
369        fire(new NioRegistration(this, serverSocketChannel,
370            SelectionKey.OP_ACCEPT, this), Channel.BROADCAST);
371    }
372
373    /**
374     * Handles the successful channel registration.
375     *
376     * @param event the event
377     * @throws InterruptedException the interrupted exception
378     * @throws IOException Signals that an I/O exception has occurred.
379     */
380    @Handler(channels = Self.class)
381    public void onRegistered(NioRegistration.Completed event)
382            throws InterruptedException, IOException {
383        NioHandler handler = event.event().handler();
384        if (handler == this) {
385            if (event.event().get() == null) {
386                fire(new Error(event,
387                    "Registration failed, no NioDispatcher?"));
388                return;
389            }
390            registration = event.event().get();
391            purger = new Purger();
392            purger.start();
393            fire(new Ready(serverSocketChannel.getLocalAddress()));
394            return;
395        }
396        if (handler instanceof TcpChannelImpl) {
397            TcpChannelImpl channel = (TcpChannelImpl) handler;
398            channel.downPipeline()
399                .fire(new Accepted(channel.nioChannel().getLocalAddress(),
400                    channel.nioChannel().getRemoteAddress(), false,
401                    Collections.emptyList()), channel);
402            channel.registrationComplete(event.event());
403        }
404    }
405
406    /*
407     * (non-Javadoc)
408     * 
409     * @see org.jgrapes.io.NioSelectable#handleOps(int)
410     */
411    @Override
412    public void handleOps(int ops) {
413        if ((ops & SelectionKey.OP_ACCEPT) == 0 || closing) {
414            return;
415        }
416        synchronized (channels) {
417            if (connLimiter != null && !connLimiter.tryAcquire()) {
418                return;
419            }
420            try {
421                SocketChannel socketChannel = serverSocketChannel.accept();
422                if (socketChannel == null) {
423                    // "False alarm"
424                    if (connLimiter != null) {
425                        connLimiter.release();
426                    }
427                    return;
428                }
429                channels.add(new TcpChannelImpl(socketChannel));
430            } catch (IOException e) {
431                fire(new IOError(null, e));
432            }
433        }
434    }
435
436    @Override
437    protected boolean removeChannel(TcpChannelImpl channel) {
438        synchronized (channels) {
439            if (!channels.remove(channel)) {
440                // Closed already
441                return false;
442            }
443            // In case the server is shutting down
444            channels.notifyAll();
445        }
446        if (connLimiter != null) {
447            connLimiter.release();
448        }
449        return true;
450    }
451
452    /**
453     * Shuts down the server or one of the connections to the server.
454     *
455     * @param event the event
456     * @throws IOException if an I/O exception occurred
457     * @throws InterruptedException if the execution was interrupted
458     */
459    @Handler
460    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
461    public void onClose(Close event) throws IOException, InterruptedException {
462        boolean subOnly = true;
463        for (Channel channel : event.channels()) {
464            if (channel instanceof TcpChannelImpl) {
465                if (channels.contains(channel)) {
466                    ((TcpChannelImpl) channel).close();
467                }
468            } else {
469                subOnly = false;
470            }
471        }
472        if (subOnly || !serverSocketChannel.isOpen()) {
473            // Closed already
474            fire(new Closed());
475            return;
476        }
477        synchronized (channels) {
478            closing = true;
479            // Copy to avoid concurrent modification exception
480            Set<TcpChannelImpl> conns = new HashSet<>(channels);
481            for (TcpChannelImpl conn : conns) {
482                conn.close();
483            }
484            while (!channels.isEmpty()) {
485                channels.wait();
486            }
487        }
488        serverSocketChannel.close();
489        purger.interrupt();
490        closing = false;
491        fire(new Closed());
492    }
493
494    /**
495     * Shuts down the server by firing a {@link Close} using the
496     * server as channel. Note that this automatically results
497     * in closing all open connections by the runtime system
498     * and thus in {@link Closed} events on all subchannels.
499     * 
500     * @param event the event
501     */
502    @Handler(priority = -1000)
503    public void onStop(Stop event) {
504        if (closing || !serverSocketChannel.isOpen()) {
505            return;
506        }
507        newSyncEventPipeline().fire(new Close(), this);
508    }
509
510    /**
511     * The Interface of the TcpServer MXBean.
512     */
513    public interface TcpServerMXBean {
514
515        /**
516         * The Class ChannelInfo.
517         */
518        class ChannelInfo {
519
520            private final TcpChannelImpl channel;
521
522            /**
523             * Instantiates a new channel info.
524             *
525             * @param channel the channel
526             */
527            public ChannelInfo(TcpChannelImpl channel) {
528                this.channel = channel;
529            }
530
531            /**
532             * Checks if is purgeable.
533             *
534             * @return true, if is purgeable
535             */
536            public boolean isPurgeable() {
537                return channel.isPurgeable();
538            }
539
540            /**
541             * Gets the downstream pool.
542             *
543             * @return the downstream pool
544             */
545            public String getDownstreamPool() {
546                return channel.readBuffers().name();
547            }
548
549            /**
550             * Gets the upstream pool.
551             *
552             * @return the upstream pool
553             */
554            public String getUpstreamPool() {
555                return channel.byteBufferPool().name();
556            }
557        }
558
559        /**
560         * Gets the component path.
561         *
562         * @return the component path
563         */
564        String getComponentPath();
565
566        /**
567         * Gets the port.
568         *
569         * @return the port
570         */
571        int getPort();
572
573        /**
574         * Gets the channel count.
575         *
576         * @return the channel count
577         */
578        int getChannelCount();
579
580        /**
581         * Gets the channels.
582         *
583         * @return the channels
584         */
585        SortedMap<String, ChannelInfo> getChannels();
586
587    }
588
589    /**
590     * The Class TcpServerInfo.
591     */
592    public static class TcpServerInfo implements TcpServerMXBean {
593
594        private static MBeanServer mbs
595            = ManagementFactory.getPlatformMBeanServer();
596
597        private ObjectName mbeanName;
598        private final WeakReference<TcpServer> serverRef;
599
600        /**
601         * Instantiates a new tcp server info.
602         *
603         * @param server the server
604         */
605        @SuppressWarnings({ "PMD.EmptyCatchBlock",
606            "PMD.AvoidCatchingGenericException" })
607        public TcpServerInfo(TcpServer server) {
608            serverRef = new WeakReference<>(server);
609            try {
610                int port = server.serverAddress().getPort();
611                mbeanName = new ObjectName("org.jgrapes.io:type="
612                    + TcpServer.class.getSimpleName() + ",name="
613                    + ObjectName.quote(Components.objectName(server)
614                        + " (:" + port + ")"));
615            } catch (MalformedObjectNameException e) {
616                // Should not happen
617            }
618            try {
619                mbs.unregisterMBean(mbeanName);
620            } catch (Exception e) {
621                // Just in case, should not work
622            }
623            try {
624                mbs.registerMBean(this, mbeanName);
625            } catch (InstanceAlreadyExistsException | MBeanRegistrationException
626                    | NotCompliantMBeanException e) {
627                // Have to live with that
628            }
629        }
630
631        /**
632         * Server.
633         *
634         * @return the optional
635         */
636        @SuppressWarnings({ "PMD.AvoidCatchingGenericException",
637            "PMD.EmptyCatchBlock" })
638        public Optional<TcpServer> server() {
639            TcpServer server = serverRef.get();
640            if (server == null) {
641                try {
642                    mbs.unregisterMBean(mbeanName);
643                } catch (Exception e) {
644                    // Should work.
645                }
646            }
647            return Optional.ofNullable(server);
648        }
649
650        /*
651         * (non-Javadoc)
652         * 
653         * @see org.jgrapes.net.TcpServer.TcpServerMXBean#getComponentPath()
654         */
655        @Override
656        public String getComponentPath() {
657            return server().map(mgr -> mgr.componentPath()).orElse("<removed>");
658        }
659
660        /*
661         * (non-Javadoc)
662         * 
663         * @see org.jgrapes.net.TcpServer.TcpServerMXBean#getPort()
664         */
665        @Override
666        public int getPort() {
667            return server().map(server -> server
668                .serverAddress().getPort()).orElse(0);
669        }
670
671        /*
672         * (non-Javadoc)
673         * 
674         * @see org.jgrapes.net.TcpServer.TcpServerMXBean#getChannelCount()
675         */
676        @Override
677        public int getChannelCount() {
678            return server().map(server -> server.channels.size()).orElse(0);
679        }
680
681        /*
682         * (non-Javadoc)
683         * 
684         * @see org.jgrapes.net.TcpServer.TcpServerMXBean#getChannels()
685         */
686        @Override
687        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
688        public SortedMap<String, ChannelInfo> getChannels() {
689            return server().map(server -> {
690                SortedMap<String, ChannelInfo> result = new TreeMap<>();
691                for (TcpChannelImpl channel : server.channels) {
692                    result.put(channel.nioChannel().socket()
693                        .getRemoteSocketAddress().toString(),
694                        new ChannelInfo(channel));
695                }
696                return result;
697            }).orElse(Collections.emptySortedMap());
698        }
699    }
700
701    /**
702     * An MBean interface for getting information about the TCP servers
703     * and established connections.
704     */
705    public interface TcpServerSummaryMXBean {
706
707        /**
708         * Gets the connections per server statistics.
709         *
710         * @return the connections per server statistics
711         */
712        IntSummaryStatistics getConnectionsPerServerStatistics();
713
714        /**
715         * Gets the servers.
716         *
717         * @return the servers
718         */
719        Set<TcpServerMXBean> getServers();
720    }
721
722    /**
723     * The MBeanView.
724     */
725    private static class MBeanView implements TcpServerSummaryMXBean {
726        private static Set<TcpServerInfo> serverInfos = new HashSet<>();
727
728        /**
729         * Adds the server to the reported servers.
730         *
731         * @param server the server
732         */
733        public static void addServer(TcpServer server) {
734            synchronized (serverInfos) {
735                serverInfos.add(new TcpServerInfo(server));
736            }
737        }
738
739        /**
740         * Returns the infos.
741         *
742         * @return the sets the
743         */
744        private Set<TcpServerInfo> infos() {
745            Set<TcpServerInfo> expired = new HashSet<>();
746            synchronized (serverInfos) {
747                for (TcpServerInfo serverInfo : serverInfos) {
748                    if (!serverInfo.server().isPresent()) {
749                        expired.add(serverInfo);
750                    }
751                }
752                serverInfos.removeAll(expired);
753            }
754            return serverInfos;
755        }
756
757        @SuppressWarnings("unchecked")
758        @Override
759        public Set<TcpServerMXBean> getServers() {
760            return (Set<TcpServerMXBean>) (Object) infos();
761        }
762
763        @Override
764        public IntSummaryStatistics getConnectionsPerServerStatistics() {
765            return infos().stream().map(info -> info.server().get())
766                .filter(ref -> ref != null).collect(
767                    Collectors.summarizingInt(srv -> srv.channels.size()));
768        }
769    }
770
771    static {
772        try {
773            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
774            ObjectName mxbeanName = new ObjectName("org.jgrapes.io:type="
775                + TcpServer.class.getSimpleName() + "s");
776            mbs.registerMBean(new MBeanView(), mxbeanName);
777        } catch (MalformedObjectNameException | InstanceAlreadyExistsException
778                | MBeanRegistrationException | NotCompliantMBeanException e) {
779            // Does not happen
780        }
781    }
782
783}