/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016, 2023 Michael N. Lipp
 * 
 * This program is free software; you can redistribute it and/or modify it 
 * under the terms of the GNU Affero General Public License as published by 
 * the Free Software Foundation; either version 3 of the License, or 
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but 
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
 * for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License along 
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.io.util;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.Map;
import org.jgrapes.core.Event;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.io.IOSubchannel;
import org.jgrapes.io.events.Closed;
import org.jgrapes.io.events.IOError;
import org.jgrapes.io.events.IOEvent;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.Output;

/**
 * Forwards the content of an input stream as a sequence of 
 * {@link Output} (or optionally {@link Input}) events.
 * 
 * The default settings and the constructor 
 * {@link #InputStreamPipeline(InputStream, IOSubchannel)} reflect
 * the usage of this class for generating a response (e.g. provide
 * the content of a file in response to a request from a client).
 * Using the class with a "downstream" event pipeline and generating
 * {@link Input} events (see {@link #sendInputEvents()}) is intended
 * for the case where the input stream provides data that should be
 * processed as requests by the application.
 * 
 * This class only reads from the stream; it never calls 
 * {@code close()} on it.
 */
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class InputStreamPipeline implements Runnable {

    private final InputStream inStream;
    private final IOSubchannel channel;
    private final EventPipeline eventPipeline;
    private boolean sendClosed = true;
    private Map<Object, Object> eventAssociations;
    private boolean sendInputEvents;

    /**
     * Creates a new pipeline that sends the data from the given input stream
     * as events on the given channel, using the given event pipeline.
     * 
     * @param in the input stream to read from
     * @param channel the channel to send to
     * @param eventPipeline
     *            the event pipeline used for firing events
     */
    @SuppressWarnings("PMD.ShortVariable")
    public InputStreamPipeline(InputStream in, IOSubchannel channel,
            EventPipeline eventPipeline) {
        this.inStream = in;
        this.channel = channel;
        this.eventPipeline = eventPipeline;
    }

    /**
     * Creates a new pipeline that sends the data from the given input stream
     * as events on the given channel, using the channel's response pipeline.
     * 
     * @param in the input stream to read from
     * @param channel the channel to send to
     */
    @SuppressWarnings("PMD.ShortVariable")
    public InputStreamPipeline(InputStream in, IOSubchannel channel) {
        this(in, channel, channel.responsePipeline());
    }

    /**
     * Causes the data to be fired as {@link Input} events rather
     * than the usual {@link Output} events. 
     * 
     * @return the pipeline for easy chaining
     */
    public InputStreamPipeline sendInputEvents() {
        sendInputEvents = true;
        return this;
    }

    /**
     * Suppresses the sending of a {@link Closed} event after all
     * data from the stream has been forwarded.
     * 
     * @return the pipeline for easy chaining
     */
    public InputStreamPipeline suppressClosed() {
        sendClosed = false;
        return this;
    }

    /**
     * Configure associations that are applied to every event fired
     * by this pipeline (data events as well as the final 
     * {@link Closed} or {@link IOError} event), 
     * see {@link Event#setAssociated}. The map is used as passed in,
     * no copy is made.
     * 
     * @param associations the associations to apply
     * @return the pipeline for easy chaining
     */
    public InputStreamPipeline
            setEventAssociations(Map<Object, Object> associations) {
        eventAssociations = associations;
        return this;
    }

    /**
     * Transfers the stream's content as events. After the last data
     * event, a {@link Closed} event is fired unless this has been 
     * suppressed. An {@link IOException} during the transfer results 
     * in an {@link IOError} event. If the thread is interrupted, the 
     * transfer stops and the thread's interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            if (inStream instanceof FileInputStream fip) {
                seekableTransfer(fip.getChannel());
            } else {
                defaultTransfer();
            }
            if (sendClosed) {
                eventPipeline.fire(associate(new Closed<Void>()), channel);
            }
        } catch (InterruptedException e) {
            // Just stop, but keep the interruption visible to
            // whoever executes this runnable.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            eventPipeline.fire(associate(new IOError(null, e)), channel);
        }
    }

    /**
     * Transfers data from a plain input stream. Unless the stream
     * is a {@link ByteArrayInputStream}, the end of data is signalled
     * by a final event with an empty buffer.
     *
     * @throws InterruptedException if interrupted while acquiring 
     * a buffer
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void defaultTransfer() throws InterruptedException, IOException {
        // If available() returns remaining, we can optimize.
        // Regrettably, there is no marker interface for this, but
        // the assumption should be true for ByteArrayInputStream.
        boolean availableIsRemaining = inStream instanceof ByteArrayInputStream;
        while (true) {
            ManagedBuffer<ByteBuffer> buffer
                = channel.byteBufferPool().acquire();
            // Once the buffer is passed to an event, the event is
            // in charge of it and we must not unlock it here.
            boolean handedOver = false;
            try {
                int recvd = readInto(buffer.backing);
                if (recvd > 0) {
                    boolean eof
                        = availableIsRemaining && inStream.available() == 0;
                    handedOver = true;
                    eventPipeline.fire(associate(ioEvent(buffer, eof)),
                        channel);
                    if (eof) {
                        break;
                    }
                    continue;
                }
                if (recvd == -1) {
                    handedOver = true;
                    eventPipeline.fire(associate(ioEvent(buffer, true)),
                        channel);
                    break;
                }
                // Reading 0 bytes shouldn't happen. The buffer is
                // released below and we try again.
            } finally {
                if (!handedOver) {
                    buffer.unlockBuffer();
                }
            }
        }
    }

    /**
     * Reads from the input stream into the remaining space of the 
     * given buffer and advances the buffer's position by the number 
     * of bytes read.
     *
     * @param backing the buffer to fill
     * @return the result of the stream's read operation
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private int readInto(ByteBuffer backing) throws IOException {
        if (backing.hasArray()) {
            // The buffer's position is relative to arrayOffset(),
            // which is not necessarily 0 (e.g. for slices).
            int recvd = inStream.read(backing.array(),
                backing.arrayOffset() + backing.position(),
                backing.remaining());
            if (recvd > 0) {
                backing.position(backing.position() + recvd);
            }
            return recvd;
        }
        // No accessible array (e.g. direct buffer), copy via a
        // temporary array.
        byte[] tmp = new byte[backing.remaining()];
        int recvd = inStream.read(tmp, 0, tmp.length);
        if (recvd > 0) {
            backing.put(tmp, 0, recvd);
        }
        return recvd;
    }

    /**
     * A seekable channel allows us to avoid generating an event with
     * no data and eof set, because we can check after reading if there
     * is remaining data.
     *
     * @param input the input
     * @throws InterruptedException if interrupted while acquiring 
     * a buffer
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void seekableTransfer(SeekableByteChannel input)
            throws InterruptedException, IOException {
        while (true) {
            ManagedBuffer<ByteBuffer> buffer
                = channel.byteBufferPool().acquire();
            // Once the buffer is passed to an event, the event is
            // in charge of it and we must not unlock it here.
            boolean handedOver = false;
            try {
                int recvd = input.read(buffer.backing);
                if (recvd > 0) {
                    boolean eof = input.position() == input.size();
                    handedOver = true;
                    eventPipeline.fire(associate(ioEvent(buffer, eof)),
                        channel);
                    if (eof) {
                        break;
                    }
                    continue;
                }
                if (recvd == -1) {
                    handedOver = true;
                    eventPipeline.fire(associate(ioEvent(buffer, true)),
                        channel);
                    break;
                }
                // Reading 0 bytes shouldn't happen. The buffer is
                // released below and we try again.
            } finally {
                if (!handedOver) {
                    buffer.unlockBuffer();
                }
            }
        }
    }

    /**
     * Creates the data event for the given buffer, an {@link Input}
     * event if {@link #sendInputEvents()} has been called, else
     * an {@link Output} event.
     *
     * @param buffer the buffer with the data
     * @param eor the end of record flag passed to the event
     * @return the event
     */
    private IOEvent<ByteBuffer> ioEvent(ManagedBuffer<ByteBuffer> buffer,
            boolean eor) {
        if (sendInputEvents) {
            return Input.fromSink(buffer, eor);
        }
        return Output.fromSink(buffer, eor);
    }

    /**
     * Applies the configured associations (if any) to the event.
     *
     * @param event the event
     * @return the event passed in
     */
    private Event<?> associate(Event<?> event) {
        if (eventAssociations != null) {
            for (var entry : eventAssociations.entrySet()) {
                event.setAssociated(entry.getKey(), entry.getValue());
            }
        }
        return event;
    }

}