/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016-2018 Michael N. Lipp
 * 
 * This program is free software; you can redistribute it and/or modify it 
 * under the terms of the GNU Affero General Public License as published by 
 * the Free Software Foundation; either version 3 of the License, or 
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but 
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
 * for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License along 
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.core.internal;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
import org.jgrapes.core.events.Start;

/**
 * The buffering event pipeline is used before a tree has been started.
 * It simply buffers all events until a {@link Start} event is added.
 * 
 * All modifications of the buffer and of the delegate pipeline are made
 * while holding this object's monitor. The same monitor is used by
 * {@link #awaitExhaustion()} to wait for the buffer to become empty.
 */
public class BufferingEventPipeline implements InternalEventPipeline {

    private final ComponentTree componentTree;
    /** Buffered events. Never {@code null}. */
    private Queue<EventChannelsTuple> buffered = new ArrayDeque<>();
    /** The event pipeline that we delegate to after the start
     * event has been detected. {@code null} until then. */
    private InternalEventPipeline activePipeline;

    /**
     * Instantiates a new buffering event pipeline.
     *
     * @param componentTree the component tree
     */
    /* default */ BufferingEventPipeline(ComponentTree componentTree) {
        super();
        this.componentTree = componentTree;
    }

    /**
     * Moves all events buffered by the other pipeline to the end of
     * this pipeline's buffer. Merging a pipeline with itself is a no-op.
     *
     * @param other the pipeline to take the events from, must be a
     * {@link BufferingEventPipeline}
     * @throws IllegalArgumentException if {@code other} is not a
     * {@link BufferingEventPipeline}
     */
    @Override
    public void merge(InternalEventPipeline other) {
        // The type check does not depend on shared state, so it is
        // done before acquiring the monitor.
        if (!(other instanceof BufferingEventPipeline)) {
            throw new IllegalArgumentException(
                "Can only merge events from an BufferingEventPipeline.");
        }
        if (other == this) {
            // retrieveEvents() replaces the buffer with an empty one.
            // Merging with ourselves would therefore add the events
            // to the detached old buffer and lose them.
            return;
        }
        BufferingEventPipeline source = (BufferingEventPipeline) other;
        synchronized (this) {
            buffered.addAll(source.retrieveEvents());
        }
    }

    /**
     * Adds an event. As long as no {@link Start} event has been added,
     * the event is only buffered. Adding a {@link Start} event creates
     * an {@link EventProcessor}, makes it (wrapped in a 
     * {@link FeedBackPipelineFilter}) the component tree's event pipeline 
     * and passes the buffered events to it. Events added after that are 
     * forwarded to the pipeline thus created.
     *
     * @param <T> the event's type
     * @param event the event
     * @param channels the channels that the event is fired on
     * @return the event
     */
    @Override
    public <T extends Event<?>> T add(T event, Channel... channels) {
        synchronized (this) {
            // If thread1 adds the start event and thread2 gets here before
            // thread1 has changed the event processor for the tree, send the
            // event to the event processor that should already have been used.
            if (activePipeline != null) {
                activePipeline.add(event, channels);
                return event;
            }
            // Invoke although argument is null!
            ((EventBase<?>) event).generatedBy(null);
            EventChannelsTuple.addTo(buffered, event, channels);
            if (event instanceof Start) {
                // Merge all events into a "standard" event processor
                // and set it as default processor for the tree (with
                // any thread specific pipelines taking precedence).
                EventProcessor processor = new EventProcessor(componentTree);
                activePipeline
                    = new FeedBackPipelineFilter(componentTree, processor);
                componentTree.setEventPipeline(activePipeline);
                processor.add(buffered);
                // Threads blocked in awaitExhaustion() are only woken
                // up by a notification. If the processor has taken the 
                // events out of the buffer (NOTE(review): presumably it 
                // does, confirm against EventProcessor.add) they must
                // re-check the buffer now. If the buffer isn't empty,
                // they simply continue to wait.
                notifyAll();
            }
            return event;
        }
    }

    /**
     * Returns the events buffered so far and replaces the buffer with
     * a new, empty one. Threads waiting in {@link #awaitExhaustion()}
     * are notified.
     *
     * @return the previously buffered events
     */
    /* default */ Queue<EventChannelsTuple> retrieveEvents() {
        synchronized (this) {
            Queue<EventChannelsTuple> old = buffered;
            buffered = new ConcurrentLinkedDeque<>();
            notifyAll();
            return old;
        }
    }

    /**
     * Blocks the invoking thread until the buffer is empty.
     *
     * @throws InterruptedException if the thread is interrupted
     * while waiting
     */
    @Override
    public void awaitExhaustion() throws InterruptedException {
        synchronized (this) {
            // Loop, because the buffer may have been refilled between
            // the notification and the re-acquisition of the monitor.
            while (!buffered.isEmpty()) {
                wait();
            }
        }
    }

    /**
     * Returns the result of {@link Components#defaultExecutorService()}.
     *
     * @return the executor service
     * @see org.jgrapes.core.internal.InternalEventPipeline#executorService()
     */
    @Override
    public ExecutorService executorService() {
        return Components.defaultExecutorService();
    }

    /**
     * Returns a string of the form 
     * {@code BufferingEventPipeline [buffered=...]}.
     *
     * @return the string representation
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        // Deliberately not synchronized. The reference is read once;
        // it is never null, because the buffer is only ever replaced 
        // by a new queue.
        Queue<EventChannelsTuple> snapshot = buffered;
        StringBuilder builder = new StringBuilder(50);
        builder.append("BufferingEventPipeline [")
            .append("buffered=").append(snapshot)
            .append(']');
        return builder.toString();
    }
}