/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016, 2018 Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.io.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.Map;
import org.jgrapes.core.Event;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.io.IOSubchannel;
import org.jgrapes.io.events.IOError;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.Output;

/**
 * Forwards the lines from a {@link BufferedReader} as a sequence of
 * {@link Output} (or optionally {@link Input}) events.
 *
 * Each line is written to a {@link CharBufferWriter} and flushed
 * immediately. The forwarded text does not include line terminators,
 * because {@link BufferedReader#readLine()} strips them.
 *
 * The reader passed to the constructor is not closed by this class.
 */
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class BufferedReaderPipeline implements Runnable {

    // Assigned once in the constructor. Declared final so that they are
    // safely published to the thread that eventually executes run().
    private final BufferedReader reader;
    private final IOSubchannel channel;
    private final EventPipeline eventPipeline;
    // Configuration that may be changed through the fluent setters
    // before the pipeline is run.
    private boolean sendClosed = true;
    private Map<Object, Object> eventAssociations;
    private boolean sendInputEvents;

    /**
     * Creates a new pipeline that sends the lines from the given reader
     * as events on the given channel, using the given event pipeline.
     *
     * @param in the reader to read from
     * @param channel the channel to send to
     * @param eventPipeline the event pipeline used for firing events
     */
    @SuppressWarnings("PMD.ShortVariable")
    public BufferedReaderPipeline(BufferedReader in, IOSubchannel channel,
            EventPipeline eventPipeline) {
        this.reader = in;
        this.channel = channel;
        this.eventPipeline = eventPipeline;
    }

    /**
     * Creates a new pipeline that sends the lines from the given reader
     * as events on the given channel, using the channel's response pipeline.
     *
     * @param in the reader to read from
     * @param channel the channel to send to
     */
    @SuppressWarnings("PMD.ShortVariable")
    public BufferedReaderPipeline(BufferedReader in, IOSubchannel channel) {
        this(in, channel, channel.responsePipeline());
    }

    /**
     * Causes the data to be fired as {@link Input} events rather
     * than the usual {@link Output} events.
     *
     * @return the pipeline for easy chaining
     */
    public BufferedReaderPipeline sendInputEvents() {
        sendInputEvents = true;
        return this;
    }

    /**
     * Suppresses the sending of a closed event when the pipeline
     * has finished. The request is passed on to the
     * {@link CharBufferWriter} used by {@link #run()}.
     *
     * @return the pipeline for easy chaining
     */
    public BufferedReaderPipeline suppressClosed() {
        sendClosed = false;
        return this;
    }

    /**
     * Configure associations that are applied to the generated
     * events, see {@link Event#setAssociated}. The associations are
     * handed to the {@link CharBufferWriter} that produces the data
     * events and are also applied to the {@link IOError} event that
     * is fired if reading or writing fails.
     *
     * The map is stored by reference, it is not copied.
     *
     * @param associations the associations to apply
     * @return the pipeline for easy chaining
     */
    public BufferedReaderPipeline
            setEventAssociations(Map<Object, Object> associations) {
        eventAssociations = associations;
        return this;
    }

    /**
     * Reads lines from the reader until {@link BufferedReader#readLine()}
     * returns {@code null} and writes each line to a
     * {@link CharBufferWriter}, flushing after every line.
     *
     * An {@link IOException} thrown while reading, writing or closing
     * the writer is not propagated. Instead, an {@link IOError} event
     * (with the configured associations) is fired on the channel using
     * the event pipeline.
     */
    @Override
    public void run() {
        // The writer is closed when the block is left, no matter whether
        // the end of input has been reached or an exception was thrown.
        try (var out = new CharBufferWriter(channel, eventPipeline)) {
            if (sendInputEvents) {
                out.sendInputEvents();
            }
            if (!sendClosed) {
                out.suppressClose();
            }
            out.setEventAssociations(eventAssociations);
            while (true) {
                String line = reader.readLine();
                if (line == null) {
                    // End of input.
                    break;
                }
                out.write(line);
                // Flush per line so that lines are forwarded as they
                // become available instead of being accumulated.
                out.flush();
            }
        } catch (IOException e) {
            eventPipeline.fire(associate(new IOError(null, e)), channel);
        }
    }

    /**
     * Applies the configured associations (if any) to the given event.
     *
     * @param event the event to update
     * @return the event passed in, for easy chaining
     */
    private Event<?> associate(Event<?> event) {
        if (eventAssociations != null) {
            for (var entry : eventAssociations.entrySet()) {
                event.setAssociated(entry.getKey(), entry.getValue());
            }
        }
        return event;
    }

}