001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2017-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.webconsole.base;
020
021import java.io.IOException;
022import java.lang.ref.ReferenceQueue;
023import java.lang.ref.WeakReference;
024import java.nio.CharBuffer;
025import java.util.Optional;
026import java.util.logging.Logger;
027import org.jdrupes.json.JsonBeanDecoder;
028import org.jdrupes.json.JsonDecodeException;
029import org.jdrupes.json.JsonRpc;
030import org.jdrupes.json.JsonRpc.DefaultJsonRpc;
031import org.jgrapes.core.Components;
032import org.jgrapes.core.EventPipeline;
033import org.jgrapes.io.events.Input;
034import org.jgrapes.io.util.ManagedBuffer;
035import org.jgrapes.io.util.ManagedBufferReader;
036import org.jgrapes.webconsole.base.events.JsonInput;
037
038/**
039 * Assembles {@link Input} events until a complete
040 * JSON message has been collected and then fires
041 * the message as {@link JsonInput} event.
042 */
043public class WebSocketInputSink extends Thread {
044
045    @SuppressWarnings("PMD.FieldNamingConventions")
046    private static final Logger logger
047        = Logger.getLogger(WebSocketInputSink.class.getName());
048
049    private final WeakReference<EventPipeline> pipelineRef;
050    private final WeakReference<ConsoleConnection> channelRef;
051    private ManagedBufferReader jsonSource;
052
053    private static ReferenceQueue<Object> abandoned
054        = new ReferenceQueue<>();
055
056    /**
057     * Weak references to `T` that interrupt the input collecting
058     * thread if the referent has been garbage collected.
059     *
060     * @param <T> the generic type
061     */
062    private static class RefWithThread<T> extends WeakReference<T> {
063        public Thread watched;
064
065        /**
066         * Creates a new instance.
067         *
068         * @param referent the referent
069         * @param thread the thread
070         */
071        public RefWithThread(T referent, Thread thread) {
072            super(referent, abandoned);
073            watched = thread;
074        }
075    }
076
077    static {
078        Thread watchdog = new Thread(() -> {
079            while (true) {
080                try {
081                    @SuppressWarnings("unchecked")
082                    WebSocketInputSink.RefWithThread<Object> ref
083                        = (WebSocketInputSink.RefWithThread<Object>) abandoned
084                            .remove();
085                    ref.watched.interrupt();
086                } catch (InterruptedException e) {
087                    // Nothing to do
088                }
089            }
090        });
091        watchdog.setDaemon(true);
092        watchdog.start();
093    }
094
095    /**
096     * Instantiates a new web socket input reader.
097     *
098     * @param wsInPipeline the ws in pipeline
099     * @param consoleChannel the web console channel
100     */
101    public WebSocketInputSink(EventPipeline wsInPipeline,
102            ConsoleConnection consoleChannel) {
103        pipelineRef
104            = new WebSocketInputSink.RefWithThread<>(wsInPipeline, this);
105        channelRef
106            = new WebSocketInputSink.RefWithThread<>(consoleChannel, this);
107        setDaemon(true);
108    }
109
110    /**
111     * Forward the data to the JSON decoder.
112     *
113     * @param input the data to be converted
114     * @throws IOException Signals that an I/O exception has occurred.
115     */
116    public void feed(ManagedBuffer<CharBuffer> input) throws IOException {
117        // Delayed initialization, allows adaption to buffer size.
118        if (jsonSource == null) {
119            jsonSource = new ManagedBufferReader();
120            start();
121        }
122        jsonSource.feed(input);
123    }
124
125    /**
126     * Forward the close to the JSON decoder.
127     *
128     * @throws IOException Signals that an I/O exception has occurred.
129     */
130    public void close() throws IOException {
131        if (jsonSource == null) {
132            // Never started
133            return;
134        }
135        jsonSource.close();
136        try {
137            join(1000);
138        } catch (InterruptedException e) {
139            // Just in case
140            interrupt();
141        }
142    }
143
144    @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
145        "PMD.AvoidInstantiatingObjectsInLoops", "PMD.DataflowAnomalyAnalysis",
146        "PMD.GuardLogStatement", "PMD.CognitiveComplexity" })
147    @Override
148    public void run() {
149        while (true) {
150            JsonBeanDecoder jsonDecoder = JsonBeanDecoder.create(jsonSource);
151            @SuppressWarnings("PMD.UnusedAssignment")
152            JsonRpc rpc = null;
153            try {
154                rpc = jsonDecoder.readObject(DefaultJsonRpc.class);
155            } catch (JsonDecodeException e) {
156                logger.severe(
157                    () -> toString() + " cannot decode request from console: "
158                        + e.getMessage());
159                break;
160            }
161            if (rpc == null) {
162                break;
163            }
164            // Fully decoded JSON available.
165            ConsoleConnection connection = channelRef.get();
166            EventPipeline eventPipeline = pipelineRef.get();
167            if (eventPipeline == null || connection == null) {
168                break;
169            }
170            // WebConsole connection established, check for special disconnect
171            if ("disconnect".equals(rpc.method())
172                && connection.consoleConnectionId()
173                    .equals(rpc.params().asString(0))) {
174                connection.close();
175                return;
176            }
177            // Ordinary message from web console (view) to server.
178            connection.refresh();
179            if ("keepAlive".equals(rpc.method())) {
180                continue;
181            }
182            eventPipeline.fire(new JsonInput(rpc), connection);
183        }
184    }
185
186    @Override
187    public String toString() {
188        StringBuilder res = new StringBuilder()
189            .append(Components.objectName(this)).append(" [");
190        Optional.ofNullable(channelRef.get()).ifPresentOrElse(
191            c -> res.append(c.toString()),
192            () -> res.append('?'));
193        return res.append(']').toString();
194    }
195
196}