001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2017-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io.util; 020 021import java.io.Writer; 022import java.nio.CharBuffer; 023import java.util.Map; 024import org.jgrapes.core.EventPipeline; 025import org.jgrapes.io.IOSubchannel; 026import org.jgrapes.io.events.Output; 027 028/** 029 * An {@link Writer} that is backed by {@link CharBuffer}s obtained from a 030 * queue. When a byte buffer is full, an {@link Output} event (default) is 031 * generated and a new buffer is fetched from the queue. 032 */ 033public class CharBufferWriter extends AbstractBufferWriter<CharBuffer> { 034 035 /** 036 * Creates a new instance that uses {@link Output} events to dispatch 037 * buffers on the given channel, using the given event pipeline. 038 * 039 * @param channel 040 * the channel to fire events on 041 * @param eventPipeline 042 * the event pipeline used for firing events 043 */ 044 public CharBufferWriter(IOSubchannel channel, EventPipeline eventPipeline) { 045 super(channel, eventPipeline); 046 } 047 048 /** 049 * Creates a new instance that uses {@link Output} events to dispatch 050 * buffers on the given channel, using the channel's response pipeline. 051 * 052 * @param channel the channel to fire events on 053 */ 054 public CharBufferWriter(IOSubchannel channel) { 055 super(channel); 056 } 057 058 @Override 059 public CharBufferWriter sendInputEvents() { 060 super.sendInputEvents(); 061 return this; 062 } 063 064 @Override 065 public CharBufferWriter suppressClose() { 066 super.suppressClose(); 067 return this; 068 } 069 070 @Override 071 public CharBufferWriter suppressEndOfRecord() { 072 super.suppressEndOfRecord(); 073 return this; 074 } 075 076 @Override 077 public CharBufferWriter 078 setEventAssociations(Map<Object, Object> associations) { 079 super.setEventAssociations(associations); 080 return this; 081 } 082 083 /** 084 * Ensure that a buffer for output data is available. 085 * 086 * @throws InterruptedException the interrupted exception 087 */ 088 @Override 089 protected void ensureBufferAvailable() throws InterruptedException { 090 if (buffer != null) { 091 return; 092 } 093 buffer = channel.charBufferPool().acquire(); 094 } 095 096 /* 097 * (non-Javadoc) 098 * 099 * @see java.io.Writer#write(char[], int, int) 100 */ 101 @Override 102 public void write(char[] data, int offset, int length) { 103 while (true) { 104 try { 105 ensureBufferAvailable(); 106 } catch (InterruptedException e) { 107 Thread.currentThread().interrupt(); 108 return; 109 } 110 var buf = buffer.backingBuffer(); 111 if (buf.remaining() > length) { 112 buf.put(data, offset, length); 113 break; 114 } else if (buf.remaining() == length) { 115 buf.put(data, offset, length); 116 flush(false); 117 break; 118 } else { 119 int chunkSize = buf.remaining(); 120 buf.put(data, offset, chunkSize); 121 flush(false); 122 length -= chunkSize; 123 offset += chunkSize; 124 } 125 } 126 } 127 128 @Override 129 public void write(String str, int offset, int length) { 130 while (true) { 131 try { 132 ensureBufferAvailable(); 133 } catch (InterruptedException e) { 134 Thread.currentThread().interrupt(); 135 return; 136 } 137 var buf = buffer.backingBuffer(); 138 if (buf.remaining() >= length) { 139 str.getChars(offset, offset + length, buf.array(), 140 buf.position()); 141 buf.position(buf.position() + length); 142 if (buf.remaining() == 0) { 143 flush(false); 144 } 145 break; 146 } 147 int chunkSize = buf.remaining(); 148 str.getChars(offset, offset + chunkSize, buf.array(), 149 buf.position()); 150 buf.position(buf.position() + chunkSize); 151 flush(false); 152 length -= chunkSize; 153 offset += chunkSize; 154 } 155 } 156 157 @Override 158 @SuppressWarnings("PMD.ShortVariable") 159 public CharBufferWriter append(char ch) { 160 write(ch); 161 return this; 162 } 163 164 @Override 165 public CharBufferWriter append(CharSequence csq) { 166 write(String.valueOf(csq)); 167 return this; 168 } 169 170 @Override 171 public CharBufferWriter append(CharSequence csq, int start, int end) { 172 if (csq == null) { 173 csq = "null"; 174 } 175 return append(csq.subSequence(start, end)); 176 } 177 178}