001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.core; 020 021import java.lang.reflect.Array; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.Future; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.TimeoutException; 034import java.util.function.BiConsumer; 035import org.jgrapes.core.events.HandlingError; 036import org.jgrapes.core.internal.EventBase; 037 038/** 039 * This class is the base class for all events. 040 * 041 * By default (i.e. as implemented by this class), the event's kind is 042 * represented by its Java class and the eligibility is based on 043 * the "is a" relationship between classes. An event is eligible if its class 044 * is equal to or a super class of the class used as criterion. 045 * This default behavior can be changed by overriding the 046 * methods from {@link Eligible}. See {@link NamedEvent} as an example. 047 * 048 * @param <T> 049 * the result type of the event. Use {@link Void} if handling the 050 * event does not produce a result 051 */ 052@SuppressWarnings({ "PMD.GodClass", "PMD.TooManyMethods" }) 053public class Event<T> extends EventBase<T> { 054 055 /** The channels that this event is to be fired on if no 056 * channels are specified explicitly when firing. */ 057 private Channel[] channels; 058 /** Indicates that the event should not processed further. */ 059 private boolean stopped; 060 /** The results of handling the event (if any). */ 061 private List<T> results; 062 /** Context data. */ 063 private Map<Object, Object> contextData; 064 private boolean cancelled; 065 066 /** 067 * Creates a new event. Passing channels is equivalent to first 068 * creating the event and then calling {@link #setChannels(Channel...)} 069 * with the given channels. 070 * 071 * @param channels the channels to set 072 */ 073 public Event(Channel... channels) { 074 super(); 075 this.channels = Arrays.copyOf(channels, channels.length); 076 } 077 078 /** 079 * Returns the class of this event as representation of its kind. 080 * 081 * @return the class of this event 082 * 083 * @see org.jgrapes.core.Eligible#defaultCriterion() 084 */ 085 @Override 086 public Object defaultCriterion() { 087 return getClass(); 088 } 089 090 /** 091 * Returns <code>true</code> if the `criterion` 092 * is of the same class or a base class of this event's class. 093 * 094 * @see org.jgrapes.core.Eligible#isEligibleFor(java.lang.Object) 095 */ 096 @Override 097 public boolean isEligibleFor(Object criterion) { 098 return Class.class.isInstance(criterion) 099 && ((Class<?>) criterion).isAssignableFrom(getClass()); 100 } 101 102 /** 103 * Return the event pipeline that currently processes the event 104 * (if any). 105 * 106 * @return the event pipeline if the event is being processed 107 */ 108 @SuppressWarnings({ "PMD.UselessOverridingMethod", 109 "PMD.AvoidDuplicateLiterals" }) 110 @Override 111 public Optional<EventPipeline> processedBy() { 112 return super.processedBy(); 113 } 114 115 /** 116 * Implements the default behavior for handling events thrown 117 * by a handler. Fires a {@link HandlingError handling error} event 118 * for this event and the given throwable. 119 * 120 * @see HandlingError 121 */ 122 @Override 123 protected void handlingError( 124 EventPipeline eventProcessor, Throwable throwable) { 125 if (invokedFor != null) { 126 eventProcessor.fire(new HandlingError(this, throwable), invokedFor); 127 } else { 128 eventProcessor.fire(new HandlingError(this, throwable), channels()); 129 } 130 } 131 132 /** 133 * Sets the channels that the event is fired on if no channels 134 * are specified explicitly when firing the event 135 * (see {@link org.jgrapes.core.Manager#fire(Event, Channel...)}). 136 * 137 * @param channels the channels to set 138 * @return the object for easy chaining 139 * 140 * @throws IllegalStateException if the method is called after 141 * this event has been fired 142 */ 143 public Event<T> setChannels(Channel... channels) { 144 if (enqueued()) { 145 throw new IllegalStateException( 146 "Channels cannot be changed after fire"); 147 } 148 this.channels = Arrays.copyOf(channels, channels.length); 149 return this; 150 } 151 152 /** 153 * Returns the channels associated with the event. Before an 154 * event has been fired, this returns the channels set with 155 * {@link #setChannels(Channel[])}. After an event has been 156 * fired, this returns the channels that the event has 157 * effectively been fired on 158 * (see {@link Manager#fire(Event, Channel...)}). 159 * 160 * @return the channels (never `null`, but may be empty) 161 */ 162 @Override 163 public Channel[] channels() { 164 return Arrays.copyOf(channels, channels.length); 165 } 166 167 /** 168 * Returns the subset of channels that are assignable to the given type. 169 * 170 * @param <C> the given type's class 171 * @param type the class to look for 172 * @return the filtered channels 173 * @see #channels() 174 */ 175 @SuppressWarnings({ "unchecked", "PMD.ShortVariable", 176 "PMD.AvoidDuplicateLiterals" }) 177 public <C> C[] channels(Class<C> type) { 178 return Arrays.stream(channels) 179 .filter(c -> type.isAssignableFrom(c.getClass())).toArray( 180 size -> (C[]) Array.newInstance(type, size)); 181 } 182 183 /** 184 * Execute the given handler for all channels of the given type. 185 * 186 * @param <E> the type of the event 187 * @param <C> the type of the channel 188 * @param type the channel type 189 * @param handler the handler 190 */ 191 @SuppressWarnings({ "unchecked", "PMD.ShortVariable" }) 192 public <E extends EventBase<?>, C extends Channel> void forChannels( 193 Class<C> type, BiConsumer<E, C> handler) { 194 Arrays.stream(channels) 195 .filter(c -> type.isAssignableFrom(c.getClass())) 196 .forEach(c -> handler.accept((E) this, (C) c)); 197 } 198 199 /** 200 * Returns the events to be thrown when this event has completed 201 * (see {@link #isDone()}). 202 * 203 * @return the completed events 204 */ 205 public Set<Event<?>> completionEvents() { 206 return completionEvents == null ? Collections.emptySet() 207 : Collections.unmodifiableSet(completionEvents); 208 } 209 210 /** 211 * {@inheritDoc} 212 */ 213 @Override 214 public Event<T> addCompletionEvent(Event<?> completionEvent) { 215 if (completionEvents == null) { 216 completionEvents = new HashSet<>(); 217 } 218 completionEvents.add(completionEvent); 219 return this; 220 } 221 222 /* 223 * (non-Javadoc) 224 * 225 * @see org.jgrapes.core.internal.EventBase#setRequiresResult(boolean) 226 */ 227 @Override 228 public Event<T> setRequiresResult(boolean value) { 229 return (Event<T>) super.setRequiresResult(value); 230 } 231 232 /** 233 * Check if this event has completed. An event is completed 234 * if 235 * * all its handlers have been invoked (or the event has 236 * been stopped or cancelled), 237 * * all events caused by it have completed, 238 * * no {@link CompletionLock}s remain, and 239 * * a result has been set (only required if 240 * {@link #setRequiresResult(boolean)} has been called with `true`). 241 * 242 * @return the completed state 243 */ 244 @Override 245 public boolean isDone() { 246 return completed; 247 } 248 249 /** 250 * Invoked after all handlers for the event have been executed. 251 * May be overridden by derived classes to cause some immediate effect 252 * (instead of e.g. waiting for the completion event). The default 253 * implementation does nothing. This method is invoked by the event 254 * handler thread and must not block. 255 */ 256 protected void handled() { 257 // Default is to do nothing. 258 } 259 260 /** 261 * {@inheritDoc} 262 */ 263 @Override 264 @SuppressWarnings("PMD.UselessOverridingMethod") 265 public void suspendHandling() { 266 super.suspendHandling(); 267 } 268 269 /** 270 * {@inheritDoc} 271 */ 272 @Override 273 @SuppressWarnings("PMD.UselessOverridingMethod") 274 public void suspendHandling(Runnable whenResumed) { 275 super.suspendHandling(whenResumed); 276 } 277 278 /** 279 * {@inheritDoc} 280 */ 281 @Override 282 @SuppressWarnings("PMD.UselessOverridingMethod") 283 public void resumeHandling() { 284 super.resumeHandling(); 285 } 286 287 /** 288 * Can be called during the execution of an event handler to indicate 289 * that the event should not be processed further. All remaining 290 * handlers for this event will be skipped. 291 * 292 * @return the object for easy chaining 293 */ 294 public Event<T> stop() { 295 stopped = true; 296 // Just in case. 297 resumeHandling(); 298 return this; 299 } 300 301 /** 302 * Returns <code>true</code> if {@link #stop} has been called. 303 * 304 * @return the stopped state 305 */ 306 public boolean isStopped() { 307 return stopped; 308 } 309 310 /** 311 * Prevents the invocation of further handlers (like {@link #stop()} 312 * and (in addition) the invocation of any added completed events. 313 * 314 * @param mayInterruptIfRunning ignored 315 * @return `false` if the event has already been completed 316 * @see java.util.concurrent.Future#cancel(boolean) 317 */ 318 @Override 319 public boolean cancel(boolean mayInterruptIfRunning) { 320 if (!completed && !cancelled) { 321 stop(); 322 cancelled = true; 323 return true; 324 } 325 return false; 326 } 327 328 @Override 329 public boolean isCancelled() { 330 return cancelled; 331 } 332 333 /** 334 * Sets the result of handling this event. If this method is invoked 335 * more then once, the various results are collected in a list. This 336 * can happen if the event is handled by several components. 337 * 338 * @param result the result to set 339 * @return the object for easy chaining 340 */ 341 public Event<T> setResult(T result) { 342 synchronized (this) { 343 if (results == null) { 344 // Make sure that we have a valid result before 345 // calling decrementOpen 346 results = new ArrayList<>(); 347 results.add(result); 348 firstResultAssigned(); 349 return this; 350 } 351 results.add(result); 352 return this; 353 } 354 } 355 356 /** 357 * Allows access to the intermediate result before the 358 * completion of the event. 359 * 360 * @return the intermediate results (which may be an empty list) 361 */ 362 protected List<T> currentResults() { 363 return results == null ? Collections.emptyList() 364 : Collections.unmodifiableList(results); 365 } 366 367 /** 368 * Tie the result of this event to the result of the other event. 369 * Changes of either event's results will subsequently be applied 370 * to both events. 371 * <P> 372 * This is useful when an event is replaced by another event during 373 * handling like: 374 * {@code fire((new Event()).tieTo(oldEvent.stop()))} 375 * 376 * @param other the event to tie to 377 * @return the object for easy chaining 378 */ 379 public Event<T> tieTo(Event<T> other) { 380 synchronized (this) { 381 if (other.results == null) { 382 other.results = new ArrayList<>(); 383 } 384 results = other.results; 385 return this; 386 } 387 } 388 389 /** 390 * Waits for the event to be completed (see {@link #isDone()}) 391 * and returns the first (or only) result. 392 * 393 * @see Future#get() 394 */ 395 @Override 396 public T get() throws InterruptedException { 397 while (true) { 398 synchronized (this) { 399 if (completed) { 400 return results == null || results.isEmpty() 401 ? null 402 : results.get(0); 403 } 404 wait(); 405 } 406 } 407 } 408 409 /** 410 * Causes the invoking thread to wait until the processing of the 411 * event has been completed (see {@link #isDone()}) or the given 412 * timeout has expired and returns the first (or only) result. 413 * 414 * @return the result 415 * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 416 */ 417 @Override 418 public T get(long timeout, TimeUnit unit) 419 throws InterruptedException, TimeoutException { 420 synchronized (this) { 421 if (completed) { 422 return results == null || results.isEmpty() 423 ? null 424 : results.get(0); 425 } 426 wait(unit.toMillis(timeout)); 427 } 428 if (completed) { 429 return results == null || results.isEmpty() 430 ? null 431 : results.get(0); 432 } 433 throw new TimeoutException(); 434 } 435 436 /** 437 * Waits for the event to be completed (see {@link #isDone()}) 438 * and returns the list of results (which may be empty if the 439 * event's result type is {@link Void}). 440 * 441 * @return the results 442 * @see Future#get() 443 */ 444 public List<T> results() throws InterruptedException { 445 while (true) { 446 synchronized (this) { 447 if (completed) { 448 return results == null ? Collections.emptyList() 449 : Collections.unmodifiableList(results); 450 } 451 wait(); 452 } 453 } 454 } 455 456 /** 457 * Causes the invoking thread to wait until the processing of the 458 * event has been completed (see {@link #isDone()}) or given timeout 459 * has expired and returns the list of results (which may be empty 460 * if the event's result type is {@link Void}). 461 * 462 * @return the results 463 * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 464 */ 465 public List<T> results(long timeout, TimeUnit unit) 466 throws InterruptedException, TimeoutException { 467 synchronized (this) { 468 if (completed) { 469 return results == null ? Collections.emptyList() 470 : Collections.unmodifiableList(results); 471 } 472 wait(unit.toMillis(timeout)); 473 } 474 if (completed) { 475 return results == null ? Collections.emptyList() 476 : Collections.unmodifiableList(results); 477 } 478 throw new TimeoutException(); 479 } 480 481 @Override 482 @SuppressWarnings({ "PMD.ShortVariable", "unchecked" }) 483 public <A extends Associator> A setAssociated(Object by, Object with) { 484 if (contextData == null) { 485 contextData = new ConcurrentHashMap<>(); 486 } 487 if (with == null) { 488 contextData.remove(by); 489 } else { 490 contextData.put(by, with); 491 } 492 return (A) this; 493 } 494 495 @Override 496 @SuppressWarnings("PMD.ShortVariable") 497 public <V> Optional<V> associated(Object by, Class<V> type) { 498 if (contextData == null) { 499 return Optional.empty(); 500 } 501 return Optional.ofNullable(contextData.get(by)) 502 .filter(found -> type.isAssignableFrom(found.getClass())) 503 .map(type::cast); 504 } 505 506 /* 507 * (non-Javadoc) 508 * 509 * @see java.lang.Object#toString() 510 */ 511 @Override 512 public String toString() { 513 StringBuilder builder = new StringBuilder(); 514 builder.append(Components.objectName(this)) 515 .append(" ["); 516 if (channels != null) { 517 builder.append("channels="); 518 builder.append(Channel.toString(channels)); 519 } 520 builder.append(']'); 521 return builder.toString(); 522 } 523 524}