001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2022, 2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Reader;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
031
032/**
033 * A {@link Reader} that provides the data from the {@link ManagedBuffer}s
034 * fed to it to a consumer. This class is intended to be used as a pipe 
035 * between two threads.  
036 */
037public class ManagedBufferReader extends Reader {
038
039    private boolean isEndOfFeed;
040    private boolean isOpen = true;
041    private ManagedBuffer<? extends Buffer> current;
042    private CharsetDecoder decoder;
043    private CharBuffer decoded;
044    private Charset charset = StandardCharsets.UTF_8;
045
046    /**
047     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
048     * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8. 
049     * Must be set before the first invocation of 
050     * {@link #feed(ManagedBuffer)}.  
051     *
052     * @param charset the charset
053     * @return the managed buffer reader
054     */
055    public ManagedBufferReader charset(Charset charset) {
056        if (decoder != null) {
057            throw new IllegalStateException("Charset cannot be changed.");
058        }
059        this.charset = charset;
060        return this;
061    }
062
063    /**
064     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
065     * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8. 
066     * Must be set before the first invocation of 
067     * {@link #feed(ManagedBuffer)}.  
068     *
069     * @param charset the charset
070     * @return the managed buffer reader
071     * @deprecated Use {@link #charset(Charset)} instead
072     */
073    @Deprecated
074    public ManagedBufferReader setCharset(Charset charset) {
075        return charset(charset);
076    }
077
078    /**
079     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
080     * is invoked with `ManagedBuffer<ByteBuffer>` to the charset
081     * specified as system property `native.encoding`. If this
082     * property does not specify a valid charset, 
083     * {@link Charset#defaultCharset()} is used.
084     *  
085     * Must be invoked before the first invocation of 
086     * {@link #feed(ManagedBuffer)}.  
087     *
088     * @return the managed buffer reader
089     */
090    @SuppressWarnings({ "PMD.AvoidCatchingGenericException",
091        "PMD.EmptyCatchBlock", "PMD.DataflowAnomalyAnalysis" })
092    public ManagedBufferReader nativeCharset() {
093        Charset toSet = Charset.defaultCharset();
094        var toCheck = System.getProperty("native.encoding");
095        if (toCheck != null) {
096            try {
097                toSet = Charset.forName(toCheck);
098            } catch (Exception e) {
099                // If this fails, simply use default
100            }
101        }
102        charset(toSet);
103        return this;
104    }
105
106    /**
107     * Feed data to the reader. The call blocks while data from a previous
108     * invocation has not been fully read. The buffer passed as argument
109     * is locked (see {@link ManagedBuffer#lockBuffer()}) until all
110     * data has been read.
111     * 
112     * Calling this method with `null` as argument closes the feed.
113     * After consuming any data still available from a previous
114     * invocation, further calls to {@link #read} therefore return -1.
115     *
116     * @param buffer the buffer
117     * @throws IOException Signals that an I/O exception has occurred.
118     */
119    @SuppressWarnings({ "PMD.PreserveStackTrace" })
120    public <W extends Buffer> void feed(ManagedBuffer<W> buffer)
121            throws IOException {
122        synchronized (lock) {
123            if (buffer == null) {
124                isEndOfFeed = true;
125                notifyAll();
126                return;
127            }
128            if (!isOpen || isEndOfFeed) {
129                return;
130            }
131            while (current != null) {
132                try {
133                    lock.wait();
134                } catch (InterruptedException e) {
135                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
136                    var exc = new InterruptedIOException(e.getMessage());
137                    exc.setStackTrace(e.getStackTrace());
138                    throw exc;
139                }
140            }
141            current = buffer;
142            buffer.lockBuffer();
143            lock.notifyAll();
144        }
145    }
146
147    /**
148     * {@inheritDoc}
149     * 
150     * Note that this is the {@link Reader}'s `close` method. In order
151     * to close the feed, call {@link #feed(ManagedBuffer)} with
152     * `null` as argument.
153     */
154    @Override
155    public void close() throws IOException {
156        synchronized (lock) {
157            isOpen = false;
158            if (current != null) {
159                current.unlockBuffer();
160                current = null;
161            }
162            lock.notifyAll();
163        }
164    }
165
166    @Override
167    @SuppressWarnings({ "PMD.PreserveStackTrace", "unchecked",
168        "PMD.CognitiveComplexity", "PMD.DataflowAnomalyAnalysis",
169        "PMD.NcssCount" })
170    public int read(char[] cbuf, int off, int len) throws IOException {
171        Objects.checkFromIndexSize(off, len, cbuf.length);
172        synchronized (lock) {
173            while (isOpen && current == null && !isEndOfFeed) {
174                try {
175                    lock.wait();
176                } catch (InterruptedException e) {
177                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
178                    var exc = new InterruptedIOException(e.getMessage());
179                    exc.setStackTrace(e.getStackTrace());
180                    throw exc;
181                }
182            }
183            if (!isOpen || isEndOfFeed && current == null) {
184                return -1;
185            }
186            CharBuffer input;
187            if (current.backingBuffer() instanceof CharBuffer) {
188                input = ((ManagedBuffer<CharBuffer>) current).backingBuffer();
189            } else {
190                if (decoder == null) {
191                    decoder = charset.newDecoder();
192                    decoded = CharBuffer.allocate(current.capacity());
193                }
194                var result = decoder.decode(
195                    ((ManagedBuffer<ByteBuffer>) current).backingBuffer(),
196                    decoded, isEndOfFeed);
197                assert !result.isOverflow();
198                decoded.flip();
199                input = decoded;
200            }
201            int transferred;
202            if (input.remaining() <= len) {
203                // Get all remaining.
204                transferred = input.remaining();
205                input.get(cbuf, off, transferred);
206                if (decoded != null) {
207                    decoded.clear();
208                }
209                current.unlockBuffer();
210                current = null;
211                lock.notifyAll();
212            } else {
213                // Get requested.
214                transferred = len;
215                input.get(cbuf, off, transferred);
216            }
217            return transferred;
218        }
219    }
220
221}