001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core.internal;
020
021import java.util.ArrayDeque;
022import java.util.Queue;
023import java.util.concurrent.ConcurrentLinkedDeque;
024import java.util.concurrent.ExecutorService;
025import org.jgrapes.core.Channel;
026import org.jgrapes.core.Components;
027import org.jgrapes.core.Event;
028import org.jgrapes.core.events.Start;
029
030/**
031 * The buffering event pipeline is used before a tree has been started. 
032 * It simply buffers all events until a {@link Start} event is added.
033 */
034public class BufferingEventPipeline implements InternalEventPipeline {
035
036    private final ComponentTree componentTree;
037    /** Buffered events. */
038    private Queue<EventChannelsTuple> buffered = new ArrayDeque<>();
039    /** The event pipeline that we delegate to after the start
040     * event has been detected. */
041    private InternalEventPipeline activePipeline;
042
043    /**
044     * Instantiates a new buffering event pipeline.
045     *
046     * @param componentTree the component tree
047     */
048    /* default */ BufferingEventPipeline(ComponentTree componentTree) {
049        super();
050        this.componentTree = componentTree;
051    }
052
053    @Override
054    public void merge(InternalEventPipeline other) {
055        synchronized (this) {
056            if (!(other instanceof BufferingEventPipeline)) {
057                throw new IllegalArgumentException(
058                    "Can only merge events from an BufferingEventPipeline.");
059            }
060            buffered.addAll(((BufferingEventPipeline) other).retrieveEvents());
061        }
062    }
063
064    @Override
065    public <T extends Event<?>> T add(T event, Channel... channels) {
066        synchronized (this) {
067            // If thread1 adds the start event and thread2 gets here before
068            // thread1 has changed the event processor for the tree, send the
069            // event to the event processor that should already have been used.
070            if (activePipeline != null) {
071                activePipeline.add(event, channels);
072                return event;
073            }
074            // Invoke although argument is null!
075            ((EventBase<?>) event).generatedBy(null);
076            EventChannelsTuple.addTo(buffered, event, channels);
077            if (event instanceof Start) {
078                // Merge all events into a "standard" event processor
079                // and set it as default processor for the tree (with
080                // any thread specific pipelines taking precedence).
081                EventProcessor processor = new EventProcessor(componentTree);
082                activePipeline
083                    = new FeedBackPipelineFilter(componentTree, processor);
084                componentTree.setEventPipeline(activePipeline);
085                processor.add(buffered);
086            }
087            return event;
088        }
089    }
090
091    /* default */ Queue<EventChannelsTuple> retrieveEvents() {
092        synchronized (this) {
093            Queue<EventChannelsTuple> old = buffered;
094            buffered = new ConcurrentLinkedDeque<>();
095            notifyAll();
096            return old;
097        }
098    }
099
100    @Override
101    public void awaitExhaustion() throws InterruptedException {
102        synchronized (this) {
103            while (!buffered.isEmpty()) {
104                wait();
105            }
106        }
107    }
108
109    /*
110     * (non-Javadoc)
111     * 
112     * @see org.jgrapes.core.internal.InternalEventPipeline#executorService()
113     */
114    @Override
115    public ExecutorService executorService() {
116        return Components.defaultExecutorService();
117    }
118
119    /*
120     * (non-Javadoc)
121     * 
122     * @see java.lang.Object#toString()
123     */
124    @Override
125    public String toString() {
126        StringBuilder builder = new StringBuilder(50);
127        builder.append("BufferingEventPipeline [");
128        // Avoid problem with concurrency
129        var bufd = buffered;
130        if (bufd != null) {
131            builder.append("buffered=").append(bufd);
132        }
133        builder.append(']');
134        return builder.toString();
135    }
136}