001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2017-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.webconsole.base;
020
021import java.io.IOException;
022import java.io.PipedReader;
023import java.io.PipedWriter;
024import java.io.Reader;
025import java.lang.ref.ReferenceQueue;
026import java.lang.ref.WeakReference;
027import java.nio.CharBuffer;
028import org.jdrupes.json.JsonBeanDecoder;
029import org.jdrupes.json.JsonDecodeException;
030import org.jdrupes.json.JsonRpc;
031import org.jdrupes.json.JsonRpc.DefaultJsonRpc;
032import org.jgrapes.core.EventPipeline;
033import org.jgrapes.io.events.Input;
034import org.jgrapes.webconsole.base.events.JsonInput;
035
036/**
037 * Assembles {@link Input} events until a complete
038 * JSON message has been collected and then fires
039 * the message as {@link JsonInput} event.
040 */
041public class WebSocketInputReader extends Thread {
042
043    private final WeakReference<EventPipeline> pipelineRef;
044    private final WeakReference<ConsoleConnection> channelRef;
045    private PipedWriter decodeIn;
046    private Reader jsonSource;
047
048    private static ReferenceQueue<Object> abandoned
049        = new ReferenceQueue<>();
050
051    /**
052     * Weak references to `T` that interrupt the input collecting
053     * thread if the referent has been garbage collected.
054     *
055     * @param <T> the generic type
056     */
057    private static class RefWithThread<T> extends WeakReference<T> {
058        public Thread watched;
059
060        /**
061         * Creates a new instance.
062         *
063         * @param referent the referent
064         * @param thread the thread
065         */
066        public RefWithThread(T referent, Thread thread) {
067            super(referent, abandoned);
068            watched = thread;
069        }
070    }
071
072    static {
073        Thread watchdog = new Thread(() -> {
074            while (true) {
075                try {
076                    @SuppressWarnings("unchecked")
077                    WebSocketInputReader.RefWithThread<Object> ref
078                        = (WebSocketInputReader.RefWithThread<Object>) abandoned
079                            .remove();
080                    ref.watched.interrupt();
081                } catch (InterruptedException e) {
082                    // Nothing to do
083                }
084            }
085        });
086        watchdog.setDaemon(true);
087        watchdog.start();
088    }
089
090    /**
091     * Instantiates a new web socket input reader.
092     *
093     * @param wsInPipeline the ws in pipeline
094     * @param consoleChannel the web console channel
095     */
096    public WebSocketInputReader(EventPipeline wsInPipeline,
097            ConsoleConnection consoleChannel) {
098        pipelineRef
099            = new WebSocketInputReader.RefWithThread<>(wsInPipeline, this);
100        channelRef
101            = new WebSocketInputReader.RefWithThread<>(consoleChannel, this);
102        setDaemon(true);
103    }
104
105    /**
106     * Forward the data to the JSON decoder.
107     *
108     * @param buffer the buffer
109     * @throws IOException Signals that an I/O exception has occurred.
110     */
111    public void write(CharBuffer buffer) throws IOException {
112        // Delayed initialization, allows adaption to buffer size.
113        if (decodeIn == null) {
114            decodeIn = new PipedWriter();
115            jsonSource = new PipedReader(decodeIn, buffer.capacity());
116            start();
117        }
118        decodeIn.append(buffer);
119        decodeIn.flush();
120    }
121
122    /**
123     * Forward the close to the JSON decoder.
124     *
125     * @throws IOException Signals that an I/O exception has occurred.
126     */
127    public void close() throws IOException {
128        if (decodeIn == null) {
129            // Never started
130            return;
131        }
132        decodeIn.close();
133        try {
134            join(1000);
135        } catch (InterruptedException e) {
136            // Just in case
137            interrupt();
138        }
139    }
140
141    @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
142        "PMD.AvoidInstantiatingObjectsInLoops",
143        "PMD.DataflowAnomalyAnalysis" })
144    @Override
145    public void run() {
146        while (true) {
147            JsonBeanDecoder jsonDecoder = JsonBeanDecoder.create(jsonSource);
148            @SuppressWarnings("PMD.UnusedAssignment")
149            JsonRpc rpc = null;
150            try {
151                rpc = jsonDecoder.readObject(DefaultJsonRpc.class);
152            } catch (JsonDecodeException e) {
153                break;
154            }
155            if (rpc == null) {
156                break;
157            }
158            // Fully decoded JSON available.
159            ConsoleConnection connection = channelRef.get();
160            EventPipeline eventPipeline = pipelineRef.get();
161            if (eventPipeline == null || connection == null) {
162                break;
163            }
164            // WebConsole connection established, check for special disconnect
165            if ("disconnect".equals(rpc.method())
166                && connection.consoleConnectionId()
167                    .equals(rpc.params().asString(0))) {
168                connection.close();
169                return;
170            }
171            // Ordinary message from web console (view) to server.
172            connection.refresh();
173            if ("keepAlive".equals(rpc.method())) {
174                continue;
175            }
176            eventPipeline.fire(new JsonInput(rpc), connection);
177        }
178    }
179}