001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.core.internal; 020 021import java.util.Collections; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Optional; 026import java.util.Set; 027import java.util.concurrent.Future; 028import java.util.concurrent.atomic.AtomicInteger; 029import java.util.function.Consumer; 030import org.jgrapes.core.Associator; 031import org.jgrapes.core.Channel; 032import org.jgrapes.core.CompletionEvent; 033import org.jgrapes.core.CompletionLock; 034import org.jgrapes.core.Eligible; 035import org.jgrapes.core.Event; 036import org.jgrapes.core.EventPipeline; 037 038/** 039 * Provides the implementations of methods to class {@link Event} that 040 * need access to classes or methods that are visible in the implementation 041 * package only. The class is not intended to be used as base class 042 * for any other class. 043 * 044 * @param <T> the result type of the event. Use {@link Void} if handling 045 * the event does not produce a result 046 */ 047@SuppressWarnings("PMD.TooManyMethods") 048public abstract class EventBase<T> 049 implements Eligible, Future<T>, Associator { 050 051 /** The event that caused this event. */ 052 private EventBase<?> generatedBy; 053 /** Number of events that have to be processed until completion. 054 * This is one for the event itself and one more for each event 055 * that has this event as its cause. */ 056 private final AtomicInteger openCount = new AtomicInteger(1); 057 /** Completion locks. */ 058 private Set<CompletionLockBase> completionLocks; 059 /** Set when the event is enqueued, reset when it has been completed. */ 060 private EventProcessor processedBy; 061 /** The events to be fired upon completion. Using this attribute 062 * provides a slightly faster access than invoking 063 * {@link Event#completionEvents()}, which wraps the result in 064 * an unmodifiable set. */ 065 protected Set<Event<?>> completionEvents; 066 /** Temporarily set when invoking a handler, only to be used by 067 * {@link #handlingError(EventPipeline, Throwable)}. */ 068 protected Channel invokedFor; 069 /** Set when the event has been completed. */ 070 protected boolean completed; 071 private boolean requiresResult; 072 /** Event is tracked by {@link VerboseHandlerReference}. */ 073 private boolean tracked = true; 074 /** Event handler to be invoked after resumeHandling. */ 075 private Iterator<HandlerReference> suspendedHandlers; 076 private Runnable whenResumed; 077 078 /** 079 * See {@link Event#channels()}. 080 * 081 * @return the channel[] 082 */ 083 public abstract Channel[] channels(); 084 085 /** 086 * See {@link Event#handled()}. 087 */ 088 protected abstract void handled(); 089 090 /** 091 * See {@link Event#isStopped()}. 092 */ 093 public abstract boolean isStopped(); 094 095 /** 096 * See {@link Event#currentResults()}. 097 */ 098 protected abstract List<T> currentResults(); 099 100 /** 101 * See {@link Event#setRequiresResult(boolean)}. 102 */ 103 protected EventBase<T> setRequiresResult(boolean value) { 104 if (requiresResult == value) { 105 return this; 106 } 107 if (value) { 108 openCount.incrementAndGet(); 109 requiresResult = true; 110 } else { 111 requiresResult = false; 112 decrementOpen(); 113 } 114 return this; 115 } 116 117 /** 118 * See {@link Event#firstResultAssigned()}. 119 */ 120 protected void firstResultAssigned() { 121 if (requiresResult) { 122 decrementOpen(); 123 } 124 } 125 126 /** 127 * Returns <code>true</code> if the event has been enqueued in a pipeline. 128 * 129 * @return the result 130 */ 131 protected boolean enqueued() { 132 return processedBy != null || completed || isCancelled(); 133 } 134 135 /** 136 * Invoked when an exception occurs while invoking a handler for an event. 137 * 138 * @param eventProcessor the manager that has invoked the handler 139 * @param throwable the exception that has been thrown by the handler 140 */ 141 protected abstract void handlingError( 142 EventPipeline eventProcessor, Throwable throwable); 143 144 /** 145 * If an event is fired while processing another event, note 146 * the event being processed. This allows us to track the cause 147 * of events to the "initial" (externally) generated event that 148 * triggered everything. 149 * 150 * @param causingEvent the causing event to set 151 */ 152 /* default */ void generatedBy(EventBase<?> causingEvent) { 153 generatedBy = causingEvent; 154 if (causingEvent != null) { 155 causingEvent.openCount.incrementAndGet(); 156 } 157 } 158 159 /** 160 * Set the processor that will (eventually) process the event. 161 * 162 * @param processor the processor 163 */ 164 /* default */ void processedBy(EventProcessor processor) { 165 this.processedBy = processor; 166 } 167 168 public Optional<EventPipeline> processedBy() { 169 return Optional.ofNullable(processedBy).map( 170 procBy -> procBy.asEventPipeline()); 171 } 172 173 /** 174 * Suspend the invocation of the remaining handlers for this event. 175 * May only be called in a handler for the event. Must be balanced 176 * by an invocation of {@link #resumeHandling()}. 177 */ 178 public void suspendHandling() { 179 suspendHandling(null); 180 } 181 182 /** 183 * Suspend the invocation of the remaining handlers for this event. 184 * May only be called in a handler for the event. Must be balanced 185 * by an invocation of {@link #resumeHandling()}. 186 * 187 * @param whenResumed some function to be executed when handling is resumed 188 */ 189 public void suspendHandling(Runnable whenResumed) { 190 if (processedBy == null) { 191 throw new IllegalStateException("May only be called from handler."); 192 } 193 this.whenResumed = whenResumed; 194 processedBy.suspendHandling(this); 195 } 196 197 /* default */ void invokeWhenResumed() { 198 if (whenResumed != null) { 199 whenResumed.run(); 200 whenResumed = null; 201 } 202 } 203 204 /** 205 * Resume the invocation of handlers for this event. 206 * 207 * @see #suspendHandling() 208 */ 209 public void resumeHandling() { 210 if (processedBy == null) { 211 throw new IllegalStateException("Lost processor."); 212 } 213 processedBy.resumeHandling(this); 214 } 215 216 /* default */ Iterator<HandlerReference> clearSuspendedHandlers() { 217 var result = suspendedHandlers; 218 suspendedHandlers = null; 219 return result; 220 } 221 222 /* default */ void setSuspendedHandlers( 223 Iterator<HandlerReference> suspendedHandlers) { 224 this.suspendedHandlers = suspendedHandlers; 225 } 226 227 /** 228 * @param pipeline 229 */ 230 @SuppressWarnings("PMD.CognitiveComplexity") 231 /* default */ void decrementOpen() { 232 if (openCount.decrementAndGet() == 0 && !completed) { 233 synchronized (this) { 234 completed = true; 235 notifyAll(); 236 } 237 if (completionEvents != null && !isCancelled()) { 238 processedBy.updateNewEventsParent(generatedBy); 239 for (Event<?> e : completionEvents) { 240 Channel[] completeChannels = e.channels(); 241 if (completeChannels.length == 0) { 242 // Note that channels() cannot be empty, as it is set 243 // when firing the event and an event is never fired 244 // on no channels. 245 completeChannels = channels(); 246 e.setChannels(completeChannels); 247 } 248 processedBy.add(e, completeChannels); 249 } 250 } 251 if (generatedBy != null) { 252 generatedBy.decrementOpen(); 253 } 254 processedBy = null; // No longer needed 255 } 256 } 257 258 /** 259 * Adds the given completion lock. 260 * 261 * @param lock the lock 262 * @see CompletionLock 263 */ 264 /* default */ Event<T> addCompletionLock(CompletionLockBase lock) { 265 synchronized (this) { 266 if (completionLocks == null) { 267 completionLocks = Collections.synchronizedSet(new HashSet<>()); 268 } 269 } 270 if (completionLocks.add(lock)) { 271 openCount.incrementAndGet(); 272 lock.startTimer(); 273 } 274 return (Event<T>) this; 275 } 276 277 /** 278 * Removes the given completion lock. 279 * 280 * @param lock the lock 281 * @see CompletionLock 282 */ 283 /* default */ void removeCompletionLock(CompletionLockBase lock) { 284 if (completionLocks == null) { 285 return; 286 } 287 if (completionLocks.remove(lock)) { 288 decrementOpen(); 289 } 290 lock.cancelTimer(); 291 } 292 293 /** 294 * Disables tracking for this event and all events generated 295 * when handling it. 296 */ 297 public Event<T> disableTracking() { 298 tracked = false; 299 return (Event<T>) this; 300 } 301 302 /** 303 * Whether the event (and all events generated when handling it) 304 * is tracked. 305 * 306 * @return `true` if event is tracked 307 */ 308 public boolean isTracked() { 309 return tracked; 310 } 311 312 @SuppressWarnings("PMD.UselessParentheses") 313 /* default */ boolean isTrackable() { 314 return generatedBy == null ? tracked 315 : (tracked && generatedBy.isTrackable()); 316 } 317 318 /** 319 * Adds the given event to the events to be thrown when this event 320 * has completed (see {@link #isDone()}). Such an event is called 321 * a "completion event". 322 * 323 * Completion events are considered to be caused by the event that 324 * caused the completed event. If an event *e1* caused an event 325 * *e2* which has a completion event *e2c*, *e1* is only put in 326 * state completed when *e2c* has been handled. 327 * 328 * Completion events are handled by the same {@link EventProcessor} 329 * as the event that has been completed. 330 * 331 * @param completionEvent the completion event to add 332 * @return the object for easy chaining 333 * @see #onCompletion(Event, Consumer) 334 */ 335 public abstract Event<T> addCompletionEvent(Event<?> completionEvent); 336 337 /** 338 * Invokes the consumer when the event is completed. This is 339 * a shortcut for registering a {@link CompletionEvent} and 340 * providing a handler for the completion event that invokes 341 * the consumer. 342 * 343 * The static form is required because otherwise the compiler cannot 344 * infer the type of the consumer's argument. 345 * 346 * @param <T> the result type of the event 347 * @param <E> the type of the event 348 * @param event the event 349 * @param consumer the consumer 350 * @return the event 351 */ 352 public static <T, E extends Event<T>> E onCompletion(E event, 353 Consumer<E> consumer) { 354 event.addCompletionEvent(new ActionEvent<Event<T>>( 355 event.getClass().getSimpleName() + "CompletionAction") { 356 @Override 357 public void execute() throws Exception { 358 consumer.accept(event); 359 } 360 }); 361 return event; 362 } 363}