/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016-2018 Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.core.internal;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
import org.jgrapes.core.EventPipeline;

/**
 * This class provides the default implementation of an {@link EventPipeline}.
 *
 * Events are appended to an internal queue. While the queue (or the
 * collection of events waiting to be resumed) is non-empty, the processor
 * runs as a task on its executor service and invokes the handlers for
 * each event in turn. When nothing is left to do, the task terminates and
 * is submitted again by the next call that adds or resumes an event.
 */
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class EventProcessor implements InternalEventPipeline, Runnable {

    /** The event that is used as "generator" of events added by this thread. */
    @SuppressWarnings("PMD.FieldNamingConventions")
    protected static final ThreadLocal<EventBase<?>> newEventsParent
        = new ThreadLocal<>();

    private final ExecutorService executorService;
    private final ComponentTree componentTree;
    private final EventPipeline asEventPipeline;
    // Must not use synchronized in toString, leads to unexpected deadlock
    protected final Queue<EventChannelsTuple> queue
        = new ConcurrentLinkedQueue<>();
    // The handlers of the event currently being processed; set to null
    // by suspendHandling() to mark the current event as suspended.
    private Iterator<HandlerReference> invoking;
    // Used by this thread only.
    private final Set<EventBase<?>> suspended = new HashSet<>();
    // Only this thread can remove, but others might add.
    private final Queue<EventBase<?>> toBeResumed
        = new ConcurrentLinkedDeque<>();
    // Guarded by "this" for writing; true while a run() is scheduled
    // or active.
    private boolean isExecuting;
    // Holds the executing thread (for the executing thread only) and
    // doubles as monitor for awaitExhaustion().
    private final ThreadLocal<Thread> executor = new ThreadLocal<>();

    /**
     * Instantiates a new event processor that uses the
     * default executor service.
     *
     * @param tree the tree
     */
    /* default */ EventProcessor(ComponentTree tree) {
        this(tree, Components.defaultExecutorService());
    }

    /**
     * Instantiates a new event processor.
     *
     * @param tree the tree
     * @param executorService the executor service used to run this processor
     */
    /* default */ EventProcessor(ComponentTree tree,
            ExecutorService executorService) {
        this.componentTree = tree;
        this.executorService = executorService;
        asEventPipeline = new CheckingPipelineFilter(tree, this);
    }

    /**
     * Gets the component tree.
     *
     * @return the component tree
     */
    protected ComponentTree tree() {
        return componentTree;
    }

    /**
     * Returns the {@link CheckingPipelineFilter} that wraps this processor.
     *
     * @return the event pipeline
     */
    /* default */ EventPipeline asEventPipeline() {
        return asEventPipeline;
    }

    /**
     * Called before adding completion events. The parent of
     * a completion event is not the event that has completed but
     * the event that generated the original event.
     *
     * @param parent the new parent
     */
    /* default */ void updateNewEventsParent(EventBase<?> parent) {
        newEventsParent.set(parent);
    }

    /**
     * Appends the event to the queue and schedules this processor
     * for execution if it is not already executing.
     *
     * @param <T> the event's type
     * @param event the event
     * @param channels the channels that the event is fired on
     * @return the event (for easy chaining)
     */
    @Override
    public <T extends Event<?>> T add(T event, Channel... channels) {
        ((EventBase<?>) event).generatedBy(newEventsParent.get());
        ((EventBase<?>) event).processedBy(this);
        synchronized (this) {
            EventChannelsTuple.addTo(queue, event, channels);
            if (!isExecuting) {
                // Queue was initially empty, this starts it
                GeneratorRegistry.instance().add(this);
                isExecuting = true;
                executorService.execute(this);
            }
        }
        return event;
    }

    /**
     * Drains the given queue into this processor's queue and schedules
     * this processor for execution if it is not already executing.
     *
     * @param source the entries to move to this processor
     */
    @SuppressWarnings("PMD.ConfusingTernary")
    /* default */ void add(Queue<EventChannelsTuple> source) {
        synchronized (this) {
            while (true) {
                EventChannelsTuple entry = source.poll();
                if (entry == null) {
                    break;
                }
                entry.event.processedBy(this);
                queue.add(entry);
            }
            if (!isExecuting) {
                GeneratorRegistry.instance().add(this);
                isExecuting = true;
                executorService.execute(this);
            }
        }
    }

    /**
     * Moves the events buffered by the other pipeline to this processor.
     *
     * @param other the pipeline to take the events from
     * @throws IllegalArgumentException if the other pipeline is not
     * a {@link BufferingEventPipeline}
     */
    @Override
    public void merge(InternalEventPipeline other) {
        if (!(other instanceof BufferingEventPipeline)) {
            throw new IllegalArgumentException(
                "Can only merge events from an BufferingEventPipeline.");
        }
        add(((BufferingEventPipeline) other).retrieveEvents());
    }

    /**
     * Processes resumed and queued events until nothing is left to do.
     * Resumed events take precedence over queued events.
     */
    @Override
    @SuppressWarnings({ "PMD.AvoidDeeplyNestedIfStmts",
        "PMD.CognitiveComplexity" })
    public void run() {
        String origName = Thread.currentThread().getName();
        try {
            Thread.currentThread().setName(
                origName + " (P" + Components.objectId(this) + ")");
            executor.set(Thread.currentThread());
            componentTree.setDispatchingPipeline(this);
            while (true) {
                // No lock needed, only this thread can remove from resumed
                var resumedEvent = toBeResumed.poll();
                if (resumedEvent != null) {
                    if (suspended.remove(resumedEvent)) {
                        resumedEvent.invokeWhenResumed();
                        invokeHandlers(resumedEvent.clearSuspendedHandlers(),
                            resumedEvent);
                    }
                    continue;
                }

                EventChannelsTuple next;
                synchronized (this) {
                    next = queue.poll();
                    if (next == null) {
                        if (!toBeResumed.isEmpty()) {
                            // resumeHandling() may have added an event
                            // after the unlocked poll above. It saw
                            // isExecuting == true and therefore did not
                            // reschedule this processor, so terminating
                            // now would strand the resumed event.
                            continue;
                        }
                        // Everything is done, though suspended handlers
                        // may cause this processor to be reactivated.
                        GeneratorRegistry.instance().remove(this);
                        isExecuting = false;
                        synchronized (executor) {
                            executor.notifyAll();
                        }
                        break;
                    }
                }
                HandlerList handlers
                    = componentTree.getEventHandlers(next.event, next.channels);
                invokeHandlers(handlers.iterator(), next.event);
            }
        } finally {
            // This processor should now only be (strongly) referenced
            // from suspended events (if any exist), the
            // CheckingPipelineFilter (which is only referenced from this)
            // and some component tree, if this is the tree's default
            // processor.
            newEventsParent.set(null);
            componentTree.setDispatchingPipeline(null);
            executor.set(null);
            Thread.currentThread().setName(origName);
        }
    }

    /**
     * Invoke all (remaining) handlers with the given event as parameter.
     * Invocation ends early if the event is stopped or if a handler
     * suspends the handling. Unless the handling has been suspended,
     * the event is marked as handled afterwards.
     *
     * @param handlers the handlers
     * @param event the event
     */
    private void invokeHandlers(Iterator<HandlerReference> handlers,
            EventBase<?> event) {
        try {
            invoking = handlers;
            newEventsParent.set(event);
            // invoking may be set to null by suspendHandling()
            while (invoking != null && invoking.hasNext()) {
                HandlerReference hdlr = invoking.next();
                try {
                    if (event.isStopped()) {
                        break;
                    }
                    hdlr.invoke(event);
                } catch (AssertionError t) {
                    // JUnit support
                    CoreUtils.setAssertionError(t);
                    event.handlingError(asEventPipeline, t);
                    event.invokedFor = null;
                } catch (Error e) { // NOPMD
                    // Wouldn't have caught it, if it was possible.
                    throw e;
                } catch (Throwable t) { // NOPMD
                    // Errors have been rethrown, so this should work.
                    event.handlingError(asEventPipeline, t);
                    event.invokedFor = null;
                }
            }
        } catch (AssertionError t) {
            // JUnit support
            CoreUtils.setAssertionError(t);
            event.handlingError(asEventPipeline, t);
        } catch (Error e) { // NOPMD
            // Wouldn't have caught it, if it was possible.
            throw e;
        } catch (Throwable t) { // NOPMD
            // Errors have been rethrown, so this should work.
            event.handlingError(asEventPipeline, t);
        } finally { // NOPMD
            // A null value indicates that the handling was suspended,
            // so the event must not be completed yet.
            if (invoking != null) {
                event.handled();
                invoking = null;
                newEventsParent.get().decrementOpen();
            }
        }
    }

    /**
     * Suspends the invocation of the remaining handlers for the event
     * currently being processed. Does nothing if no handlers remain.
     *
     * @param event the event
     * @throws IllegalStateException if not called from the thread that
     * executes this processor
     */
    @SuppressWarnings("PMD.CompareObjectsWithEquals")
    /* default */ void suspendHandling(EventBase<?> event) {
        if (Thread.currentThread() != executor.get()) {
            throw new IllegalStateException("May only be called from handler.");
        }
        if (!invoking.hasNext()) {
            // Last anyway, nothing to be done
            return;
        }
        event.setSuspendedHandlers(invoking);
        invoking = null;
        suspended.add(event);
        // Just in case (might happen)
        toBeResumed.remove(event);
    }

    /**
     * Marks the event as to be resumed and schedules this processor
     * for execution if it is not already executing. May be called
     * from any thread.
     *
     * @param event the event
     */
    /* default */ void resumeHandling(EventBase<?> event) {
        // Must be added before the lock is taken, run() relies on this
        // order when it checks for pending work before terminating.
        toBeResumed.add(event);
        synchronized (this) {
            if (!isExecuting) {
                // There were no more events, restart
                GeneratorRegistry.instance().add(this);
                isExecuting = true;
                executorService.execute(this);
            }
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.jgrapes.core.internal.InternalEventPipeline#executorService()
     */
    @Override
    public ExecutorService executorService() {
        return executorService;
    }

    /**
     * Blocks the calling thread while this processor is executing.
     *
     * @throws InterruptedException if the calling thread is interrupted
     * while waiting
     */
    @Override
    public void awaitExhaustion() throws InterruptedException {
        synchronized (executor) {
            while (isExecuting) {
                executor.wait();
            }
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        // Deliberately not synchronized, see the comment on "queue".
        return new StringBuilder()
            .append(Components.objectName(this))
            .append(" [queue=")
            .append(queue)
            .append(']')
            .toString();
    }

}