001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.http;
020
021import java.lang.ref.WeakReference;
022import java.net.Inet4Address;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.net.UnknownHostException;
028import java.nio.Buffer;
029import java.nio.ByteBuffer;
030import java.nio.CharBuffer;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.Optional;
036import javax.net.ssl.SNIHostName;
037import javax.net.ssl.SNIServerName;
038import org.jdrupes.httpcodec.Codec;
039import org.jdrupes.httpcodec.Decoder;
040import org.jdrupes.httpcodec.MessageHeader;
041import org.jdrupes.httpcodec.ProtocolException;
042import org.jdrupes.httpcodec.ServerEngine;
043import org.jdrupes.httpcodec.protocols.http.HttpConstants.HttpStatus;
044import org.jdrupes.httpcodec.protocols.http.HttpField;
045import org.jdrupes.httpcodec.protocols.http.HttpRequest;
046import org.jdrupes.httpcodec.protocols.http.HttpResponse;
047import org.jdrupes.httpcodec.protocols.http.server.HttpRequestDecoder;
048import org.jdrupes.httpcodec.protocols.http.server.HttpResponseEncoder;
049import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame;
050import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader;
051import org.jdrupes.httpcodec.types.Converters;
052import org.jdrupes.httpcodec.types.StringList;
053import org.jgrapes.core.Channel;
054import org.jgrapes.core.ClassChannel;
055import org.jgrapes.core.Component;
056import org.jgrapes.core.Components;
057import org.jgrapes.core.EventPipeline;
058import org.jgrapes.core.annotation.Handler;
059import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements;
060import org.jgrapes.core.internal.EventProcessor;
061import org.jgrapes.http.events.ProtocolSwitchAccepted;
062import org.jgrapes.http.events.Request;
063import org.jgrapes.http.events.Response;
064import org.jgrapes.http.events.Upgraded;
065import org.jgrapes.http.events.WebSocketClose;
066import org.jgrapes.io.IOSubchannel;
067import org.jgrapes.io.events.Close;
068import org.jgrapes.io.events.Closed;
069import org.jgrapes.io.events.Input;
070import org.jgrapes.io.events.Output;
071import org.jgrapes.io.events.Purge;
072import org.jgrapes.io.util.LinkedIOSubchannel;
073import org.jgrapes.io.util.ManagedBuffer;
074import org.jgrapes.io.util.ManagedBufferPool;
075import org.jgrapes.net.SocketServer;
076import org.jgrapes.net.events.Accepted;
077
078/**
079 * A converter component that receives and sends byte buffers on a 
080 * network channel and web application layer messages on
081 * {@link IOSubchannel}s of its channel. 
082 * 
083 * Each {@link IOSubchannel} represents a connection established by 
084 * the browser. The {@link HttpServer} fires {@link Request} events 
085 * (and {@link Input} events, if there is associated data) on the 
086 * subchannels. Web application components (short "weblets") handle 
087 * these events and use 
088 * {@link LinkedIOSubchannel#respond(org.jgrapes.core.Event)}
089 * to send {@link Response} events and, if applicable, {@link Output}
090 * events with data belonging to the response.
091 * 
092 * Events must be fired by weblets while handling the {@link Request}
093 * or {@link Input} events only (to be precise: while handling events 
094 * processed by the associated {@link EventProcessor}) to ensure
095 * that responses and their associated data do not interleave. 
096 */
097@SuppressWarnings("PMD.ExcessiveImports")
098public class HttpServer extends Component {
099
100    private WeakReference<Channel> networkChannelPassBack;
101    private List<Class<? extends Request.In>> providedFallbacks;
102    private int matchLevels = 1;
103    private boolean acceptNoSni;
104    private int applicationBufferSize = -1;
105
    /**
     * Marker class that denotes the network channel in handler 
     * annotations. The constructor of the enclosing component registers 
     * a channel replacement that maps this class to the network channel 
     * that is actually used.
     */
    private static class NetworkChannel extends ClassChannel {
    }
111
112    /**
113     * Create a new server that uses the {@code networkChannel} for network
114     * level I/O.
115     * <P>
116     * As a convenience the server can provide fall back handlers for the
117     * specified types of requests. The fall back handler simply returns 404 (
118     * "Not found").
119     * 
120     * @param appChannel
121     *            this component's channel
122     * @param networkChannel
123     *            the channel for network level I/O
124     * @param fallbacks
125     *            the requests for which a fall back handler is provided
126     */
127    @SafeVarargs
128    public HttpServer(Channel appChannel, Channel networkChannel,
129            Class<? extends Request.In>... fallbacks) {
130        super(appChannel, ChannelReplacements.create()
131            .add(NetworkChannel.class, networkChannel));
132        networkChannelPassBack = new WeakReference<>(networkChannel);
133        this.providedFallbacks = Arrays.asList(fallbacks);
134    }
135
136    /**
137     * Create a new server that creates its own {@link SocketServer} with 
138     * the given address and uses it for network level I/O.
139     * 
140     * @param appChannel
141     *            this component's channel
142     * @param serverAddress the address to listen on
143     * @param fallbacks fall backs
144     */
145    @SafeVarargs
146    public HttpServer(Channel appChannel, InetSocketAddress serverAddress,
147            Class<? extends Request.In>... fallbacks) {
148        this(appChannel, new SocketServer().setServerAddress(serverAddress),
149            fallbacks);
150        attach((SocketServer) networkChannelPassBack.get());
151    }
152
    /**
     * Returns the number of elements from the request path that are
     * used in the match value of the generated request events.
     * 
     * @return the number of match levels (1 unless changed)
     */
    public int matchLevels() {
        return matchLevels;
    }
159
    /**
     * Sets the number of elements from the request path used in the match
     * value of the generated events (see {@link Request#defaultCriterion()}).
     * Defaults to 1. The value is stored as is, no validation takes place.
     * 
     * @param matchLevels the number of match levels to set
     * @return the http server for easy chaining
     */
    public HttpServer setMatchLevels(int matchLevels) {
        this.matchLevels = matchLevels;
        return this;
    }
172
    /**
     * Sets the size of the buffers that are allocated for the application
     * side of a connection, i.e. the buffers used to pass decoded payload
     * to the application with {@link Input} events. The value is
     * evaluated when a connection is accepted. 
     * 
     * If no positive value has been set, the size defaults to the byte 
     * buffer size of the network channel minus 512 (estimate for added 
     * protocol overhead), but at least 4096.
     * 
     * @param applicationBufferSize the size to set
     * @return the http server for easy chaining
     */
    public HttpServer setApplicationBufferSize(int applicationBufferSize) {
        this.applicationBufferSize = applicationBufferSize;
        return this;
    }
185
    /**
     * Returns the configured size of the application side (receive) 
     * buffers. 
     * 
     * @return the configured value or -1 if no value has been set
     */
    public int applicationBufferSize() {
        return applicationBufferSize;
    }
194
    /**
     * Determines if requests from secure (TLS) connections without
     * SNI are accepted.
     *  
     * Secure (TLS) requests usually transfer the name of the server that
     * they want to connect to during the handshake. The HTTP server checks
     * that the `Host` header field of decoded requests matches a
     * name used to establish the connection. If, however, the connection
     * is made using the IP-address, the client does not have a host name.
     * If such connections are to be accepted, this flag, which
     * defaults to `false`, must be set.
     * 
     * Note that in requests accepted without SNI, the `Host` header field
     * will be modified to contain the IP-address of the indicated host
     * to prevent accidental matching with virtual host names.  
     * 
     * @param acceptNoSni the value to set
     * @return the http server for easy chaining
     */
    public HttpServer setAcceptNoSni(boolean acceptNoSni) {
        this.acceptNoSni = acceptNoSni;
        return this;
    }
218
    /**
     * Returns whether requests from secure (TLS) connections without 
     * SNI are accepted.
     * 
     * @return `true` if such requests are accepted
     */
    public boolean acceptNoSni() {
        return acceptNoSni;
    }
227
    /**
     * Creates a new downstream connection as {@link LinkedIOSubchannel} 
     * of the network connection, a {@link HttpRequestDecoder} and a
     * {@link HttpResponseEncoder}.
     * 
     * @param event
     *            the accepted event
     * @param netChannel
     *            the network channel of the accepted connection
     */
    @Handler(channels = NetworkChannel.class)
    public void onAccepted(Accepted event, IOSubchannel netChannel) {
        // The instance is not kept here. The other network level handlers
        // look it up again with LinkedIOSubchannel.downstreamChannel(...).
        new WebAppMsgChannel(event, netChannel);
    }
240
241    /**
242     * Handles data from the client (from upstream). The data is send through 
243     * the {@link HttpRequestDecoder} and events are sent downstream according
244     * to the decoding results.
245     * 
246     * @param event the event
247     * @throws ProtocolException if a protocol exception occurs
248     * @throws InterruptedException 
249     */
250    @Handler(channels = NetworkChannel.class)
251    public void onInput(
252            Input<ByteBuffer> event, IOSubchannel netChannel)
253            throws ProtocolException, InterruptedException {
254        @SuppressWarnings("unchecked")
255        final Optional<WebAppMsgChannel> appChannel
256            = (Optional<WebAppMsgChannel>) LinkedIOSubchannel
257                .downstreamChannel(this, netChannel);
258        if (appChannel.isPresent()) {
259            appChannel.get().handleNetInput(event);
260        }
261    }
262
263    /**
264     * Forwards a {@link Closed} event to the application channel. 
265     *
266     * @param event the event
267     * @param netChannel the net channel
268     */
269    @Handler(channels = NetworkChannel.class)
270    public void onClosed(Closed<?> event, IOSubchannel netChannel) {
271        LinkedIOSubchannel.downstreamChannel(this, netChannel,
272            WebAppMsgChannel.class).ifPresent(appChannel -> {
273                appChannel.handleClosed(event);
274            });
275    }
276
277    /**
278     * Forwards a {@link Purge} event to the application channel.
279     *
280     * @param event the event
281     * @param netChannel the net channel
282     */
283    @Handler(channels = NetworkChannel.class)
284    public void onPurge(Purge event, IOSubchannel netChannel) {
285        LinkedIOSubchannel.downstreamChannel(this, netChannel,
286            WebAppMsgChannel.class).ifPresent(appChannel -> {
287                appChannel.handlePurge(event);
288            });
289    }
290
    /**
     * Handles a response event from downstream by sending it through an
     * {@link HttpResponseEncoder} that generates the data (encoded information)
     * and sends it upstream with {@link Output} events. Depending on whether 
     * the response has a body, subsequent {@link Output} events can
     * follow.
     * 
     * @param event
     *            the response event
     * @param appChannel
     *            the application channel that the response was fired on
     * @throws InterruptedException if the execution was interrupted
     */
    @Handler
    public void onResponse(Response event, WebAppMsgChannel appChannel)
            throws InterruptedException {
        appChannel.handleResponse(event);
    }
307
    /**
     * Receives the message body of a response. A {@link Response} event that
     * has a message body can be followed by one or more {@link Output} events
     * from downstream that contain the data. An {@code Output} event
     * with the end of record flag set signals the end of the message body.
     * 
     * @param event
     *            the event with the data
     * @param appChannel
     *            the application channel that the data was fired on
     * @throws InterruptedException if the execution was interrupted
     */
    @Handler
    public void onOutput(Output<?> event, WebAppMsgChannel appChannel)
            throws InterruptedException {
        appChannel.handleAppOutput(event);
    }
323
    /**
     * Handles a close event from downstream. Unless the connection has
     * been switched to the WebSocket protocol, the upstream connection 
     * is closed. For WebSocket connections, a close frame is sent 
     * as response instead.
     * 
     * @param event
     *            the close event
     * @param appChannel
     *            the application channel that is to be closed
     * @throws InterruptedException if the execution was interrupted
     */
    @Handler
    public void onClose(Close event, WebAppMsgChannel appChannel)
            throws InterruptedException {
        appChannel.handleClose(event);
    }
337
338    /**
339     * Checks whether the request has been handled (value of {@link Request}
340     * event set to `true`) or the status code in the prepared response
341     * is no longer "Not Implemented". If not, but a fall back has been set, 
342     * send a "Not Found" response. If this isn't the case either, send 
343     * the default response ("Not implemented") to the client.
344     * 
345     * @param event
346     *            the request completed event
347     * @param appChannel the application channel 
348     * @throws InterruptedException if the execution was interrupted
349     */
350    @Handler
351    public void onRequestCompleted(
352            Request.In.Completed event, IOSubchannel appChannel)
353            throws InterruptedException {
354        final Request.In requestEvent = event.event();
355        // A check that also works with null.
356        if (Boolean.TRUE.equals(requestEvent.get())
357            || requestEvent.httpRequest().response().map(
358                response -> response.statusCode() != HttpStatus.NOT_IMPLEMENTED
359                    .statusCode())
360                .orElse(false)) {
361            // Some other component has taken care
362            return;
363        }
364
365        // Check if "Not Found" should be sent
366        if (providedFallbacks != null
367            && providedFallbacks.contains(requestEvent.getClass())) {
368            ResponseCreationSupport.sendResponse(
369                requestEvent.httpRequest(), appChannel, HttpStatus.NOT_FOUND);
370            return;
371        }
372
373        // Last resort
374        ResponseCreationSupport.sendResponse(requestEvent.httpRequest(),
375            appChannel, HttpStatus.NOT_IMPLEMENTED);
376    }
377
378    /**
379     * Provides a fallback handler for an OPTIONS request with asterisk. Simply
380     * responds with "OK".
381     * 
382     * @param event the event
383     * @param appChannel the application channel
384     */
385    @Handler(priority = Integer.MIN_VALUE)
386    public void onOptions(Request.In.Options event, IOSubchannel appChannel) {
387        if (event.requestUri() == HttpRequest.ASTERISK_REQUEST) {
388            HttpResponse response = event.httpRequest().response().get();
389            response.setStatus(HttpStatus.OK);
390            appChannel.respond(new Response(response));
391            event.setResult(true);
392            event.stop();
393        }
394    }
395
    /**
     * Sends the response indicating that the protocol switch was accepted
     * and causes subsequent data to be handled as {@link Input} and
     * {@link Output} events on the channel.
     * 
     * As a convenience, the channel is associated with the URI that
     * was used to request the protocol switch using {@link URI} as key.
     * 
     * @param event the event
     * @param appChannel the channel
     */
    @Handler
    public void onProtocolSwitchAccepted(
            ProtocolSwitchAccepted event, WebAppMsgChannel appChannel) {
        appChannel.handleProtocolSwitchAccepted(event, appChannel);
    }
412
413    /**
414     * An application layer channel.
415     */
416    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
417    private class WebAppMsgChannel extends LinkedIOSubchannel {
        // The codec engine used for this connection. Starts as 
        // ServerEngine<HttpRequest,HttpResponse> but may change
        private final ServerEngine<?, ?> engine;
        // Buffer with encoded data that has not been sent upstream yet,
        // kept between a response header and its body (null if none)
        private ManagedBuffer<ByteBuffer> outBuffer;
        // Whether the connection was accepted as secure (TLS) connection
        private final boolean secure;
        // Host names requested with SNI (always empty if not secure)
        private List<String> snis = Collections.emptyList();
        // Pool of byte buffers for payload passed downstream
        private final ManagedBufferPool<ManagedBuffer<ByteBuffer>,
                ByteBuffer> byteBufferPool;
        // Pool of char buffers for (text) payload passed downstream
        private final ManagedBufferPool<ManagedBuffer<CharBuffer>,
                CharBuffer> charBufferPool;
        // The pool selected for the payload of the current message
        private ManagedBufferPool<?, ?> currentPool;
        // Pipeline used for firing events downstream
        private final EventPipeline downPipeline;
        // Upgraded event to be fired when the response that confirms
        // the protocol switch has been encoded (null if none is pending)
        private Upgraded pendingUpgraded;
        // Header of the WebSocket message currently being sent, created
        // automatically for Output events (null between messages)
        private WsMessageHeader currentWsMessage;
431
432        /**
433         * Instantiates a new channel.
434         *
435         * @param event the event
436         * @param netChannel the net channel
437         */
438        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
439        public WebAppMsgChannel(Accepted event, IOSubchannel netChannel) {
440            super(HttpServer.this, channel(), netChannel, newEventPipeline());
441            engine = new ServerEngine<>(
442                new HttpRequestDecoder(), new HttpResponseEncoder());
443            secure = event.isSecure();
444            if (secure) {
445                snis = new ArrayList<>();
446                for (SNIServerName sni : event.requestedServerNames()) {
447                    if (sni instanceof SNIHostName) {
448                        snis.add(((SNIHostName) sni).getAsciiName());
449                    }
450                }
451            }
452
453            // Calculate "good" application buffer size
454            int bufferSize = applicationBufferSize;
455            if (bufferSize <= 0) {
456                bufferSize = netChannel.byteBufferPool().bufferSize() - 512;
457                if (bufferSize < 4096) {
458                    bufferSize = 4096;
459                }
460            }
461
462            String channelName = Components.objectName(HttpServer.this)
463                + "." + Components.objectName(this);
464            byteBufferPool().setName(channelName + ".upstream.byteBuffers");
465            charBufferPool().setName(channelName + ".upstream.charBuffers");
466            // Allocate downstream buffer pools. Note that decoding WebSocket
467            // network packets may result in several WS frames that are each
468            // delivered in independent events. Therefore provide some
469            // additional buffers.
470            final int bufSize = bufferSize;
471            byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
472                () -> {
473                    return ByteBuffer.allocate(bufSize);
474                }, 2, 100)
475                    .setName(channelName + ".downstream.byteBuffers");
476            charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
477                () -> {
478                    return CharBuffer.allocate(bufSize);
479                }, 2, 100)
480                    .setName(channelName + ".downstream.charBuffers");
481
482            // Downstream pipeline
483            downPipeline = newEventPipeline();
484        }
485
        /**
         * Handle {@link Input} events from the network. The data is
         * sent through the decoder of the engine until it is consumed.
         * Depending on the decoding results, responses generated by
         * the decoder are fired on this channel, completed headers are
         * passed to {@link #handleRequestHeader(MessageHeader)} and 
         * decoded body data is fired downstream with {@link Input} events.
         *
         * @param event the event
         * @throws ProtocolException the protocol exception
         * @throws InterruptedException the interrupted exception
         */
        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
            "PMD.AvoidInstantiatingObjectsInLoops",
            "PMD.AvoidDeeplyNestedIfStmts", "PMD.CollapsibleIfStatements",
            "PMD.CognitiveComplexity", "PMD.AvoidDuplicateLiterals" })
        public void handleNetInput(Input<ByteBuffer> event)
                throws ProtocolException, InterruptedException {
            // Send the data from the event through the decoder.
            ByteBuffer inData = event.data();
            // Don't allocate a buffer unnecessarily, this may be a 
            // header only message. A body buffer is only acquired after 
            // the decoder has reported an overflow.
            ManagedBuffer<?> bodyData = null;
            boolean wasOverflow = false;
            while (inData.hasRemaining()) {
                if (wasOverflow) {
                    // Message has (more) body
                    bodyData = currentPool.acquire();
                }
                Decoder.Result<?> result = engine.decode(inData,
                    bodyData == null ? null : bodyData.backingBuffer(),
                    event.isEndOfRecord());
                if (result.response().isPresent()) {
                    // Feedback required, send it "in sync", even if
                    // event source is not the regular one.
                    responsePipeline().overrideRestriction().fire(
                        new Response(result.response().get()), this);
                    if (result.isResponseOnly()) {
                        // NOTE(review): wasOverflow keeps its value here, 
                        // so a buffer acquired above looks like it is 
                        // replaced without being released -- confirm.
                        maybeCloseConnection(result);
                        continue;
                    }
                }
                if (result.isHeaderCompleted()) {
                    if (!handleRequestHeader(engine.currentRequest().get())) {
                        // The request has been rejected (a response has 
                        // been sent), stop processing the input.
                        maybeCloseConnection(result);
                        break;
                    }
                }
                if (bodyData != null) {
                    if (bodyData.position() > 0) {
                        // Pass the data downstream. It is the end of the
                        // record if the decoder neither has more data 
                        // (overflow) nor needs more data (underflow).
                        downPipeline.fire(Input.fromSink(
                            bodyData, !result.isOverflow()
                                && !result.isUnderflow()),
                            this);
                    } else {
                        // Nothing decoded into the buffer, release it
                        bodyData.unlockBuffer();
                    }
                    bodyData = null;
                }
                maybeCloseConnection(result);
                wasOverflow = result.isOverflow();
            }
        }
543
544        private void maybeCloseConnection(Decoder.Result<?> result) {
545            if (result.closeConnection()) {
546                // Send close "in sync", even if event source is unexpected.
547                responsePipeline().overrideRestriction()
548                    .fire(new Close(), this);
549            }
550        }
551
552        @SuppressWarnings({ "PMD.CollapsibleIfStatements",
553            "PMD.CognitiveComplexity" })
554        private boolean handleRequestHeader(MessageHeader request) {
555            if (request instanceof HttpRequest) {
556                HttpRequest httpRequest = (HttpRequest) request;
557                if (httpRequest.hasPayload()) {
558                    if (httpRequest.findValue(
559                        HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE)
560                        .map(type -> "text"
561                            .equalsIgnoreCase(type.value().topLevelType()))
562                        .orElse(false)) {
563                        currentPool = charBufferPool;
564                    } else {
565                        currentPool = byteBufferPool;
566                    }
567                }
568                if (secure) {
569                    if (!snis.contains(httpRequest.host())) {
570                        if (acceptNoSni && snis.isEmpty()) {
571                            convertHostToNumerical(httpRequest);
572                        } else {
573                            ResponseCreationSupport.sendResponse(httpRequest,
574                                this, 421, "Misdirected Request");
575                            return false;
576                        }
577                    }
578                }
579                try {
580                    downPipeline.fire(Request.In.fromHttpRequest(httpRequest,
581                        secure, matchLevels), this);
582                } catch (URISyntaxException e) {
583                    ResponseCreationSupport.sendResponse(httpRequest, this, 400,
584                        "Bad Request");
585                    return false;
586                }
587            } else if (request instanceof WsMessageHeader) {
588                WsMessageHeader wsMessage = (WsMessageHeader) request;
589                if (wsMessage.hasPayload()) {
590                    if (wsMessage.isTextMode()) {
591                        currentPool = charBufferPool;
592                    } else {
593                        currentPool = byteBufferPool;
594                    }
595                }
596            } else if (request instanceof WsCloseFrame) {
597                downPipeline.fire(
598                    new WebSocketClose((WsCloseFrame) request, this));
599            }
600            return true;
601        }
602
603        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
604            "PMD.UseStringBufferForStringAppends" })
605        private void convertHostToNumerical(HttpRequest request) {
606            int port = request.port();
607            String host;
608            try {
609                InetAddress addr = InetAddress.getByName(
610                    request.host());
611                host = addr.getHostAddress();
612                if (!(addr instanceof Inet4Address)) {
613                    host = "[" + host + "]";
614                }
615            } catch (UnknownHostException e) {
616                host = InetAddress.getLoopbackAddress().getHostAddress();
617            }
618            request.setHostAndPort(host, port);
619        }
620
621        /**
622         * Handle a response event from the application layer.
623         *
624         * @param event the event
625         * @throws InterruptedException the interrupted exception
626         */
627        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
628            "PMD.AvoidBranchingStatementAsLastInLoop",
629            "PMD.CognitiveComplexity" })
630        public void handleResponse(Response event) throws InterruptedException {
631            if (!engine.encoding()
632                .isAssignableFrom(event.response().getClass())) {
633                return;
634            }
635            final MessageHeader response = event.response();
636            // Start sending the response
637            @SuppressWarnings("unchecked")
638            ServerEngine<?, MessageHeader> msgEngine
639                = (ServerEngine<?, MessageHeader>) engine;
640            msgEngine.encode(response);
641            if (pendingUpgraded != null) {
642                if (response instanceof HttpResponse
643                    && ((HttpResponse) response).statusCode() % 100 == 1) {
644                    downPipeline.fire(pendingUpgraded, this);
645                }
646                pendingUpgraded = null;
647            }
648            boolean hasBody = response.hasPayload();
649            while (true) {
650                outBuffer = upstreamChannel().byteBufferPool().acquire();
651                final ManagedBuffer<ByteBuffer> buffer = outBuffer;
652                Codec.Result result = engine.encode(
653                    Codec.EMPTY_IN, buffer.backingBuffer(), !hasBody);
654                if (result.isOverflow()) {
655                    upstreamChannel().respond(Output.fromSink(buffer, false));
656                    continue;
657                }
658                if (hasBody) {
659                    // Keep buffer with incomplete response to be further
660                    // filled by subsequent Output events
661                    break;
662                }
663                // Response is complete
664                if (buffer.position() > 0) {
665                    upstreamChannel().respond(Output.fromSink(buffer, true));
666                } else {
667                    buffer.unlockBuffer();
668                }
669                outBuffer = null;
670                if (result.closeConnection()) {
671                    upstreamChannel().respond(new Close());
672                }
673                break;
674            }
675
676        }
677
678        /**
679         * Handle a {@link ProtocolSwitchAccepted} event from the 
680         * application layer.
681         *
682         * @param event the event
683         * @param appChannel the app channel
684         */
685        public void handleProtocolSwitchAccepted(
686                ProtocolSwitchAccepted event, WebAppMsgChannel appChannel) {
687            appChannel.setAssociated(URI.class,
688                event.requestEvent().requestUri());
689            final HttpResponse response = event.requestEvent()
690                .httpRequest().response().get()
691                .setStatus(HttpStatus.SWITCHING_PROTOCOLS)
692                .setField(HttpField.UPGRADE,
693                    new StringList(event.protocol()));
694            // We send the Upgraded event only after the response has
695            // successfully been encoded (and thus checked).
696            pendingUpgraded = new Upgraded(event.resourceName(),
697                event.protocol());
698            respond(new Response(response));
699        }
700
        /**
         * Handle output from the application layer. The data from 
         * the event is sent through the encoder of the engine and
         * the encoded data is sent upstream with {@link Output} events.
         * Events with data that is neither a {@link ByteBuffer} nor a 
         * {@link CharBuffer} are ignored.
         * 
         * If the connection has been switched to the WebSocket protocol,
         * the message header is generated automatically when the first 
         * data of a message is received.
         *
         * @param event the event
         * @throws InterruptedException the interrupted exception
         */
        @SuppressWarnings({ "PMD.CyclomaticComplexity", "PMD.NcssCount",
            "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops",
            "PMD.CognitiveComplexity" })
        public void handleAppOutput(Output<?> event)
                throws InterruptedException {
            Buffer eventData = event.data();
            Buffer input;
            // Work on a duplicate, encoding must not change position 
            // and limit of the buffer from the event.
            if (eventData instanceof ByteBuffer) {
                input = ((ByteBuffer) eventData).duplicate();
            } else if (eventData instanceof CharBuffer) {
                input = ((CharBuffer) eventData).duplicate();
            } else {
                return;
            }
            if (engine.switchedTo().equals(Optional.of("websocket"))
                && currentWsMessage == null) {
                // When switched to WebSockets, we only have Input and Output
                // events. Add header automatically.
                @SuppressWarnings("unchecked")
                ServerEngine<?, MessageHeader> wsEngine
                    = (ServerEngine<?, MessageHeader>) engine;
                // Presumably the arguments are "text mode" and 
                // "has payload" -- TODO confirm
                currentWsMessage = new WsMessageHeader(
                    event.buffer().backingBuffer() instanceof CharBuffer,
                    true);
                wsEngine.encode(currentWsMessage);
            }
            // An event without data but with end of record set must
            // pass through the encoder as well (at least once).
            while (input.hasRemaining() || event.isEndOfRecord()) {
                if (outBuffer == null) {
                    outBuffer = upstreamChannel().byteBufferPool().acquire();
                }
                Codec.Result result = engine.encode(input,
                    outBuffer.backingBuffer(), event.isEndOfRecord());
                if (result.isOverflow()) {
                    // Buffer is full, send it and continue with a new one
                    upstreamChannel()
                        .respond(Output.fromSink(outBuffer, false));
                    outBuffer = upstreamChannel().byteBufferPool().acquire();
                    continue;
                }
                if (event.isEndOfRecord() || result.closeConnection()) {
                    // Message is complete, send what is left (if any)
                    if (outBuffer.position() > 0) {
                        upstreamChannel()
                            .respond(Output.fromSink(outBuffer, true));
                    } else {
                        outBuffer.unlockBuffer();
                    }
                    outBuffer = null;
                    if (result.closeConnection()) {
                        upstreamChannel().respond(new Close());
                    }
                    break;
                }
            }
            if (engine.switchedTo().equals(Optional.of("websocket"))
                && event.isEndOfRecord()) {
                // Next Output event starts a new WebSocket message
                currentWsMessage = null;
            }
        }
764
765        /**
766         * Handle a {@link Close} event from the application layer.
767         *
768         * @param event the event
769         * @throws InterruptedException the interrupted exception
770         */
771        public void handleClose(Close event) throws InterruptedException {
772            if (engine.switchedTo().equals(Optional.of("websocket"))) {
773                fire(new Response(new WsCloseFrame(null, null)), this);
774                return;
775            }
776            upstreamChannel().respond(new Close());
777        }
778
779        /**
780         * Handle a {@link Closed} event from the network by forwarding
781         * it to the application layer.
782         *
783         * @param event the event
784         */
785        public void handleClosed(Closed<?> event) {
786            downPipeline.fire(new Closed<Void>(), this);
787        }
788
789        /**
790         * Handle a {@link Purge} event by forwarding it to the
791         * application layer.
792         *
793         * @param event the event
794         */
795        public void handlePurge(Purge event) {
796            downPipeline.fire(new Purge(), this);
797        }
798
799    }
800
801}