001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core.internal;
020
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Optional;
025import java.util.Set;
026import java.util.concurrent.Future;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.jgrapes.core.Associator;
029import org.jgrapes.core.Channel;
030import org.jgrapes.core.CompletionLock;
031import org.jgrapes.core.Eligible;
032import org.jgrapes.core.Event;
033import org.jgrapes.core.EventPipeline;
034
035/**
036 * Provides the implementations of methods to class {@link Event} that
037 * need access to classes or methods that are visible in the implementation
038 * package only. The class is not intended to be used as base class
039 * for any other class.
040 * 
041 * @param <T> the result type of the event. Use {@link Void} if handling
042 * the event does not produce a result
043 */
044public abstract class EventBase<T>
045        implements Eligible, Future<T>, Associator {
046
047    /** The event that caused this event. */
048    private EventBase<?> generatedBy;
049    /** Number of events that have to be processed until completion.
050     * This is one for the event itself and one more for each event
051     * that has this event as its cause. */
052    private final AtomicInteger openCount = new AtomicInteger(1);
053    /** Completion locks. */
054    private Set<CompletionLockBase> completionLocks;
055    /** Set when the event is enqueued, reset when it has been completed. */
056    private EventProcessor processedBy;
057    /** The events to be fired upon completion. Using this attribute
058     * provides a slightly faster access than invoking
059     * {@link Event#completionEvents()}, which wraps the result in
060     * an unmodifiable set. */
061    protected Set<Event<?>> completionEvents;
062    /** Set when the event has been completed. */
063    protected boolean completed;
064    private boolean requiresResult;
065    /** Event is tracked by {@link VerboseHandlerReference}. */
066    private boolean tracked = true;
067
068    /**
069     * See {@link Event#channels()}.
070     *
071     * @return the channel[]
072     */
073    public abstract Channel[] channels();
074
075    /**
076     * See {@link Event#handled()}.
077     */
078    protected abstract void handled();
079
080    /**
081     * See {@link Event#isStopped()}.
082     */
083    public abstract boolean isStopped();
084
085    /**
086     * See {@link Event#currentResults()}.
087     */
088    protected abstract List<T> currentResults();
089
090    /**
091     * See {@link Event#setRequiresResult(boolean)}.
092     */
093    protected EventBase<T> setRequiresResult(boolean value) {
094        if (requiresResult == value) {
095            return this;
096        }
097        if (value) {
098            openCount.incrementAndGet();
099            requiresResult = true;
100        } else {
101            requiresResult = false;
102            decrementOpen();
103        }
104        return this;
105    }
106
107    /**
108     * See {@link Event#firstResultAssigned()}.
109     */
110    protected void firstResultAssigned() {
111        if (requiresResult) {
112            decrementOpen();
113        }
114    }
115
116    /**
117     * Returns <code>true</code> if the event has been enqueued in a pipeline.
118     * 
119     * @return the result
120     */
121    protected boolean enqueued() {
122        return processedBy != null || completed || isCancelled();
123    }
124
125    /**
126     * Invoked when an exception occurs while invoking a handler for an event.
127     * 
128     * @param eventProcessor the manager that has invoked the handler
129     * @param throwable the exception that has been thrown by the handler
130     */
131    protected abstract void handlingError(
132            EventPipeline eventProcessor, Throwable throwable);
133
134    /**
135     * If an event is fired while processing another event, note
136     * the event being processed. This allows us to track the cause
137     * of events to the "initial" (externally) generated event that
138     * triggered everything.
139     * 
140     * @param causingEvent the causing event to set
141     */
142    /* default */ void generatedBy(EventBase<?> causingEvent) {
143        generatedBy = causingEvent;
144        if (causingEvent != null) {
145            causingEvent.openCount.incrementAndGet();
146        }
147    }
148
149    /**
150     * Set the processor that will (eventually) process the event. 
151     * 
152     * @param processor the processor
153     */
154    /* default */ void processedBy(EventProcessor processor) {
155        this.processedBy = processor;
156    }
157
158    public Optional<EventPipeline> processedBy() {
159        return Optional.ofNullable(processedBy).map(
160            procBy -> procBy.asEventPipeline());
161    }
162
163    /**
164     * @param pipeline
165     */
166    /* default */ void decrementOpen() {
167        if (openCount.decrementAndGet() == 0 && !completed) {
168            synchronized (this) {
169                completed = true;
170                notifyAll();
171            }
172            if (completionEvents != null && !isCancelled()) {
173                processedBy.updateNewEventsParent(generatedBy);
174                for (Event<?> e : completionEvents) {
175                    Channel[] completeChannels = e.channels();
176                    if (completeChannels.length == 0) {
177                        // Note that channels() cannot be empty, as it is set
178                        // when firing the event and an event is never fired
179                        // on no channels.
180                        completeChannels = channels();
181                        e.setChannels(completeChannels);
182                    }
183                    processedBy.add(e, completeChannels);
184                }
185            }
186            if (generatedBy != null) {
187                generatedBy.decrementOpen();
188            }
189            processedBy = null; // No longer needed
190        }
191    }
192
193    /**
194     * Adds the given completion lock. 
195     * 
196     * @param lock the lock
197     * @see CompletionLock
198     */
199    /* default */ Event<T> addCompletionLock(CompletionLockBase lock) {
200        synchronized (this) {
201            if (completionLocks == null) {
202                completionLocks = Collections.synchronizedSet(new HashSet<>());
203            }
204        }
205        if (completionLocks.add(lock)) {
206            openCount.incrementAndGet();
207            lock.startTimer();
208        }
209        return (Event<T>) this;
210    }
211
212    /**
213     * Removes the given completion lock. 
214     * 
215     * @param lock the lock
216     * @see CompletionLock
217     */
218    /* default */ void removeCompletionLock(CompletionLockBase lock) {
219        if (completionLocks == null) {
220            return;
221        }
222        if (completionLocks.remove(lock)) {
223            decrementOpen();
224        }
225        lock.cancelTimer();
226    }
227
228    /**
229     * Disables tracking for this event and all events generated
230     * when handling it.
231     */
232    public Event<T> disableTracking() {
233        tracked = false;
234        return (Event<T>) this;
235    }
236
237    /**
238     * Whether the event (and all events generated when handling it)
239     * is tracked.
240     * 
241     * @return `true` if event is tracked
242     */
243    public boolean isTracked() {
244        return tracked;
245    }
246
247    /* default */ boolean isTrackable() {
248        return generatedBy == null ? tracked
249            : (tracked && generatedBy.isTrackable());
250    }
251}