001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016, 2018  Michael N. Lipp
004 *
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.nio.ByteBuffer;
024import java.nio.channels.Channels;
025import java.nio.channels.ReadableByteChannel;
026
027import org.jgrapes.core.EventPipeline;
028import org.jgrapes.io.IOSubchannel;
029import org.jgrapes.io.events.Close;
030import org.jgrapes.io.events.IOError;
031import org.jgrapes.io.events.Output;
032
033/**
034 * Forwards the content of an input stream as a sequence of {@link Output}
035 * events.
036 */
037@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
038public class InputStreamPipeline implements Runnable {
039
040    private InputStream inStream;
041    private IOSubchannel channel;
042    private EventPipeline eventPipeline;
043    private boolean sendClose = true;
044
045    /**
046     * Creates a new pipeline that sends the data from the given input stream
047     * as events on the given channel, using the given event pipeline.
048     * 
049     * @param in the input stream to read from
050     * @param channel the channel to send to
051     * @param eventPipeline
052     *            the event pipeline used for firing events
053     */
054    @SuppressWarnings("PMD.ShortVariable")
055    public InputStreamPipeline(InputStream in, IOSubchannel channel,
056            EventPipeline eventPipeline) {
057        this.inStream = in;
058        this.channel = channel;
059        this.eventPipeline = eventPipeline;
060    }
061
062    /**
063     * Creates a new pipeline that sends the data from the given input stream
064     * as events on the given channel, using the channel's response pipeline.
065     * 
066     * @param in the input stream to read from
067     * @param channel the channel to send to
068     */
069    @SuppressWarnings("PMD.ShortVariable")
070    public InputStreamPipeline(InputStream in, IOSubchannel channel) {
071        this(in, channel, channel.responsePipeline());
072    }
073
074    /**
075     * Suppresses the sending of a close event when the stream is closed. 
076     * 
077     * @return the stream for easy chaining
078     */
079    public InputStreamPipeline suppressClose() {
080        sendClose = false;
081        return this;
082    }
083
084    @Override
085    public void run() {
086        try (ReadableByteChannel inChannel = Channels.newChannel(inStream)) {
087            ManagedBuffer<ByteBuffer> lookAhead = ManagedBuffer.wrap(
088                ByteBuffer.allocate(channel.byteBufferPool().bufferSize()));
089            // First attempt
090            if (lookAhead.fillFromChannel(inChannel) == -1) {
091                ManagedBuffer<ByteBuffer> buffer
092                    = channel.byteBufferPool().acquire();
093                eventPipeline.fire(Output.fromSink(buffer, true), channel);
094            } else {
095                while (true) {
096                    // Save data read so far
097                    ManagedBuffer<ByteBuffer> buffer
098                        = channel.byteBufferPool().acquire();
099                    buffer.linkBackingBuffer(lookAhead);
100                    // Get new look ahead
101                    lookAhead = ManagedBuffer.wrap(ByteBuffer.allocate(
102                        channel.byteBufferPool().bufferSize()));
103                    // Next read attempt
104                    boolean eof;
105                    try {
106                        eof = lookAhead.fillFromChannel(inChannel) == -1;
107                    } catch (IOException e) {
108                        buffer.unlockBuffer();
109                        throw e;
110                    }
111                    // Fire "old" data with up-to-date end of record flag.
112                    eventPipeline.fire(Output.fromSink(buffer, eof), channel);
113                    if (eof) {
114                        break;
115                    }
116                }
117            }
118            if (sendClose) {
119                eventPipeline.fire(new Close(), channel);
120            }
121        } catch (InterruptedException e) {
122            // Just stop
123        } catch (IOException e) {
124            eventPipeline.fire(new IOError(null, e), channel);
125        }
126    }
127}