001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.http;
020
021import java.lang.ref.WeakReference;
022import java.net.Inet4Address;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.net.UnknownHostException;
028import java.nio.Buffer;
029import java.nio.ByteBuffer;
030import java.nio.CharBuffer;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.Optional;
036import javax.net.ssl.SNIHostName;
037import javax.net.ssl.SNIServerName;
038import org.jdrupes.httpcodec.Codec;
039import org.jdrupes.httpcodec.Decoder;
040import org.jdrupes.httpcodec.MessageHeader;
041import org.jdrupes.httpcodec.ProtocolException;
042import org.jdrupes.httpcodec.ServerEngine;
043import org.jdrupes.httpcodec.protocols.http.HttpConstants.HttpStatus;
044import org.jdrupes.httpcodec.protocols.http.HttpField;
045import org.jdrupes.httpcodec.protocols.http.HttpRequest;
046import org.jdrupes.httpcodec.protocols.http.HttpResponse;
047import org.jdrupes.httpcodec.protocols.http.server.HttpRequestDecoder;
048import org.jdrupes.httpcodec.protocols.http.server.HttpResponseEncoder;
049import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame;
050import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader;
051import org.jdrupes.httpcodec.types.Converters;
052import org.jdrupes.httpcodec.types.StringList;
053import org.jgrapes.core.Channel;
054import org.jgrapes.core.ClassChannel;
055import org.jgrapes.core.Component;
056import org.jgrapes.core.Components;
057import org.jgrapes.core.EventPipeline;
058import org.jgrapes.core.annotation.Handler;
059import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements;
060import org.jgrapes.core.internal.EventProcessor;
061import org.jgrapes.http.events.ProtocolSwitchAccepted;
062import org.jgrapes.http.events.Request;
063import org.jgrapes.http.events.Response;
064import org.jgrapes.http.events.Upgraded;
065import org.jgrapes.http.events.WebSocketClose;
066import org.jgrapes.io.IOSubchannel;
067import org.jgrapes.io.events.Close;
068import org.jgrapes.io.events.Closed;
069import org.jgrapes.io.events.Input;
070import org.jgrapes.io.events.Output;
071import org.jgrapes.io.events.Purge;
072import org.jgrapes.io.util.LinkedIOSubchannel;
073import org.jgrapes.io.util.ManagedBuffer;
074import org.jgrapes.io.util.ManagedBufferPool;
075import org.jgrapes.net.SocketServer;
076import org.jgrapes.net.events.Accepted;
077
078/**
079 * A converter component that receives and sends byte buffers on a 
080 * network channel and web application layer messages on
081 * {@link IOSubchannel}s of its channel. 
082 * 
083 * Each {@link IOSubchannel} represents a connection established by 
084 * the browser. The {@link HttpServer} fires {@link Request} events 
085 * (and {@link Input} events, if there is associated data) on the 
086 * subchannels. Web application components (short "weblets") handle 
087 * these events and use 
088 * {@link LinkedIOSubchannel#respond(org.jgrapes.core.Event)}
089 * to send {@link Response} events and, if applicable, {@link Output}
090 * events with data belonging to the response.
091 * 
092 * Events must be fired by weblets while handling the {@link Request}
093 * or {@link Input} events only (to be precise: while handling events 
094 * processed by the associated {@link EventProcessor}) to ensure
095 * that responses and their associated data do not interleave. 
096 */
097@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.CouplingBetweenObjects" })
098public class HttpServer extends Component {
099
100    @SuppressWarnings("PMD.SingularField")
101    private WeakReference<Channel> networkChannelPassBack;
102    private List<Class<? extends Request.In>> providedFallbacks;
103    private int matchLevels = 1;
104    private boolean acceptNoSni;
105    private int applicationBufferSize = -1;
106
107    /**
108     * Denotes the network channel in handler annotations.
109     */
110    private static final class NetworkChannel extends ClassChannel {
111    }
112
113    /**
114     * Create a new server that uses the {@code networkChannel} for network
115     * level I/O.
116     * <P>
117     * As a convenience the server can provide fall back handlers for the
118     * specified types of requests. The fall back handler simply returns 404 (
119     * "Not found").
120     * 
121     * @param appChannel
122     *            this component's channel
123     * @param networkChannel
124     *            the channel for network level I/O
125     * @param fallbacks
126     *            the requests for which a fall back handler is provided
127     */
128    @SafeVarargs
129    public HttpServer(Channel appChannel, Channel networkChannel,
130            Class<? extends Request.In>... fallbacks) {
131        super(appChannel, ChannelReplacements.create()
132            .add(NetworkChannel.class, networkChannel));
133        networkChannelPassBack = new WeakReference<>(networkChannel);
134        this.providedFallbacks = Arrays.asList(fallbacks);
135    }
136
137    /**
138     * Create a new server that creates its own {@link SocketServer} with 
139     * the given address and uses it for network level I/O.
140     * 
141     * @param appChannel
142     *            this component's channel
143     * @param serverAddress the address to listen on
144     * @param fallbacks fall backs
145     */
146    @SafeVarargs
147    public HttpServer(Channel appChannel, InetSocketAddress serverAddress,
148            Class<? extends Request.In>... fallbacks) {
149        this(appChannel, new SocketServer().setServerAddress(serverAddress),
150            fallbacks);
151        attach((SocketServer) networkChannelPassBack.get());
152    }
153
154    /**
155     * @return the matchLevels
156     */
157    public int matchLevels() {
158        return matchLevels;
159    }
160
161    /**
162     * Sets the number of elements from the request path used in the match value
163     * of the generated events (see {@link Request#defaultCriterion()}), defaults
164     * to 1.
165     * 
166     * @param matchLevels the matchLevels to set
167     * @return the http server for easy chaining
168     */
169    public HttpServer setMatchLevels(int matchLevels) {
170        this.matchLevels = matchLevels;
171        return this;
172    }
173
174    /**
175     * Sets the size of the buffers used for {@link Output} events
176     * on the application channel. Defaults to the upstream buffer size
177     * minus 512 (estimate for added protocol overhead).
178     * 
179     * @param applicationBufferSize the size to set
180     * @return the http server for easy chaining
181     */
182    public HttpServer setApplicationBufferSize(int applicationBufferSize) {
183        this.applicationBufferSize = applicationBufferSize;
184        return this;
185    }
186
187    /**
188     * Returns the size of the application side (receive) buffers.
189     * 
190     * @return the value or -1 if not set
191     */
192    public int applicationBufferSize() {
193        return applicationBufferSize;
194    }
195
196    /**
197     * Determines if request from secure (TLS) connections without
198     * SNI are accepted.
199     *  
200     * Secure (TLS) requests usually transfer the name of the server that
201     * they want to connect to during handshake. The HTTP server checks
202     * that the `Host` header field of decoded requests matches the
203     * name used to establish the connection. If, however, the connection
204     * is made using the IP-address, the client does not have a host name.
205     * If such connections are to be accepted, this flag, which
206     * defaults to `false`, must be set.
207     * 
208     * Note that in request accepted without SNI, the `Host` header field
209     * will be modified to contain the IP-address of the indicated host
210     * to prevent accidental matching with virtual host names.  
211     * 
212     * @param acceptNoSni the value to set
213     * @return the http server for easy chaining
214     */
215    public HttpServer setAcceptNoSni(boolean acceptNoSni) {
216        this.acceptNoSni = acceptNoSni;
217        return this;
218    }
219
220    /**
221     * Returns if secure (TLS) requests without SNI are allowed.
222     * 
223     * @return the result
224     */
225    public boolean acceptNoSni() {
226        return acceptNoSni;
227    }
228
229    /**
230     * Creates a new downstream connection as {@link LinkedIOSubchannel} 
231     * of the network connection, a {@link HttpRequestDecoder} and a
232     * {@link HttpResponseEncoder}.
233     * 
234     * @param event
235     *            the accepted event
236     */
237    @Handler(channels = NetworkChannel.class)
238    public void onAccepted(Accepted event, IOSubchannel netChannel) {
239        new WebAppMsgChannel(event, netChannel);
240    }
241
242    /**
243     * Handles data from the client (from upstream). The data is send through 
244     * the {@link HttpRequestDecoder} and events are sent downstream according
245     * to the decoding results.
246     * 
247     * @param event the event
248     * @throws ProtocolException if a protocol exception occurs
249     * @throws InterruptedException 
250     */
251    @Handler(channels = NetworkChannel.class)
252    public void onInput(
253            Input<ByteBuffer> event, IOSubchannel netChannel)
254            throws ProtocolException, InterruptedException {
255        @SuppressWarnings("unchecked")
256        final Optional<WebAppMsgChannel> appChannel
257            = (Optional<WebAppMsgChannel>) LinkedIOSubchannel
258                .downstreamChannel(this, netChannel);
259        if (appChannel.isPresent()) {
260            appChannel.get().handleNetInput(event);
261        }
262    }
263
264    /**
265     * Forwards a {@link Closed} event to the application channel. 
266     *
267     * @param event the event
268     * @param netChannel the net channel
269     */
270    @Handler(channels = NetworkChannel.class)
271    public void onClosed(Closed<?> event, IOSubchannel netChannel) {
272        LinkedIOSubchannel.downstreamChannel(this, netChannel,
273            WebAppMsgChannel.class).ifPresent(appChannel -> {
274                appChannel.handleClosed(event);
275            });
276    }
277
278    /**
279     * Forwards a {@link Purge} event to the application channel.
280     *
281     * @param event the event
282     * @param netChannel the net channel
283     */
284    @Handler(channels = NetworkChannel.class)
285    public void onPurge(Purge event, IOSubchannel netChannel) {
286        LinkedIOSubchannel.downstreamChannel(this, netChannel,
287            WebAppMsgChannel.class).ifPresent(appChannel -> {
288                appChannel.handlePurge(event);
289            });
290    }
291
292    /**
293     * Handles a response event from downstream by sending it through an
294     * {@link HttpResponseEncoder} that generates the data (encoded information)
295     * and sends it upstream with {@link Output} events. Depending on whether 
296     * the response has a body, subsequent {@link Output} events can
297     * follow.
298     * 
299     * @param event
300     *            the response event
301     * @throws InterruptedException if the execution was interrupted
302     */
303    @Handler
304    public void onResponse(Response event, WebAppMsgChannel appChannel)
305            throws InterruptedException {
306        appChannel.handleResponse(event);
307    }
308
309    /**
310     * Receives the message body of a response. A {@link Response} event that
311     * has a message body can be followed by one or more {@link Output} events
312     * from downstream that contain the data. An {@code Output} event
313     * with the end of record flag set signals the end of the message body.
314     * 
315     * @param event
316     *            the event with the data
317     * @throws InterruptedException if the execution was interrupted
318     */
319    @Handler
320    public void onOutput(Output<?> event, WebAppMsgChannel appChannel)
321            throws InterruptedException {
322        appChannel.handleAppOutput(event);
323    }
324
325    /**
326     * Handles a close event from downstream by closing the upstream
327     * connections.
328     * 
329     * @param event
330     *            the close event
331     * @throws InterruptedException if the execution was interrupted
332     */
333    @Handler
334    public void onClose(Close event, WebAppMsgChannel appChannel)
335            throws InterruptedException {
336        appChannel.handleClose(event);
337    }
338
339    /**
340     * Checks whether the request has been handled (value of {@link Request}
341     * event set to `true`) or the status code in the prepared response
342     * is no longer "Not Implemented". If not, but a fall back has been set, 
343     * send a "Not Found" response. If this isn't the case either, send 
344     * the default response ("Not implemented") to the client.
345     * 
346     * @param event
347     *            the request completed event
348     * @param appChannel the application channel 
349     * @throws InterruptedException if the execution was interrupted
350     */
351    @Handler
352    public void onRequestCompleted(
353            Request.In.Completed event, IOSubchannel appChannel)
354            throws InterruptedException {
355        final Request.In requestEvent = event.event();
356        // A check that also works with null.
357        if (Boolean.TRUE.equals(requestEvent.get())
358            || requestEvent.httpRequest().response().map(
359                response -> response.statusCode() != HttpStatus.NOT_IMPLEMENTED
360                    .statusCode())
361                .orElse(false)) {
362            // Some other component has taken care
363            return;
364        }
365
366        // Check if "Not Found" should be sent
367        if (providedFallbacks != null
368            && providedFallbacks.contains(requestEvent.getClass())) {
369            ResponseCreationSupport.sendResponse(
370                requestEvent.httpRequest(), appChannel, HttpStatus.NOT_FOUND);
371            return;
372        }
373
374        // Last resort
375        ResponseCreationSupport.sendResponse(requestEvent.httpRequest(),
376            appChannel, HttpStatus.NOT_IMPLEMENTED);
377    }
378
379    /**
380     * Provides a fallback handler for an OPTIONS request with asterisk. Simply
381     * responds with "OK".
382     * 
383     * @param event the event
384     * @param appChannel the application channel
385     */
386    @Handler(priority = Integer.MIN_VALUE)
387    public void onOptions(Request.In.Options event, IOSubchannel appChannel) {
388        if (event.requestUri() == HttpRequest.ASTERISK_REQUEST) {
389            HttpResponse response = event.httpRequest().response().get();
390            response.setStatus(HttpStatus.OK);
391            appChannel.respond(new Response(response));
392            event.setResult(true);
393            event.stop();
394        }
395    }
396
397    /**
398     * Send the response indicating that the protocol switch was accepted
399     * and causes subsequent data to be handled as {@link Input} and
400     * {@link Output} events on the channel.
401     * 
402     * As a convenience, the channel is associates with the URI that
403     * was used to request the protocol switch using {@link URI} as key.
404     * 
405     * @param event the event
406     * @param appChannel the channel
407     */
408    @Handler
409    public void onProtocolSwitchAccepted(
410            ProtocolSwitchAccepted event, WebAppMsgChannel appChannel) {
411        appChannel.handleProtocolSwitchAccepted(event, appChannel);
412    }
413
414    /**
415     * An application layer channel.
416     */
417    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
418    private class WebAppMsgChannel extends LinkedIOSubchannel {
419        // Starts as ServerEngine<HttpRequest,HttpResponse> but may change
420        private final ServerEngine<?, ?> engine;
421        private ManagedBuffer<ByteBuffer> outBuffer;
422        private final boolean secure;
423        private List<String> snis = Collections.emptyList();
424        private final ManagedBufferPool<ManagedBuffer<ByteBuffer>,
425                ByteBuffer> byteBufferPool;
426        private final ManagedBufferPool<ManagedBuffer<CharBuffer>,
427                CharBuffer> charBufferPool;
428        private ManagedBufferPool<?, ?> currentPool;
429        private final EventPipeline downPipeline;
430        private Upgraded pendingUpgraded;
431        private WsMessageHeader currentWsMessage;
432
433        /**
434         * Instantiates a new channel.
435         *
436         * @param event the event
437         * @param netChannel the net channel
438         */
439        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
440        public WebAppMsgChannel(Accepted event, IOSubchannel netChannel) {
441            super(HttpServer.this, channel(), netChannel, newEventPipeline());
442            engine = new ServerEngine<>(
443                new HttpRequestDecoder(), new HttpResponseEncoder());
444            secure = event.isSecure();
445            if (secure) {
446                snis = new ArrayList<>();
447                for (SNIServerName sni : event.requestedServerNames()) {
448                    if (sni instanceof SNIHostName) {
449                        snis.add(((SNIHostName) sni).getAsciiName());
450                    }
451                }
452            }
453
454            // Calculate "good" application buffer size
455            int bufferSize = applicationBufferSize;
456            if (bufferSize <= 0) {
457                bufferSize = netChannel.byteBufferPool().bufferSize() - 512;
458                if (bufferSize < 4096) {
459                    bufferSize = 4096;
460                }
461            }
462
463            String channelName = Components.objectName(HttpServer.this)
464                + "." + Components.objectName(this);
465            byteBufferPool().setName(channelName + ".upstream.byteBuffers");
466            charBufferPool().setName(channelName + ".upstream.charBuffers");
467            // Allocate downstream buffer pools. Note that decoding WebSocket
468            // network packets may result in several WS frames that are each
469            // delivered in independent events. Therefore provide some
470            // additional buffers.
471            final int bufSize = bufferSize;
472            byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
473                () -> {
474                    return ByteBuffer.allocate(bufSize);
475                }, 2, 100)
476                    .setName(channelName + ".downstream.byteBuffers");
477            charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
478                () -> {
479                    return CharBuffer.allocate(bufSize);
480                }, 2, 100)
481                    .setName(channelName + ".downstream.charBuffers");
482
483            // Downstream pipeline
484            downPipeline = newEventPipeline();
485        }
486
487        /**
488         * Handle {@link Input} events from the network.
489         *
490         * @param event the event
491         * @throws ProtocolException the protocol exception
492         * @throws InterruptedException the interrupted exception
493         */
494        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
495            "PMD.AvoidInstantiatingObjectsInLoops",
496            "PMD.AvoidDeeplyNestedIfStmts", "PMD.CollapsibleIfStatements",
497            "PMD.CognitiveComplexity", "PMD.AvoidDuplicateLiterals" })
498        public void handleNetInput(Input<ByteBuffer> event)
499                throws ProtocolException, InterruptedException {
500            // Send the data from the event through the decoder.
501            ByteBuffer inData = event.data();
502            // Don't unnecessary allocate a buffer, may be header only message
503            ManagedBuffer<?> bodyData = null;
504            boolean wasOverflow = false;
505            while (inData.hasRemaining()) {
506                if (wasOverflow) {
507                    // Message has (more) body
508                    bodyData = currentPool.acquire();
509                }
510                Decoder.Result<?> result = engine.decode(inData,
511                    bodyData == null ? null : bodyData.backingBuffer(),
512                    event.isEndOfRecord());
513                if (result.response().isPresent()) {
514                    // Feedback required, send it "in sync", even if
515                    // event source is not the regular one.
516                    responsePipeline().overrideRestriction().fire(
517                        new Response(result.response().get()), this);
518                    if (result.isResponseOnly()) {
519                        maybeCloseConnection(result);
520                        continue;
521                    }
522                }
523                if (result.isHeaderCompleted()) {
524                    if (!handleRequestHeader(engine.currentRequest().get())) {
525                        maybeCloseConnection(result);
526                        break;
527                    }
528                }
529                if (bodyData != null) {
530                    if (bodyData.position() > 0) {
531                        downPipeline.fire(Input.fromSink(
532                            bodyData, !result.isOverflow()
533                                && !result.isUnderflow()),
534                            this);
535                    } else {
536                        bodyData.unlockBuffer();
537                    }
538                    bodyData = null;
539                }
540                maybeCloseConnection(result);
541                wasOverflow = result.isOverflow();
542            }
543        }
544
545        private void maybeCloseConnection(Decoder.Result<?> result) {
546            if (result.closeConnection()) {
547                // Send close "in sync", even if event source is unexpected.
548                responsePipeline().overrideRestriction()
549                    .fire(new Close(), this);
550            }
551        }
552
553        @SuppressWarnings({ "PMD.CollapsibleIfStatements",
554            "PMD.CognitiveComplexity" })
555        private boolean handleRequestHeader(MessageHeader request) {
556            if (request instanceof HttpRequest) {
557                HttpRequest httpRequest = (HttpRequest) request;
558                if (httpRequest.hasPayload()) {
559                    if (httpRequest.findValue(
560                        HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE)
561                        .map(type -> "text"
562                            .equalsIgnoreCase(type.value().topLevelType()))
563                        .orElse(false)) {
564                        currentPool = charBufferPool;
565                    } else {
566                        currentPool = byteBufferPool;
567                    }
568                }
569                if (secure) {
570                    if (!snis.contains(httpRequest.host())) {
571                        if (acceptNoSni && snis.isEmpty()) {
572                            convertHostToNumerical(httpRequest);
573                        } else {
574                            ResponseCreationSupport.sendResponse(httpRequest,
575                                this, 421, "Misdirected Request");
576                            return false;
577                        }
578                    }
579                }
580                try {
581                    downPipeline.fire(Request.In.fromHttpRequest(httpRequest,
582                        secure, matchLevels), this);
583                } catch (URISyntaxException e) {
584                    ResponseCreationSupport.sendResponse(httpRequest, this, 400,
585                        "Bad Request");
586                    return false;
587                }
588            } else if (request instanceof WsMessageHeader) {
589                WsMessageHeader wsMessage = (WsMessageHeader) request;
590                if (wsMessage.hasPayload()) {
591                    if (wsMessage.isTextMode()) {
592                        currentPool = charBufferPool;
593                    } else {
594                        currentPool = byteBufferPool;
595                    }
596                }
597            } else if (request instanceof WsCloseFrame) {
598                downPipeline.fire(
599                    new WebSocketClose((WsCloseFrame) request, this));
600            }
601            return true;
602        }
603
604        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
605            "PMD.UseStringBufferForStringAppends" })
606        private void convertHostToNumerical(HttpRequest request) {
607            int port = request.port();
608            String host;
609            try {
610                InetAddress addr = InetAddress.getByName(
611                    request.host());
612                host = addr.getHostAddress();
613                if (!(addr instanceof Inet4Address)) {
614                    host = "[" + host + "]";
615                }
616            } catch (UnknownHostException e) {
617                host = InetAddress.getLoopbackAddress().getHostAddress();
618            }
619            request.setHostAndPort(host, port);
620        }
621
622        /**
623         * Handle a response event from the application layer.
624         *
625         * @param event the event
626         * @throws InterruptedException the interrupted exception
627         */
628        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
629            "PMD.AvoidBranchingStatementAsLastInLoop",
630            "PMD.CognitiveComplexity" })
631        public void handleResponse(Response event) throws InterruptedException {
632            if (!engine.encoding()
633                .isAssignableFrom(event.response().getClass())) {
634                return;
635            }
636            final MessageHeader response = event.response();
637            // Start sending the response
638            @SuppressWarnings("unchecked")
639            ServerEngine<?, MessageHeader> msgEngine
640                = (ServerEngine<?, MessageHeader>) engine;
641            msgEngine.encode(response);
642            if (pendingUpgraded != null) {
643                if (response instanceof HttpResponse
644                    && ((HttpResponse) response).statusCode() % 100 == 1) {
645                    downPipeline.fire(pendingUpgraded, this);
646                }
647                pendingUpgraded = null;
648            }
649            boolean hasBody = response.hasPayload();
650            while (true) {
651                outBuffer = upstreamChannel().byteBufferPool().acquire();
652                final ManagedBuffer<ByteBuffer> buffer = outBuffer;
653                Codec.Result result = engine.encode(
654                    Codec.EMPTY_IN, buffer.backingBuffer(), !hasBody);
655                if (result.isOverflow()) {
656                    upstreamChannel().respond(Output.fromSink(buffer, false));
657                    continue;
658                }
659                if (hasBody) {
660                    // Keep buffer with incomplete response to be further
661                    // filled by subsequent Output events
662                    break;
663                }
664                // Response is complete
665                if (buffer.position() > 0) {
666                    upstreamChannel().respond(Output.fromSink(buffer, true));
667                } else {
668                    buffer.unlockBuffer();
669                }
670                outBuffer = null;
671                if (result.closeConnection()) {
672                    upstreamChannel().respond(new Close());
673                }
674                break;
675            }
676
677        }
678
679        /**
680         * Handle a {@link ProtocolSwitchAccepted} event from the 
681         * application layer.
682         *
683         * @param event the event
684         * @param appChannel the app channel
685         */
686        public void handleProtocolSwitchAccepted(
687                ProtocolSwitchAccepted event, WebAppMsgChannel appChannel) {
688            appChannel.setAssociated(URI.class,
689                event.requestEvent().requestUri());
690            final HttpResponse response = event.requestEvent()
691                .httpRequest().response().get()
692                .setStatus(HttpStatus.SWITCHING_PROTOCOLS)
693                .setField(HttpField.UPGRADE,
694                    new StringList(event.protocol()));
695            // We send the Upgraded event only after the response has
696            // successfully been encoded (and thus checked).
697            pendingUpgraded = new Upgraded(event.resourceName(),
698                event.protocol());
699            respond(new Response(response));
700        }
701
702        /**
703         * Handle output from the application layer.
704         *
705         * @param event the event
706         * @throws InterruptedException the interrupted exception
707         */
708        @SuppressWarnings({ "PMD.CyclomaticComplexity", "PMD.NcssCount",
709            "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops",
710            "PMD.CognitiveComplexity" })
711        public void handleAppOutput(Output<?> event)
712                throws InterruptedException {
713            Buffer eventData = event.data();
714            Buffer input;
715            if (eventData instanceof ByteBuffer) {
716                input = ((ByteBuffer) eventData).duplicate();
717            } else if (eventData instanceof CharBuffer) {
718                input = ((CharBuffer) eventData).duplicate();
719            } else {
720                return;
721            }
722            if (engine.switchedTo().equals(Optional.of("websocket"))
723                && currentWsMessage == null) {
724                // When switched to WebSockets, we only have Input and Output
725                // events. Add header automatically.
726                @SuppressWarnings("unchecked")
727                ServerEngine<?, MessageHeader> wsEngine
728                    = (ServerEngine<?, MessageHeader>) engine;
729                currentWsMessage = new WsMessageHeader(
730                    event.buffer().backingBuffer() instanceof CharBuffer,
731                    true);
732                wsEngine.encode(currentWsMessage);
733            }
734            while (input.hasRemaining() || event.isEndOfRecord()) {
735                if (outBuffer == null) {
736                    outBuffer = upstreamChannel().byteBufferPool().acquire();
737                }
738                Codec.Result result = engine.encode(input,
739                    outBuffer.backingBuffer(), event.isEndOfRecord());
740                if (result.isOverflow()) {
741                    upstreamChannel()
742                        .respond(Output.fromSink(outBuffer, false));
743                    outBuffer = upstreamChannel().byteBufferPool().acquire();
744                    continue;
745                }
746                if (event.isEndOfRecord() || result.closeConnection()) {
747                    if (outBuffer.position() > 0) {
748                        upstreamChannel()
749                            .respond(Output.fromSink(outBuffer, true));
750                    } else {
751                        outBuffer.unlockBuffer();
752                    }
753                    outBuffer = null;
754                    if (result.closeConnection()) {
755                        upstreamChannel().respond(new Close());
756                    }
757                    break;
758                }
759            }
760            if (engine.switchedTo().equals(Optional.of("websocket"))
761                && event.isEndOfRecord()) {
762                currentWsMessage = null;
763            }
764        }
765
766        /**
767         * Handle a {@link Close} event from the application layer.
768         *
769         * @param event the event
770         * @throws InterruptedException the interrupted exception
771         */
772        public void handleClose(Close event) throws InterruptedException {
773            if (engine.switchedTo().equals(Optional.of("websocket"))) {
774                fire(new Response(new WsCloseFrame(null, null)), this);
775                return;
776            }
777            upstreamChannel().respond(new Close());
778        }
779
780        /**
781         * Handle a {@link Closed} event from the network by forwarding
782         * it to the application layer.
783         *
784         * @param event the event
785         */
786        public void handleClosed(Closed<?> event) {
787            downPipeline.fire(new Closed<Void>(), this);
788        }
789
790        /**
791         * Handle a {@link Purge} event by forwarding it to the
792         * application layer.
793         *
794         * @param event the event
795         */
796        public void handlePurge(Purge event) {
797            downPipeline.fire(new Purge(), this);
798        }
799
800    }
801
802}