001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2017-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.portal.base;
020
021import java.io.IOException;
022import java.io.PipedReader;
023import java.io.PipedWriter;
024import java.io.Reader;
025import java.lang.ref.ReferenceQueue;
026import java.lang.ref.WeakReference;
027import java.nio.CharBuffer;
028
029import org.jdrupes.json.JsonBeanDecoder;
030import org.jdrupes.json.JsonDecodeException;
031import org.jdrupes.json.JsonRpc;
032import org.jdrupes.json.JsonRpc.DefaultJsonRpc;
033import org.jgrapes.core.EventPipeline;
034import org.jgrapes.io.events.Input;
035import org.jgrapes.portal.base.events.JsonInput;
036
037/**
038 * Assembles {@link Input} events until a complete
039 * JSON message has been collected and then fires
040 * the message as {@link JsonInput} event.
041 */
042public class WebSocketInputReader extends Thread {
043
044    private final WeakReference<EventPipeline> pipelineRef;
045    private final WeakReference<PortalSession> channelRef;
046    private PipedWriter decodeIn;
047    private Reader jsonSource;
048
049    private static ReferenceQueue<Object> abandoned
050        = new ReferenceQueue<>();
051
052    /**
053     * Weak references to `T` that interrupt the input collecting
054     * thread if the referent has been garbage collected.
055     *
056     * @param <T> the generic type
057     */
058    private static class RefWithThread<T> extends WeakReference<T> {
059        public Thread watched;
060
061        /**
062         * Creates a new instance.
063         *
064         * @param referent the referent
065         * @param thread the thread
066         */
067        public RefWithThread(T referent, Thread thread) {
068            super(referent, abandoned);
069            watched = thread;
070        }
071    }
072
073    static {
074        Thread watchdog = new Thread(() -> {
075            while (true) {
076                try {
077                    @SuppressWarnings("unchecked")
078                    WebSocketInputReader.RefWithThread<Object> ref
079                        = (WebSocketInputReader.RefWithThread<Object>) abandoned
080                            .remove();
081                    ref.watched.interrupt();
082                } catch (InterruptedException e) {
083                    // Nothing to do
084                }
085            }
086        });
087        watchdog.setDaemon(true);
088        watchdog.start();
089    }
090
091    /**
092     * Instantiates a new web socket input reader.
093     *
094     * @param wsInPipeline the ws in pipeline
095     * @param portalChannel the portal channel
096     */
097    public WebSocketInputReader(EventPipeline wsInPipeline,
098            PortalSession portalChannel) {
099        pipelineRef
100            = new WebSocketInputReader.RefWithThread<>(wsInPipeline, this);
101        channelRef
102            = new WebSocketInputReader.RefWithThread<>(portalChannel, this);
103        setDaemon(true);
104    }
105
106    /**
107     * Forward the data to the JSON decoder.
108     *
109     * @param buffer the buffer
110     * @throws IOException Signals that an I/O exception has occurred.
111     */
112    public void write(CharBuffer buffer) throws IOException {
113        // Delayed initialization, allows adaption to buffer size.
114        if (decodeIn == null) {
115            decodeIn = new PipedWriter();
116            jsonSource = new PipedReader(decodeIn, buffer.capacity());
117            start();
118        }
119        decodeIn.append(buffer);
120        decodeIn.flush();
121    }
122
123    /**
124     * Forward the close to the JSON decoder.
125     *
126     * @throws IOException Signals that an I/O exception has occurred.
127     */
128    public void close() throws IOException {
129        if (decodeIn == null) {
130            // Never started
131            return;
132        }
133        decodeIn.close();
134        try {
135            join(1000);
136        } catch (InterruptedException e) {
137            // Just in case
138            interrupt();
139        }
140    }
141
142    @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
143        "PMD.AvoidInstantiatingObjectsInLoops",
144        "PMD.DataflowAnomalyAnalysis" })
145    @Override
146    public void run() {
147        while (true) {
148            JsonBeanDecoder jsonDecoder = JsonBeanDecoder.create(jsonSource);
149            JsonRpc rpc = null;
150            try {
151                rpc = jsonDecoder.readObject(DefaultJsonRpc.class);
152            } catch (JsonDecodeException e) {
153                break;
154            }
155            if (rpc == null) {
156                break;
157            }
158            // Fully decoded JSON available.
159            PortalSession portalSession = channelRef.get();
160            EventPipeline eventPipeline = pipelineRef.get();
161            if (eventPipeline == null || portalSession == null) {
162                break;
163            }
164            // Portal session established, check for special disconnect
165            if ("disconnect".equals(rpc.method())
166                && portalSession.portalSessionId().equals(
167                    rpc.params().asString(0))) {
168                portalSession.discard();
169                return;
170            }
171            // Ordinary message from portal (view) to server.
172            portalSession.refresh();
173            if ("keepAlive".equals(rpc.method())) {
174                continue;
175            }
176            eventPipeline.fire(new JsonInput(rpc), portalSession);
177        }
178    }
179}