001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core.internal;
020
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Optional;
026import java.util.Set;
027import java.util.concurrent.Future;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.function.Consumer;
030import org.jgrapes.core.Associator;
031import org.jgrapes.core.Channel;
032import org.jgrapes.core.CompletionEvent;
033import org.jgrapes.core.CompletionLock;
034import org.jgrapes.core.Eligible;
035import org.jgrapes.core.Event;
036import org.jgrapes.core.EventPipeline;
037
038/**
039 * Provides the implementations of methods to class {@link Event} that
040 * need access to classes or methods that are visible in the implementation
041 * package only. The class is not intended to be used as base class
042 * for any other class.
043 * 
044 * @param <T> the result type of the event. Use {@link Void} if handling
045 * the event does not produce a result
046 */
047@SuppressWarnings("PMD.TooManyMethods")
048public abstract class EventBase<T>
049        implements Eligible, Future<T>, Associator {
050
051    /** The event that caused this event. */
052    private EventBase<?> generatedBy;
053    /** Number of events that have to be processed until completion.
054     * This is one for the event itself and one more for each event
055     * that has this event as its cause. */
056    private final AtomicInteger openCount = new AtomicInteger(1);
057    /** Completion locks. */
058    private Set<CompletionLockBase> completionLocks;
059    /** Set when the event is enqueued, reset when it has been completed. */
060    private EventProcessor processedBy;
061    /** The events to be fired upon completion. Using this attribute
062     * provides a slightly faster access than invoking
063     * {@link Event#completionEvents()}, which wraps the result in
064     * an unmodifiable set. */
065    protected Set<Event<?>> completionEvents;
066    /** Temporarily set when invoking a handler, only to be used by
067     * {@link #handlingError(EventPipeline, Throwable)}. */
068    protected Channel invokedFor;
069    /** Set when the event has been completed. */
070    protected boolean completed;
071    private boolean requiresResult;
072    /** Event is tracked by {@link VerboseHandlerReference}. */
073    private boolean tracked = true;
074    /** Event handler to be invoked after resumeHandling. */
075    private Iterator<HandlerReference> suspendedHandlers;
076    private Runnable whenResumed;
077
078    /**
079     * See {@link Event#channels()}.
080     *
081     * @return the channel[]
082     */
083    public abstract Channel[] channels();
084
085    /**
086     * See {@link Event#handled()}.
087     */
088    protected abstract void handled();
089
090    /**
091     * See {@link Event#isStopped()}.
092     */
093    public abstract boolean isStopped();
094
095    /**
096     * See {@link Event#currentResults()}.
097     */
098    protected abstract List<T> currentResults();
099
100    /**
101     * See {@link Event#setRequiresResult(boolean)}.
102     */
103    protected EventBase<T> setRequiresResult(boolean value) {
104        if (requiresResult == value) {
105            return this;
106        }
107        if (value) {
108            openCount.incrementAndGet();
109            requiresResult = true;
110        } else {
111            requiresResult = false;
112            decrementOpen();
113        }
114        return this;
115    }
116
117    /**
118     * See {@link Event#firstResultAssigned()}.
119     */
120    protected void firstResultAssigned() {
121        if (requiresResult) {
122            decrementOpen();
123        }
124    }
125
126    /**
127     * Returns <code>true</code> if the event has been enqueued in a pipeline.
128     * 
129     * @return the result
130     */
131    protected boolean enqueued() {
132        return processedBy != null || completed || isCancelled();
133    }
134
135    /**
136     * Invoked when an exception occurs while invoking a handler for an event.
137     * 
138     * @param eventProcessor the manager that has invoked the handler
139     * @param throwable the exception that has been thrown by the handler
140     */
141    protected abstract void handlingError(
142            EventPipeline eventProcessor, Throwable throwable);
143
144    /**
145     * If an event is fired while processing another event, note
146     * the event being processed. This allows us to track the cause
147     * of events to the "initial" (externally) generated event that
148     * triggered everything.
149     * 
150     * @param causingEvent the causing event to set
151     */
152    /* default */ void generatedBy(EventBase<?> causingEvent) {
153        generatedBy = causingEvent;
154        if (causingEvent != null) {
155            causingEvent.openCount.incrementAndGet();
156        }
157    }
158
159    /**
160     * Set the processor that will (eventually) process the event. 
161     * 
162     * @param processor the processor
163     */
164    /* default */ void processedBy(EventProcessor processor) {
165        this.processedBy = processor;
166    }
167
168    public Optional<EventPipeline> processedBy() {
169        return Optional.ofNullable(processedBy).map(
170            procBy -> procBy.asEventPipeline());
171    }
172
173    /**
174     * Suspend the invocation of the remaining handlers for this event.
175     * May only be called in a handler for the event. Must be balanced
176     * by an invocation of {@link #resumeHandling()}. 
177     */
178    public void suspendHandling() {
179        suspendHandling(null);
180    }
181
182    /**
183     * Suspend the invocation of the remaining handlers for this event.
184     * May only be called in a handler for the event. Must be balanced
185     * by an invocation of {@link #resumeHandling()}.
186     *
187     * @param whenResumed some function to be executed when handling is resumed 
188     */
189    public void suspendHandling(Runnable whenResumed) {
190        if (processedBy == null) {
191            throw new IllegalStateException("May only be called from handler.");
192        }
193        this.whenResumed = whenResumed;
194        processedBy.suspendHandling(this);
195    }
196
197    /* default */ void invokeWhenResumed() {
198        if (whenResumed != null) {
199            whenResumed.run();
200            whenResumed = null;
201        }
202    }
203
204    /**
205     * Resume the invocation of handlers for this event.
206     * 
207     * @see #suspendHandling()
208     */
209    public void resumeHandling() {
210        if (processedBy == null) {
211            throw new IllegalStateException("Lost processor.");
212        }
213        processedBy.resumeHandling(this);
214    }
215
216    /* default */ Iterator<HandlerReference> clearSuspendedHandlers() {
217        var result = suspendedHandlers;
218        suspendedHandlers = null;
219        return result;
220    }
221
222    /* default */ void setSuspendedHandlers(
223            Iterator<HandlerReference> suspendedHandlers) {
224        this.suspendedHandlers = suspendedHandlers;
225    }
226
227    /**
228     * @param pipeline
229     */
230    @SuppressWarnings("PMD.CognitiveComplexity")
231    /* default */ void decrementOpen() {
232        if (openCount.decrementAndGet() == 0 && !completed) {
233            synchronized (this) {
234                completed = true;
235                notifyAll();
236            }
237            if (completionEvents != null && !isCancelled()) {
238                processedBy.updateNewEventsParent(generatedBy);
239                for (Event<?> e : completionEvents) {
240                    Channel[] completeChannels = e.channels();
241                    if (completeChannels.length == 0) {
242                        // Note that channels() cannot be empty, as it is set
243                        // when firing the event and an event is never fired
244                        // on no channels.
245                        completeChannels = channels();
246                        e.setChannels(completeChannels);
247                    }
248                    processedBy.add(e, completeChannels);
249                }
250            }
251            if (generatedBy != null) {
252                generatedBy.decrementOpen();
253            }
254            processedBy = null; // No longer needed
255        }
256    }
257
258    /**
259     * Adds the given completion lock. 
260     * 
261     * @param lock the lock
262     * @see CompletionLock
263     */
264    /* default */ Event<T> addCompletionLock(CompletionLockBase lock) {
265        synchronized (this) {
266            if (completionLocks == null) {
267                completionLocks = Collections.synchronizedSet(new HashSet<>());
268            }
269        }
270        if (completionLocks.add(lock)) {
271            openCount.incrementAndGet();
272            lock.startTimer();
273        }
274        return (Event<T>) this;
275    }
276
277    /**
278     * Removes the given completion lock. 
279     * 
280     * @param lock the lock
281     * @see CompletionLock
282     */
283    /* default */ void removeCompletionLock(CompletionLockBase lock) {
284        if (completionLocks == null) {
285            return;
286        }
287        if (completionLocks.remove(lock)) {
288            decrementOpen();
289        }
290        lock.cancelTimer();
291    }
292
293    /**
294     * Disables tracking for this event and all events generated
295     * when handling it.
296     */
297    public Event<T> disableTracking() {
298        tracked = false;
299        return (Event<T>) this;
300    }
301
302    /**
303     * Whether the event (and all events generated when handling it)
304     * is tracked.
305     * 
306     * @return `true` if event is tracked
307     */
308    public boolean isTracked() {
309        return tracked;
310    }
311
312    @SuppressWarnings("PMD.UselessParentheses")
313    /* default */ boolean isTrackable() {
314        return generatedBy == null ? tracked
315            : (tracked && generatedBy.isTrackable());
316    }
317
318    /**
319     * Adds the given event to the events to be thrown when this event 
320     * has completed (see {@link #isDone()}). Such an event is called 
321     * a "completion event".
322     * 
323     * Completion events are considered to be caused by the event that 
324     * caused the completed event. If an event *e1* caused an event
325     * *e2* which has a completion event *e2c*, *e1* is only put in 
326     * state completed when *e2c* has been handled.
327     * 
328     * Completion events are handled by the same {@link EventProcessor}
329     * as the event that has been completed.
330     * 
331     * @param completionEvent the completion event to add
332     * @return the object for easy chaining
333     * @see #onCompletion(Event, Consumer)
334     */
335    public abstract Event<T> addCompletionEvent(Event<?> completionEvent);
336
337    /**
338     * Invokes the consumer when the event is completed. This is
339     * a shortcut for registering a {@link CompletionEvent} and
340     * providing a handler for the completion event that invokes 
341     * the consumer.
342     * 
343     * The static form is required because otherwise the compiler cannot
344     * infer the type of the consumer's argument.
345     *
346     * @param <T> the result type of the event
347     * @param <E> the type of the event
348     * @param event the event
349     * @param consumer the consumer
350     * @return the event
351     */
352    public static <T, E extends Event<T>> E onCompletion(E event,
353            Consumer<E> consumer) {
354        event.addCompletionEvent(new ActionEvent<Event<T>>(
355            event.getClass().getSimpleName() + "CompletionAction") {
356            @Override
357            public void execute() throws Exception {
358                consumer.accept(event);
359            }
360        });
361        return event;
362    }
363}