001/* 002 * JGrapes Event driven Framework 003 * Copyright (C) 2018 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.net; 020 021import java.io.IOException; 022import java.net.ConnectException; 023import java.nio.channels.SocketChannel; 024import org.jgrapes.core.Channel; 025import org.jgrapes.core.Event; 026import org.jgrapes.core.Self; 027import org.jgrapes.core.annotation.Handler; 028import org.jgrapes.core.events.Error; 029import org.jgrapes.io.NioHandler; 030import org.jgrapes.io.events.Close; 031import org.jgrapes.io.events.ConnectError; 032import org.jgrapes.io.events.NioRegistration; 033import org.jgrapes.io.events.OpenSocketConnection; 034import org.jgrapes.io.events.Opening; 035import org.jgrapes.net.events.ClientConnected; 036import org.jgrapes.net.events.Connected; 037 038/** 039 * A component that reads from or write to a socket connection. 040 */ 041public class SocketConnector extends SocketConnectionManager { 042 043 /** 044 * Create a new instance using the given channel. 045 * 046 * @param channel the component's channel 047 */ 048 public SocketConnector(Channel channel) { 049 super(channel); 050 } 051 052 /** 053 * Creates a new connector, using itself as component channel. 054 */ 055 public SocketConnector() { 056 this(Channel.SELF); 057 } 058 059 @Override 060 public SocketConnector setBufferSize(int size) { 061 super.setBufferSize(size); 062 return this; 063 } 064 065 /** 066 * Opens a connection to the end point specified in the event. 067 * 068 * @param event the event 069 */ 070 @Handler 071 public void onOpenConnection(OpenSocketConnection event) { 072 try { 073 @SuppressWarnings("PMD.CloseResource") 074 SocketChannel socketChannel = SocketChannel.open(event.address()); 075 new SocketChannelImpl(event, socketChannel); 076 } catch (ConnectException e) { 077 fire(new ConnectError(event, "Connection refused.", e)); 078 } catch (IOException e) { 079 fire(new ConnectError(event, "Failed to open socket connection.", 080 e)); 081 } 082 } 083 084 /** 085 * Called when the new socket channel has successfully been registered 086 * with the nio dispatcher. 087 * 088 * @param event the event 089 * @throws InterruptedException the interrupted exception 090 * @throws IOException Signals that an I/O exception has occurred. 091 */ 092 @Handler(channels = Self.class) 093 public void onRegistered(NioRegistration.Completed event) 094 throws InterruptedException, IOException { 095 NioHandler handler = event.event().handler(); 096 if (!(handler instanceof SocketChannelImpl) 097 || !channels.contains(handler)) { 098 return; 099 } 100 if (event.event().get() == null) { 101 fire(new Error(event, "Registration failed, no NioDispatcher?", 102 new Throwable())); 103 return; 104 } 105 SocketChannelImpl channel = (SocketChannelImpl) handler; 106 Connected<?> connected; 107 if (channel.openEvent().isPresent()) { 108 connected = new ClientConnected(channel.openEvent().get(), 109 channel.nioChannel().getLocalAddress(), 110 channel.nioChannel().getRemoteAddress()); 111 } else { 112 connected 113 = new Connected<>(channel.nioChannel().getLocalAddress(), 114 channel.nioChannel().getRemoteAddress()); 115 } 116 var registration = event.event().get(); 117 // (1) Opening, (2) Connected, (3) start processing input 118 channel.downPipeline() 119 .fire(Event.onCompletion(new Opening<Void>(), e -> { 120 channel.downPipeline().fire(connected, channel); 121 channel.registrationComplete(registration); 122 }), channel); 123 } 124 125 /** 126 * Shuts down the one of the connections. 127 * 128 * @param event the event 129 * @throws IOException if an I/O exception occurred 130 * @throws InterruptedException if the execution was interrupted 131 */ 132 @Handler 133 @SuppressWarnings("PMD.DataflowAnomalyAnalysis") 134 public void onClose(Close event) throws IOException, InterruptedException { 135 for (Channel channel : event.channels()) { 136 if (channel instanceof SocketChannelImpl 137 && channels.contains(channel)) { 138 ((SocketChannelImpl) channel).close(); 139 } 140 } 141 } 142}