001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.http;
020
021import java.io.IOException;
022import java.net.InetSocketAddress;
023import java.net.SocketAddress;
024import java.nio.Buffer;
025import java.nio.ByteBuffer;
026import java.nio.CharBuffer;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Iterator;
030import java.util.Map;
031import java.util.Optional;
032import java.util.Set;
033import java.util.concurrent.Callable;
034import org.jdrupes.httpcodec.ClientEngine;
035import org.jdrupes.httpcodec.Codec;
036import org.jdrupes.httpcodec.Decoder;
037import org.jdrupes.httpcodec.MessageHeader;
038import org.jdrupes.httpcodec.ProtocolException;
039import org.jdrupes.httpcodec.protocols.http.HttpField;
040import org.jdrupes.httpcodec.protocols.http.HttpResponse;
041import org.jdrupes.httpcodec.protocols.http.client.HttpRequestEncoder;
042import org.jdrupes.httpcodec.protocols.http.client.HttpResponseDecoder;
043import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame;
044import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader;
045import org.jdrupes.httpcodec.types.Converters;
046import org.jgrapes.core.Channel;
047import org.jgrapes.core.ClassChannel;
048import org.jgrapes.core.Component;
049import org.jgrapes.core.Components;
050import org.jgrapes.core.Components.PoolingIndex;
051import org.jgrapes.core.EventPipeline;
052import org.jgrapes.core.annotation.Handler;
053import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements;
054import org.jgrapes.http.events.HostUnresolved;
055import org.jgrapes.http.events.HttpConnected;
056import org.jgrapes.http.events.Request;
057import org.jgrapes.http.events.Response;
058import org.jgrapes.http.events.WebSocketClose;
059import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel;
060import org.jgrapes.io.events.Close;
061import org.jgrapes.io.events.Closed;
062import org.jgrapes.io.events.IOError;
063import org.jgrapes.io.events.Input;
064import org.jgrapes.io.events.OpenTcpConnection;
065import org.jgrapes.io.events.Output;
066import org.jgrapes.io.util.ManagedBuffer;
067import org.jgrapes.io.util.ManagedBufferPool;
068import org.jgrapes.net.TcpChannel;
069import org.jgrapes.net.events.Connected;
070
071/**
072 * A converter component that receives and sends web application
073 * layer messages and byte buffers on associated network channels.
074 */
075@SuppressWarnings("PMD.ExcessiveImports")
076public class HttpConnector extends Component {
077
078    private int applicationBufferSize = -1;
079    private final Channel netMainChannel;
080    @SuppressWarnings("PMD.UseConcurrentHashMap")
081    private final Map<SocketAddress, Set<WebAppMsgChannel>> connecting
082        = new HashMap<>();
083    private final PoolingIndex<SocketAddress, TcpChannel> pooled
084        = new PoolingIndex<>();
085
086    /**
087     * Denotes the network channel in handler annotations.
088     */
089    private static class NetworkChannel extends ClassChannel {
090    }
091
092    /**
093     * Create a new connector that uses the {@code networkChannel} for network
094     * level I/O.
095     * 
096     * @param appChannel
097     *            this component's channel
098     * @param networkChannel
099     *            the channel for network level I/O
100     */
101    public HttpConnector(Channel appChannel, Channel networkChannel) {
102        super(appChannel, ChannelReplacements.create()
103            .add(NetworkChannel.class, networkChannel));
104        this.netMainChannel = networkChannel;
105    }
106
107    /**
108     * Sets the size of the buffers used for {@link Input} events
109     * on the application channel. Defaults to the upstream buffer size
110     * minus 512 (estimate for added protocol overhead).
111     * 
112     * @param applicationBufferSize the size to set
113     * @return the http server for easy chaining
114     */
115    public HttpConnector setApplicationBufferSize(int applicationBufferSize) {
116        this.applicationBufferSize = applicationBufferSize;
117        return this;
118    }
119
120    /**
121     * Returns the size of the application side (receive) buffers.
122     * 
123     * @return the value or -1 if not set
124     */
125    public int applicationBufferSize() {
126        return applicationBufferSize;
127    }
128
129    /**
130     * Starts the processing of a request from the application layer.
131     * When a network connection has been established, the application
132     * layer will be informed by a {@link HttpConnected} event, fired
133     * on a subchannel that is created for the processing of this
134     * request.
135     *
136     * @param event the request
137     * @throws InterruptedException if processing is interrupted
138     * @throws IOException Signals that an I/O exception has occurred.
139     */
140    @Handler
141    public void onRequest(Request.Out event)
142            throws InterruptedException, IOException {
143        new WebAppMsgChannel(event);
144    }
145
146    /**
147     * Handles output from the application. This may be the payload
148     * of e.g. a POST or data to be transferes on a websocket connection.
149     *
150     * @param event the event
151     * @param appChannel the application layer channel
152     * @throws InterruptedException the interrupted exception
153     */
154    @Handler
155    public void onOutput(Output<?> event, WebAppMsgChannel appChannel)
156            throws InterruptedException {
157        appChannel.handleAppOutput(event);
158    }
159
160    /**
161     * Called when the network connection is established. Triggers the
162     * frther processing of the initial request.
163     *
164     * @param event the event
165     * @param netConnChannel the network layer channel
166     * @throws InterruptedException if the execution is interrupted
167     * @throws IOException Signals that an I/O exception has occurred.
168     */
169    @Handler(channels = NetworkChannel.class)
170    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
171    public void onConnected(Connected event, TcpChannel netConnChannel)
172            throws InterruptedException, IOException {
173        // Check if an app channel has been waiting for such a connection
174        WebAppMsgChannel[] appChannel = { null };
175        synchronized (connecting) {
176            connecting.computeIfPresent(event.remoteAddress(), (key, set) -> {
177                Iterator<WebAppMsgChannel> iter = set.iterator();
178                appChannel[0] = iter.next();
179                iter.remove();
180                return set.isEmpty() ? null : set;
181            });
182        }
183        if (appChannel[0] != null) {
184            appChannel[0].connected(netConnChannel);
185        }
186    }
187
188    /**
189     * Handles I/O error events from the network layer.
190     *
191     * @param event the event
192     * @throws IOException Signals that an I/O exception has occurred.
193     */
194    @Handler(channels = NetworkChannel.class)
195    public void onIoError(IOError event) throws IOException {
196        for (Channel channel : event.channels()) {
197            if (channel instanceof TcpChannel) {
198                // Error while using established network connection
199                TcpChannel netConnChannel = (TcpChannel) channel;
200                Optional<WebAppMsgChannel> appChannel
201                    = netConnChannel.associated(WebAppMsgChannel.class);
202                if (appChannel.isPresent()) {
203                    // Error while using a network connection
204                    appChannel.get().handleIoError(event, netConnChannel);
205                    continue;
206                }
207                // Just in case...
208                pooled.remove(netConnChannel.remoteAddress(), netConnChannel);
209                continue;
210            }
211            // Error while trying to establish the network connection
212            if (event.event() instanceof OpenTcpConnection) {
213                OpenTcpConnection connEvent
214                    = (OpenTcpConnection) event.event();
215                Optional<Set<WebAppMsgChannel>> erroneous;
216                synchronized (connecting) {
217                    erroneous = Optional
218                        .ofNullable(connecting.get(connEvent.address()));
219                    connecting.remove(connEvent.address());
220                }
221                erroneous.ifPresent(set -> {
222                    for (WebAppMsgChannel chann : set) {
223                        chann.openError(event);
224                    }
225                });
226            }
227        }
228    }
229
230    /**
231     * Processes any input from the network layer.
232     *
233     * @param event the event
234     * @param netConnChannel the network layer channel
235     * @throws InterruptedException if the thread is interrupted
236     * @throws ProtocolException if the protocol is violated
237     */
238    @Handler(channels = NetworkChannel.class)
239    public void onInput(Input<ByteBuffer> event, TcpChannel netConnChannel)
240            throws InterruptedException, ProtocolException {
241        Optional<WebAppMsgChannel> appChannel
242            = netConnChannel.associated(WebAppMsgChannel.class);
243        if (appChannel.isPresent()) {
244            appChannel.get().handleNetInput(event, netConnChannel);
245        }
246    }
247
248    /**
249     * Called when the network connection is closed. 
250     *
251     * @param event the event
252     * @param netConnChannel the net conn channel
253     */
254    @Handler(channels = NetworkChannel.class)
255    public void onClosed(Closed event, TcpChannel netConnChannel) {
256        netConnChannel.associated(WebAppMsgChannel.class).ifPresent(
257            appChannel -> appChannel.handleClosed(event));
258        pooled.remove(netConnChannel.remoteAddress(), netConnChannel);
259    }
260
261    /**
262     * Handles a close event from the application channel. Such an
263     * event may only be fired of the connection has been upgraded
264     * to a websocket connection.
265     *
266     * @param event the event
267     * @param appChannel the application channel
268     */
269    @Handler
270    public void onClose(Close event, WebAppMsgChannel appChannel) {
271        appChannel.handleClose(event);
272    }
273
274    /**
275     * An application layer channel. When an object is created, it is first
276     * inserted into the {@link HttpConnector#connecting} map. Once a network
277     * channel has been assigned to it, it is primarily referenced by that 
278     * network channel. 
279     */
280    private class WebAppMsgChannel extends DefaultIOSubchannel {
281        // Starts as ClientEngine<HttpRequest,HttpResponse> but may change
282        private final ClientEngine<?, ?> engine
283            = new ClientEngine<>(new HttpRequestEncoder(),
284                new HttpResponseDecoder());
285        private final InetSocketAddress serverAddress;
286        private final Request.Out request;
287        private ManagedBuffer<ByteBuffer> outBuffer;
288        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
289                ByteBuffer> byteBufferPool;
290        private ManagedBufferPool<ManagedBuffer<CharBuffer>,
291                CharBuffer> charBufferPool;
292        private ManagedBufferPool<?, ?> currentPool;
293        private TcpChannel netConnChannel;
294        private final EventPipeline downPipeline;
295        private WsMessageHeader currentWsMessage;
296
297        /**
298         * Instantiates a new channel.
299         *
300         * @param event the event
301         * @param netChannel the net channel
302         * @throws InterruptedException 
303         * @throws IOException 
304         */
305        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
306        public WebAppMsgChannel(Request.Out event)
307                throws InterruptedException, IOException {
308            super(channel(), newEventPipeline());
309
310            // Downstream pipeline, needed even if connection fails
311            downPipeline = newEventPipeline();
312
313            // Extract request data and check host
314            request = event;
315            serverAddress = new InetSocketAddress(
316                event.requestUri().getHost(), event.requestUri().getPort());
317            if (serverAddress.isUnresolved()) {
318                downPipeline.fire(
319                    new HostUnresolved(event, "Host cannot be resolved."),
320                    this);
321                return;
322            }
323
324            // Re-use TCP connection, if possible
325            TcpChannel recycled = pooled.poll(serverAddress);
326            if (recycled != null) {
327                connected(recycled);
328                return;
329            }
330            synchronized (connecting) {
331                connecting
332                    .computeIfAbsent(serverAddress, key -> new HashSet<>())
333                    .add(this);
334            }
335
336            // Fire on main network channel (targeting the tcp connector)
337            // as a follow up event (using the current pipeline).
338            fire(new OpenTcpConnection(serverAddress), netMainChannel);
339        }
340
341        /**
342         * Error in response to trying to open a new TCP connection.
343         *
344         * @param event the event
345         */
346        public void openError(IOError event) {
347            // Already removed from connecting by caller, simply forward.
348            downPipeline.fire(IOError.duplicate(event), this);
349        }
350
351        /**
352         * Error from established TCP connection.
353         *
354         * @param event the event
355         * @param netConnChannel the network channel
356         */
357        public void handleIoError(IOError event, TcpChannel netConnChannel) {
358            downPipeline.fire(IOError.duplicate(event), this);
359        }
360
361        /**
362         * Sets the network connection channel for this application channel.
363         *
364         * @param netConnChannel the net conn channel
365         * @throws InterruptedException the interrupted exception
366         * @throws IOException Signals that an I/O exception has occurred.
367         */
368        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
369        public final void connected(TcpChannel netConnChannel)
370                throws InterruptedException, IOException {
371            // Associate the network channel with this application channel
372            this.netConnChannel = netConnChannel;
373            netConnChannel.setAssociated(WebAppMsgChannel.class, this);
374            request.connectedCallback().ifPresent(
375                consumer -> consumer.accept(request, netConnChannel));
376
377            // Estimate "good" application buffer size
378            int bufferSize = applicationBufferSize;
379            if (bufferSize <= 0) {
380                bufferSize = netConnChannel.byteBufferPool().bufferSize() - 512;
381                if (bufferSize < 4096) {
382                    bufferSize = 4096;
383                }
384            }
385            String channelName = Components.objectName(HttpConnector.this)
386                + "." + Components.objectName(this);
387            byteBufferPool().setName(channelName + ".upstream.byteBuffers");
388            charBufferPool().setName(channelName + ".upstream.charBuffers");
389            // Allocate downstream buffer pools. Note that decoding WebSocket
390            // network packets may result in several WS frames that are each
391            // delivered in independent events. Therefore provide some
392            // additional buffers.
393            final int bufSize = bufferSize;
394            byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
395                () -> {
396                    return ByteBuffer.allocate(bufSize);
397                }, 2, 100)
398                    .setName(channelName + ".downstream.byteBuffers");
399            charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
400                () -> {
401                    return CharBuffer.allocate(bufSize);
402                }, 2, 100)
403                    .setName(channelName + ".downstream.charBuffers");
404
405            sendMessageUpstream(request.httpRequest(), netConnChannel);
406
407            // Forward Connected event downstream to e.g. start preparation
408            // of output events for payload data.
409            downPipeline.fire(new HttpConnected(request,
410                netConnChannel.localAddress(), netConnChannel.remoteAddress()),
411                this);
412        }
413
414        @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
415        private void sendMessageUpstream(MessageHeader message,
416                TcpChannel netConnChannel) {
417            // Now send request as if it came from downstream (to
418            // avoid confusion with output events that may be
419            // generated in parallel, see below).
420            responsePipeline().submit("SynchronizedResponse",
421                new Callable<Void>() {
422
423                    @SuppressWarnings({ "PMD.CommentRequired",
424                        "PMD.AvoidBranchingStatementAsLastInLoop",
425                        "PMD.AvoidDuplicateLiterals" })
426                    public Void call() throws InterruptedException {
427                        @SuppressWarnings("unchecked")
428                        ClientEngine<MessageHeader, MessageHeader> untypedEngine
429                            = (ClientEngine<MessageHeader,
430                                    MessageHeader>) engine;
431                        untypedEngine.encode(message);
432                        boolean hasBody = message.hasPayload();
433                        while (true) {
434                            outBuffer
435                                = netConnChannel.byteBufferPool().acquire();
436                            Codec.Result result
437                                = engine.encode(Codec.EMPTY_IN,
438                                    outBuffer.backingBuffer(), !hasBody);
439                            if (result.isOverflow()) {
440                                netConnChannel
441                                    .respond(Output.fromSink(outBuffer, false));
442                                continue;
443                            }
444                            if (hasBody) {
445                                // Keep buffer with incomplete request to be
446                                // further
447                                // filled by subsequent Output events
448                                break;
449                            }
450                            // Request is completely encoded
451                            if (outBuffer.position() > 0) {
452                                netConnChannel
453                                    .respond(Output.fromSink(outBuffer, true));
454                            } else {
455                                outBuffer.unlockBuffer();
456                            }
457                            outBuffer = null;
458                            if (result.closeConnection()) {
459                                netConnChannel.respond(new Close());
460                            }
461                            break;
462                        }
463                        return null;
464                    }
465                });
466        }
467
468        @SuppressWarnings({ "PMD.CommentRequired", "PMD.CyclomaticComplexity",
469            "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops",
470            "PMD.AvoidDuplicateLiterals" })
471        public void handleAppOutput(Output<?> event)
472                throws InterruptedException {
473            Buffer eventData = event.data();
474            Buffer input;
475            if (eventData instanceof ByteBuffer) {
476                input = ((ByteBuffer) eventData).duplicate();
477            } else if (eventData instanceof CharBuffer) {
478                input = ((CharBuffer) eventData).duplicate();
479            } else {
480                return;
481            }
482            if (engine.switchedTo().equals(Optional.of("websocket"))
483                && currentWsMessage == null) {
484                // When switched to WebSockets, we only have Input and Output
485                // events. Add header automatically.
486                @SuppressWarnings("unchecked")
487                ClientEngine<MessageHeader, ?> wsEngine
488                    = (ClientEngine<MessageHeader, ?>) engine;
489                currentWsMessage = new WsMessageHeader(
490                    event.buffer().backingBuffer() instanceof CharBuffer,
491                    true);
492                wsEngine.encode(currentWsMessage);
493            }
494            while (input.hasRemaining() || event.isEndOfRecord()) {
495                if (outBuffer == null) {
496                    outBuffer = netConnChannel.byteBufferPool().acquire();
497                }
498                Codec.Result result = engine.encode(input,
499                    outBuffer.backingBuffer(), event.isEndOfRecord());
500                if (result.isOverflow()) {
501                    netConnChannel.respond(Output.fromSink(outBuffer, false));
502                    outBuffer = netConnChannel.byteBufferPool().acquire();
503                    continue;
504                }
505                if (event.isEndOfRecord() || result.closeConnection()) {
506                    if (outBuffer.position() > 0) {
507                        netConnChannel
508                            .respond(Output.fromSink(outBuffer, true));
509                    } else {
510                        outBuffer.unlockBuffer();
511                    }
512                    outBuffer = null;
513                    if (result.closeConnection()) {
514                        netConnChannel.respond(new Close());
515                    }
516                    break;
517                }
518            }
519            if (engine.switchedTo().equals(Optional.of("websocket"))
520                && event.isEndOfRecord()) {
521                currentWsMessage = null;
522            }
523        }
524
525        @SuppressWarnings({ "PMD.CommentRequired",
526            "PMD.DataflowAnomalyAnalysis" })
527        public void handleNetInput(Input<ByteBuffer> event,
528                TcpChannel netConnChannel)
529                throws InterruptedException, ProtocolException {
530            // Send the data from the event through the decoder.
531            ByteBuffer inData = event.data();
532            // Don't unnecessary allocate a buffer, may be header only message
533            ManagedBuffer<?> bodyData = null;
534            boolean wasOverflow = false;
535            Decoder.Result<?> result;
536            while (inData.hasRemaining()) {
537                if (wasOverflow) {
538                    // Message has (more) body
539                    bodyData = currentPool.acquire();
540                }
541                result = engine.decode(inData,
542                    bodyData == null ? null : bodyData.backingBuffer(),
543                    event.isEndOfRecord());
544                if (result.response().isPresent()) {
545                    sendMessageUpstream(result.response().get(),
546                        netConnChannel);
547                    if (result.isResponseOnly()) {
548                        maybeReleaseConnection(result);
549                        continue;
550                    }
551                }
552                if (result.isHeaderCompleted()) {
553                    MessageHeader header
554                        = engine.responseDecoder().header().get();
555                    if (!handleResponseHeader(header)) {
556                        maybeReleaseConnection(result);
557                        break;
558                    }
559                }
560                if (bodyData != null) {
561                    if (bodyData.position() > 0) {
562                        boolean eor
563                            = !result.isOverflow() && !result.isUnderflow();
564                        downPipeline.fire(Input.fromSink(bodyData, eor), this);
565                    } else {
566                        bodyData.unlockBuffer();
567                    }
568                    bodyData = null;
569                }
570                maybeReleaseConnection(result);
571                wasOverflow = result.isOverflow();
572            }
573        }
574
575        private boolean handleResponseHeader(MessageHeader response) {
576            if (response instanceof HttpResponse) {
577                HttpResponse httpResponse = (HttpResponse) response;
578                if (httpResponse.hasPayload()) {
579                    if (httpResponse.findValue(
580                        HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE)
581                        .map(type -> type.value().topLevelType()
582                            .equalsIgnoreCase("text"))
583                        .orElse(false)) {
584                        currentPool = charBufferPool;
585                    } else {
586                        currentPool = byteBufferPool;
587                    }
588                }
589                downPipeline.fire(new Response(httpResponse), this);
590            } else if (response instanceof WsMessageHeader) {
591                WsMessageHeader wsMessage = (WsMessageHeader) response;
592                if (wsMessage.hasPayload()) {
593                    if (wsMessage.isTextMode()) {
594                        currentPool = charBufferPool;
595                    } else {
596                        currentPool = byteBufferPool;
597                    }
598                }
599            } else if (response instanceof WsCloseFrame) {
600                downPipeline.fire(
601                    new WebSocketClose((WsCloseFrame) response, this));
602            }
603            return true;
604        }
605
606        private void maybeReleaseConnection(Decoder.Result<?> result) {
607            if (result.isOverflow() || result.isUnderflow()) {
608                // Data remains to be processed
609                return;
610            }
611            MessageHeader header
612                = engine.responseDecoder().header().get();
613            // Don't release if something follows
614            if (header instanceof HttpResponse
615                && ((HttpResponse) header).statusCode() % 100 == 1) {
616                return;
617            }
618            if (engine.switchedTo().equals(Optional.of("websocket"))) {
619                if (!result.closeConnection()) {
620                    return;
621                }
622                // Is web socket close, inform application layer
623                downPipeline.fire(new Closed(), this);
624            }
625            netConnChannel.setAssociated(WebAppMsgChannel.class, null);
626            if (!result.closeConnection()) {
627                // May be reused
628                pooled.add(serverAddress, netConnChannel);
629            }
630            netConnChannel = null;
631        }
632
633        @SuppressWarnings("PMD.CommentRequired")
634        public void handleClose(Close event) {
635            if (engine.switchedTo().equals(Optional.of("websocket"))) {
636                sendMessageUpstream(new WsCloseFrame(null, null),
637                    netConnChannel);
638            }
639        }
640
641        @SuppressWarnings("PMD.CommentRequired")
642        public void handleClosed(Closed event) {
643            if (engine.switchedTo().equals(Optional.of("websocket"))) {
644                downPipeline.fire(new Closed(), this);
645            }
646        }
647
648    }
649
650}