001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.nio.channels.SelectableChannel; 024import java.nio.channels.SelectionKey; 025import java.nio.channels.Selector; 026import java.util.Set; 027import org.jgrapes.core.Component; 028import org.jgrapes.core.Components; 029import org.jgrapes.core.annotation.Handler; 030import org.jgrapes.core.events.Start; 031import org.jgrapes.core.events.Stop; 032import org.jgrapes.io.events.NioRegistration; 033 034/** 035 * A helper component that provides the central hub for non blocking 036 * I/O components. Exactly one {@code NioDispatcher} must exist in 037 * any tree with {@link NioHandler} components. 038 */ 039public class NioDispatcher extends Component implements Runnable { 040 041 private final Selector selector; 042 private Thread runner; 043 private final Object selectorGate = new Object(); 044 045 /** 046 * Creates a new Dispatcher. 047 * 048 * @throws IOException if an I/O exception occurred 049 */ 050 public NioDispatcher() throws IOException { 051 selector = Selector.open(); 052 } 053 054 /** 055 * Starts this dispatcher. A dispatcher has an associated thread that 056 * keeps it running. 057 * 058 * @param event the event 059 */ 060 @Handler 061 public void onStart(Start event) { 062 synchronized (this) { 063 if (runner != null && !runner.isInterrupted()) { 064 return; 065 } 066 runner = (Components.useVirtualThreads() ? Thread.ofVirtual() 067 : Thread.ofPlatform()).name(Components.simpleObjectName(this)) 068 .start(this); 069 } 070 } 071 072 /** 073 * Stops the thread that is associated with this dispatcher. 074 * 075 * @param event the event 076 * @throws InterruptedException if the execution is interrupted 077 */ 078 @Handler(priority = -10_000) 079 public void onStop(Stop event) throws InterruptedException { 080 synchronized (this) { 081 if (runner == null) { 082 return; 083 } 084 // It just might happen that the wakeup() occurs between the 085 // check for running and the select() in the thread's run loop, 086 // but we -- obviously -- cannot put the select() in a 087 // synchronized(this). 088 while (runner.isAlive()) { 089 runner.interrupt(); // *Should* be sufficient, but... 090 selector.wakeup(); // Make sure 091 runner.join(10); 092 } 093 runner = null; 094 } 095 } 096 097 /** 098 * Invoked once by the thread associated with the dispatcher. Handles 099 * all events from the underlying {@link Selector}. 100 */ 101 @Override 102 @SuppressWarnings({ "PMD.EmptySynchronizedBlock", "PMD.EmptyCatchBlock", 103 "PMD.AvoidCatchingThrowable", "PMD.EmptyControlStatement" }) 104 public void run() { 105 try { 106 registerAsGenerator(); 107 while (!Thread.currentThread().isInterrupted()) { 108 try { 109 selector.select(); 110 Set<SelectionKey> selected = selector.selectedKeys(); 111 for (SelectionKey key : selected) { 112 ((NioHandler) key.attachment()) 113 .handleOps(key.readyOps()); 114 } 115 selected.clear(); 116 synchronized (selectorGate) { 117 // Delay next iteration if another thread has the lock. 118 // "Find bugs" complains, but this is really okay. 119 } 120 } catch (InterruptedIOException | InterruptedException 121 | Error e) { 122 break; 123 } catch (Throwable e) { 124 // Ignore anything else, this loop is crucial. 125 } 126 } 127 } finally { 128 unregisterAsGenerator(); 129 } 130 } 131 132 /** 133 * Handle the NIO registration. 134 * 135 * @param event the event 136 * @throws IOException Signals that an I/O exception has occurred. 137 */ 138 @Handler 139 public void onNioRegistration(NioRegistration event) 140 throws IOException { 141 @SuppressWarnings("PMD.CloseResource") 142 SelectableChannel channel = event.ioChannel(); 143 channel.configureBlocking(false); 144 SelectionKey key; 145 synchronized (selectorGate) { 146 selector.wakeup(); // make sure selector isn't blocking 147 key = channel.register( 148 selector, event.ops(), event.handler()); 149 } 150 event.setResult(new Registration(key)); 151 } 152 153 /** 154 * Represents a NIO registration. 155 */ 156 public class Registration extends NioRegistration.Registration { 157 158 private final SelectionKey key; 159 160 /** 161 * Instantiates a new registration. 162 * 163 * @param key the key 164 */ 165 public Registration(SelectionKey key) { 166 super(); 167 this.key = key; 168 } 169 170 @Override 171 public void updateInterested(int ops) { 172 synchronized (selectorGate) { 173 selector.wakeup(); // make sure selector isn't blocking 174 key.interestOps(ops); 175 } 176 } 177 } 178 179}