/*
 * This file is part of the JDrupes non-blocking HTTP Codec
 * Copyright (C) 2016, 2017  Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License along
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jdrupes.httpcodec.protocols.http;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Optional;
import java.util.Stack;

import org.jdrupes.httpcodec.Codec;
import org.jdrupes.httpcodec.Decoder;
import org.jdrupes.httpcodec.Encoder;

import static org.jdrupes.httpcodec.protocols.http.HttpConstants.*;

import org.jdrupes.httpcodec.types.Converters;
import org.jdrupes.httpcodec.types.MediaType;
import org.jdrupes.httpcodec.types.StringList;
import org.jdrupes.httpcodec.util.ByteBufferOutputStream;
import org.jdrupes.httpcodec.util.ByteBufferUtils;

/**
 * Implements an encoder for HTTP. The class can be used as base class for both
 * a request and a response encoder.
 * <P>
 * The encoder is driven by a stack of {@link State}s. The state on top of
 * the stack determines what {@link #encode(Buffer, ByteBuffer, boolean)}
 * does next; a state is popped when its work is complete, thus
 * falling back to the state below it.
 *
 * @param <T> the type of the message header to be encoded
 * @param <D> the type of the message header decoded by the peer decoder
 */
public abstract class HttpEncoder<T extends HttpMessageHeader,
        D extends HttpMessageHeader> extends HttpCodec<T>
        implements Encoder<T, D> {

    private enum State {
        // Main states
        INITIAL, DONE, CLOSED,
        // Sub states
        HEADERS, CHUNK_BODY, STREAM_CHUNK, FINISH_CHUNK, FINISH_CHUNKED,
        START_COLLECT_BODY, COLLECT_BODY, STREAM_COLLECTED,
        STREAM_BODY, FLUSH_ENCODER
    }

    /** The state stack, the top element is the current state. */
    private final Stack<State> states = new Stack<>();
    /** Whether the encoder switches to CLOSED after the message. */
    private boolean closeAfterBody = false;
    /** Stream used for the start line, header fields and chunk framing. */
    private final ByteBufferOutputStream outStream;
    /** ASCII writer on top of {@link #outStream}. */
    private final Writer writer;
    /** Iterates over the header fields that remain to be written. */
    private Iterator<HttpField<?>> headerIter = null;
    /** Limit for collecting body data, defaults to 1 MiB. */
    private int pendingLimit = 1024 * 1024;
    /** Number of body bytes still to be streamed (content length mode). */
    private long leftToStream;
    /** Body data gathered in "collect body" mode. */
    private ByteBufferOutputStream collectedBodyData;
    /** Encoder for character input in streaming and chunked mode. */
    private CharsetEncoder charEncoder = null;
    /** Writer for character input in "collect body" mode. */
    private Writer charWriter = null;
    /** The data of the chunk that is currently being written. */
    private ByteBuffer chunkData;
    protected Decoder<D, T> peerDecoder;

    /**
     * Creates a new encoder.
     */
    public HttpEncoder() {
        outStream = new ByteBufferOutputStream();
        // Passing the charset (instead of its name) cannot fail, so the
        // writer is guaranteed to be initialized.
        writer = new OutputStreamWriter(outStream, StandardCharsets.US_ASCII);
        states.push(State.INITIAL);
    }

    /**
     * Sets the peer decoder.
     *
     * @param decoder the decoder
     * @return this encoder for easy chaining
     */
    public Encoder<T, D> setPeerDecoder(Decoder<D, T> decoder) {
        peerDecoder = decoder;
        return this;
    }

    /**
     * Returns the result factory for this codec.
     *
     * @return the factory
     */
    protected abstract Result.Factory resultFactory();

    /**
     * Returns the limit for pending body bytes. If the protocol is HTTP/1.0 and
     * the message has a body but no "Content-Length" header, the only
     * (unreliable) way to indicate the end of the body is to close the
     * connection after all body bytes have been sent.
     * <P>
     * The encoder tries to calculate the content length by buffering the body
     * data up to the "pending" limit. If the body is smaller than the limit,
     * the message is set with the calculated content length header, else the
     * data is sent without such a header and the connection is closed.
     * <P>
     * If the response protocol is HTTP/1.1 and there is no "Content-Length"
     * header, chunked transfer encoding is used.
     *
     * @return the limit
     */
    public int pendingLimit() {
        return pendingLimit;
    }

    /**
     * Sets the limit for the pending body bytes.
     *
     * @param pendingLimit
     *            the limit to set
     */
    public void setPendingLimit(int pendingLimit) {
        this.pendingLimit = pendingLimit;
    }

    /**
     * Returns {@code true} if the encoder does not accept further input because
     * the processed data indicated that the connection has been or is to be
     * closed.
     *
     * @return the result
     */
    public boolean isClosed() {
        return states.peek() == State.CLOSED;
    }

    /**
     * Writes the first line of the message (including the terminating CRLF).
     * Must be provided by the derived class because the first line depends on
     * whether a request or response is encoded.
     *
     * @param messageHeader
     *            the message header to encode (see
     *            {@link #encode(HttpMessageHeader)})
     * @param writer
     *            the Writer to use for writing
     * @throws IOException
     *             if an I/O error occurs
     */
    protected abstract void startMessage(T messageHeader, Writer writer)
            throws IOException;

    /**
     * Force close after body. Used to override the header driven
     * behavior for HTTP/1.0 responses.
     *
     * @return true, if forced
     */
    protected boolean forceCloseAfterBody() {
        return false;
    }

    /**
     * Set a new HTTP message that is to be encoded.
     *
     * @param messageHeader
     *            the message header
     * @throws IllegalStateException
     *             if the encoder is not in its initial state
     */
    @Override
    public void encode(T messageHeader) {
        if (states.peek() != State.INITIAL) {
            throw new IllegalStateException();
        }
        this.messageHeader = messageHeader;
        charEncoder = null;
        charWriter = null;
        // Consistency checks
        if (messageHeader.fields().containsKey(HttpField.CONTENT_TYPE)) {
            messageHeader.setHasPayload(true);
        }
    }

    /**
     * Encodes a HTTP message.
     *
     * @param in
     *            the body data
     * @param out
     *            the buffer to which data is written
     * @param endOfInput
     *            {@code true} if there is no input left beyond the data
     *            currently in the {@code in} buffer (indicates end of body or
     *            no body at all)
     * @return the result
     * @throws IllegalStateException
     *             if the message has been encoded completely, but the
     *             method is invoked with more input and
     *             {@code endOfInput} set to {@code false}
     * @throws IllegalArgumentException
     *             if character input cannot be encoded
     */
    public Encoder.Result encode(Buffer in, ByteBuffer out,
            boolean endOfInput) {
        outStream.assignBuffer(out); // Prepare and copy over what was left.
        if (out.remaining() == 0) { // If a lot was left.
            outStream.assignBuffer(null);
            return resultFactory().newResult(true, false, false);
        }
        Encoder.Result result = resultFactory().newResult(false, false, false);
        while (true) {
            if (result.isOverflow() || result.isUnderflow()) {
                outStream.assignBuffer(null);
                return result;
            }
            switch (states.peek()) {
            case INITIAL:
                outStream.clear();
                outStream.assignBuffer(out);
                startEncoding();
                break;

            case HEADERS:
                // If headers remain (new request or buffer was full) write them
                if (writeHeaders()) {
                    continue;
                }
                break;

            case START_COLLECT_BODY:
                // Start collecting
                if (in.remaining() == 0 && !endOfInput) {
                    // Has probably been invoked with a dummy buffer,
                    // cannot be used to create pending body buffer.
                    outStream.assignBuffer(null);
                    return resultFactory().newResult(false, true, false);
                }
                collectedBodyData = new ByteBufferOutputStream(
                        Math.min(pendingLimit,
                                Math.max(in.capacity(), 64 * 1024)));
                states.pop();
                states.push(State.COLLECT_BODY);
                // fall through (no write occurred yet)
            case COLLECT_BODY:
                result = collectBody(in, endOfInput);
                continue; // never writes to out

            case STREAM_COLLECTED:
                // Output collected body
                if (collectedBodyData.remaining() < 0) {
                    // Copy over.
                    collectedBodyData.assignBuffer(out);
                    collectedBodyData.assignBuffer(null);
                    break;
                }
                states.pop();
                continue;

            case STREAM_BODY:
                // Stream, i.e. simply copy from source to destination
                int initiallyRemaining = out.remaining();
                result = copyBodyData(in, out, endOfInput);
                leftToStream -= (initiallyRemaining - out.remaining());
                if (!result.isOverflow() && (leftToStream == 0 || endOfInput)) {
                    // end of data
                    states.pop();
                    if (charEncoder != null) {
                        states.push(State.FLUSH_ENCODER);
                    }
                    continue;
                }
                // Buffer written, waiting for space, more data or end of data
                outStream.assignBuffer(null);
                return result;

            case FLUSH_ENCODER:
                CoderResult flushResult = charEncoder.flush(out);
                if (flushResult.isOverflow()) {
                    outStream.assignBuffer(null);
                    return resultFactory().newResult(true, false, false);
                }
                states.pop();
                break;

            case CHUNK_BODY:
                // Send in data as chunk
                result = startChunk(in, endOfInput);
                continue; // remaining check already done

            case STREAM_CHUNK:
                // Stream, i.e. simply copy from source to destination
                if (!ByteBufferUtils.putAsMuchAsPossible(out, chunkData)) {
                    // Not enough space in out buffer
                    outStream.assignBuffer(null);
                    return resultFactory().newResult(true, false, false);
                }
                // everything from in written
                states.pop();
                continue;

            case FINISH_CHUNK:
                try {
                    outStream.write("\r\n".getBytes("ascii"));
                    states.pop();
                } catch (IOException e) {
                    // Formally thrown by write
                }
                break;

            case FINISH_CHUNKED:
                try {
                    outStream.write("0\r\n\r\n".getBytes("ascii"));
                    states.pop();
                } catch (IOException e) {
                    // Formally thrown by write
                }
                break;

            case DONE:
                // Everything is written
                outStream.assignBuffer(null);
                if (!endOfInput) {
                    if (in.remaining() == 0) {
                        return resultFactory()
                                .newResult(false, true, false);
                    }
                    throw new IllegalStateException(
                            "A message has been completely encoded but"
                                    + " encode has not been invoked with endOfInput"
                                    + " set to true.");
                }
                states.pop();
                if (closeAfterBody) {
                    states.push(State.CLOSED);
                } else {
                    states.push(State.INITIAL);
                }
                return resultFactory().newResult(false, false, isClosed());

            default:
                throw new IllegalStateException();
            }
            // Using "continue" above avoids this check. Use it only
            // if the state has changed and no additional output is expected.
            if (out.remaining() == 0) {
                outStream.assignBuffer(null);
                return resultFactory().newResult(true, false, false);
            }
        }
    }

    /**
     * Returns the message header that is currently being encoded.
     *
     * @return the header, empty if no header has been set
     */
    @Override
    public Optional<T> header() {
        return Optional.ofNullable(messageHeader);
    }

    /**
     * Called during the initial state. Writes the status line, determines the
     * body-mode and puts the state machine in output-headers mode, unless the
     * body-mode is "collect body".
     */
    private void startEncoding() {
        // Complete content type
        Optional<HttpField<MediaType>> contentField = messageHeader
                .findField(HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE);
        if (contentField.isPresent()) {
            MediaType type = contentField.get().value();
            if ("text".equals(type.topLevelType())
                    && type.parameter("charset") == null) {
                messageHeader.setField(HttpField.CONTENT_TYPE,
                        MediaType.builder().from(type)
                                .setParameter("charset", "utf-8").build());
            }
        }

        // Prepare encoder
        headerIter = null;

        // Write request or status line
        try {
            startMessage(messageHeader, writer);
        } catch (IOException e) {
            // Formally thrown by writer, cannot happen.
        }
        states.pop();
        // We'll eventually fall back to this state
        states.push(State.DONE);
        // Get a default for closeAfterBody from the header fields
        closeAfterBody = forceCloseAfterBody() || messageHeader
                .findField(HttpField.CONNECTION, Converters.STRING_LIST)
                .map(h -> h.value()).map(f -> f.containsIgnoreCase("close"))
                .orElse(false);
        // If there's no body, start outputting header fields
        if (!messageHeader.hasPayload()) {
            states.push(State.HEADERS);
            return;
        }
        configureBodyHandling();
    }

    /**
     * Selects the body mode for a message with payload and pushes the
     * corresponding states. In order of preference: stream the body if
     * there is a content length, use chunked transfer encoding if the
     * protocol is newer than HTTP/1.0, collect the body (in order to
     * calculate the content length) if the pending limit is positive,
     * else stream the body and close the connection afterwards.
     */
    private void configureBodyHandling() {
        // Message has a body, find out how to handle it
        Optional<HttpField<Long>> cl = messageHeader
                .findField(HttpField.CONTENT_LENGTH, Converters.LONG);
        leftToStream = (!cl.isPresent() ? -1 : cl.get().value());
        if (leftToStream >= 0) {
            // Easiest: we have a content length, works always
            states.push(State.STREAM_BODY);
            states.push(State.HEADERS);
            return;
        }
        if (messageHeader.protocol()
                .compareTo(HttpProtocol.HTTP_1_0) > 0) {
            // At least 1.1, use chunked
            Optional<HttpField<StringList>> transEnc = messageHeader.findField(
                    HttpField.TRANSFER_ENCODING, Converters.STRING_LIST);
            if (!transEnc.isPresent()) {
                messageHeader.setField(HttpField.TRANSFER_ENCODING,
                        new StringList(TransferCoding.CHUNKED.toString()));
            } else {
                // Remove and add again, "chunked" must be the last coding
                transEnc.get().value().remove(TransferCoding.CHUNKED.toString());
                transEnc.get().value().add(TransferCoding.CHUNKED.toString());
            }
            states.push(State.CHUNK_BODY);
            states.push(State.HEADERS);
            return;
        }
        // Bad: 1.0 and no content length.
        if (pendingLimit > 0) {
            // Try to calculate length by collecting the data
            leftToStream = 0;
            states.push(State.START_COLLECT_BODY);
            return;
        }
        // May not buffer, use close
        states.push(State.STREAM_BODY);
        states.push(State.HEADERS);
        closeAfterBody = true;
    }

    /**
     * Outputs as many headers as fit in the current out buffer. If all headers
     * are output, pops a state (thus proceeding with the selected body mode).
     * Unless very small out buffers are used (or very large headers occur),
     * this is invoked only once. Therefore no attempt has been made to avoid
     * the usage of temporary buffers in the header stream (there may be
     * a maximum overflow of one partial header).
     *
     * @return {@code true} if all headers could be written to the out
     * buffer (nothing pending)
     */
    private boolean writeHeaders() {
        try {
            if (headerIter == null) {
                headerIter = messageHeader.fields().values().iterator();
            }
            while (true) {
                if (!headerIter.hasNext()) {
                    // Empty line terminates the header section
                    writer.write("\r\n");
                    writer.flush();
                    states.pop();
                    return outStream.remaining() >= 0;
                }
                HttpField<?> header = headerIter.next();
                writer.write(header.asHeaderField());
                writer.write("\r\n");
                writer.flush();
                if (outStream.remaining() <= 0) {
                    break;
                }
            }
        } catch (IOException e) {
            // Formally thrown by writer, cannot happen.
        }
        return false;
    }

    /**
     * Copy as much data as possible from in to out. Character input is
     * encoded using an encoder for the body charset (created on first
     * use). Byte input is copied, restricted to {@code leftToStream}
     * bytes if the out buffer has more space than that.
     *
     * @param in
     *            the body data, a {@link CharBuffer} or a {@link ByteBuffer}
     * @param out
     *            the buffer to which data is written
     * @param endOfInput
     *            {@code true} if there is no input beyond the data in
     *            {@code in}
     * @return the result, indicating overflow if out is full and
     *         underflow if more input is required
     */
    private Encoder.Result copyBodyData(Buffer in, ByteBuffer out,
            boolean endOfInput) {
        if (in instanceof CharBuffer) {
            // copy via encoder
            if (charEncoder == null) {
                charEncoder = Charset.forName(bodyCharset()).newEncoder();
            }
            CoderResult result = charEncoder.encode((CharBuffer) in, out,
                    endOfInput);
            return resultFactory().newResult(result.isOverflow(),
                    !endOfInput && result.isUnderflow(), false);
        }
        if (out.remaining() <= leftToStream) {
            ByteBufferUtils.putAsMuchAsPossible(out, (ByteBuffer) in);
        } else {
            ByteBufferUtils.putAsMuchAsPossible(out, (ByteBuffer) in,
                    (int) leftToStream);
        }
        return resultFactory().newResult(out.remaining() == 0,
                !endOfInput && in.remaining() == 0, false);
    }

    /**
     * Handle the input as appropriate for the collect-body mode. If the
     * check against the pending limit fails, the encoder gives up
     * collecting and arranges for the headers, the data collected so far
     * and the remaining input to be output, followed by closing the
     * connection. Else the input is added to the collected data. At
     * the end of the input, the content length header is set from the
     * number of collected bytes and output of headers and collected
     * data is initiated.
     *
     * @param in
     *            the body data, a {@link CharBuffer} or a {@link ByteBuffer}
     * @param endOfInput
     *            {@code true} if there is no input beyond the data in
     *            {@code in}
     * @return the result, indicating underflow if more input is required
     * @throws IllegalArgumentException
     *             if the body charset is not supported
     */
    private Encoder.Result collectBody(Buffer in, boolean endOfInput) {
        if (collectedBodyData.remaining() - in.remaining() < -pendingLimit) {
            // No space left, output headers, collected and rest (and then
            // close)
            states.pop();
            closeAfterBody = true;
            leftToStream = Long.MAX_VALUE;
            states.push(State.STREAM_BODY);
            states.push(State.STREAM_COLLECTED);
            states.push(State.HEADERS);
            return resultFactory().newResult(false, false, false);
        }
        // Space left, collect
        if (in instanceof ByteBuffer) {
            collectedBodyData.write((ByteBuffer)in);
        } else {
            if (charWriter == null) {
                try {
                    charWriter = new OutputStreamWriter(collectedBodyData,
                            bodyCharset());
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalArgumentException(e);
                }
            }
            try {
                if (in.hasArray()) {
                    // more efficient than CharSequence
                    charWriter.write(((CharBuffer) in).array(),
                            in.arrayOffset() + in.position(), in.remaining());
                } else {
                    charWriter.append((CharBuffer)in);
                }
                // Writing doesn't move the position, mark as consumed
                in.position(in.limit());
                charWriter.flush();
            } catch (IOException e) {
                // Formally thrown, cannot happen
            }
        }
        if (endOfInput) {
            // End of body, found content length!
            messageHeader.setField(
                    HttpField.CONTENT_LENGTH, collectedBodyData.bytesWritten());
            states.pop();
            states.push(State.STREAM_COLLECTED);
            states.push(State.HEADERS);
            return resultFactory().newResult(false, false, false);
        }
        // Get more input
        return resultFactory().newResult(false, true, false);
    }

    /**
     * Handle the input as appropriate for the chunked-body mode. If
     * there is input, writes the chunk size line and pushes the states
     * that output the chunk data and the chunk's terminating CRLF. At
     * the end of the input, the current state is replaced with the state
     * that writes the final (zero sized) chunk.
     *
     * @param in the data
     * @param endOfInput {@code true} if there is no input beyond the
     * data in {@code in}
     * @return the result
     * @throws IllegalArgumentException if character input cannot be
     * encoded using the body charset
     */
    private Encoder.Result startChunk(Buffer in, boolean endOfInput) {
        if (endOfInput) {
            states.pop();
            states.push(State.FINISH_CHUNKED);
        }
        try {
            // Don't write zero sized chunks
            if (in.hasRemaining()) {
                if (in instanceof CharBuffer) {
                    if (charEncoder == null) {
                        charEncoder = Charset.forName(bodyCharset())
                                .newEncoder();
                    }
                    chunkData = charEncoder.encode((CharBuffer)in);
                } else {
                    chunkData = (ByteBuffer)in;
                }
                outStream.write(
                        Long.toHexString(chunkData.remaining())
                                .getBytes("ascii"));
                outStream.write("\r\n".getBytes("ascii"));
                states.push(State.FINISH_CHUNK);
                states.push(State.STREAM_CHUNK);
                return resultFactory().newResult(
                        outStream.remaining() <= 0, false, false);
            }
        } catch (CharacterCodingException e) {
            // Must not be ignored: the input that caused the failure
            // remains in the buffer, so the caller's loop would invoke
            // this method with the same data over and over again (or
            // the chunk would be dropped silently at the end of input).
            throw new IllegalArgumentException(
                    "Body data cannot be encoded using charset "
                            + charEncoder.charset().name(), e);
        } catch (IOException e) {
            // Formally thrown by outStream, cannot happen.
        }
        return resultFactory().newResult(outStream.remaining() < 0,
                in.remaining() == 0 && !endOfInput, false);
    }

    /**
     * Results from {@link HttpEncoder} provide no additional
     * information compared to {@link org.jdrupes.httpcodec.Codec.Result}.
     * This class only provides a factory for creating concrete results.
     */
    public static class Result extends Codec.Result {

        protected Result(boolean overflow, boolean underflow,
                boolean closeConnection) {
            super(overflow, underflow, closeConnection);
        }

        /**
         * A factory for creating new Results.
         */
        protected abstract static class Factory
                extends Codec.Result.Factory {

            /**
             * Create new result.
             *
             * @param overflow
             *            {@code true} if the data didn't fit in the out buffer
             * @param underflow
             *            {@code true} if more data is expected
             * @param closeConnection
             *            {@code true} if the connection should be closed
             * @return the result
             */
            public Result newResult(boolean overflow, boolean underflow,
                    boolean closeConnection) {
                return new Result(overflow, underflow, closeConnection) {
                };
            }
        }
    }
}