001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core.internal;
020
021import java.util.concurrent.ExecutorService;
022
023import org.jgrapes.core.Channel;
024import org.jgrapes.core.Components;
025import org.jgrapes.core.Event;
026import org.jgrapes.core.EventPipeline;
027
028/**
029 * This class provides the default implementation of an {@link EventPipeline}.
030 */
031public class EventProcessor implements InternalEventPipeline, Runnable {
032
033    @SuppressWarnings("PMD.FieldNamingConventions")
034    protected static final ThreadLocal<EventBase<?>> newEventsParent
035        = new ThreadLocal<>();
036
037    private final ExecutorService executorService;
038    private final ComponentTree componentTree;
039    private final EventPipeline asEventPipeline;
040    protected final EventQueue queue = new EventQueue();
041    private boolean isExecuting;
042
043    /**
044     * Instantiates a new event processor.
045     *
046     * @param tree the tree
047     */
048    /* default */ EventProcessor(ComponentTree tree) {
049        this(tree, Components.defaultExecutorService());
050    }
051
052    /* default */ EventProcessor(ComponentTree tree,
053            ExecutorService executorService) {
054        this.componentTree = tree;
055        this.executorService = executorService;
056        asEventPipeline = new CheckingPipelineFilter(tree, this);
057    }
058
059    /**
060     * Gets the component tree.
061     *
062     * @return the component tree
063     */
064    protected ComponentTree tree() {
065        return componentTree;
066    }
067
068    /* default */ EventPipeline asEventPipeline() {
069        return asEventPipeline;
070    }
071
072    /**
073     * Called before adding completion events. The parent of
074     * a completion event is not the event that has completed but
075     * the event that generated the original event.
076     *
077     * @param parent the new parent
078     */
079    /* default */ void updateNewEventsParent(EventBase<?> parent) {
080        newEventsParent.set(parent);
081    }
082
083    @Override
084    public <T extends Event<?>> T add(T event, Channel... channels) {
085        ((EventBase<?>) event).generatedBy(newEventsParent.get());
086        ((EventBase<?>) event).processedBy(this);
087        queue.add(event, channels);
088        synchronized (this) {
089            if (!isExecuting) {
090                GeneratorRegistry.instance().add(this);
091                isExecuting = true;
092                executorService.execute(this);
093            }
094        }
095        return event;
096    }
097
098    /* default */ void add(EventQueue source) {
099        while (true) {
100            EventChannelsTuple entry = source.poll();
101            if (entry == null) {
102                break;
103            }
104            entry.event.processedBy(this);
105            queue.add(entry);
106        }
107        synchronized (this) {
108            if (!isExecuting) {
109                GeneratorRegistry.instance().add(this);
110                isExecuting = true;
111                executorService.execute(this);
112            }
113        }
114    }
115
116    @Override
117    public void merge(InternalEventPipeline other) {
118        if (!(other instanceof BufferingEventPipeline)) {
119            throw new IllegalArgumentException(
120                "Can only merge events from an BufferingEventPipeline.");
121        }
122        add(((BufferingEventPipeline) other).retrieveEvents());
123    }
124
125    @Override
126    public void run() {
127        String origName = Thread.currentThread().getName();
128        try {
129            Thread.currentThread().setName(
130                origName + " (P" + Components.objectId(this) + ")");
131            componentTree.setDispatchingPipeline(this);
132            while (true) {
133                // No lock needed if queue is filled
134                EventChannelsTuple next = queue.peek();
135                if (next == null) {
136                    synchronized (this) {
137                        // Retry with lock for proper synchronization
138                        next = queue.peek();
139                        if (next == null) {
140                            GeneratorRegistry.instance().remove(this);
141                            isExecuting = false;
142                            break;
143                        }
144                    }
145                }
146                newEventsParent.set(next.event);
147                componentTree.dispatch(
148                    asEventPipeline, next.event, next.channels);
149                newEventsParent.get().decrementOpen();
150                queue.remove();
151            }
152        } finally {
153            newEventsParent.set(null);
154            componentTree.setDispatchingPipeline(null);
155            Thread.currentThread().setName(origName);
156        }
157    }
158
159    /*
160     * (non-Javadoc)
161     * 
162     * @see org.jgrapes.core.internal.InternalEventPipeline#executorService()
163     */
164    @Override
165    public ExecutorService executorService() {
166        return executorService;
167    }
168
169    /*
170     * (non-Javadoc)
171     * 
172     * @see java.lang.Object#toString()
173     */
174    @Override
175    public String toString() {
176        StringBuilder builder = new StringBuilder();
177        builder.append(Components.objectName(this))
178            .append(" [");
179        if (queue != null) {
180            builder.append("queue=")
181                .append(queue);
182        }
183        builder.append(']');
184        return builder.toString();
185    }
186
187}