001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
030
031/**
032 * Collects character data from buffers and makes it available as
033 * complete lines.
034 * 
035 * Lines end with a LF which may optionally be followed by a CR.
036 * Neither character is part of the result returned by {@link #getLine()}.
037 * If no more input is expected and characters without trailing LF
038 * remain, these remaining character are returned as a line as well.   
039 */
040@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
041public class LineCollector implements InputConsumer {
042    private boolean isEof;
043    private CharsetDecoder decoder;
044    private Charset charset = StandardCharsets.UTF_8;
045    private CharBuffer pending;
046    private CharBuffer rest;
047    private boolean endedWithLF;
048    private final Queue<String> lines = new ConcurrentLinkedQueue<>();
049    private Consumer<String> consumer = lines::add;
050
051    /**
052     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
053     * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8. 
054     * Must be set before the first invocation of 
055     * {@link #feed(ManagedBuffer)}.  
056     *
057     * @param charset the charset
058     * @return the managed buffer reader
059     */
060    public LineCollector charset(Charset charset) {
061        if (decoder != null) {
062            throw new IllegalStateException("Charset cannot be changed.");
063        }
064        this.charset = charset;
065        return this;
066    }
067
068    /**
069     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
070     * is invoked with `ManagedBuffer<ByteBuffer>` to the charset
071     * specified as system property `native.encoding`. If this
072     * property does not specify a valid charset, 
073     * {@link Charset#defaultCharset()} is used.
074     *  
075     * Must be invoked before the first invocation of 
076     * {@link #feed(ManagedBuffer)}.  
077     *
078     * @return the managed buffer reader
079     */
080    @SuppressWarnings({ "PMD.AvoidCatchingGenericException",
081        "PMD.EmptyCatchBlock" })
082    public LineCollector nativeCharset() {
083        Charset toSet = Charset.defaultCharset();
084        var toCheck = System.getProperty("native.encoding");
085        if (toCheck != null) {
086            try {
087                toSet = Charset.forName(toCheck);
088            } catch (Exception e) {
089                // If this fails, simply use default
090            }
091        }
092        charset(toSet);
093        return this;
094    }
095
096    /**
097     * Configures a consumer for lines. The consumer is invoked when
098     * a complete line has been detected. If a consumer is configured,
099     * {@link #getLine()} may not be used (always returns `null`).
100     *
101     * @param consumer the consumer
102     * @return the line collector
103     */
104    public LineCollector consumer(Consumer<String> consumer) {
105        this.consumer = consumer;
106        return this;
107    }
108
109    /**
110     * Feed data to the collector. 
111     * 
112     * Calling this method with `null` as argument closes the feed.
113     *
114     * @param buffer the buffer
115     */
116    public <W extends Buffer> void feed(W buffer) {
117        if (isEof) {
118            return;
119        }
120        if (buffer == null) {
121            isEof = true;
122        } else {
123            copyToPending(buffer);
124        }
125        extractLines();
126
127    }
128
129    /**
130     * Feed data to the collector. 
131     * 
132     * Calling this method with `null` as argument closes the feed.
133     *
134     * @param buffer the buffer
135     */
136    public <W extends Buffer> void feed(ManagedBuffer<W> buffer) {
137        if (buffer == null) {
138            feed((W) null);
139        } else {
140            feed(buffer.backingBuffer());
141        }
142    }
143
144    private <W extends Buffer> void copyToPending(W buffer) {
145        try {
146            buffer.mark();
147            if (pending == null) {
148                pending = CharBuffer.allocate(buffer.capacity());
149            }
150            if (buffer instanceof CharBuffer charBuf) {
151                if (pending.remaining() < charBuf.remaining()) {
152                    resizePending(charBuf);
153                }
154                pending.put(charBuf);
155                return;
156            }
157            if (decoder == null) {
158                decoder = charset.newDecoder();
159            }
160            while (true) {
161                var result
162                    = decoder.decode((ByteBuffer) buffer, pending, isEof);
163                if (!result.isOverflow()) {
164                    break;
165                }
166                // Need larger buffer
167                resizePending(buffer);
168            }
169        } finally {
170            buffer.reset();
171        }
172    }
173
174    private void resizePending(Buffer toAppend) {
175        var old = pending;
176        pending = CharBuffer.allocate(old.capacity() + toAppend.capacity());
177        old.flip();
178        pending.put(old);
179    }
180
181    @SuppressWarnings({ "PMD.CognitiveComplexity", "PMD.NcssCount",
182        "PMD.NPathComplexity", "PMD.AvoidLiteralsInIfCondition",
183        "PMD.AvoidReassigningLoopVariables",
184        "PMD.AvoidBranchingStatementAsLastInLoop", "PMD.CyclomaticComplexity",
185        "PMD.AvoidInstantiatingObjectsInLoops" })
186    private void extractLines() {
187        pending.flip();
188        if (!pending.hasRemaining()) {
189            pending.clear();
190            return;
191        }
192        if (endedWithLF && pending.get(pending.position()) == '\r') {
193            pending.get();
194        }
195        int end = pending.limit();
196        endedWithLF = false;
197        while (pending.hasRemaining()) {
198            int start = pending.position();
199            for (int pos = start; pos < end;) {
200                if (pending.get(pos) != '\n') {
201                    pos += 1;
202                    continue;
203                }
204                consumer
205                    .accept(new String(pending.array(), start, pos - start));
206                pos += 1;
207                endedWithLF = pos >= end;
208                if (pos < end && pending.get(pos) == '\r') {
209                    pos += 1;
210                }
211                pending.position(pos);
212                break;
213            }
214            if (pending.position() == start) {
215                // No LF found
216                break;
217            }
218        }
219        if (!pending.hasRemaining()) {
220            // Last input was or ended with complete line
221            pending.clear();
222            return;
223        }
224        // Incomplete line
225        if (isEof) {
226            consumer.accept(new String(pending.array(), pending.position(),
227                pending.remaining()));
228            return;
229        }
230        if (pending.position() == 0) {
231            // Nothing consumed, continue to write into pending
232            var limit = pending.limit();
233            pending.clear();
234            pending.position(limit);
235            return;
236        }
237        // Transfer remaining to beginning of pending
238        if (rest == null || rest.capacity() < pending.remaining()) {
239            rest = CharBuffer.allocate(pending.capacity());
240        }
241        rest.put(pending);
242        rest.flip();
243        pending.clear();
244        pending.put(rest);
245        rest.clear();
246    }
247
248    /**
249     * Checks if more input may become available.
250     *
251     * @return true, if successful
252     */
253    public boolean eof() {
254        return isEof;
255    }
256
257    /**
258     * Gets the next line.
259     *
260     * @return the line
261     */
262    public String getLine() {
263        return lines.poll();
264    }
265}