001/* 002 * JGrapes Event driven Framework 003 * Copyright (C) 2018 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.net; 020 021import java.io.IOException; 022import java.net.ConnectException; 023import java.nio.channels.SocketChannel; 024import org.jgrapes.core.Channel; 025import org.jgrapes.core.Event; 026import org.jgrapes.core.Self; 027import org.jgrapes.core.annotation.Handler; 028import org.jgrapes.core.events.Error; 029import org.jgrapes.io.NioHandler; 030import org.jgrapes.io.events.Close; 031import org.jgrapes.io.events.ConnectError; 032import org.jgrapes.io.events.NioRegistration; 033import org.jgrapes.io.events.OpenTcpConnection; 034import org.jgrapes.io.events.Opening; 035import org.jgrapes.net.events.ClientConnected; 036import org.jgrapes.net.events.Connected; 037 038/** 039 * A component that reads from or write to a TCP connection. 040 */ 041public class TcpConnector extends TcpConnectionManager { 042 043 /** 044 * Create a new instance using the given channel. 045 * 046 * @param channel the component's channel 047 */ 048 public TcpConnector(Channel channel) { 049 super(channel); 050 } 051 052 /** 053 * Creates a new connector, using itself as component channel. 054 */ 055 public TcpConnector() { 056 this(Channel.SELF); 057 } 058 059 @Override 060 public TcpConnector setBufferSize(int size) { 061 super.setBufferSize(size); 062 return this; 063 } 064 065 /** 066 * Opens a connection to the end point specified in the event. 067 * 068 * @param event the event 069 */ 070 @Handler 071 public void onOpenConnection(OpenTcpConnection event) { 072 try { 073 @SuppressWarnings("PMD.CloseResource") 074 SocketChannel socketChannel = SocketChannel.open(event.address()); 075 channels.add(new TcpChannelImpl(event, socketChannel)); 076 } catch (ConnectException e) { 077 fire(new ConnectError(event, "Connection refused.", e)); 078 } catch (IOException e) { 079 fire(new ConnectError(event, "Failed to open TCP connection.", e)); 080 } 081 } 082 083 /** 084 * Called when the new socket channel has successfully been registered 085 * with the nio dispatcher. 086 * 087 * @param event the event 088 * @throws InterruptedException the interrupted exception 089 * @throws IOException Signals that an I/O exception has occurred. 090 */ 091 @Handler(channels = Self.class) 092 public void onRegistered(NioRegistration.Completed event) 093 throws InterruptedException, IOException { 094 NioHandler handler = event.event().handler(); 095 if (!(handler instanceof TcpChannelImpl)) { 096 return; 097 } 098 if (event.event().get() == null) { 099 fire(new Error(event, "Registration failed, no NioDispatcher?", 100 new Throwable())); 101 return; 102 } 103 TcpChannelImpl channel = (TcpChannelImpl) handler; 104 Connected<?> connected; 105 if (channel.openEvent().isPresent()) { 106 connected = new ClientConnected(channel.openEvent().get(), 107 channel.nioChannel().getLocalAddress(), 108 channel.nioChannel().getRemoteAddress()); 109 } else { 110 connected 111 = new Connected<>(channel.nioChannel().getLocalAddress(), 112 channel.nioChannel().getRemoteAddress()); 113 } 114 var registration = event.event().get(); 115 // (1) Opening, (2) Connected, (3) start processing input 116 channel.downPipeline() 117 .fire(Event.onCompletion(new Opening<Void>(), e -> { 118 channel.downPipeline().fire(connected, channel); 119 channel.registrationComplete(registration); 120 }), channel); 121 } 122 123 /** 124 * Shuts down the one of the connections. 125 * 126 * @param event the event 127 * @throws IOException if an I/O exception occurred 128 * @throws InterruptedException if the execution was interrupted 129 */ 130 @Handler 131 @SuppressWarnings("PMD.DataflowAnomalyAnalysis") 132 public void onClose(Close event) throws IOException, InterruptedException { 133 for (Channel channel : event.channels()) { 134 if (channel instanceof TcpChannelImpl 135 && channels.contains(channel)) { 136 ((TcpChannelImpl) channel).close(); 137 } 138 } 139 } 140}