001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2023 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io.util; 020 021import java.nio.Buffer; 022import java.nio.ByteBuffer; 023import java.nio.CharBuffer; 024import java.nio.charset.Charset; 025import java.nio.charset.CharsetDecoder; 026import java.nio.charset.StandardCharsets; 027import java.util.Queue; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.function.Consumer; 030 031/** 032 * Collects character data from buffers and makes it available as 033 * complete lines. 034 * 035 * Lines end with a LF which may optionally be followed by a CR. 036 * Neither character is part of the result returned by {@link #getLine()}. 037 * If no more input is expected and characters without trailing LF 038 * remain, these remaining character are returned as a line as well. 039 */ 040@SuppressWarnings("PMD.DataflowAnomalyAnalysis") 041public class LineCollector implements InputConsumer { 042 private boolean isEof; 043 private CharsetDecoder decoder; 044 private Charset charset = StandardCharsets.UTF_8; 045 private CharBuffer pending; 046 private CharBuffer rest; 047 private boolean endedWithLF; 048 private final Queue<String> lines = new ConcurrentLinkedQueue<>(); 049 private Consumer<String> consumer = lines::add; 050 051 /** 052 * Sets the charset to be used if {@link #feed(ManagedBuffer)} 053 * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8. 054 * Must be set before the first invocation of 055 * {@link #feed(ManagedBuffer)}. 056 * 057 * @param charset the charset 058 * @return the managed buffer reader 059 */ 060 public LineCollector charset(Charset charset) { 061 if (decoder != null) { 062 throw new IllegalStateException("Charset cannot be changed."); 063 } 064 this.charset = charset; 065 return this; 066 } 067 068 /** 069 * Sets the charset to be used if {@link #feed(ManagedBuffer)} 070 * is invoked with `ManagedBuffer<ByteBuffer>` to the charset 071 * specified as system property `native.encoding`. If this 072 * property does not specify a valid charset, 073 * {@link Charset#defaultCharset()} is used. 074 * 075 * Must be invoked before the first invocation of 076 * {@link #feed(ManagedBuffer)}. 077 * 078 * @return the managed buffer reader 079 */ 080 @SuppressWarnings({ "PMD.AvoidCatchingGenericException", 081 "PMD.EmptyCatchBlock" }) 082 public LineCollector nativeCharset() { 083 Charset toSet = Charset.defaultCharset(); 084 var toCheck = System.getProperty("native.encoding"); 085 if (toCheck != null) { 086 try { 087 toSet = Charset.forName(toCheck); 088 } catch (Exception e) { 089 // If this fails, simply use default 090 } 091 } 092 charset(toSet); 093 return this; 094 } 095 096 /** 097 * Configures a consumer for lines. The consumer is invoked when 098 * a complete line has been detected. If a consumer is configured, 099 * {@link #getLine()} may not be used (always returns `null`). 100 * 101 * @param consumer the consumer 102 * @return the line collector 103 */ 104 public LineCollector consumer(Consumer<String> consumer) { 105 this.consumer = consumer; 106 return this; 107 } 108 109 /** 110 * Feed data to the collector. 111 * 112 * Calling this method with `null` as argument closes the feed. 113 * 114 * @param buffer the buffer 115 */ 116 public <W extends Buffer> void feed(W buffer) { 117 if (isEof) { 118 return; 119 } 120 if (buffer == null) { 121 isEof = true; 122 } else { 123 copyToPending(buffer); 124 } 125 extractLines(); 126 127 } 128 129 /** 130 * Feed data to the collector. 131 * 132 * Calling this method with `null` as argument closes the feed. 133 * 134 * @param buffer the buffer 135 */ 136 public <W extends Buffer> void feed(ManagedBuffer<W> buffer) { 137 if (buffer == null) { 138 feed((W) null); 139 } else { 140 feed(buffer.backingBuffer()); 141 } 142 } 143 144 private <W extends Buffer> void copyToPending(W buffer) { 145 try { 146 buffer.mark(); 147 if (pending == null) { 148 pending = CharBuffer.allocate(buffer.capacity()); 149 } 150 if (buffer instanceof CharBuffer charBuf) { 151 if (pending.remaining() < charBuf.remaining()) { 152 resizePending(charBuf); 153 } 154 pending.put(charBuf); 155 return; 156 } 157 if (decoder == null) { 158 decoder = charset.newDecoder(); 159 } 160 while (true) { 161 var result 162 = decoder.decode((ByteBuffer) buffer, pending, isEof); 163 if (!result.isOverflow()) { 164 break; 165 } 166 // Need larger buffer 167 resizePending(buffer); 168 } 169 } finally { 170 buffer.reset(); 171 } 172 } 173 174 private void resizePending(Buffer toAppend) { 175 var old = pending; 176 pending = CharBuffer.allocate(old.capacity() + toAppend.capacity()); 177 old.flip(); 178 pending.put(old); 179 } 180 181 @SuppressWarnings({ "PMD.CognitiveComplexity", "PMD.NcssCount", 182 "PMD.NPathComplexity", "PMD.AvoidLiteralsInIfCondition", 183 "PMD.AvoidReassigningLoopVariables", 184 "PMD.AvoidBranchingStatementAsLastInLoop", "PMD.CyclomaticComplexity", 185 "PMD.AvoidInstantiatingObjectsInLoops" }) 186 private void extractLines() { 187 pending.flip(); 188 if (!pending.hasRemaining()) { 189 pending.clear(); 190 return; 191 } 192 if (endedWithLF && pending.get(pending.position()) == '\r') { 193 pending.get(); 194 } 195 int end = pending.limit(); 196 endedWithLF = false; 197 while (pending.hasRemaining()) { 198 int start = pending.position(); 199 for (int pos = start; pos < end;) { 200 if (pending.get(pos) != '\n') { 201 pos += 1; 202 continue; 203 } 204 consumer 205 .accept(new String(pending.array(), start, pos - start)); 206 pos += 1; 207 endedWithLF = pos >= end; 208 if (pos < end && pending.get(pos) == '\r') { 209 pos += 1; 210 } 211 pending.position(pos); 212 break; 213 } 214 if (pending.position() == start) { 215 // No LF found 216 break; 217 } 218 } 219 if (!pending.hasRemaining()) { 220 // Last input was or ended with complete line 221 pending.clear(); 222 return; 223 } 224 // Incomplete line 225 if (isEof) { 226 consumer.accept(new String(pending.array(), pending.position(), 227 pending.remaining())); 228 return; 229 } 230 if (pending.position() == 0) { 231 // Nothing consumed, continue to write into pending 232 var limit = pending.limit(); 233 pending.clear(); 234 pending.position(limit); 235 return; 236 } 237 // Transfer remaining to beginning of pending 238 if (rest == null || rest.capacity() < pending.remaining()) { 239 rest = CharBuffer.allocate(pending.capacity()); 240 } 241 rest.put(pending); 242 rest.flip(); 243 pending.clear(); 244 pending.put(rest); 245 rest.clear(); 246 } 247 248 /** 249 * Checks if more input may become available. 250 * 251 * @return true, if successful 252 */ 253 public boolean eof() { 254 return isEof; 255 } 256 257 /** 258 * Gets the next line. 259 * 260 * @return the line 261 */ 262 public String getLine() { 263 return lines.poll(); 264 } 265}