/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2022, 2023 Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.io.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * A {@link Reader} that provides the data from the {@link ManagedBuffer}s
 * fed to it to a consumer. This class is intended to be used as a pipe
 * between two threads.
 *
 * Buffers backed by a {@link CharBuffer} are passed on as they are.
 * All other buffers are treated as {@link ByteBuffer}s and decoded
 * with the configured charset. Malformed or unmappable input is
 * replaced with the decoder's replacement string. A byte sequence
 * that is split between two consecutively fed byte buffers is
 * decoded as a whole. An incomplete sequence that is still pending
 * when the feed ends is discarded.
 *
 * The feed, read and close operations synchronize on the
 * {@link Reader}'s `lock`.
 */
public class ManagedBufferReader extends Reader implements InputConsumer {

    /** Smallest size of the decode buffer; has room for a surrogate pair. */
    private static final int MIN_DECODED_CAPACITY = 2;

    private boolean isEndOfFeed;
    private boolean isOpen = true;
    /** The buffer currently being consumed, locked while set. */
    private ManagedBuffer<? extends Buffer> current;
    private CharsetDecoder decoder;
    /** Decoded chars not yet read; kept in "write mode" between reads. */
    private CharBuffer decoded;
    /** The bytes of {@link #current} (plus carry) that await decoding. */
    private ByteBuffer undecoded;
    /** Incomplete trailing byte sequence of the previous byte buffer. */
    private ByteBuffer carry;
    private Charset charset = StandardCharsets.UTF_8;

    /**
     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
     * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8.
     * Must be set before the first invocation of
     * {@link #feed(ManagedBuffer)}.
     *
     * @param charset the charset
     * @return the managed buffer reader
     * @throws IllegalStateException if the decoder has already
     * been created
     */
    public ManagedBufferReader charset(Charset charset) {
        if (decoder != null) {
            throw new IllegalStateException("Charset cannot be changed.");
        }
        this.charset = charset;
        return this;
    }

    /**
     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
     * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8.
     * Must be set before the first invocation of
     * {@link #feed(ManagedBuffer)}.
     *
     * @param charset the charset
     * @return the managed buffer reader
     * @deprecated Use {@link #charset(Charset)} instead
     */
    @Deprecated
    public ManagedBufferReader setCharset(Charset charset) {
        return charset(charset);
    }

    /**
     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
     * is invoked with `ManagedBuffer<ByteBuffer>` to the charset
     * specified as system property `native.encoding`. If this
     * property does not specify a valid charset,
     * {@link Charset#defaultCharset()} is used.
     *
     * Must be invoked before the first invocation of
     * {@link #feed(ManagedBuffer)}.
     *
     * @return the managed buffer reader
     */
    @SuppressWarnings({ "PMD.AvoidCatchingGenericException",
        "PMD.EmptyCatchBlock", "PMD.DataflowAnomalyAnalysis" })
    public ManagedBufferReader nativeCharset() {
        Charset toSet = Charset.defaultCharset();
        var toCheck = System.getProperty("native.encoding");
        if (toCheck != null) {
            try {
                toSet = Charset.forName(toCheck);
            } catch (Exception e) {
                // If this fails, simply use default
            }
        }
        charset(toSet);
        return this;
    }

    /**
     * Feed data to the reader. The call blocks while data from a previous
     * invocation has not been fully read. The buffer passed as argument
     * is locked (see {@link ManagedBuffer#lockBuffer()}) until all
     * data has been read.
     *
     * Calling this method with `null` as argument closes the feed.
     * After consuming any data still available from a previous
     * invocation, further calls to {@link #read} therefore return -1.
     *
     * The buffer is ignored (and not locked) if the reader has been
     * closed or the feed has ended, even if this happens while the
     * call is blocked.
     *
     * @param buffer the buffer
     * @throws UncheckedIOException wrapping an
     * {@link InterruptedIOException} if the thread is interrupted
     * while waiting for the previous buffer to be consumed
     */
    @SuppressWarnings({ "PMD.PreserveStackTrace" })
    public <W extends Buffer> void feed(ManagedBuffer<W> buffer) {
        synchronized (lock) {
            if (buffer == null) {
                isEndOfFeed = true;
                lock.notifyAll();
                return;
            }
            while (isOpen && !isEndOfFeed && current != null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    throw new UncheckedIOException(interruptedIo(e));
                }
            }
            // Checked after waiting as well: close() wakes up a blocked
            // feed, and a buffer locked now would never be unlocked.
            if (!isOpen || isEndOfFeed) {
                return;
            }
            current = buffer;
            buffer.lockBuffer();
            lock.notifyAll();
        }
    }

    /**
     * {@inheritDoc}
     *
     * Note that this is the {@link Reader}'s `close` method. In order
     * to close the feed, call {@link #feed(ManagedBuffer)} with
     * `null` as argument.
     */
    @Override
    public void close() throws IOException {
        synchronized (lock) {
            isOpen = false;
            if (current != null) {
                current.unlockBuffer();
                current = null;
                undecoded = null;
            }
            lock.notifyAll();
        }
    }

    /**
     * Reads characters into a portion of an array. Blocks until
     * data has been fed, the feed has ended or the reader has
     * been closed.
     *
     * @param cbuf the destination array
     * @param off the offset at which to start storing characters
     * @param len the maximum number of characters to read
     * @return the number of characters read, or -1 if the reader
     * has been closed or the feed has ended and all data has
     * been consumed
     * @throws InterruptedIOException if the thread is interrupted
     * while waiting for data
     * @throws IOException declared by {@link Reader#read(char[], int, int)}
     */
    @Override
    @SuppressWarnings({ "PMD.PreserveStackTrace",
        "PMD.CognitiveComplexity", "PMD.DataflowAnomalyAnalysis" })
    public int read(char[] cbuf, int off, int len) throws IOException {
        Objects.checkFromIndexSize(off, len, cbuf.length);
        synchronized (lock) {
            while (true) {
                while (isOpen && current == null && !isEndOfFeed) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        throw interruptedIo(e);
                    }
                }
                if (!isOpen || isEndOfFeed && current == null) {
                    return -1;
                }
                int transferred
                    = current.backingBuffer() instanceof CharBuffer
                        ? transferChars(cbuf, off, len)
                        : transferDecoded(cbuf, off, len);
                if (transferred > 0 || len == 0) {
                    return transferred;
                }
                // The buffer did not provide any character (it was
                // empty or held only a part of a byte sequence) and
                // has been released. Wait for the next one instead
                // of returning 0 for a non-empty request.
            }
        }
    }

    /**
     * Copies chars from the current buffer (which must be backed by
     * a {@link CharBuffer}) and releases it when it is exhausted.
     * Must be called while holding the lock.
     */
    @SuppressWarnings("unchecked")
    private int transferChars(char[] cbuf, int off, int len) {
        CharBuffer input
            = ((ManagedBuffer<CharBuffer>) current).backingBuffer();
        int transferred = Math.min(len, input.remaining());
        input.get(cbuf, off, transferred);
        if (!input.hasRemaining()) {
            releaseCurrent();
        }
        return transferred;
    }

    /**
     * Decodes bytes from the current buffer (treated as backed by a
     * {@link ByteBuffer}) and copies the result. The buffer is released
     * when all of its decodable bytes have been decoded and read.
     * Must be called while holding the lock.
     */
    @SuppressWarnings("unchecked")
    private int transferDecoded(char[] cbuf, int off, int len) {
        if (decoder == null) {
            decoder = charset.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
            decoded = CharBuffer.allocate(
                Math.max(current.capacity(), MIN_DECODED_CAPACITY));
        }
        if (undecoded == null) {
            // First access to this buffer.
            undecoded = prependCarry(
                ((ManagedBuffer<ByteBuffer>) current).backingBuffer());
        }
        // "decoded" is in write mode here and may still hold chars
        // that a previous call could not deliver; append to them.
        CoderResult result = decoder.decode(undecoded, decoded, false);
        decoded.flip();
        int transferred = Math.min(len, decoded.remaining());
        decoded.get(cbuf, off, transferred);
        // Underflow means that the decoder cannot get anything more
        // out of the available bytes; overflow means that bytes are
        // left which did not fit into "decoded" in this round.
        boolean exhausted = result.isUnderflow() && !decoded.hasRemaining();
        // Back to write mode, preserving the undelivered chars.
        decoded.compact();
        if (exhausted) {
            keepCarry();
            releaseCurrent();
        }
        return transferred;
    }

    /**
     * Returns the bytes to decode for a newly fed byte buffer. If the
     * previous byte buffer ended with an incomplete byte sequence,
     * the result is a copy with that sequence in front.
     */
    private ByteBuffer prependCarry(ByteBuffer bytes) {
        if (carry == null) {
            return bytes;
        }
        ByteBuffer joined
            = ByteBuffer.allocate(carry.remaining() + bytes.remaining());
        joined.put(carry);
        joined.put(bytes);
        joined.flip();
        carry = null;
        return joined;
    }

    /**
     * Saves the bytes that the decoder left over (an incomplete byte
     * sequence at the end of the input) for the next byte buffer.
     */
    private void keepCarry() {
        if (!undecoded.hasRemaining()) {
            carry = null;
            return;
        }
        carry = ByteBuffer.allocate(undecoded.remaining());
        carry.put(undecoded);
        carry.flip();
    }

    /**
     * Unlocks the current buffer and wakes up a blocked
     * {@link #feed(ManagedBuffer)}. Must be called while
     * holding the lock.
     */
    private void releaseCurrent() {
        current.unlockBuffer();
        current = null;
        undecoded = null;
        lock.notifyAll();
    }

    /**
     * Converts the exception to an {@link InterruptedIOException}
     * with the same message and stack trace, keeping the original
     * as cause.
     */
    private static InterruptedIOException
            interruptedIo(InterruptedException e) {
        var exc = new InterruptedIOException(e.getMessage());
        exc.setStackTrace(e.getStackTrace());
        exc.initCause(e);
        return exc;
    }

}