001/******************************************************************************* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016, 2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 *******************************************************************************/ 018 019package org.jgrapes.io; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.nio.ByteBuffer; 024import java.nio.channels.Channels; 025import java.nio.channels.ReadableByteChannel; 026import java.util.Optional; 027import org.jgrapes.core.Channel; 028import org.jgrapes.core.Component; 029import org.jgrapes.core.Components; 030import org.jgrapes.core.Manager; 031import org.jgrapes.core.annotation.Handler; 032import org.jgrapes.core.events.Start; 033import org.jgrapes.core.events.Stop; 034import org.jgrapes.io.events.IOError; 035import org.jgrapes.io.events.Input; 036import org.jgrapes.io.util.ManagedBuffer; 037import org.jgrapes.io.util.ManagedBufferPool; 038import org.jgrapes.util.events.ConfigurationUpdate; 039 040/** 041 * A component that watches for new input on an 042 * {@link InputStream}. If new input becomes 043 * available, it is fired as {@link Input} event. 044 * 045 * This component should only be used to monitor an 046 * input stream that is available during the complete 047 * lifetime of the application. A typical usage is 048 * to make data from `System.in` available as events. 049 */ 050@SuppressWarnings("PMD.DataflowAnomalyAnalysis") 051public class InputStreamMonitor extends Component implements Runnable { 052 053 @SuppressWarnings("PMD.SingularField") 054 private Channel dataChannel; 055 @SuppressWarnings("PMD.SingularField") 056 private InputStream input; 057 private boolean registered; 058 private Thread runner; 059 @SuppressWarnings("PMD.SingularField") 060 private ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> buffers; 061 private int bufferSize = 2048; 062 063 /** 064 * Creates a new input stream monitor with its channel set to the given 065 * channel. The channel is also used for firing the {@link Input} 066 * events. 067 * 068 * @param componentChannel the component channel 069 * @param input the input stream 070 * @param dataChannel the data channel 071 */ 072 public InputStreamMonitor( 073 Channel componentChannel, InputStream input, Channel dataChannel) { 074 super(componentChannel); 075 this.input = input; 076 this.dataChannel = dataChannel; 077 } 078 079 /** 080 * Creates a new input stream monitor with its channel set to the given 081 * channel. The channel is also used for firing the {@link Input} 082 * events. 083 * 084 * @param componentChannel the component channel 085 * @param input the input 086 */ 087 public InputStreamMonitor(Channel componentChannel, InputStream input) { 088 this(componentChannel, input, componentChannel); 089 } 090 091 /** 092 * Sets the buffer size. 093 * 094 * @param bufferSize the buffer size 095 * @return the input stream monitor for easy chaining 096 */ 097 public InputStreamMonitor setBufferSize(int bufferSize) { 098 this.bufferSize = bufferSize; 099 return this; 100 } 101 102 /** 103 * Returns the buffer size. 104 * 105 * @return the buffer size 106 */ 107 public int getBufferSize() { 108 return bufferSize; 109 } 110 111 /** 112 * The component can be configured with events that include 113 * a path (see @link {@link ConfigurationUpdate#paths()}) 114 * that matches this components path (see {@link Manager#componentPath()}). 115 * 116 * The following properties are recognized: 117 * 118 * `bufferSize` 119 * : See {@link #setBufferSize(int)}. 120 * 121 * @param event the event 122 */ 123 @Handler 124 public void onConfigurationUpdate(ConfigurationUpdate event) { 125 event.values(componentPath()).ifPresent(values -> { 126 Optional.ofNullable(values.get("bufferSize")).ifPresent( 127 value -> setBufferSize(Integer.parseInt(value))); 128 }); 129 } 130 131 /** 132 * Starts a thread that continuously reads available 133 * data from the input stream. 134 * 135 * @param event the event 136 */ 137 @Handler 138 public void onStart(Start event) { 139 synchronized (this) { 140 if (runner != null) { 141 return; 142 } 143 buffers = new ManagedBufferPool<>(ManagedBuffer::new, 144 () -> { 145 return ByteBuffer.allocateDirect(bufferSize); 146 }, 2); 147 runner = (Components.useVirtualThreads() ? Thread.ofVirtual() 148 : Thread.ofPlatform()).name(Components.simpleObjectName(this)) 149 .start(this); 150 } 151 } 152 153 /** 154 * Stops the thread that reads data from the input stream. 155 * Note that the input stream is not closed. 156 * 157 * @param event the event 158 * @throws InterruptedException the interrupted exception 159 */ 160 @Handler(priority = -10_000) 161 public void onStop(Stop event) throws InterruptedException { 162 synchronized (this) { 163 if (runner == null) { 164 return; 165 } 166 runner.interrupt(); 167 synchronized (this) { 168 if (registered) { 169 unregisterAsGenerator(); 170 registered = false; 171 } 172 } 173 runner = null; 174 } 175 } 176 177 @Override 178 public void run() { 179 Thread.currentThread().setName(Components.simpleObjectName(this)); 180 try { 181 synchronized (this) { 182 registerAsGenerator(); 183 registered = true; 184 } 185 @SuppressWarnings("PMD.CloseResource") 186 ReadableByteChannel inChannel = Channels.newChannel(input); 187 while (!Thread.currentThread().isInterrupted()) { 188 ManagedBuffer<ByteBuffer> buffer = buffers.acquire(); 189 int read = buffer.fillFromChannel(inChannel); 190 boolean eof = read == -1; 191 fire(Input.fromSink(buffer, eof), dataChannel); 192 if (eof) { 193 break; 194 } 195 } 196 } catch (InterruptedException e) { // NOPMD 197 // Some called stop(), so what? 198 } catch (IOException e) { 199 fire(new IOError(null, e), channel()); 200 } finally { 201 synchronized (this) { 202 if (registered) { 203 unregisterAsGenerator(); 204 registered = false; 205 } 206 } 207 } 208 } 209 210}