001/*
002 * JGrapes Event driven Framework
003 * Copyright (C) 2018 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.net.ConnectException;
023import java.nio.channels.SocketChannel;
024import org.jgrapes.core.Channel;
025import org.jgrapes.core.Event;
026import org.jgrapes.core.Self;
027import org.jgrapes.core.annotation.Handler;
028import org.jgrapes.core.events.Error;
029import org.jgrapes.io.NioHandler;
030import org.jgrapes.io.events.Close;
031import org.jgrapes.io.events.ConnectError;
032import org.jgrapes.io.events.NioRegistration;
033import org.jgrapes.io.events.OpenSocketConnection;
034import org.jgrapes.io.events.Opening;
035import org.jgrapes.net.events.ClientConnected;
036import org.jgrapes.net.events.Connected;
037
038/**
039 * A component that reads from or write to a socket connection.
040 */
041public class SocketConnector extends SocketConnectionManager {
042
043    /**
044     * Create a new instance using the given channel.
045     * 
046     * @param channel the component's channel 
047     */
048    public SocketConnector(Channel channel) {
049        super(channel);
050    }
051
052    /**
053     * Creates a new connector, using itself as component channel. 
054     */
055    public SocketConnector() {
056        this(Channel.SELF);
057    }
058
059    @Override
060    public SocketConnector setBufferSize(int size) {
061        super.setBufferSize(size);
062        return this;
063    }
064
065    /**
066     * Opens a connection to the end point specified in the event.
067     *
068     * @param event the event
069     */
070    @Handler
071    public void onOpenConnection(OpenSocketConnection event) {
072        try {
073            @SuppressWarnings("PMD.CloseResource")
074            SocketChannel socketChannel = SocketChannel.open(event.address());
075            new SocketChannelImpl(event, socketChannel);
076        } catch (ConnectException e) {
077            fire(new ConnectError(event, "Connection refused.", e));
078        } catch (IOException e) {
079            fire(new ConnectError(event, "Failed to open socket connection.",
080                e));
081        }
082    }
083
084    /**
085     * Called when the new socket channel has successfully been registered
086     * with the nio dispatcher.
087     *
088     * @param event the event
089     * @throws InterruptedException the interrupted exception
090     * @throws IOException Signals that an I/O exception has occurred.
091     */
092    @Handler(channels = Self.class)
093    public void onRegistered(NioRegistration.Completed event)
094            throws InterruptedException, IOException {
095        NioHandler handler = event.event().handler();
096        if (!(handler instanceof SocketChannelImpl)
097            || !channels.contains(handler)) {
098            return;
099        }
100        if (event.event().get() == null) {
101            fire(new Error(event, "Registration failed, no NioDispatcher?",
102                new Throwable()));
103            return;
104        }
105        SocketChannelImpl channel = (SocketChannelImpl) handler;
106        Connected<?> connected;
107        if (channel.openEvent().isPresent()) {
108            connected = new ClientConnected(channel.openEvent().get(),
109                channel.nioChannel().getLocalAddress(),
110                channel.nioChannel().getRemoteAddress());
111        } else {
112            connected
113                = new Connected<>(channel.nioChannel().getLocalAddress(),
114                    channel.nioChannel().getRemoteAddress());
115        }
116        var registration = event.event().get();
117        // (1) Opening, (2) Connected, (3) start processing input
118        channel.downPipeline()
119            .fire(Event.onCompletion(new Opening<Void>(), e -> {
120                channel.downPipeline().fire(connected, channel);
121                channel.registrationComplete(registration);
122            }), channel);
123    }
124
125    /**
126     * Shuts down the one of the connections.
127     *
128     * @param event the event
129     * @throws IOException if an I/O exception occurred
130     * @throws InterruptedException if the execution was interrupted
131     */
132    @Handler
133    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
134    public void onClose(Close event) throws IOException, InterruptedException {
135        for (Channel channel : event.channels()) {
136            if (channel instanceof SocketChannelImpl
137                && channels.contains(channel)) {
138                ((SocketChannelImpl) channel).close();
139            }
140        }
141    }
142}