001/******************************************************************************* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016, 2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 *******************************************************************************/ 018 019package org.jgrapes.io; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.nio.ByteBuffer; 024import java.nio.channels.Channels; 025import java.nio.channels.ReadableByteChannel; 026import java.util.Optional; 027import org.jgrapes.core.Channel; 028import org.jgrapes.core.Component; 029import org.jgrapes.core.Components; 030import org.jgrapes.core.Manager; 031import org.jgrapes.core.annotation.Handler; 032import org.jgrapes.core.events.Start; 033import org.jgrapes.core.events.Stop; 034import org.jgrapes.io.events.IOError; 035import org.jgrapes.io.events.Input; 036import org.jgrapes.io.util.ManagedBuffer; 037import org.jgrapes.io.util.ManagedBufferPool; 038import org.jgrapes.util.events.ConfigurationUpdate; 039 040/** 041 * A component that watches for new input on an 042 * {@link InputStream}. If new input becomes 043 * available, it is fired as {@link Input} event. 044 * 045 * This component should only be used to monitor an 046 * input stream that is available during the complete 047 * lifetime of the application. A typical usage is 048 * to make data from `System.in` available as events. 049 */ 050@SuppressWarnings("PMD.DataflowAnomalyAnalysis") 051public class InputStreamMonitor extends Component implements Runnable { 052 053 private Channel dataChannel; 054 private InputStream input; 055 private boolean registered; 056 private Thread runner; 057 private ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> buffers; 058 private int bufferSize = 2048; 059 060 /** 061 * Creates a new input stream monitor with its channel set to the given 062 * channel. The channel is also used for firing the {@link Input} 063 * events. 064 * 065 * @param componentChannel the component channel 066 * @param input the input stream 067 * @param dataChannel the data channel 068 */ 069 public InputStreamMonitor( 070 Channel componentChannel, InputStream input, Channel dataChannel) { 071 super(componentChannel); 072 this.input = input; 073 this.dataChannel = dataChannel; 074 } 075 076 /** 077 * Creates a new input stream monitor with its channel set to the given 078 * channel. The channel is also used for firing the {@link Input} 079 * events. 080 * 081 * @param componentChannel the component channel 082 * @param input the input 083 */ 084 public InputStreamMonitor(Channel componentChannel, InputStream input) { 085 this(componentChannel, input, componentChannel); 086 } 087 088 /** 089 * Sets the buffer size. 090 * 091 * @param bufferSize the buffer size 092 * @return the input stream monitor for easy chaining 093 */ 094 public InputStreamMonitor setBufferSize(int bufferSize) { 095 this.bufferSize = bufferSize; 096 return this; 097 } 098 099 /** 100 * Returns the buffer size. 101 * 102 * @return the buffer size 103 */ 104 public int getBufferSize() { 105 return bufferSize; 106 } 107 108 /** 109 * The component can be configured with events that include 110 * a path (see @link {@link ConfigurationUpdate#paths()}) 111 * that matches this components path (see {@link Manager#componentPath()}). 112 * 113 * The following properties are recognized: 114 * 115 * `bufferSize` 116 * : See {@link #setBufferSize(int)}. 117 * 118 * @param event the event 119 */ 120 @Handler 121 public void onConfigurationUpdate(ConfigurationUpdate event) { 122 event.values(componentPath()).ifPresent(values -> { 123 Optional.ofNullable(values.get("bufferSize")).ifPresent( 124 value -> setBufferSize(Integer.parseInt(value))); 125 }); 126 } 127 128 /** 129 * Starts a thread that continuously reads available 130 * data from the input stream. 131 * 132 * @param event the event 133 */ 134 @Handler 135 public void onStart(Start event) { 136 synchronized (this) { 137 if (runner != null) { 138 return; 139 } 140 buffers = new ManagedBufferPool<>(ManagedBuffer::new, 141 () -> { 142 return ByteBuffer.allocateDirect(bufferSize); 143 }, 2); 144 runner = new Thread(this, Components.simpleObjectName(this)); 145 // Because this cannot reliably be stopped, it doesn't prevent 146 // shutdown. 147 runner.setDaemon(true); 148 runner.start(); 149 } 150 } 151 152 /** 153 * Stops the thread that reads data from the input stream. 154 * Note that the input stream is not closed. 155 * 156 * @param event the event 157 * @throws InterruptedException the interrupted exception 158 */ 159 @Handler(priority = -10_000) 160 public void onStop(Stop event) throws InterruptedException { 161 synchronized (this) { 162 if (runner == null) { 163 return; 164 } 165 runner.interrupt(); 166 synchronized (this) { 167 if (registered) { 168 unregisterAsGenerator(); 169 registered = false; 170 } 171 } 172 runner = null; 173 } 174 } 175 176 @Override 177 public void run() { 178 Thread.currentThread().setName(Components.simpleObjectName(this)); 179 try { 180 synchronized (this) { 181 registerAsGenerator(); 182 registered = true; 183 } 184 @SuppressWarnings("PMD.CloseResource") 185 ReadableByteChannel inChannel = Channels.newChannel(input); 186 while (!Thread.currentThread().isInterrupted()) { 187 ManagedBuffer<ByteBuffer> buffer = buffers.acquire(); 188 int read = buffer.fillFromChannel(inChannel); 189 boolean eof = read == -1; 190 fire(Input.fromSink(buffer, eof), dataChannel); 191 if (eof) { 192 break; 193 } 194 } 195 } catch (InterruptedException e) { 196 // Some called stop(), so what? 197 } catch (IOException e) { 198 fire(new IOError(null, e), channel()); 199 } finally { 200 synchronized (this) { 201 if (registered) { 202 unregisterAsGenerator(); 203 registered = false; 204 } 205 } 206 } 207 } 208 209}