001/*
002 * JGrapes Event driven Framework
003 * Copyright (C) 2018 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.net.ConnectException;
023import java.nio.channels.SocketChannel;
024
025import org.jgrapes.core.Channel;
026import org.jgrapes.core.Self;
027import org.jgrapes.core.annotation.Handler;
028import org.jgrapes.core.events.Error;
029import org.jgrapes.io.NioHandler;
030import org.jgrapes.io.events.Close;
031import org.jgrapes.io.events.ConnectError;
032import org.jgrapes.io.events.NioRegistration;
033import org.jgrapes.io.events.OpenTcpConnection;
034import org.jgrapes.net.events.Connected;
035
036/**
037 * A component that reads from or write to a TCP connection.
038 */
039public class TcpConnector extends TcpConnectionManager {
040
041    private int bufferSize = 1536;
042
043    /**
044     * Create a new instance using the given channel.
045     * 
046     * @param channel the component's channel 
047     */
048    public TcpConnector(Channel channel) {
049        super(channel);
050    }
051
052    /**
053     * Creates a new connector, using itself as component channel. 
054     */
055    public TcpConnector() {
056        this(Channel.SELF);
057    }
058
059    /**
060     * Sets the buffer size for the send an receive buffers.
061     * If no size is set, the system defaults will be used.
062     * 
063     * @param size the size to use for the send and receive buffers
064     * @return the TCP connector for easy chaining
065     */
066    public TcpConnector setBufferSize(int size) {
067        this.bufferSize = size;
068        return this;
069    }
070
071    /**
072     * Return the configured buffer size.
073     *
074     * @return the bufferSize
075     */
076    public int bufferSize() {
077        return bufferSize;
078    }
079
080    /**
081     * Opens a connection to the end point specified in the event.
082     *
083     * @param event the event
084     */
085    @Handler
086    public void onOpenConnection(OpenTcpConnection event) {
087        try {
088            SocketChannel socketChannel = SocketChannel.open(event.address());
089            channels.add(new TcpChannelImpl(socketChannel));
090        } catch (ConnectException e) {
091            fire(new ConnectError(event, "Connection refused.", e));
092        } catch (IOException e) {
093            fire(new ConnectError(event, "Failed to open TCP connection.", e));
094        }
095    }
096
097    /**
098     * Called when the new socket channel has successfully been registered
099     * with the nio dispatcher.
100     *
101     * @param event the event
102     * @throws InterruptedException the interrupted exception
103     * @throws IOException Signals that an I/O exception has occurred.
104     */
105    @Handler(channels = Self.class)
106    public void onRegistered(NioRegistration.Completed event)
107            throws InterruptedException, IOException {
108        NioHandler handler = event.event().handler();
109        if (!(handler instanceof TcpChannelImpl)) {
110            return;
111        }
112        if (event.event().get() == null) {
113            fire(new Error(event, "Registration failed, no NioDispatcher?",
114                new Throwable()));
115            return;
116        }
117        TcpChannelImpl channel = (TcpChannelImpl) handler;
118        channel.registrationComplete(event.event());
119        channel.downPipeline()
120            .fire(new Connected(channel.nioChannel().getLocalAddress(),
121                channel.nioChannel().getRemoteAddress()), channel);
122    }
123
124    /**
125     * Shuts down the one of the connections.
126     *
127     * @param event the event
128     * @throws IOException if an I/O exception occurred
129     * @throws InterruptedException if the execution was interrupted
130     */
131    @Handler
132    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
133    public void onClose(Close event) throws IOException, InterruptedException {
134        for (Channel channel : event.channels()) {
135            if (channel instanceof TcpChannelImpl
136                && channels.contains(channel)) {
137                ((TcpChannelImpl) channel).close();
138            }
139        }
140    }
141}