001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.IOException;
022import java.io.OutputStream;
023import java.nio.ByteBuffer;
024
025import org.jgrapes.core.EventPipeline;
026import org.jgrapes.io.IOSubchannel;
027import org.jgrapes.io.events.Close;
028import org.jgrapes.io.events.Input;
029import org.jgrapes.io.events.Output;
030
031/**
032 * An {@link OutputStream} that is backed by {@link ByteBuffer}s obtained from a
033 * queue. When a byte buffer is full, a {@link Output} event (default) is
034 * generated and a new buffer is fetched from the queue.
035 *
036 */
037public class ByteBufferOutputStream extends OutputStream {
038
039    private IOSubchannel channel;
040    private EventPipeline eventPipeline;
041    private boolean sendInputEvents;
042    private ManagedBuffer<ByteBuffer> buffer;
043    private boolean sendClose = true;
044    private boolean sendEor = true;
045    private boolean eorSent;
046    private boolean isClosed;
047
048    /**
049     * Creates a new instance that uses {@link Output} events to dispatch
050     * buffers on the given channel, using the given event pipeline.
051     * 
052     * @param channel
053     *            the channel to fire events on
054     * @param eventPipeline
055     *            the event pipeline used for firing events
056     */
057    public ByteBufferOutputStream(IOSubchannel channel,
058            EventPipeline eventPipeline) {
059        this.channel = channel;
060        this.eventPipeline = eventPipeline;
061    }
062
063    /**
064     * Creates a new instance that uses {@link Output} events to dispatch
065     * buffers on the given channel, using the channel's response pipeline.
066     * 
067     * @param channel
068     *            the channel to fire events on
069     */
070    public ByteBufferOutputStream(IOSubchannel channel) {
071        this(channel, channel.responsePipeline());
072    }
073
074    /**
075     * Causes the data to be fired as {@link Input} events rather
076     * than the usual {@link Output} events. 
077     * 
078     * @return the stream for easy chaining
079     */
080    public ByteBufferOutputStream sendInputEvents() {
081        sendInputEvents = true;
082        return this;
083    }
084
085    /**
086     * Suppresses sending of a close event when the stream is closed. 
087     * 
088     * @return the stream for easy chaining
089     */
090    public ByteBufferOutputStream suppressClose() {
091        sendClose = false;
092        return this;
093    }
094
095    /**
096     * Suppresses setting the end of record flag when the stream is 
097     * flushed or closed.
098     * 
099     * @return the stream for easy chaining
100     * @see Output#isEndOfRecord()
101     */
102    public ByteBufferOutputStream suppressEndOfRecord() {
103        sendEor = false;
104        return this;
105    }
106
107    private void ensureBufferAvailable() throws IOException {
108        if (buffer != null) {
109            return;
110        }
111        try {
112            buffer = channel.byteBufferPool().acquire();
113        } catch (InterruptedException e) {
114            throw new IOException(e);
115        }
116    }
117
118    /*
119     * (non-Javadoc)
120     * 
121     * @see java.io.OutputStream#write(int)
122     */
123    @Override
124    public void write(int data) throws IOException {
125        ensureBufferAvailable();
126        buffer.backingBuffer().put((byte) data);
127        if (!buffer.hasRemaining()) {
128            flush(false);
129        }
130    }
131
132    /*
133     * (non-Javadoc)
134     * 
135     * @see java.io.OutputStream#write(byte[], int, int)
136     */
137    @Override
138    public void write(byte[] data, int offset, int length) throws IOException {
139        while (true) {
140            ensureBufferAvailable();
141            if (buffer.remaining() > length) {
142                buffer.backingBuffer().put(data, offset, length);
143                break;
144            } else if (buffer.remaining() == length) {
145                buffer.backingBuffer().put(data, offset, length);
146                flush(false);
147                break;
148            } else {
149                int chunkSize = buffer.remaining();
150                buffer.backingBuffer().put(data, offset, chunkSize);
151                flush(false);
152                length -= chunkSize;
153                offset += chunkSize;
154            }
155        }
156    }
157
158    /**
159     * Creates and fires an {@link Output} event with the buffer being filled. 
160     * The end of record flag of the event is set according to the parameter.
161     * Frees any allocated buffer.
162     */
163    private void flush(boolean endOfRecord) throws IOException {
164        if (buffer == null) {
165            if (!endOfRecord || eorSent) {
166                return;
167            }
168            ensureBufferAvailable();
169        }
170        if (buffer.position() == 0 && (!endOfRecord || eorSent)) {
171            // Nothing to flush
172            buffer.unlockBuffer();
173        } else {
174            if (sendInputEvents) {
175                eventPipeline.fire(Input.fromSink(buffer, endOfRecord),
176                    channel);
177            } else {
178                eventPipeline.fire(Output.fromSink(buffer, endOfRecord),
179                    channel);
180            }
181            eorSent = endOfRecord;
182        }
183        buffer = null;
184    }
185
186    /**
187     * Creates and fires a {@link Output} event with the buffer being filled
188     * if it contains any data.
189     * 
190     * By default, the {@link Output} event is created with the end of record
191     * flag set (see {@link Output#isEndOfRecord()}) in order to forward the 
192     * flush as event. This implies that an {@link Output} event with no data
193     * (but the end of record flag set) may be fired. This behavior can
194     * be disabled with {@link #suppressEndOfRecord()}.
195     */
196    @Override
197    public void flush() throws IOException {
198        flush(sendEor);
199    }
200
201    /**
202     * Flushes any remaining data with the end of record flag set
203     * (unless {@link #suppressEndOfRecord()} has been called)
204     * and fires a {@link Close} event (unless {@link #suppressClose()}
205     * has been called).
206     */
207    @Override
208    public void close() throws IOException {
209        if (isClosed) {
210            return;
211        }
212        flush(sendEor);
213        if (sendClose) {
214            eventPipeline.fire(new Close(), channel);
215        }
216        isClosed = true;
217    }
218}