/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2017-2018 Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.webconsole.base;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.Reader;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.CharBuffer;
import org.jdrupes.json.JsonBeanDecoder;
import org.jdrupes.json.JsonDecodeException;
import org.jdrupes.json.JsonRpc;
import org.jdrupes.json.JsonRpc.DefaultJsonRpc;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.io.events.Input;
import org.jgrapes.webconsole.base.events.JsonInput;

/**
 * Assembles {@link Input} events until a complete
 * JSON message has been collected and then fires
 * the message as {@link JsonInput} event.
 *
 * Character data handed to {@link #write(CharBuffer)} is pushed into
 * a pipe; this thread reads from the other end of the pipe, decodes
 * JSON RPC objects and forwards them. The event pipeline and the
 * console connection are only weakly referenced. When either of
 * them is garbage collected, a shared watchdog thread interrupts
 * this thread.
 */
public class WebSocketInputReader extends Thread {

    private final WeakReference<EventPipeline> pipelineRef;
    private final WeakReference<ConsoleConnection> channelRef;
    private PipedWriter decodeIn;
    private Reader jsonSource;

    /** Receives the weak references whose referents have been collected. */
    private static final ReferenceQueue<Object> abandoned
        = new ReferenceQueue<>();

    /**
     * Weak references to `T` that interrupt the input collecting
     * thread if the referent has been garbage collected.
     *
     * @param <T> the generic type
     */
    private static class RefWithThread<T> extends WeakReference<T> {
        /** The thread to interrupt when the referent is gone. */
        private final Thread watched;

        /**
         * Creates a new instance that is registered with the
         * shared queue of abandoned references.
         *
         * @param referent the referent
         * @param thread the thread to interrupt once the referent
         * has been garbage collected
         */
        public RefWithThread(T referent, Thread thread) {
            super(referent, abandoned);
            watched = thread;
        }
    }

    static {
        // Single daemon thread that waits for abandoned references
        // and interrupts the reader thread associated with each.
        Thread watchdog = new Thread(() -> {
            while (true) {
                try {
                    // Only RefWithThread instances are registered
                    // with the queue, so the cast is safe.
                    @SuppressWarnings("unchecked")
                    WebSocketInputReader.RefWithThread<Object> ref
                        = (WebSocketInputReader.RefWithThread<Object>) abandoned
                            .remove();
                    ref.watched.interrupt();
                } catch (InterruptedException e) {
                    // Nothing to do, keep on watching.
                }
            }
        });
        watchdog.setDaemon(true);
        watchdog.start();
    }

    /**
     * Instantiates a new web socket input reader. The thread is
     * configured as daemon thread, but not started. It is started
     * by the first invocation of {@link #write(CharBuffer)}.
     *
     * @param wsInPipeline the ws in pipeline
     * @param consoleChannel the web console channel
     */
    public WebSocketInputReader(EventPipeline wsInPipeline,
            ConsoleConnection consoleChannel) {
        pipelineRef
            = new WebSocketInputReader.RefWithThread<>(wsInPipeline, this);
        channelRef
            = new WebSocketInputReader.RefWithThread<>(consoleChannel, this);
        setDaemon(true);
    }

    /**
     * Forward the data to the JSON decoder. The first invocation
     * creates the pipe and starts this thread.
     *
     * @param buffer the buffer
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public void write(CharBuffer buffer) throws IOException {
        // Delayed initialization, allows adaption to buffer size.
        if (decodeIn == null) {
            // Build both ends in locals and publish them only when the
            // pipe is complete, so that a failure cannot leave behind
            // a writer without a reader. The pipe size must be
            // positive, hence the lower bound.
            PipedWriter writer = new PipedWriter();
            Reader reader
                = new PipedReader(writer, Math.max(1, buffer.capacity()));
            decodeIn = writer;
            jsonSource = reader;
            start();
        }
        decodeIn.append(buffer);
        decodeIn.flush();
    }

    /**
     * Forward the close to the JSON decoder. Closes the writing end
     * of the pipe and waits for up to one second for this thread
     * to terminate. Does nothing if no data has ever been written.
     *
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public void close() throws IOException {
        if (decodeIn == null) {
            // Never started
            return;
        }
        decodeIn.close();
        try {
            join(1000);
        } catch (InterruptedException e) {
            // Just in case, make sure that the reader thread stops...
            interrupt();
            // ... and preserve the interrupt for the calling thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Decodes JSON RPC objects from the pipe until decoding fails,
     * no more objects are available, the pipeline or the connection
     * has been garbage collected or a "disconnect" request for the
     * connection has been received.
     */
    @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
        "PMD.AvoidInstantiatingObjectsInLoops",
        "PMD.DataflowAnomalyAnalysis" })
    @Override
    public void run() {
        while (true) {
            JsonBeanDecoder jsonDecoder = JsonBeanDecoder.create(jsonSource);
            @SuppressWarnings("PMD.UnusedAssignment")
            JsonRpc rpc = null;
            try {
                rpc = jsonDecoder.readObject(DefaultJsonRpc.class);
            } catch (JsonDecodeException e) {
                break;
            }
            if (rpc == null) {
                break;
            }
            // Fully decoded JSON available.
            ConsoleConnection connection = channelRef.get();
            EventPipeline eventPipeline = pipelineRef.get();
            if (eventPipeline == null || connection == null) {
                break;
            }
            // WebConsole connection established, check for special disconnect
            if ("disconnect".equals(rpc.method())
                && connection.consoleConnectionId()
                    .equals(rpc.params().asString(0))) {
                connection.close();
                return;
            }
            // Ordinary message from web console (view) to server.
            connection.refresh();
            if ("keepAlive".equals(rpc.method())) {
                // Refreshing the connection is all that is required.
                continue;
            }
            eventPipeline.fire(new JsonInput(rpc), connection);
        }
    }
}