001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.lang.management.ManagementFactory;
023import java.lang.ref.WeakReference;
024import java.net.InetSocketAddress;
025import java.net.SocketAddress;
026import java.net.StandardProtocolFamily;
027import java.net.UnixDomainSocketAddress;
028import java.nio.channels.SelectionKey;
029import java.nio.channels.ServerSocketChannel;
030import java.nio.channels.SocketChannel;
031import java.util.ArrayList;
032import java.util.Collections;
033import java.util.Comparator;
034import java.util.HashSet;
035import java.util.IntSummaryStatistics;
036import java.util.List;
037import java.util.Optional;
038import java.util.Set;
039import java.util.SortedMap;
040import java.util.TreeMap;
041import java.util.stream.Collectors;
042import javax.management.InstanceAlreadyExistsException;
043import javax.management.MBeanRegistrationException;
044import javax.management.MBeanServer;
045import javax.management.MalformedObjectNameException;
046import javax.management.NotCompliantMBeanException;
047import javax.management.ObjectName;
048import org.jgrapes.core.Channel;
049import org.jgrapes.core.Components;
050import org.jgrapes.core.Event;
051import org.jgrapes.core.Manager;
052import org.jgrapes.core.Self;
053import org.jgrapes.core.Subchannel;
054import org.jgrapes.core.annotation.Handler;
055import org.jgrapes.core.events.Error;
056import org.jgrapes.core.events.Start;
057import org.jgrapes.core.events.Stop;
058import org.jgrapes.io.NioHandler;
059import org.jgrapes.io.events.Close;
060import org.jgrapes.io.events.Closed;
061import org.jgrapes.io.events.IOError;
062import org.jgrapes.io.events.Input;
063import org.jgrapes.io.events.NioRegistration;
064import org.jgrapes.io.events.NioRegistration.Registration;
065import org.jgrapes.io.events.Opening;
066import org.jgrapes.io.events.Output;
067import org.jgrapes.io.events.Purge;
068import org.jgrapes.io.util.AvailabilityListener;
069import org.jgrapes.io.util.LinkedIOSubchannel;
070import org.jgrapes.io.util.PermitsPool;
071import org.jgrapes.net.events.Accepted;
072import org.jgrapes.net.events.Ready;
073import org.jgrapes.util.events.ConfigurationUpdate;
074
075/**
076 * Provides a socket server. The server binds to the given address. If the
077 * address is {@code null}, address and port are automatically assigned.
078 * The port may be overwritten by a configuration event
079 * (see {@link #onConfigurationUpdate(ConfigurationUpdate)}).
080 * 
081 * For each established connection, the server creates a new
082 * {@link LinkedIOSubchannel}. The servers basic operation is to
083 * fire {@link Input} (and {@link Closed}) events on the
084 * appropriate subchannel in response to data received from the
085 * network and to handle {@link Output} (and {@link Close}) events 
086 * on the subchannel and forward the information to the network
087 * connection.
088 * 
089 * The server supports limiting the number of concurrent connections
090 * with a {@link PermitsPool}. If such a pool is set as connection
091 * limiter (see {@link #setConnectionLimiter(PermitsPool)}), a
092 * permit is acquired for each new connection attempt. If no more
093 * permits are available, the server sends a {@link Purge} event on
094 * each channel that is purgeable for at least the time span
095 * set with {@link #setMinimalPurgeableTime(long)}. Purgeability 
096 * is derived from the end of record flag of {@link Output} events
097 * (see {@link #onOutput(Output, SocketChannelImpl)}. When using this feature, 
098 * make sure that connections are either short lived or the application
099 * level components support the {@link Purge} event. Else, it may become
100 * impossible to establish new connections.
101 */
102@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount",
103    "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals",
104    "PMD.ExcessiveClassLength" })
105public class SocketServer extends SocketConnectionManager
106        implements NioHandler {
107
108    private SocketAddress serverAddress;
109    private ServerSocketChannel serverSocketChannel;
110    private boolean closing;
111    private int backlog;
112    private PermitsPool connLimiter;
113    private Registration registration;
114    private Purger purger;
115    private long minimumPurgeableTime;
116
117    /**
118     * The purger thread.
119     */
120    private class Purger extends Thread implements AvailabilityListener {
121
122        private boolean permitsAvailable = true;
123
124        /**
125         * Instantiates a new purger.
126         */
127        public Purger() {
128            setName(Components.simpleObjectName(this));
129            setDaemon(true);
130        }
131
132        @Override
133        public void availabilityChanged(PermitsPool pool, boolean available) {
134            if (registration == null) {
135                return;
136            }
137            synchronized (this) {
138                permitsAvailable = available;
139                registration.updateInterested(
140                    permitsAvailable ? SelectionKey.OP_ACCEPT : 0);
141                if (!permitsAvailable) {
142                    this.notifyAll();
143                }
144            }
145        }
146
147        @Override
148        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
149            "PMD.DataflowAnomalyAnalysis", "PMD.CognitiveComplexity" })
150        public void run() {
151            if (connLimiter == null) {
152                return;
153            }
154            try {
155                connLimiter.addListener(this);
156                while (serverSocketChannel.isOpen()) {
157                    synchronized (this) {
158                        while (permitsAvailable) {
159                            wait();
160                        }
161                    }
162                    // Copy to avoid ConcurrentModificationException
163                    List<SocketChannelImpl> candidates;
164                    synchronized (channels) {
165                        candidates = new ArrayList<>(channels);
166                    }
167                    long purgeableSince
168                        = System.currentTimeMillis() - minimumPurgeableTime;
169                    candidates = candidates.stream()
170                        .filter(channel -> channel.isPurgeable()
171                            && channel.purgeableSince() < purgeableSince)
172                        .sorted(new Comparator<SocketChannelImpl>() {
173                            @Override
174                            @SuppressWarnings("PMD.ShortVariable")
175                            public int compare(SocketChannelImpl c1,
176                                    SocketChannelImpl c2) {
177                                if (c1.purgeableSince() < c2
178                                    .purgeableSince()) {
179                                    return 1;
180                                }
181                                if (c1.purgeableSince() > c2
182                                    .purgeableSince()) {
183                                    return -1;
184                                }
185                                return 0;
186                            }
187                        })
188                        .collect(Collectors.toList());
189                    for (SocketChannelImpl channel : candidates) {
190                        // Sorting may have taken time...
191                        if (!channel.isPurgeable()) {
192                            continue;
193                        }
194                        channel.downPipeline().fire(new Purge(), channel);
195                        // Continue only as long as necessary
196                        if (permitsAvailable) {
197                            break;
198                        }
199                    }
200                    sleep(1000);
201                }
202            } catch (InterruptedException e) {
203                // Fall through
204            } finally {
205                connLimiter.removeListener(this);
206            }
207        }
208
209    }
210
211    /**
212     * Creates a new server, using itself as component channel. 
213     */
214    public SocketServer() {
215        this(Channel.SELF);
216    }
217
218    /**
219     * Creates a new server using the given channel.
220     * 
221     * @param componentChannel the component's channel
222     */
223    public SocketServer(Channel componentChannel) {
224        super(componentChannel);
225    }
226
227    /**
228     * Sets the address to bind to. If none is set, the address and port
229     * are assigned automatically.
230     * 
231     * @param serverAddress the address to bind to
232     * @return the socket server for easy chaining
233     */
234    public SocketServer setServerAddress(SocketAddress serverAddress) {
235        this.serverAddress = serverAddress;
236        return this;
237    }
238
239    @Override
240    public SocketServer setBufferSize(int size) {
241        super.setBufferSize(size);
242        return this;
243    }
244
245    /**
246     * The component can be configured with events that include
247     * a path (see @link {@link ConfigurationUpdate#paths()})
248     * that matches this components path (see {@link Manager#componentPath()}).
249     * 
250     * The following properties are recognized:
251     * 
252     * `hostname`
253     * : If given, is used as first parameter for 
254     *   {@link InetSocketAddress#InetSocketAddress(String, int)}.
255     * 
256     * `port`
257     * : If given, is used as parameter for 
258     *   {@link InetSocketAddress#InetSocketAddress(String, int)} 
259     *   or {@link InetSocketAddress#InetSocketAddress(int)}, 
260     *   depending on whether a host name is specified. Defaults to "0".
261     *   
262     * `backlog`
263     * : See {@link #setBacklog(int)}.
264     * 
265     * `bufferSize`
266     * : See {@link #setBufferSize(int)}.
267     * 
268     * `maxConnections`
269     * : Calls {@link #setConnectionLimiter} with a
270     *   {@link PermitsPool} of the specified size.
271     * 
272     * `minimalPurgeableTime`
273     * : See {@link #setMinimalPurgeableTime(long)}.
274     * 
275     * @param event the event
276     */
277    @Handler
278    @SuppressWarnings("PMD.ConfusingTernary")
279    public void onConfigurationUpdate(ConfigurationUpdate event) {
280        event.values(componentPath()).ifPresent(values -> {
281            String hostname = values.get("hostname");
282            if (hostname != null) {
283                setServerAddress(new InetSocketAddress(hostname,
284                    Integer.parseInt(values.getOrDefault("port", "0"))));
285            } else if (values.containsKey("port")) {
286                setServerAddress(new InetSocketAddress(
287                    Integer.parseInt(values.get("port"))));
288            }
289            Optional.ofNullable(values.get("backlog")).ifPresent(
290                value -> setBacklog(Integer.parseInt(value)));
291            Optional.ofNullable(values.get("bufferSize")).ifPresent(
292                value -> setBufferSize(Integer.parseInt(value)));
293            Optional.ofNullable(values.get("maxConnections"))
294                .map(Integer::parseInt).map(PermitsPool::new)
295                .ifPresent(this::setConnectionLimiter);
296            Optional.ofNullable(values.get("minimalPurgeableTime"))
297                .map(Long::parseLong).ifPresent(this::setMinimalPurgeableTime);
298        });
299    }
300
301    /**
302     * Returns the server address. Before starting, the address is the
303     * address set with {@link #setServerAddress(InetSocketAddress)}. After
304     * starting the address is obtained from the created socket.  
305     * 
306     * @return the serverAddress
307     */
308    public SocketAddress serverAddress() {
309        try {
310            return serverSocketChannel == null ? serverAddress
311                : serverSocketChannel.getLocalAddress();
312        } catch (IOException e) {
313            return serverAddress;
314        }
315    }
316
317    /**
318     * Sets the backlog size.
319     * 
320     * @param backlog the backlog to set
321     * @return the socket server for easy chaining
322     */
323    public SocketServer setBacklog(int backlog) {
324        this.backlog = backlog;
325        return this;
326    }
327
328    /**
329     * Return the configured backlog size.
330     *
331     * @return the backlog
332     */
333    public int backlog() {
334        return backlog;
335    }
336
337    /**
338     * Sets a permit "pool". A new connection is created only if a permit
339     * can be obtained from the pool. 
340     * 
341     * A connection limiter must be set before starting the component.
342     * 
343     * @param connectionLimiter the connection pool to set
344     * @return the socket server for easy chaining
345     */
346    public SocketServer setConnectionLimiter(PermitsPool connectionLimiter) {
347        this.connLimiter = connectionLimiter;
348        return this;
349    }
350
351    /**
352     * Returns the connection limiter.
353     *
354     * @return the connection Limiter
355     */
356    public PermitsPool getConnectionLimiter() {
357        return connLimiter;
358    }
359
360    /**
361     * Sets a minimal time that a connection must be purgeable (idle)
362     * before it may be purged.
363     *
364     * @param millis the millis
365     * @return the socket server
366     */
367    public SocketServer setMinimalPurgeableTime(long millis) {
368        this.minimumPurgeableTime = millis;
369        return this;
370    }
371
372    /**
373     * Gets the minimal purgeable time.
374     *
375     * @return the minimal purgeable time
376     */
377    public long getMinimalPurgeableTime() {
378        return minimumPurgeableTime;
379    }
380
381    /**
382     * Starts the server.
383     * 
384     * @param event the start event
385     * @throws IOException if an I/O exception occurred
386     */
387    @Handler
388    public void onStart(Start event) throws IOException {
389        closing = false;
390        if (serverAddress instanceof UnixDomainSocketAddress) {
391            serverSocketChannel
392                = ServerSocketChannel.open(StandardProtocolFamily.UNIX);
393        } else {
394            serverSocketChannel = ServerSocketChannel.open();
395        }
396        serverSocketChannel.bind(serverAddress, backlog);
397        MBeanView.addServer(this);
398        fire(new NioRegistration(this, serverSocketChannel,
399            SelectionKey.OP_ACCEPT, this), Channel.BROADCAST);
400    }
401
402    /**
403     * Handles the successful channel registration.
404     *
405     * @param event the event
406     * @throws InterruptedException the interrupted exception
407     * @throws IOException Signals that an I/O exception has occurred.
408     */
409    @Handler(channels = Self.class)
410    public void onRegistered(NioRegistration.Completed event)
411            throws InterruptedException, IOException {
412        NioHandler handler = event.event().handler();
413        if (handler == this) {
414            if (event.event().get() == null) {
415                fire(new Error(event,
416                    "Registration failed, no NioDispatcher?"));
417                return;
418            }
419            registration = event.event().get();
420            purger = new Purger();
421            purger.start();
422            fire(new Ready(serverSocketChannel.getLocalAddress()));
423            return;
424        }
425        if (handler instanceof SocketChannelImpl channel) {
426            var accepted = new Accepted(channel.nioChannel().getLocalAddress(),
427                channel.nioChannel().getRemoteAddress(), false,
428                Collections.emptyList());
429            var registration = event.event().get();
430            // (1) Opening, (2) Accepted, (3) process input
431            channel.downPipeline().fire(Event.onCompletion(new Opening<Void>(),
432                e -> {
433                    channel.downPipeline().fire(accepted, channel);
434                    channel.registrationComplete(registration);
435                }), channel);
436        }
437    }
438
439    /*
440     * (non-Javadoc)
441     * 
442     * @see org.jgrapes.io.NioSelectable#handleOps(int)
443     */
444    @Override
445    public void handleOps(int ops) {
446        if ((ops & SelectionKey.OP_ACCEPT) == 0 || closing) {
447            return;
448        }
449        synchronized (channels) {
450            if (connLimiter != null && !connLimiter.tryAcquire()) {
451                return;
452            }
453            try {
454                @SuppressWarnings("PMD.CloseResource")
455                SocketChannel socketChannel = serverSocketChannel.accept();
456                if (socketChannel == null) {
457                    // "False alarm"
458                    if (connLimiter != null) {
459                        connLimiter.release();
460                    }
461                    return;
462                }
463                channels.add(new SocketChannelImpl(null, socketChannel));
464            } catch (IOException e) {
465                fire(new IOError(null, e));
466            }
467        }
468    }
469
470    @Override
471    protected boolean removeChannel(SocketChannelImpl channel) {
472        synchronized (channels) {
473            if (!channels.remove(channel)) {
474                // Closed already
475                return false;
476            }
477            // In case the server is shutting down
478            channels.notifyAll();
479        }
480        if (connLimiter != null) {
481            connLimiter.release();
482        }
483        return true;
484    }
485
486    /**
487     * Shuts down the server or one of the connections to the server.
488     *
489     * @param event the event
490     * @throws IOException if an I/O exception occurred
491     * @throws InterruptedException if the execution was interrupted
492     */
493    @Handler
494    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
495    public void onClose(Close event) throws IOException, InterruptedException {
496        boolean closeServer = false;
497        for (Channel channel : event.channels()) {
498            if (channels.contains(channel)) {
499                ((SocketChannelImpl) channel).close();
500                continue;
501            }
502            if (channel instanceof Subchannel) {
503                // Some subchannel that we're not interested in.
504                continue;
505            }
506            // Close event on "main" channel
507            closeServer = true;
508        }
509        if (!closeServer) {
510            // Only connection(s) were to be closed.
511            return;
512        }
513        if (!serverSocketChannel.isOpen()) {
514            // Closed already
515            fire(new Closed<Void>());
516            return;
517        }
518        synchronized (channels) {
519            closing = true;
520            // Copy to avoid concurrent modification exception
521            Set<SocketChannelImpl> conns = new HashSet<>(channels);
522            for (SocketChannelImpl conn : conns) {
523                conn.close();
524            }
525            while (!channels.isEmpty()) {
526                channels.wait();
527            }
528        }
529        serverSocketChannel.close();
530        purger.interrupt();
531        closing = false;
532        fire(new Closed<Void>());
533    }
534
535    /**
536     * Shuts down the server by firing a {@link Close} using the
537     * server as channel. Note that this automatically results
538     * in closing all open connections by the runtime system
539     * and thus in {@link Closed} events on all subchannels.
540     * 
541     * @param event the event
542     * @throws InterruptedException 
543     */
544    @Handler(priority = -1000)
545    public void onStop(Stop event) throws InterruptedException {
546        if (closing || !serverSocketChannel.isOpen()) {
547            return;
548        }
549        newEventPipeline().fire(new Close(), this).get();
550    }
551
552    /**
553     * The Interface of the SocketServer MXBean.
554     */
555    public interface SocketServerMXBean {
556
557        /**
558         * The Class ChannelInfo.
559         */
560        class ChannelInfo {
561
562            private final SocketChannelImpl channel;
563
564            /**
565             * Instantiates a new channel info.
566             *
567             * @param channel the channel
568             */
569            public ChannelInfo(SocketChannelImpl channel) {
570                this.channel = channel;
571            }
572
573            /**
574             * Checks if is purgeable.
575             *
576             * @return true, if is purgeable
577             */
578            public boolean isPurgeable() {
579                return channel.isPurgeable();
580            }
581
582            /**
583             * Gets the downstream pool.
584             *
585             * @return the downstream pool
586             */
587            public String getDownstreamPool() {
588                return channel.readBuffers().name();
589            }
590
591            /**
592             * Gets the upstream pool.
593             *
594             * @return the upstream pool
595             */
596            public String getUpstreamPool() {
597                return channel.byteBufferPool().name();
598            }
599        }
600
601        /**
602         * Gets the component path.
603         *
604         * @return the component path
605         */
606        String getComponentPath();
607
608        /**
609         * Gets the channel count.
610         *
611         * @return the channel count
612         */
613        int getChannelCount();
614
615        /**
616         * Gets the channels.
617         *
618         * @return the channels
619         */
620        SortedMap<String, ChannelInfo> getChannels();
621
622    }
623
624    /**
625     * The Class SocketServerInfo.
626     */
627    public static class SocketServerInfo implements SocketServerMXBean {
628
629        private static MBeanServer mbs
630            = ManagementFactory.getPlatformMBeanServer();
631
632        private ObjectName mbeanName;
633        private final WeakReference<SocketServer> serverRef;
634
635        /**
636         * Instantiates a new socket server info.
637         *
638         * @param server the server
639         */
640        @SuppressWarnings({ "PMD.EmptyCatchBlock",
641            "PMD.AvoidCatchingGenericException",
642            "PMD.ConstructorCallsOverridableMethod" })
643        public SocketServerInfo(SocketServer server) {
644            serverRef = new WeakReference<>(server);
645            try {
646                String endPoint = "";
647                if (server.serverAddress instanceof InetSocketAddress addr) {
648                    endPoint = " (" + addr.getHostName() + ":" + addr.getPort()
649                        + ")";
650                } else if (server.serverAddress instanceof UnixDomainSocketAddress addr) {
651                    endPoint = " (" + addr.getPath() + ")";
652                }
653                mbeanName = new ObjectName("org.jgrapes.io:type="
654                    + SocketServer.class.getSimpleName() + ",name="
655                    + ObjectName
656                        .quote(Components.objectName(server) + endPoint));
657            } catch (MalformedObjectNameException e) {
658                // Should not happen
659            }
660            try {
661                mbs.unregisterMBean(mbeanName);
662            } catch (Exception e) {
663                // Just in case, should not work
664            }
665            try {
666                mbs.registerMBean(this, mbeanName);
667            } catch (InstanceAlreadyExistsException | MBeanRegistrationException
668                    | NotCompliantMBeanException e) {
669                // Have to live with that
670            }
671        }
672
673        /**
674         * Server.
675         *
676         * @return the optional
677         */
678        @SuppressWarnings({ "PMD.AvoidCatchingGenericException",
679            "PMD.EmptyCatchBlock" })
680        public Optional<SocketServer> server() {
681            SocketServer server = serverRef.get();
682            if (server == null) {
683                try {
684                    mbs.unregisterMBean(mbeanName);
685                } catch (Exception e) {
686                    // Should work.
687                }
688            }
689            return Optional.ofNullable(server);
690        }
691
692        @Override
693        public String getComponentPath() {
694            return server().map(mgr -> mgr.componentPath()).orElse("<removed>");
695        }
696
697        @Override
698        public int getChannelCount() {
699            return server().map(server -> server.channels.size()).orElse(0);
700        }
701
702        @Override
703        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
704        public SortedMap<String, ChannelInfo> getChannels() {
705            return server().map(server -> {
706                SortedMap<String, ChannelInfo> result = new TreeMap<>();
707                for (SocketChannelImpl channel : server.channels) {
708                    result.put(channel.nioChannel().socket()
709                        .getRemoteSocketAddress().toString(),
710                        new ChannelInfo(channel));
711                }
712                return result;
713            }).orElse(Collections.emptySortedMap());
714        }
715    }
716
717    /**
718     * An MBean interface for getting information about the socket servers
719     * and established connections.
720     */
721    public interface SocketServerSummaryMXBean {
722
723        /**
724         * Gets the connections per server statistics.
725         *
726         * @return the connections per server statistics
727         */
728        IntSummaryStatistics getConnectionsPerServerStatistics();
729
730        /**
731         * Gets the servers.
732         *
733         * @return the servers
734         */
735        Set<SocketServerMXBean> getServers();
736    }
737
738    /**
739     * The MBeanView.
740     */
741    private static class MBeanView implements SocketServerSummaryMXBean {
742        private static Set<SocketServerInfo> serverInfos = new HashSet<>();
743
744        /**
745         * Adds the server to the reported servers.
746         *
747         * @param server the server
748         */
749        public static void addServer(SocketServer server) {
750            synchronized (serverInfos) {
751                serverInfos.add(new SocketServerInfo(server));
752            }
753        }
754
755        /**
756         * Returns the infos.
757         *
758         * @return the sets the
759         */
760        private Set<SocketServerInfo> infos() {
761            Set<SocketServerInfo> expired = new HashSet<>();
762            synchronized (serverInfos) {
763                for (SocketServerInfo serverInfo : serverInfos) {
764                    if (!serverInfo.server().isPresent()) {
765                        expired.add(serverInfo);
766                    }
767                }
768                serverInfos.removeAll(expired);
769            }
770            return serverInfos;
771        }
772
773        @SuppressWarnings("unchecked")
774        @Override
775        public Set<SocketServerMXBean> getServers() {
776            return (Set<SocketServerMXBean>) (Object) infos();
777        }
778
779        @Override
780        public IntSummaryStatistics getConnectionsPerServerStatistics() {
781            return infos().stream().map(info -> info.server().get())
782                .filter(ref -> ref != null).collect(
783                    Collectors.summarizingInt(srv -> srv.channels.size()));
784        }
785    }
786
787    static {
788        try {
789            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
790            ObjectName mxbeanName = new ObjectName("org.jgrapes.io:type="
791                + SocketServer.class.getSimpleName() + "s");
792            mbs.registerMBean(new MBeanView(), mxbeanName);
793        } catch (MalformedObjectNameException | InstanceAlreadyExistsException
794                | MBeanRegistrationException | NotCompliantMBeanException e) {
795            // Does not happen
796        }
797    }
798
799}