001/*
002 * JGrapes Event driven Framework
003 * Copyright (C) 2018 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.net.ConnectException;
023import java.nio.channels.SocketChannel;
024import org.jgrapes.core.Channel;
025import org.jgrapes.core.Event;
026import org.jgrapes.core.Self;
027import org.jgrapes.core.annotation.Handler;
028import org.jgrapes.core.events.Error;
029import org.jgrapes.io.NioHandler;
030import org.jgrapes.io.events.Close;
031import org.jgrapes.io.events.ConnectError;
032import org.jgrapes.io.events.NioRegistration;
033import org.jgrapes.io.events.OpenSocketConnection;
034import org.jgrapes.io.events.Opening;
035import org.jgrapes.net.events.ClientConnected;
036import org.jgrapes.net.events.Connected;
037
038/**
039 * A component that reads from or write to a socket connection.
040 */
041public class SocketConnector extends SocketConnectionManager {
042
043    /**
044     * Create a new instance using the given channel.
045     * 
046     * @param channel the component's channel 
047     */
048    public SocketConnector(Channel channel) {
049        super(channel);
050    }
051
052    /**
053     * Creates a new connector, using itself as component channel. 
054     */
055    public SocketConnector() {
056        this(Channel.SELF);
057    }
058
059    @Override
060    public SocketConnector setBufferSize(int size) {
061        super.setBufferSize(size);
062        return this;
063    }
064
065    /**
066     * Opens a connection to the end point specified in the event.
067     *
068     * @param event the event
069     */
070    @Handler
071    public void onOpenConnection(OpenSocketConnection event) {
072        try {
073            @SuppressWarnings("PMD.CloseResource")
074            SocketChannel socketChannel = SocketChannel.open(event.address());
075            channels.add(new SocketChannelImpl(event, socketChannel));
076        } catch (ConnectException e) {
077            fire(new ConnectError(event, "Connection refused.", e));
078        } catch (IOException e) {
079            fire(new ConnectError(event, "Failed to open socket connection.",
080                e));
081        }
082    }
083
084    /**
085     * Called when the new socket channel has successfully been registered
086     * with the nio dispatcher.
087     *
088     * @param event the event
089     * @throws InterruptedException the interrupted exception
090     * @throws IOException Signals that an I/O exception has occurred.
091     */
092    @Handler(channels = Self.class)
093    public void onRegistered(NioRegistration.Completed event)
094            throws InterruptedException, IOException {
095        NioHandler handler = event.event().handler();
096        if (!(handler instanceof SocketChannelImpl)) {
097            return;
098        }
099        if (event.event().get() == null) {
100            fire(new Error(event, "Registration failed, no NioDispatcher?",
101                new Throwable()));
102            return;
103        }
104        SocketChannelImpl channel = (SocketChannelImpl) handler;
105        Connected<?> connected;
106        if (channel.openEvent().isPresent()) {
107            connected = new ClientConnected(channel.openEvent().get(),
108                channel.nioChannel().getLocalAddress(),
109                channel.nioChannel().getRemoteAddress());
110        } else {
111            connected
112                = new Connected<>(channel.nioChannel().getLocalAddress(),
113                    channel.nioChannel().getRemoteAddress());
114        }
115        var registration = event.event().get();
116        // (1) Opening, (2) Connected, (3) start processing input
117        channel.downPipeline()
118            .fire(Event.onCompletion(new Opening<Void>(), e -> {
119                channel.downPipeline().fire(connected, channel);
120                channel.registrationComplete(registration);
121            }), channel);
122    }
123
124    /**
125     * Shuts down the one of the connections.
126     *
127     * @param event the event
128     * @throws IOException if an I/O exception occurred
129     * @throws InterruptedException if the execution was interrupted
130     */
131    @Handler
132    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
133    public void onClose(Close event) throws IOException, InterruptedException {
134        for (Channel channel : event.channels()) {
135            if (channel instanceof SocketChannelImpl
136                && channels.contains(channel)) {
137                ((SocketChannelImpl) channel).close();
138            }
139        }
140    }
141}