001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.lang.management.ManagementFactory;
023import java.lang.ref.WeakReference;
024import java.net.InetSocketAddress;
025import java.nio.channels.SelectionKey;
026import java.nio.channels.ServerSocketChannel;
027import java.nio.channels.SocketChannel;
028import java.util.ArrayList;
029import java.util.Collections;
030import java.util.Comparator;
031import java.util.HashSet;
032import java.util.IntSummaryStatistics;
033import java.util.List;
034import java.util.Optional;
035import java.util.Set;
036import java.util.SortedMap;
037import java.util.TreeMap;
038import java.util.stream.Collectors;
039
040import javax.management.InstanceAlreadyExistsException;
041import javax.management.MBeanRegistrationException;
042import javax.management.MBeanServer;
043import javax.management.MalformedObjectNameException;
044import javax.management.NotCompliantMBeanException;
045import javax.management.ObjectName;
046
047import org.jgrapes.core.Channel;
048import org.jgrapes.core.Components;
049import org.jgrapes.core.Manager;
050import org.jgrapes.core.Self;
051import org.jgrapes.core.Subchannel;
052import org.jgrapes.core.annotation.Handler;
053import org.jgrapes.core.events.Error;
054import org.jgrapes.core.events.Start;
055import org.jgrapes.core.events.Stop;
056import org.jgrapes.io.NioHandler;
057import org.jgrapes.io.events.Close;
058import org.jgrapes.io.events.Closed;
059import org.jgrapes.io.events.IOError;
060import org.jgrapes.io.events.Input;
061import org.jgrapes.io.events.NioRegistration;
062import org.jgrapes.io.events.NioRegistration.Registration;
063import org.jgrapes.io.events.Output;
064import org.jgrapes.io.events.Purge;
065import org.jgrapes.io.util.AvailabilityListener;
066import org.jgrapes.io.util.LinkedIOSubchannel;
067import org.jgrapes.io.util.PermitsPool;
068import org.jgrapes.net.events.Accepted;
069import org.jgrapes.net.events.Ready;
070import org.jgrapes.util.events.ConfigurationUpdate;
071
072/**
073 * Provides a TCP server. The server binds to the given address. If the
074 * address is {@code null}, address and port are automatically assigned.
075 * The port may be overwritten by a configuration event
076 * (see {@link #onConfigurationUpdate(ConfigurationUpdate)}).
077 * 
078 * For each established connection, the server creates a new
079 * {@link LinkedIOSubchannel}. The server's basic operation is to
080 * fire {@link Input} (and {@link Closed}) events on the
081 * appropriate subchannel in response to data received from the
082 * network and to handle {@link Output} (and {@link Close}) events 
083 * on the subchannel and forward the information to the network
084 * connection.
085 * 
086 * The server supports limiting the number of concurrent connections
087 * with a {@link PermitsPool}. If such a pool is set as connection
088 * limiter (see {@link #setConnectionLimiter(PermitsPool)}), a
089 * permit is acquired for each new connection attempt. If no more
090 * permits are available, the server sends a {@link Purge} event on
091 * each channel that is purgeable for at least the time span
092 * set with {@link #setMinimalPurgeableTime(long)}. Purgeability 
093 * is derived from the end of record flag of {@link Output} events
094 * (see {@link #onOutput(Output, TcpChannelImpl)}). When using this feature, 
095 * make sure that connections are either short lived or the application
096 * level components support the {@link Purge} event. Else, it may become
097 * impossible to establish new connections.
098 */
099@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount",
100    "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals",
101    "PMD.ExcessiveClassLength" })
102public class TcpServer extends TcpConnectionManager implements NioHandler {
103
104    private InetSocketAddress serverAddress;
105    private ServerSocketChannel serverSocketChannel;
106    private boolean closing;
107    private int backlog;
108    private PermitsPool connLimiter;
109    private Registration registration;
110    private Purger purger;
111    private long minimumPurgeableTime;
112
113    /**
114     * The purger thread.
115     */
116    private class Purger extends Thread implements AvailabilityListener {
117
118        private boolean permitsAvailable = true;
119
120        /**
121         * Instantiates a new purger.
122         */
123        public Purger() {
124            setName(Components.simpleObjectName(this));
125            setDaemon(true);
126        }
127
128        @Override
129        public void availabilityChanged(PermitsPool pool, boolean available) {
130            if (registration == null) {
131                return;
132            }
133            synchronized (this) {
134                permitsAvailable = available;
135                registration.updateInterested(
136                    permitsAvailable ? SelectionKey.OP_ACCEPT : 0);
137                if (!permitsAvailable) {
138                    this.notifyAll();
139                }
140            }
141        }
142
143        @Override
144        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
145            "PMD.DataflowAnomalyAnalysis" })
146        public void run() {
147            if (connLimiter == null) {
148                return;
149            }
150            try {
151                connLimiter.addListener(this);
152                while (serverSocketChannel.isOpen()) {
153                    synchronized (this) {
154                        while (permitsAvailable) {
155                            wait();
156                        }
157                    }
158                    // Copy to avoid ConcurrentModificationException
159                    List<TcpChannelImpl> candidates;
160                    synchronized (channels) {
161                        candidates = new ArrayList<>(channels);
162                    }
163                    long purgeableSince
164                        = System.currentTimeMillis() - minimumPurgeableTime;
165                    candidates = candidates.stream()
166                        .filter(channel -> channel.isPurgeable()
167                            && channel.purgeableSince() < purgeableSince)
168                        .sorted(new Comparator<TcpChannelImpl>() {
169                            @Override
170                            @SuppressWarnings("PMD.ShortVariable")
171                            public int compare(TcpChannelImpl c1,
172                                    TcpChannelImpl c2) {
173                                if (c1.purgeableSince() < c2
174                                    .purgeableSince()) {
175                                    return 1;
176                                }
177                                if (c1.purgeableSince() > c2
178                                    .purgeableSince()) {
179                                    return -1;
180                                }
181                                return 0;
182                            }
183                        })
184                        .collect(Collectors.toList());
185                    for (TcpChannelImpl channel : candidates) {
186                        // Sorting may have taken time...
187                        if (!channel.isPurgeable()) {
188                            continue;
189                        }
190                        channel.downPipeline().fire(new Purge(), channel);
191                        // Continue only as long as necessary
192                        if (permitsAvailable) {
193                            break;
194                        }
195                    }
196                    sleep(1000);
197                }
198            } catch (InterruptedException e) {
199                // Fall through
200            } finally {
201                connLimiter.removeListener(this);
202            }
203        }
204
205    }
206
    /**
     * Creates a new server, using itself as component channel. 
     * Delegates to {@link #TcpServer(Channel)} with {@link Channel#SELF}.
     */
    public TcpServer() {
        this(Channel.SELF);
    }
213
    /**
     * Creates a new server using the given channel. The channel is
     * passed on to the super class constructor.
     * 
     * @param componentChannel the component's channel
     */
    public TcpServer(Channel componentChannel) {
        super(componentChannel);
    }
222
    /**
     * Sets the address to bind to. If none is set, the address and port
     * are assigned automatically. The value is used when the server
     * socket is bound in {@link #onStart(Start)}.
     * 
     * @param serverAddress the address to bind to
     * @return the TCP server for easy chaining
     */
    public TcpServer setServerAddress(InetSocketAddress serverAddress) {
        this.serverAddress = serverAddress;
        return this;
    }
234
235    /**
236     * The component can be configured with events that include
237     * a path (see @link {@link ConfigurationUpdate#paths()})
238     * that matches this components path (see {@link Manager#componentPath()}).
239     * 
240     * The following properties are recognized:
241     * 
242     * `hostname`
243     * : If given, is used as first parameter for 
244     *   {@link InetSocketAddress#InetSocketAddress(String, int)}.
245     * 
246     * `port`
247     * : If given, is used as parameter for 
248     *   {@link InetSocketAddress#InetSocketAddress(String, int)} 
249     *   or {@link InetSocketAddress#InetSocketAddress(int)}, 
250     *   depending on whether a host name is specified. Defaults to "0".
251     *   
252     * `backlog`
253     * : See {@link #setBacklog(int)}.
254     * 
255     * `bufferSize`
256     * : See {@link #setBufferSize(int)}.
257     * 
258     * @param event the event
259     */
260    @Handler
261    @SuppressWarnings("PMD.ConfusingTernary")
262    public void onConfigurationUpdate(ConfigurationUpdate event) {
263        event.values(componentPath()).ifPresent(values -> {
264            String hostname = values.get("hostname");
265            if (hostname != null) {
266                setServerAddress(new InetSocketAddress(hostname,
267                    Integer.parseInt(values.getOrDefault("port", "0"))));
268            } else if (values.containsKey("port")) {
269                setServerAddress(new InetSocketAddress(
270                    Integer.parseInt(values.get("port"))));
271            }
272            Optional.ofNullable(values.get("backlog")).ifPresent(
273                value -> setBacklog(Integer.parseInt(value)));
274            Optional.ofNullable(values.get("bufferSize")).ifPresent(
275                value -> setBufferSize(Integer.parseInt(value)));
276        });
277    }
278
279    /**
280     * Returns the server address. Before starting, the address is the
281     * address set with {@link #setServerAddress(InetSocketAddress)}. After
282     * starting the address is obtained from the created socket.  
283     * 
284     * @return the serverAddress
285     */
286    public InetSocketAddress serverAddress() {
287        try {
288            return serverSocketChannel == null ? serverAddress
289                : (InetSocketAddress) serverSocketChannel.getLocalAddress();
290        } catch (IOException e) {
291            return serverAddress;
292        }
293    }
294
    /**
     * Sets the backlog size. The value is passed to the server socket
     * channel's bind operation in {@link #onStart(Start)}.
     * 
     * @param backlog the backlog to set
     * @return the TCP server for easy chaining
     */
    public TcpServer setBacklog(int backlog) {
        this.backlog = backlog;
        return this;
    }
305
    /**
     * Returns the configured backlog size (see {@link #setBacklog(int)}).
     *
     * @return the backlog
     */
    public int backlog() {
        return backlog;
    }
314
    /**
     * Sets a permit "pool". A new connection is created only if a permit
     * can be obtained from the pool. The permit is released again when
     * the connection's channel is removed.
     * 
     * A connection limiter must be set before starting the component.
     * 
     * @param connectionLimiter the connection pool to set
     * @return the TCP server for easy chaining
     */
    public TcpServer setConnectionLimiter(PermitsPool connectionLimiter) {
        this.connLimiter = connectionLimiter;
        return this;
    }
328
    /**
     * Returns the connection limiter 
     * (see {@link #setConnectionLimiter(PermitsPool)}).
     *
     * @return the connection limiter or {@code null} if none has been set
     */
    public PermitsPool getConnectionLimiter() {
        return connLimiter;
    }
337
    /**
     * Sets a minimal time that a connection must be purgeable (idle)
     * before it may be purged.
     *
     * @param millis the minimal purgeable time in milliseconds
     * @return the TCP server for easy chaining
     */
    public TcpServer setMinimalPurgeableTime(long millis) {
        this.minimumPurgeableTime = millis;
        return this;
    }
349
    /**
     * Gets the minimal purgeable time 
     * (see {@link #setMinimalPurgeableTime(long)}).
     *
     * @return the minimal purgeable time in milliseconds
     */
    public long getMinimalPurgeableTime() {
        return minimumPurgeableTime;
    }
358
359    /**
360     * Starts the server.
361     * 
362     * @param event the start event
363     * @throws IOException if an I/O exception occurred
364     */
365    @Handler
366    public void onStart(Start event) throws IOException {
367        closing = false;
368        serverSocketChannel = ServerSocketChannel.open();
369        serverSocketChannel.bind(serverAddress, backlog);
370        MBeanView.addServer(this);
371        fire(new NioRegistration(this, serverSocketChannel,
372            SelectionKey.OP_ACCEPT, this), Channel.BROADCAST);
373    }
374
375    /**
376     * Handles the successful channel registration.
377     *
378     * @param event the event
379     * @throws InterruptedException the interrupted exception
380     * @throws IOException Signals that an I/O exception has occurred.
381     */
382    @Handler(channels = Self.class)
383    public void onRegistered(NioRegistration.Completed event)
384            throws InterruptedException, IOException {
385        NioHandler handler = event.event().handler();
386        if (handler == this) {
387            if (event.event().get() == null) {
388                fire(new Error(event,
389                    "Registration failed, no NioDispatcher?"));
390                return;
391            }
392            registration = event.event().get();
393            purger = new Purger();
394            purger.start();
395            fire(new Ready(serverSocketChannel.getLocalAddress()));
396            return;
397        }
398        if (handler instanceof TcpChannelImpl) {
399            TcpChannelImpl channel = (TcpChannelImpl) handler;
400            channel.downPipeline()
401                .fire(new Accepted(channel.nioChannel().getLocalAddress(),
402                    channel.nioChannel().getRemoteAddress(), false,
403                    Collections.emptyList()), channel);
404            channel.registrationComplete(event.event());
405        }
406    }
407
408    /*
409     * (non-Javadoc)
410     * 
411     * @see org.jgrapes.io.NioSelectable#handleOps(int)
412     */
413    @Override
414    public void handleOps(int ops) {
415        if ((ops & SelectionKey.OP_ACCEPT) == 0 || closing) {
416            return;
417        }
418        synchronized (channels) {
419            if (connLimiter != null && !connLimiter.tryAcquire()) {
420                return;
421            }
422            try {
423                SocketChannel socketChannel = serverSocketChannel.accept();
424                if (socketChannel == null) {
425                    // "False alarm"
426                    if (connLimiter != null) {
427                        connLimiter.release();
428                    }
429                    return;
430                }
431                channels.add(new TcpChannelImpl(socketChannel));
432            } catch (IOException e) {
433                fire(new IOError(null, e));
434            }
435        }
436    }
437
438    @Override
439    protected boolean removeChannel(TcpChannelImpl channel) {
440        synchronized (channels) {
441            if (!channels.remove(channel)) {
442                // Closed already
443                return false;
444            }
445            // In case the server is shutting down
446            channels.notifyAll();
447        }
448        if (connLimiter != null) {
449            connLimiter.release();
450        }
451        return true;
452    }
453
454    /**
455     * Shuts down the server or one of the connections to the server.
456     *
457     * @param event the event
458     * @throws IOException if an I/O exception occurred
459     * @throws InterruptedException if the execution was interrupted
460     */
461    @Handler
462    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
463    public void onClose(Close event) throws IOException, InterruptedException {
464        boolean closeServer = false;
465        for (Channel channel : event.channels()) {
466            if (channels.contains(channel)) {
467                ((TcpChannelImpl) channel).close();
468                continue;
469            }
470            if (channel instanceof Subchannel) {
471                // Some subchannel that we're not interested in.
472                continue;
473            }
474            // Close event on "main" channel
475            closeServer = true;
476        }
477        if (!closeServer) {
478            // Only connection(s) were to be closed.
479            return;
480        }
481        if (!serverSocketChannel.isOpen()) {
482            // Closed already
483            fire(new Closed());
484            return;
485        }
486        synchronized (channels) {
487            closing = true;
488            // Copy to avoid concurrent modification exception
489            Set<TcpChannelImpl> conns = new HashSet<>(channels);
490            for (TcpChannelImpl conn : conns) {
491                conn.close();
492            }
493            while (!channels.isEmpty()) {
494                channels.wait();
495            }
496        }
497        serverSocketChannel.close();
498        purger.interrupt();
499        closing = false;
500        fire(new Closed());
501    }
502
503    /**
504     * Shuts down the server by firing a {@link Close} using the
505     * server as channel. Note that this automatically results
506     * in closing all open connections by the runtime system
507     * and thus in {@link Closed} events on all subchannels.
508     * 
509     * @param event the event
510     */
511    @Handler(priority = -1000)
512    public void onStop(Stop event) {
513        if (closing || !serverSocketChannel.isOpen()) {
514            return;
515        }
516        newSyncEventPipeline().fire(new Close(), this);
517    }
518
    /**
     * The interface of the TcpServer MXBean. It exposes the data
     * of a single server.
     */
    public interface TcpServerMXBean {

        /**
         * Provides information about a single channel (connection)
         * by delegating to the channel passed to the constructor.
         */
        class ChannelInfo {

            private final TcpChannelImpl channel;

            /**
             * Instantiates a new channel info.
             *
             * @param channel the channel to report about
             */
            public ChannelInfo(TcpChannelImpl channel) {
                this.channel = channel;
            }

            /**
             * Checks if the channel is purgeable.
             *
             * @return true, if the channel reports to be purgeable
             */
            public boolean isPurgeable() {
                return channel.isPurgeable();
            }

            /**
             * Gets the downstream pool.
             *
             * @return the name of the channel's read buffers pool
             */
            public String getDownstreamPool() {
                return channel.readBuffers().name();
            }

            /**
             * Gets the upstream pool.
             *
             * @return the name of the channel's byte buffer pool
             */
            public String getUpstreamPool() {
                return channel.byteBufferPool().name();
            }
        }

        /**
         * Gets the component path of the server.
         *
         * @return the component path
         */
        String getComponentPath();

        /**
         * Gets the port of the server's address.
         *
         * @return the port
         */
        int getPort();

        /**
         * Gets the number of the server's channels.
         *
         * @return the channel count
         */
        int getChannelCount();

        /**
         * Gets information about the server's channels, sorted by
         * the map's keys.
         *
         * @return the channels
         */
        SortedMap<String, ChannelInfo> getChannels();

    }
597
598    /**
599     * The Class TcpServerInfo.
600     */
601    public static class TcpServerInfo implements TcpServerMXBean {
602
603        private static MBeanServer mbs
604            = ManagementFactory.getPlatformMBeanServer();
605
606        private ObjectName mbeanName;
607        private final WeakReference<TcpServer> serverRef;
608
609        /**
610         * Instantiates a new tcp server info.
611         *
612         * @param server the server
613         */
614        @SuppressWarnings({ "PMD.EmptyCatchBlock",
615            "PMD.AvoidCatchingGenericException" })
616        public TcpServerInfo(TcpServer server) {
617            serverRef = new WeakReference<>(server);
618            try {
619                int port = server.serverAddress().getPort();
620                mbeanName = new ObjectName("org.jgrapes.io:type="
621                    + TcpServer.class.getSimpleName() + ",name="
622                    + ObjectName.quote(Components.objectName(server)
623                        + " (:" + port + ")"));
624            } catch (MalformedObjectNameException e) {
625                // Should not happen
626            }
627            try {
628                mbs.unregisterMBean(mbeanName);
629            } catch (Exception e) {
630                // Just in case, should not work
631            }
632            try {
633                mbs.registerMBean(this, mbeanName);
634            } catch (InstanceAlreadyExistsException | MBeanRegistrationException
635                    | NotCompliantMBeanException e) {
636                // Have to live with that
637            }
638        }
639
640        /**
641         * Server.
642         *
643         * @return the optional
644         */
645        @SuppressWarnings({ "PMD.AvoidCatchingGenericException",
646            "PMD.EmptyCatchBlock" })
647        public Optional<TcpServer> server() {
648            TcpServer server = serverRef.get();
649            if (server == null) {
650                try {
651                    mbs.unregisterMBean(mbeanName);
652                } catch (Exception e) {
653                    // Should work.
654                }
655            }
656            return Optional.ofNullable(server);
657        }
658
659        /*
660         * (non-Javadoc)
661         * 
662         * @see org.jgrapes.net.TcpServer.TcpServerMXBean#getComponentPath()
663         */
664        @Override
665        public String getComponentPath() {
666            return server().map(mgr -> mgr.componentPath()).orElse("<removed>");
667        }
668
669        /*
670         * (non-Javadoc)
671         * 
672         * @see org.jgrapes.net.TcpServer.TcpServerMXBean#getPort()
673         */
674        @Override
675        public int getPort() {
676            return server().map(server -> server
677                .serverAddress().getPort()).orElse(0);
678        }
679
680        /*
681         * (non-Javadoc)
682         * 
683         * @see org.jgrapes.net.TcpServer.TcpServerMXBean#getChannelCount()
684         */
685        @Override
686        public int getChannelCount() {
687            return server().map(server -> server.channels.size()).orElse(0);
688        }
689
690        /*
691         * (non-Javadoc)
692         * 
693         * @see org.jgrapes.net.TcpServer.TcpServerMXBean#getChannels()
694         */
695        @Override
696        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
697        public SortedMap<String, ChannelInfo> getChannels() {
698            return server().map(server -> {
699                SortedMap<String, ChannelInfo> result = new TreeMap<>();
700                for (TcpChannelImpl channel : server.channels) {
701                    result.put(channel.nioChannel().socket()
702                        .getRemoteSocketAddress().toString(),
703                        new ChannelInfo(channel));
704                }
705                return result;
706            }).orElse(Collections.emptySortedMap());
707        }
708    }
709
    /**
     * An MBean interface for getting information about the TCP servers
     * and established connections.
     */
    public interface TcpServerSummaryMXBean {

        /**
         * Gets the statistics about the number of connections 
         * per server.
         *
         * @return the connections per server statistics
         */
        IntSummaryStatistics getConnectionsPerServerStatistics();

        /**
         * Gets the MXBeans of the known servers.
         *
         * @return the servers
         */
        Set<TcpServerMXBean> getServers();
    }
730
731    /**
732     * The MBeanView.
733     */
734    private static class MBeanView implements TcpServerSummaryMXBean {
735        private static Set<TcpServerInfo> serverInfos = new HashSet<>();
736
737        /**
738         * Adds the server to the reported servers.
739         *
740         * @param server the server
741         */
742        public static void addServer(TcpServer server) {
743            synchronized (serverInfos) {
744                serverInfos.add(new TcpServerInfo(server));
745            }
746        }
747
748        /**
749         * Returns the infos.
750         *
751         * @return the sets the
752         */
753        private Set<TcpServerInfo> infos() {
754            Set<TcpServerInfo> expired = new HashSet<>();
755            synchronized (serverInfos) {
756                for (TcpServerInfo serverInfo : serverInfos) {
757                    if (!serverInfo.server().isPresent()) {
758                        expired.add(serverInfo);
759                    }
760                }
761                serverInfos.removeAll(expired);
762            }
763            return serverInfos;
764        }
765
766        @SuppressWarnings("unchecked")
767        @Override
768        public Set<TcpServerMXBean> getServers() {
769            return (Set<TcpServerMXBean>) (Object) infos();
770        }
771
772        @Override
773        public IntSummaryStatistics getConnectionsPerServerStatistics() {
774            return infos().stream().map(info -> info.server().get())
775                .filter(ref -> ref != null).collect(
776                    Collectors.summarizingInt(srv -> srv.channels.size()));
777        }
778    }
779
    // Registers the summary view of all TCP servers with the platform
    // MBean server when the class is initialized. The name is built
    // from the domain "org.jgrapes.io" and the type "TcpServers".
    static {
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName mxbeanName = new ObjectName("org.jgrapes.io:type="
                + TcpServer.class.getSimpleName() + "s");
            mbs.registerMBean(new MBeanView(), mxbeanName);
        } catch (MalformedObjectNameException | InstanceAlreadyExistsException
                | MBeanRegistrationException | NotCompliantMBeanException e) {
            // Does not happen
        }
    }
791
792}