001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import org.jgrapes.io.events.Input;
031
032/**
033 * Collects character data from buffers and makes it available as
034 * complete lines.
035 * 
036 * Lines end with a LF which may optionally be followed by a CR.
037 * Neither character is part of the result returned by {@link #getLine()}.
038 * If no more input is expected and characters without trailing LF
039 * remain, these remaining character are returned as a line as well.   
040 */
041@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
042public class LineCollector {
043    private boolean isEof;
044    private CharsetDecoder decoder;
045    private Charset charset = StandardCharsets.UTF_8;
046    private CharBuffer pending;
047    private CharBuffer rest;
048    private boolean endedWithLF;
049    private final Queue<String> lines = new ConcurrentLinkedQueue<>();
050    private Consumer<String> consumer = lines::add;
051
052    /**
053     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
054     * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8. 
055     * Must be set before the first invocation of 
056     * {@link #feed(ManagedBuffer)}.  
057     *
058     * @param charset the charset
059     * @return the managed buffer reader
060     */
061    public LineCollector charset(Charset charset) {
062        if (decoder != null) {
063            throw new IllegalStateException("Charset cannot be changed.");
064        }
065        this.charset = charset;
066        return this;
067    }
068
069    /**
070     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
071     * is invoked with `ManagedBuffer<ByteBuffer>` to the charset
072     * specified as system property `native.encoding`. If this
073     * property does not specify a valid charset, 
074     * {@link Charset#defaultCharset()} is used.
075     *  
076     * Must be invoked before the first invocation of 
077     * {@link #feed(ManagedBuffer)}.  
078     *
079     * @return the managed buffer reader
080     */
081    @SuppressWarnings({ "PMD.AvoidCatchingGenericException",
082        "PMD.EmptyCatchBlock" })
083    public LineCollector nativeCharset() {
084        Charset toSet = Charset.defaultCharset();
085        var toCheck = System.getProperty("native.encoding");
086        if (toCheck != null) {
087            try {
088                toSet = Charset.forName(toCheck);
089            } catch (Exception e) {
090                // If this fails, simply use default
091            }
092        }
093        charset(toSet);
094        return this;
095    }
096
097    /**
098     * Configures a consumer for lines. The consumer is invoked when
099     * a complete line has been detected. If a consumer is configured,
100     * {@link #getLine()} may not be used (always returns `null`).
101     *
102     * @param consumer the consumer
103     * @return the line collector
104     */
105    public LineCollector consumer(Consumer<String> consumer) {
106        this.consumer = consumer;
107        return this;
108    }
109
110    /**
111     * Feed data to the collector. 
112     * 
113     * Calling this method with `null` as argument closes the feed.
114     *
115     * @param buffer the buffer
116     */
117    public <W extends Buffer> void feed(W buffer) {
118        if (isEof) {
119            return;
120        }
121        if (buffer == null) {
122            isEof = true;
123        } else {
124            copyToPending(buffer);
125        }
126        extractLines();
127
128    }
129
130    /**
131     * Feed data to the collector. 
132     * 
133     * Calling this method with `null` as argument closes the feed.
134     *
135     * @param buffer the buffer
136     */
137    public <W extends Buffer> void feed(ManagedBuffer<W> buffer) {
138        if (buffer == null) {
139            feed((W) null);
140        } else {
141            feed(buffer.backingBuffer());
142        }
143    }
144
145    /**
146     * Feed data to the collector. 
147     * 
148     * Calling this method with `null` as argument closes the feed.
149     *
150     * @param event the event
151     */
152    public <W extends Buffer> void feed(Input<W> event) {
153        if (event == null) {
154            feed((W) null);
155        } else {
156            feed(event.buffer());
157        }
158    }
159
160    private <W extends Buffer> void copyToPending(W buffer) {
161        try {
162            buffer.mark();
163            if (pending == null) {
164                pending = CharBuffer.allocate(buffer.capacity());
165            }
166            if (buffer instanceof CharBuffer charBuf) {
167                if (pending.remaining() < charBuf.remaining()) {
168                    resizePending(charBuf);
169                }
170                pending.put(charBuf);
171                return;
172            }
173            if (decoder == null) {
174                decoder = charset.newDecoder();
175            }
176            while (true) {
177                var result
178                    = decoder.decode((ByteBuffer) buffer, pending, isEof);
179                if (!result.isOverflow()) {
180                    break;
181                }
182                // Need larger buffer
183                resizePending(buffer);
184            }
185        } finally {
186            buffer.reset();
187        }
188    }
189
190    private void resizePending(Buffer toAppend) {
191        var old = pending;
192        pending = CharBuffer.allocate(old.capacity() + toAppend.capacity());
193        old.flip();
194        pending.put(old);
195    }
196
197    @SuppressWarnings({ "PMD.CognitiveComplexity", "PMD.NcssCount",
198        "PMD.NPathComplexity", "PMD.AvoidLiteralsInIfCondition",
199        "PMD.AvoidReassigningLoopVariables",
200        "PMD.AvoidBranchingStatementAsLastInLoop", "PMD.CyclomaticComplexity",
201        "PMD.AvoidInstantiatingObjectsInLoops" })
202    private void extractLines() {
203        pending.flip();
204        if (!pending.hasRemaining()) {
205            pending.clear();
206            return;
207        }
208        if (endedWithLF && pending.get(pending.position()) == '\r') {
209            pending.get();
210        }
211        int end = pending.limit();
212        endedWithLF = false;
213        while (pending.hasRemaining()) {
214            int start = pending.position();
215            for (int pos = start; pos < end;) {
216                if (pending.get(pos) != '\n') {
217                    pos += 1;
218                    continue;
219                }
220                consumer
221                    .accept(new String(pending.array(), start, pos - start));
222                pos += 1;
223                endedWithLF = pos >= end;
224                if (pos < end && pending.get(pos) == '\r') {
225                    pos += 1;
226                }
227                pending.position(pos);
228                break;
229            }
230            if (pending.position() == start) {
231                // No LF found
232                break;
233            }
234        }
235        if (!pending.hasRemaining()) {
236            // Last input was or ended with complete line
237            pending.clear();
238            return;
239        }
240        // Incomplete line
241        if (isEof) {
242            consumer.accept(new String(pending.array(), pending.position(),
243                pending.remaining()));
244            return;
245        }
246        if (pending.position() == 0) {
247            // Nothing consumed, continue to write into pending
248            var limit = pending.limit();
249            pending.clear();
250            pending.position(limit);
251            return;
252        }
253        // Transfer remaining to beginning of pending
254        if (rest == null || rest.capacity() < pending.remaining()) {
255            rest = CharBuffer.allocate(pending.capacity());
256        }
257        rest.put(pending);
258        rest.flip();
259        pending.clear();
260        pending.put(rest);
261        rest.clear();
262    }
263
264    /**
265     * Checks if more input may become available.
266     *
267     * @return true, if successful
268     */
269    public boolean eof() {
270        return isEof;
271    }
272
273    /**
274     * Gets the next line.
275     *
276     * @return the line
277     */
278    public String getLine() {
279        return lines.poll();
280    }
281}