001/*******************************************************************************
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016, 2018  Michael N. Lipp
004 *
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 *******************************************************************************/
018
019package org.jgrapes.io;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.nio.ByteBuffer;
024import java.nio.channels.Channels;
025import java.nio.channels.ReadableByteChannel;
026import java.util.Optional;
027import org.jgrapes.core.Channel;
028import org.jgrapes.core.Component;
029import org.jgrapes.core.Components;
030import org.jgrapes.core.Manager;
031import org.jgrapes.core.annotation.Handler;
032import org.jgrapes.core.events.Start;
033import org.jgrapes.core.events.Stop;
034import org.jgrapes.io.events.IOError;
035import org.jgrapes.io.events.Input;
036import org.jgrapes.io.util.ManagedBuffer;
037import org.jgrapes.io.util.ManagedBufferPool;
038import org.jgrapes.util.events.ConfigurationUpdate;
039
040/**
041 * A component that watches for new input on an
042 * {@link InputStream}. If new input becomes
043 * available, it is fired as {@link Input} event.
044 * 
045 * This component should only be used to monitor an
046 * input stream that is available during the complete
047 * lifetime of the application. A typical usage is
048 * to make data from `System.in` available as events.
049 */
050@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
051public class InputStreamMonitor extends Component implements Runnable {
052
053    @SuppressWarnings("PMD.SingularField")
054    private Channel dataChannel;
055    @SuppressWarnings("PMD.SingularField")
056    private InputStream input;
057    private boolean registered;
058    private Thread runner;
059    @SuppressWarnings("PMD.SingularField")
060    private ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> buffers;
061    private int bufferSize = 2048;
062
063    /**
064     * Creates a new input stream monitor with its channel set to the given 
065     * channel. The channel is also used for firing the {@link Input}
066     * events.
067     *
068     * @param componentChannel the component channel
069     * @param input the input stream
070     * @param dataChannel the data channel
071     */
072    public InputStreamMonitor(
073            Channel componentChannel, InputStream input, Channel dataChannel) {
074        super(componentChannel);
075        this.input = input;
076        this.dataChannel = dataChannel;
077    }
078
079    /**
080     * Creates a new input stream monitor with its channel set to the given 
081     * channel. The channel is also used for firing the {@link Input}
082     * events.
083     *
084     * @param componentChannel the component channel
085     * @param input the input
086     */
087    public InputStreamMonitor(Channel componentChannel, InputStream input) {
088        this(componentChannel, input, componentChannel);
089    }
090
091    /**
092     * Sets the buffer size.
093     *
094     * @param bufferSize the buffer size
095     * @return the input stream monitor for easy chaining
096     */
097    public InputStreamMonitor setBufferSize(int bufferSize) {
098        this.bufferSize = bufferSize;
099        return this;
100    }
101
102    /**
103     * Returns the buffer size.
104     *
105     * @return the buffer size
106     */
107    public int getBufferSize() {
108        return bufferSize;
109    }
110
111    /**
112     * The component can be configured with events that include
113     * a path (see @link {@link ConfigurationUpdate#paths()})
114     * that matches this components path (see {@link Manager#componentPath()}).
115     * 
116     * The following properties are recognized:
117     * 
118     * `bufferSize`
119     * : See {@link #setBufferSize(int)}.
120     * 
121     * @param event the event
122     */
123    @Handler
124    public void onConfigurationUpdate(ConfigurationUpdate event) {
125        event.values(componentPath()).ifPresent(values -> {
126            Optional.ofNullable(values.get("bufferSize")).ifPresent(
127                value -> setBufferSize(Integer.parseInt(value)));
128        });
129    }
130
131    /**
132     * Starts a thread that continuously reads available
133     * data from the input stream. 
134     *
135     * @param event the event
136     */
137    @Handler
138    public void onStart(Start event) {
139        synchronized (this) {
140            if (runner != null) {
141                return;
142            }
143            buffers = new ManagedBufferPool<>(ManagedBuffer::new,
144                () -> {
145                    return ByteBuffer.allocateDirect(bufferSize);
146                }, 2);
147            runner = (Components.useVirtualThreads() ? Thread.ofVirtual()
148                : Thread.ofPlatform()).name(Components.simpleObjectName(this))
149                    .start(this);
150        }
151    }
152
153    /**
154     * Stops the thread that reads data from the input stream.
155     * Note that the input stream is not closed.
156     *
157     * @param event the event
158     * @throws InterruptedException the interrupted exception
159     */
160    @Handler(priority = -10_000)
161    public void onStop(Stop event) throws InterruptedException {
162        synchronized (this) {
163            if (runner == null) {
164                return;
165            }
166            runner.interrupt();
167            synchronized (this) {
168                if (registered) {
169                    unregisterAsGenerator();
170                    registered = false;
171                }
172            }
173            runner = null;
174        }
175    }
176
177    @Override
178    public void run() {
179        Thread.currentThread().setName(Components.simpleObjectName(this));
180        try {
181            synchronized (this) {
182                registerAsGenerator();
183                registered = true;
184            }
185            @SuppressWarnings("PMD.CloseResource")
186            ReadableByteChannel inChannel = Channels.newChannel(input);
187            while (!Thread.currentThread().isInterrupted()) {
188                ManagedBuffer<ByteBuffer> buffer = buffers.acquire();
189                int read = buffer.fillFromChannel(inChannel);
190                boolean eof = read == -1;
191                fire(Input.fromSink(buffer, eof), dataChannel);
192                if (eof) {
193                    break;
194                }
195            }
196        } catch (InterruptedException e) { // NOPMD
197            // Some called stop(), so what?
198        } catch (IOException e) {
199            fire(new IOError(null, e), channel());
200        } finally {
201            synchronized (this) {
202                if (registered) {
203                    unregisterAsGenerator();
204                    registered = false;
205                }
206            }
207        }
208    }
209
210}