001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2023 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io.util; 020 021import java.nio.Buffer; 022import java.nio.ByteBuffer; 023import java.nio.CharBuffer; 024import java.nio.charset.Charset; 025import java.nio.charset.CharsetDecoder; 026import java.nio.charset.StandardCharsets; 027import java.util.Queue; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.function.Consumer; 030import org.jgrapes.io.events.Input; 031 032/** 033 * Collects character data from buffers and makes it available as 034 * complete lines. 035 * 036 * Lines end with a LF which may optionally be followed by a CR. 037 * Neither character is part of the result returned by {@link #getLine()}. 038 * If no more input is expected and characters without trailing LF 039 * remain, these remaining character are returned as a line as well. 040 */ 041@SuppressWarnings("PMD.DataflowAnomalyAnalysis") 042public class LineCollector { 043 private boolean isEof; 044 private CharsetDecoder decoder; 045 private Charset charset = StandardCharsets.UTF_8; 046 private CharBuffer pending; 047 private CharBuffer rest; 048 private boolean endedWithLF; 049 private final Queue<String> lines = new ConcurrentLinkedQueue<>(); 050 private Consumer<String> consumer = s -> lines.add(s); 051 052 /** 053 * Sets the charset to be used if {@link #feed(ManagedBuffer)} 054 * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8. 055 * Must be set before the first invocation of 056 * {@link #feed(ManagedBuffer)}. 057 * 058 * @param charset the charset 059 * @return the managed buffer reader 060 */ 061 public LineCollector charset(Charset charset) { 062 if (decoder != null) { 063 throw new IllegalStateException("Charset cannot be changed."); 064 } 065 this.charset = charset; 066 return this; 067 } 068 069 /** 070 * Sets the charset to be used if {@link #feed(ManagedBuffer)} 071 * is invoked with `ManagedBuffer<ByteBuffer>` to the charset 072 * specified as system property `native.encoding`. If this 073 * property does not specify a valid charset, 074 * {@link Charset#defaultCharset()} is used. 075 * 076 * Must be invoked before the first invocation of 077 * {@link #feed(ManagedBuffer)}. 078 * 079 * @return the managed buffer reader 080 */ 081 @SuppressWarnings({ "PMD.AvoidCatchingGenericException", 082 "PMD.EmptyCatchBlock" }) 083 public LineCollector nativeCharset() { 084 Charset toSet = Charset.defaultCharset(); 085 var toCheck = System.getProperty("native.encoding"); 086 if (toCheck != null) { 087 try { 088 toSet = Charset.forName(toCheck); 089 } catch (Exception e) { 090 // If this fails, simply use default 091 } 092 } 093 charset(toSet); 094 return this; 095 } 096 097 /** 098 * Configures a consumer for lines. The consumer is invoked when 099 * a complete line has been detected. If a consumer is configured, 100 * {@link #getLine()} may not be used (always returns `null`). 101 * 102 * @param consumer the consumer 103 * @return the line collector 104 */ 105 public LineCollector consumer(Consumer<String> consumer) { 106 this.consumer = consumer; 107 return this; 108 } 109 110 /** 111 * Feed data to the collector. 112 * 113 * Calling this method with `null` as argument closes the feed. 114 * 115 * @param buffer the buffer 116 */ 117 public <W extends Buffer> void feed(W buffer) { 118 if (isEof) { 119 return; 120 } 121 if (buffer == null) { 122 isEof = true; 123 } else { 124 copyToPending(buffer); 125 } 126 extractLines(); 127 128 } 129 130 /** 131 * Feed data to the collector. 132 * 133 * Calling this method with `null` as argument closes the feed. 134 * 135 * @param buffer the buffer 136 */ 137 public <W extends Buffer> void feed(ManagedBuffer<W> buffer) { 138 if (buffer == null) { 139 feed((W) null); 140 } else { 141 feed(buffer.backingBuffer()); 142 } 143 } 144 145 /** 146 * Feed data to the collector. 147 * 148 * Calling this method with `null` as argument closes the feed. 149 * 150 * @param event the event 151 */ 152 public <W extends Buffer> void feed(Input<W> event) { 153 if (event == null) { 154 feed((W) null); 155 } else { 156 feed(event.buffer()); 157 } 158 } 159 160 private <W extends Buffer> void copyToPending(W buffer) { 161 try { 162 buffer.mark(); 163 if (pending == null) { 164 pending = CharBuffer.allocate(buffer.capacity()); 165 } 166 if (buffer instanceof CharBuffer charBuf) { 167 if (pending.remaining() < charBuf.remaining()) { 168 resizePending(charBuf); 169 } 170 pending.put(charBuf); 171 return; 172 } 173 if (decoder == null) { 174 decoder = charset.newDecoder(); 175 } 176 while (true) { 177 var result 178 = decoder.decode((ByteBuffer) buffer, pending, isEof); 179 if (!result.isOverflow()) { 180 break; 181 } 182 // Need larger buffer 183 resizePending(buffer); 184 } 185 } finally { 186 buffer.reset(); 187 } 188 } 189 190 private void resizePending(Buffer toAppend) { 191 var old = pending; 192 pending = CharBuffer.allocate(old.capacity() + toAppend.capacity()); 193 old.flip(); 194 pending.put(old); 195 } 196 197 @SuppressWarnings({ "PMD.CognitiveComplexity", "PMD.NcssCount", 198 "PMD.NPathComplexity", "PMD.AvoidLiteralsInIfCondition", 199 "PMD.AvoidReassigningLoopVariables", 200 "PMD.AvoidBranchingStatementAsLastInLoop", "PMD.CyclomaticComplexity", 201 "PMD.AvoidInstantiatingObjectsInLoops" }) 202 private void extractLines() { 203 pending.flip(); 204 if (!pending.hasRemaining()) { 205 pending.clear(); 206 return; 207 } 208 if (endedWithLF && pending.get(pending.position()) == '\r') { 209 pending.get(); 210 } 211 int end = pending.limit(); 212 endedWithLF = false; 213 while (pending.hasRemaining()) { 214 int start = pending.position(); 215 for (int pos = start; pos < end;) { 216 if (pending.get(pos) != '\n') { 217 pos += 1; 218 continue; 219 } 220 consumer 221 .accept(new String(pending.array(), start, pos - start)); 222 pos += 1; 223 endedWithLF = pos >= end; 224 if (pos < end && pending.get(pos) == '\r') { 225 pos += 1; 226 } 227 pending.position(pos); 228 break; 229 } 230 if (pending.position() == start) { 231 // No LF found 232 break; 233 } 234 } 235 if (!pending.hasRemaining()) { 236 // Last input was or ended with complete line 237 pending.clear(); 238 return; 239 } 240 // Incomplete line 241 if (isEof) { 242 consumer.accept(new String(pending.array(), pending.position(), 243 pending.remaining())); 244 return; 245 } 246 if (pending.position() == 0) { 247 // Nothing consumed, continue to write into pending 248 var limit = pending.limit(); 249 pending.clear(); 250 pending.position(limit); 251 return; 252 } 253 // Transfer remaining to beginning of pending 254 if (rest == null || rest.capacity() < pending.remaining()) { 255 rest = CharBuffer.allocate(pending.capacity()); 256 } 257 rest.put(pending); 258 rest.flip(); 259 pending.clear(); 260 pending.put(rest); 261 rest.clear(); 262 } 263 264 /** 265 * Checks if more input may become available. 266 * 267 * @return true, if successful 268 */ 269 public boolean eof() { 270 return isEof; 271 } 272 273 /** 274 * Gets the next line. 275 * 276 * @return the line 277 */ 278 public String getLine() { 279 return lines.poll(); 280 } 281}