001/*******************************************************************************
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016, 2018  Michael N. Lipp
004 *
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 *******************************************************************************/
018
019package org.jgrapes.io;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.nio.ByteBuffer;
024import java.nio.channels.Channels;
025import java.nio.channels.ReadableByteChannel;
026import java.util.Optional;
027import org.jgrapes.core.Channel;
028import org.jgrapes.core.Component;
029import org.jgrapes.core.Components;
030import org.jgrapes.core.Manager;
031import org.jgrapes.core.annotation.Handler;
032import org.jgrapes.core.events.Start;
033import org.jgrapes.core.events.Stop;
034import org.jgrapes.io.events.IOError;
035import org.jgrapes.io.events.Input;
036import org.jgrapes.io.util.ManagedBuffer;
037import org.jgrapes.io.util.ManagedBufferPool;
038import org.jgrapes.util.events.ConfigurationUpdate;
039
040/**
041 * A component that watches for new input on an
042 * {@link InputStream}. If new input becomes
043 * available, it is fired as {@link Input} event.
044 * 
045 * This component should only be used to monitor an
046 * input stream that is available during the complete
047 * lifetime of the application. A typical usage is
048 * to make data from `System.in` available as events.
049 */
050@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
051public class InputStreamMonitor extends Component implements Runnable {
052
053    @SuppressWarnings("PMD.SingularField")
054    private Channel dataChannel;
055    @SuppressWarnings("PMD.SingularField")
056    private InputStream input;
057    private boolean registered;
058    private Thread runner;
059    @SuppressWarnings("PMD.SingularField")
060    private ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> buffers;
061    private int bufferSize = 2048;
062
063    /**
064     * Creates a new input stream monitor with its channel set to the given 
065     * channel. The channel is also used for firing the {@link Input}
066     * events.
067     *
068     * @param componentChannel the component channel
069     * @param input the input stream
070     * @param dataChannel the data channel
071     */
072    public InputStreamMonitor(
073            Channel componentChannel, InputStream input, Channel dataChannel) {
074        super(componentChannel);
075        this.input = input;
076        this.dataChannel = dataChannel;
077    }
078
079    /**
080     * Creates a new input stream monitor with its channel set to the given 
081     * channel. The channel is also used for firing the {@link Input}
082     * events.
083     *
084     * @param componentChannel the component channel
085     * @param input the input
086     */
087    public InputStreamMonitor(Channel componentChannel, InputStream input) {
088        this(componentChannel, input, componentChannel);
089    }
090
091    /**
092     * Sets the buffer size.
093     *
094     * @param bufferSize the buffer size
095     * @return the input stream monitor for easy chaining
096     */
097    public InputStreamMonitor setBufferSize(int bufferSize) {
098        this.bufferSize = bufferSize;
099        return this;
100    }
101
102    /**
103     * Returns the buffer size.
104     *
105     * @return the buffer size
106     */
107    public int getBufferSize() {
108        return bufferSize;
109    }
110
111    /**
112     * The component can be configured with events that include
113     * a path (see @link {@link ConfigurationUpdate#paths()})
114     * that matches this components path (see {@link Manager#componentPath()}).
115     * 
116     * The following properties are recognized:
117     * 
118     * `bufferSize`
119     * : See {@link #setBufferSize(int)}.
120     * 
121     * @param event the event
122     */
123    @Handler
124    public void onConfigurationUpdate(ConfigurationUpdate event) {
125        event.values(componentPath()).ifPresent(values -> {
126            Optional.ofNullable(values.get("bufferSize")).ifPresent(
127                value -> setBufferSize(Integer.parseInt(value)));
128        });
129    }
130
131    /**
132     * Starts a thread that continuously reads available
133     * data from the input stream. 
134     *
135     * @param event the event
136     */
137    @Handler
138    public void onStart(Start event) {
139        synchronized (this) {
140            if (runner != null) {
141                return;
142            }
143            buffers = new ManagedBufferPool<>(ManagedBuffer::new,
144                () -> {
145                    return ByteBuffer.allocateDirect(bufferSize);
146                }, 2);
147            runner = new Thread(this, Components.simpleObjectName(this));
148            // Because this cannot reliably be stopped, it doesn't prevent
149            // shutdown.
150            runner.setDaemon(true);
151            runner.start();
152        }
153    }
154
155    /**
156     * Stops the thread that reads data from the input stream.
157     * Note that the input stream is not closed.
158     *
159     * @param event the event
160     * @throws InterruptedException the interrupted exception
161     */
162    @Handler(priority = -10_000)
163    public void onStop(Stop event) throws InterruptedException {
164        synchronized (this) {
165            if (runner == null) {
166                return;
167            }
168            runner.interrupt();
169            synchronized (this) {
170                if (registered) {
171                    unregisterAsGenerator();
172                    registered = false;
173                }
174            }
175            runner = null;
176        }
177    }
178
179    @Override
180    public void run() {
181        Thread.currentThread().setName(Components.simpleObjectName(this));
182        try {
183            synchronized (this) {
184                registerAsGenerator();
185                registered = true;
186            }
187            @SuppressWarnings("PMD.CloseResource")
188            ReadableByteChannel inChannel = Channels.newChannel(input);
189            while (!Thread.currentThread().isInterrupted()) {
190                ManagedBuffer<ByteBuffer> buffer = buffers.acquire();
191                int read = buffer.fillFromChannel(inChannel);
192                boolean eof = read == -1;
193                fire(Input.fromSink(buffer, eof), dataChannel);
194                if (eof) {
195                    break;
196                }
197            }
198        } catch (InterruptedException e) { // NOPMD
199            // Some called stop(), so what?
200        } catch (IOException e) {
201            fire(new IOError(null, e), channel());
202        } finally {
203            synchronized (this) {
204                if (registered) {
205                    unregisterAsGenerator();
206                    registered = false;
207                }
208            }
209        }
210    }
211
212}