001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.http;
020
021import java.io.IOException;
022import java.net.InetSocketAddress;
023import java.net.SocketAddress;
024import java.nio.Buffer;
025import java.nio.ByteBuffer;
026import java.nio.CharBuffer;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Iterator;
030import java.util.Map;
031import java.util.Optional;
032import java.util.Set;
033import java.util.concurrent.Callable;
034import org.jdrupes.httpcodec.ClientEngine;
035import org.jdrupes.httpcodec.Codec;
036import org.jdrupes.httpcodec.Decoder;
037import org.jdrupes.httpcodec.MessageHeader;
038import org.jdrupes.httpcodec.ProtocolException;
039import org.jdrupes.httpcodec.protocols.http.HttpField;
040import org.jdrupes.httpcodec.protocols.http.HttpResponse;
041import org.jdrupes.httpcodec.protocols.http.client.HttpRequestEncoder;
042import org.jdrupes.httpcodec.protocols.http.client.HttpResponseDecoder;
043import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame;
044import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader;
045import org.jdrupes.httpcodec.types.Converters;
046import org.jgrapes.core.Channel;
047import org.jgrapes.core.ClassChannel;
048import org.jgrapes.core.Component;
049import org.jgrapes.core.Components;
050import org.jgrapes.core.Components.PoolingIndex;
051import org.jgrapes.core.EventPipeline;
052import org.jgrapes.core.annotation.Handler;
053import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements;
054import org.jgrapes.http.events.HostUnresolved;
055import org.jgrapes.http.events.HttpConnected;
056import org.jgrapes.http.events.Request;
057import org.jgrapes.http.events.Response;
058import org.jgrapes.http.events.WebSocketClose;
059import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel;
060import org.jgrapes.io.events.Close;
061import org.jgrapes.io.events.Closed;
062import org.jgrapes.io.events.IOError;
063import org.jgrapes.io.events.Input;
064import org.jgrapes.io.events.OpenSocketConnection;
065import org.jgrapes.io.events.Output;
066import org.jgrapes.io.util.ManagedBuffer;
067import org.jgrapes.io.util.ManagedBufferPool;
068import org.jgrapes.net.SocketIOChannel;
069import org.jgrapes.net.events.Connected;
070
071/**
072 * A converter component that receives and sends web application
073 * layer messages and byte buffers on associated network channels.
074 */
075@SuppressWarnings("PMD.ExcessiveImports")
076public class HttpConnector extends Component {
077
078    private int applicationBufferSize = -1;
079    private final Channel netMainChannel;
080    @SuppressWarnings("PMD.UseConcurrentHashMap")
081    private final Map<SocketAddress, Set<WebAppMsgChannel>> connecting
082        = new HashMap<>();
083    private final PoolingIndex<SocketAddress, SocketIOChannel> pooled
084        = new PoolingIndex<>();
085
086    /**
087     * Denotes the network channel in handler annotations.
088     */
089    private static class NetworkChannel extends ClassChannel {
090    }
091
092    /**
093     * Create a new connector that uses the {@code networkChannel} for network
094     * level I/O.
095     * 
096     * @param appChannel
097     *            this component's channel
098     * @param networkChannel
099     *            the channel for network level I/O
100     */
101    public HttpConnector(Channel appChannel, Channel networkChannel) {
102        super(appChannel, ChannelReplacements.create()
103            .add(NetworkChannel.class, networkChannel));
104        this.netMainChannel = networkChannel;
105    }
106
107    /**
108     * Sets the size of the buffers used for {@link Input} events
109     * on the application channel. Defaults to the upstream buffer size
110     * minus 512 (estimate for added protocol overhead).
111     * 
112     * @param applicationBufferSize the size to set
113     * @return the http server for easy chaining
114     */
115    public HttpConnector setApplicationBufferSize(int applicationBufferSize) {
116        this.applicationBufferSize = applicationBufferSize;
117        return this;
118    }
119
120    /**
121     * Returns the size of the application side (receive) buffers.
122     * 
123     * @return the value or -1 if not set
124     */
125    public int applicationBufferSize() {
126        return applicationBufferSize;
127    }
128
129    /**
130     * Starts the processing of a request from the application layer.
131     * When a network connection has been established, the application
132     * layer will be informed by a {@link HttpConnected} event, fired
133     * on a subchannel that is created for the processing of this
134     * request.
135     *
136     * @param event the request
137     * @throws InterruptedException if processing is interrupted
138     * @throws IOException Signals that an I/O exception has occurred.
139     */
140    @Handler
141    public void onRequest(Request.Out event)
142            throws InterruptedException, IOException {
143        new WebAppMsgChannel(event);
144    }
145
146    /**
147     * Handles output from the application. This may be the payload
148     * of e.g. a POST or data to be transferes on a websocket connection.
149     *
150     * @param event the event
151     * @param appChannel the application layer channel
152     * @throws InterruptedException the interrupted exception
153     */
154    @Handler
155    public void onOutput(Output<?> event, WebAppMsgChannel appChannel)
156            throws InterruptedException {
157        appChannel.handleAppOutput(event);
158    }
159
160    /**
161     * Called when the network connection is established. Triggers the
162     * further processing of the initial request.
163     *
164     * @param event the event
165     * @param netConnChannel the network layer channel
166     * @throws InterruptedException if the execution is interrupted
167     * @throws IOException Signals that an I/O exception has occurred.
168     */
169    @Handler(channels = NetworkChannel.class)
170    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
171    public void onConnected(Connected<?> event, SocketIOChannel netConnChannel)
172            throws InterruptedException, IOException {
173        // Check if an app channel has been waiting for such a connection
174        WebAppMsgChannel[] appChannel = { null };
175        synchronized (connecting) {
176            connecting.computeIfPresent(event.remoteAddress(), (key, set) -> {
177                Iterator<WebAppMsgChannel> iter = set.iterator();
178                appChannel[0] = iter.next();
179                iter.remove();
180                return set.isEmpty() ? null : set;
181            });
182        }
183        if (appChannel[0] != null) {
184            appChannel[0].connected(netConnChannel);
185        }
186    }
187
188    /**
189     * Handles I/O error events from the network layer.
190     *
191     * @param event the event
192     * @throws IOException Signals that an I/O exception has occurred.
193     */
194    @Handler(channels = NetworkChannel.class)
195    public void onIoError(IOError event) throws IOException {
196        for (Channel channel : event.channels()) {
197            if (channel instanceof SocketIOChannel) {
198                // Error while using established network connection
199                SocketIOChannel netConnChannel = (SocketIOChannel) channel;
200                Optional<WebAppMsgChannel> appChannel
201                    = netConnChannel.associated(WebAppMsgChannel.class);
202                if (appChannel.isPresent()) {
203                    // Error while using a network connection
204                    appChannel.get().handleIoError(event, netConnChannel);
205                    continue;
206                }
207                // Just in case...
208                pooled.remove(netConnChannel.remoteAddress(), netConnChannel);
209                continue;
210            }
211            // Error while trying to establish the network connection
212            if (event.event() instanceof OpenSocketConnection) {
213                OpenSocketConnection connEvent
214                    = (OpenSocketConnection) event.event();
215                Optional<Set<WebAppMsgChannel>> erroneous;
216                synchronized (connecting) {
217                    erroneous = Optional
218                        .ofNullable(connecting.get(connEvent.address()));
219                    connecting.remove(connEvent.address());
220                }
221                erroneous.ifPresent(set -> {
222                    for (WebAppMsgChannel chann : set) {
223                        chann.openError(event);
224                    }
225                });
226            }
227        }
228    }
229
230    /**
231     * Processes any input from the network layer.
232     *
233     * @param event the event
234     * @param netConnChannel the network layer channel
235     * @throws InterruptedException if the thread is interrupted
236     * @throws ProtocolException if the protocol is violated
237     */
238    @Handler(channels = NetworkChannel.class)
239    public void onInput(Input<ByteBuffer> event, SocketIOChannel netConnChannel)
240            throws InterruptedException, ProtocolException {
241        Optional<WebAppMsgChannel> appChannel
242            = netConnChannel.associated(WebAppMsgChannel.class);
243        if (appChannel.isPresent()) {
244            appChannel.get().handleNetInput(event, netConnChannel);
245        }
246    }
247
248    /**
249     * Called when the network connection is closed. 
250     *
251     * @param event the event
252     * @param netConnChannel the net conn channel
253     */
254    @Handler(channels = NetworkChannel.class)
255    public void onClosed(Closed<?> event, SocketIOChannel netConnChannel) {
256        netConnChannel.associated(WebAppMsgChannel.class).ifPresent(
257            appChannel -> appChannel.handleClosed(event));
258        pooled.remove(netConnChannel.remoteAddress(), netConnChannel);
259    }
260
261    /**
262     * Handles a close event from the application channel. Such an
263     * event may only be fired if the connection has been upgraded
264     * to a websocket connection.
265     *
266     * @param event the event
267     * @param appChannel the application channel
268     */
269    @Handler
270    public void onClose(Close event, WebAppMsgChannel appChannel) {
271        appChannel.handleClose(event);
272    }
273
274    /**
275     * An application layer channel. When an object is created, it is first
276     * inserted into the {@link HttpConnector#connecting} map. Once a network
277     * channel has been assigned to it, it is primarily referenced by that 
278     * network channel. 
279     */
280    private class WebAppMsgChannel extends DefaultIOSubchannel {
281        // Starts as ClientEngine<HttpRequest,HttpResponse> but may change
282        private final ClientEngine<?, ?> engine
283            = new ClientEngine<>(new HttpRequestEncoder(),
284                new HttpResponseDecoder());
285        private final InetSocketAddress serverAddress;
286        private final Request.Out request;
287        private ManagedBuffer<ByteBuffer> outBuffer;
288        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
289                ByteBuffer> byteBufferPool;
290        private ManagedBufferPool<ManagedBuffer<CharBuffer>,
291                CharBuffer> charBufferPool;
292        private ManagedBufferPool<?, ?> currentPool;
293        private SocketIOChannel netConnChannel;
294        private final EventPipeline downPipeline;
295        private WsMessageHeader currentWsMessage;
296
297        /**
298         * Instantiates a new channel.
299         *
300         * @param event the event
301         * @param netChannel the net channel
302         * @throws InterruptedException 
303         * @throws IOException 
304         */
305        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
306        public WebAppMsgChannel(Request.Out event)
307                throws InterruptedException, IOException {
308            super(channel(), newEventPipeline());
309
310            // Downstream pipeline, needed even if connection fails
311            downPipeline = newEventPipeline();
312
313            // Extract request data and check host
314            request = event;
315            serverAddress = new InetSocketAddress(
316                event.requestUri().getHost(), event.requestUri().getPort());
317            if (serverAddress.isUnresolved()) {
318                downPipeline.fire(
319                    new HostUnresolved(event, "Host cannot be resolved."),
320                    this);
321                return;
322            }
323
324            // Re-use TCP connection, if possible
325            SocketIOChannel recycled = pooled.poll(serverAddress);
326            if (recycled != null) {
327                connected(recycled);
328                return;
329            }
330            synchronized (connecting) {
331                connecting
332                    .computeIfAbsent(serverAddress, key -> new HashSet<>())
333                    .add(this);
334            }
335
336            // Fire on main network channel (targeting the tcp connector)
337            // as a follow up event (using the current pipeline).
338            fire(new OpenSocketConnection(serverAddress), netMainChannel);
339        }
340
341        /**
342         * Error in response to trying to open a new TCP connection.
343         *
344         * @param event the event
345         */
346        public void openError(IOError event) {
347            // Already removed from connecting by caller, simply forward.
348            downPipeline.fire(IOError.duplicate(event), this);
349        }
350
351        /**
352         * Error from established TCP connection.
353         *
354         * @param event the event
355         * @param netConnChannel the network channel
356         */
357        public void handleIoError(IOError event,
358                SocketIOChannel netConnChannel) {
359            downPipeline.fire(IOError.duplicate(event), this);
360        }
361
362        /**
363         * Sets the network connection channel for this application channel.
364         *
365         * @param netConnChannel the net conn channel
366         * @throws InterruptedException the interrupted exception
367         * @throws IOException Signals that an I/O exception has occurred.
368         */
369        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
370        public final void connected(SocketIOChannel netConnChannel)
371                throws InterruptedException, IOException {
372            // Associate the network channel with this application channel
373            this.netConnChannel = netConnChannel;
374            netConnChannel.setAssociated(WebAppMsgChannel.class, this);
375            request.connectedCallback().ifPresent(
376                consumer -> consumer.accept(request, netConnChannel));
377
378            // Estimate "good" application buffer size
379            int bufferSize = applicationBufferSize;
380            if (bufferSize <= 0) {
381                bufferSize = netConnChannel.byteBufferPool().bufferSize() - 512;
382                if (bufferSize < 4096) {
383                    bufferSize = 4096;
384                }
385            }
386            String channelName = Components.objectName(HttpConnector.this)
387                + "." + Components.objectName(this);
388            byteBufferPool().setName(channelName + ".upstream.byteBuffers");
389            charBufferPool().setName(channelName + ".upstream.charBuffers");
390            // Allocate downstream buffer pools. Note that decoding WebSocket
391            // network packets may result in several WS frames that are each
392            // delivered in independent events. Therefore provide some
393            // additional buffers.
394            final int bufSize = bufferSize;
395            byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
396                () -> {
397                    return ByteBuffer.allocate(bufSize);
398                }, 2, 100)
399                    .setName(channelName + ".downstream.byteBuffers");
400            charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
401                () -> {
402                    return CharBuffer.allocate(bufSize);
403                }, 2, 100)
404                    .setName(channelName + ".downstream.charBuffers");
405
406            sendMessageUpstream(request.httpRequest(), netConnChannel);
407
408            // Forward Connected event downstream to e.g. start preparation
409            // of output events for payload data.
410            downPipeline.fire(new HttpConnected(request,
411                netConnChannel.localAddress(), netConnChannel.remoteAddress()),
412                this);
413        }
414
415        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
416            "PMD.CognitiveComplexity", "PMD.AvoidDuplicateLiterals" })
417        private void sendMessageUpstream(MessageHeader message,
418                SocketIOChannel netConnChannel) {
419            // Now send request as if it came from downstream (to
420            // avoid confusion with output events that may be
421            // generated in parallel, see below).
422            responsePipeline().submit("SynchronizedResponse",
423                new Callable<Void>() {
424
425                    @SuppressWarnings({ "PMD.CommentRequired",
426                        "PMD.AvoidBranchingStatementAsLastInLoop",
427                        "PMD.AvoidDuplicateLiterals",
428                        "PMD.AvoidInstantiatingObjectsInLoops" })
429                    public Void call() throws InterruptedException {
430                        @SuppressWarnings("unchecked")
431                        ClientEngine<MessageHeader, MessageHeader> untypedEngine
432                            = (ClientEngine<MessageHeader,
433                                    MessageHeader>) engine;
434                        untypedEngine.encode(message);
435                        boolean hasBody = message.hasPayload();
436                        while (true) {
437                            outBuffer
438                                = netConnChannel.byteBufferPool().acquire();
439                            Codec.Result result
440                                = engine.encode(Codec.EMPTY_IN,
441                                    outBuffer.backingBuffer(), !hasBody);
442                            if (result.isOverflow()) {
443                                netConnChannel
444                                    .respond(Output.fromSink(outBuffer, false));
445                                continue;
446                            }
447                            if (hasBody) {
448                                // Keep buffer with incomplete request to be
449                                // further
450                                // filled by subsequent Output events
451                                break;
452                            }
453                            // Request is completely encoded
454                            if (outBuffer.position() > 0) {
455                                netConnChannel
456                                    .respond(Output.fromSink(outBuffer, true));
457                            } else {
458                                outBuffer.unlockBuffer();
459                            }
460                            outBuffer = null;
461                            if (result.closeConnection()) {
462                                netConnChannel.respond(new Close());
463                            }
464                            break;
465                        }
466                        return null;
467                    }
468                });
469        }
470
471        @SuppressWarnings({ "PMD.CommentRequired", "PMD.CyclomaticComplexity",
472            "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops",
473            "PMD.AvoidDuplicateLiterals", "PMD.CognitiveComplexity" })
474        public void handleAppOutput(Output<?> event)
475                throws InterruptedException {
476            Buffer eventData = event.data();
477            Buffer input;
478            if (eventData instanceof ByteBuffer) {
479                input = ((ByteBuffer) eventData).duplicate();
480            } else if (eventData instanceof CharBuffer) {
481                input = ((CharBuffer) eventData).duplicate();
482            } else {
483                return;
484            }
485            if (engine.switchedTo().equals(Optional.of("websocket"))
486                && currentWsMessage == null) {
487                // When switched to WebSockets, we only have Input and Output
488                // events. Add header automatically.
489                @SuppressWarnings("unchecked")
490                ClientEngine<MessageHeader, ?> wsEngine
491                    = (ClientEngine<MessageHeader, ?>) engine;
492                currentWsMessage = new WsMessageHeader(
493                    event.buffer().backingBuffer() instanceof CharBuffer,
494                    true);
495                wsEngine.encode(currentWsMessage);
496            }
497            while (input.hasRemaining() || event.isEndOfRecord()) {
498                if (outBuffer == null) {
499                    outBuffer = netConnChannel.byteBufferPool().acquire();
500                }
501                Codec.Result result = engine.encode(input,
502                    outBuffer.backingBuffer(), event.isEndOfRecord());
503                if (result.isOverflow()) {
504                    netConnChannel.respond(Output.fromSink(outBuffer, false));
505                    outBuffer = netConnChannel.byteBufferPool().acquire();
506                    continue;
507                }
508                if (event.isEndOfRecord() || result.closeConnection()) {
509                    if (outBuffer.position() > 0) {
510                        netConnChannel
511                            .respond(Output.fromSink(outBuffer, true));
512                    } else {
513                        outBuffer.unlockBuffer();
514                    }
515                    outBuffer = null;
516                    if (result.closeConnection()) {
517                        netConnChannel.respond(new Close());
518                    }
519                    break;
520                }
521            }
522            if (engine.switchedTo().equals(Optional.of("websocket"))
523                && event.isEndOfRecord()) {
524                currentWsMessage = null;
525            }
526        }
527
528        @SuppressWarnings({ "PMD.CommentRequired",
529            "PMD.DataflowAnomalyAnalysis", "PMD.CognitiveComplexity" })
530        public void handleNetInput(Input<ByteBuffer> event,
531                SocketIOChannel netConnChannel)
532                throws InterruptedException, ProtocolException {
533            // Send the data from the event through the decoder.
534            ByteBuffer inData = event.data();
535            // Don't unnecessary allocate a buffer, may be header only message
536            ManagedBuffer<?> bodyData = null;
537            boolean wasOverflow = false;
538            Decoder.Result<?> result;
539            while (inData.hasRemaining()) {
540                if (wasOverflow) {
541                    // Message has (more) body
542                    bodyData = currentPool.acquire();
543                }
544                result = engine.decode(inData,
545                    bodyData == null ? null : bodyData.backingBuffer(),
546                    event.isEndOfRecord());
547                if (result.response().isPresent()) {
548                    sendMessageUpstream(result.response().get(),
549                        netConnChannel);
550                    if (result.isResponseOnly()) {
551                        maybeReleaseConnection(result);
552                        continue;
553                    }
554                }
555                if (result.isHeaderCompleted()) {
556                    MessageHeader header
557                        = engine.responseDecoder().header().get();
558                    if (!handleResponseHeader(header)) {
559                        maybeReleaseConnection(result);
560                        break;
561                    }
562                }
563                if (bodyData != null) {
564                    if (bodyData.position() > 0) {
565                        boolean eor
566                            = !result.isOverflow() && !result.isUnderflow();
567                        downPipeline.fire(Input.fromSink(bodyData, eor), this);
568                    } else {
569                        bodyData.unlockBuffer();
570                    }
571                    bodyData = null;
572                }
573                maybeReleaseConnection(result);
574                wasOverflow = result.isOverflow();
575            }
576        }
577
578        @SuppressWarnings("PMD.CognitiveComplexity")
579        private boolean handleResponseHeader(MessageHeader response) {
580            if (response instanceof HttpResponse) {
581                HttpResponse httpResponse = (HttpResponse) response;
582                if (httpResponse.hasPayload()) {
583                    if (httpResponse.findValue(
584                        HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE)
585                        .map(type -> "text"
586                            .equalsIgnoreCase(type.value().topLevelType()))
587                        .orElse(false)) {
588                        currentPool = charBufferPool;
589                    } else {
590                        currentPool = byteBufferPool;
591                    }
592                }
593                downPipeline.fire(new Response(httpResponse), this);
594            } else if (response instanceof WsMessageHeader) {
595                WsMessageHeader wsMessage = (WsMessageHeader) response;
596                if (wsMessage.hasPayload()) {
597                    if (wsMessage.isTextMode()) {
598                        currentPool = charBufferPool;
599                    } else {
600                        currentPool = byteBufferPool;
601                    }
602                }
603            } else if (response instanceof WsCloseFrame) {
604                downPipeline.fire(
605                    new WebSocketClose((WsCloseFrame) response, this));
606            }
607            return true;
608        }
609
610        private void maybeReleaseConnection(Decoder.Result<?> result) {
611            if (result.isOverflow() || result.isUnderflow()) {
612                // Data remains to be processed
613                return;
614            }
615            MessageHeader header
616                = engine.responseDecoder().header().get();
617            // Don't release if something follows
618            if (header instanceof HttpResponse
619                && ((HttpResponse) header).statusCode() % 100 == 1) {
620                return;
621            }
622            if (engine.switchedTo().equals(Optional.of("websocket"))) {
623                if (!result.closeConnection()) {
624                    return;
625                }
626                // Is web socket close, inform application layer
627                downPipeline.fire(new Closed<Void>(), this);
628            }
629            netConnChannel.setAssociated(WebAppMsgChannel.class, null);
630            if (!result.closeConnection()) {
631                // May be reused
632                pooled.add(serverAddress, netConnChannel);
633            }
634            netConnChannel = null;
635        }
636
637        @SuppressWarnings("PMD.CommentRequired")
638        public void handleClose(Close event) {
639            if (engine.switchedTo().equals(Optional.of("websocket"))) {
640                sendMessageUpstream(new WsCloseFrame(null, null),
641                    netConnChannel);
642            }
643        }
644
645        @SuppressWarnings("PMD.CommentRequired")
646        public void handleClosed(Closed<?> event) {
647            if (engine.switchedTo().equals(Optional.of("websocket"))) {
648                downPipeline.fire(new Closed<Void>(), this);
649            }
650        }
651
652    }
653
654}