001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.OutputStream;
022import java.nio.ByteBuffer;
023import org.jgrapes.core.EventPipeline;
024import org.jgrapes.io.IOSubchannel;
025import org.jgrapes.io.events.Close;
026import org.jgrapes.io.events.Input;
027import org.jgrapes.io.events.Output;
028
029/**
030 * An {@link OutputStream} that is backed by {@link ByteBuffer}s obtained from a
031 * queue. When a byte buffer is full, a {@link Output} event (default) is
032 * generated and a new buffer is fetched from the queue.
033 */
034public class ByteBufferOutputStream extends OutputStream {
035
036    private IOSubchannel channel;
037    private EventPipeline eventPipeline;
038    private boolean sendInputEvents;
039    private ManagedBuffer<ByteBuffer> buffer;
040    private boolean sendClose = true;
041    private boolean sendEor = true;
042    private boolean eorSent;
043    private boolean isClosed;
044
045    /**
046     * Creates a new instance that uses {@link Output} events to dispatch
047     * buffers on the given channel, using the given event pipeline.
048     * 
049     * @param channel
050     *            the channel to fire events on
051     * @param eventPipeline
052     *            the event pipeline used for firing events
053     */
054    public ByteBufferOutputStream(IOSubchannel channel,
055            EventPipeline eventPipeline) {
056        this.channel = channel;
057        this.eventPipeline = eventPipeline;
058    }
059
060    /**
061     * Creates a new instance that uses {@link Output} events to dispatch
062     * buffers on the given channel, using the channel's response pipeline.
063     * 
064     * @param channel
065     *            the channel to fire events on
066     */
067    public ByteBufferOutputStream(IOSubchannel channel) {
068        this(channel, channel.responsePipeline());
069    }
070
071    /**
072     * Causes the data to be fired as {@link Input} events rather
073     * than the usual {@link Output} events. 
074     * 
075     * @return the stream for easy chaining
076     */
077    public ByteBufferOutputStream sendInputEvents() {
078        sendInputEvents = true;
079        return this;
080    }
081
082    /**
083     * Suppresses sending of a close event when the stream is closed. 
084     * 
085     * @return the stream for easy chaining
086     */
087    public ByteBufferOutputStream suppressClose() {
088        sendClose = false;
089        return this;
090    }
091
092    /**
093     * Suppresses setting the end of record flag when the stream is 
094     * flushed or closed.
095     * 
096     * @return the stream for easy chaining
097     * @see Output#isEndOfRecord()
098     */
099    public ByteBufferOutputStream suppressEndOfRecord() {
100        sendEor = false;
101        return this;
102    }
103
104    private void ensureBufferAvailable() throws InterruptedException {
105        if (buffer != null) {
106            return;
107        }
108        buffer = channel.byteBufferPool().acquire();
109    }
110
111    /*
112     * (non-Javadoc)
113     * 
114     * @see java.io.OutputStream#write(int)
115     */
116    @Override
117    public void write(int data) {
118        try {
119            ensureBufferAvailable();
120        } catch (InterruptedException e) {
121            Thread.currentThread().interrupt();
122            return;
123        }
124        buffer.backingBuffer().put((byte) data);
125        if (!buffer.hasRemaining()) {
126            flush(false);
127        }
128    }
129
130    /*
131     * (non-Javadoc)
132     * 
133     * @see java.io.OutputStream#write(byte[], int, int)
134     */
135    @Override
136    public void write(byte[] data, int offset, int length) {
137        while (true) {
138            try {
139                ensureBufferAvailable();
140            } catch (InterruptedException e) {
141                Thread.currentThread().interrupt();
142                return;
143            }
144            if (buffer.remaining() > length) {
145                buffer.backingBuffer().put(data, offset, length);
146                break;
147            } else if (buffer.remaining() == length) {
148                buffer.backingBuffer().put(data, offset, length);
149                flush(false);
150                break;
151            } else {
152                int chunkSize = buffer.remaining();
153                buffer.backingBuffer().put(data, offset, chunkSize);
154                flush(false);
155                length -= chunkSize;
156                offset += chunkSize;
157            }
158        }
159    }
160
161    @Override
162    public void write(byte[] data) {
163        write(data, 0, data.length);
164    }
165
166    /**
167     * Creates and fires an {@link Output} event with the buffer being filled. 
168     * The end of record flag of the event is set according to the parameter.
169     * Frees any allocated buffer.
170     */
171    private void flush(boolean endOfRecord) {
172        if (buffer == null) {
173            if (!endOfRecord || eorSent) {
174                return;
175            }
176            try {
177                ensureBufferAvailable();
178            } catch (InterruptedException e) {
179                Thread.currentThread().interrupt();
180                return;
181            }
182        }
183        if (buffer.position() == 0 && (!endOfRecord || eorSent)) {
184            // Nothing to flush
185            buffer.unlockBuffer();
186        } else {
187            if (sendInputEvents) {
188                eventPipeline.fire(Input.fromSink(buffer, endOfRecord),
189                    channel);
190            } else {
191                eventPipeline.fire(Output.fromSink(buffer, endOfRecord),
192                    channel);
193            }
194            eorSent = endOfRecord;
195        }
196        buffer = null;
197    }
198
199    /**
200     * Creates and fires a {@link Output} event with the buffer being filled
201     * if it contains any data.
202     * 
203     * By default, the {@link Output} event is created with the end of record
204     * flag set (see {@link Output#isEndOfRecord()}) in order to forward the 
205     * flush as event. This implies that an {@link Output} event with no data
206     * (but the end of record flag set) may be fired. This behavior can
207     * be disabled with {@link #suppressEndOfRecord()}.
208     */
209    @Override
210    public void flush() {
211        flush(sendEor);
212    }
213
214    /**
215     * Flushes any remaining data with the end of record flag set
216     * (unless {@link #suppressEndOfRecord()} has been called)
217     * and fires a {@link Close} event (unless {@link #suppressClose()}
218     * has been called).
219     */
220    @Override
221    public void close() {
222        if (isClosed) {
223            return;
224        }
225        flush(sendEor);
226        if (sendClose) {
227            eventPipeline.fire(new Close(), channel);
228        }
229        isClosed = true;
230    }
231}