001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2017-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.webconsole.base; 020 021import java.io.IOException; 022import java.lang.ref.ReferenceQueue; 023import java.lang.ref.WeakReference; 024import java.nio.CharBuffer; 025import java.util.Optional; 026import java.util.logging.Logger; 027import org.jdrupes.json.JsonBeanDecoder; 028import org.jdrupes.json.JsonDecodeException; 029import org.jdrupes.json.JsonRpc; 030import org.jdrupes.json.JsonRpc.DefaultJsonRpc; 031import org.jgrapes.core.Components; 032import org.jgrapes.core.EventPipeline; 033import org.jgrapes.io.events.Input; 034import org.jgrapes.io.util.ManagedBuffer; 035import org.jgrapes.io.util.ManagedBufferReader; 036import org.jgrapes.webconsole.base.events.JsonInput; 037 038/** 039 * Assembles {@link Input} events until a complete 040 * JSON message has been collected and then fires 041 * the message as {@link JsonInput} event. 042 */ 043public class WebSocketInputSink extends Thread { 044 045 @SuppressWarnings("PMD.FieldNamingConventions") 046 private static final Logger logger 047 = Logger.getLogger(WebSocketInputSink.class.getName()); 048 049 private final WeakReference<EventPipeline> pipelineRef; 050 private final WeakReference<ConsoleConnection> channelRef; 051 private ManagedBufferReader jsonSource; 052 053 private static ReferenceQueue<Object> abandoned 054 = new ReferenceQueue<>(); 055 056 /** 057 * Weak references to `T` that interrupt the input collecting 058 * thread if the referent has been garbage collected. 059 * 060 * @param <T> the generic type 061 */ 062 private static class RefWithThread<T> extends WeakReference<T> { 063 public Thread watched; 064 065 /** 066 * Creates a new instance. 067 * 068 * @param referent the referent 069 * @param thread the thread 070 */ 071 public RefWithThread(T referent, Thread thread) { 072 super(referent, abandoned); 073 watched = thread; 074 } 075 } 076 077 static { 078 Thread watchdog = new Thread(() -> { 079 while (true) { 080 try { 081 @SuppressWarnings("unchecked") 082 WebSocketInputSink.RefWithThread<Object> ref 083 = (WebSocketInputSink.RefWithThread<Object>) abandoned 084 .remove(); 085 ref.watched.interrupt(); 086 } catch (InterruptedException e) { 087 // Nothing to do 088 } 089 } 090 }); 091 watchdog.setDaemon(true); 092 watchdog.start(); 093 } 094 095 /** 096 * Instantiates a new web socket input reader. 097 * 098 * @param wsInPipeline the ws in pipeline 099 * @param consoleChannel the web console channel 100 */ 101 public WebSocketInputSink(EventPipeline wsInPipeline, 102 ConsoleConnection consoleChannel) { 103 pipelineRef 104 = new WebSocketInputSink.RefWithThread<>(wsInPipeline, this); 105 channelRef 106 = new WebSocketInputSink.RefWithThread<>(consoleChannel, this); 107 setDaemon(true); 108 } 109 110 /** 111 * Forward the data to the JSON decoder. 112 * 113 * @param input the data to be converted 114 * @throws IOException Signals that an I/O exception has occurred. 115 */ 116 public void feed(ManagedBuffer<CharBuffer> input) throws IOException { 117 // Delayed initialization, allows adaption to buffer size. 118 if (jsonSource == null) { 119 jsonSource = new ManagedBufferReader(); 120 start(); 121 } 122 jsonSource.feed(input); 123 } 124 125 /** 126 * Forward the close to the JSON decoder. 127 * 128 * @throws IOException Signals that an I/O exception has occurred. 129 */ 130 public void close() throws IOException { 131 if (jsonSource == null) { 132 // Never started 133 return; 134 } 135 jsonSource.close(); 136 try { 137 join(1000); 138 } catch (InterruptedException e) { 139 // Just in case 140 interrupt(); 141 } 142 } 143 144 @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition", 145 "PMD.AvoidInstantiatingObjectsInLoops", "PMD.DataflowAnomalyAnalysis", 146 "PMD.GuardLogStatement", "PMD.CognitiveComplexity" }) 147 @Override 148 public void run() { 149 while (true) { 150 JsonBeanDecoder jsonDecoder = JsonBeanDecoder.create(jsonSource); 151 @SuppressWarnings("PMD.UnusedAssignment") 152 JsonRpc rpc = null; 153 try { 154 rpc = jsonDecoder.readObject(DefaultJsonRpc.class); 155 } catch (JsonDecodeException e) { 156 logger.severe( 157 () -> toString() + " cannot decode request from console: " 158 + e.getMessage()); 159 break; 160 } 161 if (rpc == null) { 162 break; 163 } 164 // Fully decoded JSON available. 165 ConsoleConnection connection = channelRef.get(); 166 EventPipeline eventPipeline = pipelineRef.get(); 167 if (eventPipeline == null || connection == null) { 168 break; 169 } 170 // WebConsole connection established, check for special disconnect 171 if ("disconnect".equals(rpc.method()) 172 && connection.consoleConnectionId() 173 .equals(rpc.params().asString(0))) { 174 connection.close(); 175 return; 176 } 177 // Ordinary message from web console (view) to server. 178 connection.refresh(); 179 if ("keepAlive".equals(rpc.method())) { 180 continue; 181 } 182 eventPipeline.fire(new JsonInput(rpc), connection); 183 } 184 } 185 186 @Override 187 public String toString() { 188 StringBuilder res = new StringBuilder() 189 .append(Components.objectName(this)).append(" ["); 190 Optional.ofNullable(channelRef.get()).ifPresentOrElse( 191 c -> res.append(c.toString()), 192 () -> res.append('?')); 193 return res.append(']').toString(); 194 } 195 196}