001/*******************************************************************************
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016, 2018  Michael N. Lipp
004 *
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 *******************************************************************************/
018
019package org.jgrapes.io;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.nio.ByteBuffer;
024import java.nio.channels.Channels;
025import java.nio.channels.ReadableByteChannel;
026import java.util.Optional;
027import org.jgrapes.core.Channel;
028import org.jgrapes.core.Component;
029import org.jgrapes.core.Components;
030import org.jgrapes.core.Manager;
031import org.jgrapes.core.annotation.Handler;
032import org.jgrapes.core.events.Start;
033import org.jgrapes.core.events.Stop;
034import org.jgrapes.io.events.IOError;
035import org.jgrapes.io.events.Input;
036import org.jgrapes.io.util.ManagedBuffer;
037import org.jgrapes.io.util.ManagedBufferPool;
038import org.jgrapes.util.events.ConfigurationUpdate;
039
040/**
041 * A component that watches for new input on an
042 * {@link InputStream}. If new input becomes
043 * available, it is fired as {@link Input} event.
044 * 
045 * This component should only be used to monitor an
046 * input stream that is available during the complete
047 * lifetime of the application. A typical usage is
048 * to make data from `System.in` available as events.
049 */
050@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
051public class InputStreamMonitor extends Component implements Runnable {
052
053    private Channel dataChannel;
054    private InputStream input;
055    private boolean registered;
056    private Thread runner;
057    private ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> buffers;
058    private int bufferSize = 2048;
059
060    /**
061     * Creates a new input stream monitor with its channel set to the given 
062     * channel. The channel is also used for firing the {@link Input}
063     * events.
064     *
065     * @param componentChannel the component channel
066     * @param input the input stream
067     * @param dataChannel the data channel
068     */
069    public InputStreamMonitor(
070            Channel componentChannel, InputStream input, Channel dataChannel) {
071        super(componentChannel);
072        this.input = input;
073        this.dataChannel = dataChannel;
074    }
075
076    /**
077     * Creates a new input stream monitor with its channel set to the given 
078     * channel. The channel is also used for firing the {@link Input}
079     * events.
080     *
081     * @param componentChannel the component channel
082     * @param input the input
083     */
084    public InputStreamMonitor(Channel componentChannel, InputStream input) {
085        this(componentChannel, input, componentChannel);
086    }
087
088    /**
089     * Sets the buffer size.
090     *
091     * @param bufferSize the buffer size
092     * @return the input stream monitor for easy chaining
093     */
094    public InputStreamMonitor setBufferSize(int bufferSize) {
095        this.bufferSize = bufferSize;
096        return this;
097    }
098
099    /**
100     * Returns the buffer size.
101     *
102     * @return the buffer size
103     */
104    public int getBufferSize() {
105        return bufferSize;
106    }
107
108    /**
109     * The component can be configured with events that include
110     * a path (see @link {@link ConfigurationUpdate#paths()})
111     * that matches this components path (see {@link Manager#componentPath()}).
112     * 
113     * The following properties are recognized:
114     * 
115     * `bufferSize`
116     * : See {@link #setBufferSize(int)}.
117     * 
118     * @param event the event
119     */
120    @Handler
121    public void onConfigurationUpdate(ConfigurationUpdate event) {
122        event.values(componentPath()).ifPresent(values -> {
123            Optional.ofNullable(values.get("bufferSize")).ifPresent(
124                value -> setBufferSize(Integer.parseInt(value)));
125        });
126    }
127
128    /**
129     * Starts a thread that continuously reads available
130     * data from the input stream. 
131     *
132     * @param event the event
133     */
134    @Handler
135    public void onStart(Start event) {
136        synchronized (this) {
137            if (runner != null) {
138                return;
139            }
140            buffers = new ManagedBufferPool<>(ManagedBuffer::new,
141                () -> {
142                    return ByteBuffer.allocateDirect(bufferSize);
143                }, 2);
144            runner = new Thread(this, Components.simpleObjectName(this));
145            // Because this cannot reliably be stopped, it doesn't prevent
146            // shutdown.
147            runner.setDaemon(true);
148            runner.start();
149        }
150    }
151
152    /**
153     * Stops the thread that reads data from the input stream.
154     * Note that the input stream is not closed.
155     *
156     * @param event the event
157     * @throws InterruptedException the interrupted exception
158     */
159    @Handler(priority = -10_000)
160    public void onStop(Stop event) throws InterruptedException {
161        synchronized (this) {
162            if (runner == null) {
163                return;
164            }
165            runner.interrupt();
166            synchronized (this) {
167                if (registered) {
168                    unregisterAsGenerator();
169                    registered = false;
170                }
171            }
172            runner = null;
173        }
174    }
175
176    @Override
177    public void run() {
178        Thread.currentThread().setName(Components.simpleObjectName(this));
179        try {
180            synchronized (this) {
181                registerAsGenerator();
182                registered = true;
183            }
184            @SuppressWarnings("PMD.CloseResource")
185            ReadableByteChannel inChannel = Channels.newChannel(input);
186            while (!Thread.currentThread().isInterrupted()) {
187                ManagedBuffer<ByteBuffer> buffer = buffers.acquire();
188                int read = buffer.fillFromChannel(inChannel);
189                boolean eof = read == -1;
190                fire(Input.fromSink(buffer, eof), dataChannel);
191                if (eof) {
192                    break;
193                }
194            }
195        } catch (InterruptedException e) {
196            // Some called stop(), so what?
197        } catch (IOException e) {
198            fire(new IOError(null, e), channel());
199        } finally {
200            synchronized (this) {
201                if (registered) {
202                    unregisterAsGenerator();
203                    registered = false;
204                }
205            }
206        }
207    }
208
209}