001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2026 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
013 * License for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core;
020
021import java.lang.reflect.Array;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.TimeoutException;
034import java.util.function.BiConsumer;
035import org.jgrapes.core.events.HandlingError;
036import org.jgrapes.core.internal.EventBase;
037
038/**
039 * This class is the base class for all events.
040 *  
041 * By default (i.e. as implemented by this class), the event's kind is
042 * represented by its Java class and the eligibility is based on
 * the "is a" relationship between classes. An event is eligible if its class
 * is equal to or a subclass of the class used as criterion. 
045 * This default behavior can be changed by overriding the
046 * methods from {@link Eligible}. See {@link NamedEvent} as an example.
047 * 
048 * @param <T>
049 *            the result type of the event. Use {@link Void} if handling the
050 *            event does not produce a result
051 */
052@SuppressWarnings({ "PMD.GodClass", "PMD.TooManyMethods",
053    "PMD.AvoidSynchronizedStatement" })
054public class Event<T> extends EventBase<T> {
055
    /** The channels that this event is to be fired on if no
     * channels are specified explicitly when firing. */
    private Channel[] channels;
    /** Indicates that the event should not be processed further. */
    private boolean stopped;
    /** The results of handling the event (if any). Remains `null`
     * until a result is set or the event is tied to another event. */
    private List<T> results;
    /** Context data. Created lazily when the first association is set. */
    private Map<Object, Object> contextData;
    /** Set when the event has been cancelled
     * (see {@link #cancel(boolean)}). */
    private boolean cancelled;
066
067    /**
068     * Creates a new event. Passing channels is equivalent to first
069     * creating the event and then calling {@link #setChannels(Channel...)}
070     * with the given channels.
071     * 
072     * @param channels the channels to set
073     */
074    public Event(Channel... channels) {
075        super();
076        this.channels = Arrays.copyOf(channels, channels.length);
077    }
078
    /**
     * Returns the class of this event as representation of its kind.
     * The value is the event's runtime class, i.e. the result of
     * {@link #getClass()}.
     * 
     * @return the class of this event
     * 
     * @see org.jgrapes.core.Eligible#defaultCriterion()
     */
    @Override
    public Object defaultCriterion() {
        return getClass();
    }
090
091    /**
092     * Returns <code>true</code> if the `criterion` 
093     * is of the same class or a base class of this event's class.
094     * 
095     * @see org.jgrapes.core.Eligible#isEligibleFor(java.lang.Object)
096     */
097    @Override
098    public boolean isEligibleFor(Object criterion) {
099        return Class.class.isInstance(criterion)
100            && ((Class<?>) criterion).isAssignableFrom(getClass());
101    }
102
    /**
     * Return the event pipeline that currently processes the event
     * (if any). This override only delegates to the base class.
     * 
     * @return the event pipeline if the event is being processed
     */
    @SuppressWarnings({ "PMD.UselessOverridingMethod",
        "PMD.AvoidDuplicateLiterals" })
    @Override
    public Optional<EventPipeline> processedBy() {
        return super.processedBy();
    }
115
116    /**
117     * Implements the default behavior for handling events thrown
118     * by a handler. Fires a {@link HandlingError handling error} event
119     * for this event and the given throwable.
120     * 
121     * @see HandlingError
122     */
123    @Override
124    protected void handlingError(
125            EventPipeline eventProcessor, Throwable throwable) {
126        if (invokedFor != null) {
127            eventProcessor.fire(new HandlingError(this, throwable), invokedFor);
128        } else {
129            eventProcessor.fire(new HandlingError(this, throwable), channels());
130        }
131    }
132
133    /**
134     * Sets the channels that the event is fired on if no channels
135     * are specified explicitly when firing the event
136     * (see {@link org.jgrapes.core.Manager#fire(Event, Channel...)}).
137     * 
138     * @param channels the channels to set
139     * @return the object for easy chaining
140     * 
141     * @throws IllegalStateException if the method is called after
142     * this event has been fired
143     */
144    public Event<T> setChannels(Channel... channels) {
145        if (enqueued()) {
146            throw new IllegalStateException(
147                "Channels cannot be changed after fire");
148        }
149        this.channels = Arrays.copyOf(channels, channels.length);
150        return this;
151    }
152
153    /**
154     * Returns the channels associated with the event. Before an
155     * event has been fired, this returns the channels set with
156     * {@link #setChannels(Channel[])}. After an event has been
157     * fired, this returns the channels that the event has
158     * effectively been fired on 
159     * (see {@link Manager#fire(Event, Channel...)}).
160     * 
161     * @return the channels (never `null`, but may be empty)
162     */
163    @Override
164    public Channel[] channels() {
165        return Arrays.copyOf(channels, channels.length);
166    }
167
168    /**
169     * Returns the subset of channels that are assignable to the given type.
170     * 
171     * @param <C> the given type's class
172     * @param type the class to look for
173     * @return the filtered channels
174     * @see #channels()
175     */
176    @SuppressWarnings({ "unchecked" })
177    public <C> C[] channels(Class<C> type) {
178        return Arrays.stream(channels)
179            .filter(c -> type.isAssignableFrom(c.getClass())).toArray(
180                size -> (C[]) Array.newInstance(type, size));
181    }
182
183    /**
184     * Execute the given handler for all channels of the given type.
185     * 
186     * @param <E> the type of the event
187     * @param <C> the type of the channel
188     * @param type the channel type
189     * @param handler the handler
190     */
191    @SuppressWarnings({ "unchecked" })
192    public <E extends EventBase<?>, C extends Channel> void forChannels(
193            Class<C> type, BiConsumer<E, C> handler) {
194        Arrays.stream(channels)
195            .filter(c -> type.isAssignableFrom(c.getClass()))
196            .forEach(c -> handler.accept((E) this, (C) c));
197    }
198
199    /**
200     * Returns the events to be thrown when this event has completed
201     * (see {@link #isDone()}).
202     * 
203     * @return the completed events
204     */
205    public Set<Event<?>> completionEvents() {
206        return completionEvents == null ? Collections.emptySet()
207            : Collections.unmodifiableSet(completionEvents);
208    }
209
    /**
     * {@inheritDoc}
     * 
     * @return the object for easy chaining
     */
    @Override
    public Event<T> addCompletionEvent(Event<?> completionEvent) {
        // The set is only created when the first event is added.
        if (completionEvents == null) {
            completionEvents = new HashSet<>();
        }
        completionEvents.add(completionEvent);
        return this;
    }
221
    /*
     * (non-Javadoc)
     * 
     * Delegates to the base class and narrows the return type to
     * Event<T>, so that calls can be chained without a cast.
     * 
     * @see org.jgrapes.core.internal.EventBase#setRequiresResult(boolean)
     */
    @Override
    public Event<T> setRequiresResult(boolean value) {
        return (Event<T>) super.setRequiresResult(value);
    }
231
    /**
     * Check if this event has completed. An event is completed
     * if 
     *  * all its handlers have been invoked (or the event has
     *    been stopped or cancelled), 
     *  * all events caused by it have completed,
     *  * no {@link CompletionLock}s remain, and  
     *  * a result has been set (only required if 
     *    {@link #setRequiresResult(boolean)} has been called with `true`).
     * 
     * The method simply reports the `completed` flag; it does not
     * evaluate the conditions above itself.
     * 
     * @return the completed state
     */
    @Override
    public boolean isDone() {
        return completed;
    }
248
    /**
     * Invoked after all handlers for the event have been executed. 
     * May be overridden by derived classes to cause some immediate effect
     * (instead of e.g. waiting for the completion event). The default 
     * implementation does nothing. This method is invoked by the event 
     * handler thread and must not block.
     */
    @Override
    protected void handled() {
        // Intentionally empty, derived classes may override.
    }
260
    /**
     * {@inheritDoc}
     * 
     * This override only delegates to the base class.
     */
    @Override
    @SuppressWarnings("PMD.UselessOverridingMethod")
    public void suspendHandling() {
        super.suspendHandling();
    }
269
    /**
     * {@inheritDoc}
     * 
     * This override only delegates to the base class.
     */
    @Override
    @SuppressWarnings("PMD.UselessOverridingMethod")
    public void suspendHandling(Runnable whenResumed) {
        super.suspendHandling(whenResumed);
    }
278
    /**
     * {@inheritDoc}
     * 
     * This override only delegates to the base class.
     */
    @Override
    @SuppressWarnings("PMD.UselessOverridingMethod")
    public void resumeHandling() {
        super.resumeHandling();
    }
287
    /**
     * Can be called during the execution of an event handler to indicate
     * that the event should not be processed further. All remaining 
     * handlers for this event will be skipped. The method also
     * invokes {@link #resumeHandling()}.
     * 
     * @return the object for easy chaining
     */
    public Event<T> stop() {
        stopped = true;
        // Just in case. NOTE(review): presumably this makes sure that
        // an event with suspended handling does not remain suspended
        // after having been stopped -- confirm against EventBase.
        resumeHandling();
        return this;
    }
301
    /**
     * Returns <code>true</code> if {@link #stop} has been called
     * (directly or by {@link #cancel(boolean)}, which invokes it).
     * 
     * @return the stopped state
     */
    @Override
    public boolean isStopped() {
        return stopped;
    }
311
312    /**
313     * Prevents the invocation of further handlers (like {@link #stop()} 
314     * and (in addition) the invocation of any added completed events.
315     * 
316     * @param mayInterruptIfRunning ignored
317     * @return `false` if the event has already been completed
318     * @see java.util.concurrent.Future#cancel(boolean)
319     */
320    @Override
321    public boolean cancel(boolean mayInterruptIfRunning) {
322        if (!completed && !cancelled) {
323            stop();
324            cancelled = true;
325            return true;
326        }
327        return false;
328    }
329
    /**
     * Returns <code>true</code> if the event has been cancelled, i.e.
     * if an invocation of {@link #cancel(boolean)} has returned `true`.
     * 
     * @return the cancelled state
     * @see java.util.concurrent.Future#isCancelled()
     */
    @Override
    public boolean isCancelled() {
        return cancelled;
    }
334
335    /**
336     * Sets the result of handling this event. If this method is invoked 
337     * more then once, the various results are collected in a list. This
338     * can happen if the event is handled by several components. 
339     * 
340     * @param result the result to set
341     * @return the object for easy chaining
342     */
343    public Event<T> setResult(T result) {
344        synchronized (this) {
345            if (results == null) {
346                // Make sure that we have a valid result before
347                // calling decrementOpen
348                results = new ArrayList<>();
349                results.add(result);
350                firstResultAssigned();
351                return this;
352            }
353            results.add(result);
354            return this;
355        }
356    }
357
358    /**
359     * Allows access to the intermediate result before the 
360     * completion of the event. 
361     * 
362     * @return the intermediate results (which may be an empty list)
363     */
364    @Override
365    protected List<T> currentResults() {
366        return results == null ? Collections.emptyList()
367            : Collections.unmodifiableList(results);
368    }
369
370    /**
371     * Tie the result of this event to the result of the other event.
372     * Changes of either event's results will subsequently be applied
373     * to both events.
374     * <P>
375     * This is useful when an event is replaced by another event during
376     * handling like:
377     * {@code fire((new Event()).tieTo(oldEvent.stop()))}  
378     * 
379     * @param other the event to tie to
380     * @return the object for easy chaining
381     */
382    public Event<T> tieTo(Event<T> other) {
383        synchronized (this) {
384            if (other.results == null) {
385                other.results = new ArrayList<>();
386            }
387            results = other.results;
388            return this;
389        }
390    }
391
392    /**
393     * Waits for the event to be completed (see {@link #isDone()})
394     * and returns the first (or only) result.
395     * 
396     * @see Future#get()
397     */
398    @Override
399    public T get() throws InterruptedException {
400        while (true) {
401            synchronized (this) {
402                if (completed) {
403                    return results == null || results.isEmpty()
404                        ? null
405                        : results.get(0);
406                }
407                wait();
408            }
409        }
410    }
411
412    /**
413     * Causes the invoking thread to wait until the processing of the 
414     * event has been completed (see {@link #isDone()}) or the given 
415     * timeout has expired and returns the first (or only) result. 
416     * 
417     * @return the result
418     * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
419     */
420    @Override
421    public T get(long timeout, TimeUnit unit)
422            throws InterruptedException, TimeoutException {
423        synchronized (this) {
424            if (completed) {
425                return results == null || results.isEmpty()
426                    ? null
427                    : results.get(0);
428            }
429            wait(unit.toMillis(timeout));
430        }
431        if (completed) {
432            return results == null || results.isEmpty()
433                ? null
434                : results.get(0);
435        }
436        throw new TimeoutException();
437    }
438
439    /**
440     * Waits for the event to be completed (see {@link #isDone()})
441     * and returns the list of results (which may be empty if the
442     * event's result type is {@link Void}).
443     * 
444     * @return the results
445     * @see Future#get()
446     */
447    public List<T> results() throws InterruptedException {
448        while (true) {
449            synchronized (this) {
450                if (completed) {
451                    return results == null ? Collections.emptyList()
452                        : Collections.unmodifiableList(results);
453                }
454                wait();
455            }
456        }
457    }
458
459    /**
460     * Causes the invoking thread to wait until the processing of the 
461     * event has been completed (see {@link #isDone()}) or given timeout 
462     * has expired and returns the list of results (which may be empty
463     * if the event's result type is {@link Void}). 
464     * 
465     * @return the results
466     * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
467     */
468    public List<T> results(long timeout, TimeUnit unit)
469            throws InterruptedException, TimeoutException {
470        synchronized (this) {
471            if (completed) {
472                return results == null ? Collections.emptyList()
473                    : Collections.unmodifiableList(results);
474            }
475            wait(unit.toMillis(timeout));
476        }
477        if (completed) {
478            return results == null ? Collections.emptyList()
479                : Collections.unmodifiableList(results);
480        }
481        throw new TimeoutException();
482    }
483
484    @Override
485    @SuppressWarnings({ "PMD.ShortVariable", "unchecked" })
486    public <A extends Associator> A setAssociated(Object by, Object with) {
487        if (contextData == null) {
488            contextData = new ConcurrentHashMap<>();
489        }
490        if (with == null) {
491            contextData.remove(by);
492        } else {
493            contextData.put(by, with);
494        }
495        return (A) this;
496    }
497
498    @Override
499    @SuppressWarnings("PMD.ShortVariable")
500    public <V> Optional<V> associated(Object by, Class<V> type) {
501        if (contextData == null) {
502            return Optional.empty();
503        }
504        return Optional.ofNullable(contextData.get(by))
505            .filter(found -> type.isAssignableFrom(found.getClass()))
506            .map(type::cast);
507    }
508
509    /*
510     * (non-Javadoc)
511     * 
512     * @see java.lang.Object#toString()
513     */
514    @Override
515    public String toString() {
516        StringBuilder builder = new StringBuilder();
517        builder.append(Components.objectName(this))
518            .append(" [");
519        if (channels != null) {
520            builder.append("channels=").append(Channel.toString(channels));
521        }
522        builder.append(']');
523        return builder.toString();
524    }
525
526}