001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016, 2023  Michael N. Lipp
004 *
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.ByteArrayInputStream;
022import java.io.FileInputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.ByteBuffer;
026import java.nio.channels.SeekableByteChannel;
027import java.util.Map;
028import org.jgrapes.core.Event;
029import org.jgrapes.core.EventPipeline;
030import org.jgrapes.io.IOSubchannel;
031import org.jgrapes.io.events.Closed;
032import org.jgrapes.io.events.IOError;
033import org.jgrapes.io.events.IOEvent;
034import org.jgrapes.io.events.Input;
035import org.jgrapes.io.events.Output;
036
037/**
038 * Forwards the content of an input stream as a sequence of 
039 * {@link Output} (or optionally {@link Input}) events.
040 * 
041 * The default settings and the constructor 
042 * {@link #InputStreamPipeline(InputStream, IOSubchannel)} reflect
043 * the usage of this class for generating a response (e.g. provide
044 * the content of a file in response to a request from a client).
045 * Using the class with a "downstream" event pipeline, generating
046 * {@link Input} events is used when an input stream generates events
047 * that should be processed as requests by the application.
048 */
049@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
050public class InputStreamPipeline implements Runnable {
051
052    private InputStream inStream;
053    private IOSubchannel channel;
054    private EventPipeline eventPipeline;
055    private boolean sendClosed = true;
056    private Map<Object, Object> eventAssociations;
057    private boolean sendInputEvents;
058
059    /**
060     * Creates a new pipeline that sends the data from the given input stream
061     * as events on the given channel, using the given event pipeline.
062     * 
063     * @param in the input stream to read from
064     * @param channel the channel to send to
065     * @param eventPipeline
066     *            the event pipeline used for firing events
067     */
068    @SuppressWarnings("PMD.ShortVariable")
069    public InputStreamPipeline(InputStream in, IOSubchannel channel,
070            EventPipeline eventPipeline) {
071        this.inStream = in;
072        this.channel = channel;
073        this.eventPipeline = eventPipeline;
074    }
075
076    /**
077     * Creates a new pipeline that sends the data from the given input stream
078     * as events on the given channel, using the channel's response pipeline.
079     * 
080     * @param in the input stream to read from
081     * @param channel the channel to send to
082     */
083    @SuppressWarnings("PMD.ShortVariable")
084    public InputStreamPipeline(InputStream in, IOSubchannel channel) {
085        this(in, channel, channel.responsePipeline());
086    }
087
088    /**
089     * Causes the data to be fired as {@link Input} events rather
090     * than the usual {@link Output} events. 
091     * 
092     * @return the stream for easy chaining
093     */
094    public InputStreamPipeline sendInputEvents() {
095        sendInputEvents = true;
096        return this;
097    }
098
099    /**
100     * Suppresses the sending of a closed event when the stream is closed. 
101     * 
102     * @return the stream for easy chaining
103     */
104    public InputStreamPipeline suppressClosed() {
105        sendClosed = false;
106        return this;
107    }
108
109    /**
110     * Configure associations that are applied to the generated
111     * Output events, see {@link Event#setAssociated}.
112     * 
113     * @param associations the associations to apply
114     * @return the pipeline for easy chaining
115     */
116    public InputStreamPipeline
117            setEventAssociations(Map<Object, Object> associations) {
118        eventAssociations = associations;
119        return this;
120    }
121
122    @Override
123    public void run() {
124        try {
125            if (inStream instanceof FileInputStream fip) {
126                seekableTransfer(fip.getChannel());
127            } else {
128                defaultTransfer();
129            }
130            if (sendClosed) {
131                eventPipeline.fire(associate(new Closed<Void>()), channel);
132            }
133        } catch (InterruptedException e) {
134            // Just stop
135        } catch (IOException e) {
136            eventPipeline.fire(associate(new IOError(null, e)), channel);
137        }
138    }
139
140    private void defaultTransfer() throws InterruptedException, IOException {
141        // If available() returns remaining, we can optimize.
142        // Regrettably, there is no marker interface for this, but
143        // the assumption should be true for ByteArrayInputStream.
144        boolean availableIsRemaining = inStream instanceof ByteArrayInputStream;
145        while (true) {
146            ManagedBuffer<ByteBuffer> buffer = null;
147            try {
148                buffer = channel.byteBufferPool().acquire();
149                var backing = buffer.backing;
150                int recvd = inStream.read(backing.array(),
151                    backing.position(), backing.remaining());
152                if (recvd > 0) {
153                    boolean eof
154                        = availableIsRemaining && inStream.available() == 0;
155                    backing.position(backing.position() + recvd);
156                    eventPipeline.fire(associate(ioEvent(buffer, eof)),
157                        channel);
158                    if (eof) {
159                        break;
160                    }
161                    continue;
162                }
163                if (recvd == -1) {
164                    eventPipeline.fire(associate(ioEvent(buffer, true)),
165                        channel);
166                    break;
167                }
168                // Reading 0 bytes shouldn't happen.
169                buffer.unlockBuffer();
170            } catch (IOException e) {
171                buffer.unlockBuffer();
172                throw e;
173            }
174        }
175    }
176
177    /**
178     * A seekable channel allows us to avoid generating an event with
179     * no data and eof set, because we can check after reading if there
180     * is remaining data.
181     *
182     * @param input the input
183     * @throws InterruptedException the interrupted exception
184     * @throws IOException Signals that an I/O exception has occurred.
185     */
186    private void seekableTransfer(SeekableByteChannel input)
187            throws InterruptedException, IOException {
188        while (true) {
189            ManagedBuffer<ByteBuffer> buffer = null;
190            try {
191                buffer = channel.byteBufferPool().acquire();
192                int recvd = input.read(buffer.backing);
193                if (recvd > 0) {
194                    boolean eof = input.position() == input.size();
195                    eventPipeline.fire(associate(ioEvent(buffer, eof)),
196                        channel);
197                    if (eof) {
198                        break;
199                    }
200                    continue;
201                }
202                if (recvd == -1) {
203                    eventPipeline.fire(associate(ioEvent(buffer, true)),
204                        channel);
205                    break;
206                }
207                // Reading 0 bytes shouldn't happen.
208                buffer.unlockBuffer();
209            } catch (IOException e) {
210                buffer.unlockBuffer();
211                throw e;
212            }
213        }
214    }
215
216    private IOEvent<ByteBuffer> ioEvent(ManagedBuffer<ByteBuffer> buffer,
217            boolean eor) {
218        if (sendInputEvents) {
219            return Input.fromSink(buffer, eor);
220        }
221        return Output.fromSink(buffer, eor);
222    }
223
224    private Event<?> associate(Event<?> event) {
225        if (eventAssociations != null) {
226            for (var entry : eventAssociations.entrySet()) {
227                event.setAssociated(entry.getKey(), entry.getValue());
228            }
229        }
230        return event;
231    }
232
233}