001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2024 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.IOException;
022import java.io.Reader;
023import java.nio.Buffer;
024import java.nio.charset.Charset;
025import java.util.function.Consumer;
026
027import org.jgrapes.core.Components;
028import org.jgrapes.io.events.Input;
029
030/**
031 * Starts a thread with a synchronous consumer of input provided by a
032 * {@link Reader} which is fed with data from {@link ManagedBuffer}s
033 * (usually obtained from {@link Input} events).
034 *  
035 * @since 2.8.0
036 */
037public class ManagedBufferStreamer implements InputConsumer {
038
039    private final ManagedBufferReader reader = new ManagedBufferReader();
040
041    /**
042     * Instantiates a new managed buffer streamer.
043     *
044     * @param processor the processor
045     */
046    public ManagedBufferStreamer(Consumer<Reader> processor) {
047        Thread thread = (Components.useVirtualThreads() ? Thread.ofVirtual()
048            : Thread.ofPlatform()).start(() -> {
049                processor.accept(reader);
050            });
051        ThreadCleaner.watch(this, thread);
052    }
053
054    /**
055     * Sets the charset to be used if {@link #feed(ManagedBuffer)}
056     * is invoked with `ManagedBuffer<ByteBuffer>`. Defaults to UTF-8. 
057     * Must be set before the first invocation of 
058     * {@link #feed(ManagedBuffer)}.  
059     *
060     * @param charset the charset
061     * @return the managed buffer streamer
062     */
063    public ManagedBufferStreamer charset(Charset charset) {
064        reader.charset(charset);
065        return this;
066    }
067
068    /**
069     * Feed data to underlying the reader
070     * (see {@link ManagedBufferReader#feed). 
071     * 
072     * @param buffer the buffer
073     * @throws IOException Signals that an I/O exception has occurred.
074     */
075    @SuppressWarnings({ "PMD.PreserveStackTrace" })
076    public <W extends Buffer> void feed(ManagedBuffer<W> buffer) {
077        reader.feed(buffer);
078    }
079
080}