001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core.internal;
020
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Optional;
026import java.util.Set;
027import java.util.concurrent.Future;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.function.Consumer;
030import org.jgrapes.core.Associator;
031import org.jgrapes.core.Channel;
032import org.jgrapes.core.CompletionEvent;
033import org.jgrapes.core.CompletionLock;
034import org.jgrapes.core.Eligible;
035import org.jgrapes.core.Event;
036import org.jgrapes.core.EventPipeline;
037
038/**
039 * Provides the implementations of methods to class {@link Event} that
040 * need access to classes or methods that are visible in the implementation
041 * package only. The class is not intended to be used as base class
042 * for any other class.
043 * 
044 * @param <T> the result type of the event. Use {@link Void} if handling
045 * the event does not produce a result
046 */
047@SuppressWarnings("PMD.TooManyMethods")
048public abstract class EventBase<T>
049        implements Eligible, Future<T>, Associator {
050
051    /** The event that caused this event. */
052    private EventBase<?> generatedBy;
053    /** Number of events that have to be processed until completion.
054     * This is one for the event itself and one more for each event
055     * that has this event as its cause. */
056    private final AtomicInteger openCount = new AtomicInteger(1);
057    /** Completion locks. */
058    private Set<CompletionLockBase> completionLocks;
059    /** Set when the event is enqueued, reset when it has been completed. */
060    private EventProcessor processedBy;
061    /** The events to be fired upon completion. Using this attribute
062     * provides a slightly faster access than invoking
063     * {@link Event#completionEvents()}, which wraps the result in
064     * an unmodifiable set. */
065    protected Set<Event<?>> completionEvents;
066    /** Set when the event has been completed. */
067    protected boolean completed;
068    private boolean requiresResult;
069    /** Event is tracked by {@link VerboseHandlerReference}. */
070    private boolean tracked = true;
071    /** Event handler to be invoked after resumeHandling. */
072    private Iterator<HandlerReference> suspendedHandlers;
073    private Runnable whenResumed;
074
075    /**
076     * See {@link Event#channels()}.
077     *
078     * @return the channel[]
079     */
080    public abstract Channel[] channels();
081
082    /**
083     * See {@link Event#handled()}.
084     */
085    protected abstract void handled();
086
087    /**
088     * See {@link Event#isStopped()}.
089     */
090    public abstract boolean isStopped();
091
092    /**
093     * See {@link Event#currentResults()}.
094     */
095    protected abstract List<T> currentResults();
096
097    /**
098     * See {@link Event#setRequiresResult(boolean)}.
099     */
100    protected EventBase<T> setRequiresResult(boolean value) {
101        if (requiresResult == value) {
102            return this;
103        }
104        if (value) {
105            openCount.incrementAndGet();
106            requiresResult = true;
107        } else {
108            requiresResult = false;
109            decrementOpen();
110        }
111        return this;
112    }
113
114    /**
115     * See {@link Event#firstResultAssigned()}.
116     */
117    protected void firstResultAssigned() {
118        if (requiresResult) {
119            decrementOpen();
120        }
121    }
122
123    /**
124     * Returns <code>true</code> if the event has been enqueued in a pipeline.
125     * 
126     * @return the result
127     */
128    protected boolean enqueued() {
129        return processedBy != null || completed || isCancelled();
130    }
131
132    /**
133     * Invoked when an exception occurs while invoking a handler for an event.
134     * 
135     * @param eventProcessor the manager that has invoked the handler
136     * @param throwable the exception that has been thrown by the handler
137     */
138    protected abstract void handlingError(
139            EventPipeline eventProcessor, Throwable throwable);
140
141    /**
142     * If an event is fired while processing another event, note
143     * the event being processed. This allows us to track the cause
144     * of events to the "initial" (externally) generated event that
145     * triggered everything.
146     * 
147     * @param causingEvent the causing event to set
148     */
149    /* default */ void generatedBy(EventBase<?> causingEvent) {
150        generatedBy = causingEvent;
151        if (causingEvent != null) {
152            causingEvent.openCount.incrementAndGet();
153        }
154    }
155
156    /**
157     * Set the processor that will (eventually) process the event. 
158     * 
159     * @param processor the processor
160     */
161    /* default */ void processedBy(EventProcessor processor) {
162        this.processedBy = processor;
163    }
164
165    public Optional<EventPipeline> processedBy() {
166        return Optional.ofNullable(processedBy).map(
167            procBy -> procBy.asEventPipeline());
168    }
169
170    /**
171     * Suspend the invocation of the remaining handlers for this event.
172     * May only be called in a handler for the event. Must be balanced
173     * by an invocation of {@link #resumeHandling()}. 
174     */
175    public void suspendHandling() {
176        suspendHandling(null);
177    }
178
179    /**
180     * Suspend the invocation of the remaining handlers for this event.
181     * May only be called in a handler for the event. Must be balanced
182     * by an invocation of {@link #resumeHandling()}.
183     *
184     * @param whenResumed some function to be executed when handling is resumed 
185     */
186    public void suspendHandling(Runnable whenResumed) {
187        if (processedBy == null) {
188            throw new IllegalStateException("May only be called from handler.");
189        }
190        this.whenResumed = whenResumed;
191        processedBy.suspendHandling(this);
192    }
193
194    /* default */ void invokeWhenResumed() {
195        if (whenResumed != null) {
196            whenResumed.run();
197            whenResumed = null;
198        }
199    }
200
201    /**
202     * Resume the invocation of handlers for this event.
203     * 
204     * @see #suspendHandling()
205     */
206    public void resumeHandling() {
207        if (processedBy == null) {
208            throw new IllegalStateException("Lost processor.");
209        }
210        processedBy.resumeHandling(this);
211    }
212
213    /* default */ Iterator<HandlerReference> clearSuspendedHandlers() {
214        var result = suspendedHandlers;
215        suspendedHandlers = null;
216        return result;
217    }
218
219    /* default */ void setSuspendedHandlers(
220            Iterator<HandlerReference> suspendedHandlers) {
221        this.suspendedHandlers = suspendedHandlers;
222    }
223
224    /**
225     * @param pipeline
226     */
227    @SuppressWarnings("PMD.CognitiveComplexity")
228    /* default */ void decrementOpen() {
229        if (openCount.decrementAndGet() == 0 && !completed) {
230            synchronized (this) {
231                completed = true;
232                notifyAll();
233            }
234            if (completionEvents != null && !isCancelled()) {
235                processedBy.updateNewEventsParent(generatedBy);
236                for (Event<?> e : completionEvents) {
237                    Channel[] completeChannels = e.channels();
238                    if (completeChannels.length == 0) {
239                        // Note that channels() cannot be empty, as it is set
240                        // when firing the event and an event is never fired
241                        // on no channels.
242                        completeChannels = channels();
243                        e.setChannels(completeChannels);
244                    }
245                    processedBy.add(e, completeChannels);
246                }
247            }
248            if (generatedBy != null) {
249                generatedBy.decrementOpen();
250            }
251            processedBy = null; // No longer needed
252        }
253    }
254
255    /**
256     * Adds the given completion lock. 
257     * 
258     * @param lock the lock
259     * @see CompletionLock
260     */
261    /* default */ Event<T> addCompletionLock(CompletionLockBase lock) {
262        synchronized (this) {
263            if (completionLocks == null) {
264                completionLocks = Collections.synchronizedSet(new HashSet<>());
265            }
266        }
267        if (completionLocks.add(lock)) {
268            openCount.incrementAndGet();
269            lock.startTimer();
270        }
271        return (Event<T>) this;
272    }
273
274    /**
275     * Removes the given completion lock. 
276     * 
277     * @param lock the lock
278     * @see CompletionLock
279     */
280    /* default */ void removeCompletionLock(CompletionLockBase lock) {
281        if (completionLocks == null) {
282            return;
283        }
284        if (completionLocks.remove(lock)) {
285            decrementOpen();
286        }
287        lock.cancelTimer();
288    }
289
290    /**
291     * Disables tracking for this event and all events generated
292     * when handling it.
293     */
294    public Event<T> disableTracking() {
295        tracked = false;
296        return (Event<T>) this;
297    }
298
299    /**
300     * Whether the event (and all events generated when handling it)
301     * is tracked.
302     * 
303     * @return `true` if event is tracked
304     */
305    public boolean isTracked() {
306        return tracked;
307    }
308
309    @SuppressWarnings("PMD.UselessParentheses")
310    /* default */ boolean isTrackable() {
311        return generatedBy == null ? tracked
312            : (tracked && generatedBy.isTrackable());
313    }
314
315    /**
316     * Adds the given event to the events to be thrown when this event 
317     * has completed (see {@link #isDone()}). Such an event is called 
318     * a "completion event".
319     * 
320     * Completion events are considered to be caused by the event that 
321     * caused the completed event. If an event *e1* caused an event
322     * *e2* which has a completion event *e2c*, *e1* is only put in 
323     * state completed when *e2c* has been handled.
324     * 
325     * Completion events are handled by the same {@link EventProcessor}
326     * as the event that has been completed.
327     * 
328     * @param completionEvent the completion event to add
329     * @return the object for easy chaining
330     * @see #onCompletion(Event, Consumer)
331     */
332    public abstract Event<T> addCompletionEvent(Event<?> completionEvent);
333
334    /**
335     * Invokes the consumer when the event is completed. This is
336     * a shortcut for registering a {@link CompletionEvent} and
337     * providing a handler for the completion event that invokes 
338     * the consumer.
339     * 
340     * The static form is required because otherwise the compiler cannot
341     * infer the type of the consumer's argument.
342     *
343     * @param <T> the result type of the event
344     * @param <E> the type of the event
345     * @param event the event
346     * @param consumer the consumer
347     * @return the event
348     */
349    public static <T, E extends Event<T>> E onCompletion(E event,
350            Consumer<E> consumer) {
351        event.addCompletionEvent(new ActionEvent<Event<T>>(
352            event.getClass().getSimpleName() + "CompletionAction") {
353            @Override
354            public void execute() throws Exception {
355                consumer.accept(event);
356            }
357        });
358        return event;
359    }
360}