001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2022, 2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.io.Reader;
024import java.io.UncheckedIOException;
025import java.nio.Buffer;
026import java.nio.ByteBuffer;
027import java.nio.CharBuffer;
028import java.nio.charset.Charset;
029import java.nio.charset.CharsetDecoder;
030import java.nio.charset.StandardCharsets;
031import java.util.Objects;
032
033/**
034 * A {@link Reader} that provides the data from the {@link ManagedBuffer}s
035 * fed to it to a consumer. This class is intended to be used as a pipe 
036 * between two threads.  
037 */
038public class ManagedBufferReader extends Reader implements InputConsumer {
039
040    private boolean isEndOfFeed;
041    private boolean isOpen = true;
042    private ManagedBuffer<? extends Buffer> current;
043    private CharsetDecoder decoder;
044    private CharBuffer decoded;
045    private Charset charset = StandardCharsets.UTF_8;
046
047    /**
048     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
049     * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8. 
050     * Must be set before the first invocation of 
051     * {@link #feed(ManagedBuffer)}.  
052     *
053     * @param charset the charset
054     * @return the managed buffer reader
055     */
056    public ManagedBufferReader charset(Charset charset) {
057        if (decoder != null) {
058            throw new IllegalStateException("Charset cannot be changed.");
059        }
060        this.charset = charset;
061        return this;
062    }
063
064    /**
065     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
066     * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8. 
067     * Must be set before the first invocation of 
068     * {@link #feed(ManagedBuffer)}.  
069     *
070     * @param charset the charset
071     * @return the managed buffer reader
072     * @deprecated Use {@link #charset(Charset)} instead
073     */
074    @Deprecated
075    public ManagedBufferReader setCharset(Charset charset) {
076        return charset(charset);
077    }
078
079    /**
080     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
081     * is invoked with `ManagedBuffer<ByteBuffer>` to the charset
082     * specified as system property `native.encoding`. If this
083     * property does not specify a valid charset, 
084     * {@link Charset#defaultCharset()} is used.
085     *  
086     * Must be invoked before the first invocation of 
087     * {@link #feed(ManagedBuffer)}.  
088     *
089     * @return the managed buffer reader
090     */
091    @SuppressWarnings({ "PMD.AvoidCatchingGenericException",
092        "PMD.EmptyCatchBlock", "PMD.DataflowAnomalyAnalysis" })
093    public ManagedBufferReader nativeCharset() {
094        Charset toSet = Charset.defaultCharset();
095        var toCheck = System.getProperty("native.encoding");
096        if (toCheck != null) {
097            try {
098                toSet = Charset.forName(toCheck);
099            } catch (Exception e) {
100                // If this fails, simply use default
101            }
102        }
103        charset(toSet);
104        return this;
105    }
106
107    /**
108     * Feed data to the reader. The call blocks while data from a previous
109     * invocation has not been fully read. The buffer passed as argument
110     * is locked (see {@link ManagedBuffer#lockBuffer()}) until all
111     * data has been read.
112     * 
113     * Calling this method with `null` as argument closes the feed.
114     * After consuming any data still available from a previous
115     * invocation, further calls to {@link #read} therefore return -1.
116     *
117     * @param buffer the buffer
118     * @throws IOException Signals that an I/O exception has occurred.
119     */
120    @SuppressWarnings({ "PMD.PreserveStackTrace" })
121    public <W extends Buffer> void feed(ManagedBuffer<W> buffer) {
122        synchronized (lock) {
123            if (buffer == null) {
124                isEndOfFeed = true;
125                notifyAll();
126                return;
127            }
128            if (!isOpen || isEndOfFeed) {
129                return;
130            }
131            while (current != null) {
132                try {
133                    lock.wait();
134                } catch (InterruptedException e) {
135                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
136                    var exc = new InterruptedIOException(e.getMessage());
137                    exc.setStackTrace(e.getStackTrace());
138                    throw new UncheckedIOException(exc);
139                }
140            }
141            current = buffer;
142            buffer.lockBuffer();
143            lock.notifyAll();
144        }
145    }
146
147    /**
148     * {@inheritDoc}
149     * 
150     * Note that this is the {@link Reader}'s `close` method. In order
151     * to close the feed, call {@link #feed(ManagedBuffer)} with
152     * `null` as argument.
153     */
154    @Override
155    public void close() throws IOException {
156        synchronized (lock) {
157            isOpen = false;
158            if (current != null) {
159                current.unlockBuffer();
160                current = null;
161            }
162            lock.notifyAll();
163        }
164    }
165
166    @Override
167    @SuppressWarnings({ "PMD.PreserveStackTrace", "unchecked",
168        "PMD.CognitiveComplexity", "PMD.DataflowAnomalyAnalysis",
169        "PMD.NcssCount" })
170    public int read(char[] cbuf, int off, int len) throws IOException {
171        Objects.checkFromIndexSize(off, len, cbuf.length);
172        synchronized (lock) {
173            while (isOpen && current == null && !isEndOfFeed) {
174                try {
175                    lock.wait();
176                } catch (InterruptedException e) {
177                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
178                    var exc = new InterruptedIOException(e.getMessage());
179                    exc.setStackTrace(e.getStackTrace());
180                    throw exc;
181                }
182            }
183            if (!isOpen || isEndOfFeed && current == null) {
184                return -1;
185            }
186            CharBuffer input;
187            if (current.backingBuffer() instanceof CharBuffer) {
188                input = ((ManagedBuffer<CharBuffer>) current).backingBuffer();
189            } else {
190                if (decoder == null) {
191                    decoder = charset.newDecoder();
192                    decoded = CharBuffer.allocate(current.capacity());
193                }
194                var result = decoder.decode(
195                    ((ManagedBuffer<ByteBuffer>) current).backingBuffer(),
196                    decoded, isEndOfFeed);
197                assert !result.isOverflow();
198                decoded.flip();
199                input = decoded;
200            }
201            int transferred;
202            if (input.remaining() <= len) {
203                // Get all remaining.
204                transferred = input.remaining();
205                input.get(cbuf, off, transferred);
206                if (decoded != null) {
207                    decoded.clear();
208                }
209                current.unlockBuffer();
210                current = null;
211                lock.notifyAll();
212            } else {
213                // Get requested.
214                transferred = len;
215                input.get(cbuf, off, transferred);
216            }
217            return transferred;
218        }
219    }
220
221}