001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2017-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.Writer;
022import java.nio.Buffer;
023import java.util.Map;
024import org.jgrapes.core.Event;
025import org.jgrapes.core.EventPipeline;
026import org.jgrapes.io.IOSubchannel;
027import org.jgrapes.io.events.Close;
028import org.jgrapes.io.events.Input;
029import org.jgrapes.io.events.Output;
030
031/**
032 * A base class for {@link CharBufferWriter} and {@link ByteBufferWriter}.
033 */
034public abstract class AbstractBufferWriter<B extends Buffer> extends Writer {
035
036    protected IOSubchannel channel;
037    private EventPipeline eventPipeline;
038    private boolean sendInputEvents;
039    protected ManagedBuffer<B> buffer;
040    private boolean sendClose = true;
041    private boolean sendEor = true;
042    private boolean eorSent;
043    private boolean isClosed;
044    private Map<Object, Object> eventAssociations;
045
046    /**
047     * Creates a new instance that uses {@link Output} events to dispatch
048     * buffers on the given channel, using the given event pipeline.
049     * 
050     * @param channel
051     *            the channel to fire events on
052     * @param eventPipeline
053     *            the event pipeline used for firing events
054     */
055    public AbstractBufferWriter(IOSubchannel channel,
056            EventPipeline eventPipeline) {
057        this.channel = channel;
058        this.eventPipeline = eventPipeline;
059    }
060
061    /**
062     * Creates a new instance that uses {@link Output} events to dispatch
063     * buffers on the given channel, using the channel's response pipeline.
064     * 
065     * @param channel the channel to fire events on
066     */
067    public AbstractBufferWriter(IOSubchannel channel) {
068        this(channel, channel.responsePipeline());
069    }
070
071    /**
072     * Causes the data to be fired as {@link Input} events rather
073     * than the usual {@link Output} events. 
074     * 
075     * @return this object for easy chaining
076     */
077    protected AbstractBufferWriter<B> sendInputEvents() {
078        sendInputEvents = true;
079        return this;
080    }
081
082    /**
083     * Suppresses sending of a close event when the stream is closed. 
084     * 
085     * @return this object for easy chaining
086     */
087    public AbstractBufferWriter<B> suppressClose() {
088        sendClose = false;
089        return this;
090    }
091
092    /**
093     * Suppresses setting the end of record flag when the stream is 
094     * flushed or closed.
095     * 
096     * @return this object for easy chaining
097     * @see Output#isEndOfRecord()
098     */
099    public AbstractBufferWriter<B> suppressEndOfRecord() {
100        sendEor = false;
101        return this;
102    }
103
104    /**
105     * Configure associations that are applied to the generated
106     * Output events, see {@link Event#setAssociated}.
107     * 
108     * @param associations the associations to apply
109     * @return this object for easy chaining
110     */
111    public AbstractBufferWriter<B>
112            setEventAssociations(Map<Object, Object> associations) {
113        eventAssociations = associations;
114        return this;
115    }
116
117    /**
118     * Ensure that a buffer for output data is available.
119     *
120     * @throws InterruptedException the interrupted exception
121     */
122    protected abstract void ensureBufferAvailable() throws InterruptedException;
123
124    @Override
125    public abstract void write(char[] data, int offset, int length);
126
127    @Override
128    public void write(char[] cbuf) {
129        write(cbuf, 0, cbuf.length);
130    }
131
132    @Override
133    public abstract void write(String str, int offset, int length);
134
135    @Override
136    public void write(String str) {
137        write(str, 0, str.length());
138    }
139
140    @Override
141    @SuppressWarnings("PMD.ShortVariable")
142    public void write(int ch) {
143        char[] buff = { (char) ch };
144        write(buff, 0, 1);
145    }
146
147    /**
148     * Creates and fires an {@link Output} event with the buffer being filled. 
149     * The end of record flag of the event is set according to the parameter.
150     * Frees any allocated buffer.
151     */
152    protected void flush(boolean endOfRecord) {
153        if (buffer == null) {
154            if (!endOfRecord || eorSent) {
155                return;
156            }
157            try {
158                ensureBufferAvailable();
159            } catch (InterruptedException e) {
160                Thread.currentThread().interrupt();
161                return;
162            }
163        }
164        if (buffer.position() == 0 && (!endOfRecord || eorSent)) {
165            // Nothing to flush
166            buffer.unlockBuffer();
167        } else {
168            if (sendInputEvents) {
169                eventPipeline.fire(
170                    associate(Input.fromSink(buffer, endOfRecord)), channel);
171            } else {
172                eventPipeline.fire(
173                    associate(Output.fromSink(buffer, endOfRecord)), channel);
174            }
175            eorSent = endOfRecord;
176        }
177        buffer = null;
178    }
179
180    /**
181     * Creates and fires a {@link Output} event with the buffer being filled
182     * if it contains any data.
183     * 
184     * By default, the {@link Output} event is created with the end of record
185     * flag set (see {@link Output#isEndOfRecord()}) in order to forward the 
186     * flush as event. This implies that an {@link Output} event with no data
187     * (but the end of record flag set) may be fired. This behavior can
188     * be disabled with {@link #suppressEndOfRecord()}.
189     */
190    @Override
191    public void flush() {
192        flush(sendEor);
193    }
194
195    /**
196     * Flushes any remaining data with the end of record flag set
197     * (unless {@link #suppressEndOfRecord()} has been called)
198     * and fires a {@link Close} event (unless {@link #suppressClose()}
199     * has been called).
200     */
201    @Override
202    public void close() {
203        if (isClosed) {
204            return;
205        }
206        flush(sendEor);
207        if (sendClose) {
208            eventPipeline.fire(associate(new Close()), channel);
209        }
210        isClosed = true;
211    }
212
213    private Event<?> associate(Event<?> event) {
214        if (eventAssociations != null) {
215            for (var entry : eventAssociations.entrySet()) {
216                event.setAssociated(entry.getKey(), entry.getValue());
217            }
218        }
219        return event;
220    }
221}