001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.nio.channels.SelectableChannel;
024import java.nio.channels.SelectionKey;
025import java.nio.channels.Selector;
026import java.util.Set;
027import org.jgrapes.core.Component;
028import org.jgrapes.core.Components;
029import org.jgrapes.core.annotation.Handler;
030import org.jgrapes.core.events.Start;
031import org.jgrapes.core.events.Stop;
032import org.jgrapes.io.events.NioRegistration;
033
034/**
035 * A helper component that provides the central hub for non blocking
036 * I/O components. Exactly one {@code NioDispatcher} must exist in
037 * any tree with {@link NioHandler} components. 
038 */
039public class NioDispatcher extends Component implements Runnable {
040
041    private final Selector selector;
042    private Thread runner;
043    private final Object selectorGate = new Object();
044
045    /**
046     * Creates a new Dispatcher.
047     * 
048     * @throws IOException if an I/O exception occurred
049     */
050    public NioDispatcher() throws IOException {
051        selector = Selector.open();
052    }
053
054    /**
055     * Starts this dispatcher. A dispatcher has an associated thread that
056     * keeps it running.
057     * 
058     * @param event the event
059     */
060    @Handler
061    public void onStart(Start event) {
062        synchronized (this) {
063            if (runner != null && !runner.isInterrupted()) {
064                return;
065            }
066            runner = (Components.useVirtualThreads() ? Thread.ofVirtual()
067                : Thread.ofPlatform()).name(Components.simpleObjectName(this))
068                    .start(this);
069        }
070    }
071
072    /**
073     * Stops the thread that is associated with this dispatcher.
074     * 
075     * @param event the event
076     * @throws InterruptedException if the execution is interrupted
077     */
078    @Handler(priority = -10_000)
079    public void onStop(Stop event) throws InterruptedException {
080        synchronized (this) {
081            if (runner == null) {
082                return;
083            }
084            // It just might happen that the wakeup() occurs between the
085            // check for running and the select() in the thread's run loop,
086            // but we -- obviously -- cannot put the select() in a
087            // synchronized(this).
088            while (runner.isAlive()) {
089                runner.interrupt(); // *Should* be sufficient, but...
090                selector.wakeup(); // Make sure
091                runner.join(10);
092            }
093            runner = null;
094        }
095    }
096
097    /**
098     * Invoked once by the thread associated with the dispatcher. Handles
099     * all events from the underlying {@link Selector}.  
100     */
101    @Override
102    @SuppressWarnings({ "PMD.EmptySynchronizedBlock", "PMD.EmptyCatchBlock",
103        "PMD.AvoidCatchingThrowable", "PMD.EmptyControlStatement" })
104    public void run() {
105        try {
106            registerAsGenerator();
107            while (!Thread.currentThread().isInterrupted()) {
108                try {
109                    selector.select();
110                    Set<SelectionKey> selected = selector.selectedKeys();
111                    for (SelectionKey key : selected) {
112                        ((NioHandler) key.attachment())
113                            .handleOps(key.readyOps());
114                    }
115                    selected.clear();
116                    synchronized (selectorGate) {
117                        // Delay next iteration if another thread has the lock.
118                        // "Find bugs" complains, but this is really okay.
119                    }
120                } catch (InterruptedIOException | InterruptedException
121                        | Error e) {
122                    break;
123                } catch (Throwable e) {
124                    // Ignore anything else, this loop is crucial.
125                }
126            }
127        } finally {
128            unregisterAsGenerator();
129        }
130    }
131
132    /**
133     * Handle the NIO registration.
134     *
135     * @param event the event
136     * @throws IOException Signals that an I/O exception has occurred.
137     */
138    @Handler
139    public void onNioRegistration(NioRegistration event)
140            throws IOException {
141        @SuppressWarnings("PMD.CloseResource")
142        SelectableChannel channel = event.ioChannel();
143        channel.configureBlocking(false);
144        SelectionKey key;
145        synchronized (selectorGate) {
146            selector.wakeup(); // make sure selector isn't blocking
147            key = channel.register(
148                selector, event.ops(), event.handler());
149        }
150        event.setResult(new Registration(key));
151    }
152
153    /**
154     * Represents a NIO registration.
155     */
156    public class Registration extends NioRegistration.Registration {
157
158        private final SelectionKey key;
159
160        /**
161         * Instantiates a new registration.
162         *
163         * @param key the key
164         */
165        public Registration(SelectionKey key) {
166            super();
167            this.key = key;
168        }
169
170        @Override
171        public void updateInterested(int ops) {
172            synchronized (selectorGate) {
173                selector.wakeup(); // make sure selector isn't blocking
174                key.interestOps(ops);
175            }
176        }
177    }
178
179}