001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2024 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.http;
020
021import java.io.IOException;
022import java.net.InetSocketAddress;
023import java.net.SocketAddress;
024import java.nio.Buffer;
025import java.nio.ByteBuffer;
026import java.nio.CharBuffer;
027import java.util.Optional;
028import java.util.concurrent.Callable;
029import org.jdrupes.httpcodec.ClientEngine;
030import org.jdrupes.httpcodec.Codec;
031import org.jdrupes.httpcodec.Decoder;
032import org.jdrupes.httpcodec.MessageHeader;
033import org.jdrupes.httpcodec.ProtocolException;
034import org.jdrupes.httpcodec.protocols.http.HttpField;
035import org.jdrupes.httpcodec.protocols.http.HttpResponse;
036import org.jdrupes.httpcodec.protocols.http.client.HttpRequestEncoder;
037import org.jdrupes.httpcodec.protocols.http.client.HttpResponseDecoder;
038import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame;
039import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader;
040import org.jdrupes.httpcodec.types.Converters;
041import org.jgrapes.core.Channel;
042import org.jgrapes.core.ClassChannel;
043import org.jgrapes.core.Component;
044import org.jgrapes.core.Components;
045import org.jgrapes.core.Components.PoolingIndex;
046import org.jgrapes.core.EventPipeline;
047import org.jgrapes.core.annotation.Handler;
048import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements;
049import org.jgrapes.http.events.HostUnresolved;
050import org.jgrapes.http.events.HttpConnected;
051import org.jgrapes.http.events.Request;
052import org.jgrapes.http.events.Response;
053import org.jgrapes.http.events.WebSocketClose;
054import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel;
055import org.jgrapes.io.events.Close;
056import org.jgrapes.io.events.Closed;
057import org.jgrapes.io.events.IOError;
058import org.jgrapes.io.events.Input;
059import org.jgrapes.io.events.OpenSocketConnection;
060import org.jgrapes.io.events.Output;
061import org.jgrapes.io.util.ManagedBuffer;
062import org.jgrapes.io.util.ManagedBufferPool;
063import org.jgrapes.net.SocketIOChannel;
064import org.jgrapes.net.events.ClientConnected;
065
066/**
067 * A converter component that receives and sends web application
068 * layer messages and byte buffers on associated network channels.
069 */
070@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.CouplingBetweenObjects" })
071public class HttpConnector extends Component {
072
    /** The configured application buffer size, -1 (initial value) if not set. */
    private int applicationBufferSize = -1;
    /** The channel used for network level I/O. */
    private final Channel netMainChannel;
    /**
     * The channel used for secure network level I/O, {@code null} if
     * the connector was created without a secure channel.
     */
    private final Channel netSecureChannel;
    /** Network connections that may be re-used, indexed by remote address. */
    private final PoolingIndex<SocketAddress, SocketIOChannel> pooled
        = new PoolingIndex<>();
078
    /**
     * Denotes the network channel in handler annotations. The class is
     * mapped to the actual network channel(s) by the channel replacements
     * that the constructors pass to the super class.
     */
    private static final class NetworkChannel extends ClassChannel {
    }
084
    /**
     * Create a new connector that uses the {@code networkChannel} for
     * network level I/O and the {@code secureChannel} for secure network
     * level I/O. Connections for requests with an "https" URI are
     * opened on the {@code secureChannel}, all other connections are
     * opened on the {@code networkChannel}.
     * 
     * @param appChannel
     *            this component's channel
     * @param networkChannel
     *            the channel for network level I/O
     * @param secureChannel
     *            the channel for secure network level I/O
     */
    public HttpConnector(Channel appChannel, Channel networkChannel,
            Channel secureChannel) {
        // Handlers annotated with NetworkChannel listen on both channels
        super(appChannel, ChannelReplacements.create()
            .add(NetworkChannel.class, networkChannel, secureChannel));
        this.netMainChannel = networkChannel;
        this.netSecureChannel = secureChannel;
    }
103
    /**
     * Create a new connector that uses the {@code networkChannel} for network
     * level I/O. As no channel for secure network level I/O is
     * configured, all connections are opened on the
     * {@code networkChannel}.
     * 
     * @param appChannel
     *            this component's channel
     * @param networkChannel
     *            the channel for network level I/O
     */
    public HttpConnector(Channel appChannel, Channel networkChannel) {
        super(appChannel, ChannelReplacements.create()
            .add(NetworkChannel.class, networkChannel));
        this.netMainChannel = networkChannel;
        // No secure channel available
        this.netSecureChannel = null;
    }
119
    /**
     * Sets the size of the buffers used for {@link Input} events
     * on the application channel. If not set (or set to a value less
     * or equal to 0), the size defaults to the network channel's
     * buffer size minus 512 (estimate for added protocol overhead),
     * but at least 4096.
     * 
     * @param applicationBufferSize the size to set
     * @return the http connector for easy chaining
     */
    public HttpConnector setApplicationBufferSize(int applicationBufferSize) {
        this.applicationBufferSize = applicationBufferSize;
        return this;
    }
132
    /**
     * Returns the configured size of the application side (receive)
     * buffers.
     * 
     * @return the value passed to
     * {@link #setApplicationBufferSize(int)} or -1 if not set
     */
    public int applicationBufferSize() {
        return applicationBufferSize;
    }
141
    /**
     * Starts the processing of a request from the application layer.
     * When a network connection has been established, the application
     * layer will be informed by a {@link HttpConnected} event, fired
     * on a subchannel that is created for the processing of this
     * request.
     *
     * @param event the request
     * @throws InterruptedException if processing is interrupted
     * @throws IOException Signals that an I/O exception has occurred.
     */
    @Handler
    public void onRequest(Request.Out event)
            throws InterruptedException, IOException {
        // The channel's constructor does all the work: it checks the
        // host and either re-uses a pooled network connection or
        // requests a new one.
        new WebAppMsgChannel(event);
    }
158
159    /**
160     * Handles output from the application. This may be the payload
161     * of e.g. a POST or data to be transferes on a websocket connection.
162     *
163     * @param event the event
164     * @param appChannel the application layer channel
165     * @throws InterruptedException the interrupted exception
166     */
167    @Handler
168    @SuppressWarnings({ "PMD.CompareObjectsWithEquals",
169        "PMD.AvoidDuplicateLiterals" })
170    public void onOutput(Output<?> event, WebAppMsgChannel appChannel)
171            throws InterruptedException {
172        if (appChannel.httpConnector() == this) {
173            appChannel.handleAppOutput(event);
174        }
175    }
176
177    /**
178     * Called when the network connection is established. Triggers the
179     * further processing of the initial request.
180     *
181     * @param event the event
182     * @param netConnChannel the network layer channel
183     * @throws InterruptedException if the execution is interrupted
184     * @throws IOException Signals that an I/O exception has occurred.
185     */
186    @Handler(channels = NetworkChannel.class)
187    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
188    public void onConnected(ClientConnected event,
189            SocketIOChannel netConnChannel)
190            throws InterruptedException, IOException {
191        // Check if this is a response to our request
192        var appChannel = event.openEvent().associated(WebAppMsgChannel.class)
193            .filter(c -> c.httpConnector() == this);
194        if (appChannel.isPresent()) {
195            appChannel.get().connected(netConnChannel);
196        }
197    }
198
199    /**
200     * Handles I/O error events from the network layer.
201     *
202     * @param event the event
203     * @throws IOException Signals that an I/O exception has occurred.
204     */
205    @Handler(channels = NetworkChannel.class)
206    @SuppressWarnings("PMD.CompareObjectsWithEquals")
207    public void onIoError(IOError event) throws IOException {
208        for (Channel channel : event.channels()) {
209            if (channel instanceof SocketIOChannel netConnChannel) {
210                // Error while using established network connection
211                Optional<WebAppMsgChannel> appChannel
212                    = netConnChannel.associated(WebAppMsgChannel.class)
213                        .filter(c -> c.httpConnector() == this);
214                if (appChannel.isPresent()) {
215                    // Error while using a network connection
216                    appChannel.get().handleIoError(event, netConnChannel);
217                    continue;
218                }
219                // Just in case...
220                pooled.remove(netConnChannel.remoteAddress(), netConnChannel);
221                continue;
222            }
223
224            // Error while trying to establish the network connection
225            if (event.event() instanceof OpenSocketConnection connEvent) {
226                connEvent.associated(WebAppMsgChannel.class)
227                    .filter(c -> c.httpConnector() == this).ifPresent(c -> {
228                        c.openError(event);
229                    });
230            }
231        }
232    }
233
234    /**
235     * Processes any input from the network layer.
236     *
237     * @param event the event
238     * @param netConnChannel the network layer channel
239     * @throws InterruptedException if the thread is interrupted
240     * @throws ProtocolException if the protocol is violated
241     */
242    @Handler(channels = NetworkChannel.class)
243    @SuppressWarnings("PMD.CompareObjectsWithEquals")
244    public void onInput(Input<ByteBuffer> event, SocketIOChannel netConnChannel)
245            throws InterruptedException, ProtocolException {
246        Optional<WebAppMsgChannel> appChannel
247            = netConnChannel.associated(WebAppMsgChannel.class)
248                .filter(c -> c.httpConnector() == this);
249        if (appChannel.isPresent()) {
250            appChannel.get().handleNetInput(event, netConnChannel);
251        }
252    }
253
254    /**
255     * Called when the network connection is closed. 
256     *
257     * @param event the event
258     * @param netConnChannel the net conn channel
259     */
260    @Handler(channels = NetworkChannel.class)
261    public void onClosed(Closed<?> event, SocketIOChannel netConnChannel) {
262        netConnChannel.associated(WebAppMsgChannel.class)
263            .filter(c -> c.httpConnector() == this).ifPresent(
264                appChannel -> appChannel.handleClosed(event));
265        pooled.remove(netConnChannel.remoteAddress(), netConnChannel);
266    }
267
268    /**
269     * Handles a close event from the application channel. Such an
270     * event may only be fired if the connection has been upgraded
271     * to a websocket connection.
272     *
273     * @param event the event
274     * @param appChannel the application channel
275     */
276    @Handler
277    @SuppressWarnings("PMD.CompareObjectsWithEquals")
278    public void onClose(Close event, WebAppMsgChannel appChannel) {
279        if (appChannel.httpConnector() == this) {
280            appChannel.handleClose(event);
281        }
282    }
283
284    /**
285     * An application layer channel.
286     */
287    private class WebAppMsgChannel extends DefaultIOSubchannel {
        // Starts as ClientEngine<HttpRequest,HttpResponse> but may change
        private final ClientEngine<?, ?> engine
            = new ClientEngine<>(new HttpRequestEncoder(),
                new HttpResponseDecoder());
        // The server's address as derived from the request URI
        private final InetSocketAddress serverAddress;
        // The request that caused the creation of this channel
        private final Request.Out request;
        // Buffer with encoded data that has not been sent yet, null if
        // there is no such buffer
        private ManagedBuffer<ByteBuffer> outBuffer;
        // Pools for the buffers used in downstream Input events,
        // created when the network connection becomes available
        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
                ByteBuffer> byteBufferPool;
        private ManagedBufferPool<ManagedBuffer<CharBuffer>,
                CharBuffer> charBufferPool;
        // The pool to use for the body data of the current message
        // (one of the two pools above)
        private ManagedBufferPool<?, ?> currentPool;
        // The network connection in use, set to null when released
        private SocketIOChannel netConnChannel;
        // The pipeline used for firing events to the application
        private final EventPipeline downPipeline;
        // Header of the websocket message currently being sent,
        // null if no message is in progress
        private WsMessageHeader currentWsMessage;
303
304        /**
305         * Instantiates a new channel.
306         *
307         * @param event the event
308         * @param netChannel the net channel
309         * @throws InterruptedException 
310         * @throws IOException 
311         */
312        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
313        public WebAppMsgChannel(Request.Out event)
314                throws InterruptedException, IOException {
315            super(channel(), newEventPipeline());
316
317            // Downstream pipeline, needed even if connection fails
318            downPipeline = newEventPipeline();
319
320            // Extract request data and check host
321            request = event;
322            var uri = request.requestUri();
323            var port = uri.getPort();
324            if (port == -1) {
325                if ("https".equalsIgnoreCase(uri.getScheme())) {
326                    port = 443;
327                } else if ("http".equalsIgnoreCase(uri.getScheme())) {
328                    port = 80;
329                }
330            }
331            serverAddress = new InetSocketAddress(uri.getHost(), port);
332            if (serverAddress.isUnresolved()) {
333                downPipeline.fire(new HostUnresolved(event,
334                    "Host cannot be resolved."), this);
335                return;
336            }
337
338            // Re-use network connection, if possible
339            SocketIOChannel recycled = pooled.poll(serverAddress);
340            if (recycled != null) {
341                connected(recycled);
342                return;
343            }
344
345            // Fire on network channel (targeting the network connector)
346            // as a follow up event (using the current pipeline).
347            var useSecure = uri.getScheme().equalsIgnoreCase("https")
348                && netSecureChannel != null;
349            fire(new OpenSocketConnection(serverAddress)
350                .setAssociated(WebAppMsgChannel.class, this),
351                useSecure ? netSecureChannel : netMainChannel);
352        }
353
        /**
         * Returns the connector that created this channel. Used by
         * the connector's handlers to find out whether a channel
         * is one of their own.
         *
         * @return the enclosing connector instance
         */
        private HttpConnector httpConnector() {
            return HttpConnector.this;
        }
357
        /**
         * Error in response to trying to open a new TCP connection.
         * A duplicate of the event is fired downstream on this channel.
         *
         * @param event the event
         */
        public void openError(IOError event) {
            // No state to clean up here, simply forward.
            downPipeline.fire(IOError.duplicate(event), this);
        }
367
        /**
         * Error from established TCP connection. A duplicate of
         * the event is fired downstream on this channel.
         *
         * @param event the event
         * @param netConnChannel the network channel (currently not used)
         */
        public void handleIoError(IOError event,
                SocketIOChannel netConnChannel) {
            downPipeline.fire(IOError.duplicate(event), this);
        }
378
379        /**
380         * Sets the network connection channel for this application channel.
381         *
382         * @param netConnChannel the net conn channel
383         * @throws InterruptedException the interrupted exception
384         * @throws IOException Signals that an I/O exception has occurred.
385         */
386        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
387        public final void connected(SocketIOChannel netConnChannel)
388                throws InterruptedException, IOException {
389            // Associate the network channel with this application channel
390            this.netConnChannel = netConnChannel;
391            netConnChannel.setAssociated(WebAppMsgChannel.class, this);
392            request.connectedCallback().ifPresent(
393                consumer -> consumer.accept(request, netConnChannel));
394
395            // Estimate "good" application buffer size
396            int bufferSize = applicationBufferSize;
397            if (bufferSize <= 0) {
398                bufferSize = netConnChannel.byteBufferPool().bufferSize() - 512;
399                if (bufferSize < 4096) {
400                    bufferSize = 4096;
401                }
402            }
403            String channelName = Components.objectName(HttpConnector.this)
404                + "." + Components.objectName(this);
405            byteBufferPool().setName(channelName + ".upstream.byteBuffers");
406            charBufferPool().setName(channelName + ".upstream.charBuffers");
407            // Allocate downstream buffer pools. Note that decoding WebSocket
408            // network packets may result in several WS frames that are each
409            // delivered in independent events. Therefore provide some
410            // additional buffers.
411            final int bufSize = bufferSize;
412            byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
413                () -> {
414                    return ByteBuffer.allocate(bufSize);
415                }, 2, 100)
416                    .setName(channelName + ".downstream.byteBuffers");
417            charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
418                () -> {
419                    return CharBuffer.allocate(bufSize);
420                }, 2, 100)
421                    .setName(channelName + ".downstream.charBuffers");
422
423            sendMessageUpstream(request.httpRequest(), netConnChannel);
424
425            // Forward Connected event downstream to e.g. start preparation
426            // of output events for payload data.
427            downPipeline.fire(new HttpConnected(request,
428                netConnChannel.localAddress(), netConnChannel.remoteAddress()),
429                this);
430        }
431
432        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
433            "PMD.CognitiveComplexity", "PMD.AvoidDuplicateLiterals" })
434        private void sendMessageUpstream(MessageHeader message,
435                SocketIOChannel netConnChannel) {
436            // Now send request as if it came from downstream (to
437            // avoid confusion with output events that may be
438            // generated in parallel, see below).
439            responsePipeline().submit("SynchronizedResponse",
440                new Callable<Void>() {
441
442                    @Override
443                    @SuppressWarnings({ "PMD.CommentRequired",
444                        "PMD.AvoidBranchingStatementAsLastInLoop",
445                        "PMD.AvoidDuplicateLiterals",
446                        "PMD.AvoidInstantiatingObjectsInLoops" })
447                    public Void call() throws InterruptedException {
448                        @SuppressWarnings("unchecked")
449                        ClientEngine<MessageHeader, MessageHeader> untypedEngine
450                            = (ClientEngine<MessageHeader,
451                                    MessageHeader>) engine;
452                        untypedEngine.encode(message);
453                        boolean hasBody = message.hasPayload();
454                        while (true) {
455                            outBuffer
456                                = netConnChannel.byteBufferPool().acquire();
457                            Codec.Result result
458                                = engine.encode(Codec.EMPTY_IN,
459                                    outBuffer.backingBuffer(), !hasBody);
460                            if (result.isOverflow()) {
461                                netConnChannel
462                                    .respond(Output.fromSink(outBuffer, false));
463                                continue;
464                            }
465                            if (hasBody) {
466                                // Keep buffer with incomplete request to be
467                                // further
468                                // filled by subsequent Output events
469                                break;
470                            }
471                            // Request is completely encoded
472                            if (outBuffer.position() > 0) {
473                                netConnChannel
474                                    .respond(Output.fromSink(outBuffer, true));
475                            } else {
476                                outBuffer.unlockBuffer();
477                            }
478                            outBuffer = null;
479                            if (result.closeConnection()) {
480                                netConnChannel.respond(new Close());
481                            }
482                            break;
483                        }
484                        return null;
485                    }
486                });
487        }
488
489        @SuppressWarnings({ "PMD.CommentRequired", "PMD.CyclomaticComplexity",
490            "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops",
491            "PMD.AvoidDuplicateLiterals", "PMD.CognitiveComplexity" })
492        public void handleAppOutput(Output<?> event)
493                throws InterruptedException {
494            Buffer eventData = event.data();
495            Buffer input;
496            if (eventData instanceof ByteBuffer) {
497                input = ((ByteBuffer) eventData).duplicate();
498            } else if (eventData instanceof CharBuffer) {
499                input = ((CharBuffer) eventData).duplicate();
500            } else {
501                return;
502            }
503            if (engine.switchedTo().equals(Optional.of("websocket"))
504                && currentWsMessage == null) {
505                // When switched to WebSockets, we only have Input and Output
506                // events. Add header automatically.
507                @SuppressWarnings("unchecked")
508                ClientEngine<MessageHeader, ?> wsEngine
509                    = (ClientEngine<MessageHeader, ?>) engine;
510                currentWsMessage = new WsMessageHeader(
511                    event.buffer().backingBuffer() instanceof CharBuffer,
512                    true);
513                wsEngine.encode(currentWsMessage);
514            }
515            while (input.hasRemaining() || event.isEndOfRecord()) {
516                if (outBuffer == null) {
517                    outBuffer = netConnChannel.byteBufferPool().acquire();
518                }
519                Codec.Result result = engine.encode(input,
520                    outBuffer.backingBuffer(), event.isEndOfRecord());
521                if (result.isOverflow()) {
522                    netConnChannel.respond(Output.fromSink(outBuffer, false));
523                    outBuffer = netConnChannel.byteBufferPool().acquire();
524                    continue;
525                }
526                if (event.isEndOfRecord() || result.closeConnection()) {
527                    if (outBuffer.position() > 0) {
528                        netConnChannel
529                            .respond(Output.fromSink(outBuffer, true));
530                    } else {
531                        outBuffer.unlockBuffer();
532                    }
533                    outBuffer = null;
534                    if (result.closeConnection()) {
535                        netConnChannel.respond(new Close());
536                    }
537                    break;
538                }
539            }
540            if (engine.switchedTo().equals(Optional.of("websocket"))
541                && event.isEndOfRecord()) {
542                currentWsMessage = null;
543            }
544        }
545
        /**
         * Decodes the data received from the network connection.
         * Decoded headers are processed by
         * {@code handleResponseHeader}, decoded body data is fired
         * downstream in {@link Input} events. Messages generated
         * by the decoder as immediate response are sent upstream.
         *
         * @param event the event with the data from the network
         * @param netConnChannel the network connection
         * @throws InterruptedException if the execution is interrupted
         * @throws ProtocolException if the protocol is violated
         */
        @SuppressWarnings({ "PMD.CommentRequired",
            "PMD.DataflowAnomalyAnalysis", "PMD.CognitiveComplexity" })
        public void handleNetInput(Input<ByteBuffer> event,
                SocketIOChannel netConnChannel)
                throws InterruptedException, ProtocolException {
            // Send the data from the event through the decoder.
            ByteBuffer inData = event.data();
            // Don't unnecessary allocate a buffer, may be header only message
            ManagedBuffer<?> bodyData = null;
            boolean wasOverflow = false;
            Decoder.Result<?> result;
            while (inData.hasRemaining()) {
                if (wasOverflow) {
                    // Message has (more) body
                    bodyData = currentPool.acquire();
                }
                // Without a buffer for the body (first invocation), the
                // decoder is invoked with null as sink.
                result = engine.decode(inData,
                    bodyData == null ? null : bodyData.backingBuffer(),
                    event.isEndOfRecord());
                if (result.response().isPresent()) {
                    // Decoder has generated a message that must be sent
                    // to the server
                    sendMessageUpstream(result.response().get(),
                        netConnChannel);
                    if (result.isResponseOnly()) {
                        // NOTE(review): this path neither updates
                        // wasOverflow nor hands over or unlocks a
                        // bodyData buffer acquired above -- confirm
                        // that a response-only result cannot occur
                        // while a body buffer is in use.
                        maybeReleaseConnection(result);
                        continue;
                    }
                }
                if (result.isHeaderCompleted()) {
                    MessageHeader header
                        = engine.responseDecoder().header().get();
                    if (!handleResponseHeader(header)) {
                        maybeReleaseConnection(result);
                        break;
                    }
                }
                if (bodyData != null) {
                    if (bodyData.position() > 0) {
                        // End of record if the decoder neither needs
                        // more space nor more data
                        boolean eor
                            = !result.isOverflow() && !result.isUnderflow();
                        downPipeline.fire(Input.fromSink(bodyData, eor), this);
                    } else {
                        // Nothing decoded, give buffer back
                        bodyData.unlockBuffer();
                    }
                    bodyData = null;
                }
                maybeReleaseConnection(result);
                // An overflow causes a buffer to be acquired in the
                // next iteration
                wasOverflow = result.isOverflow();
            }
        }
595
596        @SuppressWarnings("PMD.CognitiveComplexity")
597        private boolean handleResponseHeader(MessageHeader response) {
598            if (response instanceof HttpResponse) {
599                HttpResponse httpResponse = (HttpResponse) response;
600                if (httpResponse.hasPayload()) {
601                    if (httpResponse.findValue(
602                        HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE)
603                        .map(type -> "text"
604                            .equalsIgnoreCase(type.value().topLevelType()))
605                        .orElse(false)) {
606                        currentPool = charBufferPool;
607                    } else {
608                        currentPool = byteBufferPool;
609                    }
610                }
611                downPipeline.fire(new Response(httpResponse), this);
612            } else if (response instanceof WsMessageHeader) {
613                WsMessageHeader wsMessage = (WsMessageHeader) response;
614                if (wsMessage.hasPayload()) {
615                    if (wsMessage.isTextMode()) {
616                        currentPool = charBufferPool;
617                    } else {
618                        currentPool = byteBufferPool;
619                    }
620                }
621            } else if (response instanceof WsCloseFrame) {
622                downPipeline.fire(
623                    new WebSocketClose((WsCloseFrame) response, this));
624            }
625            return true;
626        }
627
628        private void maybeReleaseConnection(Decoder.Result<?> result) {
629            if (result.isOverflow() || result.isUnderflow()) {
630                // Data remains to be processed
631                return;
632            }
633            MessageHeader header
634                = engine.responseDecoder().header().get();
635            // Don't release if something follows
636            if (header instanceof HttpResponse
637                && ((HttpResponse) header).statusCode() % 100 == 1) {
638                return;
639            }
640            if (engine.switchedTo().equals(Optional.of("websocket"))) {
641                if (!result.closeConnection()) {
642                    return;
643                }
644                // Is web socket close, inform application layer
645                downPipeline.fire(new Closed<Void>(), this);
646            }
647            netConnChannel.setAssociated(WebAppMsgChannel.class, null);
648            if (!result.closeConnection()) {
649                // May be reused
650                pooled.add(serverAddress, netConnChannel);
651            }
652            netConnChannel = null;
653        }
654
655        @SuppressWarnings("PMD.CommentRequired")
656        public void handleClose(Close event) {
657            if (engine.switchedTo().equals(Optional.of("websocket"))) {
658                sendMessageUpstream(new WsCloseFrame(null, null),
659                    netConnChannel);
660            }
661        }
662
663        @SuppressWarnings("PMD.CommentRequired")
664        public void handleClosed(Closed<?> event) {
665            if (engine.switchedTo().equals(Optional.of("websocket"))) {
666                downPipeline.fire(new Closed<Void>(), this);
667            }
668        }
669
670    }
671
672}