001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io.util; 020 021import java.io.OutputStream; 022import java.nio.ByteBuffer; 023import org.jgrapes.core.EventPipeline; 024import org.jgrapes.io.IOSubchannel; 025import org.jgrapes.io.events.Close; 026import org.jgrapes.io.events.Input; 027import org.jgrapes.io.events.Output; 028 029/** 030 * An {@link OutputStream} that is backed by {@link ByteBuffer}s obtained from a 031 * queue. When a byte buffer is full, a {@link Output} event (default) is 032 * generated and a new buffer is fetched from the queue. 033 */ 034public class ByteBufferOutputStream extends OutputStream { 035 036 private IOSubchannel channel; 037 private EventPipeline eventPipeline; 038 private boolean sendInputEvents; 039 private ManagedBuffer<ByteBuffer> buffer; 040 private boolean sendClose = true; 041 private boolean sendEor = true; 042 private boolean eorSent; 043 private boolean isClosed; 044 045 /** 046 * Creates a new instance that uses {@link Output} events to dispatch 047 * buffers on the given channel, using the given event pipeline. 048 * 049 * @param channel 050 * the channel to fire events on 051 * @param eventPipeline 052 * the event pipeline used for firing events 053 */ 054 public ByteBufferOutputStream(IOSubchannel channel, 055 EventPipeline eventPipeline) { 056 this.channel = channel; 057 this.eventPipeline = eventPipeline; 058 } 059 060 /** 061 * Creates a new instance that uses {@link Output} events to dispatch 062 * buffers on the given channel, using the channel's response pipeline. 063 * 064 * @param channel 065 * the channel to fire events on 066 */ 067 public ByteBufferOutputStream(IOSubchannel channel) { 068 this(channel, channel.responsePipeline()); 069 } 070 071 /** 072 * Causes the data to be fired as {@link Input} events rather 073 * than the usual {@link Output} events. 074 * 075 * @return the stream for easy chaining 076 */ 077 public ByteBufferOutputStream sendInputEvents() { 078 sendInputEvents = true; 079 return this; 080 } 081 082 /** 083 * Suppresses sending of a close event when the stream is closed. 084 * 085 * @return the stream for easy chaining 086 */ 087 public ByteBufferOutputStream suppressClose() { 088 sendClose = false; 089 return this; 090 } 091 092 /** 093 * Suppresses setting the end of record flag when the stream is 094 * flushed or closed. 095 * 096 * @return the stream for easy chaining 097 * @see Output#isEndOfRecord() 098 */ 099 public ByteBufferOutputStream suppressEndOfRecord() { 100 sendEor = false; 101 return this; 102 } 103 104 private void ensureBufferAvailable() throws InterruptedException { 105 if (buffer != null) { 106 return; 107 } 108 buffer = channel.byteBufferPool().acquire(); 109 } 110 111 /* 112 * (non-Javadoc) 113 * 114 * @see java.io.OutputStream#write(int) 115 */ 116 @Override 117 public void write(int data) { 118 try { 119 ensureBufferAvailable(); 120 } catch (InterruptedException e) { 121 Thread.currentThread().interrupt(); 122 return; 123 } 124 buffer.backingBuffer().put((byte) data); 125 if (!buffer.hasRemaining()) { 126 flush(false); 127 } 128 } 129 130 /* 131 * (non-Javadoc) 132 * 133 * @see java.io.OutputStream#write(byte[], int, int) 134 */ 135 @Override 136 public void write(byte[] data, int offset, int length) { 137 while (true) { 138 try { 139 ensureBufferAvailable(); 140 } catch (InterruptedException e) { 141 Thread.currentThread().interrupt(); 142 return; 143 } 144 if (buffer.remaining() > length) { 145 buffer.backingBuffer().put(data, offset, length); 146 break; 147 } else if (buffer.remaining() == length) { 148 buffer.backingBuffer().put(data, offset, length); 149 flush(false); 150 break; 151 } else { 152 int chunkSize = buffer.remaining(); 153 buffer.backingBuffer().put(data, offset, chunkSize); 154 flush(false); 155 length -= chunkSize; 156 offset += chunkSize; 157 } 158 } 159 } 160 161 @Override 162 public void write(byte[] data) { 163 write(data, 0, data.length); 164 } 165 166 /** 167 * Creates and fires an {@link Output} event with the buffer being filled. 168 * The end of record flag of the event is set according to the parameter. 169 * Frees any allocated buffer. 170 */ 171 private void flush(boolean endOfRecord) { 172 if (buffer == null) { 173 if (!endOfRecord || eorSent) { 174 return; 175 } 176 try { 177 ensureBufferAvailable(); 178 } catch (InterruptedException e) { 179 Thread.currentThread().interrupt(); 180 return; 181 } 182 } 183 if (buffer.position() == 0 && (!endOfRecord || eorSent)) { 184 // Nothing to flush 185 buffer.unlockBuffer(); 186 } else { 187 if (sendInputEvents) { 188 eventPipeline.fire(Input.fromSink(buffer, endOfRecord), 189 channel); 190 } else { 191 eventPipeline.fire(Output.fromSink(buffer, endOfRecord), 192 channel); 193 } 194 eorSent = endOfRecord; 195 } 196 buffer = null; 197 } 198 199 /** 200 * Creates and fires a {@link Output} event with the buffer being filled 201 * if it contains any data. 202 * 203 * By default, the {@link Output} event is created with the end of record 204 * flag set (see {@link Output#isEndOfRecord()}) in order to forward the 205 * flush as event. This implies that an {@link Output} event with no data 206 * (but the end of record flag set) may be fired. This behavior can 207 * be disabled with {@link #suppressEndOfRecord()}. 208 */ 209 @Override 210 public void flush() { 211 flush(sendEor); 212 } 213 214 /** 215 * Flushes any remaining data with the end of record flag set 216 * (unless {@link #suppressEndOfRecord()} has been called) 217 * and fires a {@link Close} event (unless {@link #suppressClose()} 218 * has been called). 219 */ 220 @Override 221 public void close() { 222 if (isClosed) { 223 return; 224 } 225 flush(sendEor); 226 if (sendClose) { 227 eventPipeline.fire(new Close(), channel); 228 } 229 isClosed = true; 230 } 231}