/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016, 2023 Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.io.util;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.Map;
import org.jgrapes.core.Event;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.io.IOSubchannel;
import org.jgrapes.io.events.Closed;
import org.jgrapes.io.events.IOError;
import org.jgrapes.io.events.IOEvent;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.Output;

/**
 * Forwards the content of an input stream as a sequence of
 * {@link Output} (or optionally {@link Input}) events.
 *
 * The default settings and the constructor
 * {@link #InputStreamPipeline(InputStream, IOSubchannel)} reflect
 * the usage of this class for generating a response (e.g. provide
 * the content of a file in response to a request from a client).
 * Using the class with a "downstream" event pipeline and generating
 * {@link Input} events (see {@link #sendInputEvents()}) is intended
 * for the case that the input stream provides data that should be
 * processed as requests by the application.
 *
 * The pipeline only reads from the input stream, it does not close it.
 * If the thread that executes {@link #run()} is interrupted while
 * waiting for a buffer, the transfer stops without firing any further
 * events and the thread's interrupt status is restored.
 */
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class InputStreamPipeline implements Runnable {

    private final InputStream inStream;
    private final IOSubchannel channel;
    private final EventPipeline eventPipeline;
    private boolean sendClosed = true;
    private Map<Object, Object> eventAssociations;
    private boolean sendInputEvents;

    /**
     * Creates a new pipeline that sends the data from the given input stream
     * as events on the given channel, using the given event pipeline.
     *
     * @param in the input stream to read from
     * @param channel the channel to send to
     * @param eventPipeline
     *            the event pipeline used for firing events
     */
    @SuppressWarnings("PMD.ShortVariable")
    public InputStreamPipeline(InputStream in, IOSubchannel channel,
            EventPipeline eventPipeline) {
        this.inStream = in;
        this.channel = channel;
        this.eventPipeline = eventPipeline;
    }

    /**
     * Creates a new pipeline that sends the data from the given input stream
     * as events on the given channel, using the channel's response pipeline.
     *
     * @param in the input stream to read from
     * @param channel the channel to send to
     */
    @SuppressWarnings("PMD.ShortVariable")
    public InputStreamPipeline(InputStream in, IOSubchannel channel) {
        this(in, channel, channel.responsePipeline());
    }

    /**
     * Causes the data to be fired as {@link Input} events rather
     * than the usual {@link Output} events.
     *
     * @return the pipeline for easy chaining
     */
    public InputStreamPipeline sendInputEvents() {
        sendInputEvents = true;
        return this;
    }

    /**
     * Suppresses the {@link Closed} event that is otherwise fired
     * after all data from the stream has been forwarded.
     *
     * @return the pipeline for easy chaining
     */
    public InputStreamPipeline suppressClosed() {
        sendClosed = false;
        return this;
    }

    /**
     * Configure associations that are applied to all events fired
     * by this pipeline (the data events as well as the final
     * {@link Closed} or {@link IOError} event),
     * see {@link Event#setAssociated}.
     *
     * @param associations the associations to apply, may be
     * {@code null} for none
     * @return the pipeline for easy chaining
     */
    public InputStreamPipeline
            setEventAssociations(Map<Object, Object> associations) {
        eventAssociations = associations;
        return this;
    }

    /**
     * Reads the input stream until its end and fires the data as
     * events. When the end has been reached, a {@link Closed} event
     * is fired unless this has been suppressed. An {@link IOException}
     * while reading is reported by firing an {@link IOError} event.
     */
    @Override
    @SuppressWarnings("PMD.CloseResource")
    public void run() {
        try {
            if (inStream instanceof FileInputStream fip) {
                seekableTransfer(fip.getChannel());
            } else {
                defaultTransfer();
            }
            if (sendClosed) {
                eventPipeline.fire(associate(new Closed<Void>()), channel);
            }
        } catch (InterruptedException e) {
            // Just stop, but keep the interruption visible to whoever
            // executes this runnable.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            eventPipeline.fire(associate(new IOError(null, e)), channel);
        }
    }

    /**
     * Transfers the data from an arbitrary input stream. Unless the
     * stream is a {@link ByteArrayInputStream}, the end of data is
     * only detected by a read that returns -1, so the last event
     * fired has no data and the end of record flag set.
     *
     * @throws InterruptedException if interrupted while acquiring a buffer
     * @throws IOException if reading from the stream fails
     */
    private void defaultTransfer() throws InterruptedException, IOException {
        // If available() returns remaining, we can optimize.
        // Regrettably, there is no marker interface for this, but
        // the assumption should be true for ByteArrayInputStream.
        boolean availableIsRemaining = inStream instanceof ByteArrayInputStream;
        while (true) {
            // Acquired before the try block, so the buffer is never
            // null when the handler below releases it.
            ManagedBuffer<ByteBuffer> buffer
                = channel.byteBufferPool().acquire();
            try {
                int recvd = readInto(buffer.backing);
                if (recvd > 0) {
                    boolean eof
                        = availableIsRemaining && inStream.available() == 0;
                    eventPipeline.fire(associate(ioEvent(buffer, eof)),
                        channel);
                    if (eof) {
                        break;
                    }
                    continue;
                }
                if (recvd == -1) {
                    eventPipeline.fire(associate(ioEvent(buffer, true)),
                        channel);
                    break;
                }
                // Reading 0 bytes shouldn't happen.
                buffer.unlockBuffer();
            } catch (IOException e) {
                // The buffer has not been passed on in an event.
                buffer.unlockBuffer();
                throw e;
            }
        }
    }

    /**
     * Reads from the input stream into the given buffer, starting at the
     * buffer's position, and advances the position by the number of
     * bytes read.
     *
     * @param backing the buffer to fill
     * @return the result of the stream's read operation, i.e. the
     * number of bytes read or -1 at the end of the stream
     * @throws IOException if reading from the stream fails
     */
    private int readInto(ByteBuffer backing) throws IOException {
        if (backing.hasArray()) {
            // The buffer's position is relative to arrayOffset(), which
            // is not 0 if the buffer is a slice of another buffer.
            int recvd = inStream.read(backing.array(),
                backing.arrayOffset() + backing.position(),
                backing.remaining());
            if (recvd > 0) {
                backing.position(backing.position() + recvd);
            }
            return recvd;
        }
        // No accessible array (e.g. direct buffer), copy the data.
        byte[] chunk = new byte[backing.remaining()];
        int recvd = inStream.read(chunk, 0, chunk.length);
        if (recvd > 0) {
            backing.put(chunk, 0, recvd);
        }
        return recvd;
    }

    /**
     * A seekable channel allows us to avoid generating an event with
     * no data and eof set, because we can check after reading if there
     * is remaining data.
     *
     * @param input the channel to read from
     * @throws InterruptedException if interrupted while acquiring a buffer
     * @throws IOException if accessing the channel fails
     */
    private void seekableTransfer(SeekableByteChannel input)
            throws InterruptedException, IOException {
        while (true) {
            // Acquired before the try block, so the buffer is never
            // null when the handler below releases it.
            ManagedBuffer<ByteBuffer> buffer
                = channel.byteBufferPool().acquire();
            try {
                int recvd = input.read(buffer.backing);
                if (recvd > 0) {
                    boolean eof = input.position() == input.size();
                    eventPipeline.fire(associate(ioEvent(buffer, eof)),
                        channel);
                    if (eof) {
                        break;
                    }
                    continue;
                }
                if (recvd == -1) {
                    // Only reached if there was no data at all or the
                    // size changed while reading.
                    eventPipeline.fire(associate(ioEvent(buffer, true)),
                        channel);
                    break;
                }
                // Reading 0 bytes shouldn't happen.
                buffer.unlockBuffer();
            } catch (IOException e) {
                // The buffer has not been passed on in an event.
                buffer.unlockBuffer();
                throw e;
            }
        }
    }

    /**
     * Creates the event used to forward the data in the buffer, i.e.
     * an {@link Input} event if this has been requested with
     * {@link #sendInputEvents()}, else an {@link Output} event.
     *
     * @param buffer the buffer with the data
     * @param eor the end of record flag for the event
     * @return the event
     */
    private IOEvent<ByteBuffer> ioEvent(ManagedBuffer<ByteBuffer> buffer,
            boolean eor) {
        if (sendInputEvents) {
            return Input.fromSink(buffer, eor);
        }
        return Output.fromSink(buffer, eor);
    }

    /**
     * Applies the configured associations (if any) to the event.
     *
     * @param event the event
     * @return the event passed as argument
     */
    private Event<?> associate(Event<?> event) {
        if (eventAssociations != null) {
            for (var entry : eventAssociations.entrySet()) {
                event.setAssociated(entry.getKey(), entry.getValue());
            }
        }
        return event;
    }

}