001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2017-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.io.Writer;
022import java.nio.CharBuffer;
023import java.util.Map;
024import org.jgrapes.core.EventPipeline;
025import org.jgrapes.io.IOSubchannel;
026import org.jgrapes.io.events.Output;
027
028/**
029 * An {@link Writer} that is backed by {@link CharBuffer}s obtained from a
030 * queue. When a byte buffer is full, an {@link Output} event (default) is
031 * generated and a new buffer is fetched from the queue.
032 */
033public class CharBufferWriter extends AbstractBufferWriter<CharBuffer> {
034
035    /**
036     * Creates a new instance that uses {@link Output} events to dispatch
037     * buffers on the given channel, using the given event pipeline.
038     * 
039     * @param channel
040     *            the channel to fire events on
041     * @param eventPipeline
042     *            the event pipeline used for firing events
043     */
044    public CharBufferWriter(IOSubchannel channel, EventPipeline eventPipeline) {
045        super(channel, eventPipeline);
046    }
047
048    /**
049     * Creates a new instance that uses {@link Output} events to dispatch
050     * buffers on the given channel, using the channel's response pipeline.
051     * 
052     * @param channel the channel to fire events on
053     */
054    public CharBufferWriter(IOSubchannel channel) {
055        super(channel);
056    }
057
058    @Override
059    public CharBufferWriter sendInputEvents() {
060        super.sendInputEvents();
061        return this;
062    }
063
064    @Override
065    public CharBufferWriter suppressClose() {
066        super.suppressClose();
067        return this;
068    }
069
070    @Override
071    public CharBufferWriter suppressEndOfRecord() {
072        super.suppressEndOfRecord();
073        return this;
074    }
075
076    @Override
077    public CharBufferWriter
078            setEventAssociations(Map<Object, Object> associations) {
079        super.setEventAssociations(associations);
080        return this;
081    }
082
083    /**
084     * Ensure that a buffer for output data is available.
085     *
086     * @throws InterruptedException the interrupted exception
087     */
088    @Override
089    protected void ensureBufferAvailable() throws InterruptedException {
090        if (buffer != null) {
091            return;
092        }
093        buffer = channel.charBufferPool().acquire();
094    }
095
096    /*
097     * (non-Javadoc)
098     * 
099     * @see java.io.Writer#write(char[], int, int)
100     */
101    @Override
102    public void write(char[] data, int offset, int length) {
103        while (true) {
104            try {
105                ensureBufferAvailable();
106            } catch (InterruptedException e) {
107                Thread.currentThread().interrupt();
108                return;
109            }
110            var buf = buffer.backingBuffer();
111            if (buf.remaining() > length) {
112                buf.put(data, offset, length);
113                break;
114            } else if (buf.remaining() == length) {
115                buf.put(data, offset, length);
116                flush(false);
117                break;
118            } else {
119                int chunkSize = buf.remaining();
120                buf.put(data, offset, chunkSize);
121                flush(false);
122                length -= chunkSize;
123                offset += chunkSize;
124            }
125        }
126    }
127
128    @Override
129    public void write(String str, int offset, int length) {
130        while (true) {
131            try {
132                ensureBufferAvailable();
133            } catch (InterruptedException e) {
134                Thread.currentThread().interrupt();
135                return;
136            }
137            var buf = buffer.backingBuffer();
138            if (buf.remaining() >= length) {
139                str.getChars(offset, offset + length, buf.array(),
140                    buf.position());
141                buf.position(buf.position() + length);
142                if (buf.remaining() == 0) {
143                    flush(false);
144                }
145                break;
146            }
147            int chunkSize = buf.remaining();
148            str.getChars(offset, offset + chunkSize, buf.array(),
149                buf.position());
150            buf.position(buf.position() + chunkSize);
151            flush(false);
152            length -= chunkSize;
153            offset += chunkSize;
154        }
155    }
156
157    @Override
158    @SuppressWarnings("PMD.ShortVariable")
159    public CharBufferWriter append(char ch) {
160        write(ch);
161        return this;
162    }
163
164    @Override
165    public CharBufferWriter append(CharSequence csq) {
166        write(String.valueOf(csq));
167        return this;
168    }
169
170    @Override
171    public CharBufferWriter append(CharSequence csq, int start, int end) {
172        if (csq == null) {
173            csq = "null";
174        }
175        return append(csq.subSequence(start, end));
176    }
177
178}