001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2026 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
013 * License for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core.internal;
020
021import java.util.ArrayDeque;
022import java.util.Queue;
023import java.util.concurrent.ConcurrentLinkedDeque;
024import java.util.concurrent.ExecutorService;
025import org.jgrapes.core.Channel;
026import org.jgrapes.core.Components;
027import org.jgrapes.core.Event;
028import org.jgrapes.core.events.Start;
029
030/**
031 * The buffering event pipeline is used before a tree has been started. 
032 * It simply buffers all events until a {@link Start} event is added.
033 */
034@SuppressWarnings("PMD.AvoidSynchronizedStatement")
035public class BufferingEventPipeline implements InternalEventPipeline {
036
037    private final ComponentTree componentTree;
038    /** Buffered events. */
039    private Queue<EventChannelsTuple> buffered = new ArrayDeque<>();
040    /** The event pipeline that we delegate to after the start
041     * event has been detected. */
042    private InternalEventPipeline activePipeline;
043
044    /**
045     * Instantiates a new buffering event pipeline.
046     *
047     * @param componentTree the component tree
048     */
049    /* default */ BufferingEventPipeline(ComponentTree componentTree) {
050        super();
051        this.componentTree = componentTree;
052    }
053
054    @Override
055    public void merge(InternalEventPipeline other) {
056        synchronized (this) {
057            if (!(other instanceof BufferingEventPipeline)) {
058                throw new IllegalArgumentException(
059                    "Can only merge events from an BufferingEventPipeline.");
060            }
061            buffered.addAll(((BufferingEventPipeline) other).retrieveEvents());
062        }
063    }
064
065    @Override
066    public <T extends Event<?>> T add(T event, Channel... channels) {
067        synchronized (this) {
068            // If thread1 adds the start event and thread2 gets here before
069            // thread1 has changed the event processor for the tree, send the
070            // event to the event processor that should already have been used.
071            if (activePipeline != null) {
072                activePipeline.add(event, channels);
073                return event;
074            }
075            // Invoke although argument is null!
076            ((EventBase<?>) event).generatedBy(null); // NOPMD (cast)
077            EventChannelsTuple.addTo(buffered, event, channels);
078            if (event instanceof Start) {
079                // Merge all events into a "standard" event processor
080                // and set it as default processor for the tree (with
081                // any thread specific pipelines taking precedence).
082                EventProcessor processor = new EventProcessor(componentTree);
083                activePipeline
084                    = new FeedBackPipelineFilter(componentTree, processor);
085                componentTree.setEventPipeline(activePipeline);
086                processor.add(buffered);
087            }
088            return event;
089        }
090    }
091
092    /* default */ Queue<EventChannelsTuple> retrieveEvents() {
093        synchronized (this) {
094            Queue<EventChannelsTuple> old = buffered;
095            buffered = new ConcurrentLinkedDeque<>();
096            notifyAll();
097            return old;
098        }
099    }
100
101    @Override
102    public void awaitExhaustion() throws InterruptedException {
103        synchronized (this) {
104            while (!buffered.isEmpty()) {
105                wait();
106            }
107        }
108    }
109
110    /*
111     * (non-Javadoc)
112     * 
113     * @see org.jgrapes.core.internal.InternalEventPipeline#executorService()
114     */
115    @Override
116    public ExecutorService executorService() {
117        return Components.defaultExecutorService();
118    }
119
120    /*
121     * (non-Javadoc)
122     * 
123     * @see java.lang.Object#toString()
124     */
125    @Override
126    public String toString() {
127        StringBuilder builder = new StringBuilder(50);
128        builder.append("BufferingEventPipeline [");
129        // Avoid problem with concurrency
130        var bufd = buffered;
131        if (bufd != null) {
132            builder.append("buffered=").append(bufd);
133        }
134        builder.append(']');
135        return builder.toString();
136    }
137}