001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2026 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
013 * License for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core.internal;
020
021import java.util.HashSet;
022import java.util.Iterator;
023import java.util.Queue;
024import java.util.Set;
025import java.util.concurrent.ConcurrentLinkedDeque;
026import java.util.concurrent.ConcurrentLinkedQueue;
027import java.util.concurrent.ExecutorService;
028import org.jgrapes.core.Channel;
029import org.jgrapes.core.Components;
030import org.jgrapes.core.Event;
031import org.jgrapes.core.EventPipeline;
032
033/**
034 * This class provides the default implementation of an {@link EventPipeline}.
035 */
036@SuppressWarnings("PMD.AvoidSynchronizedStatement")
037public class EventProcessor implements InternalEventPipeline, Runnable {
038
039    @SuppressWarnings("PMD.FieldNamingConventions")
040    protected static final ThreadLocal<EventBase<?>> newEventsParent
041        = new ThreadLocal<>();
042
043    private final ExecutorService executorService;
044    private final ComponentTree componentTree;
045    private final EventPipeline asEventPipeline;
046    // Must not use synchronized in toString, leads to unexpected deadlock
047    protected final Queue<EventChannelsTuple> queue
048        = new ConcurrentLinkedQueue<>();
049    private Iterator<HandlerReference> invoking;
050    // Used by this thread only.
051    private final Set<EventBase<?>> suspended = new HashSet<>();
052    // Only this thread can remove, but others might add.
053    private final Queue<EventBase<?>> toBeResumed
054        = new ConcurrentLinkedDeque<>();
055    private boolean isExecuting;
056    private final ThreadLocal<Thread> executor = new ThreadLocal<>();
057
058    /**
059     * Instantiates a new event processor.
060     *
061     * @param tree the tree
062     */
063    /* default */ EventProcessor(ComponentTree tree) {
064        this(tree, Components.defaultExecutorService());
065    }
066
067    /* default */ EventProcessor(ComponentTree tree,
068            ExecutorService executorService) {
069        this.componentTree = tree;
070        this.executorService = executorService;
071        asEventPipeline = new CheckingPipelineFilter(tree, this);
072    }
073
074    /**
075     * Gets the component tree.
076     *
077     * @return the component tree
078     */
079    protected ComponentTree tree() {
080        return componentTree;
081    }
082
083    /* default */ EventPipeline asEventPipeline() {
084        return asEventPipeline;
085    }
086
087    /**
088     * Called before adding completion events. The parent of
089     * a completion event is not the event that has completed but
090     * the event that generated the original event.
091     *
092     * @param parent the new parent
093     */
094    /* default */ void updateNewEventsParent(EventBase<?> parent) {
095        newEventsParent.set(parent);
096    }
097
098    @Override
099    public <T extends Event<?>> T add(T event, Channel... channels) {
100        ((EventBase<?>) event).generatedBy(newEventsParent.get()); // NOPMD
101        ((EventBase<?>) event).processedBy(this); // NOPMD (cast)
102        synchronized (this) {
103            EventChannelsTuple.addTo(queue, event, channels);
104            if (!isExecuting) {
105                // Queue was initially empty, this starts it
106                GeneratorRegistry.instance().add(this);
107                isExecuting = true;
108                executorService.execute(this);
109            }
110        }
111        return event;
112    }
113
114    /* default */ void add(Queue<EventChannelsTuple> source) {
115        synchronized (this) {
116            while (true) {
117                EventChannelsTuple entry = source.poll();
118                if (entry == null) {
119                    break;
120                }
121                entry.event.processedBy(this);
122                queue.add(entry);
123            }
124            if (!isExecuting) {
125                GeneratorRegistry.instance().add(this);
126                isExecuting = true;
127                executorService.execute(this);
128            }
129        }
130    }
131
132    @Override
133    public void merge(InternalEventPipeline other) {
134        if (!(other instanceof BufferingEventPipeline)) {
135            throw new IllegalArgumentException(
136                "Can only merge events from an BufferingEventPipeline.");
137        }
138        add(((BufferingEventPipeline) other).retrieveEvents());
139    }
140
141    @Override
142    public void run() {
143        String origName = Thread.currentThread().getName();
144        try {
145            Thread.currentThread().setName(
146                origName + " (P" + Components.objectId(this) + ")");
147            executor.set(Thread.currentThread());
148            componentTree.setDispatchingPipeline(this);
149            while (true) {
150                // No lock needed, only this thread can remove from resumed
151                var resumedEvent = toBeResumed.poll();
152                if (resumedEvent != null) {
153                    if (suspended.remove(resumedEvent)) {
154                        resumedEvent.invokeWhenResumed();
155                        invokeHandlers(resumedEvent.clearSuspendedHandlers(),
156                            resumedEvent);
157                    }
158                    continue;
159                }
160
161                EventChannelsTuple next;
162                synchronized (this) {
163                    next = queue.poll();
164                    if (next == null) {
165                        // Everything is done, though suspended handlers
166                        // may cause this processor to be reactivated.
167                        GeneratorRegistry.instance().remove(this);
168                        isExecuting = false;
169                        synchronized (executor) {
170                            executor.notifyAll();
171                        }
172                        break;
173                    }
174                }
175                @SuppressWarnings("PMD.LooseCoupling")
176                HandlerList handlers
177                    = componentTree.getEventHandlers(next.event, next.channels);
178                invokeHandlers(handlers.iterator(), next.event);
179            }
180        } finally {
181            // This processor should now only be (strongly) referenced
182            // from suspended events (if any exist), the
183            // CheckingPipelineFilter (which is only referenced from this)
184            // and some component tree, if this is the tree's default
185            // processor.
186            newEventsParent.set(null);
187            componentTree.setDispatchingPipeline(null);
188            executor.set(null);
189            Thread.currentThread().setName(origName);
190        }
191    }
192
193    /**
194     * Invoke all (remaining) handlers with the given event as parameter.
195     *
196     * @param handlers the handlers
197     * @param event the event
198     */
199    private void invokeHandlers(Iterator<HandlerReference> handlers,
200            EventBase<?> event) {
201        try {
202            invoking = handlers;
203            newEventsParent.set(event);
204            // invoking may be set to null by suspendHandling()
205            while (invoking != null && invoking.hasNext()) {
206                HandlerReference hdlr = invoking.next();
207                try {
208                    if (event.isStopped()) {
209                        break;
210                    }
211                    hdlr.invoke(event);
212                } catch (AssertionError t) {
213                    // JUnit support
214                    CoreUtils.setAssertionError(t);
215                    event.handlingError(asEventPipeline, t);
216                    event.invokedFor = null;
217                } catch (Error e) { // NOPMD
218                    // Wouldn't have caught it, if it was possible.
219                    throw e;
220                } catch (Throwable t) { // NOPMD
221                    // Errors have been rethrown, so this should work.
222                    event.handlingError(asEventPipeline, t);
223                    event.invokedFor = null;
224                }
225            }
226        } catch (AssertionError t) {
227            // JUnit support
228            CoreUtils.setAssertionError(t);
229            event.handlingError(asEventPipeline, t);
230        } catch (Error e) { // NOPMD
231            // Wouldn't have caught it, if it was possible.
232            throw e;
233        } catch (Throwable t) { // NOPMD
234            // Errors have been rethrown, so this should work.
235            event.handlingError(asEventPipeline, t);
236        } finally {
237            if (invoking != null) {
238                event.handled();
239                invoking = null;
240                newEventsParent.get().decrementOpen();
241            }
242        }
243    }
244
245    @SuppressWarnings("PMD.CompareObjectsWithEquals")
246    /* default */ void suspendHandling(EventBase<?> event) {
247        if (Thread.currentThread() != executor.get()) {
248            throw new IllegalStateException("May only be called from handler.");
249        }
250        if (!invoking.hasNext()) {
251            // Last anyway, nothing to be done
252            return;
253        }
254        event.setSuspendedHandlers(invoking);
255        invoking = null;
256        suspended.add(event);
257        // Just in case (might happen)
258        toBeResumed.remove(event);
259    }
260
261    /* default */ void resumeHandling(EventBase<?> event) {
262        toBeResumed.add(event);
263        synchronized (this) {
264            if (!isExecuting) {
265                // There were no more events, restart
266                GeneratorRegistry.instance().add(this);
267                isExecuting = true;
268                executorService.execute(this);
269            }
270        }
271    }
272
273    /*
274     * (non-Javadoc)
275     * 
276     * @see org.jgrapes.core.internal.InternalEventPipeline#executorService()
277     */
278    @Override
279    public ExecutorService executorService() {
280        return executorService;
281    }
282
283    @Override
284    public void awaitExhaustion() throws InterruptedException {
285        synchronized (executor) {
286            while (isExecuting) {
287                executor.wait();
288            }
289        }
290    }
291
292    /*
293     * (non-Javadoc)
294     * 
295     * @see java.lang.Object#toString()
296     */
297    @Override
298    public String toString() {
299        StringBuilder builder = new StringBuilder();
300        builder.append(Components.objectName(this))
301            .append(" [");
302        if (queue != null) {
303            builder.append("queue=").append(queue);
304        }
305        builder.append(']');
306        return builder.toString();
307    }
308
309}