001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.nio.channels.SelectableChannel;
024import java.nio.channels.SelectionKey;
025import java.nio.channels.Selector;
026import java.util.Set;
027import org.jgrapes.core.Component;
028import org.jgrapes.core.Components;
029import org.jgrapes.core.annotation.Handler;
030import org.jgrapes.core.events.Start;
031import org.jgrapes.core.events.Stop;
032import org.jgrapes.io.events.NioRegistration;
033
034/**
035 * A helper component that provides the central hub for non blocking
036 * I/O components. Exactly one {@code NioDispatcher} must exist in
037 * any tree with {@link NioHandler} components. 
038 */
039public class NioDispatcher extends Component implements Runnable {
040
041    private final Selector selector;
042    private Thread runner;
043    private final Object selectorGate = new Object();
044
045    /**
046     * Creates a new Dispatcher.
047     * 
048     * @throws IOException if an I/O exception occurred
049     */
050    public NioDispatcher() throws IOException {
051        selector = Selector.open();
052    }
053
054    /**
055     * Starts this dispatcher. A dispatcher has an associated thread that
056     * keeps it running.
057     * 
058     * @param event the event
059     */
060    @Handler
061    public void onStart(Start event) {
062        synchronized (this) {
063            if (runner != null && !runner.isInterrupted()) {
064                return;
065            }
066            runner = new Thread(this, Components.simpleObjectName(this));
067            runner.start();
068        }
069    }
070
071    /**
072     * Stops the thread that is associated with this dispatcher.
073     * 
074     * @param event the event
075     * @throws InterruptedException if the execution is interrupted
076     */
077    @Handler(priority = -10_000)
078    public void onStop(Stop event) throws InterruptedException {
079        synchronized (this) {
080            if (runner == null) {
081                return;
082            }
083            // It just might happen that the wakeup() occurs between the
084            // check for running and the select() in the thread's run loop,
085            // but we -- obviously -- cannot put the select() in a
086            // synchronized(this).
087            while (runner.isAlive()) {
088                runner.interrupt(); // *Should* be sufficient, but...
089                selector.wakeup(); // Make sure
090                runner.join(10);
091            }
092            runner = null;
093        }
094    }
095
096    /**
097     * Invoked once by the thread associated with the dispatcher. Handles
098     * all events from the underlying {@link Selector}.  
099     */
100    @Override
101    @SuppressWarnings({ "PMD.EmptySynchronizedBlock", "PMD.EmptyCatchBlock",
102        "PMD.AvoidCatchingThrowable", "PMD.EmptyControlStatement" })
103    public void run() {
104        try {
105            registerAsGenerator();
106            while (!Thread.currentThread().isInterrupted()) {
107                try {
108                    selector.select();
109                    Set<SelectionKey> selected = selector.selectedKeys();
110                    for (SelectionKey key : selected) {
111                        ((NioHandler) key.attachment())
112                            .handleOps(key.readyOps());
113                    }
114                    selected.clear();
115                    synchronized (selectorGate) {
116                        // Delay next iteration if another thread has the lock.
117                        // "Find bugs" complains, but this is really okay.
118                    }
119                } catch (InterruptedIOException | InterruptedException
120                        | Error e) {
121                    break;
122                } catch (Throwable e) {
123                    // Ignore anything else, this loop is crucial.
124                }
125            }
126        } finally {
127            unregisterAsGenerator();
128        }
129    }
130
131    /**
132     * Handle the NIO registration.
133     *
134     * @param event the event
135     * @throws IOException Signals that an I/O exception has occurred.
136     */
137    @Handler
138    public void onNioRegistration(NioRegistration event)
139            throws IOException {
140        @SuppressWarnings("PMD.CloseResource")
141        SelectableChannel channel = event.ioChannel();
142        channel.configureBlocking(false);
143        SelectionKey key;
144        synchronized (selectorGate) {
145            selector.wakeup(); // make sure selector isn't blocking
146            key = channel.register(
147                selector, event.ops(), event.handler());
148        }
149        event.setResult(new Registration(key));
150    }
151
152    /**
153     * Represents a NIO registration.
154     */
155    public class Registration extends NioRegistration.Registration {
156
157        private final SelectionKey key;
158
159        /**
160         * Instantiates a new registration.
161         *
162         * @param key the key
163         */
164        public Registration(SelectionKey key) {
165            super();
166            this.key = key;
167        }
168
169        @Override
170        public void updateInterested(int ops) {
171            synchronized (selectorGate) {
172                selector.wakeup(); // make sure selector isn't blocking
173                key.interestOps(ops);
174            }
175        }
176    }
177
178}