001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2023 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public 013 * License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io.util; 020 021import java.util.HashSet; 022import java.util.Set; 023import java.util.concurrent.ExecutorService; 024import org.jgrapes.core.Channel; 025import org.jgrapes.core.Component; 026import org.jgrapes.core.Components; 027import org.jgrapes.core.Event; 028import org.jgrapes.core.EventPipeline; 029import org.jgrapes.core.Manager; 030import org.jgrapes.core.Subchannel; 031import org.jgrapes.core.annotation.Handler; 032import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements; 033import org.jgrapes.core.events.Stop; 034import org.jgrapes.io.events.Close; 035 036/** 037 * A base class for components that manage {@link Subchannel}s representing 038 * some kind of connection to a server or service. 039 * 040 * @param <C> the type of the managed connections 041 */ 042public abstract class ConnectionManager< 043 C extends ConnectionManager<C>.Connection> 044 extends Component { 045 046 protected final Set<C> connections = new HashSet<>(); 047 private ExecutorService executorService; 048 049 /** 050 * Creates a new component base with its channel set to 051 * itself. 052 */ 053 public ConnectionManager() { 054 super(); 055 } 056 057 /** 058 * Creates a new component base with its channel set to the given 059 * channel. As a special case {@link Channel#SELF} can be 060 * passed to the constructor to make the component use itself 061 * as channel. The special value is necessary as you 062 * obviously cannot pass an object to be constructed to its 063 * constructor. 064 * 065 * @param componentChannel the channel that the component's 066 * handlers listen on by default and that 067 * {@link Manager#fire(Event, Channel...)} sends the event to 068 */ 069 public ConnectionManager(Channel componentChannel, 070 ChannelReplacements channelReplacements) { 071 super(componentChannel, channelReplacements); 072 } 073 074 /** 075 * Creates a new component base like 076 * {@link Component#Component(Channel)} but with channel mappings 077 * for {@link Handler} annotations. 078 * 079 * @param componentChannel the channel that the component's 080 * handlers listen on by default and that 081 * {@link Manager#fire(Event, Channel...)} sends the event to 082 */ 083 public ConnectionManager(Channel componentChannel) { 084 super(componentChannel); 085 } 086 087 /** 088 * If connections are event generators, register the component as 089 * generator upon the creation of the first connection and unregister 090 * it when closing the last one. 091 * 092 * @return true, if connections generate 093 */ 094 protected abstract boolean connectionsGenerate(); 095 096 /** 097 * Sets an executor service to be used by the downstream event 098 * pipelines. Setting this to an executor service with a limited 099 * number of threads allows to control the maximum load caused 100 * by events generated by this component. 101 * 102 * @param executorService the executorService to set 103 * @return the connection manager for easy chaining 104 * @see Manager#newEventPipeline(ExecutorService) 105 */ 106 public ConnectionManager<C> 107 setExecutorService(ExecutorService executorService) { 108 this.executorService = executorService; 109 return this; 110 } 111 112 /** 113 * Returns the executor service. 114 * 115 * @return the executorService 116 */ 117 public ExecutorService executorService() { 118 if (executorService == null) { 119 return Components.defaultExecutorService(); 120 } 121 return executorService; 122 } 123 124 /** 125 * Closes all connections. 126 * 127 * @param event the event 128 */ 129 @Handler 130 public void onStop(Stop event) { 131 while (true) { 132 C connection; 133 synchronized (connections) { 134 var itr = connections.iterator(); 135 if (!itr.hasNext()) { 136 return; 137 } 138 connection = itr.next(); 139 } 140 connection.close(); 141 } 142 } 143 144 /** 145 * Closes the given connection. 146 * 147 * @param event the event 148 * @param connection the connection 149 */ 150 @Handler 151 public void onClose(Close event, C connection) { 152 synchronized (this) { 153 if (connections.contains(connection)) { 154 connection.close(); 155 } 156 } 157 } 158 159 /** 160 * The base class for the connections managed by this component. 161 */ 162 public class Connection extends Subchannel.DefaultSubchannel { 163 164 private final EventPipeline downPipeline; 165 166 /** 167 * @param mainChannel 168 */ 169 @SuppressWarnings({ "unchecked", 170 "PMD.ConstructorCallsOverridableMethod" }) 171 public Connection(Channel mainChannel) { 172 super(mainChannel); 173 synchronized (ConnectionManager.this) { 174 if (connections.isEmpty() && connectionsGenerate()) { 175 registerAsGenerator(); 176 } 177 connections.add((C) this); 178 } 179 if (executorService == null) { 180 downPipeline = newEventPipeline(); 181 } else { 182 downPipeline = newEventPipeline(executorService); 183 } 184 } 185 186 /** 187 * Gets the down pipeline. 188 * 189 * @return the downPipeline 190 */ 191 public EventPipeline downPipeline() { 192 return downPipeline; 193 } 194 195 /** 196 * Closes the connection. If the last connection is closed 197 * and the component is a generator (see 198 * {@link ConnectionManager#connectionsGenerate()), the component 199 * is unregistered as generator. 200 */ 201 public void close() { 202 synchronized (this) { 203 connections.remove(this); 204 if (connections.isEmpty() && connectionsGenerate()) { 205 unregisterAsGenerator(); 206 } 207 } 208 } 209 210 } 211}