001/*
002 * JGrapes Event driven Framework
003 * Copyright (C) 2018 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.net.ConnectException;
023import java.nio.channels.SocketChannel;
024import org.jgrapes.core.Channel;
025import org.jgrapes.core.Event;
026import org.jgrapes.core.Self;
027import org.jgrapes.core.annotation.Handler;
028import org.jgrapes.core.events.Error;
029import org.jgrapes.io.NioHandler;
030import org.jgrapes.io.events.Close;
031import org.jgrapes.io.events.ConnectError;
032import org.jgrapes.io.events.NioRegistration;
033import org.jgrapes.io.events.OpenTcpConnection;
034import org.jgrapes.io.events.Opening;
035import org.jgrapes.net.events.ClientConnected;
036import org.jgrapes.net.events.Connected;
037
038/**
039 * A component that reads from or write to a TCP connection.
040 */
041public class TcpConnector extends TcpConnectionManager {
042
043    /**
044     * Create a new instance using the given channel.
045     * 
046     * @param channel the component's channel 
047     */
048    public TcpConnector(Channel channel) {
049        super(channel);
050    }
051
052    /**
053     * Creates a new connector, using itself as component channel. 
054     */
055    public TcpConnector() {
056        this(Channel.SELF);
057    }
058
059    @Override
060    public TcpConnector setBufferSize(int size) {
061        super.setBufferSize(size);
062        return this;
063    }
064
065    /**
066     * Opens a connection to the end point specified in the event.
067     *
068     * @param event the event
069     */
070    @Handler
071    public void onOpenConnection(OpenTcpConnection event) {
072        try {
073            @SuppressWarnings("PMD.CloseResource")
074            SocketChannel socketChannel = SocketChannel.open(event.address());
075            channels.add(new TcpChannelImpl(event, socketChannel));
076        } catch (ConnectException e) {
077            fire(new ConnectError(event, "Connection refused.", e));
078        } catch (IOException e) {
079            fire(new ConnectError(event, "Failed to open TCP connection.", e));
080        }
081    }
082
083    /**
084     * Called when the new socket channel has successfully been registered
085     * with the nio dispatcher.
086     *
087     * @param event the event
088     * @throws InterruptedException the interrupted exception
089     * @throws IOException Signals that an I/O exception has occurred.
090     */
091    @Handler(channels = Self.class)
092    public void onRegistered(NioRegistration.Completed event)
093            throws InterruptedException, IOException {
094        NioHandler handler = event.event().handler();
095        if (!(handler instanceof TcpChannelImpl)) {
096            return;
097        }
098        if (event.event().get() == null) {
099            fire(new Error(event, "Registration failed, no NioDispatcher?",
100                new Throwable()));
101            return;
102        }
103        TcpChannelImpl channel = (TcpChannelImpl) handler;
104        Connected<?> connected;
105        if (channel.openEvent().isPresent()) {
106            connected = new ClientConnected(channel.openEvent().get(),
107                channel.nioChannel().getLocalAddress(),
108                channel.nioChannel().getRemoteAddress());
109        } else {
110            connected
111                = new Connected<>(channel.nioChannel().getLocalAddress(),
112                    channel.nioChannel().getRemoteAddress());
113        }
114        var registration = event.event().get();
115        // (1) Opening, (2) Connected, (3) start processing input
116        channel.downPipeline()
117            .fire(Event.onCompletion(new Opening<Void>(), e -> {
118                channel.downPipeline().fire(connected, channel);
119                channel.registrationComplete(registration);
120            }), channel);
121    }
122
123    /**
124     * Shuts down the one of the connections.
125     *
126     * @param event the event
127     * @throws IOException if an I/O exception occurred
128     * @throws InterruptedException if the execution was interrupted
129     */
130    @Handler
131    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
132    public void onClose(Close event) throws IOException, InterruptedException {
133        for (Channel channel : event.channels()) {
134            if (channel instanceof TcpChannelImpl
135                && channels.contains(channel)) {
136                ((TcpChannelImpl) channel).close();
137            }
138        }
139    }
140}