/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2017-2018 Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.io.util;

import java.io.Writer;
import java.nio.Buffer;
import java.util.Map;
import org.jgrapes.core.Event;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.io.IOSubchannel;
import org.jgrapes.io.events.Close;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.Output;

/**
 * A base class for {@link CharBufferWriter} and {@link ByteBufferWriter}.
 *
 * Data written to this writer is collected in a {@link ManagedBuffer}
 * and forwarded as {@link Output} (or, optionally, {@link Input})
 * events when the writer is flushed or closed.
 *
 * @param <B> the type of the underlying buffer
 */
public abstract class AbstractBufferWriter<B extends Buffer> extends Writer {

    /** The channel that the events are fired on. */
    protected IOSubchannel channel;
    /** The pipeline used for firing the events. */
    private final EventPipeline eventPipeline;
    /** If set, {@link Input} events are fired instead of {@link Output}. */
    private boolean sendInputEvents;
    /** The buffer currently being filled, {@code null} if there is none. */
    protected ManagedBuffer<B> buffer;
    /** Whether {@link #close()} fires a {@link Close} event. */
    private boolean sendClose = true;
    /** Whether {@link #flush()} and {@link #close()} set end of record. */
    private boolean sendEor = true;
    /** The end of record flag of the most recently fired data event. */
    private boolean eorSent;
    /** Set once {@link #close()} has completed. */
    private boolean isClosed;
    /** Associations applied to fired events, may be {@code null}. */
    private Map<Object, Object> eventAssociations;

    /**
     * Creates a new instance that uses {@link Output} events to dispatch
     * buffers on the given channel, using the given event pipeline.
     *
     * @param channel
     *            the channel to fire events on
     * @param eventPipeline
     *            the event pipeline used for firing events
     */
    public AbstractBufferWriter(IOSubchannel channel,
            EventPipeline eventPipeline) {
        this.channel = channel;
        this.eventPipeline = eventPipeline;
    }

    /**
     * Creates a new instance that uses {@link Output} events to dispatch
     * buffers on the given channel, using the channel's response pipeline.
     *
     * @param channel the channel to fire events on
     */
    public AbstractBufferWriter(IOSubchannel channel) {
        this(channel, channel.responsePipeline());
    }

    /**
     * Causes the data to be fired as {@link Input} events rather
     * than the usual {@link Output} events.
     *
     * @return this object for easy chaining
     */
    protected AbstractBufferWriter<B> sendInputEvents() {
        sendInputEvents = true;
        return this;
    }

    /**
     * Suppresses sending of a close event when the stream is closed.
     *
     * @return this object for easy chaining
     */
    public AbstractBufferWriter<B> suppressClose() {
        sendClose = false;
        return this;
    }

    /**
     * Suppresses setting the end of record flag when the stream is
     * flushed or closed.
     *
     * @return this object for easy chaining
     * @see Output#isEndOfRecord()
     */
    public AbstractBufferWriter<B> suppressEndOfRecord() {
        sendEor = false;
        return this;
    }

    /**
     * Configure associations that are applied to the generated
     * events (data events and the {@link Close} event),
     * see {@link Event#setAssociated}. The map is used as passed in,
     * no copy is made.
     *
     * @param associations the associations to apply
     * @return this object for easy chaining
     */
    public AbstractBufferWriter<B>
            setEventAssociations(Map<Object, Object> associations) {
        eventAssociations = associations;
        return this;
    }

    /**
     * Ensure that a buffer for output data is available.
     * {@link #flush(boolean)} accesses {@link #buffer} right after
     * invoking this method, so implementations must leave it
     * non-{@code null} on normal return.
     *
     * @throws InterruptedException the interrupted exception
     */
    protected abstract void ensureBufferAvailable()
            throws InterruptedException;

    @Override
    public abstract void write(char[] data, int offset, int length);

    @Override
    public void write(char[] cbuf) {
        write(cbuf, 0, cbuf.length);
    }

    @Override
    public abstract void write(String str, int offset, int length);

    @Override
    public void write(String str) {
        write(str, 0, str.length());
    }

    @Override
    @SuppressWarnings("PMD.ShortVariable")
    public void write(int ch) {
        char[] buff = { (char) ch };
        write(buff, 0, 1);
    }

    /**
     * Creates and fires an {@link Output} event (or an {@link Input}
     * event if {@link #sendInputEvents()} has been called) with the
     * buffer being filled. The end of record flag of the event is set
     * according to the parameter. Frees any allocated buffer.
     *
     * Nothing is fired if there is no data and no end of record
     * indication is pending, i.e. end of record is not requested or
     * the previously fired event already had the flag set.
     *
     * If the thread is interrupted while a buffer is obtained, the
     * interrupt status is restored and the method returns without
     * firing an event.
     *
     * @param endOfRecord the value for the event's end of record flag
     */
    protected void flush(boolean endOfRecord) {
        if (buffer == null) {
            if (!endOfRecord || eorSent) {
                // No data and no end of record to report
                return;
            }
            // A buffer is required to carry the end of record flag
            try {
                ensureBufferAvailable();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        if (buffer.position() == 0 && (!endOfRecord || eorSent)) {
            // Nothing to flush
            buffer.unlockBuffer();
        } else {
            if (sendInputEvents) {
                eventPipeline.fire(
                    associate(Input.fromSink(buffer, endOfRecord)), channel);
            } else {
                eventPipeline.fire(
                    associate(Output.fromSink(buffer, endOfRecord)), channel);
            }
            // Remember the flag to avoid repeated empty end of record events
            eorSent = endOfRecord;
        }
        buffer = null;
    }

    /**
     * Creates and fires an {@link Output} event with the buffer being filled
     * if it contains any data.
     *
     * By default, the {@link Output} event is created with the end of record
     * flag set (see {@link Output#isEndOfRecord()}) in order to forward the
     * flush as event. This implies that an {@link Output} event with no data
     * (but the end of record flag set) may be fired. This behavior can
     * be disabled with {@link #suppressEndOfRecord()}.
     */
    @Override
    public void flush() {
        flush(sendEor);
    }

    /**
     * Flushes any remaining data with the end of record flag set
     * (unless {@link #suppressEndOfRecord()} has been called)
     * and fires a {@link Close} event (unless {@link #suppressClose()}
     * has been called). Invoking the method again after it has
     * completed has no effect.
     */
    @Override
    public void close() {
        if (isClosed) {
            return;
        }
        flush(sendEor);
        if (sendClose) {
            eventPipeline.fire(associate(new Close()), channel);
        }
        isClosed = true;
    }

    /**
     * Applies the configured event associations (if any) to the event.
     *
     * @param event the event to be fired
     * @return the event passed in
     */
    private Event<?> associate(Event<?> event) {
        if (eventAssociations != null) {
            for (var entry : eventAssociations.entrySet()) {
                event.setAssociated(entry.getKey(), entry.getValue());
            }
        }
        return event;
    }
}