001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core.internal;
020
021import java.util.HashSet;
022import java.util.Iterator;
023import java.util.Queue;
024import java.util.Set;
025import java.util.concurrent.ConcurrentLinkedDeque;
026import java.util.concurrent.ConcurrentLinkedQueue;
027import java.util.concurrent.ExecutorService;
028import org.jgrapes.core.Channel;
029import org.jgrapes.core.Components;
030import org.jgrapes.core.Event;
031import org.jgrapes.core.EventPipeline;
032
033/**
034 * This class provides the default implementation of an {@link EventPipeline}.
035 */
036@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
037public class EventProcessor implements InternalEventPipeline, Runnable {
038
039    @SuppressWarnings("PMD.FieldNamingConventions")
040    protected static final ThreadLocal<EventBase<?>> newEventsParent
041        = new ThreadLocal<>();
042
043    private final ExecutorService executorService;
044    private final ComponentTree componentTree;
045    private final EventPipeline asEventPipeline;
046    // Must not use synchronized in toString, leads to unexpected deadlock
047    protected final Queue<EventChannelsTuple> queue
048        = new ConcurrentLinkedQueue<>();
049    private Iterator<HandlerReference> invoking;
050    // Used by this thread only.
051    private final Set<EventBase<?>> suspended = new HashSet<>();
052    // Only this thread can remove, but others might add.
053    private final Queue<EventBase<?>> toBeResumed
054        = new ConcurrentLinkedDeque<>();
055    private boolean isExecuting;
056    private final ThreadLocal<Thread> executor = new ThreadLocal<>();
057
058    /**
059     * Instantiates a new event processor.
060     *
061     * @param tree the tree
062     */
063    /* default */ EventProcessor(ComponentTree tree) {
064        this(tree, Components.defaultExecutorService());
065    }
066
067    /* default */ EventProcessor(ComponentTree tree,
068            ExecutorService executorService) {
069        this.componentTree = tree;
070        this.executorService = executorService;
071        asEventPipeline = new CheckingPipelineFilter(tree, this);
072    }
073
074    /**
075     * Gets the component tree.
076     *
077     * @return the component tree
078     */
079    protected ComponentTree tree() {
080        return componentTree;
081    }
082
083    /* default */ EventPipeline asEventPipeline() {
084        return asEventPipeline;
085    }
086
087    /**
088     * Called before adding completion events. The parent of
089     * a completion event is not the event that has completed but
090     * the event that generated the original event.
091     *
092     * @param parent the new parent
093     */
094    /* default */ void updateNewEventsParent(EventBase<?> parent) {
095        newEventsParent.set(parent);
096    }
097
098    @Override
099    public <T extends Event<?>> T add(T event, Channel... channels) {
100        ((EventBase<?>) event).generatedBy(newEventsParent.get());
101        ((EventBase<?>) event).processedBy(this);
102        synchronized (this) {
103            EventChannelsTuple.addTo(queue, event, channels);
104            if (!isExecuting) {
105                // Queue was initially empty, this starts it
106                GeneratorRegistry.instance().add(this);
107                isExecuting = true;
108                executorService.execute(this);
109            }
110        }
111        return event;
112    }
113
114    @SuppressWarnings("PMD.ConfusingTernary")
115    /* default */ void add(Queue<EventChannelsTuple> source) {
116        synchronized (this) {
117            while (true) {
118                EventChannelsTuple entry = source.poll();
119                if (entry == null) {
120                    break;
121                }
122                entry.event.processedBy(this);
123                queue.add(entry);
124            }
125            if (!isExecuting) {
126                GeneratorRegistry.instance().add(this);
127                isExecuting = true;
128                executorService.execute(this);
129            }
130        }
131    }
132
133    @Override
134    public void merge(InternalEventPipeline other) {
135        if (!(other instanceof BufferingEventPipeline)) {
136            throw new IllegalArgumentException(
137                "Can only merge events from an BufferingEventPipeline.");
138        }
139        add(((BufferingEventPipeline) other).retrieveEvents());
140    }
141
142    @Override
143    @SuppressWarnings({ "PMD.AvoidDeeplyNestedIfStmts",
144        "PMD.CognitiveComplexity" })
145    public void run() {
146        String origName = Thread.currentThread().getName();
147        try {
148            Thread.currentThread().setName(
149                origName + " (P" + Components.objectId(this) + ")");
150            executor.set(Thread.currentThread());
151            componentTree.setDispatchingPipeline(this);
152            while (true) {
153                // No lock needed, only this thread can remove from resumed
154                var resumedEvent = toBeResumed.poll();
155                if (resumedEvent != null) {
156                    if (suspended.remove(resumedEvent)) {
157                        resumedEvent.invokeWhenResumed();
158                        invokeHandlers(resumedEvent.clearSuspendedHandlers(),
159                            resumedEvent);
160                    }
161                    continue;
162                }
163
164                EventChannelsTuple next;
165                synchronized (this) {
166                    next = queue.poll();
167                    if (next == null) {
168                        // Everything is done, though suspended handlers
169                        // may cause this processor to be reactivated.
170                        GeneratorRegistry.instance().remove(this);
171                        isExecuting = false;
172                        synchronized (executor) {
173                            executor.notifyAll();
174                        }
175                        break;
176                    }
177                }
178                HandlerList handlers
179                    = componentTree.getEventHandlers(next.event, next.channels);
180                invokeHandlers(handlers.iterator(), next.event);
181            }
182        } finally {
183            // This processor should now only be (strongly) referenced
184            // from suspended events (if any exist), the
185            // CheckingPipelineFilter (which is only referenced from this)
186            // and some component tree, if this is the tree's default
187            // processor.
188            newEventsParent.set(null);
189            componentTree.setDispatchingPipeline(null);
190            executor.set(null);
191            Thread.currentThread().setName(origName);
192        }
193    }
194
195    /**
196     * Invoke all (remaining) handlers with the given event as parameter.
197     *
198     * @param handlers the handlers
199     * @param event the event
200     */
201    private void invokeHandlers(Iterator<HandlerReference> handlers,
202            EventBase<?> event) {
203        try {
204            invoking = handlers;
205            newEventsParent.set(event);
206            // invoking may be set to null by suspendHandling()
207            while (invoking != null && invoking.hasNext()) {
208                HandlerReference hdlr = invoking.next();
209                try {
210                    if (event.isStopped()) {
211                        break;
212                    }
213                    hdlr.invoke(event);
214                } catch (AssertionError t) {
215                    // JUnit support
216                    CoreUtils.setAssertionError(t);
217                    event.handlingError(asEventPipeline, t);
218                } catch (Error e) { // NOPMD
219                    // Wouldn't have caught it, if it was possible.
220                    throw e;
221                } catch (Throwable t) { // NOPMD
222                    // Errors have been rethrown, so this should work.
223                    event.handlingError(asEventPipeline, t);
224                }
225            }
226        } catch (AssertionError t) {
227            // JUnit support
228            CoreUtils.setAssertionError(t);
229            event.handlingError(asEventPipeline, t);
230        } catch (Error e) { // NOPMD
231            // Wouldn't have caught it, if it was possible.
232            throw e;
233        } catch (Throwable t) { // NOPMD
234            // Errors have been rethrown, so this should work.
235            event.handlingError(asEventPipeline, t);
236        } finally { // NOPMD
237            if (invoking != null) {
238                event.handled();
239                invoking = null;
240                newEventsParent.get().decrementOpen();
241            }
242        }
243    }
244
245    @SuppressWarnings("PMD.CompareObjectsWithEquals")
246    /* default */ void suspendHandling(EventBase<?> event) {
247        if (Thread.currentThread() != executor.get()) {
248            throw new IllegalStateException("May only be called from handler.");
249        }
250        if (!invoking.hasNext()) {
251            // Last anyway, nothing to be done
252            return;
253        }
254        event.setSuspendedHandlers(invoking);
255        invoking = null;
256        suspended.add(event);
257        // Just in case (might happen)
258        toBeResumed.remove(event);
259    }
260
261    /* default */ void resumeHandling(EventBase<?> event) {
262        toBeResumed.add(event);
263        synchronized (this) {
264            if (!isExecuting) {
265                // There were no more events, restart
266                GeneratorRegistry.instance().add(this);
267                isExecuting = true;
268                executorService.execute(this);
269            }
270        }
271    }
272
273    /*
274     * (non-Javadoc)
275     * 
276     * @see org.jgrapes.core.internal.InternalEventPipeline#executorService()
277     */
278    @Override
279    public ExecutorService executorService() {
280        return executorService;
281    }
282
283    @Override
284    public void awaitExhaustion() throws InterruptedException {
285        synchronized (executor) {
286            while (isExecuting) {
287                executor.wait();
288            }
289        }
290    }
291
292    /*
293     * (non-Javadoc)
294     * 
295     * @see java.lang.Object#toString()
296     */
297    @Override
298    public String toString() {
299        StringBuilder builder = new StringBuilder();
300        builder.append(Components.objectName(this))
301            .append(" [");
302        if (queue != null) {
303            builder.append("queue=").append(queue);
304        }
305        builder.append(']');
306        return builder.toString();
307    }
308
309}