/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016-2026 Michael N. Lipp
 * 
 * This program is free software; you can redistribute it and/or modify it 
 * under the terms of the GNU Affero General Public License as published by 
 * the Free Software Foundation; either version 3 of the License, or 
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but 
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public 
 * License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License 
 * along with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.core.internal;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
import org.jgrapes.core.events.Start;

/**
 * The buffering event pipeline is used before a tree has been started.
 * It simply buffers all events until a {@link Start} event is added.
 * 
 * Once a {@link Start} event has been added, a new pipeline is created,
 * set as the component tree's event pipeline and used as delegate for
 * all events added subsequently.
 * 
 * Except for {@link #toString()}, all access to the buffer is guarded by
 * this object's monitor.
 */
@SuppressWarnings("PMD.AvoidSynchronizedStatement")
public class BufferingEventPipeline implements InternalEventPipeline {

    private final ComponentTree componentTree;
    /** Buffered events. */
    private Queue<EventChannelsTuple> buffered = new ArrayDeque<>();
    /** The event pipeline that we delegate to after the start
     * event has been detected. */
    private InternalEventPipeline activePipeline;

    /**
     * Instantiates a new buffering event pipeline.
     *
     * @param componentTree the component tree
     */
    /* default */ BufferingEventPipeline(ComponentTree componentTree) {
        super();
        this.componentTree = componentTree;
    }

    /**
     * Moves the events buffered by the other pipeline to the end of
     * this pipeline's buffer. The other pipeline's buffer is taken
     * over using {@link #retrieveEvents()}, i.e. it is empty afterwards.
     *
     * @param other the pipeline to take the events from, must be
     * a {@link BufferingEventPipeline}
     * @throws IllegalArgumentException if {@code other} is not a
     * {@link BufferingEventPipeline}
     */
    @Override
    public void merge(InternalEventPipeline other) {
        synchronized (this) {
            if (!(other instanceof BufferingEventPipeline)) {
                throw new IllegalArgumentException(
                    "Can only merge events from an BufferingEventPipeline.");
            }
            buffered.addAll(((BufferingEventPipeline) other).retrieveEvents());
        }
    }

    /**
     * Adds the event to the buffer or, if a {@link Start} event has
     * already been seen, forwards it to the pipeline that has been
     * created in response to the {@link Start} event.
     * 
     * Adding a {@link Start} event creates a new {@link EventProcessor}
     * (wrapped in a {@link FeedBackPipelineFilter}), sets it as the 
     * component tree's event pipeline and passes the buffer (including 
     * the start event) to the processor.
     *
     * @param <T> the event's type
     * @param event the event to add
     * @param channels the channels that the event is fired on
     * @return the event (for easy chaining)
     */
    @Override
    public <T extends Event<?>> T add(T event, Channel... channels) {
        synchronized (this) {
            // If thread1 adds the start event and thread2 gets here before
            // thread1 has changed the event processor for the tree, send the
            // event to the event processor that should already have been used.
            if (activePipeline != null) {
                activePipeline.add(event, channels);
                return event;
            }
            // Invoke although argument is null!
            ((EventBase<?>) event).generatedBy(null); // NOPMD (cast)
            EventChannelsTuple.addTo(buffered, event, channels);
            if (event instanceof Start) {
                // Merge all events into a "standard" event processor
                // and set it as default processor for the tree (with
                // any thread specific pipelines taking precedence).
                EventProcessor processor = new EventProcessor(componentTree);
                activePipeline
                    = new FeedBackPipelineFilter(componentTree, processor);
                componentTree.setEventPipeline(activePipeline);
                processor.add(buffered);
                // The buffer has been handed over to the processor, which
                // presumably drains it (TODO confirm against
                // EventProcessor#add). Wake up any threads blocked in
                // awaitExhaustion() so that they re-check the buffer;
                // without this, nobody would notify them any more.
                notifyAll();
            }
            return event;
        }
    }

    /**
     * Returns the queue with the events buffered so far and replaces 
     * it with a new, empty queue. Threads waiting in 
     * {@link #awaitExhaustion()} are notified.
     *
     * @return the queue that has been used as buffer up to now
     */
    /* default */ Queue<EventChannelsTuple> retrieveEvents() {
        synchronized (this) {
            Queue<EventChannelsTuple> old = buffered;
            buffered = new ConcurrentLinkedDeque<>();
            notifyAll();
            return old;
        }
    }

    /**
     * Blocks the invoking thread until the buffer is empty.
     *
     * @throws InterruptedException if the thread is interrupted 
     * while waiting
     */
    @Override
    public void awaitExhaustion() throws InterruptedException {
        synchronized (this) {
            // Loop guards against spurious wake-ups and against
            // notifications that don't imply an empty buffer.
            while (!buffered.isEmpty()) {
                wait();
            }
        }
    }

    /*
     * (non-Javadoc)
     * 
     * @see org.jgrapes.core.internal.InternalEventPipeline#executorService()
     */
    @Override
    public ExecutorService executorService() {
        return Components.defaultExecutorService();
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder(50);
        builder.append("BufferingEventPipeline [");
        // Avoid problem with concurrency
        var bufd = buffered;
        if (bufd != null) {
            builder.append("buffered=").append(bufd);
        }
        builder.append(']');
        return builder.toString();
    }
}