001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2017-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.IOException;
022import java.io.OutputStreamWriter;
023import java.io.Writer;
024import java.nio.ByteBuffer;
025import java.nio.CharBuffer;
026import java.nio.charset.Charset;
027import java.nio.charset.CharsetEncoder;
028import java.nio.charset.StandardCharsets;
029import java.util.Map;
030import org.jgrapes.core.EventPipeline;
031import org.jgrapes.io.IOSubchannel;
032import org.jgrapes.io.events.Output;
033
034/**
035 * An {@link Writer} that encodes the data written to it and stores it
036 * in a {@link ByteBuffer} obtained from a queue. When a byte buffer 
037 * is full, an {@link Output} event (default) is generated and a 
038 * new buffer is fetched from the queue.
039 * 
040 * The function of this class can also be achieved by wrapping a
041 * {@link ByteBufferOutputStream} in a {@link OutputStreamWriter}.
042 * The major advantage of this class is that it drops the
043 * {@link IOException}s (which cannot happen) from the methods.
044 * Besides, it should be more resource efficient.
045 */
046public class ByteBufferWriter extends AbstractBufferWriter<ByteBuffer> {
047
048    private CharBuffer written;
049    private Charset charset = StandardCharsets.UTF_8;
050    private CharsetEncoder encoder;
051
052    /**
053     * Creates a new instance that uses {@link Output} events to dispatch
054     * buffers on the given channel, using the given event pipeline.
055     * 
056     * @param channel
057     *            the channel to fire events on
058     * @param eventPipeline
059     *            the event pipeline used for firing events
060     */
061    public ByteBufferWriter(IOSubchannel channel, EventPipeline eventPipeline) {
062        super(channel, eventPipeline);
063    }
064
065    /**
066     * Creates a new instance that uses {@link Output} events to dispatch
067     * buffers on the given channel, using the channel's response pipeline.
068     * 
069     * @param channel the channel to fire events on
070     */
071    public ByteBufferWriter(IOSubchannel channel) {
072        super(channel);
073    }
074
075    @Override
076    public ByteBufferWriter sendInputEvents() {
077        super.sendInputEvents();
078        return this;
079    }
080
081    @Override
082    public ByteBufferWriter suppressClose() {
083        super.suppressClose();
084        return this;
085    }
086
087    @Override
088    public ByteBufferWriter suppressEndOfRecord() {
089        super.suppressEndOfRecord();
090        return this;
091    }
092
093    @Override
094    public ByteBufferWriter
095            setEventAssociations(Map<Object, Object> associations) {
096        super.setEventAssociations(associations);
097        return this;
098    }
099
100    /**
101     * Sets the charset to be used for converting the written data
102     * to bytes which defaults to UTF-8. Must be set before the first 
103     * invocation of any write method.  
104     *
105     * @param charset the charset
106     * @return the writer
107     */
108    public ByteBufferWriter charset(Charset charset) {
109        if (encoder != null) {
110            throw new IllegalStateException("Charset cannot be changed.");
111        }
112        this.charset = charset;
113        return this;
114    }
115
116    /**
117     * Sets the charset to be used for converting the written data
118     * to bytes to the charset specified as system property 
119     * `native.encoding`. If this property does not specify a valid 
120     * charset, {@link Charset#defaultCharset()} is used.
121     *  
122     * Must be invoked before the first write (or append) operation.  
123     *
124     * @return the writer
125     */
126    @SuppressWarnings({ "PMD.AvoidCatchingGenericException",
127        "PMD.EmptyCatchBlock", "PMD.DataflowAnomalyAnalysis" })
128    public ByteBufferWriter nativeCharset() {
129        Charset toSet = Charset.defaultCharset();
130        var toCheck = System.getProperty("native.encoding");
131        if (toCheck != null) {
132            try {
133                toSet = Charset.forName(toCheck);
134            } catch (Exception e) {
135                // If this fails, simply use default
136            }
137        }
138        charset(toSet);
139        return this;
140    }
141
142    @Override
143    protected void ensureBufferAvailable() throws InterruptedException {
144        if (buffer != null) {
145            return;
146        }
147        buffer = channel.byteBufferPool().acquire();
148    }
149
150    private void ensureWrittenAvailable() throws InterruptedException {
151        if (written == null) {
152            ensureBufferAvailable();
153            written = CharBuffer.allocate(buffer.capacity());
154            encoder = charset.newEncoder();
155        }
156    }
157
158    private void encode() throws InterruptedException {
159        written.flip();
160        while (true) {
161            ensureBufferAvailable();
162            var res = encoder.encode(written, buffer.backingBuffer(), false);
163            if (res.isUnderflow()) {
164                // This should not be possible (incomplete character?).
165                var carryOver = CharBuffer.allocate(written.capacity());
166                carryOver.put(written);
167                written = carryOver;
168                return;
169            }
170            if (!res.isOverflow()) {
171                break;
172            }
173            flush(false);
174        }
175        // written processed
176        written.clear();
177    }
178
179    /*
180     * (non-Javadoc)
181     * 
182     * @see java.io.Writer#write(char[], int, int)
183     */
184    @Override
185    public void write(char[] data, int offset, int length) {
186        while (true) {
187            try {
188                ensureWrittenAvailable();
189                if (written.remaining() >= length) {
190                    written.put(data, offset, length);
191                    encode();
192                    break;
193                }
194                int chunkSize = written.remaining();
195                written.put(data, offset, chunkSize);
196                length -= chunkSize;
197                offset += chunkSize;
198                encode();
199            } catch (InterruptedException e) {
200                Thread.currentThread().interrupt();
201                return;
202            }
203        }
204    }
205
206    @Override
207    public void write(char[] cbuf) {
208        write(cbuf, 0, cbuf.length);
209    }
210
211    @Override
212    public void write(String str, int offset, int length) {
213        while (true) {
214            try {
215                ensureWrittenAvailable();
216                if (written.remaining() >= length) {
217                    str.getChars(offset, offset + length, written.array(),
218                        written.position());
219                    written.position(written.position() + length);
220                    encode();
221                    break;
222                }
223                int chunkSize = buffer.remaining();
224                str.getChars(offset, offset + chunkSize, written.array(),
225                    written.position());
226                written.position(written.position() + chunkSize);
227                length -= chunkSize;
228                offset += chunkSize;
229                encode();
230            } catch (InterruptedException e) {
231                Thread.currentThread().interrupt();
232                return;
233            }
234        }
235    }
236
237    @Override
238    public void write(String str) {
239        write(str, 0, str.length());
240    }
241
242    @Override
243    @SuppressWarnings("PMD.ShortVariable")
244    public void write(int ch) {
245        char[] buff = { (char) ch };
246        write(buff, 0, 1);
247    }
248
249    @Override
250    @SuppressWarnings("PMD.ShortVariable")
251    public ByteBufferWriter append(char ch) {
252        write(ch);
253        return this;
254    }
255
256    @Override
257    public ByteBufferWriter append(CharSequence csq) {
258        write(String.valueOf(csq));
259        return this;
260    }
261
262    @Override
263    public ByteBufferWriter append(CharSequence csq, int start, int end) {
264        if (csq == null) {
265            csq = "null";
266        }
267        return append(csq.subSequence(start, end));
268    }
269
270}