001/*******************************************************************************
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016, 2018  Michael N. Lipp
004 *
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 *******************************************************************************/
018
019package org.jgrapes.io;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.nio.ByteBuffer;
024import java.nio.channels.Channels;
025import java.nio.channels.ReadableByteChannel;
026import java.util.Optional;
027
028import org.jgrapes.core.Channel;
029import org.jgrapes.core.Component;
030import org.jgrapes.core.Components;
031import org.jgrapes.core.Manager;
032import org.jgrapes.core.annotation.Handler;
033import org.jgrapes.core.events.Start;
034import org.jgrapes.core.events.Stop;
035import org.jgrapes.io.events.IOError;
036import org.jgrapes.io.events.Input;
037import org.jgrapes.io.util.ManagedBuffer;
038import org.jgrapes.io.util.ManagedBufferPool;
039import org.jgrapes.util.events.ConfigurationUpdate;
040
041/**
042 * A component that watches for new input on an
043 * {@link InputStream}. If new input becomes
044 * available, it is fired as {@link Input} event.
045 * 
046 * This component should only be used to monitor an
047 * input stream that is available during the complete
048 * lifetime of the application. A typical usage is
049 * to make data from `System.in` available as events.
050 */
051@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
052public class InputStreamMonitor extends Component implements Runnable {
053
054    private Channel dataChannel;
055    private InputStream input;
056    private boolean registered;
057    private Thread runner;
058    private ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> buffers;
059    private int bufferSize = 2048;
060
061    /**
062     * Creates a new input stream monitor with its channel set to the given 
063     * channel. The channel is also used for firing the {@link Input}
064     * events.
065     *
066     * @param componentChannel the component channel
067     * @param input the input stream
068     * @param dataChannel the data channel
069     */
070    public InputStreamMonitor(
071            Channel componentChannel, InputStream input, Channel dataChannel) {
072        super(componentChannel);
073        this.input = input;
074        this.dataChannel = dataChannel;
075    }
076
077    /**
078     * Creates a new input stream monitor with its channel set to the given 
079     * channel. The channel is also used for firing the {@link Input}
080     * events.
081     *
082     * @param componentChannel the component channel
083     * @param input the input
084     */
085    public InputStreamMonitor(Channel componentChannel, InputStream input) {
086        this(componentChannel, input, componentChannel);
087    }
088
089    /**
090     * Sets the buffer size.
091     *
092     * @param bufferSize the buffer size
093     * @return the input stream monitor for easy chaining
094     */
095    public InputStreamMonitor setBufferSize(int bufferSize) {
096        this.bufferSize = bufferSize;
097        return this;
098    }
099
100    /**
101     * Returns the buffer size.
102     *
103     * @return the buffer size
104     */
105    public int getBufferSize() {
106        return bufferSize;
107    }
108
109    /**
110     * The component can be configured with events that include
111     * a path (see @link {@link ConfigurationUpdate#paths()})
112     * that matches this components path (see {@link Manager#componentPath()}).
113     * 
114     * The following properties are recognized:
115     * 
116     * `bufferSize`
117     * : See {@link #setBufferSize(int)}.
118     * 
119     * @param event the event
120     */
121    @Handler
122    public void onConfigurationUpdate(ConfigurationUpdate event) {
123        event.values(componentPath()).ifPresent(values -> {
124            Optional.ofNullable(values.get("bufferSize")).ifPresent(
125                value -> setBufferSize(Integer.parseInt(value)));
126        });
127    }
128
129    /**
130     * Starts a thread that continuously reads available
131     * data from the input stream. 
132     *
133     * @param event the event
134     */
135    @Handler
136    public void onStart(Start event) {
137        synchronized (this) {
138            if (runner != null) {
139                return;
140            }
141            buffers = new ManagedBufferPool<>(ManagedBuffer::new,
142                () -> {
143                    return ByteBuffer.allocateDirect(bufferSize);
144                }, 2);
145            runner = new Thread(this, Components.simpleObjectName(this));
146            // Because this cannot reliably be stopped, it doesn't prevent
147            // shutdown.
148            runner.setDaemon(true);
149            runner.start();
150        }
151    }
152
153    /**
154     * Stops the thread that reads data from the input stream.
155     * Note that the input stream is not closed.
156     *
157     * @param event the event
158     * @throws InterruptedException the interrupted exception
159     */
160    @Handler(priority = -10_000)
161    public void onStop(Stop event) throws InterruptedException {
162        synchronized (this) {
163            if (runner == null) {
164                return;
165            }
166            runner.interrupt();
167            synchronized (this) {
168                if (registered) {
169                    unregisterAsGenerator();
170                    registered = false;
171                }
172            }
173            runner = null;
174        }
175    }
176
177    @Override
178    public void run() {
179        Thread.currentThread().setName(Components.simpleObjectName(this));
180        try {
181            synchronized (this) {
182                registerAsGenerator();
183                registered = true;
184            }
185            ReadableByteChannel inChannel = Channels.newChannel(input);
186            while (!Thread.currentThread().isInterrupted()) {
187                ManagedBuffer<ByteBuffer> buffer = buffers.acquire();
188                int read = buffer.fillFromChannel(inChannel);
189                boolean eof = read == -1;
190                fire(Input.fromSink(buffer, eof), dataChannel);
191                if (eof) {
192                    break;
193                }
194            }
195        } catch (InterruptedException e) {
196            // Some called stop(), so what?
197        } catch (IOException e) {
198            fire(new IOError(null, e), channel());
199        } finally {
200            synchronized (this) {
201                if (registered) {
202                    unregisterAsGenerator();
203                    registered = false;
204                }
205            }
206        }
207    }
208
209}