001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.http; 020 021import java.io.IOException; 022import java.net.InetSocketAddress; 023import java.net.SocketAddress; 024import java.nio.Buffer; 025import java.nio.ByteBuffer; 026import java.nio.CharBuffer; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.Map; 031import java.util.Optional; 032import java.util.Set; 033import java.util.concurrent.Callable; 034import org.jdrupes.httpcodec.ClientEngine; 035import org.jdrupes.httpcodec.Codec; 036import org.jdrupes.httpcodec.Decoder; 037import org.jdrupes.httpcodec.MessageHeader; 038import org.jdrupes.httpcodec.ProtocolException; 039import org.jdrupes.httpcodec.protocols.http.HttpField; 040import org.jdrupes.httpcodec.protocols.http.HttpResponse; 041import org.jdrupes.httpcodec.protocols.http.client.HttpRequestEncoder; 042import org.jdrupes.httpcodec.protocols.http.client.HttpResponseDecoder; 043import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame; 044import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader; 045import org.jdrupes.httpcodec.types.Converters; 046import org.jgrapes.core.Channel; 047import org.jgrapes.core.ClassChannel; 048import org.jgrapes.core.Component; 049import org.jgrapes.core.Components; 050import org.jgrapes.core.Components.PoolingIndex; 051import org.jgrapes.core.EventPipeline; 052import org.jgrapes.core.annotation.Handler; 053import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements; 054import org.jgrapes.http.events.HostUnresolved; 055import org.jgrapes.http.events.HttpConnected; 056import org.jgrapes.http.events.Request; 057import org.jgrapes.http.events.Response; 058import org.jgrapes.http.events.WebSocketClose; 059import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel; 060import org.jgrapes.io.events.Close; 061import org.jgrapes.io.events.Closed; 062import org.jgrapes.io.events.IOError; 063import org.jgrapes.io.events.Input; 064import org.jgrapes.io.events.OpenSocketConnection; 065import org.jgrapes.io.events.Output; 066import org.jgrapes.io.util.ManagedBuffer; 067import org.jgrapes.io.util.ManagedBufferPool; 068import org.jgrapes.net.SocketIOChannel; 069import org.jgrapes.net.events.Connected; 070 071/** 072 * A converter component that receives and sends web application 073 * layer messages and byte buffers on associated network channels. 074 */ 075@SuppressWarnings("PMD.ExcessiveImports") 076public class HttpConnector extends Component { 077 078 private int applicationBufferSize = -1; 079 private final Channel netMainChannel; 080 @SuppressWarnings("PMD.UseConcurrentHashMap") 081 private final Map<SocketAddress, Set<WebAppMsgChannel>> connecting 082 = new HashMap<>(); 083 private final PoolingIndex<SocketAddress, SocketIOChannel> pooled 084 = new PoolingIndex<>(); 085 086 /** 087 * Denotes the network channel in handler annotations. 088 */ 089 private static class NetworkChannel extends ClassChannel { 090 } 091 092 /** 093 * Create a new connector that uses the {@code networkChannel} for network 094 * level I/O. 095 * 096 * @param appChannel 097 * this component's channel 098 * @param networkChannel 099 * the channel for network level I/O 100 */ 101 public HttpConnector(Channel appChannel, Channel networkChannel) { 102 super(appChannel, ChannelReplacements.create() 103 .add(NetworkChannel.class, networkChannel)); 104 this.netMainChannel = networkChannel; 105 } 106 107 /** 108 * Sets the size of the buffers used for {@link Input} events 109 * on the application channel. Defaults to the upstream buffer size 110 * minus 512 (estimate for added protocol overhead). 111 * 112 * @param applicationBufferSize the size to set 113 * @return the http server for easy chaining 114 */ 115 public HttpConnector setApplicationBufferSize(int applicationBufferSize) { 116 this.applicationBufferSize = applicationBufferSize; 117 return this; 118 } 119 120 /** 121 * Returns the size of the application side (receive) buffers. 122 * 123 * @return the value or -1 if not set 124 */ 125 public int applicationBufferSize() { 126 return applicationBufferSize; 127 } 128 129 /** 130 * Starts the processing of a request from the application layer. 131 * When a network connection has been established, the application 132 * layer will be informed by a {@link HttpConnected} event, fired 133 * on a subchannel that is created for the processing of this 134 * request. 135 * 136 * @param event the request 137 * @throws InterruptedException if processing is interrupted 138 * @throws IOException Signals that an I/O exception has occurred. 139 */ 140 @Handler 141 public void onRequest(Request.Out event) 142 throws InterruptedException, IOException { 143 new WebAppMsgChannel(event); 144 } 145 146 /** 147 * Handles output from the application. This may be the payload 148 * of e.g. a POST or data to be transferes on a websocket connection. 149 * 150 * @param event the event 151 * @param appChannel the application layer channel 152 * @throws InterruptedException the interrupted exception 153 */ 154 @Handler 155 public void onOutput(Output<?> event, WebAppMsgChannel appChannel) 156 throws InterruptedException { 157 appChannel.handleAppOutput(event); 158 } 159 160 /** 161 * Called when the network connection is established. Triggers the 162 * further processing of the initial request. 163 * 164 * @param event the event 165 * @param netConnChannel the network layer channel 166 * @throws InterruptedException if the execution is interrupted 167 * @throws IOException Signals that an I/O exception has occurred. 168 */ 169 @Handler(channels = NetworkChannel.class) 170 @SuppressWarnings("PMD.DataflowAnomalyAnalysis") 171 public void onConnected(Connected<?> event, SocketIOChannel netConnChannel) 172 throws InterruptedException, IOException { 173 // Check if an app channel has been waiting for such a connection 174 WebAppMsgChannel[] appChannel = { null }; 175 synchronized (connecting) { 176 connecting.computeIfPresent(event.remoteAddress(), (key, set) -> { 177 Iterator<WebAppMsgChannel> iter = set.iterator(); 178 appChannel[0] = iter.next(); 179 iter.remove(); 180 return set.isEmpty() ? null : set; 181 }); 182 } 183 if (appChannel[0] != null) { 184 appChannel[0].connected(netConnChannel); 185 } 186 } 187 188 /** 189 * Handles I/O error events from the network layer. 190 * 191 * @param event the event 192 * @throws IOException Signals that an I/O exception has occurred. 193 */ 194 @Handler(channels = NetworkChannel.class) 195 public void onIoError(IOError event) throws IOException { 196 for (Channel channel : event.channels()) { 197 if (channel instanceof SocketIOChannel) { 198 // Error while using established network connection 199 SocketIOChannel netConnChannel = (SocketIOChannel) channel; 200 Optional<WebAppMsgChannel> appChannel 201 = netConnChannel.associated(WebAppMsgChannel.class); 202 if (appChannel.isPresent()) { 203 // Error while using a network connection 204 appChannel.get().handleIoError(event, netConnChannel); 205 continue; 206 } 207 // Just in case... 208 pooled.remove(netConnChannel.remoteAddress(), netConnChannel); 209 continue; 210 } 211 // Error while trying to establish the network connection 212 if (event.event() instanceof OpenSocketConnection) { 213 OpenSocketConnection connEvent 214 = (OpenSocketConnection) event.event(); 215 Optional<Set<WebAppMsgChannel>> erroneous; 216 synchronized (connecting) { 217 erroneous = Optional 218 .ofNullable(connecting.get(connEvent.address())); 219 connecting.remove(connEvent.address()); 220 } 221 erroneous.ifPresent(set -> { 222 for (WebAppMsgChannel chann : set) { 223 chann.openError(event); 224 } 225 }); 226 } 227 } 228 } 229 230 /** 231 * Processes any input from the network layer. 232 * 233 * @param event the event 234 * @param netConnChannel the network layer channel 235 * @throws InterruptedException if the thread is interrupted 236 * @throws ProtocolException if the protocol is violated 237 */ 238 @Handler(channels = NetworkChannel.class) 239 public void onInput(Input<ByteBuffer> event, SocketIOChannel netConnChannel) 240 throws InterruptedException, ProtocolException { 241 Optional<WebAppMsgChannel> appChannel 242 = netConnChannel.associated(WebAppMsgChannel.class); 243 if (appChannel.isPresent()) { 244 appChannel.get().handleNetInput(event, netConnChannel); 245 } 246 } 247 248 /** 249 * Called when the network connection is closed. 250 * 251 * @param event the event 252 * @param netConnChannel the net conn channel 253 */ 254 @Handler(channels = NetworkChannel.class) 255 public void onClosed(Closed<?> event, SocketIOChannel netConnChannel) { 256 netConnChannel.associated(WebAppMsgChannel.class).ifPresent( 257 appChannel -> appChannel.handleClosed(event)); 258 pooled.remove(netConnChannel.remoteAddress(), netConnChannel); 259 } 260 261 /** 262 * Handles a close event from the application channel. Such an 263 * event may only be fired if the connection has been upgraded 264 * to a websocket connection. 265 * 266 * @param event the event 267 * @param appChannel the application channel 268 */ 269 @Handler 270 public void onClose(Close event, WebAppMsgChannel appChannel) { 271 appChannel.handleClose(event); 272 } 273 274 /** 275 * An application layer channel. When an object is created, it is first 276 * inserted into the {@link HttpConnector#connecting} map. Once a network 277 * channel has been assigned to it, it is primarily referenced by that 278 * network channel. 279 */ 280 private class WebAppMsgChannel extends DefaultIOSubchannel { 281 // Starts as ClientEngine<HttpRequest,HttpResponse> but may change 282 private final ClientEngine<?, ?> engine 283 = new ClientEngine<>(new HttpRequestEncoder(), 284 new HttpResponseDecoder()); 285 private final InetSocketAddress serverAddress; 286 private final Request.Out request; 287 private ManagedBuffer<ByteBuffer> outBuffer; 288 private ManagedBufferPool<ManagedBuffer<ByteBuffer>, 289 ByteBuffer> byteBufferPool; 290 private ManagedBufferPool<ManagedBuffer<CharBuffer>, 291 CharBuffer> charBufferPool; 292 private ManagedBufferPool<?, ?> currentPool; 293 private SocketIOChannel netConnChannel; 294 private final EventPipeline downPipeline; 295 private WsMessageHeader currentWsMessage; 296 297 /** 298 * Instantiates a new channel. 299 * 300 * @param event the event 301 * @param netChannel the net channel 302 * @throws InterruptedException 303 * @throws IOException 304 */ 305 @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") 306 public WebAppMsgChannel(Request.Out event) 307 throws InterruptedException, IOException { 308 super(channel(), newEventPipeline()); 309 310 // Downstream pipeline, needed even if connection fails 311 downPipeline = newEventPipeline(); 312 313 // Extract request data and check host 314 request = event; 315 serverAddress = new InetSocketAddress( 316 event.requestUri().getHost(), event.requestUri().getPort()); 317 if (serverAddress.isUnresolved()) { 318 downPipeline.fire( 319 new HostUnresolved(event, "Host cannot be resolved."), 320 this); 321 return; 322 } 323 324 // Re-use TCP connection, if possible 325 SocketIOChannel recycled = pooled.poll(serverAddress); 326 if (recycled != null) { 327 connected(recycled); 328 return; 329 } 330 synchronized (connecting) { 331 connecting 332 .computeIfAbsent(serverAddress, key -> new HashSet<>()) 333 .add(this); 334 } 335 336 // Fire on main network channel (targeting the tcp connector) 337 // as a follow up event (using the current pipeline). 338 fire(new OpenSocketConnection(serverAddress), netMainChannel); 339 } 340 341 /** 342 * Error in response to trying to open a new TCP connection. 343 * 344 * @param event the event 345 */ 346 public void openError(IOError event) { 347 // Already removed from connecting by caller, simply forward. 348 downPipeline.fire(IOError.duplicate(event), this); 349 } 350 351 /** 352 * Error from established TCP connection. 353 * 354 * @param event the event 355 * @param netConnChannel the network channel 356 */ 357 public void handleIoError(IOError event, 358 SocketIOChannel netConnChannel) { 359 downPipeline.fire(IOError.duplicate(event), this); 360 } 361 362 /** 363 * Sets the network connection channel for this application channel. 364 * 365 * @param netConnChannel the net conn channel 366 * @throws InterruptedException the interrupted exception 367 * @throws IOException Signals that an I/O exception has occurred. 368 */ 369 @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") 370 public final void connected(SocketIOChannel netConnChannel) 371 throws InterruptedException, IOException { 372 // Associate the network channel with this application channel 373 this.netConnChannel = netConnChannel; 374 netConnChannel.setAssociated(WebAppMsgChannel.class, this); 375 request.connectedCallback().ifPresent( 376 consumer -> consumer.accept(request, netConnChannel)); 377 378 // Estimate "good" application buffer size 379 int bufferSize = applicationBufferSize; 380 if (bufferSize <= 0) { 381 bufferSize = netConnChannel.byteBufferPool().bufferSize() - 512; 382 if (bufferSize < 4096) { 383 bufferSize = 4096; 384 } 385 } 386 String channelName = Components.objectName(HttpConnector.this) 387 + "." + Components.objectName(this); 388 byteBufferPool().setName(channelName + ".upstream.byteBuffers"); 389 charBufferPool().setName(channelName + ".upstream.charBuffers"); 390 // Allocate downstream buffer pools. Note that decoding WebSocket 391 // network packets may result in several WS frames that are each 392 // delivered in independent events. Therefore provide some 393 // additional buffers. 394 final int bufSize = bufferSize; 395 byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new, 396 () -> { 397 return ByteBuffer.allocate(bufSize); 398 }, 2, 100) 399 .setName(channelName + ".downstream.byteBuffers"); 400 charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new, 401 () -> { 402 return CharBuffer.allocate(bufSize); 403 }, 2, 100) 404 .setName(channelName + ".downstream.charBuffers"); 405 406 sendMessageUpstream(request.httpRequest(), netConnChannel); 407 408 // Forward Connected event downstream to e.g. start preparation 409 // of output events for payload data. 410 downPipeline.fire(new HttpConnected(request, 411 netConnChannel.localAddress(), netConnChannel.remoteAddress()), 412 this); 413 } 414 415 @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 416 "PMD.CognitiveComplexity", "PMD.AvoidDuplicateLiterals" }) 417 private void sendMessageUpstream(MessageHeader message, 418 SocketIOChannel netConnChannel) { 419 // Now send request as if it came from downstream (to 420 // avoid confusion with output events that may be 421 // generated in parallel, see below). 422 responsePipeline().submit("SynchronizedResponse", 423 new Callable<Void>() { 424 425 @SuppressWarnings({ "PMD.CommentRequired", 426 "PMD.AvoidBranchingStatementAsLastInLoop", 427 "PMD.AvoidDuplicateLiterals", 428 "PMD.AvoidInstantiatingObjectsInLoops" }) 429 public Void call() throws InterruptedException { 430 @SuppressWarnings("unchecked") 431 ClientEngine<MessageHeader, MessageHeader> untypedEngine 432 = (ClientEngine<MessageHeader, 433 MessageHeader>) engine; 434 untypedEngine.encode(message); 435 boolean hasBody = message.hasPayload(); 436 while (true) { 437 outBuffer 438 = netConnChannel.byteBufferPool().acquire(); 439 Codec.Result result 440 = engine.encode(Codec.EMPTY_IN, 441 outBuffer.backingBuffer(), !hasBody); 442 if (result.isOverflow()) { 443 netConnChannel 444 .respond(Output.fromSink(outBuffer, false)); 445 continue; 446 } 447 if (hasBody) { 448 // Keep buffer with incomplete request to be 449 // further 450 // filled by subsequent Output events 451 break; 452 } 453 // Request is completely encoded 454 if (outBuffer.position() > 0) { 455 netConnChannel 456 .respond(Output.fromSink(outBuffer, true)); 457 } else { 458 outBuffer.unlockBuffer(); 459 } 460 outBuffer = null; 461 if (result.closeConnection()) { 462 netConnChannel.respond(new Close()); 463 } 464 break; 465 } 466 return null; 467 } 468 }); 469 } 470 471 @SuppressWarnings({ "PMD.CommentRequired", "PMD.CyclomaticComplexity", 472 "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops", 473 "PMD.AvoidDuplicateLiterals", "PMD.CognitiveComplexity" }) 474 public void handleAppOutput(Output<?> event) 475 throws InterruptedException { 476 Buffer eventData = event.data(); 477 Buffer input; 478 if (eventData instanceof ByteBuffer) { 479 input = ((ByteBuffer) eventData).duplicate(); 480 } else if (eventData instanceof CharBuffer) { 481 input = ((CharBuffer) eventData).duplicate(); 482 } else { 483 return; 484 } 485 if (engine.switchedTo().equals(Optional.of("websocket")) 486 && currentWsMessage == null) { 487 // When switched to WebSockets, we only have Input and Output 488 // events. Add header automatically. 489 @SuppressWarnings("unchecked") 490 ClientEngine<MessageHeader, ?> wsEngine 491 = (ClientEngine<MessageHeader, ?>) engine; 492 currentWsMessage = new WsMessageHeader( 493 event.buffer().backingBuffer() instanceof CharBuffer, 494 true); 495 wsEngine.encode(currentWsMessage); 496 } 497 while (input.hasRemaining() || event.isEndOfRecord()) { 498 if (outBuffer == null) { 499 outBuffer = netConnChannel.byteBufferPool().acquire(); 500 } 501 Codec.Result result = engine.encode(input, 502 outBuffer.backingBuffer(), event.isEndOfRecord()); 503 if (result.isOverflow()) { 504 netConnChannel.respond(Output.fromSink(outBuffer, false)); 505 outBuffer = netConnChannel.byteBufferPool().acquire(); 506 continue; 507 } 508 if (event.isEndOfRecord() || result.closeConnection()) { 509 if (outBuffer.position() > 0) { 510 netConnChannel 511 .respond(Output.fromSink(outBuffer, true)); 512 } else { 513 outBuffer.unlockBuffer(); 514 } 515 outBuffer = null; 516 if (result.closeConnection()) { 517 netConnChannel.respond(new Close()); 518 } 519 break; 520 } 521 } 522 if (engine.switchedTo().equals(Optional.of("websocket")) 523 && event.isEndOfRecord()) { 524 currentWsMessage = null; 525 } 526 } 527 528 @SuppressWarnings({ "PMD.CommentRequired", 529 "PMD.DataflowAnomalyAnalysis", "PMD.CognitiveComplexity" }) 530 public void handleNetInput(Input<ByteBuffer> event, 531 SocketIOChannel netConnChannel) 532 throws InterruptedException, ProtocolException { 533 // Send the data from the event through the decoder. 534 ByteBuffer inData = event.data(); 535 // Don't unnecessary allocate a buffer, may be header only message 536 ManagedBuffer<?> bodyData = null; 537 boolean wasOverflow = false; 538 Decoder.Result<?> result; 539 while (inData.hasRemaining()) { 540 if (wasOverflow) { 541 // Message has (more) body 542 bodyData = currentPool.acquire(); 543 } 544 result = engine.decode(inData, 545 bodyData == null ? null : bodyData.backingBuffer(), 546 event.isEndOfRecord()); 547 if (result.response().isPresent()) { 548 sendMessageUpstream(result.response().get(), 549 netConnChannel); 550 if (result.isResponseOnly()) { 551 maybeReleaseConnection(result); 552 continue; 553 } 554 } 555 if (result.isHeaderCompleted()) { 556 MessageHeader header 557 = engine.responseDecoder().header().get(); 558 if (!handleResponseHeader(header)) { 559 maybeReleaseConnection(result); 560 break; 561 } 562 } 563 if (bodyData != null) { 564 if (bodyData.position() > 0) { 565 boolean eor 566 = !result.isOverflow() && !result.isUnderflow(); 567 downPipeline.fire(Input.fromSink(bodyData, eor), this); 568 } else { 569 bodyData.unlockBuffer(); 570 } 571 bodyData = null; 572 } 573 maybeReleaseConnection(result); 574 wasOverflow = result.isOverflow(); 575 } 576 } 577 578 @SuppressWarnings("PMD.CognitiveComplexity") 579 private boolean handleResponseHeader(MessageHeader response) { 580 if (response instanceof HttpResponse) { 581 HttpResponse httpResponse = (HttpResponse) response; 582 if (httpResponse.hasPayload()) { 583 if (httpResponse.findValue( 584 HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE) 585 .map(type -> "text" 586 .equalsIgnoreCase(type.value().topLevelType())) 587 .orElse(false)) { 588 currentPool = charBufferPool; 589 } else { 590 currentPool = byteBufferPool; 591 } 592 } 593 downPipeline.fire(new Response(httpResponse), this); 594 } else if (response instanceof WsMessageHeader) { 595 WsMessageHeader wsMessage = (WsMessageHeader) response; 596 if (wsMessage.hasPayload()) { 597 if (wsMessage.isTextMode()) { 598 currentPool = charBufferPool; 599 } else { 600 currentPool = byteBufferPool; 601 } 602 } 603 } else if (response instanceof WsCloseFrame) { 604 downPipeline.fire( 605 new WebSocketClose((WsCloseFrame) response, this)); 606 } 607 return true; 608 } 609 610 private void maybeReleaseConnection(Decoder.Result<?> result) { 611 if (result.isOverflow() || result.isUnderflow()) { 612 // Data remains to be processed 613 return; 614 } 615 MessageHeader header 616 = engine.responseDecoder().header().get(); 617 // Don't release if something follows 618 if (header instanceof HttpResponse 619 && ((HttpResponse) header).statusCode() % 100 == 1) { 620 return; 621 } 622 if (engine.switchedTo().equals(Optional.of("websocket"))) { 623 if (!result.closeConnection()) { 624 return; 625 } 626 // Is web socket close, inform application layer 627 downPipeline.fire(new Closed<Void>(), this); 628 } 629 netConnChannel.setAssociated(WebAppMsgChannel.class, null); 630 if (!result.closeConnection()) { 631 // May be reused 632 pooled.add(serverAddress, netConnChannel); 633 } 634 netConnChannel = null; 635 } 636 637 @SuppressWarnings("PMD.CommentRequired") 638 public void handleClose(Close event) { 639 if (engine.switchedTo().equals(Optional.of("websocket"))) { 640 sendMessageUpstream(new WsCloseFrame(null, null), 641 netConnChannel); 642 } 643 } 644 645 @SuppressWarnings("PMD.CommentRequired") 646 public void handleClosed(Closed<?> event) { 647 if (engine.switchedTo().equals(Optional.of("websocket"))) { 648 downPipeline.fire(new Closed<Void>(), this); 649 } 650 } 651 652 } 653 654}