001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2023 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.net; 020 021import java.io.IOException; 022import java.net.SocketAddress; 023import java.nio.ByteBuffer; 024import java.nio.channels.SelectionKey; 025import java.nio.channels.SocketChannel; 026import java.util.ArrayDeque; 027import java.util.HashSet; 028import java.util.Optional; 029import java.util.Queue; 030import java.util.Set; 031import java.util.concurrent.ExecutorService; 032import org.jgrapes.core.Channel; 033import org.jgrapes.core.Component; 034import org.jgrapes.core.Components; 035import org.jgrapes.core.EventPipeline; 036import org.jgrapes.core.Manager; 037import org.jgrapes.core.Subchannel; 038import org.jgrapes.core.annotation.Handler; 039import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel; 040import org.jgrapes.io.NioHandler; 041import org.jgrapes.io.events.Closed; 042import org.jgrapes.io.events.HalfClosed; 043import org.jgrapes.io.events.Input; 044import org.jgrapes.io.events.NioRegistration; 045import org.jgrapes.io.events.NioRegistration.Registration; 046import org.jgrapes.io.events.OpenSocketConnection; 047import org.jgrapes.io.events.Output; 048import org.jgrapes.io.util.ManagedBuffer; 049import org.jgrapes.io.util.ManagedBufferPool; 050 051/** 052 * Provides a base class for the {@link SocketServer} and the 053 * {@link SocketConnector}. 054 */ 055@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount", 056 "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals", 057 "PMD.ExcessiveClassLength" }) 058public abstract class SocketConnectionManager extends Component { 059 060 private int bufferSize = 32_768; 061 protected final Set<SocketChannelImpl> channels = new HashSet<>(); 062 private ExecutorService executorService; 063 064 /** 065 * Creates a new server using the given channel. 066 * 067 * @param componentChannel the component's channel 068 */ 069 public SocketConnectionManager(Channel componentChannel) { 070 super(componentChannel); 071 } 072 073 /** 074 * Sets the buffer size for the send an receive buffers. 075 * If no size is set, a default value of 32768 will be used. 076 * 077 * @param bufferSize the size to use for the send and receive buffers 078 * @return the socket connection manager for easy chaining 079 */ 080 public SocketConnectionManager setBufferSize(int bufferSize) { 081 this.bufferSize = bufferSize; 082 return this; 083 } 084 085 /** 086 * Return the configured buffer size. 087 * 088 * @return the bufferSize 089 */ 090 public int bufferSize() { 091 return bufferSize; 092 } 093 094 /** 095 * Sets an executor service to be used by the event pipelines 096 * that process the data from the network. Setting this 097 * to an executor service with a limited number of threads 098 * allows to control the maximum load from the network. 099 * 100 * @param executorService the executorService to set 101 * @return the socket connection manager for easy chaining 102 * @see Manager#newEventPipeline(ExecutorService) 103 */ 104 public SocketConnectionManager 105 setExecutorService(ExecutorService executorService) { 106 this.executorService = executorService; 107 return this; 108 } 109 110 /** 111 * Returns the executor service. 112 * 113 * @return the executorService 114 */ 115 public ExecutorService executorService() { 116 return executorService; 117 } 118 119 /** 120 * Writes the data passed in the event. 121 * 122 * The end of record flag is used to determine if a channel is 123 * eligible for purging. If the flag is set and all output has 124 * been processed, the channel is purgeable until input is 125 * received or another output event causes the state to be 126 * reevaluated. 127 * 128 * @param event the event 129 * @param channel the channel 130 * @throws InterruptedException the interrupted exception 131 */ 132 @Handler 133 public void onOutput(Output<ByteBuffer> event, 134 SocketChannelImpl channel) throws InterruptedException { 135 if (channels.contains(channel)) { 136 channel.write(event); 137 } 138 } 139 140 /** 141 * Removes the channel from the set of registered channels. 142 * 143 * @param channel the channel 144 * @return true, if channel was registered 145 */ 146 protected boolean removeChannel(SocketChannelImpl channel) { 147 synchronized (channels) { 148 return channels.remove(channel); 149 } 150 } 151 152 /* 153 * (non-Javadoc) 154 * 155 * @see java.lang.Object#toString() 156 */ 157 @Override 158 public String toString() { 159 return Components.objectName(this); 160 } 161 162 /** 163 * The close state. 164 */ 165 private enum ConnectionState { 166 OPEN, DELAYED_EVENT, DELAYED_REQUEST, HALF_CLOSED, CLOSED 167 } 168 169 /** 170 * The purgeable state. 171 */ 172 @SuppressWarnings("PMD.ShortVariable") 173 private enum PurgeableState { 174 NO, PENDING, YES 175 } 176 177 /** 178 * The internal representation of a connection. 179 */ 180 /** 181 * 182 */ 183 @SuppressWarnings("PMD.GodClass") 184 protected class SocketChannelImpl 185 extends DefaultIOSubchannel implements NioHandler, SocketIOChannel { 186 187 private final OpenSocketConnection openEvent; 188 private final SocketChannel nioChannel; 189 private final SocketAddress localAddress; 190 private final SocketAddress remoteAddress; 191 private final EventPipeline downPipeline; 192 private final ManagedBufferPool<ManagedBuffer<ByteBuffer>, 193 ByteBuffer> readBuffers; 194 private Registration registration; 195 private int selectionKeys; 196 private final Queue< 197 ManagedBuffer<ByteBuffer>.ByteBufferView> pendingWrites 198 = new ArrayDeque<>(); 199 private ConnectionState connState = ConnectionState.OPEN; 200 private PurgeableState purgeable = PurgeableState.NO; 201 private long becamePurgeableAt; 202 203 /** 204 * @param nioChannel the channel 205 * @throws IOException if an I/O error occurred 206 */ 207 public SocketChannelImpl(OpenSocketConnection openEvent, 208 SocketChannel nioChannel) throws IOException { 209 super(channel(), newEventPipeline()); 210 this.openEvent = openEvent; 211 this.nioChannel = nioChannel; 212 // Copy, because they are only available while channel is open. 213 localAddress = nioChannel.getLocalAddress(); 214 remoteAddress = nioChannel.getRemoteAddress(); 215 if (executorService == null) { 216 downPipeline = newEventPipeline(); 217 } else { 218 downPipeline = newEventPipeline(executorService); 219 } 220 String channelName 221 = Components.objectName(SocketConnectionManager.this) 222 + "." + Components.objectName(this); 223 224 // Prepare write buffers 225 int writeBufferSize = bufferSize < 1500 ? 1500 : bufferSize; 226 setByteBufferPool(new ManagedBufferPool<>(ManagedBuffer::new, 227 () -> { 228 return ByteBuffer.allocate(writeBufferSize); 229 }, 2) 230 .setName(channelName + ".upstream.buffers")); 231 232 // Prepare read buffers 233 int readBufferSize = bufferSize < 1500 ? 1500 : bufferSize; 234 readBuffers = new ManagedBufferPool<>(ManagedBuffer::new, 235 () -> { 236 return ByteBuffer.allocate(readBufferSize); 237 }, 2) 238 .setName(channelName + ".downstream.buffers"); 239 240 // Ready to use 241 channels.add(this); 242 243 // Register with dispatcher 244 nioChannel.configureBlocking(false); 245 SocketConnectionManager.this.fire( 246 new NioRegistration(this, nioChannel, 0, 247 SocketConnectionManager.this), 248 Channel.BROADCAST); 249 } 250 251 /** 252 * Returns the event that caused this connection to be opened. 253 * 254 * May be `null` if the channel was created in response to a 255 * client connecting to the server. 256 * 257 * @return the event 258 */ 259 public Optional<OpenSocketConnection> openEvent() { 260 return Optional.ofNullable(openEvent); 261 } 262 263 /** 264 * Gets the nio channel. 265 * 266 * @return the nioChannel 267 */ 268 public SocketChannel nioChannel() { 269 return nioChannel; 270 } 271 272 @Override 273 public SocketAddress localAddress() { 274 return localAddress; 275 } 276 277 @Override 278 public SocketAddress remoteAddress() { 279 return remoteAddress; 280 } 281 282 /** 283 * Gets the read buffers. 284 * 285 * @return the readBuffers 286 */ 287 public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> 288 readBuffers() { 289 return readBuffers; 290 } 291 292 /** 293 * Gets the down pipeline. 294 * 295 * @return the downPipeline 296 */ 297 public EventPipeline downPipeline() { 298 return downPipeline; 299 } 300 301 /** 302 * Invoked when registration has completed. 303 * 304 * @param registration the registration (result from the 305 * {@link NioRegistration} event) 306 */ 307 public void registrationComplete(Registration registration) { 308 this.registration = registration; 309 selectionKeys |= SelectionKey.OP_READ; 310 registration.updateInterested(selectionKeys); 311 } 312 313 /** 314 * Checks if is purgeable. 315 * 316 * @return true, if is purgeable 317 */ 318 public boolean isPurgeable() { 319 return purgeable == PurgeableState.YES; 320 } 321 322 /** 323 * Gets the the time when the connection became purgeable. 324 * 325 * @return the time 326 */ 327 public long purgeableSince() { 328 return becamePurgeableAt; 329 } 330 331 /** 332 * Write the data on this channel. 333 * 334 * @param event the event 335 */ 336 public void write(Output<ByteBuffer> event) 337 throws InterruptedException { 338 synchronized (pendingWrites) { 339 if (!nioChannel.isOpen()) { 340 return; 341 } 342 ManagedBuffer<ByteBuffer>.ByteBufferView reader 343 = event.buffer().newByteBufferView(); 344 if (!pendingWrites.isEmpty()) { 345 reader.managedBuffer().lockBuffer(); 346 purgeable = event.isEndOfRecord() ? PurgeableState.PENDING 347 : PurgeableState.NO; 348 pendingWrites.add(reader); 349 return; 350 } 351 try { 352 nioChannel.write(reader.get()); 353 } catch (IOException e) { 354 forceClose(e); 355 return; 356 } 357 if (!reader.get().hasRemaining()) { 358 if (event.isEndOfRecord()) { 359 becamePurgeableAt = System.currentTimeMillis(); 360 purgeable = PurgeableState.YES; 361 } else { 362 purgeable = PurgeableState.NO; 363 } 364 return; 365 } 366 reader.managedBuffer().lockBuffer(); 367 purgeable = event.isEndOfRecord() ? PurgeableState.PENDING 368 : PurgeableState.NO; 369 pendingWrites.add(reader); 370 selectionKeys |= SelectionKey.OP_WRITE; 371 registration.updateInterested(selectionKeys); 372 } 373 } 374 375 @Override 376 public void handleOps(int ops) throws InterruptedException { 377 if ((ops & SelectionKey.OP_READ) != 0) { 378 handleReadOp(); 379 } 380 if ((ops & SelectionKey.OP_WRITE) != 0) { 381 handleWriteOp(); 382 } 383 } 384 385 /** 386 * Gets a buffer from the pool and reads available data into it. 387 * Sends the result as event. 388 * 389 * @throws InterruptedException 390 * @throws IOException 391 */ 392 @SuppressWarnings("PMD.EmptyCatchBlock") 393 private void handleReadOp() throws InterruptedException { 394 ManagedBuffer<ByteBuffer> buffer; 395 buffer = readBuffers.acquire(); 396 try { 397 int bytes = buffer.fillFromChannel(nioChannel); 398 if (bytes == 0) { 399 buffer.unlockBuffer(); 400 return; 401 } 402 if (bytes > 0) { 403 purgeable = PurgeableState.NO; 404 downPipeline.fire(Input.fromSink(buffer, false), this); 405 return; 406 } 407 } catch (IOException e) { 408 // Buffer already unlocked by fillFromChannel 409 forceClose(e); 410 return; 411 } 412 // EOF (-1) from other end 413 buffer.unlockBuffer(); 414 synchronized (nioChannel) { 415 if (connState == ConnectionState.HALF_CLOSED) { 416 // Other end confirms our close, complete close 417 try { 418 nioChannel.close(); 419 } catch (IOException e) { 420 // Ignored for close 421 } 422 connState = ConnectionState.CLOSED; 423 downPipeline.fire(new Closed<Void>(), this); 424 return; 425 } 426 } 427 // Other end initiates close 428 selectionKeys &= ~SelectionKey.OP_READ; 429 registration.updateInterested(selectionKeys); 430 downPipeline.submit("SendHalfClosed", () -> { 431 try { 432 // Inform downstream and wait until everything has settled. 433 newEventPipeline().fire(new HalfClosed(), this).get(); 434 // All settled. 435 removeChannel(this); 436 downPipeline.fire(new Closed<Void>(), this); 437 // Close our end if everything has been written. 438 synchronized (pendingWrites) { 439 synchronized (nioChannel) { 440 try { 441 if (!pendingWrites.isEmpty()) { 442 // Pending writes, delay close 443 connState = ConnectionState.DELAYED_REQUEST; 444 return; 445 } 446 // Nothing left to do, close 447 nioChannel.close(); 448 connState = ConnectionState.CLOSED; 449 } catch (IOException e) { 450 // Ignored for close 451 } 452 } 453 } 454 } catch (InterruptedException e) { 455 // Nothing to do about this 456 } 457 }); 458 } 459 460 /** 461 * Checks if there is still data to be written. This may be 462 * a left over in an incompletely written buffer or a complete 463 * pending buffer. 464 * 465 * @throws IOException 466 * @throws InterruptedException 467 */ 468 @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 469 "PMD.EmptyCatchBlock", "PMD.AvoidBranchingStatementAsLastInLoop", 470 "PMD.CognitiveComplexity" }) 471 private void handleWriteOp() throws InterruptedException { 472 while (true) { 473 ManagedBuffer<ByteBuffer>.ByteBufferView head; 474 synchronized (pendingWrites) { 475 if (pendingWrites.isEmpty()) { 476 // Nothing left to write, stop getting ops 477 selectionKeys &= ~SelectionKey.OP_WRITE; 478 registration.updateInterested(selectionKeys); 479 // Was the connection closed while we were writing? 480 if (connState == ConnectionState.DELAYED_REQUEST 481 || connState == ConnectionState.DELAYED_EVENT) { 482 synchronized (nioChannel) { 483 try { 484 if (connState == ConnectionState.DELAYED_REQUEST) { 485 // Delayed close request from other end, 486 // complete 487 nioChannel.close(); 488 connState = ConnectionState.CLOSED; 489 } 490 if (connState == ConnectionState.DELAYED_EVENT) { 491 // Delayed close from this end, initiate 492 nioChannel.shutdownOutput(); 493 connState = ConnectionState.HALF_CLOSED; 494 } 495 } catch (IOException e) { 496 // Ignored for close 497 } 498 } 499 } else { 500 if (purgeable == PurgeableState.PENDING) { 501 purgeable = PurgeableState.YES; 502 } 503 } 504 break; // Nothing left to do 505 } 506 head = pendingWrites.peek(); 507 if (!head.get().hasRemaining()) { 508 // Nothing left in head buffer, try next 509 head.managedBuffer().unlockBuffer(); 510 pendingWrites.remove(); 511 continue; 512 } 513 } 514 try { 515 nioChannel.write(head.get()); // write... 516 } catch (IOException e) { 517 forceClose(e); 518 return; 519 } 520 break; // ... and wait for next op 521 } 522 } 523 524 /** 525 * Closes this channel. 526 * 527 * @throws IOException if an error occurs 528 * @throws InterruptedException if the execution was interrupted 529 */ 530 public void close() throws IOException, InterruptedException { 531 if (!removeChannel(this)) { 532 return; 533 } 534 synchronized (pendingWrites) { 535 if (!pendingWrites.isEmpty()) { 536 // Pending writes, delay close until done 537 connState = ConnectionState.DELAYED_EVENT; 538 return; 539 } 540 // Nothing left to do, proceed 541 synchronized (nioChannel) { 542 if (nioChannel.isOpen()) { 543 // Initiate close, must be confirmed by other end 544 nioChannel.shutdownOutput(); 545 connState = ConnectionState.HALF_CLOSED; 546 } 547 } 548 } 549 } 550 551 @SuppressWarnings("PMD.EmptyCatchBlock") 552 private void forceClose(Throwable error) throws InterruptedException { 553 try { 554 nioChannel.close(); 555 connState = ConnectionState.CLOSED; 556 } catch (IOException e) { 557 // Closed only to make sure, any failure can be ignored. 558 } 559 if (removeChannel(this)) { 560 var evt = new Closed<Void>(error); 561 downPipeline.fire(evt, this); 562 } 563 } 564 565 /* 566 * (non-Javadoc) 567 * 568 * @see org.jgrapes.io.IOSubchannel.DefaultSubchannel#toString() 569 */ 570 @Override 571 @SuppressWarnings("PMD.CommentRequired") 572 public String toString() { 573 return Subchannel.toString(this); 574 } 575 } 576 577}