001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.nio.channels.SelectableChannel; 024import java.nio.channels.SelectionKey; 025import java.nio.channels.Selector; 026import java.util.Set; 027import org.jgrapes.core.Component; 028import org.jgrapes.core.Components; 029import org.jgrapes.core.annotation.Handler; 030import org.jgrapes.core.events.Start; 031import org.jgrapes.core.events.Stop; 032import org.jgrapes.io.events.NioRegistration; 033 034/** 035 * A helper component that provides the central hub for non blocking 036 * I/O components. Exactly one {@code NioDispatcher} must exist in 037 * any tree with {@link NioHandler} components. 038 */ 039public class NioDispatcher extends Component implements Runnable { 040 041 private final Selector selector; 042 private Thread runner; 043 private final Object selectorGate = new Object(); 044 045 /** 046 * Creates a new Dispatcher. 047 * 048 * @throws IOException if an I/O exception occurred 049 */ 050 public NioDispatcher() throws IOException { 051 selector = Selector.open(); 052 } 053 054 /** 055 * Starts this dispatcher. A dispatcher has an associated thread that 056 * keeps it running. 057 * 058 * @param event the event 059 */ 060 @Handler 061 public void onStart(Start event) { 062 synchronized (this) { 063 if (runner != null && !runner.isInterrupted()) { 064 return; 065 } 066 runner = new Thread(this, Components.simpleObjectName(this)); 067 runner.start(); 068 } 069 } 070 071 /** 072 * Stops the thread that is associated with this dispatcher. 073 * 074 * @param event the event 075 * @throws InterruptedException if the execution is interrupted 076 */ 077 @Handler(priority = -10_000) 078 public void onStop(Stop event) throws InterruptedException { 079 synchronized (this) { 080 if (runner == null) { 081 return; 082 } 083 // It just might happen that the wakeup() occurs between the 084 // check for running and the select() in the thread's run loop, 085 // but we -- obviously -- cannot put the select() in a 086 // synchronized(this). 087 while (runner.isAlive()) { 088 runner.interrupt(); // *Should* be sufficient, but... 089 selector.wakeup(); // Make sure 090 runner.join(10); 091 } 092 runner = null; 093 } 094 } 095 096 /** 097 * Invoked once by the thread associated with the dispatcher. Handles 098 * all events from the underlying {@link Selector}. 099 */ 100 @Override 101 @SuppressWarnings({ "PMD.EmptySynchronizedBlock", "PMD.EmptyCatchBlock", 102 "PMD.AvoidCatchingThrowable", "PMD.EmptyControlStatement" }) 103 public void run() { 104 try { 105 registerAsGenerator(); 106 while (!Thread.currentThread().isInterrupted()) { 107 try { 108 selector.select(); 109 Set<SelectionKey> selected = selector.selectedKeys(); 110 for (SelectionKey key : selected) { 111 ((NioHandler) key.attachment()) 112 .handleOps(key.readyOps()); 113 } 114 selected.clear(); 115 synchronized (selectorGate) { 116 // Delay next iteration if another thread has the lock. 117 // "Find bugs" complains, but this is really okay. 118 } 119 } catch (InterruptedIOException | InterruptedException 120 | Error e) { 121 break; 122 } catch (Throwable e) { 123 // Ignore anything else, this loop is crucial. 124 } 125 } 126 } finally { 127 unregisterAsGenerator(); 128 } 129 } 130 131 /** 132 * Handle the NIO registration. 133 * 134 * @param event the event 135 * @throws IOException Signals that an I/O exception has occurred. 136 */ 137 @Handler 138 public void onNioRegistration(NioRegistration event) 139 throws IOException { 140 @SuppressWarnings("PMD.CloseResource") 141 SelectableChannel channel = event.ioChannel(); 142 channel.configureBlocking(false); 143 SelectionKey key; 144 synchronized (selectorGate) { 145 selector.wakeup(); // make sure selector isn't blocking 146 key = channel.register( 147 selector, event.ops(), event.handler()); 148 } 149 event.setResult(new Registration(key)); 150 } 151 152 /** 153 * Represents a NIO registration. 154 */ 155 public class Registration extends NioRegistration.Registration { 156 157 private final SelectionKey key; 158 159 /** 160 * Instantiates a new registration. 161 * 162 * @param key the key 163 */ 164 public Registration(SelectionKey key) { 165 super(); 166 this.key = key; 167 } 168 169 @Override 170 public void updateInterested(int ops) { 171 synchronized (selectorGate) { 172 selector.wakeup(); // make sure selector isn't blocking 173 key.interestOps(ops); 174 } 175 } 176 } 177 178}