001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.http; 020 021import java.lang.ref.WeakReference; 022import java.net.Inet4Address; 023import java.net.InetAddress; 024import java.net.InetSocketAddress; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.net.UnknownHostException; 028import java.nio.Buffer; 029import java.nio.ByteBuffer; 030import java.nio.CharBuffer; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.List; 035import java.util.Optional; 036import javax.net.ssl.SNIHostName; 037import javax.net.ssl.SNIServerName; 038import org.jdrupes.httpcodec.Codec; 039import org.jdrupes.httpcodec.Decoder; 040import org.jdrupes.httpcodec.MessageHeader; 041import org.jdrupes.httpcodec.ProtocolException; 042import org.jdrupes.httpcodec.ServerEngine; 043import org.jdrupes.httpcodec.protocols.http.HttpConstants.HttpStatus; 044import org.jdrupes.httpcodec.protocols.http.HttpField; 045import org.jdrupes.httpcodec.protocols.http.HttpRequest; 046import org.jdrupes.httpcodec.protocols.http.HttpResponse; 047import org.jdrupes.httpcodec.protocols.http.server.HttpRequestDecoder; 048import org.jdrupes.httpcodec.protocols.http.server.HttpResponseEncoder; 049import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame; 050import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader; 051import org.jdrupes.httpcodec.types.Converters; 052import org.jdrupes.httpcodec.types.StringList; 053import org.jgrapes.core.Channel; 054import org.jgrapes.core.ClassChannel; 055import org.jgrapes.core.Component; 056import org.jgrapes.core.Components; 057import org.jgrapes.core.EventPipeline; 058import org.jgrapes.core.annotation.Handler; 059import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements; 060import org.jgrapes.core.internal.EventProcessor; 061import org.jgrapes.http.events.ProtocolSwitchAccepted; 062import org.jgrapes.http.events.Request; 063import org.jgrapes.http.events.Response; 064import org.jgrapes.http.events.Upgraded; 065import org.jgrapes.http.events.WebSocketClose; 066import org.jgrapes.io.IOSubchannel; 067import org.jgrapes.io.events.Close; 068import org.jgrapes.io.events.Closed; 069import org.jgrapes.io.events.Input; 070import org.jgrapes.io.events.Output; 071import org.jgrapes.io.events.Purge; 072import org.jgrapes.io.util.LinkedIOSubchannel; 073import org.jgrapes.io.util.ManagedBuffer; 074import org.jgrapes.io.util.ManagedBufferPool; 075import org.jgrapes.net.SocketServer; 076import org.jgrapes.net.events.Accepted; 077 078/** 079 * A converter component that receives and sends byte buffers on a 080 * network channel and web application layer messages on 081 * {@link IOSubchannel}s of its channel. 082 * 083 * Each {@link IOSubchannel} represents a connection established by 084 * the browser. The {@link HttpServer} fires {@link Request} events 085 * (and {@link Input} events, if there is associated data) on the 086 * subchannels. Web application components (short "weblets") handle 087 * these events and use 088 * {@link LinkedIOSubchannel#respond(org.jgrapes.core.Event)} 089 * to send {@link Response} events and, if applicable, {@link Output} 090 * events with data belonging to the response. 091 * 092 * Events must be fired by weblets while handling the {@link Request} 093 * or {@link Input} events only (to be precise: while handling events 094 * processed by the associated {@link EventProcessor}) to ensure 095 * that responses and their associated data do not interleave. 096 */ 097@SuppressWarnings("PMD.ExcessiveImports") 098public class HttpServer extends Component { 099 100 private WeakReference<Channel> networkChannelPassBack; 101 private List<Class<? extends Request.In>> providedFallbacks; 102 private int matchLevels = 1; 103 private boolean acceptNoSni; 104 private int applicationBufferSize = -1; 105 106 /** 107 * Denotes the network channel in handler annotations. 108 */ 109 private static class NetworkChannel extends ClassChannel { 110 } 111 112 /** 113 * Create a new server that uses the {@code networkChannel} for network 114 * level I/O. 115 * <P> 116 * As a convenience the server can provide fall back handlers for the 117 * specified types of requests. The fall back handler simply returns 404 ( 118 * "Not found"). 119 * 120 * @param appChannel 121 * this component's channel 122 * @param networkChannel 123 * the channel for network level I/O 124 * @param fallbacks 125 * the requests for which a fall back handler is provided 126 */ 127 @SafeVarargs 128 public HttpServer(Channel appChannel, Channel networkChannel, 129 Class<? extends Request.In>... fallbacks) { 130 super(appChannel, ChannelReplacements.create() 131 .add(NetworkChannel.class, networkChannel)); 132 networkChannelPassBack = new WeakReference<>(networkChannel); 133 this.providedFallbacks = Arrays.asList(fallbacks); 134 } 135 136 /** 137 * Create a new server that creates its own {@link SocketServer} with 138 * the given address and uses it for network level I/O. 139 * 140 * @param appChannel 141 * this component's channel 142 * @param serverAddress the address to listen on 143 * @param fallbacks fall backs 144 */ 145 @SafeVarargs 146 public HttpServer(Channel appChannel, InetSocketAddress serverAddress, 147 Class<? extends Request.In>... fallbacks) { 148 this(appChannel, new SocketServer().setServerAddress(serverAddress), 149 fallbacks); 150 attach((SocketServer) networkChannelPassBack.get()); 151 } 152 153 /** 154 * @return the matchLevels 155 */ 156 public int matchLevels() { 157 return matchLevels; 158 } 159 160 /** 161 * Sets the number of elements from the request path used in the match value 162 * of the generated events (see {@link Request#defaultCriterion()}), defaults 163 * to 1. 164 * 165 * @param matchLevels the matchLevels to set 166 * @return the http server for easy chaining 167 */ 168 public HttpServer setMatchLevels(int matchLevels) { 169 this.matchLevels = matchLevels; 170 return this; 171 } 172 173 /** 174 * Sets the size of the buffers used for {@link Output} events 175 * on the application channel. Defaults to the upstream buffer size 176 * minus 512 (estimate for added protocol overhead). 177 * 178 * @param applicationBufferSize the size to set 179 * @return the http server for easy chaining 180 */ 181 public HttpServer setApplicationBufferSize(int applicationBufferSize) { 182 this.applicationBufferSize = applicationBufferSize; 183 return this; 184 } 185 186 /** 187 * Returns the size of the application side (receive) buffers. 188 * 189 * @return the value or -1 if not set 190 */ 191 public int applicationBufferSize() { 192 return applicationBufferSize; 193 } 194 195 /** 196 * Determines if request from secure (TLS) connections without 197 * SNI are accepted. 198 * 199 * Secure (TLS) requests usually transfer the name of the server that 200 * they want to connect to during handshake. The HTTP server checks 201 * that the `Host` header field of decoded requests matches the 202 * name used to establish the connection. If, however, the connection 203 * is made using the IP-address, the client does not have a host name. 204 * If such connections are to be accepted, this flag, which 205 * defaults to `false`, must be set. 206 * 207 * Note that in request accepted without SNI, the `Host` header field 208 * will be modified to contain the IP-address of the indicated host 209 * to prevent accidental matching with virtual host names. 210 * 211 * @param acceptNoSni the value to set 212 * @return the http server for easy chaining 213 */ 214 public HttpServer setAcceptNoSni(boolean acceptNoSni) { 215 this.acceptNoSni = acceptNoSni; 216 return this; 217 } 218 219 /** 220 * Returns if secure (TLS) requests without SNI are allowed. 221 * 222 * @return the result 223 */ 224 public boolean acceptNoSni() { 225 return acceptNoSni; 226 } 227 228 /** 229 * Creates a new downstream connection as {@link LinkedIOSubchannel} 230 * of the network connection, a {@link HttpRequestDecoder} and a 231 * {@link HttpResponseEncoder}. 232 * 233 * @param event 234 * the accepted event 235 */ 236 @Handler(channels = NetworkChannel.class) 237 public void onAccepted(Accepted event, IOSubchannel netChannel) { 238 new WebAppMsgChannel(event, netChannel); 239 } 240 241 /** 242 * Handles data from the client (from upstream). The data is send through 243 * the {@link HttpRequestDecoder} and events are sent downstream according 244 * to the decoding results. 245 * 246 * @param event the event 247 * @throws ProtocolException if a protocol exception occurs 248 * @throws InterruptedException 249 */ 250 @Handler(channels = NetworkChannel.class) 251 public void onInput( 252 Input<ByteBuffer> event, IOSubchannel netChannel) 253 throws ProtocolException, InterruptedException { 254 @SuppressWarnings("unchecked") 255 final Optional<WebAppMsgChannel> appChannel 256 = (Optional<WebAppMsgChannel>) LinkedIOSubchannel 257 .downstreamChannel(this, netChannel); 258 if (appChannel.isPresent()) { 259 appChannel.get().handleNetInput(event); 260 } 261 } 262 263 /** 264 * Forwards a {@link Closed} event to the application channel. 265 * 266 * @param event the event 267 * @param netChannel the net channel 268 */ 269 @Handler(channels = NetworkChannel.class) 270 public void onClosed(Closed<?> event, IOSubchannel netChannel) { 271 LinkedIOSubchannel.downstreamChannel(this, netChannel, 272 WebAppMsgChannel.class).ifPresent(appChannel -> { 273 appChannel.handleClosed(event); 274 }); 275 } 276 277 /** 278 * Forwards a {@link Purge} event to the application channel. 279 * 280 * @param event the event 281 * @param netChannel the net channel 282 */ 283 @Handler(channels = NetworkChannel.class) 284 public void onPurge(Purge event, IOSubchannel netChannel) { 285 LinkedIOSubchannel.downstreamChannel(this, netChannel, 286 WebAppMsgChannel.class).ifPresent(appChannel -> { 287 appChannel.handlePurge(event); 288 }); 289 } 290 291 /** 292 * Handles a response event from downstream by sending it through an 293 * {@link HttpResponseEncoder} that generates the data (encoded information) 294 * and sends it upstream with {@link Output} events. Depending on whether 295 * the response has a body, subsequent {@link Output} events can 296 * follow. 297 * 298 * @param event 299 * the response event 300 * @throws InterruptedException if the execution was interrupted 301 */ 302 @Handler 303 public void onResponse(Response event, WebAppMsgChannel appChannel) 304 throws InterruptedException { 305 appChannel.handleResponse(event); 306 } 307 308 /** 309 * Receives the message body of a response. A {@link Response} event that 310 * has a message body can be followed by one or more {@link Output} events 311 * from downstream that contain the data. An {@code Output} event 312 * with the end of record flag set signals the end of the message body. 313 * 314 * @param event 315 * the event with the data 316 * @throws InterruptedException if the execution was interrupted 317 */ 318 @Handler 319 public void onOutput(Output<?> event, WebAppMsgChannel appChannel) 320 throws InterruptedException { 321 appChannel.handleAppOutput(event); 322 } 323 324 /** 325 * Handles a close event from downstream by closing the upstream 326 * connections. 327 * 328 * @param event 329 * the close event 330 * @throws InterruptedException if the execution was interrupted 331 */ 332 @Handler 333 public void onClose(Close event, WebAppMsgChannel appChannel) 334 throws InterruptedException { 335 appChannel.handleClose(event); 336 } 337 338 /** 339 * Checks whether the request has been handled (value of {@link Request} 340 * event set to `true`) or the status code in the prepared response 341 * is no longer "Not Implemented". If not, but a fall back has been set, 342 * send a "Not Found" response. If this isn't the case either, send 343 * the default response ("Not implemented") to the client. 344 * 345 * @param event 346 * the request completed event 347 * @param appChannel the application channel 348 * @throws InterruptedException if the execution was interrupted 349 */ 350 @Handler 351 public void onRequestCompleted( 352 Request.In.Completed event, IOSubchannel appChannel) 353 throws InterruptedException { 354 final Request.In requestEvent = event.event(); 355 // A check that also works with null. 356 if (Boolean.TRUE.equals(requestEvent.get()) 357 || requestEvent.httpRequest().response().map( 358 response -> response.statusCode() != HttpStatus.NOT_IMPLEMENTED 359 .statusCode()) 360 .orElse(false)) { 361 // Some other component has taken care 362 return; 363 } 364 365 // Check if "Not Found" should be sent 366 if (providedFallbacks != null 367 && providedFallbacks.contains(requestEvent.getClass())) { 368 ResponseCreationSupport.sendResponse( 369 requestEvent.httpRequest(), appChannel, HttpStatus.NOT_FOUND); 370 return; 371 } 372 373 // Last resort 374 ResponseCreationSupport.sendResponse(requestEvent.httpRequest(), 375 appChannel, HttpStatus.NOT_IMPLEMENTED); 376 } 377 378 /** 379 * Provides a fallback handler for an OPTIONS request with asterisk. Simply 380 * responds with "OK". 381 * 382 * @param event the event 383 * @param appChannel the application channel 384 */ 385 @Handler(priority = Integer.MIN_VALUE) 386 public void onOptions(Request.In.Options event, IOSubchannel appChannel) { 387 if (event.requestUri() == HttpRequest.ASTERISK_REQUEST) { 388 HttpResponse response = event.httpRequest().response().get(); 389 response.setStatus(HttpStatus.OK); 390 appChannel.respond(new Response(response)); 391 event.setResult(true); 392 event.stop(); 393 } 394 } 395 396 /** 397 * Send the response indicating that the protocol switch was accepted 398 * and causes subsequent data to be handled as {@link Input} and 399 * {@link Output} events on the channel. 400 * 401 * As a convenience, the channel is associates with the URI that 402 * was used to request the protocol switch using {@link URI} as key. 403 * 404 * @param event the event 405 * @param appChannel the channel 406 */ 407 @Handler 408 public void onProtocolSwitchAccepted( 409 ProtocolSwitchAccepted event, WebAppMsgChannel appChannel) { 410 appChannel.handleProtocolSwitchAccepted(event, appChannel); 411 } 412 413 /** 414 * An application layer channel. 415 */ 416 @SuppressWarnings("PMD.DataflowAnomalyAnalysis") 417 private class WebAppMsgChannel extends LinkedIOSubchannel { 418 // Starts as ServerEngine<HttpRequest,HttpResponse> but may change 419 private final ServerEngine<?, ?> engine; 420 private ManagedBuffer<ByteBuffer> outBuffer; 421 private final boolean secure; 422 private List<String> snis = Collections.emptyList(); 423 private final ManagedBufferPool<ManagedBuffer<ByteBuffer>, 424 ByteBuffer> byteBufferPool; 425 private final ManagedBufferPool<ManagedBuffer<CharBuffer>, 426 CharBuffer> charBufferPool; 427 private ManagedBufferPool<?, ?> currentPool; 428 private final EventPipeline downPipeline; 429 private Upgraded pendingUpgraded; 430 private WsMessageHeader currentWsMessage; 431 432 /** 433 * Instantiates a new channel. 434 * 435 * @param event the event 436 * @param netChannel the net channel 437 */ 438 @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") 439 public WebAppMsgChannel(Accepted event, IOSubchannel netChannel) { 440 super(HttpServer.this, channel(), netChannel, newEventPipeline()); 441 engine = new ServerEngine<>( 442 new HttpRequestDecoder(), new HttpResponseEncoder()); 443 secure = event.isSecure(); 444 if (secure) { 445 snis = new ArrayList<>(); 446 for (SNIServerName sni : event.requestedServerNames()) { 447 if (sni instanceof SNIHostName) { 448 snis.add(((SNIHostName) sni).getAsciiName()); 449 } 450 } 451 } 452 453 // Calculate "good" application buffer size 454 int bufferSize = applicationBufferSize; 455 if (bufferSize <= 0) { 456 bufferSize = netChannel.byteBufferPool().bufferSize() - 512; 457 if (bufferSize < 4096) { 458 bufferSize = 4096; 459 } 460 } 461 462 String channelName = Components.objectName(HttpServer.this) 463 + "." + Components.objectName(this); 464 byteBufferPool().setName(channelName + ".upstream.byteBuffers"); 465 charBufferPool().setName(channelName + ".upstream.charBuffers"); 466 // Allocate downstream buffer pools. Note that decoding WebSocket 467 // network packets may result in several WS frames that are each 468 // delivered in independent events. Therefore provide some 469 // additional buffers. 470 final int bufSize = bufferSize; 471 byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new, 472 () -> { 473 return ByteBuffer.allocate(bufSize); 474 }, 2, 100) 475 .setName(channelName + ".downstream.byteBuffers"); 476 charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new, 477 () -> { 478 return CharBuffer.allocate(bufSize); 479 }, 2, 100) 480 .setName(channelName + ".downstream.charBuffers"); 481 482 // Downstream pipeline 483 downPipeline = newEventPipeline(); 484 } 485 486 /** 487 * Handle {@link Input} events from the network. 488 * 489 * @param event the event 490 * @throws ProtocolException the protocol exception 491 * @throws InterruptedException the interrupted exception 492 */ 493 @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 494 "PMD.AvoidInstantiatingObjectsInLoops", 495 "PMD.AvoidDeeplyNestedIfStmts", "PMD.CollapsibleIfStatements", 496 "PMD.CognitiveComplexity", "PMD.AvoidDuplicateLiterals" }) 497 public void handleNetInput(Input<ByteBuffer> event) 498 throws ProtocolException, InterruptedException { 499 // Send the data from the event through the decoder. 500 ByteBuffer inData = event.data(); 501 // Don't unnecessary allocate a buffer, may be header only message 502 ManagedBuffer<?> bodyData = null; 503 boolean wasOverflow = false; 504 while (inData.hasRemaining()) { 505 if (wasOverflow) { 506 // Message has (more) body 507 bodyData = currentPool.acquire(); 508 } 509 Decoder.Result<?> result = engine.decode(inData, 510 bodyData == null ? null : bodyData.backingBuffer(), 511 event.isEndOfRecord()); 512 if (result.response().isPresent()) { 513 // Feedback required, send it "in sync", even if 514 // event source is not the regular one. 515 responsePipeline().overrideRestriction().fire( 516 new Response(result.response().get()), this); 517 if (result.isResponseOnly()) { 518 maybeCloseConnection(result); 519 continue; 520 } 521 } 522 if (result.isHeaderCompleted()) { 523 if (!handleRequestHeader(engine.currentRequest().get())) { 524 maybeCloseConnection(result); 525 break; 526 } 527 } 528 if (bodyData != null) { 529 if (bodyData.position() > 0) { 530 downPipeline.fire(Input.fromSink( 531 bodyData, !result.isOverflow() 532 && !result.isUnderflow()), 533 this); 534 } else { 535 bodyData.unlockBuffer(); 536 } 537 bodyData = null; 538 } 539 maybeCloseConnection(result); 540 wasOverflow = result.isOverflow(); 541 } 542 } 543 544 private void maybeCloseConnection(Decoder.Result<?> result) { 545 if (result.closeConnection()) { 546 // Send close "in sync", even if event source is unexpected. 547 responsePipeline().overrideRestriction() 548 .fire(new Close(), this); 549 } 550 } 551 552 @SuppressWarnings({ "PMD.CollapsibleIfStatements", 553 "PMD.CognitiveComplexity" }) 554 private boolean handleRequestHeader(MessageHeader request) { 555 if (request instanceof HttpRequest) { 556 HttpRequest httpRequest = (HttpRequest) request; 557 if (httpRequest.hasPayload()) { 558 if (httpRequest.findValue( 559 HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE) 560 .map(type -> "text" 561 .equalsIgnoreCase(type.value().topLevelType())) 562 .orElse(false)) { 563 currentPool = charBufferPool; 564 } else { 565 currentPool = byteBufferPool; 566 } 567 } 568 if (secure) { 569 if (!snis.contains(httpRequest.host())) { 570 if (acceptNoSni && snis.isEmpty()) { 571 convertHostToNumerical(httpRequest); 572 } else { 573 ResponseCreationSupport.sendResponse(httpRequest, 574 this, 421, "Misdirected Request"); 575 return false; 576 } 577 } 578 } 579 try { 580 downPipeline.fire(Request.In.fromHttpRequest(httpRequest, 581 secure, matchLevels), this); 582 } catch (URISyntaxException e) { 583 ResponseCreationSupport.sendResponse(httpRequest, this, 400, 584 "Bad Request"); 585 return false; 586 } 587 } else if (request instanceof WsMessageHeader) { 588 WsMessageHeader wsMessage = (WsMessageHeader) request; 589 if (wsMessage.hasPayload()) { 590 if (wsMessage.isTextMode()) { 591 currentPool = charBufferPool; 592 } else { 593 currentPool = byteBufferPool; 594 } 595 } 596 } else if (request instanceof WsCloseFrame) { 597 downPipeline.fire( 598 new WebSocketClose((WsCloseFrame) request, this)); 599 } 600 return true; 601 } 602 603 @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 604 "PMD.UseStringBufferForStringAppends" }) 605 private void convertHostToNumerical(HttpRequest request) { 606 int port = request.port(); 607 String host; 608 try { 609 InetAddress addr = InetAddress.getByName( 610 request.host()); 611 host = addr.getHostAddress(); 612 if (!(addr instanceof Inet4Address)) { 613 host = "[" + host + "]"; 614 } 615 } catch (UnknownHostException e) { 616 host = InetAddress.getLoopbackAddress().getHostAddress(); 617 } 618 request.setHostAndPort(host, port); 619 } 620 621 /** 622 * Handle a response event from the application layer. 623 * 624 * @param event the event 625 * @throws InterruptedException the interrupted exception 626 */ 627 @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops", 628 "PMD.AvoidBranchingStatementAsLastInLoop", 629 "PMD.CognitiveComplexity" }) 630 public void handleResponse(Response event) throws InterruptedException { 631 if (!engine.encoding() 632 .isAssignableFrom(event.response().getClass())) { 633 return; 634 } 635 final MessageHeader response = event.response(); 636 // Start sending the response 637 @SuppressWarnings("unchecked") 638 ServerEngine<?, MessageHeader> msgEngine 639 = (ServerEngine<?, MessageHeader>) engine; 640 msgEngine.encode(response); 641 if (pendingUpgraded != null) { 642 if (response instanceof HttpResponse 643 && ((HttpResponse) response).statusCode() % 100 == 1) { 644 downPipeline.fire(pendingUpgraded, this); 645 } 646 pendingUpgraded = null; 647 } 648 boolean hasBody = response.hasPayload(); 649 while (true) { 650 outBuffer = upstreamChannel().byteBufferPool().acquire(); 651 final ManagedBuffer<ByteBuffer> buffer = outBuffer; 652 Codec.Result result = engine.encode( 653 Codec.EMPTY_IN, buffer.backingBuffer(), !hasBody); 654 if (result.isOverflow()) { 655 upstreamChannel().respond(Output.fromSink(buffer, false)); 656 continue; 657 } 658 if (hasBody) { 659 // Keep buffer with incomplete response to be further 660 // filled by subsequent Output events 661 break; 662 } 663 // Response is complete 664 if (buffer.position() > 0) { 665 upstreamChannel().respond(Output.fromSink(buffer, true)); 666 } else { 667 buffer.unlockBuffer(); 668 } 669 outBuffer = null; 670 if (result.closeConnection()) { 671 upstreamChannel().respond(new Close()); 672 } 673 break; 674 } 675 676 } 677 678 /** 679 * Handle a {@link ProtocolSwitchAccepted} event from the 680 * application layer. 681 * 682 * @param event the event 683 * @param appChannel the app channel 684 */ 685 public void handleProtocolSwitchAccepted( 686 ProtocolSwitchAccepted event, WebAppMsgChannel appChannel) { 687 appChannel.setAssociated(URI.class, 688 event.requestEvent().requestUri()); 689 final HttpResponse response = event.requestEvent() 690 .httpRequest().response().get() 691 .setStatus(HttpStatus.SWITCHING_PROTOCOLS) 692 .setField(HttpField.UPGRADE, 693 new StringList(event.protocol())); 694 // We send the Upgraded event only after the response has 695 // successfully been encoded (and thus checked). 696 pendingUpgraded = new Upgraded(event.resourceName(), 697 event.protocol()); 698 respond(new Response(response)); 699 } 700 701 /** 702 * Handle output from the application layer. 703 * 704 * @param event the event 705 * @throws InterruptedException the interrupted exception 706 */ 707 @SuppressWarnings({ "PMD.CyclomaticComplexity", "PMD.NcssCount", 708 "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops", 709 "PMD.CognitiveComplexity" }) 710 public void handleAppOutput(Output<?> event) 711 throws InterruptedException { 712 Buffer eventData = event.data(); 713 Buffer input; 714 if (eventData instanceof ByteBuffer) { 715 input = ((ByteBuffer) eventData).duplicate(); 716 } else if (eventData instanceof CharBuffer) { 717 input = ((CharBuffer) eventData).duplicate(); 718 } else { 719 return; 720 } 721 if (engine.switchedTo().equals(Optional.of("websocket")) 722 && currentWsMessage == null) { 723 // When switched to WebSockets, we only have Input and Output 724 // events. Add header automatically. 725 @SuppressWarnings("unchecked") 726 ServerEngine<?, MessageHeader> wsEngine 727 = (ServerEngine<?, MessageHeader>) engine; 728 currentWsMessage = new WsMessageHeader( 729 event.buffer().backingBuffer() instanceof CharBuffer, 730 true); 731 wsEngine.encode(currentWsMessage); 732 } 733 while (input.hasRemaining() || event.isEndOfRecord()) { 734 if (outBuffer == null) { 735 outBuffer = upstreamChannel().byteBufferPool().acquire(); 736 } 737 Codec.Result result = engine.encode(input, 738 outBuffer.backingBuffer(), event.isEndOfRecord()); 739 if (result.isOverflow()) { 740 upstreamChannel() 741 .respond(Output.fromSink(outBuffer, false)); 742 outBuffer = upstreamChannel().byteBufferPool().acquire(); 743 continue; 744 } 745 if (event.isEndOfRecord() || result.closeConnection()) { 746 if (outBuffer.position() > 0) { 747 upstreamChannel() 748 .respond(Output.fromSink(outBuffer, true)); 749 } else { 750 outBuffer.unlockBuffer(); 751 } 752 outBuffer = null; 753 if (result.closeConnection()) { 754 upstreamChannel().respond(new Close()); 755 } 756 break; 757 } 758 } 759 if (engine.switchedTo().equals(Optional.of("websocket")) 760 && event.isEndOfRecord()) { 761 currentWsMessage = null; 762 } 763 } 764 765 /** 766 * Handle a {@link Close} event from the application layer. 767 * 768 * @param event the event 769 * @throws InterruptedException the interrupted exception 770 */ 771 public void handleClose(Close event) throws InterruptedException { 772 if (engine.switchedTo().equals(Optional.of("websocket"))) { 773 fire(new Response(new WsCloseFrame(null, null)), this); 774 return; 775 } 776 upstreamChannel().respond(new Close()); 777 } 778 779 /** 780 * Handle a {@link Closed} event from the network by forwarding 781 * it to the application layer. 782 * 783 * @param event the event 784 */ 785 public void handleClosed(Closed<?> event) { 786 downPipeline.fire(new Closed<Void>(), this); 787 } 788 789 /** 790 * Handle a {@link Purge} event by forwarding it to the 791 * application layer. 792 * 793 * @param event the event 794 */ 795 public void handlePurge(Purge event) { 796 downPipeline.fire(new Purge(), this); 797 } 798 799 } 800 801}