001/*
002 * This file is part of the JDrupes non-blocking HTTP Codec
003 * Copyright (C) 2016, 2017  Michael N. Lipp
004 *
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Lesser General Public License as published
007 * by the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public 
013 * License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.httpcodec.protocols.http;
020
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Optional;
import java.util.Stack;

import org.jdrupes.httpcodec.Codec;
import org.jdrupes.httpcodec.Decoder;
import org.jdrupes.httpcodec.Encoder;

import static org.jdrupes.httpcodec.protocols.http.HttpConstants.*;

import org.jdrupes.httpcodec.types.Converters;
import org.jdrupes.httpcodec.types.MediaType;
import org.jdrupes.httpcodec.types.StringList;
import org.jdrupes.httpcodec.util.ByteBufferOutputStream;
import org.jdrupes.httpcodec.util.ByteBufferUtils;
046
047/**
048 * Implements an encoder for HTTP. The class can be used as base class for both
049 * a request and a response encoder.
050 * 
051 * @param <T> the type of the message header to be encoded
052 * @param <D> the type of the message header decoded by the peer decoder
053 */
054public abstract class HttpEncoder<T extends HttpMessageHeader,
055        D extends HttpMessageHeader> extends HttpCodec<T>
056        implements Encoder<T, D> {
057
058        private enum State {
059                // Main states
060                INITIAL, DONE, CLOSED,
061                // Sub states
062                HEADERS, CHUNK_BODY, STREAM_CHUNK, FINISH_CHUNK, FINISH_CHUNKED,
063                START_COLLECT_BODY, COLLECT_BODY, STREAM_COLLECTED, 
064                STREAM_BODY, FLUSH_ENCODER
065        }
066
067        private Stack<State> states = new Stack<>();
068        private boolean closeAfterBody = false;
069        private ByteBufferOutputStream outStream;
070        private Writer writer;
071        private Iterator<HttpField<?>> headerIter = null;
072        private int pendingLimit = 1024 * 1024;
073        private long leftToStream;
074        private ByteBufferOutputStream collectedBodyData;
075        private CharsetEncoder charEncoder = null;
076        private Writer charWriter = null;
077        private ByteBuffer chunkData;
078        protected Decoder<D, T> peerDecoder;
079
080        /**
081         * Creates a new encoder.
082         */
083        public HttpEncoder() {
084                outStream = new ByteBufferOutputStream();
085                try {
086                        writer = new OutputStreamWriter(outStream, "ascii");
087                } catch (UnsupportedEncodingException e) {
088                        // Cannot happen (specified to be supported)
089                }
090                states.push(State.INITIAL);
091        }
092
093        public Encoder<T, D> setPeerDecoder(Decoder<D, T> decoder) {
094                peerDecoder = decoder;
095                return this;
096        }
097        
098        /**
099         * Returns the result factory for this codec.
100         * 
101         * @return the factory
102         */
103        protected abstract Result.Factory resultFactory();
104        
105        /**
106         * Returns the limit for pending body bytes. If the protocol is HTTP/1.0 and
107         * the message has a body but no "Content-Length" header, the only
108         * (unreliable) way to indicate the end of the body is to close the
109         * connection after all body bytes have been sent.
110         * <P>
111         * The encoder tries to calculate the content length by buffering the body
112         * data up to the "pending" limit. If the body is smaller than the limit,
113         * the message is set with the calculated content length header, else the
114         * data is sent without such a header and the connection is closed.
115         * <P>
116         * If the response protocol is HTTP/1.1 and there is no "Content-Length"
117         * header, chunked transfer encoding is used.
118         *
119         * @return the limit
120         */
121        public int pendingLimit() {
122                return pendingLimit;
123        }
124
125        /**
126         * Sets the limit for the pending body bytes.
127         * 
128         * @param pendingLimit
129         *            the limit to set
130         */
131        public void setPendingLimit(int pendingLimit) {
132                this.pendingLimit = pendingLimit;
133        }
134
135        /**
136         * Returns {@code true} if the encoder does not accept further input because
137         * the processed data indicated that the connection has been or is to be
138         * closed.
139         * 
140         * @return the result
141         */
142        public boolean isClosed() {
143                return states.peek() == State.CLOSED;
144        }
145
146        /**
147         * Writes the first line of the message (including the terminating CRLF).
148         * Must be provided by the derived class because the first line depends on
149         * whether a request or response is encoded.
150         * 
151         * @param messageHeader
152         *            the message header to encode (see
153         *            {@link #encode(HttpMessageHeader)}
154         * @param writer
155         *            the Writer to use for writing
156         * @throws IOException
157         *             if an I/O error occurs
158         */
159        protected abstract void startMessage(T messageHeader, Writer writer) 
160                        throws IOException;
161
162        /**
163         * Set a new HTTP message that is to be encoded.
164         * 
165         * @param messageHeader
166         *            the response
167         */
168        @Override
169        public void encode(T messageHeader) {
170                if (states.peek() != State.INITIAL) {
171                        throw new IllegalStateException();
172                }
173                this.messageHeader = messageHeader;
174                charEncoder = null;
175                charWriter = null;
176                // Consistency checks
177                if (messageHeader.fields().containsKey(HttpField.CONTENT_TYPE)) {
178                        messageHeader.setHasPayload(true);
179                }
180        }
181
182        /**
183         * Encodes a HTTP message.
184         * 
185         * @param in
186         *            the body data
187         * @param out
188         *            the buffer to which data is written
189         * @param endOfInput
190         *            {@code true} if there is no input left beyond the data
191         *            currently in the {@code in} buffer (indicates end of body or
192         *            no body at all)
193         * @return the result
194         */
195        public Encoder.Result encode(Buffer in, ByteBuffer out,
196                boolean endOfInput) {
197                outStream.assignBuffer(out); // Prepare and copy over what was left.
198                if (out.remaining() == 0) { // If a lot was left.
199                        outStream.assignBuffer(null);
200                        return resultFactory().newResult(true, false, false);
201                }
202                Encoder.Result result = resultFactory().newResult(false, false, false);
203                while (true) {
204                        if (result.isOverflow() || result.isUnderflow()) {
205                                outStream.assignBuffer(null);
206                                return result;
207                        }
208                        switch (states.peek()) {
209                        case INITIAL:
210                                outStream.clear();
211                                outStream.assignBuffer(out);
212                                startEncoding();
213                                break;
214
215                        case HEADERS:
216                                // If headers remain (new request or buffer was full) write them
217                                if (writeHeaders()) {
218                                        continue;
219                                }
220                                break;
221
222                        case START_COLLECT_BODY:
223                                // Start collecting
224                                if (in.remaining() == 0 && !endOfInput) {
225                                        // Has probably been invoked with a dummy buffer,
226                                        // cannot be used to create pending body buffer.
227                                        outStream.assignBuffer(null);
228                                        return resultFactory().newResult(false, true, false);
229                                }
230                                collectedBodyData = new ByteBufferOutputStream(
231                                        Math.min(pendingLimit, 
232                                                        Math.max(in.capacity(), 64 * 1024)));
233                                states.pop();
234                                states.push(State.COLLECT_BODY);
235                                // fall through (no write occurred yet)
236                        case COLLECT_BODY:
237                                result = collectBody(in, endOfInput);
238                                continue; // never writes to out
239
240                        case STREAM_COLLECTED:
241                                // Output collected body
242                                if (collectedBodyData.remaining() < 0) {
243                                        // Copy over.
244                                        collectedBodyData.assignBuffer(out);
245                                        collectedBodyData.assignBuffer(null);
246                                        break;
247                                }
248                                states.pop();
249                                continue;
250
251                        case STREAM_BODY:
252                                // Stream, i.e. simply copy from source to destination
253                                int initiallyRemaining = out.remaining();
254                                result = copyBodyData(in, out, endOfInput);
255                                leftToStream -= (initiallyRemaining - out.remaining());
256                                if (!result.isOverflow() && (leftToStream == 0 || endOfInput)) {
257                                        // end of data
258                                        states.pop();
259                                        if (charEncoder != null) {
260                                                states.push(State.FLUSH_ENCODER);
261                                        }
262                                        continue;
263                                }
264                                // Buffer written, waiting for space, more data or end of data
265                                outStream.assignBuffer(null);
266                                return result;
267
268                        case FLUSH_ENCODER:
269                                CoderResult flushResult = charEncoder.flush(out);
270                                if (flushResult.isOverflow()) {
271                                        outStream.assignBuffer(null);
272                                        return resultFactory().newResult(true, false, false);
273                                }
274                                states.pop();
275                                break;
276                                
277                        case CHUNK_BODY:
278                                // Send in data as chunk
279                                result = startChunk(in, endOfInput);
280                                continue; // remaining check already done 
281
282                        case STREAM_CHUNK:
283                                // Stream, i.e. simply copy from source to destination
284                                if (!ByteBufferUtils.putAsMuchAsPossible(out, chunkData)) {
285                                        // Not enough space in out buffer
286                                        outStream.assignBuffer(null);
287                                        return resultFactory().newResult(true, false, false);
288                                }
289                                // everything from in written
290                                states.pop();
291                                continue;
292        
293                        case FINISH_CHUNK:
294                                try {
295                                        outStream.write("\r\n".getBytes("ascii"));
296                                        states.pop();
297                                } catch (IOException e) {
298                                        // Formally thrown by write
299                                }
300                                break;
301                                
302                        case FINISH_CHUNKED:
303                                try {
304                                        outStream.write("0\r\n\r\n".getBytes("ascii"));
305                                        states.pop();
306                                } catch (IOException e) {
307                                        // Formally thrown by write
308                                }
309                                break;
310                                
311                        case DONE:
312                                // Everything is written
313                                outStream.assignBuffer(null);
314                                if (!endOfInput) {
315                                        if (in.remaining() == 0) {
316                                                return resultFactory()
317                                                                .newResult(false, true, false);
318                                        }
319                                        throw new IllegalStateException(
320                                                        "A message has been completely encoded but"
321                                                        + " encode has not been invoked with endOfInput"
322                                                        + " set to true.");
323                                }
324                                states.pop();
325                                if (closeAfterBody) {
326                                        states.push(State.CLOSED);
327                                } else {
328                                        states.push(State.INITIAL);
329                                }
330                                return resultFactory().newResult(false, false, isClosed());
331
332                        default:
333                                throw new IllegalStateException();
334                        }
335                        // Using "continue" above avoids this check. Use it only
336                        // if the state has changed and no additional output is expected. 
337                        if (out.remaining() == 0) {
338                                outStream.assignBuffer(null);
339                                return resultFactory().newResult(true, false, false);
340                        }
341                }
342        }
343
344        @Override
345        public Optional<T> header() {
346                return Optional.ofNullable(messageHeader);
347        }
348
349        /**
350         * Called during the initial state. Writes the status line, determines the
351         * body-mode and puts the state machine in output-headers mode, unless the
352         * body-mode is "collect body".
353         */
354        private void startEncoding() {
355                // Complete content type
356                Optional<HttpField<MediaType>> contentField = messageHeader
357                                .findField(HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE);
358                if (contentField.isPresent()) {
359                        MediaType type = contentField.get().value();
360                        if ("text".equals(type.topLevelType())
361                                        && type.parameter("charset") == null) {
362                                messageHeader.setField(HttpField.CONTENT_TYPE,
363                                                MediaType.builder().from(type)
364                                                .setParameter("charset", "utf-8").build());
365                        }
366                }
367                
368                // Prepare encoder
369                headerIter = null;
370
371                // Write request or status line
372                try {
373                        startMessage(messageHeader, writer);
374                } catch (IOException e) {
375                        // Formally thrown by writer, cannot happen.
376                }
377                states.pop();
378                // We'll eventually fall back to this state
379                states.push(State.DONE);
380                // Get a default for closeAfterBody from the header fields
381                closeAfterBody = messageHeader
382                        .findField(HttpField.CONNECTION, Converters.STRING_LIST)
383                        .map(h -> h.value()).map(f -> f.containsIgnoreCase("close"))
384                        .orElse(false);
385                // If there's no body, start outputting header fields
386                if (!messageHeader.hasPayload()) {
387                        states.push(State.HEADERS);
388                        return;
389                }
390                configureBodyHandling();
391        }
392
393        private void configureBodyHandling() {
394                // Message has a body, find out how to handle it
395                Optional<HttpField<Long>> cl = messageHeader
396                                .findField(HttpField.CONTENT_LENGTH, Converters.LONG);
397                leftToStream = (!cl.isPresent() ? -1 : cl.get().value());
398                if (leftToStream >= 0) {
399                        // Easiest: we have a content length, works always
400                        states.push(State.STREAM_BODY);
401                        states.push(State.HEADERS);
402                        return;
403                } 
404                if (messageHeader.protocol()
405                        .compareTo(HttpProtocol.HTTP_1_0) > 0) {
406                        // At least 1.1, use chunked
407                        Optional<HttpField<StringList>> transEnc = messageHeader.findField(
408                                HttpField.TRANSFER_ENCODING, Converters.STRING_LIST);
409                        if (!transEnc.isPresent()) {
410                                messageHeader.setField(HttpField.TRANSFER_ENCODING,
411                                                new StringList(TransferCoding.CHUNKED.toString()));
412                        } else {
413                                transEnc.get().value().remove(TransferCoding.CHUNKED.toString());
414                                transEnc.get().value().add(TransferCoding.CHUNKED.toString());
415                        }
416                        states.push(State.CHUNK_BODY);
417                        states.push(State.HEADERS);
418                        return;
419                }
420                // Bad: 1.0 and no content length.
421                if (pendingLimit > 0) {
422                        // Try to calculate length by collecting the data
423                        leftToStream = 0;
424                        states.push(State.START_COLLECT_BODY);
425                        return;
426                }
427                // May not buffer, use close
428                states.push(State.STREAM_BODY);
429                states.push(State.HEADERS);
430                closeAfterBody = true;
431        }
432
433        /**
434         * Outputs as many headers as fit in the current out buffer. If all headers
435         * are output, pops a state (thus proceeding with the selected body mode).
436         * Unless very small out buffers are used (or very large headers occur),
437         * this is invoked only once. Therefore no attempt has been made to avoid
438         * the usage of temporary buffers in the header header stream (there may be
439         * a maximum overflow of one partial header).
440         * 
441         * @return {@code true} if all headers could be written to the out
442         * buffer (nothing pending)
443         */
444        private boolean writeHeaders() {
445                try {
446                        if (headerIter == null) {
447                                headerIter = messageHeader.fields().values().iterator();
448                        }
449                        while (true) {
450                                if (!headerIter.hasNext()) {
451                                        writer.write("\r\n");
452                                        writer.flush();
453                                        states.pop();
454                                        return outStream.remaining() >= 0;
455                                }
456                                HttpField<?> header = headerIter.next();
457                                writer.write(header.asHeaderField());
458                                writer.write("\r\n");
459                                writer.flush();
460                                if (outStream.remaining() <= 0) {
461                                        break;
462                                }
463                        }
464                } catch (IOException e) {
465                        // Formally thrown by writer, cannot happen.
466                }
467                return false;
468        }
469
470        /**
471         * Copy as much data as possible from in to out.
472         * 
473         * @param in
474         * @param out
475         * @param endOfInput
476         */
477        private Encoder.Result copyBodyData(Buffer in, ByteBuffer out,
478                boolean endOfInput) {
479                if (in instanceof CharBuffer) {
480                        // copy via encoder
481                        if (charEncoder == null) {
482                                charEncoder = Charset.forName(bodyCharset()).newEncoder();
483                        }
484                        CoderResult result = charEncoder.encode((CharBuffer) in, out,
485                                endOfInput);
486                        return resultFactory().newResult(result.isOverflow(), 
487                                        !endOfInput && result.isUnderflow(), false);
488                }
489                if (out.remaining() <= leftToStream) {
490                        ByteBufferUtils.putAsMuchAsPossible(out, (ByteBuffer) in);
491                } else {
492                        ByteBufferUtils.putAsMuchAsPossible(out, (ByteBuffer) in,
493                                (int) leftToStream);
494                }
495                return resultFactory().newResult(out.remaining() == 0, 
496                                !endOfInput && in.remaining() == 0, false);
497        }
498
499        /**
500         * Handle the input as appropriate for the collect-body mode.
501         * 
502         * @param in
503         * @param endOfInput
504         */
505        private Encoder.Result collectBody(Buffer in, boolean endOfInput) {
506                if (collectedBodyData.remaining() - in.remaining() < -pendingLimit) {
507                        // No space left, output headers, collected and rest (and then
508                        // close)
509                        states.pop();
510                        closeAfterBody = true;
511                        leftToStream = Long.MAX_VALUE;
512                        states.push(State.STREAM_BODY);
513                        states.push(State.STREAM_COLLECTED);
514                        states.push(State.HEADERS);
515                        return resultFactory().newResult(false, false, false);
516                }
517                // Space left, collect
518                if (in instanceof ByteBuffer) {
519                        collectedBodyData.write((ByteBuffer)in);
520                } else {
521                        if (charWriter == null) {
522                                try {
523                                        charWriter = new OutputStreamWriter(collectedBodyData,
524                                                bodyCharset());
525                                } catch (UnsupportedEncodingException e) {
526                                        throw new IllegalArgumentException(e);
527                                }
528                        }
529                        try {
530                                if (in.hasArray()) {
531                                        // more efficient than CharSequence
532                                        charWriter.write(((CharBuffer) in).array(),
533                                                in.arrayOffset() + in.position(), in.remaining());
534                                } else {
535                                        charWriter.append((CharBuffer)in);
536                                }
537                                in.position(in.limit());
538                                charWriter.flush();
539                        } catch (IOException e) {
540                                // Formally thrown, cannot happen
541                        }
542                }
543                if (endOfInput) {
544                        // End of body, found content length!
545                        messageHeader.setField(
546                                        HttpField.CONTENT_LENGTH, collectedBodyData.bytesWritten());
547                        states.pop();
548                        states.push(State.STREAM_COLLECTED);
549                        states.push(State.HEADERS);
550                        return resultFactory().newResult(false, false, false);
551                }
552                // Get more input
553                return resultFactory().newResult(false, true, false);
554        }
555
556        /**
557         * Handle the input as appropriate for the chunked-body mode.
558         * 
559         * @param in the data
560         * @return the result
561         */
562        private Encoder.Result startChunk(Buffer in, boolean endOfInput) {
563                if (endOfInput) {
564                        states.pop();
565                        states.push(State.FINISH_CHUNKED);
566                }
567                try {
568                        // Don't write zero sized chunks
569                        if (in.hasRemaining()) {
570                                if (in instanceof CharBuffer) {
571                                        if (charEncoder == null) {
572                                                charEncoder = Charset.forName(bodyCharset())
573                                                        .newEncoder();
574                                        }
575                                        chunkData = charEncoder.encode((CharBuffer)in);
576                                } else {
577                                        chunkData = (ByteBuffer)in;
578                                }
579                                outStream.write(
580                                        Long.toHexString(chunkData.remaining())
581                                                .getBytes("ascii"));
582                                outStream.write("\r\n".getBytes("ascii"));
583                                states.push(State.FINISH_CHUNK);
584                                states.push(State.STREAM_CHUNK);
585                                return resultFactory().newResult(
586                                                outStream.remaining() <= 0, false, false);
587                        }
588                } catch (IOException e) {
589                        // Formally thrown by outStream, cannot happen.
590                }
591                return resultFactory().newResult(outStream.remaining() < 0, 
592                                in.remaining() == 0 && !endOfInput, false);
593        }
594        
595        /**
596         * Results from {@link HttpEncoder} provide no additional
597         * information compared to {@link org.jdrupes.httpcodec.Codec.Result}. 
598         * This class only provides a factory for creating concrete results.
599         */
600        public static class Result extends Codec.Result {
601        
602                protected Result(boolean overflow, boolean underflow,
603                        boolean closeConnection) {
604                        super(overflow, underflow, closeConnection);
605                }
606
607                /**
608                 * A factory for creating new Results.
609                 */
610                protected abstract static class Factory 
611                        extends Codec.Result.Factory {
612
613                        /**
614                         * Create new result.
615                         * 
616                         * @param overflow
617                         *            {@code true} if the data didn't fit in the out buffer
618                         * @param underflow
619                         *            {@code true} if more data is expected
620                         * @param closeConnection
621                         *            {@code true} if the connection should be closed
622                         * @return the result
623                         */
624                        public Result newResult(boolean overflow, boolean underflow,
625                                boolean closeConnection) {
626                                return new Result(overflow, underflow, closeConnection) {
627                                };
628                        }
629                }
630        }
631}