001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2026 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public 013 * License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.core.internal; 020 021import java.util.HashSet; 022import java.util.Iterator; 023import java.util.Queue; 024import java.util.Set; 025import java.util.concurrent.ConcurrentLinkedDeque; 026import java.util.concurrent.ConcurrentLinkedQueue; 027import java.util.concurrent.ExecutorService; 028import org.jgrapes.core.Channel; 029import org.jgrapes.core.Components; 030import org.jgrapes.core.Event; 031import org.jgrapes.core.EventPipeline; 032 033/** 034 * This class provides the default implementation of an {@link EventPipeline}. 035 */ 036@SuppressWarnings("PMD.AvoidSynchronizedStatement") 037public class EventProcessor implements InternalEventPipeline, Runnable { 038 039 @SuppressWarnings("PMD.FieldNamingConventions") 040 protected static final ThreadLocal<EventBase<?>> newEventsParent 041 = new ThreadLocal<>(); 042 043 private final ExecutorService executorService; 044 private final ComponentTree componentTree; 045 private final EventPipeline asEventPipeline; 046 // Must not use synchronized in toString, leads to unexpected deadlock 047 protected final Queue<EventChannelsTuple> queue 048 = new ConcurrentLinkedQueue<>(); 049 private Iterator<HandlerReference> invoking; 050 // Used by this thread only. 051 private final Set<EventBase<?>> suspended = new HashSet<>(); 052 // Only this thread can remove, but others might add. 053 private final Queue<EventBase<?>> toBeResumed 054 = new ConcurrentLinkedDeque<>(); 055 private boolean isExecuting; 056 private final ThreadLocal<Thread> executor = new ThreadLocal<>(); 057 058 /** 059 * Instantiates a new event processor. 060 * 061 * @param tree the tree 062 */ 063 /* default */ EventProcessor(ComponentTree tree) { 064 this(tree, Components.defaultExecutorService()); 065 } 066 067 /* default */ EventProcessor(ComponentTree tree, 068 ExecutorService executorService) { 069 this.componentTree = tree; 070 this.executorService = executorService; 071 asEventPipeline = new CheckingPipelineFilter(tree, this); 072 } 073 074 /** 075 * Gets the component tree. 076 * 077 * @return the component tree 078 */ 079 protected ComponentTree tree() { 080 return componentTree; 081 } 082 083 /* default */ EventPipeline asEventPipeline() { 084 return asEventPipeline; 085 } 086 087 /** 088 * Called before adding completion events. The parent of 089 * a completion event is not the event that has completed but 090 * the event that generated the original event. 091 * 092 * @param parent the new parent 093 */ 094 /* default */ void updateNewEventsParent(EventBase<?> parent) { 095 newEventsParent.set(parent); 096 } 097 098 @Override 099 public <T extends Event<?>> T add(T event, Channel... channels) { 100 ((EventBase<?>) event).generatedBy(newEventsParent.get()); // NOPMD 101 ((EventBase<?>) event).processedBy(this); // NOPMD (cast) 102 synchronized (this) { 103 EventChannelsTuple.addTo(queue, event, channels); 104 if (!isExecuting) { 105 // Queue was initially empty, this starts it 106 GeneratorRegistry.instance().add(this); 107 isExecuting = true; 108 executorService.execute(this); 109 } 110 } 111 return event; 112 } 113 114 /* default */ void add(Queue<EventChannelsTuple> source) { 115 synchronized (this) { 116 while (true) { 117 EventChannelsTuple entry = source.poll(); 118 if (entry == null) { 119 break; 120 } 121 entry.event.processedBy(this); 122 queue.add(entry); 123 } 124 if (!isExecuting) { 125 GeneratorRegistry.instance().add(this); 126 isExecuting = true; 127 executorService.execute(this); 128 } 129 } 130 } 131 132 @Override 133 public void merge(InternalEventPipeline other) { 134 if (!(other instanceof BufferingEventPipeline)) { 135 throw new IllegalArgumentException( 136 "Can only merge events from an BufferingEventPipeline."); 137 } 138 add(((BufferingEventPipeline) other).retrieveEvents()); 139 } 140 141 @Override 142 public void run() { 143 String origName = Thread.currentThread().getName(); 144 try { 145 Thread.currentThread().setName( 146 origName + " (P" + Components.objectId(this) + ")"); 147 executor.set(Thread.currentThread()); 148 componentTree.setDispatchingPipeline(this); 149 while (true) { 150 // No lock needed, only this thread can remove from resumed 151 var resumedEvent = toBeResumed.poll(); 152 if (resumedEvent != null) { 153 if (suspended.remove(resumedEvent)) { 154 resumedEvent.invokeWhenResumed(); 155 invokeHandlers(resumedEvent.clearSuspendedHandlers(), 156 resumedEvent); 157 } 158 continue; 159 } 160 161 EventChannelsTuple next; 162 synchronized (this) { 163 next = queue.poll(); 164 if (next == null) { 165 // Everything is done, though suspended handlers 166 // may cause this processor to be reactivated. 167 GeneratorRegistry.instance().remove(this); 168 isExecuting = false; 169 synchronized (executor) { 170 executor.notifyAll(); 171 } 172 break; 173 } 174 } 175 @SuppressWarnings("PMD.LooseCoupling") 176 HandlerList handlers 177 = componentTree.getEventHandlers(next.event, next.channels); 178 invokeHandlers(handlers.iterator(), next.event); 179 } 180 } finally { 181 // This processor should now only be (strongly) referenced 182 // from suspended events (if any exist), the 183 // CheckingPipelineFilter (which is only referenced from this) 184 // and some component tree, if this is the tree's default 185 // processor. 186 newEventsParent.set(null); 187 componentTree.setDispatchingPipeline(null); 188 executor.set(null); 189 Thread.currentThread().setName(origName); 190 } 191 } 192 193 /** 194 * Invoke all (remaining) handlers with the given event as parameter. 195 * 196 * @param handlers the handlers 197 * @param event the event 198 */ 199 private void invokeHandlers(Iterator<HandlerReference> handlers, 200 EventBase<?> event) { 201 try { 202 invoking = handlers; 203 newEventsParent.set(event); 204 // invoking may be set to null by suspendHandling() 205 while (invoking != null && invoking.hasNext()) { 206 HandlerReference hdlr = invoking.next(); 207 try { 208 if (event.isStopped()) { 209 break; 210 } 211 hdlr.invoke(event); 212 } catch (AssertionError t) { 213 // JUnit support 214 CoreUtils.setAssertionError(t); 215 event.handlingError(asEventPipeline, t); 216 event.invokedFor = null; 217 } catch (Error e) { // NOPMD 218 // Wouldn't have caught it, if it was possible. 219 throw e; 220 } catch (Throwable t) { // NOPMD 221 // Errors have been rethrown, so this should work. 222 event.handlingError(asEventPipeline, t); 223 event.invokedFor = null; 224 } 225 } 226 } catch (AssertionError t) { 227 // JUnit support 228 CoreUtils.setAssertionError(t); 229 event.handlingError(asEventPipeline, t); 230 } catch (Error e) { // NOPMD 231 // Wouldn't have caught it, if it was possible. 232 throw e; 233 } catch (Throwable t) { // NOPMD 234 // Errors have been rethrown, so this should work. 235 event.handlingError(asEventPipeline, t); 236 } finally { 237 if (invoking != null) { 238 event.handled(); 239 invoking = null; 240 newEventsParent.get().decrementOpen(); 241 } 242 } 243 } 244 245 @SuppressWarnings("PMD.CompareObjectsWithEquals") 246 /* default */ void suspendHandling(EventBase<?> event) { 247 if (Thread.currentThread() != executor.get()) { 248 throw new IllegalStateException("May only be called from handler."); 249 } 250 if (!invoking.hasNext()) { 251 // Last anyway, nothing to be done 252 return; 253 } 254 event.setSuspendedHandlers(invoking); 255 invoking = null; 256 suspended.add(event); 257 // Just in case (might happen) 258 toBeResumed.remove(event); 259 } 260 261 /* default */ void resumeHandling(EventBase<?> event) { 262 toBeResumed.add(event); 263 synchronized (this) { 264 if (!isExecuting) { 265 // There were no more events, restart 266 GeneratorRegistry.instance().add(this); 267 isExecuting = true; 268 executorService.execute(this); 269 } 270 } 271 } 272 273 /* 274 * (non-Javadoc) 275 * 276 * @see org.jgrapes.core.internal.InternalEventPipeline#executorService() 277 */ 278 @Override 279 public ExecutorService executorService() { 280 return executorService; 281 } 282 283 @Override 284 public void awaitExhaustion() throws InterruptedException { 285 synchronized (executor) { 286 while (isExecuting) { 287 executor.wait(); 288 } 289 } 290 } 291 292 /* 293 * (non-Javadoc) 294 * 295 * @see java.lang.Object#toString() 296 */ 297 @Override 298 public String toString() { 299 StringBuilder builder = new StringBuilder(); 300 builder.append(Components.objectName(this)) 301 .append(" ["); 302 if (queue != null) { 303 builder.append("queue=").append(queue); 304 } 305 builder.append(']'); 306 return builder.toString(); 307 } 308 309}