/*
 * This file is part of the JDrupes non-blocking HTTP Codec
 * Copyright (C) 2016, 2017  Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it 
 * under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; either version 3 of the License, or 
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but 
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public 
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License along 
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */
018
019package org.jdrupes.httpcodec.protocols.http;
020
021import java.io.IOException;
022import java.io.OutputStreamWriter;
023import java.io.UnsupportedEncodingException;
024import java.io.Writer;
025import java.nio.Buffer;
026import java.nio.ByteBuffer;
027import java.nio.CharBuffer;
028import java.nio.charset.Charset;
029import java.nio.charset.CharsetEncoder;
030import java.nio.charset.CoderResult;
031import java.util.Iterator;
032import java.util.Optional;
033import java.util.Stack;
034
035import org.jdrupes.httpcodec.Codec;
036import org.jdrupes.httpcodec.Decoder;
037import org.jdrupes.httpcodec.Encoder;
038
039import static org.jdrupes.httpcodec.protocols.http.HttpConstants.*;
040
041import org.jdrupes.httpcodec.types.Converters;
042import org.jdrupes.httpcodec.types.MediaType;
043import org.jdrupes.httpcodec.types.StringList;
044import org.jdrupes.httpcodec.util.ByteBufferOutputStream;
045import org.jdrupes.httpcodec.util.ByteBufferUtils;
046
047/**
048 * Implements an encoder for HTTP. The class can be used as base class for both
049 * a request and a response encoder.
050 * 
051 * @param <T> the type of the message header to be encoded
052 * @param <D> the type of the message header decoded by the peer decoder
053 */
054public abstract class HttpEncoder<T extends HttpMessageHeader,
055        D extends HttpMessageHeader> extends HttpCodec<T>
056        implements Encoder<T, D> {
057
058        private enum State {
059                // Main states
060                INITIAL, DONE, CLOSED,
061                // Sub states
062                HEADERS, CHUNK_BODY, STREAM_CHUNK, FINISH_CHUNK, FINISH_CHUNKED,
063                START_COLLECT_BODY, COLLECT_BODY, STREAM_COLLECTED, 
064                STREAM_BODY, FLUSH_ENCODER
065        }
066
067        private Stack<State> states = new Stack<>();
068        private boolean closeAfterBody = false;
069        private ByteBufferOutputStream outStream;
070        private Writer writer;
071        private Iterator<HttpField<?>> headerIter = null;
072        private int pendingLimit = 1024 * 1024;
073        private long leftToStream;
074        private ByteBufferOutputStream collectedBodyData;
075        private CharsetEncoder charEncoder = null;
076        private Writer charWriter = null;
077        private ByteBuffer chunkData;
078        protected Decoder<D, T> peerDecoder;
079
080        /**
081         * Creates a new encoder.
082         */
083        public HttpEncoder() {
084                outStream = new ByteBufferOutputStream();
085                try {
086                        writer = new OutputStreamWriter(outStream, "ascii");
087                } catch (UnsupportedEncodingException e) {
088                        // Cannot happen (specified to be supported)
089                }
090                states.push(State.INITIAL);
091        }
092
093        public Encoder<T, D> setPeerDecoder(Decoder<D, T> decoder) {
094                peerDecoder = decoder;
095                return this;
096        }
097        
098        /**
099         * Returns the result factory for this codec.
100         * 
101         * @return the factory
102         */
103        protected abstract Result.Factory resultFactory();
104        
105        /**
106         * Returns the limit for pending body bytes. If the protocol is HTTP/1.0 and
107         * the message has a body but no "Content-Length" header, the only
108         * (unreliable) way to indicate the end of the body is to close the
109         * connection after all body bytes have been sent.
110         * <P>
111         * The encoder tries to calculate the content length by buffering the body
112         * data up to the "pending" limit. If the body is smaller than the limit,
113         * the message is set with the calculated content length header, else the
114         * data is sent without such a header and the connection is closed.
115         * <P>
116         * If the response protocol is HTTP/1.1 and there is no "Content-Length"
117         * header, chunked transfer encoding is used.
118         *
119         * @return the limit
120         */
121        public int pendingLimit() {
122                return pendingLimit;
123        }
124
125        /**
126         * Sets the limit for the pending body bytes.
127         * 
128         * @param pendingLimit
129         *            the limit to set
130         */
131        public void setPendingLimit(int pendingLimit) {
132                this.pendingLimit = pendingLimit;
133        }
134
135        /**
136         * Returns {@code true} if the encoder does not accept further input because
137         * the processed data indicated that the connection has been or is to be
138         * closed.
139         * 
140         * @return the result
141         */
142        public boolean isClosed() {
143                return states.peek() == State.CLOSED;
144        }
145
146        /**
147         * Writes the first line of the message (including the terminating CRLF).
148         * Must be provided by the derived class because the first line depends on
149         * whether a request or response is encoded.
150         * 
151         * @param messageHeader
152         *            the message header to encode (see
153         *            {@link #encode(HttpMessageHeader)}
154         * @param writer
155         *            the Writer to use for writing
156         * @throws IOException
157         *             if an I/O error occurs
158         */
159        protected abstract void startMessage(T messageHeader, Writer writer) 
160                        throws IOException;
161
162    /**
163     * Force close after body. Used to override the header driven
164     * behavior for HTTP/1.0 responses.
165     *
166     * @return true, if forced
167     */
168    protected boolean forceCloseAfterBody() {
169        return false;
170    }
171    
172        /**
173         * Set a new HTTP message that is to be encoded.
174         * 
175         * @param messageHeader
176         *            the response
177         */
178        @Override
179        public void encode(T messageHeader) {
180                if (states.peek() != State.INITIAL) {
181                        throw new IllegalStateException();
182                }
183                this.messageHeader = messageHeader;
184                charEncoder = null;
185                charWriter = null;
186                // Consistency checks
187                if (messageHeader.fields().containsKey(HttpField.CONTENT_TYPE)) {
188                        messageHeader.setHasPayload(true);
189                }
190        }
191
192        /**
193         * Encodes a HTTP message.
194         * 
195         * @param in
196         *            the body data
197         * @param out
198         *            the buffer to which data is written
199         * @param endOfInput
200         *            {@code true} if there is no input left beyond the data
201         *            currently in the {@code in} buffer (indicates end of body or
202         *            no body at all)
203         * @return the result
204         */
205        public Encoder.Result encode(Buffer in, ByteBuffer out,
206                boolean endOfInput) {
207                outStream.assignBuffer(out); // Prepare and copy over what was left.
208                if (out.remaining() == 0) { // If a lot was left.
209                        outStream.assignBuffer(null);
210                        return resultFactory().newResult(true, false, false);
211                }
212                Encoder.Result result = resultFactory().newResult(false, false, false);
213                while (true) {
214                        if (result.isOverflow() || result.isUnderflow()) {
215                                outStream.assignBuffer(null);
216                                return result;
217                        }
218                        switch (states.peek()) {
219                        case INITIAL:
220                                outStream.clear();
221                                outStream.assignBuffer(out);
222                                startEncoding();
223                                break;
224
225                        case HEADERS:
226                                // If headers remain (new request or buffer was full) write them
227                                if (writeHeaders()) {
228                                        continue;
229                                }
230                                break;
231
232                        case START_COLLECT_BODY:
233                                // Start collecting
234                                if (in.remaining() == 0 && !endOfInput) {
235                                        // Has probably been invoked with a dummy buffer,
236                                        // cannot be used to create pending body buffer.
237                                        outStream.assignBuffer(null);
238                                        return resultFactory().newResult(false, true, false);
239                                }
240                                collectedBodyData = new ByteBufferOutputStream(
241                                        Math.min(pendingLimit, 
242                                                        Math.max(in.capacity(), 64 * 1024)));
243                                states.pop();
244                                states.push(State.COLLECT_BODY);
245                                // fall through (no write occurred yet)
246                        case COLLECT_BODY:
247                                result = collectBody(in, endOfInput);
248                                continue; // never writes to out
249
250                        case STREAM_COLLECTED:
251                                // Output collected body
252                                if (collectedBodyData.remaining() < 0) {
253                                        // Copy over.
254                                        collectedBodyData.assignBuffer(out);
255                                        collectedBodyData.assignBuffer(null);
256                                        break;
257                                }
258                                states.pop();
259                                continue;
260
261                        case STREAM_BODY:
262                                // Stream, i.e. simply copy from source to destination
263                                int initiallyRemaining = out.remaining();
264                                result = copyBodyData(in, out, endOfInput);
265                                leftToStream -= (initiallyRemaining - out.remaining());
266                                if (!result.isOverflow() && (leftToStream == 0 || endOfInput)) {
267                                        // end of data
268                                        states.pop();
269                                        if (charEncoder != null) {
270                                                states.push(State.FLUSH_ENCODER);
271                                        }
272                                        continue;
273                                }
274                                // Buffer written, waiting for space, more data or end of data
275                                outStream.assignBuffer(null);
276                                return result;
277
278                        case FLUSH_ENCODER:
279                                CoderResult flushResult = charEncoder.flush(out);
280                                if (flushResult.isOverflow()) {
281                                        outStream.assignBuffer(null);
282                                        return resultFactory().newResult(true, false, false);
283                                }
284                                states.pop();
285                                break;
286                                
287                        case CHUNK_BODY:
288                                // Send in data as chunk
289                                result = startChunk(in, endOfInput);
290                                continue; // remaining check already done 
291
292                        case STREAM_CHUNK:
293                                // Stream, i.e. simply copy from source to destination
294                                if (!ByteBufferUtils.putAsMuchAsPossible(out, chunkData)) {
295                                        // Not enough space in out buffer
296                                        outStream.assignBuffer(null);
297                                        return resultFactory().newResult(true, false, false);
298                                }
299                                // everything from in written
300                                states.pop();
301                                continue;
302        
303                        case FINISH_CHUNK:
304                                try {
305                                        outStream.write("\r\n".getBytes("ascii"));
306                                        states.pop();
307                                } catch (IOException e) {
308                                        // Formally thrown by write
309                                }
310                                break;
311                                
312                        case FINISH_CHUNKED:
313                                try {
314                                        outStream.write("0\r\n\r\n".getBytes("ascii"));
315                                        states.pop();
316                                } catch (IOException e) {
317                                        // Formally thrown by write
318                                }
319                                break;
320                                
321                        case DONE:
322                                // Everything is written
323                                outStream.assignBuffer(null);
324                                if (!endOfInput) {
325                                        if (in.remaining() == 0) {
326                                                return resultFactory()
327                                                                .newResult(false, true, false);
328                                        }
329                                        throw new IllegalStateException(
330                                                        "A message has been completely encoded but"
331                                                        + " encode has not been invoked with endOfInput"
332                                                        + " set to true.");
333                                }
334                                states.pop();
335                                if (closeAfterBody) {
336                                        states.push(State.CLOSED);
337                                } else {
338                                        states.push(State.INITIAL);
339                                }
340                                return resultFactory().newResult(false, false, isClosed());
341
342                        default:
343                                throw new IllegalStateException();
344                        }
345                        // Using "continue" above avoids this check. Use it only
346                        // if the state has changed and no additional output is expected. 
347                        if (out.remaining() == 0) {
348                                outStream.assignBuffer(null);
349                                return resultFactory().newResult(true, false, false);
350                        }
351                }
352        }
353
354        @Override
355        public Optional<T> header() {
356                return Optional.ofNullable(messageHeader);
357        }
358
359        /**
360         * Called during the initial state. Writes the status line, determines the
361         * body-mode and puts the state machine in output-headers mode, unless the
362         * body-mode is "collect body".
363         */
364        private void startEncoding() {
365                // Complete content type
366                Optional<HttpField<MediaType>> contentField = messageHeader
367                                .findField(HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE);
368                if (contentField.isPresent()) {
369                        MediaType type = contentField.get().value();
370                        if ("text".equals(type.topLevelType())
371                                        && type.parameter("charset") == null) {
372                                messageHeader.setField(HttpField.CONTENT_TYPE,
373                                                MediaType.builder().from(type)
374                                                .setParameter("charset", "utf-8").build());
375                        }
376                }
377                
378                // Prepare encoder
379                headerIter = null;
380
381                // Write request or status line
382                try {
383                        startMessage(messageHeader, writer);
384                } catch (IOException e) {
385                        // Formally thrown by writer, cannot happen.
386                }
387                states.pop();
388                // We'll eventually fall back to this state
389                states.push(State.DONE);
390                // Get a default for closeAfterBody from the header fields
391                closeAfterBody = forceCloseAfterBody() || messageHeader
392                        .findField(HttpField.CONNECTION, Converters.STRING_LIST)
393                        .map(h -> h.value()).map(f -> f.containsIgnoreCase("close"))
394                        .orElse(false);
395                // If there's no body, start outputting header fields
396                if (!messageHeader.hasPayload()) {
397                        states.push(State.HEADERS);
398                        return;
399                }
400                configureBodyHandling();
401        }
402
403        private void configureBodyHandling() {
404                // Message has a body, find out how to handle it
405                Optional<HttpField<Long>> cl = messageHeader
406                                .findField(HttpField.CONTENT_LENGTH, Converters.LONG);
407                leftToStream = (!cl.isPresent() ? -1 : cl.get().value());
408                if (leftToStream >= 0) {
409                        // Easiest: we have a content length, works always
410                        states.push(State.STREAM_BODY);
411                        states.push(State.HEADERS);
412                        return;
413                } 
414                if (messageHeader.protocol()
415                        .compareTo(HttpProtocol.HTTP_1_0) > 0) {
416                        // At least 1.1, use chunked
417                        Optional<HttpField<StringList>> transEnc = messageHeader.findField(
418                                HttpField.TRANSFER_ENCODING, Converters.STRING_LIST);
419                        if (!transEnc.isPresent()) {
420                                messageHeader.setField(HttpField.TRANSFER_ENCODING,
421                                                new StringList(TransferCoding.CHUNKED.toString()));
422                        } else {
423                                transEnc.get().value().remove(TransferCoding.CHUNKED.toString());
424                                transEnc.get().value().add(TransferCoding.CHUNKED.toString());
425                        }
426                        states.push(State.CHUNK_BODY);
427                        states.push(State.HEADERS);
428                        return;
429                }
430                // Bad: 1.0 and no content length.
431                if (pendingLimit > 0) {
432                        // Try to calculate length by collecting the data
433                        leftToStream = 0;
434                        states.push(State.START_COLLECT_BODY);
435                        return;
436                }
437                // May not buffer, use close
438                states.push(State.STREAM_BODY);
439                states.push(State.HEADERS);
440                closeAfterBody = true;
441        }
442
443        /**
444         * Outputs as many headers as fit in the current out buffer. If all headers
445         * are output, pops a state (thus proceeding with the selected body mode).
446         * Unless very small out buffers are used (or very large headers occur),
447         * this is invoked only once. Therefore no attempt has been made to avoid
448         * the usage of temporary buffers in the header header stream (there may be
449         * a maximum overflow of one partial header).
450         * 
451         * @return {@code true} if all headers could be written to the out
452         * buffer (nothing pending)
453         */
454        private boolean writeHeaders() {
455                try {
456                        if (headerIter == null) {
457                                headerIter = messageHeader.fields().values().iterator();
458                        }
459                        while (true) {
460                                if (!headerIter.hasNext()) {
461                                        writer.write("\r\n");
462                                        writer.flush();
463                                        states.pop();
464                                        return outStream.remaining() >= 0;
465                                }
466                                HttpField<?> header = headerIter.next();
467                                writer.write(header.asHeaderField());
468                                writer.write("\r\n");
469                                writer.flush();
470                                if (outStream.remaining() <= 0) {
471                                        break;
472                                }
473                        }
474                } catch (IOException e) {
475                        // Formally thrown by writer, cannot happen.
476                }
477                return false;
478        }
479
480        /**
481         * Copy as much data as possible from in to out.
482         * 
483         * @param in
484         * @param out
485         * @param endOfInput
486         */
487        private Encoder.Result copyBodyData(Buffer in, ByteBuffer out,
488                boolean endOfInput) {
489                if (in instanceof CharBuffer) {
490                        // copy via encoder
491                        if (charEncoder == null) {
492                                charEncoder = Charset.forName(bodyCharset()).newEncoder();
493                        }
494                        CoderResult result = charEncoder.encode((CharBuffer) in, out,
495                                endOfInput);
496                        return resultFactory().newResult(result.isOverflow(), 
497                                        !endOfInput && result.isUnderflow(), false);
498                }
499                if (out.remaining() <= leftToStream) {
500                        ByteBufferUtils.putAsMuchAsPossible(out, (ByteBuffer) in);
501                } else {
502                        ByteBufferUtils.putAsMuchAsPossible(out, (ByteBuffer) in,
503                                (int) leftToStream);
504                }
505                return resultFactory().newResult(out.remaining() == 0, 
506                                !endOfInput && in.remaining() == 0, false);
507        }
508
509        /**
510         * Handle the input as appropriate for the collect-body mode.
511         * 
512         * @param in
513         * @param endOfInput
514         */
515        private Encoder.Result collectBody(Buffer in, boolean endOfInput) {
516                if (collectedBodyData.remaining() - in.remaining() < -pendingLimit) {
517                        // No space left, output headers, collected and rest (and then
518                        // close)
519                        states.pop();
520                        closeAfterBody = true;
521                        leftToStream = Long.MAX_VALUE;
522                        states.push(State.STREAM_BODY);
523                        states.push(State.STREAM_COLLECTED);
524                        states.push(State.HEADERS);
525                        return resultFactory().newResult(false, false, false);
526                }
527                // Space left, collect
528                if (in instanceof ByteBuffer) {
529                        collectedBodyData.write((ByteBuffer)in);
530                } else {
531                        if (charWriter == null) {
532                                try {
533                                        charWriter = new OutputStreamWriter(collectedBodyData,
534                                                bodyCharset());
535                                } catch (UnsupportedEncodingException e) {
536                                        throw new IllegalArgumentException(e);
537                                }
538                        }
539                        try {
540                                if (in.hasArray()) {
541                                        // more efficient than CharSequence
542                                        charWriter.write(((CharBuffer) in).array(),
543                                                in.arrayOffset() + in.position(), in.remaining());
544                                } else {
545                                        charWriter.append((CharBuffer)in);
546                                }
547                                in.position(in.limit());
548                                charWriter.flush();
549                        } catch (IOException e) {
550                                // Formally thrown, cannot happen
551                        }
552                }
553                if (endOfInput) {
554                        // End of body, found content length!
555                        messageHeader.setField(
556                                        HttpField.CONTENT_LENGTH, collectedBodyData.bytesWritten());
557                        states.pop();
558                        states.push(State.STREAM_COLLECTED);
559                        states.push(State.HEADERS);
560                        return resultFactory().newResult(false, false, false);
561                }
562                // Get more input
563                return resultFactory().newResult(false, true, false);
564        }
565
566        /**
567         * Handle the input as appropriate for the chunked-body mode.
568         * 
569         * @param in the data
570         * @return the result
571         */
572        private Encoder.Result startChunk(Buffer in, boolean endOfInput) {
573                if (endOfInput) {
574                        states.pop();
575                        states.push(State.FINISH_CHUNKED);
576                }
577                try {
578                        // Don't write zero sized chunks
579                        if (in.hasRemaining()) {
580                                if (in instanceof CharBuffer) {
581                                        if (charEncoder == null) {
582                                                charEncoder = Charset.forName(bodyCharset())
583                                                        .newEncoder();
584                                        }
585                                        chunkData = charEncoder.encode((CharBuffer)in);
586                                } else {
587                                        chunkData = (ByteBuffer)in;
588                                }
589                                outStream.write(
590                                        Long.toHexString(chunkData.remaining())
591                                                .getBytes("ascii"));
592                                outStream.write("\r\n".getBytes("ascii"));
593                                states.push(State.FINISH_CHUNK);
594                                states.push(State.STREAM_CHUNK);
595                                return resultFactory().newResult(
596                                                outStream.remaining() <= 0, false, false);
597                        }
598                } catch (IOException e) {
599                        // Formally thrown by outStream, cannot happen.
600                }
601                return resultFactory().newResult(outStream.remaining() < 0, 
602                                in.remaining() == 0 && !endOfInput, false);
603        }
604        
605        /**
606         * Results from {@link HttpEncoder} provide no additional
607         * information compared to {@link org.jdrupes.httpcodec.Codec.Result}. 
608         * This class only provides a factory for creating concrete results.
609         */
610        public static class Result extends Codec.Result {
611        
612                protected Result(boolean overflow, boolean underflow,
613                        boolean closeConnection) {
614                        super(overflow, underflow, closeConnection);
615                }
616
617                /**
618                 * A factory for creating new Results.
619                 */
620                protected abstract static class Factory 
621                        extends Codec.Result.Factory {
622
623                        /**
624                         * Create new result.
625                         * 
626                         * @param overflow
627                         *            {@code true} if the data didn't fit in the out buffer
628                         * @param underflow
629                         *            {@code true} if more data is expected
630                         * @param closeConnection
631                         *            {@code true} if the connection should be closed
632                         * @return the result
633                         */
634                        public Result newResult(boolean overflow, boolean underflow,
635                                boolean closeConnection) {
636                                return new Result(overflow, underflow, closeConnection) {
637                                };
638                        }
639                }
640        }
641}