001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.http;
020
021import java.lang.ref.WeakReference;
022import java.net.Inet4Address;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.URI;
026import java.net.UnknownHostException;
027import java.nio.Buffer;
028import java.nio.ByteBuffer;
029import java.nio.CharBuffer;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Collections;
033import java.util.List;
034import java.util.Optional;
035
036import javax.net.ssl.SNIHostName;
037import javax.net.ssl.SNIServerName;
038
039import org.jdrupes.httpcodec.Codec;
040import org.jdrupes.httpcodec.Decoder;
041import org.jdrupes.httpcodec.MessageHeader;
042import org.jdrupes.httpcodec.ProtocolException;
043import org.jdrupes.httpcodec.ServerEngine;
044import org.jdrupes.httpcodec.protocols.http.HttpConstants.HttpStatus;
045import org.jdrupes.httpcodec.protocols.http.HttpField;
046import org.jdrupes.httpcodec.protocols.http.HttpRequest;
047import org.jdrupes.httpcodec.protocols.http.HttpResponse;
048import org.jdrupes.httpcodec.protocols.http.server.HttpRequestDecoder;
049import org.jdrupes.httpcodec.protocols.http.server.HttpResponseEncoder;
050import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame;
051import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader;
052import org.jdrupes.httpcodec.types.Converters;
053import org.jdrupes.httpcodec.types.StringList;
054import org.jgrapes.core.Channel;
055import org.jgrapes.core.ClassChannel;
056import org.jgrapes.core.Component;
057import org.jgrapes.core.Components;
058import org.jgrapes.core.EventPipeline;
059import org.jgrapes.core.annotation.Handler;
060import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements;
061import org.jgrapes.core.internal.EventProcessor;
062import org.jgrapes.http.events.ProtocolSwitchAccepted;
063import org.jgrapes.http.events.Request;
064import org.jgrapes.http.events.Response;
065import org.jgrapes.http.events.Upgraded;
066import org.jgrapes.http.events.WebSocketClose;
067import org.jgrapes.io.IOSubchannel;
068import org.jgrapes.io.events.Close;
069import org.jgrapes.io.events.Closed;
070import org.jgrapes.io.events.Input;
071import org.jgrapes.io.events.Output;
072import org.jgrapes.io.events.Purge;
073import org.jgrapes.io.util.LinkedIOSubchannel;
074import org.jgrapes.io.util.ManagedBuffer;
075import org.jgrapes.io.util.ManagedBufferPool;
076import org.jgrapes.net.TcpServer;
077import org.jgrapes.net.events.Accepted;
078
079/**
080 * A converter component that receives and sends byte buffers on a 
081 * network channel and web application layer messages on
082 * {@link IOSubchannel}s of its channel. 
083 * 
084 * Each {@link IOSubchannel} represents a connection established by 
085 * the browser. The {@link HttpServer} fires {@link Request} events 
086 * (and {@link Input} events, if there is associated data) on the 
087 * subchannels. Web application components (short "weblets") handle 
088 * these events and use 
089 * {@link LinkedIOSubchannel#respond(org.jgrapes.core.Event)}
090 * to send {@link Response} events and, if applicable, {@link Output}
091 * events with data belonging to the response.
092 * 
093 * Events must be fired by weblets while handling the {@link Request}
094 * or {@link Input} events only (to be precise: while handling events 
095 * processed by the associated {@link EventProcessor}) to ensure
096 * that responses and their associated data do not interleave. 
097 */
098@SuppressWarnings("PMD.ExcessiveImports")
099public class HttpServer extends Component {
100
101    private WeakReference<Channel> networkChannelPassBack;
102    private List<Class<? extends Request.In>> providedFallbacks;
103    private int matchLevels = 1;
104    private boolean acceptNoSni;
105    private int applicationBufferSize = -1;
106
107    /**
108     * Denotes the network channel in handler annotations.
109     */
110    private static class NetworkChannel extends ClassChannel {
111    }
112
113    /**
114     * Create a new server that uses the {@code networkChannel} for network
115     * level I/O.
116     * <P>
117     * As a convenience the server can provide fall back handlers for the
118     * specified types of requests. The fall back handler simply returns 404 (
119     * "Not found").
120     * 
121     * @param appChannel
122     *            this component's channel
123     * @param networkChannel
124     *            the channel for network level I/O
125     * @param fallbacks
126     *            the requests for which a fall back handler is provided
127     */
128    @SafeVarargs
129    public HttpServer(Channel appChannel, Channel networkChannel,
130            Class<? extends Request.In>... fallbacks) {
131        super(appChannel, ChannelReplacements.create()
132            .add(NetworkChannel.class, networkChannel));
133        networkChannelPassBack = new WeakReference<Channel>(networkChannel);
134        this.providedFallbacks = Arrays.asList(fallbacks);
135    }
136
137    /**
138     * Create a new server that creates its own {@link TcpServer} with the given
139     * address and uses it for network level I/O.
140     * 
141     * @param appChannel
142     *            this component's channel
143     * @param serverAddress the address to listen on
144     * @param fallbacks fall backs
145     */
146    @SafeVarargs
147    public HttpServer(Channel appChannel, InetSocketAddress serverAddress,
148            Class<? extends Request.In>... fallbacks) {
149        this(appChannel, new TcpServer().setServerAddress(serverAddress),
150            fallbacks);
151        attach((TcpServer) networkChannelPassBack.get());
152    }
153
154    /**
155     * @return the matchLevels
156     */
157    public int matchLevels() {
158        return matchLevels;
159    }
160
161    /**
162     * Sets the number of elements from the request path used in the match value
163     * of the generated events (see {@link Request#defaultCriterion()}), defaults
164     * to 1.
165     * 
166     * @param matchLevels the matchLevels to set
167     * @return the http server for easy chaining
168     */
169    public HttpServer setMatchLevels(int matchLevels) {
170        this.matchLevels = matchLevels;
171        return this;
172    }
173
174    /**
175     * Sets the size of the buffers used for {@link Output} events
176     * on the application channel. Defaults to the upstream buffer size
177     * minus 512 (estimate for added protocol overhead).
178     * 
179     * @param applicationBufferSize the size to set
180     * @return the http server for easy chaining
181     */
182    public HttpServer setApplicationBufferSize(int applicationBufferSize) {
183        this.applicationBufferSize = applicationBufferSize;
184        return this;
185    }
186
187    /**
188     * Returns the size of the application side (receive) buffers.
189     * 
190     * @return the value or -1 if not set
191     */
192    public int applicationBufferSize() {
193        return applicationBufferSize;
194    }
195
196    /**
197     * Determines if request from secure (TLS) connections without
198     * SNI are accepted.
199     *  
200     * Secure (TLS) requests usually transfer the name of the server that
201     * they want to connect to during handshake. The HTTP server checks
202     * that the `Host` header field of decoded requests matches the
203     * name used to establish the connection. If, however, the connection
204     * is made using the IP-address, the client does not have a host name.
205     * If such connections are to be accepted, this flag, which
206     * defaults to `false`, must be set.
207     * 
208     * Note that in request accepted without SNI, the `Host` header field
209     * will be modified to contain the IP-address of the indicated host
210     * to prevent accidental matching with virtual host names.  
211     * 
212     * @param acceptNoSni the value to set
213     * @return the http server for easy chaining
214     */
215    public HttpServer setAcceptNoSni(boolean acceptNoSni) {
216        this.acceptNoSni = acceptNoSni;
217        return this;
218    }
219
220    /**
221     * Returns if secure (TLS) requests without SNI are allowed.
222     * 
223     * @return the result
224     */
225    public boolean acceptNoSni() {
226        return acceptNoSni;
227    }
228
229    /**
230     * Creates a new downstream connection as {@link LinkedIOSubchannel} 
231     * of the network connection, a {@link HttpRequestDecoder} and a
232     * {@link HttpResponseEncoder}.
233     * 
234     * @param event
235     *            the accepted event
236     */
237    @Handler(channels = NetworkChannel.class)
238    public void onAccepted(Accepted event, IOSubchannel netChannel) {
239        new WebAppMsgChannel(event, netChannel);
240    }
241
242    /**
243     * Handles data from the client (from upstream). The data is send through 
244     * the {@link HttpRequestDecoder} and events are sent downstream according
245     * to the decoding results.
246     * 
247     * @param event the event
248     * @throws ProtocolException if a protocol exception occurs
249     * @throws InterruptedException 
250     */
251    @Handler(channels = NetworkChannel.class)
252    public void onInput(
253            Input<ByteBuffer> event, IOSubchannel netChannel)
254            throws ProtocolException, InterruptedException {
255        @SuppressWarnings("unchecked")
256        final Optional<WebAppMsgChannel> appChannel
257            = (Optional<WebAppMsgChannel>) LinkedIOSubchannel
258                .downstreamChannel(this, netChannel);
259        if (appChannel.isPresent()) {
260            appChannel.get().handleNetInput(event);
261        }
262    }
263
264    /**
265     * Forwards a {@link Closed} event to the application channel. 
266     *
267     * @param event the event
268     * @param netChannel the net channel
269     */
270    @Handler(channels = NetworkChannel.class)
271    public void onClosed(Closed event, IOSubchannel netChannel) {
272        LinkedIOSubchannel.downstreamChannel(this, netChannel,
273            WebAppMsgChannel.class).ifPresent(appChannel -> {
274                appChannel.handleClosed(event);
275            });
276    }
277
278    /**
279     * Forwards a {@link Purge} event to the application channel.
280     *
281     * @param event the event
282     * @param netChannel the net channel
283     */
284    @Handler(channels = NetworkChannel.class)
285    public void onPurge(Purge event, IOSubchannel netChannel) {
286        LinkedIOSubchannel.downstreamChannel(this, netChannel,
287            WebAppMsgChannel.class).ifPresent(appChannel -> {
288                appChannel.handlePurge(event);
289            });
290    }
291
292    /**
293     * Handles a response event from downstream by sending it through an
294     * {@link HttpResponseEncoder} that generates the data (encoded information)
295     * and sends it upstream with {@link Output} events. Depending on whether 
296     * the response has a body, subsequent {@link Output} events can
297     * follow.
298     * 
299     * @param event
300     *            the response event
301     * @throws InterruptedException if the execution was interrupted
302     */
303    @Handler
304    public void onResponse(Response event, WebAppMsgChannel appChannel)
305            throws InterruptedException {
306        appChannel.handleResponse(event);
307    }
308
309    /**
310     * Receives the message body of a response. A {@link Response} event that
311     * has a message body can be followed by one or more {@link Output} events
312     * from downstream that contain the data. An {@code Output} event
313     * with the end of record flag set signals the end of the message body.
314     * 
315     * @param event
316     *            the event with the data
317     * @throws InterruptedException if the execution was interrupted
318     */
319    @Handler
320    public void onOutput(Output<?> event, WebAppMsgChannel appChannel)
321            throws InterruptedException {
322        appChannel.handleAppOutput(event);
323    }
324
325    /**
326     * Handles a close event from downstream by closing the upstream
327     * connections.
328     * 
329     * @param event
330     *            the close event
331     * @throws InterruptedException if the execution was interrupted
332     */
333    @Handler
334    public void onClose(Close event, WebAppMsgChannel appChannel)
335            throws InterruptedException {
336        appChannel.handleClose(event);
337    }
338
339    /**
340     * Checks whether the request has been handled (value of {@link Request}
341     * event set to `true`) or the status code in the prepared response
342     * is no longer "Not Implemented". If not, but a fall back has been set, 
343     * send a "Not Found" response. If this isn't the case either, send 
344     * the default response ("Not implemented") to the client.
345     * 
346     * @param event
347     *            the request completed event
348     * @param appChannel the application channel 
349     * @throws InterruptedException if the execution was interrupted
350     */
351    @Handler
352    public void onRequestCompleted(
353            Request.In.Completed event, IOSubchannel appChannel)
354            throws InterruptedException {
355        final Request.In requestEvent = event.event();
356        // A check that also works with null.
357        if (Boolean.TRUE.equals(requestEvent.get())
358            || requestEvent.httpRequest().response().map(
359                response -> response.statusCode() != HttpStatus.NOT_IMPLEMENTED
360                    .statusCode())
361                .orElse(false)) {
362            // Some other component has taken care
363            return;
364        }
365
366        // Check if "Not Found" should be sent
367        if (providedFallbacks != null
368            && providedFallbacks.contains(requestEvent.getClass())) {
369            ResponseCreationSupport.sendResponse(
370                requestEvent.httpRequest(), appChannel, HttpStatus.NOT_FOUND);
371            return;
372        }
373
374        // Last resort
375        ResponseCreationSupport.sendResponse(requestEvent.httpRequest(),
376            appChannel, HttpStatus.NOT_IMPLEMENTED);
377    }
378
379    /**
380     * Provides a fallback handler for an OPTIONS request with asterisk. Simply
381     * responds with "OK".
382     * 
383     * @param event the event
384     * @param appChannel the application channel
385     */
386    @Handler(priority = Integer.MIN_VALUE)
387    public void onOptions(Request.In.Options event, IOSubchannel appChannel) {
388        if (event.requestUri() == HttpRequest.ASTERISK_REQUEST) {
389            HttpResponse response = event.httpRequest().response().get();
390            response.setStatus(HttpStatus.OK);
391            appChannel.respond(new Response(response));
392            event.setResult(true);
393            event.stop();
394        }
395    }
396
397    /**
398     * Send the response indicating that the protocol switch was accepted
399     * and causes subsequent data to be handled as {@link Input} and
400     * {@link Output} events on the channel.
401     * 
402     * As a convenience, the channel is associates with the URI that
403     * was used to request the protocol switch using {@link URI} as key.
404     * 
405     * @param event the event
406     * @param appChannel the channel
407     */
408    @Handler
409    public void onProtocolSwitchAccepted(
410            ProtocolSwitchAccepted event, WebAppMsgChannel appChannel) {
411        appChannel.handleProtocolSwitchAccepted(event, appChannel);
412    }
413
414    /**
415     * An application layer channel.
416     */
417    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
418    private class WebAppMsgChannel extends LinkedIOSubchannel {
419        // Starts as ServerEngine<HttpRequest,HttpResponse> but may change
420        private final ServerEngine<?, ?> engine;
421        private ManagedBuffer<ByteBuffer> outBuffer;
422        private final boolean secure;
423        private List<String> snis = Collections.emptyList();
424        private final ManagedBufferPool<ManagedBuffer<ByteBuffer>,
425                ByteBuffer> byteBufferPool;
426        private final ManagedBufferPool<ManagedBuffer<CharBuffer>,
427                CharBuffer> charBufferPool;
428        private ManagedBufferPool<?, ?> currentPool;
429        private final EventPipeline downPipeline;
430        private Upgraded pendingUpgraded;
431        private WsMessageHeader currentWsMessage;
432
433        /**
434         * Instantiates a new channel.
435         *
436         * @param event the event
437         * @param netChannel the net channel
438         */
439        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
440        public WebAppMsgChannel(Accepted event, IOSubchannel netChannel) {
441            super(HttpServer.this, channel(), netChannel, newEventPipeline());
442            engine = new ServerEngine<>(
443                new HttpRequestDecoder(), new HttpResponseEncoder());
444            secure = event.isSecure();
445            if (secure) {
446                snis = new ArrayList<>();
447                for (SNIServerName sni : event.requestedServerNames()) {
448                    if (sni instanceof SNIHostName) {
449                        snis.add(((SNIHostName) sni).getAsciiName());
450                    }
451                }
452            }
453
454            // Calculate "good" application buffer size
455            int bufferSize = applicationBufferSize;
456            if (bufferSize <= 0) {
457                bufferSize = netChannel.byteBufferPool().bufferSize() - 512;
458                if (bufferSize < 4096) {
459                    bufferSize = 4096;
460                }
461            }
462
463            String channelName = Components.objectName(HttpServer.this)
464                + "." + Components.objectName(this);
465            byteBufferPool().setName(channelName + ".upstream.byteBuffers");
466            charBufferPool().setName(channelName + ".upstream.charBuffers");
467            // Allocate downstream buffer pools. Note that decoding WebSocket
468            // network packets may result in several WS frames that are each
469            // delivered in independent events. Therefore provide some
470            // additional buffers.
471            final int bufSize = bufferSize;
472            byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
473                () -> {
474                    return ByteBuffer.allocate(bufSize);
475                }, 2, 100)
476                    .setName(channelName + ".downstream.byteBuffers");
477            charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
478                () -> {
479                    return CharBuffer.allocate(bufSize);
480                }, 2, 100)
481                    .setName(channelName + ".downstream.charBuffers");
482
483            // Downstream pipeline
484            downPipeline = newEventPipeline();
485        }
486
487        /**
488         * Handle {@link Input} events from the network.
489         *
490         * @param event the event
491         * @throws ProtocolException the protocol exception
492         * @throws InterruptedException the interrupted exception
493         */
494        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
495            "PMD.AvoidInstantiatingObjectsInLoops",
496            "PMD.AvoidDeeplyNestedIfStmts", "PMD.CollapsibleIfStatements" })
497        public void handleNetInput(Input<ByteBuffer> event)
498                throws ProtocolException, InterruptedException {
499            // Send the data from the event through the decoder.
500            ByteBuffer inData = event.data();
501            // Don't unnecessary allocate a buffer, may be header only message
502            ManagedBuffer<?> bodyData = null;
503            boolean wasOverflow = false;
504            while (inData.hasRemaining()) {
505                if (wasOverflow) {
506                    // Message has (more) body
507                    bodyData = currentPool.acquire();
508                }
509                Decoder.Result<?> result = engine.decode(inData,
510                    bodyData == null ? null : bodyData.backingBuffer(),
511                    event.isEndOfRecord());
512                if (result.response().isPresent()) {
513                    // Feedback required, send it
514                    responsePipeline().overrideRestriction().fire(
515                        new Response(result.response().get()), this);
516                    if (result.isResponseOnly()) {
517                        maybeCloseConnection(result);
518                        continue;
519                    }
520                }
521                if (result.isHeaderCompleted()) {
522                    if (!handleRequestHeader(engine.currentRequest().get())) {
523                        maybeCloseConnection(result);
524                        break;
525                    }
526                }
527                if (bodyData != null) {
528                    if (bodyData.position() > 0) {
529                        downPipeline.fire(Input.fromSink(
530                            bodyData, !result.isOverflow()
531                                && !result.isUnderflow()),
532                            this);
533                    } else {
534                        bodyData.unlockBuffer();
535                    }
536                    bodyData = null;
537                }
538                maybeCloseConnection(result);
539                wasOverflow = result.isOverflow();
540            }
541        }
542
543        private void maybeCloseConnection(Decoder.Result<?> result) {
544            if (result.closeConnection()) {
545                respond(new Close());
546            }
547        }
548
549        @SuppressWarnings("PMD.CollapsibleIfStatements")
550        private boolean handleRequestHeader(MessageHeader request) {
551            if (request instanceof HttpRequest) {
552                HttpRequest httpRequest = (HttpRequest) request;
553                if (httpRequest.hasPayload()) {
554                    if (httpRequest.findValue(
555                        HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE)
556                        .map(type -> type.value().topLevelType()
557                            .equalsIgnoreCase("text"))
558                        .orElse(false)) {
559                        currentPool = charBufferPool;
560                    } else {
561                        currentPool = byteBufferPool;
562                    }
563                }
564                if (secure) {
565                    if (!snis.contains(httpRequest.host())) {
566                        if (acceptNoSni && snis.isEmpty()) {
567                            convertHostToNumerical(httpRequest);
568                        } else {
569                            ResponseCreationSupport.sendResponse(httpRequest,
570                                this, 421, "Misdirected Request");
571                            return false;
572                        }
573                    }
574                }
575                downPipeline.fire(Request.In.fromHttpRequest(httpRequest,
576                    secure, matchLevels), this);
577            } else if (request instanceof WsMessageHeader) {
578                WsMessageHeader wsMessage = (WsMessageHeader) request;
579                if (wsMessage.hasPayload()) {
580                    if (wsMessage.isTextMode()) {
581                        currentPool = charBufferPool;
582                    } else {
583                        currentPool = byteBufferPool;
584                    }
585                }
586            } else if (request instanceof WsCloseFrame) {
587                downPipeline.fire(
588                    new WebSocketClose((WsCloseFrame) request, this));
589            }
590            return true;
591        }
592
593        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
594            "PMD.UseStringBufferForStringAppends" })
595        private void convertHostToNumerical(HttpRequest request) {
596            int port = request.port();
597            String host;
598            try {
599                InetAddress addr = InetAddress.getByName(
600                    request.host());
601                host = addr.getHostAddress();
602                if (!(addr instanceof Inet4Address)) {
603                    host = "[" + host + "]";
604                }
605            } catch (UnknownHostException e) {
606                host = InetAddress.getLoopbackAddress().getHostAddress();
607            }
608            request.setHostAndPort(host, port);
609        }
610
611        /**
612         * Handle a response event from the application layer.
613         *
614         * @param event the event
615         * @throws InterruptedException the interrupted exception
616         */
617        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
618            "PMD.AvoidBranchingStatementAsLastInLoop" })
619        public void handleResponse(Response event) throws InterruptedException {
620            if (!engine.encoding()
621                .isAssignableFrom(event.response().getClass())) {
622                return;
623            }
624            final MessageHeader response = event.response();
625            // Start sending the response
626            @SuppressWarnings("unchecked")
627            ServerEngine<?, MessageHeader> msgEngine
628                = (ServerEngine<?, MessageHeader>) engine;
629            msgEngine.encode(response);
630            if (pendingUpgraded != null) {
631                if (response instanceof HttpResponse
632                    && ((HttpResponse) response).statusCode() % 100 == 1) {
633                    downPipeline.fire(pendingUpgraded, this);
634                }
635                pendingUpgraded = null;
636            }
637            boolean hasBody = response.hasPayload();
638            while (true) {
639                outBuffer = upstreamChannel().byteBufferPool().acquire();
640                final ManagedBuffer<ByteBuffer> buffer = outBuffer;
641                Codec.Result result = engine.encode(
642                    Codec.EMPTY_IN, buffer.backingBuffer(), !hasBody);
643                if (result.isOverflow()) {
644                    upstreamChannel().respond(Output.fromSink(buffer, false));
645                    continue;
646                }
647                if (hasBody) {
648                    // Keep buffer with incomplete response to be further
649                    // filled by subsequent Output events
650                    break;
651                }
652                // Response is complete
653                if (buffer.position() > 0) {
654                    upstreamChannel().respond(Output.fromSink(buffer, true));
655                } else {
656                    buffer.unlockBuffer();
657                }
658                outBuffer = null;
659                if (result.closeConnection()) {
660                    upstreamChannel().respond(new Close());
661                }
662                break;
663            }
664
665        }
666
667        /**
668         * Handle a {@link ProtocolSwitchAccepted} event from the 
669         * application layer.
670         *
671         * @param event the event
672         * @param appChannel the app channel
673         */
674        public void handleProtocolSwitchAccepted(
675                ProtocolSwitchAccepted event, WebAppMsgChannel appChannel) {
676            appChannel.setAssociated(URI.class,
677                event.requestEvent().requestUri());
678            final HttpResponse response = event.requestEvent()
679                .httpRequest().response().get()
680                .setStatus(HttpStatus.SWITCHING_PROTOCOLS)
681                .setField(HttpField.UPGRADE,
682                    new StringList(event.protocol()));
683            // We send the Upgraded event only after the response has
684            // successfully been encoded (and thus checked).
685            pendingUpgraded = new Upgraded(event.resourceName(),
686                event.protocol());
687            respond(new Response(response));
688        }
689
690        /**
691         * Handle output from the application layer.
692         *
693         * @param event the event
694         * @throws InterruptedException the interrupted exception
695         */
696        @SuppressWarnings({ "PMD.CyclomaticComplexity", "PMD.NcssCount",
697            "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops" })
698        public void handleAppOutput(Output<?> event)
699                throws InterruptedException {
700            Buffer eventData = event.data();
701            Buffer input;
702            if (eventData instanceof ByteBuffer) {
703                input = ((ByteBuffer) eventData).duplicate();
704            } else if (eventData instanceof CharBuffer) {
705                input = ((CharBuffer) eventData).duplicate();
706            } else {
707                return;
708            }
709            if (engine.switchedTo().equals(Optional.of("websocket"))
710                && currentWsMessage == null) {
711                // When switched to WebSockets, we only have Input and Output
712                // events. Add header automatically.
713                @SuppressWarnings("unchecked")
714                ServerEngine<?, MessageHeader> wsEngine
715                    = (ServerEngine<?, MessageHeader>) engine;
716                currentWsMessage = new WsMessageHeader(
717                    event.buffer().backingBuffer() instanceof CharBuffer,
718                    true);
719                wsEngine.encode(currentWsMessage);
720            }
721            while (input.hasRemaining() || event.isEndOfRecord()) {
722                if (outBuffer == null) {
723                    outBuffer = upstreamChannel().byteBufferPool().acquire();
724                }
725                Codec.Result result = engine.encode(input,
726                    outBuffer.backingBuffer(), event.isEndOfRecord());
727                if (result.isOverflow()) {
728                    upstreamChannel()
729                        .respond(Output.fromSink(outBuffer, false));
730                    outBuffer = upstreamChannel().byteBufferPool().acquire();
731                    continue;
732                }
733                if (event.isEndOfRecord() || result.closeConnection()) {
734                    if (outBuffer.position() > 0) {
735                        upstreamChannel()
736                            .respond(Output.fromSink(outBuffer, true));
737                    } else {
738                        outBuffer.unlockBuffer();
739                    }
740                    outBuffer = null;
741                    if (result.closeConnection()) {
742                        upstreamChannel().respond(new Close());
743                    }
744                    break;
745                }
746            }
747            if (engine.switchedTo().equals(Optional.of("websocket"))
748                && event.isEndOfRecord()) {
749                currentWsMessage = null;
750            }
751        }
752
753        /**
754         * Handle a {@link Close} event from the application layer.
755         *
756         * @param event the event
757         * @throws InterruptedException the interrupted exception
758         */
759        public void handleClose(Close event) throws InterruptedException {
760            if (engine.switchedTo().equals(Optional.of("websocket"))) {
761                fire(new Response(new WsCloseFrame(null, null)), this);
762                return;
763            }
764            upstreamChannel().respond(new Close());
765        }
766
767        /**
768         * Handle a {@link Closed} event from the network by forwarding
769         * it to the application layer.
770         *
771         * @param event the event
772         */
773        public void handleClosed(Closed event) {
774            downPipeline.fire(new Closed(), this);
775        }
776
777        /**
778         * Handle a {@link Purge} event by forwarding it to the
779         * application layer.
780         *
781         * @param event the event
782         */
783        public void handlePurge(Purge event) {
784            downPipeline.fire(new Purge(), this);
785        }
786
787    }
788
789}