001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016, 2018  Michael N. Lipp
004 *
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.BufferedReader;
022import java.io.IOException;
023import java.util.Map;
024import org.jgrapes.core.Event;
025import org.jgrapes.core.EventPipeline;
026import org.jgrapes.io.IOSubchannel;
027import org.jgrapes.io.events.IOError;
028import org.jgrapes.io.events.Input;
029import org.jgrapes.io.events.Output;
030
031/**
032 * Forwards the lines from a {@link BufferedReader} as a sequence of 
033 * {@link Output} (or optionally {@link Input}) events.
034 */
035@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
036public class BufferedReaderPipeline implements Runnable {
037
038    private BufferedReader reader;
039    private IOSubchannel channel;
040    private EventPipeline eventPipeline;
041    private boolean sendClosed = true;
042    private Map<Object, Object> eventAssociations;
043    private boolean sendInputEvents;
044
045    /**
046     * Creates a new pipeline that sends the lines from the given reader
047     * as events on the given channel, using the given event pipeline.
048     * 
049     * @param in the reader to read from
050     * @param channel the channel to send to
051     * @param eventPipeline the event pipeline used for firing events
052     */
053    @SuppressWarnings("PMD.ShortVariable")
054    public BufferedReaderPipeline(BufferedReader in, IOSubchannel channel,
055            EventPipeline eventPipeline) {
056        this.reader = in;
057        this.channel = channel;
058        this.eventPipeline = eventPipeline;
059    }
060
061    /**
062     * Creates a new pipeline that sends the lines from the given reader
063     * as events on the given channel, using the channel's response pipeline.
064     * 
065     * @param in the reader to read from
066     * @param channel the channel to send to
067     */
068    @SuppressWarnings("PMD.ShortVariable")
069    public BufferedReaderPipeline(BufferedReader in, IOSubchannel channel) {
070        this(in, channel, channel.responsePipeline());
071    }
072
073    /**
074     * Causes the data to be fired as {@link Input} events rather
075     * than the usual {@link Output} events. 
076     * 
077     * @return the stream for easy chaining
078     */
079    public BufferedReaderPipeline sendInputEvents() {
080        sendInputEvents = true;
081        return this;
082    }
083
084    /**
085     * Suppresses the sending of a closed event when the stream is closed. 
086     * 
087     * @return the stream for easy chaining
088     */
089    public BufferedReaderPipeline suppressClosed() {
090        sendClosed = false;
091        return this;
092    }
093
094    /**
095     * Configure associations that are applied to the generated
096     * Output events, see {@link Event#setAssociated}.
097     * 
098     * @param associations the associations to apply
099     * @return the pipeline for easy chaining
100     */
101    public BufferedReaderPipeline
102            setEventAssociations(Map<Object, Object> associations) {
103        eventAssociations = associations;
104        return this;
105    }
106
107    @Override
108    public void run() {
109        try (var out = new CharBufferWriter(channel, eventPipeline)) {
110            if (sendInputEvents) {
111                out.sendInputEvents();
112            }
113            if (!sendClosed) {
114                out.suppressClose();
115            }
116            out.setEventAssociations(eventAssociations);
117            while (true) {
118                String line = reader.readLine();
119                if (line == null) {
120                    break;
121                }
122                out.write(line);
123                out.flush();
124            }
125        } catch (IOException e) {
126            eventPipeline.fire(associate(new IOError(null, e)), channel);
127        }
128    }
129
130    private Event<?> associate(Event<?> event) {
131        if (eventAssociations != null) {
132            for (var entry : eventAssociations.entrySet()) {
133                event.setAssociated(entry.getKey(), entry.getValue());
134            }
135        }
136        return event;
137    }
138
139}