001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016, 2023  Michael N. Lipp
004 *
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.ByteArrayInputStream;
022import java.io.FileInputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.ByteBuffer;
026import java.nio.channels.SeekableByteChannel;
027import java.util.Map;
028import org.jgrapes.core.Event;
029import org.jgrapes.core.EventPipeline;
030import org.jgrapes.io.IOSubchannel;
031import org.jgrapes.io.events.Closed;
032import org.jgrapes.io.events.IOError;
033import org.jgrapes.io.events.IOEvent;
034import org.jgrapes.io.events.Input;
035import org.jgrapes.io.events.Output;
036
037/**
038 * Forwards the content of an input stream as a sequence of 
039 * {@link Output} (or optionally {@link Input}) events.
040 * 
041 * The default settings and the constructor 
042 * {@link #InputStreamPipeline(InputStream, IOSubchannel)} reflect
043 * the usage of this class for generating a response (e.g. provide
044 * the content of a file in response to a request from a client).
045 * Using the class with a "downstream" event pipeline, generating
046 * {@link Input} events is used when an input stream generates events
047 * that should be processed as requests by the application.
048 */
049@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
050public class InputStreamPipeline implements Runnable {
051
052    private InputStream inStream;
053    private IOSubchannel channel;
054    private EventPipeline eventPipeline;
055    private boolean sendClosed = true;
056    private Map<Object, Object> eventAssociations;
057    private boolean sendInputEvents;
058
059    /**
060     * Creates a new pipeline that sends the data from the given input stream
061     * as events on the given channel, using the given event pipeline.
062     * 
063     * @param in the input stream to read from
064     * @param channel the channel to send to
065     * @param eventPipeline
066     *            the event pipeline used for firing events
067     */
068    @SuppressWarnings("PMD.ShortVariable")
069    public InputStreamPipeline(InputStream in, IOSubchannel channel,
070            EventPipeline eventPipeline) {
071        this.inStream = in;
072        this.channel = channel;
073        this.eventPipeline = eventPipeline;
074    }
075
076    /**
077     * Creates a new pipeline that sends the data from the given input stream
078     * as events on the given channel, using the channel's response pipeline.
079     * 
080     * @param in the input stream to read from
081     * @param channel the channel to send to
082     */
083    @SuppressWarnings("PMD.ShortVariable")
084    public InputStreamPipeline(InputStream in, IOSubchannel channel) {
085        this(in, channel, channel.responsePipeline());
086    }
087
088    /**
089     * Causes the data to be fired as {@link Input} events rather
090     * than the usual {@link Output} events. 
091     * 
092     * @return the stream for easy chaining
093     */
094    public InputStreamPipeline sendInputEvents() {
095        sendInputEvents = true;
096        return this;
097    }
098
099    /**
100     * Suppresses the sending of a closed event when the stream is closed. 
101     * 
102     * @return the stream for easy chaining
103     */
104    public InputStreamPipeline suppressClosed() {
105        sendClosed = false;
106        return this;
107    }
108
109    /**
110     * Configure associations that are applied to the generated
111     * Output events, see {@link Event#setAssociated}.
112     * 
113     * @param associations the associations to apply
114     * @return the pipeline for easy chaining
115     */
116    public InputStreamPipeline
117            setEventAssociations(Map<Object, Object> associations) {
118        eventAssociations = associations;
119        return this;
120    }
121
122    @Override
123    @SuppressWarnings("PMD.CloseResource")
124    public void run() {
125        try {
126            if (inStream instanceof FileInputStream fip) {
127                seekableTransfer(fip.getChannel());
128            } else {
129                defaultTransfer();
130            }
131            if (sendClosed) {
132                eventPipeline.fire(associate(new Closed<Void>()), channel);
133            }
134        } catch (InterruptedException e) { // NOPMD
135            // Just stop
136        } catch (IOException e) {
137            eventPipeline.fire(associate(new IOError(null, e)), channel);
138        }
139    }
140
141    private void defaultTransfer() throws InterruptedException, IOException {
142        // If available() returns remaining, we can optimize.
143        // Regrettably, there is no marker interface for this, but
144        // the assumption should be true for ByteArrayInputStream.
145        boolean availableIsRemaining = inStream instanceof ByteArrayInputStream;
146        while (true) {
147            ManagedBuffer<ByteBuffer> buffer = null;
148            try {
149                buffer = channel.byteBufferPool().acquire();
150                var backing = buffer.backing;
151                int recvd = inStream.read(backing.array(),
152                    backing.position(), backing.remaining());
153                if (recvd > 0) {
154                    boolean eof
155                        = availableIsRemaining && inStream.available() == 0;
156                    backing.position(backing.position() + recvd);
157                    eventPipeline.fire(associate(ioEvent(buffer, eof)),
158                        channel);
159                    if (eof) {
160                        break;
161                    }
162                    continue;
163                }
164                if (recvd == -1) {
165                    eventPipeline.fire(associate(ioEvent(buffer, true)),
166                        channel);
167                    break;
168                }
169                // Reading 0 bytes shouldn't happen.
170                buffer.unlockBuffer();
171            } catch (IOException e) {
172                buffer.unlockBuffer();
173                throw e;
174            }
175        }
176    }
177
178    /**
179     * A seekable channel allows us to avoid generating an event with
180     * no data and eof set, because we can check after reading if there
181     * is remaining data.
182     *
183     * @param input the input
184     * @throws InterruptedException the interrupted exception
185     * @throws IOException Signals that an I/O exception has occurred.
186     */
187    private void seekableTransfer(SeekableByteChannel input)
188            throws InterruptedException, IOException {
189        while (true) {
190            ManagedBuffer<ByteBuffer> buffer = null;
191            try {
192                buffer = channel.byteBufferPool().acquire();
193                int recvd = input.read(buffer.backing);
194                if (recvd > 0) {
195                    boolean eof = input.position() == input.size();
196                    eventPipeline.fire(associate(ioEvent(buffer, eof)),
197                        channel);
198                    if (eof) {
199                        break;
200                    }
201                    continue;
202                }
203                if (recvd == -1) {
204                    eventPipeline.fire(associate(ioEvent(buffer, true)),
205                        channel);
206                    break;
207                }
208                // Reading 0 bytes shouldn't happen.
209                buffer.unlockBuffer();
210            } catch (IOException e) {
211                buffer.unlockBuffer();
212                throw e;
213            }
214        }
215    }
216
217    private IOEvent<ByteBuffer> ioEvent(ManagedBuffer<ByteBuffer> buffer,
218            boolean eor) {
219        if (sendInputEvents) {
220            return Input.fromSink(buffer, eor);
221        }
222        return Output.fromSink(buffer, eor);
223    }
224
225    private Event<?> associate(Event<?> event) {
226        if (eventAssociations != null) {
227            for (var entry : eventAssociations.entrySet()) {
228                event.setAssociated(entry.getKey(), entry.getValue());
229            }
230        }
231        return event;
232    }
233
234}