001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2017-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.webconsole.base;
020
021import com.fasterxml.jackson.core.JsonFactory;
022import com.fasterxml.jackson.core.JsonParser;
023import com.fasterxml.jackson.databind.ObjectMapper;
024import com.fasterxml.jackson.databind.json.JsonMapper;
025import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
026import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
027import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
028import java.io.IOException;
029import java.lang.ref.WeakReference;
030import java.nio.CharBuffer;
031import java.util.Optional;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034import org.jgrapes.core.Components;
035import org.jgrapes.core.EventPipeline;
036import org.jgrapes.io.events.Input;
037import org.jgrapes.io.util.ManagedBuffer;
038import org.jgrapes.io.util.ManagedBufferReader;
039import org.jgrapes.io.util.ThreadCleaner;
040import org.jgrapes.webconsole.base.events.JsonInput;
041
042/**
043 * Assembles {@link Input} events until a complete
044 * JSON message has been collected and then fires
045 * the message as {@link JsonInput} event.
046 */
047public class WebSocketInputSink extends Thread {
048
049    @SuppressWarnings("PMD.FieldNamingConventions")
050    private static final Logger logger
051        = Logger.getLogger(WebSocketInputSink.class.getName());
052    @SuppressWarnings("PMD.FieldNamingConventions")
053    protected static final ObjectMapper mapper = JsonMapper.builder()
054        .addModule(new ParameterNamesModule()).addModule(new Jdk8Module())
055        .addModule(new JavaTimeModule()).build();
056
057    private final WeakReference<ConsoleConnection> channelRef;
058    private final WeakReference<EventPipeline> pipelineRef;
059    private ManagedBufferReader jsonSource;
060
061    /**
062     * Instantiates a new web socket input reader.
063     *
064     * @param wsInPipeline the ws in pipeline
065     * @param consoleChannel the web console channel
066     */
067    public WebSocketInputSink(EventPipeline wsInPipeline,
068            ConsoleConnection consoleChannel) {
069        channelRef = new WeakReference<>(consoleChannel);
070        pipelineRef = new WeakReference<>(wsInPipeline);
071        ThreadCleaner.watch(wsInPipeline, this);
072        ThreadCleaner.watch(consoleChannel, this);
073    }
074
075    /**
076     * Forward the data to the JSON decoder.
077     *
078     * @param input the data to be converted
079     * @throws IOException Signals that an I/O exception has occurred.
080     */
081    public void feed(ManagedBuffer<CharBuffer> input) throws IOException {
082        // Delayed initialization, allows adaption to buffer size.
083        if (jsonSource == null) {
084            jsonSource = new ManagedBufferReader();
085            (Components.useVirtualThreads() ? ofVirtual()
086                : ofPlatform()).start(this);
087        }
088        jsonSource.feed(input);
089    }
090
091    /**
092     * Forward the close to the JSON decoder.
093     *
094     * @throws IOException Signals that an I/O exception has occurred.
095     */
096    public void close() throws IOException {
097        if (jsonSource == null) {
098            // Never started
099            return;
100        }
101        jsonSource.close();
102        try {
103            join(1000);
104        } catch (InterruptedException e) {
105            // Just in case
106            interrupt();
107        }
108    }
109
110    @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
111        "PMD.AvoidInstantiatingObjectsInLoops", "PMD.DataflowAnomalyAnalysis",
112        "PMD.GuardLogStatement", "PMD.CognitiveComplexity" })
113    @Override
114    public void run() {
115        @SuppressWarnings("PMD.CloseResource")
116        JsonParser parser;
117        try {
118            parser = new JsonFactory().createParser(jsonSource);
119        } catch (IOException e) {
120            logger.severe(() -> toString() + " cannot create JSON parser: "
121                + e.getMessage());
122            return;
123        }
124        while (true) {
125            JsonRpc rpc;
126            try {
127                if (parser.nextToken() == null) {
128                    break;
129                }
130                rpc = mapper.readValue(parser, WcJsonRpc.class);
131            } catch (IOException e) {
132                logger.log(Level.SEVERE, e, () -> toString()
133                    + " cannot read JSON: " + e.getMessage());
134                break;
135            }
136            // Fully decoded JSON available.
137            ConsoleConnection connection = channelRef.get();
138            EventPipeline eventPipeline = pipelineRef.get();
139            if (eventPipeline == null || connection == null) {
140                break;
141            }
142            // WebConsole connection established, check for special disconnect
143            if ("disconnect".equals(rpc.method())
144                && connection.consoleConnectionId().equals(rpc.param(0))) {
145                connection.close();
146                return;
147            }
148            // Ordinary message from web console (view) to server.
149            connection.refresh();
150            if ("keepAlive".equals(rpc.method())) {
151                continue;
152            }
153            eventPipeline.fire(new JsonInput(rpc), connection);
154        }
155    }
156
157    @Override
158    public String toString() {
159        StringBuilder res = new StringBuilder()
160            .append(Components.objectName(this)).append(" [");
161        Optional.ofNullable(channelRef.get()).ifPresentOrElse(
162            c -> res.append(c.toString()),
163            () -> res.append('?'));
164        return res.append(']').toString();
165    }
166
167}