001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2017-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.webconsole.base;
020
021import java.io.IOException;
022import java.lang.ref.WeakReference;
023import java.nio.CharBuffer;
024import java.util.Optional;
025import java.util.logging.Logger;
026import org.jdrupes.json.JsonBeanDecoder;
027import org.jdrupes.json.JsonDecodeException;
028import org.jdrupes.json.JsonRpc;
029import org.jdrupes.json.JsonRpc.DefaultJsonRpc;
030import org.jgrapes.core.Components;
031import org.jgrapes.core.EventPipeline;
032import org.jgrapes.io.events.Input;
033import org.jgrapes.io.util.ManagedBuffer;
034import org.jgrapes.io.util.ManagedBufferReader;
035import org.jgrapes.io.util.ThreadCleaner;
036import org.jgrapes.webconsole.base.events.JsonInput;
037
038/**
039 * Assembles {@link Input} events until a complete
040 * JSON message has been collected and then fires
041 * the message as {@link JsonInput} event.
042 */
043public class WebSocketInputSink extends Thread {
044
045    @SuppressWarnings("PMD.FieldNamingConventions")
046    private static final Logger logger
047        = Logger.getLogger(WebSocketInputSink.class.getName());
048
049    private final WeakReference<ConsoleConnection> channelRef;
050    private final WeakReference<EventPipeline> pipelineRef;
051    private ManagedBufferReader jsonSource;
052
053    /**
054     * Instantiates a new web socket input reader.
055     *
056     * @param wsInPipeline the ws in pipeline
057     * @param consoleChannel the web console channel
058     */
059    public WebSocketInputSink(EventPipeline wsInPipeline,
060            ConsoleConnection consoleChannel) {
061        channelRef = new WeakReference<>(consoleChannel);
062        pipelineRef = new WeakReference<>(wsInPipeline);
063        ThreadCleaner.watch(wsInPipeline, this);
064        ThreadCleaner.watch(consoleChannel, this);
065    }
066
067    /**
068     * Forward the data to the JSON decoder.
069     *
070     * @param input the data to be converted
071     * @throws IOException Signals that an I/O exception has occurred.
072     */
073    public void feed(ManagedBuffer<CharBuffer> input) throws IOException {
074        // Delayed initialization, allows adaption to buffer size.
075        if (jsonSource == null) {
076            jsonSource = new ManagedBufferReader();
077            ofVirtual().start(this);
078        }
079        jsonSource.feed(input);
080    }
081
082    /**
083     * Forward the close to the JSON decoder.
084     *
085     * @throws IOException Signals that an I/O exception has occurred.
086     */
087    public void close() throws IOException {
088        if (jsonSource == null) {
089            // Never started
090            return;
091        }
092        jsonSource.close();
093        try {
094            join(1000);
095        } catch (InterruptedException e) {
096            // Just in case
097            interrupt();
098        }
099    }
100
101    @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
102        "PMD.AvoidInstantiatingObjectsInLoops", "PMD.DataflowAnomalyAnalysis",
103        "PMD.GuardLogStatement", "PMD.CognitiveComplexity" })
104    @Override
105    public void run() {
106        while (true) {
107            JsonBeanDecoder jsonDecoder = JsonBeanDecoder.create(jsonSource);
108            @SuppressWarnings("PMD.UnusedAssignment")
109            JsonRpc rpc = null;
110            try {
111                rpc = jsonDecoder.readObject(DefaultJsonRpc.class);
112            } catch (JsonDecodeException e) {
113                logger.severe(
114                    () -> toString() + " cannot decode request from console: "
115                        + e.getMessage());
116                break;
117            }
118            if (rpc == null) {
119                break;
120            }
121            // Fully decoded JSON available.
122            ConsoleConnection connection = channelRef.get();
123            EventPipeline eventPipeline = pipelineRef.get();
124            if (eventPipeline == null || connection == null) {
125                break;
126            }
127            // WebConsole connection established, check for special disconnect
128            if ("disconnect".equals(rpc.method())
129                && connection.consoleConnectionId()
130                    .equals(rpc.params().asString(0))) {
131                connection.close();
132                return;
133            }
134            // Ordinary message from web console (view) to server.
135            connection.refresh();
136            if ("keepAlive".equals(rpc.method())) {
137                continue;
138            }
139            eventPipeline.fire(new JsonInput(rpc), connection);
140        }
141    }
142
143    @Override
144    public String toString() {
145        StringBuilder res = new StringBuilder()
146            .append(Components.objectName(this)).append(" [");
147        Optional.ofNullable(channelRef.get()).ifPresentOrElse(
148            c -> res.append(c.toString()),
149            () -> res.append('?'));
150        return res.append(']').toString();
151    }
152
153}