001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2017-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.webconsole.base; 020 021import java.io.IOException; 022import java.lang.ref.WeakReference; 023import java.nio.CharBuffer; 024import java.util.Optional; 025import java.util.logging.Logger; 026import org.jdrupes.json.JsonBeanDecoder; 027import org.jdrupes.json.JsonDecodeException; 028import org.jdrupes.json.JsonRpc; 029import org.jdrupes.json.JsonRpc.DefaultJsonRpc; 030import org.jgrapes.core.Components; 031import org.jgrapes.core.EventPipeline; 032import org.jgrapes.io.events.Input; 033import org.jgrapes.io.util.ManagedBuffer; 034import org.jgrapes.io.util.ManagedBufferReader; 035import org.jgrapes.io.util.ThreadCleaner; 036import org.jgrapes.webconsole.base.events.JsonInput; 037 038/** 039 * Assembles {@link Input} events until a complete 040 * JSON message has been collected and then fires 041 * the message as {@link JsonInput} event. 042 */ 043public class WebSocketInputSink extends Thread { 044 045 @SuppressWarnings("PMD.FieldNamingConventions") 046 private static final Logger logger 047 = Logger.getLogger(WebSocketInputSink.class.getName()); 048 049 private final WeakReference<ConsoleConnection> channelRef; 050 private final WeakReference<EventPipeline> pipelineRef; 051 private ManagedBufferReader jsonSource; 052 053 /** 054 * Instantiates a new web socket input reader. 055 * 056 * @param wsInPipeline the ws in pipeline 057 * @param consoleChannel the web console channel 058 */ 059 public WebSocketInputSink(EventPipeline wsInPipeline, 060 ConsoleConnection consoleChannel) { 061 channelRef = new WeakReference<>(consoleChannel); 062 pipelineRef = new WeakReference<>(wsInPipeline); 063 ThreadCleaner.watch(wsInPipeline, this); 064 ThreadCleaner.watch(consoleChannel, this); 065 } 066 067 /** 068 * Forward the data to the JSON decoder. 069 * 070 * @param input the data to be converted 071 * @throws IOException Signals that an I/O exception has occurred. 072 */ 073 public void feed(ManagedBuffer<CharBuffer> input) throws IOException { 074 // Delayed initialization, allows adaption to buffer size. 075 if (jsonSource == null) { 076 jsonSource = new ManagedBufferReader(); 077 ofVirtual().start(this); 078 } 079 jsonSource.feed(input); 080 } 081 082 /** 083 * Forward the close to the JSON decoder. 084 * 085 * @throws IOException Signals that an I/O exception has occurred. 086 */ 087 public void close() throws IOException { 088 if (jsonSource == null) { 089 // Never started 090 return; 091 } 092 jsonSource.close(); 093 try { 094 join(1000); 095 } catch (InterruptedException e) { 096 // Just in case 097 interrupt(); 098 } 099 } 100 101 @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition", 102 "PMD.AvoidInstantiatingObjectsInLoops", "PMD.DataflowAnomalyAnalysis", 103 "PMD.GuardLogStatement", "PMD.CognitiveComplexity" }) 104 @Override 105 public void run() { 106 while (true) { 107 JsonBeanDecoder jsonDecoder = JsonBeanDecoder.create(jsonSource); 108 @SuppressWarnings("PMD.UnusedAssignment") 109 JsonRpc rpc = null; 110 try { 111 rpc = jsonDecoder.readObject(DefaultJsonRpc.class); 112 } catch (JsonDecodeException e) { 113 logger.severe( 114 () -> toString() + " cannot decode request from console: " 115 + e.getMessage()); 116 break; 117 } 118 if (rpc == null) { 119 break; 120 } 121 // Fully decoded JSON available. 122 ConsoleConnection connection = channelRef.get(); 123 EventPipeline eventPipeline = pipelineRef.get(); 124 if (eventPipeline == null || connection == null) { 125 break; 126 } 127 // WebConsole connection established, check for special disconnect 128 if ("disconnect".equals(rpc.method()) 129 && connection.consoleConnectionId() 130 .equals(rpc.params().asString(0))) { 131 connection.close(); 132 return; 133 } 134 // Ordinary message from web console (view) to server. 135 connection.refresh(); 136 if ("keepAlive".equals(rpc.method())) { 137 continue; 138 } 139 eventPipeline.fire(new JsonInput(rpc), connection); 140 } 141 } 142 143 @Override 144 public String toString() { 145 StringBuilder res = new StringBuilder() 146 .append(Components.objectName(this)).append(" ["); 147 Optional.ofNullable(channelRef.get()).ifPresentOrElse( 148 c -> res.append(c.toString()), 149 () -> res.append('?')); 150 return res.append(']').toString(); 151 } 152 153}