001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2017-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.webconsole.base; 020 021import com.fasterxml.jackson.core.JsonFactory; 022import com.fasterxml.jackson.core.JsonParser; 023import com.fasterxml.jackson.databind.ObjectMapper; 024import com.fasterxml.jackson.databind.json.JsonMapper; 025import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; 026import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; 027import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; 028import java.io.IOException; 029import java.lang.ref.WeakReference; 030import java.nio.CharBuffer; 031import java.util.Optional; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034import org.jgrapes.core.Components; 035import org.jgrapes.core.EventPipeline; 036import org.jgrapes.io.events.Input; 037import org.jgrapes.io.util.ManagedBuffer; 038import org.jgrapes.io.util.ManagedBufferReader; 039import org.jgrapes.io.util.ThreadCleaner; 040import org.jgrapes.webconsole.base.events.JsonInput; 041 042/** 043 * Assembles {@link Input} events until a complete 044 * JSON message has been collected and then fires 045 * the message as {@link JsonInput} event. 046 */ 047public class WebSocketInputSink extends Thread { 048 049 @SuppressWarnings("PMD.FieldNamingConventions") 050 private static final Logger logger 051 = Logger.getLogger(WebSocketInputSink.class.getName()); 052 @SuppressWarnings("PMD.FieldNamingConventions") 053 protected static final ObjectMapper mapper = JsonMapper.builder() 054 .addModule(new ParameterNamesModule()).addModule(new Jdk8Module()) 055 .addModule(new JavaTimeModule()).build(); 056 057 private final WeakReference<ConsoleConnection> channelRef; 058 private final WeakReference<EventPipeline> pipelineRef; 059 private ManagedBufferReader jsonSource; 060 061 /** 062 * Instantiates a new web socket input reader. 063 * 064 * @param wsInPipeline the ws in pipeline 065 * @param consoleChannel the web console channel 066 */ 067 public WebSocketInputSink(EventPipeline wsInPipeline, 068 ConsoleConnection consoleChannel) { 069 channelRef = new WeakReference<>(consoleChannel); 070 pipelineRef = new WeakReference<>(wsInPipeline); 071 ThreadCleaner.watch(wsInPipeline, this); 072 ThreadCleaner.watch(consoleChannel, this); 073 } 074 075 /** 076 * Forward the data to the JSON decoder. 077 * 078 * @param input the data to be converted 079 * @throws IOException Signals that an I/O exception has occurred. 080 */ 081 public void feed(ManagedBuffer<CharBuffer> input) throws IOException { 082 // Delayed initialization, allows adaption to buffer size. 083 if (jsonSource == null) { 084 jsonSource = new ManagedBufferReader(); 085 (Components.useVirtualThreads() ? ofVirtual() 086 : ofPlatform()).start(this); 087 } 088 jsonSource.feed(input); 089 } 090 091 /** 092 * Forward the close to the JSON decoder. 093 * 094 * @throws IOException Signals that an I/O exception has occurred. 095 */ 096 public void close() throws IOException { 097 if (jsonSource == null) { 098 // Never started 099 return; 100 } 101 jsonSource.close(); 102 try { 103 join(1000); 104 } catch (InterruptedException e) { 105 // Just in case 106 interrupt(); 107 } 108 } 109 110 @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition", 111 "PMD.AvoidInstantiatingObjectsInLoops", "PMD.DataflowAnomalyAnalysis", 112 "PMD.GuardLogStatement", "PMD.CognitiveComplexity" }) 113 @Override 114 public void run() { 115 @SuppressWarnings("PMD.CloseResource") 116 JsonParser parser; 117 try { 118 parser = new JsonFactory().createParser(jsonSource); 119 } catch (IOException e) { 120 logger.severe(() -> toString() + " cannot create JSON parser: " 121 + e.getMessage()); 122 return; 123 } 124 while (true) { 125 JsonRpc rpc; 126 try { 127 if (parser.nextToken() == null) { 128 break; 129 } 130 rpc = mapper.readValue(parser, WcJsonRpc.class); 131 } catch (IOException e) { 132 logger.log(Level.SEVERE, e, () -> toString() 133 + " cannot read JSON: " + e.getMessage()); 134 break; 135 } 136 // Fully decoded JSON available. 137 ConsoleConnection connection = channelRef.get(); 138 EventPipeline eventPipeline = pipelineRef.get(); 139 if (eventPipeline == null || connection == null) { 140 break; 141 } 142 // WebConsole connection established, check for special disconnect 143 if ("disconnect".equals(rpc.method()) 144 && connection.consoleConnectionId().equals(rpc.param(0))) { 145 connection.close(); 146 return; 147 } 148 // Ordinary message from web console (view) to server. 149 connection.refresh(); 150 if ("keepAlive".equals(rpc.method())) { 151 continue; 152 } 153 eventPipeline.fire(new JsonInput(rpc), connection); 154 } 155 } 156 157 @Override 158 public String toString() { 159 StringBuilder res = new StringBuilder() 160 .append(Components.objectName(this)).append(" ["); 161 Optional.ofNullable(channelRef.get()).ifPresentOrElse( 162 c -> res.append(c.toString()), 163 () -> res.append('?')); 164 return res.append(']').toString(); 165 } 166 167}