001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.http;
020
021import java.io.IOException;
022import java.net.InetSocketAddress;
023import java.net.SocketAddress;
024import java.nio.Buffer;
025import java.nio.ByteBuffer;
026import java.nio.CharBuffer;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Iterator;
030import java.util.Map;
031import java.util.Optional;
032import java.util.Set;
033import java.util.concurrent.Callable;
034
035import org.jdrupes.httpcodec.ClientEngine;
036import org.jdrupes.httpcodec.Codec;
037import org.jdrupes.httpcodec.Decoder;
038import org.jdrupes.httpcodec.MessageHeader;
039import org.jdrupes.httpcodec.ProtocolException;
040import org.jdrupes.httpcodec.protocols.http.HttpField;
041import org.jdrupes.httpcodec.protocols.http.HttpResponse;
042import org.jdrupes.httpcodec.protocols.http.client.HttpRequestEncoder;
043import org.jdrupes.httpcodec.protocols.http.client.HttpResponseDecoder;
044import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame;
045import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader;
046import org.jdrupes.httpcodec.types.Converters;
047import org.jgrapes.core.Channel;
048import org.jgrapes.core.ClassChannel;
049import org.jgrapes.core.Component;
050import org.jgrapes.core.Components;
051import org.jgrapes.core.Components.PoolingIndex;
052import org.jgrapes.core.EventPipeline;
053import org.jgrapes.core.annotation.Handler;
054import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements;
055import org.jgrapes.http.events.HostUnresolved;
056import org.jgrapes.http.events.HttpConnected;
057import org.jgrapes.http.events.Request;
058import org.jgrapes.http.events.Response;
059import org.jgrapes.http.events.WebSocketClose;
060import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel;
061import org.jgrapes.io.events.Close;
062import org.jgrapes.io.events.Closed;
063import org.jgrapes.io.events.IOError;
064import org.jgrapes.io.events.Input;
065import org.jgrapes.io.events.OpenTcpConnection;
066import org.jgrapes.io.events.Output;
067import org.jgrapes.io.util.ManagedBuffer;
068import org.jgrapes.io.util.ManagedBufferPool;
069import org.jgrapes.net.TcpChannel;
070import org.jgrapes.net.events.Connected;
071
072/**
073 * A converter component that receives and sends web application
074 * layer messages and byte buffers on associated network channels.
075 */
076@SuppressWarnings("PMD.ExcessiveImports")
077public class HttpConnector extends Component {
078
079    private int applicationBufferSize = -1;
080    private final Channel netMainChannel;
081    @SuppressWarnings("PMD.UseConcurrentHashMap")
082    private final Map<SocketAddress, Set<WebAppMsgChannel>> connecting
083        = new HashMap<>();
084    private final PoolingIndex<SocketAddress, TcpChannel> pooled
085        = new PoolingIndex<>();
086
087    /**
088     * Denotes the network channel in handler annotations.
089     */
090    private static class NetworkChannel extends ClassChannel {
091    }
092
093    /**
094     * Create a new connector that uses the {@code networkChannel} for network
095     * level I/O.
096     * 
097     * @param appChannel
098     *            this component's channel
099     * @param networkChannel
100     *            the channel for network level I/O
101     */
102    public HttpConnector(Channel appChannel, Channel networkChannel) {
103        super(appChannel, ChannelReplacements.create()
104            .add(NetworkChannel.class, networkChannel));
105        this.netMainChannel = networkChannel;
106    }
107
108    /**
109     * Sets the size of the buffers used for {@link Input} events
110     * on the application channel. Defaults to the upstream buffer size
111     * minus 512 (estimate for added protocol overhead).
112     * 
113     * @param applicationBufferSize the size to set
114     * @return the http server for easy chaining
115     */
116    public HttpConnector setApplicationBufferSize(int applicationBufferSize) {
117        this.applicationBufferSize = applicationBufferSize;
118        return this;
119    }
120
121    /**
122     * Returns the size of the application side (receive) buffers.
123     * 
124     * @return the value or -1 if not set
125     */
126    public int applicationBufferSize() {
127        return applicationBufferSize;
128    }
129
130    /**
131     * Starts the processing of a request from the application layer.
132     * When a network connection has been established, the application
133     * layer will be informed by a {@link HttpConnected} event, fired
134     * on a subchannel that is created for the processing of this
135     * request.
136     *
137     * @param event the request
138     * @throws InterruptedException if processing is interrupted
139     * @throws IOException Signals that an I/O exception has occurred.
140     */
141    @Handler
142    public void onRequest(Request.Out event)
143            throws InterruptedException, IOException {
144        new WebAppMsgChannel(event);
145    }
146
147    /**
148     * Handles output from the application. This may be the payload
149     * of e.g. a POST or data to be transferes on a websocket connection.
150     *
151     * @param event the event
152     * @param appChannel the application layer channel
153     * @throws InterruptedException the interrupted exception
154     */
155    @Handler
156    public void onOutput(Output<?> event, WebAppMsgChannel appChannel)
157            throws InterruptedException {
158        appChannel.handleAppOutput(event);
159    }
160
161    /**
162     * Called when the network connection is established. Triggers the
163     * frther processing of the initial request.
164     *
165     * @param event the event
166     * @param netConnChannel the network layer channel
167     * @throws InterruptedException if the execution is interrupted
168     * @throws IOException Signals that an I/O exception has occurred.
169     */
170    @Handler(channels = NetworkChannel.class)
171    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
172    public void onConnected(Connected event, TcpChannel netConnChannel)
173            throws InterruptedException, IOException {
174        // Check if an app channel has been waiting for such a connection
175        WebAppMsgChannel[] appChannel = { null };
176        synchronized (connecting) {
177            connecting.computeIfPresent(event.remoteAddress(), (key, set) -> {
178                Iterator<WebAppMsgChannel> iter = set.iterator();
179                appChannel[0] = iter.next();
180                iter.remove();
181                return set.isEmpty() ? null : set;
182            });
183        }
184        if (appChannel[0] != null) {
185            appChannel[0].connected(netConnChannel);
186        }
187    }
188
189    /**
190     * Handles I/O error events from the network layer.
191     *
192     * @param event the event
193     * @throws IOException Signals that an I/O exception has occurred.
194     */
195    @Handler(channels = NetworkChannel.class)
196    public void onIoError(IOError event) throws IOException {
197        for (Channel channel : event.channels()) {
198            if (channel instanceof TcpChannel) {
199                // Error while using established network connection
200                TcpChannel netConnChannel = (TcpChannel) channel;
201                Optional<WebAppMsgChannel> appChannel
202                    = netConnChannel.associated(WebAppMsgChannel.class);
203                if (appChannel.isPresent()) {
204                    // Error while using a network connection
205                    appChannel.get().handleIoError(event, netConnChannel);
206                    continue;
207                }
208                // Just in case...
209                pooled.remove(netConnChannel.remoteAddress(), netConnChannel);
210                continue;
211            }
212            // Error while trying to establish the network connection
213            if (event.event() instanceof OpenTcpConnection) {
214                OpenTcpConnection connEvent
215                    = (OpenTcpConnection) event.event();
216                Optional<Set<WebAppMsgChannel>> erroneous;
217                synchronized (connecting) {
218                    erroneous = Optional
219                        .ofNullable(connecting.get(connEvent.address()));
220                    connecting.remove(connEvent.address());
221                }
222                erroneous.ifPresent(set -> {
223                    for (WebAppMsgChannel chann : set) {
224                        chann.openError(event);
225                    }
226                });
227            }
228        }
229    }
230
231    /**
232     * Processes any input from the network layer.
233     *
234     * @param event the event
235     * @param netConnChannel the network layer channel
236     * @throws InterruptedException if the thread is interrupted
237     * @throws ProtocolException if the protocol is violated
238     */
239    @Handler(channels = NetworkChannel.class)
240    public void onInput(Input<ByteBuffer> event, TcpChannel netConnChannel)
241            throws InterruptedException, ProtocolException {
242        Optional<WebAppMsgChannel> appChannel
243            = netConnChannel.associated(WebAppMsgChannel.class);
244        if (appChannel.isPresent()) {
245            appChannel.get().handleNetInput(event, netConnChannel);
246        }
247    }
248
249    /**
250     * Called when the network connection is closed. 
251     *
252     * @param event the event
253     * @param netConnChannel the net conn channel
254     */
255    @Handler(channels = NetworkChannel.class)
256    public void onClosed(Closed event, TcpChannel netConnChannel) {
257        netConnChannel.associated(WebAppMsgChannel.class).ifPresent(
258            appChannel -> appChannel.handleClosed(event));
259        pooled.remove(netConnChannel.remoteAddress(), netConnChannel);
260    }
261
262    /**
263     * Handles a close event from the application channel. Such an
264     * event may only be fired of the connection has been upgraded
265     * to a websocket connection.
266     *
267     * @param event the event
268     * @param appChannel the application channel
269     */
270    @Handler
271    public void onClose(Close event, WebAppMsgChannel appChannel) {
272        appChannel.handleClose(event);
273    }
274
275    /**
276     * An application layer channel. When an object is created, it is first
277     * inserted into the {@link HttpConnector#connecting} map. Once a network
278     * channel has been assigned to it, it is primarily referenced by that 
279     * network channel. 
280     */
281    private class WebAppMsgChannel extends DefaultIOSubchannel {
282        // Starts as ClientEngine<HttpRequest,HttpResponse> but may change
283        private final ClientEngine<?, ?> engine
284            = new ClientEngine<>(new HttpRequestEncoder(),
285                new HttpResponseDecoder());
286        private final InetSocketAddress serverAddress;
287        private final Request.Out request;
288        private ManagedBuffer<ByteBuffer> outBuffer;
289        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
290                ByteBuffer> byteBufferPool;
291        private ManagedBufferPool<ManagedBuffer<CharBuffer>,
292                CharBuffer> charBufferPool;
293        private ManagedBufferPool<?, ?> currentPool;
294        private TcpChannel netConnChannel;
295        private final EventPipeline downPipeline;
296        private WsMessageHeader currentWsMessage;
297
298        /**
299         * Instantiates a new channel.
300         *
301         * @param event the event
302         * @param netChannel the net channel
303         * @throws InterruptedException 
304         * @throws IOException 
305         */
306        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
307        public WebAppMsgChannel(Request.Out event)
308                throws InterruptedException, IOException {
309            super(channel(), newEventPipeline());
310
311            // Downstream pipeline, needed even if connection fails
312            downPipeline = newEventPipeline();
313
314            // Extract request data and check host
315            request = event;
316            serverAddress = new InetSocketAddress(
317                event.requestUri().getHost(), event.requestUri().getPort());
318            if (serverAddress.isUnresolved()) {
319                downPipeline.fire(
320                    new HostUnresolved(event, "Host cannot be resolved."),
321                    this);
322                return;
323            }
324
325            // Re-use TCP connection, if possible
326            TcpChannel recycled = pooled.poll(serverAddress);
327            if (recycled != null) {
328                connected(recycled);
329                return;
330            }
331            synchronized (connecting) {
332                connecting
333                    .computeIfAbsent(serverAddress, key -> new HashSet<>())
334                    .add(this);
335            }
336
337            // Fire on main network channel (targeting the tcp connector)
338            // as a follow up event (using the current pipeline).
339            fire(new OpenTcpConnection(serverAddress), netMainChannel);
340        }
341
342        /**
343         * Error in response to trying to open a new TCP connection.
344         *
345         * @param event the event
346         */
347        public void openError(IOError event) {
348            // Already removed from connecting by caller, simply forward.
349            downPipeline.fire(IOError.duplicate(event), this);
350        }
351
352        /**
353         * Error from established TCP connection.
354         *
355         * @param event the event
356         * @param netConnChannel the network channel
357         */
358        public void handleIoError(IOError event, TcpChannel netConnChannel) {
359            downPipeline.fire(IOError.duplicate(event), this);
360        }
361
362        /**
363         * Sets the network connection channel for this application channel.
364         *
365         * @param netConnChannel the net conn channel
366         * @throws InterruptedException the interrupted exception
367         * @throws IOException Signals that an I/O exception has occurred.
368         */
369        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
370        public final void connected(TcpChannel netConnChannel)
371                throws InterruptedException, IOException {
372            // Associate the network channel with this application channel
373            this.netConnChannel = netConnChannel;
374            netConnChannel.setAssociated(WebAppMsgChannel.class, this);
375            request.connectedCallback().ifPresent(
376                consumer -> consumer.accept(request, netConnChannel));
377
378            // Estimate "good" application buffer size
379            int bufferSize = applicationBufferSize;
380            if (bufferSize <= 0) {
381                bufferSize = netConnChannel.byteBufferPool().bufferSize() - 512;
382                if (bufferSize < 4096) {
383                    bufferSize = 4096;
384                }
385            }
386            String channelName = Components.objectName(HttpConnector.this)
387                + "." + Components.objectName(this);
388            byteBufferPool().setName(channelName + ".upstream.byteBuffers");
389            charBufferPool().setName(channelName + ".upstream.charBuffers");
390            // Allocate downstream buffer pools. Note that decoding WebSocket
391            // network packets may result in several WS frames that are each
392            // delivered in independent events. Therefore provide some
393            // additional buffers.
394            final int bufSize = bufferSize;
395            byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
396                () -> {
397                    return ByteBuffer.allocate(bufSize);
398                }, 2, 100)
399                    .setName(channelName + ".downstream.byteBuffers");
400            charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
401                () -> {
402                    return CharBuffer.allocate(bufSize);
403                }, 2, 100)
404                    .setName(channelName + ".downstream.charBuffers");
405
406            sendMessageUpstream(request.httpRequest(), netConnChannel);
407
408            // Forward Connected event downstream to e.g. start preparation
409            // of output events for payload data.
410            downPipeline.fire(new HttpConnected(request,
411                netConnChannel.localAddress(), netConnChannel.remoteAddress()),
412                this);
413        }
414
415        @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
416        private void sendMessageUpstream(MessageHeader message,
417                TcpChannel netConnChannel) {
418            // Now send request as if it came from downstream (to
419            // avoid confusion with output events that may be
420            // generated in parallel, see below).
421            responsePipeline().submit("SynchronizedResponse",
422                new Callable<Void>() {
423
424                    @SuppressWarnings({ "PMD.CommentRequired",
425                        "PMD.AvoidBranchingStatementAsLastInLoop",
426                        "PMD.AvoidDuplicateLiterals" })
427                    public Void call() throws InterruptedException {
428                        @SuppressWarnings("unchecked")
429                        ClientEngine<MessageHeader, MessageHeader> untypedEngine
430                            = (ClientEngine<MessageHeader,
431                                    MessageHeader>) engine;
432                        untypedEngine.encode(message);
433                        boolean hasBody = message.hasPayload();
434                        while (true) {
435                            outBuffer
436                                = netConnChannel.byteBufferPool().acquire();
437                            Codec.Result result
438                                = engine.encode(Codec.EMPTY_IN,
439                                    outBuffer.backingBuffer(), !hasBody);
440                            if (result.isOverflow()) {
441                                netConnChannel
442                                    .respond(Output.fromSink(outBuffer, false));
443                                continue;
444                            }
445                            if (hasBody) {
446                                // Keep buffer with incomplete request to be
447                                // further
448                                // filled by subsequent Output events
449                                break;
450                            }
451                            // Request is completely encoded
452                            if (outBuffer.position() > 0) {
453                                netConnChannel
454                                    .respond(Output.fromSink(outBuffer, true));
455                            } else {
456                                outBuffer.unlockBuffer();
457                            }
458                            outBuffer = null;
459                            if (result.closeConnection()) {
460                                netConnChannel.respond(new Close());
461                            }
462                            break;
463                        }
464                        return null;
465                    }
466                });
467        }
468
469        @SuppressWarnings({ "PMD.CommentRequired", "PMD.CyclomaticComplexity",
470            "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops",
471            "PMD.AvoidDuplicateLiterals" })
472        public void handleAppOutput(Output<?> event)
473                throws InterruptedException {
474            Buffer eventData = event.data();
475            Buffer input;
476            if (eventData instanceof ByteBuffer) {
477                input = ((ByteBuffer) eventData).duplicate();
478            } else if (eventData instanceof CharBuffer) {
479                input = ((CharBuffer) eventData).duplicate();
480            } else {
481                return;
482            }
483            if (engine.switchedTo().equals(Optional.of("websocket"))
484                && currentWsMessage == null) {
485                // When switched to WebSockets, we only have Input and Output
486                // events. Add header automatically.
487                @SuppressWarnings("unchecked")
488                ClientEngine<MessageHeader, ?> wsEngine
489                    = (ClientEngine<MessageHeader, ?>) engine;
490                currentWsMessage = new WsMessageHeader(
491                    event.buffer().backingBuffer() instanceof CharBuffer,
492                    true);
493                wsEngine.encode(currentWsMessage);
494            }
495            while (input.hasRemaining() || event.isEndOfRecord()) {
496                if (outBuffer == null) {
497                    outBuffer = netConnChannel.byteBufferPool().acquire();
498                }
499                Codec.Result result = engine.encode(input,
500                    outBuffer.backingBuffer(), event.isEndOfRecord());
501                if (result.isOverflow()) {
502                    netConnChannel.respond(Output.fromSink(outBuffer, false));
503                    outBuffer = netConnChannel.byteBufferPool().acquire();
504                    continue;
505                }
506                if (event.isEndOfRecord() || result.closeConnection()) {
507                    if (outBuffer.position() > 0) {
508                        netConnChannel
509                            .respond(Output.fromSink(outBuffer, true));
510                    } else {
511                        outBuffer.unlockBuffer();
512                    }
513                    outBuffer = null;
514                    if (result.closeConnection()) {
515                        netConnChannel.respond(new Close());
516                    }
517                    break;
518                }
519            }
520            if (engine.switchedTo().equals(Optional.of("websocket"))
521                && event.isEndOfRecord()) {
522                currentWsMessage = null;
523            }
524        }
525
526        @SuppressWarnings({ "PMD.CommentRequired",
527            "PMD.DataflowAnomalyAnalysis" })
528        public void handleNetInput(Input<ByteBuffer> event,
529                TcpChannel netConnChannel)
530                throws InterruptedException, ProtocolException {
531            // Send the data from the event through the decoder.
532            ByteBuffer inData = event.data();
533            // Don't unnecessary allocate a buffer, may be header only message
534            ManagedBuffer<?> bodyData = null;
535            boolean wasOverflow = false;
536            Decoder.Result<?> result;
537            while (inData.hasRemaining()) {
538                if (wasOverflow) {
539                    // Message has (more) body
540                    bodyData = currentPool.acquire();
541                }
542                result = engine.decode(inData,
543                    bodyData == null ? null : bodyData.backingBuffer(),
544                    event.isEndOfRecord());
545                if (result.response().isPresent()) {
546                    sendMessageUpstream(result.response().get(),
547                        netConnChannel);
548                    if (result.isResponseOnly()) {
549                        maybeReleaseConnection(result);
550                        continue;
551                    }
552                }
553                if (result.isHeaderCompleted()) {
554                    MessageHeader header
555                        = engine.responseDecoder().header().get();
556                    if (!handleResponseHeader(header)) {
557                        maybeReleaseConnection(result);
558                        break;
559                    }
560                }
561                if (bodyData != null) {
562                    if (bodyData.position() > 0) {
563                        boolean eor
564                            = !result.isOverflow() && !result.isUnderflow();
565                        downPipeline.fire(Input.fromSink(bodyData, eor), this);
566                    } else {
567                        bodyData.unlockBuffer();
568                    }
569                    bodyData = null;
570                }
571                maybeReleaseConnection(result);
572                wasOverflow = result.isOverflow();
573            }
574        }
575
576        private boolean handleResponseHeader(MessageHeader response) {
577            if (response instanceof HttpResponse) {
578                HttpResponse httpResponse = (HttpResponse) response;
579                if (httpResponse.hasPayload()) {
580                    if (httpResponse.findValue(
581                        HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE)
582                        .map(type -> type.value().topLevelType()
583                            .equalsIgnoreCase("text"))
584                        .orElse(false)) {
585                        currentPool = charBufferPool;
586                    } else {
587                        currentPool = byteBufferPool;
588                    }
589                }
590                downPipeline.fire(new Response(httpResponse), this);
591            } else if (response instanceof WsMessageHeader) {
592                WsMessageHeader wsMessage = (WsMessageHeader) response;
593                if (wsMessage.hasPayload()) {
594                    if (wsMessage.isTextMode()) {
595                        currentPool = charBufferPool;
596                    } else {
597                        currentPool = byteBufferPool;
598                    }
599                }
600            } else if (response instanceof WsCloseFrame) {
601                downPipeline.fire(
602                    new WebSocketClose((WsCloseFrame) response, this));
603            }
604            return true;
605        }
606
607        private void maybeReleaseConnection(Decoder.Result<?> result) {
608            if (result.isOverflow() || result.isUnderflow()) {
609                // Data remains to be processed
610                return;
611            }
612            MessageHeader header
613                = engine.responseDecoder().header().get();
614            // Don't release if something follows
615            if (header instanceof HttpResponse
616                && ((HttpResponse) header).statusCode() % 100 == 1) {
617                return;
618            }
619            if (engine.switchedTo().equals(Optional.of("websocket"))) {
620                if (!result.closeConnection()) {
621                    return;
622                }
623                // Is web socket close, inform application layer
624                downPipeline.fire(new Closed(), this);
625            }
626            netConnChannel.setAssociated(WebAppMsgChannel.class, null);
627            if (!result.closeConnection()) {
628                // May be reused
629                pooled.add(serverAddress, netConnChannel);
630            }
631            netConnChannel = null;
632        }
633
634        @SuppressWarnings("PMD.CommentRequired")
635        public void handleClose(Close event) {
636            if (engine.switchedTo().equals(Optional.of("websocket"))) {
637                sendMessageUpstream(new WsCloseFrame(null, null),
638                    netConnChannel);
639            }
640        }
641
642        @SuppressWarnings("PMD.CommentRequired")
643        public void handleClosed(Closed event) {
644            if (engine.switchedTo().equals(Optional.of("websocket"))) {
645                downPipeline.fire(new Closed(), this);
646            }
647        }
648
649    }
650
651}