001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2017-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.IOException;
022import java.io.Writer;
023import java.nio.CharBuffer;
024
025import org.jgrapes.core.EventPipeline;
026import org.jgrapes.io.IOSubchannel;
027import org.jgrapes.io.events.Close;
028import org.jgrapes.io.events.Input;
029import org.jgrapes.io.events.Output;
030
031/**
032 * An {@link Writer} that is backed by {@link CharBuffer}s obtained from a
033 * queue. When a byte buffer is full, an {@link Output} event (default) is
034 * generated and a new buffer is fetched from the queue.
035 */
036public class CharBufferWriter extends Writer {
037
038    private IOSubchannel channel;
039    private EventPipeline eventPipeline;
040    private boolean sendInputEvents;
041    private ManagedBuffer<CharBuffer> buffer;
042    private boolean sendClose = true;
043    private boolean sendEor = true;
044    private boolean eorSent;
045    private boolean isClosed;
046
047    /**
048     * Creates a new instance that uses {@link Output} events to dispatch
049     * buffers on the given channel, using the given event pipeline.
050     * 
051     * @param channel
052     *            the channel to fire events on
053     * @param eventPipeline
054     *            the event pipeline used for firing events
055     */
056    public CharBufferWriter(IOSubchannel channel, EventPipeline eventPipeline) {
057        this.channel = channel;
058        this.eventPipeline = eventPipeline;
059    }
060
061    /**
062     * Creates a new instance that uses {@link Output} events to dispatch
063     * buffers on the given channel, using the channel's response pipeline.
064     * 
065     * @param channel
066     *            the channel to fire events on
067     */
068    public CharBufferWriter(IOSubchannel channel) {
069        this(channel, channel.responsePipeline());
070    }
071
072    /**
073     * Causes the data to be fired as {@link Input} events rather
074     * than the usual {@link Output} events. 
075     * 
076     * @return the stream for easy chaining
077     */
078    public CharBufferWriter sendInputEvents() {
079        sendInputEvents = true;
080        return this;
081    }
082
083    /**
084     * Suppresses sending of a close event when the stream is closed. 
085     * 
086     * @return the stream for easy chaining
087     */
088    public CharBufferWriter suppressClose() {
089        sendClose = false;
090        return this;
091    }
092
093    /**
094     * Suppresses setting the end of record flag when the stream is 
095     * flushed or closed.
096     * 
097     * @return the stream for easy chaining
098     * @see Output#isEndOfRecord()
099     */
100    public CharBufferWriter suppressEndOfRecord() {
101        sendEor = false;
102        return this;
103    }
104
105    private void ensureBufferAvailable() throws IOException {
106        if (buffer != null) {
107            return;
108        }
109        try {
110            buffer = channel.charBufferPool().acquire();
111        } catch (InterruptedException e) {
112            throw new IOException(e);
113        }
114    }
115
116    /*
117     * (non-Javadoc)
118     * 
119     * @see java.io.Writer#write(char[], int, int)
120     */
121    @Override
122    public void write(char[] data, int offset, int length) throws IOException {
123        while (true) {
124            ensureBufferAvailable();
125            if (buffer.remaining() > length) {
126                buffer.backingBuffer().put(data, offset, length);
127                break;
128            } else if (buffer.remaining() == length) {
129                buffer.backingBuffer().put(data, offset, length);
130                flush(false);
131                break;
132            } else {
133                int chunkSize = buffer.remaining();
134                buffer.backingBuffer().put(data, offset, chunkSize);
135                flush(false);
136                length -= chunkSize;
137                offset += chunkSize;
138            }
139        }
140    }
141
142    /**
143     * Creates and fires an {@link Output} event with the buffer being filled. 
144     * The end of record flag of the event is set according to the parameter.
145     * Frees any allocated buffer.
146     */
147    private void flush(boolean endOfRecord) throws IOException {
148        if (buffer == null) {
149            if (!endOfRecord || eorSent) {
150                return;
151            }
152            ensureBufferAvailable();
153        }
154        if (buffer.position() == 0 && (!endOfRecord || eorSent)) {
155            // Nothing to flush
156            buffer.unlockBuffer();
157        } else {
158            if (sendInputEvents) {
159                eventPipeline.fire(Input.fromSink(buffer, endOfRecord),
160                    channel);
161            } else {
162                eventPipeline.fire(Output.fromSink(buffer, endOfRecord),
163                    channel);
164            }
165            eorSent = endOfRecord;
166        }
167        buffer = null;
168    }
169
170    /**
171     * Creates and fires a {@link Output} event with the buffer being filled
172     * if it contains any data.
173     * 
174     * By default, the {@link Output} event is created with the end of record
175     * flag set (see {@link Output#isEndOfRecord()}) in order to forward the 
176     * flush as event. This implies that an {@link Output} event with no data
177     * (but the end of record flag set) may be fired. This behavior can
178     * be disabled with {@link #suppressEndOfRecord()}.
179     */
180    @Override
181    public void flush() throws IOException {
182        flush(sendEor);
183    }
184
185    /**
186     * Flushes any remaining data with the end of record flag set
187     * (unless {@link #suppressEndOfRecord()} has been called)
188     * and fires a {@link Close} event (unless {@link #suppressClose()}
189     * has been called).
190     */
191    @Override
192    public void close() throws IOException {
193        if (isClosed) {
194            return;
195        }
196        flush(sendEor);
197        if (sendClose) {
198            eventPipeline.fire(new Close(), channel);
199        }
200        isClosed = true;
201    }
202}