/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2017-2018 Michael N. Lipp
 * 
 * This program is free software; you can redistribute it and/or modify it 
 * under the terms of the GNU Affero General Public License as published by 
 * the Free Software Foundation; either version 3 of the License, or 
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but 
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
 * for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License along 
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.io.util;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.io.IOSubchannel;
import org.jgrapes.io.events.Output;

/**
 * A {@link Writer} that encodes the data written to it and stores it
 * in a {@link ByteBuffer} obtained from a queue. When a byte buffer
 * is full, an {@link Output} event (default) is generated and a 
 * new buffer is fetched from the queue.
 * 
 * The function of this class can also be achieved by wrapping a
 * {@link ByteBufferOutputStream} in a {@link OutputStreamWriter}.
 * The major advantage of this class is that it drops the 
 * {@link IOException}s (which cannot happen) from the methods.
 * Besides, it should be more resource efficient.
 * 
 * As with {@link OutputStreamWriter}, characters that are malformed
 * or cannot be mapped to the selected charset are replaced with the
 * encoder's replacement sequence.
 */
public class ByteBufferWriter extends AbstractBufferWriter<ByteBuffer> {

    /**
     * Staging area for characters that have been written but not yet
     * encoded. Kept in "fill" mode (ready for {@code put}) between
     * calls; created lazily on the first write.
     */
    private CharBuffer written;
    private Charset charset = StandardCharsets.UTF_8;
    /** Created together with {@link #written} on the first write. */
    private CharsetEncoder encoder;

    /**
     * Creates a new instance that uses {@link Output} events to dispatch
     * buffers on the given channel, using the given event pipeline.
     * 
     * @param channel
     *            the channel to fire events on
     * @param eventPipeline
     *            the event pipeline used for firing events
     */
    public ByteBufferWriter(IOSubchannel channel, EventPipeline eventPipeline) {
        super(channel, eventPipeline);
    }

    /**
     * Creates a new instance that uses {@link Output} events to dispatch
     * buffers on the given channel, using the channel's response pipeline.
     * 
     * @param channel the channel to fire events on
     */
    public ByteBufferWriter(IOSubchannel channel) {
        super(channel);
    }

    @Override
    public ByteBufferWriter sendInputEvents() {
        super.sendInputEvents();
        return this;
    }

    @Override
    public ByteBufferWriter suppressClose() {
        super.suppressClose();
        return this;
    }

    @Override
    public ByteBufferWriter suppressEndOfRecord() {
        super.suppressEndOfRecord();
        return this;
    }

    @Override
    public ByteBufferWriter
            setEventAssociations(Map<Object, Object> associations) {
        super.setEventAssociations(associations);
        return this;
    }

    /**
     * Sets the charset to be used for converting the written data
     * to bytes which defaults to UTF-8. Must be set before the first 
     * invocation of any write method.  
     *
     * @param charset the charset
     * @return the writer
     * @throws IllegalStateException if data has already been written,
     * i.e. the encoder has already been created
     */
    public ByteBufferWriter charset(Charset charset) {
        if (encoder != null) {
            throw new IllegalStateException("Charset cannot be changed.");
        }
        this.charset = charset;
        return this;
    }

    /**
     * Sets the charset to be used for converting the written data
     * to bytes to the charset specified as system property 
     * `native.encoding`. If this property does not specify a valid 
     * charset, {@link Charset#defaultCharset()} is used.
     *  
     * Must be invoked before the first write (or append) operation.  
     *
     * @return the writer
     * @throws IllegalStateException if data has already been written
     */
    @SuppressWarnings({ "PMD.EmptyCatchBlock",
        "PMD.DataflowAnomalyAnalysis" })
    public ByteBufferWriter nativeCharset() {
        Charset toSet = Charset.defaultCharset();
        var toCheck = System.getProperty("native.encoding");
        if (toCheck != null) {
            try {
                toSet = Charset.forName(toCheck);
            } catch (IllegalArgumentException ignored) {
                // Illegal or unsupported charset name (both are
                // IllegalArgumentExceptions): simply use the default.
            }
        }
        charset(toSet);
        return this;
    }

    /**
     * Makes sure that there is a byte buffer to encode into, acquiring
     * one from the channel's byte buffer pool if none is currently held.
     *
     * @throws InterruptedException if interrupted while acquiring
     * a buffer
     */
    @Override
    protected void ensureBufferAvailable() throws InterruptedException {
        if (buffer != null) {
            return;
        }
        buffer = channel.byteBufferPool().acquire();
    }

    /**
     * Lazily creates the char staging buffer (sized like the first byte
     * buffer) and the encoder for the configured charset.
     *
     * @throws InterruptedException if interrupted while acquiring
     * the byte buffer used for sizing
     */
    private void ensureWrittenAvailable() throws InterruptedException {
        if (written == null) {
            ensureBufferAvailable();
            written = CharBuffer.allocate(buffer.capacity());
            // Replace instead of report, else an unencodable character
            // would make us discard the remainder of the staged data.
            encoder = charset.newEncoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        }
    }

    /**
     * Encodes the staged characters into the current byte buffer. 
     * Whenever the byte buffer overflows, it is flushed and encoding 
     * continues with the next buffer. Characters that the encoder
     * did not consume (e.g. a high surrogate still waiting for its
     * low surrogate) are kept at the start of the staging buffer
     * for the next invocation.
     *
     * @throws InterruptedException if interrupted while waiting
     * for a byte buffer
     */
    private void encode() throws InterruptedException {
        written.flip();
        while (true) {
            ensureBufferAvailable();
            var res = encoder.encode(written, buffer.backingBuffer(), false);
            if (!res.isOverflow()) {
                break;
            }
            // Byte buffer is full. Presumably flush(false) dispatches it
            // and releases the reference, so that the next iteration
            // acquires a fresh one -- confirm in AbstractBufferWriter.
            flush(false);
        }
        // Moves any unconsumed chars to the front and switches back to
        // fill mode; equals clear() if everything has been consumed.
        written.compact();
    }

    /**
     * Encodes a portion of the given array and adds the result to
     * the byte buffer(s). If the thread is interrupted while waiting 
     * for a buffer, the interrupt flag is restored and the method 
     * returns without having processed all data.
     *
     * @param data the characters
     * @param offset the index of the first character to write
     * @param length the number of characters to write
     * @see java.io.Writer#write(char[], int, int)
     */
    @Override
    public void write(char[] data, int offset, int length) {
        try {
            while (true) {
                ensureWrittenAvailable();
                // Stage as much as fits, encode, repeat until done.
                int chunkSize = Math.min(length, written.remaining());
                written.put(data, offset, chunkSize);
                offset += chunkSize;
                length -= chunkSize;
                encode();
                if (length <= 0) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void write(char[] cbuf) {
        write(cbuf, 0, cbuf.length);
    }

    /**
     * Encodes a portion of the given string and adds the result to
     * the byte buffer(s). If the thread is interrupted while waiting 
     * for a buffer, the interrupt flag is restored and the method 
     * returns without having processed all data.
     *
     * @param str the string
     * @param offset the index of the first character to write
     * @param length the number of characters to write
     */
    @Override
    public void write(String str, int offset, int length) {
        try {
            while (true) {
                ensureWrittenAvailable();
                // The chunk is limited by the free space in the char
                // staging buffer (not by the byte buffer, which may
                // have a different fill level).
                int chunkSize = Math.min(length, written.remaining());
                written.put(str, offset, offset + chunkSize);
                offset += chunkSize;
                length -= chunkSize;
                encode();
                if (length <= 0) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void write(String str) {
        write(str, 0, str.length());
    }

    @Override
    @SuppressWarnings("PMD.ShortVariable")
    public void write(int ch) {
        char[] buff = { (char) ch };
        write(buff, 0, 1);
    }

    @Override
    @SuppressWarnings("PMD.ShortVariable")
    public ByteBufferWriter append(char ch) {
        write(ch);
        return this;
    }

    @Override
    public ByteBufferWriter append(CharSequence csq) {
        // String.valueOf yields "null" for a null sequence, as
        // required by the Appendable contract.
        write(String.valueOf(csq));
        return this;
    }

    @Override
    public ByteBufferWriter append(CharSequence csq, int start, int end) {
        if (csq == null) {
            csq = "null";
        }
        return append(csq.subSequence(start, end));
    }

}