001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core;
020
021import java.lang.reflect.Array;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.TimeoutException;
034import java.util.function.BiConsumer;
035
036import org.jgrapes.core.events.HandlingError;
037import org.jgrapes.core.internal.EventBase;
038import org.jgrapes.core.internal.EventProcessor;
039
040/**
041 * This class is the base class for all events.
042 *  
043 * By default (i.e. as implemented by this class), the event's kind is
044 * represented by its Java class and the eligibility is based on
045 * the "is a" relationship between classes. An event is eligible if its class
046 * is equal to or a super class of the class used as criterion. 
047 * This default behavior can be changed by overriding the
048 * methods from {@link Eligible}. See {@link NamedEvent} as an example.
049 * 
050 * @param <T>
051 *            the result type of the event. Use {@link Void} if handling the
052 *            event does not produce a result
053 */
054public class Event<T> extends EventBase<T> {
055
056    /** The channels that this event is to be fired on if no
057     * channels are specified explicitly when firing. */
058    private Channel[] channels;
059    /** Indicates that the event should not processed further. */
060    private boolean stopped;
061    /** The results of handling the event (if any). */
062    private List<T> results;
063    /** Context data. */
064    private Map<Object, Object> contextData;
065    private boolean cancelled;
066
067    /**
068     * Creates a new event. Passing channels is equivalent to first
069     * creating the event and then calling {@link #setChannels(Channel...)}
070     * with the given channels.
071     * 
072     * @param channels the channels to set
073     */
074    public Event(Channel... channels) {
075        super();
076        this.channels = Arrays.copyOf(channels, channels.length);
077    }
078
079    /**
080     * Returns the class of this event as representation of its kind.
081     * 
082     * @return the class of this event
083     * 
084     * @see org.jgrapes.core.Eligible#defaultCriterion()
085     */
086    @Override
087    public Object defaultCriterion() {
088        return getClass();
089    }
090
091    /**
092     * Returns <code>true</code> if the `criterion` 
093     * is of the same class or a base class of this event's class.
094     * 
095     * @see org.jgrapes.core.Eligible#isEligibleFor(java.lang.Object)
096     */
097    @Override
098    public boolean isEligibleFor(Object criterion) {
099        return Class.class.isInstance(criterion)
100            && ((Class<?>) criterion).isAssignableFrom(getClass());
101    }
102
103    /**
104     * Return the event pipeline that currently processes the event
105     * (if any).
106     * 
107     * @return the event pipeline if the event is being processed
108     */
109    @SuppressWarnings("PMD.UselessOverridingMethod")
110    public Optional<EventPipeline> processedBy() {
111        return super.processedBy();
112    }
113
114    /**
115     * Implements the default behavior for handling events thrown
116     * by a handler. Fires a {@link HandlingError handling error} event
117     * for this event and the given throwable.
118     * 
119     * @see HandlingError
120     */
121    @Override
122    protected void handlingError(
123            EventPipeline eventProcessor, Throwable throwable) {
124        eventProcessor.fire(
125            new HandlingError(this, throwable), channels());
126    }
127
128    /**
129     * Sets the channels that the event is fired on if no channels
130     * are specified explicitly when firing the event
131     * (see {@link org.jgrapes.core.Manager#fire(Event, Channel...)}).
132     * 
133     * @param channels the channels to set
134     * @return the object for easy chaining
135     * 
136     * @throws IllegalStateException if the method is called after
137     * this event has been fired
138     */
139    public Event<T> setChannels(Channel... channels) {
140        if (enqueued()) {
141            throw new IllegalStateException(
142                "Channels cannot be changed after fire");
143        }
144        this.channels = Arrays.copyOf(channels, channels.length);
145        return this;
146    }
147
148    /**
149     * Returns the channels associated with the event. Before an
150     * event has been fired, this returns the channels set with
151     * {@link #setChannels(Channel[])}. After an event has been
152     * fired, this returns the channels that the event has
153     * effectively been fired on 
154     * (see {@link Manager#fire(Event, Channel...)}).
155     * 
156     * @return the channels (never `null`, but may be empty)
157     */
158    @Override
159    public Channel[] channels() {
160        return Arrays.copyOf(channels, channels.length);
161    }
162
163    /**
164     * Returns the subset of channels that are assignable to the given type.
165     * 
166     * @param <C> the given type's class
167     * @param type the class to look for
168     * @return the filtered channels
169     * @see #channels()
170     */
171    @SuppressWarnings({ "unchecked", "PMD.ShortVariable",
172        "PMD.AvoidDuplicateLiterals" })
173    public <C> C[] channels(Class<C> type) {
174        return Arrays.stream(channels)
175            .filter(c -> type.isAssignableFrom(c.getClass())).toArray(
176                size -> (C[]) Array.newInstance(type, size));
177    }
178
179    /**
180     * Execute the given handler for all channels of the given type.
181     * 
182     * @param <E> the type of the event
183     * @param <C> the type of the channel
184     * @param type the channel type
185     * @param handler the handler
186     */
187    @SuppressWarnings({ "unchecked", "PMD.ShortVariable" })
188    public <E extends EventBase<?>, C extends Channel> void forChannels(
189            Class<C> type, BiConsumer<E, C> handler) {
190        Arrays.stream(channels)
191            .filter(c -> type.isAssignableFrom(c.getClass()))
192            .forEach(c -> handler.accept((E) this, (C) c));
193    }
194
195    /**
196     * Returns the events to be thrown when this event has completed
197     * (see {@link #isDone()}).
198     * 
199     * @return the completed events
200     */
201    public Set<Event<?>> completionEvents() {
202        return completionEvents == null ? Collections.emptySet()
203            : Collections.unmodifiableSet(completionEvents);
204    }
205
206    /**
207     * Adds the given event to the events to be thrown when this event 
208     * has completed (see {@link #isDone()}). Such an event is called 
209     * a "completion event".
210     * 
211     * Completion events are considered to be caused by the event that 
212     * caused the completed event. If an event *e1* caused an event
213     * *e2* which has a completion event *e2c*, *e1* is only put in 
214     * state completed when *e2c* has been handled.
215     * 
216     * Completion events are handled by the same {@link EventProcessor}
217     * as the event that has been completed.
218     * 
219     * @param completionEvent the completion event to add
220     * @return the object for easy chaining
221     */
222    public Event<T> addCompletionEvent(Event<?> completionEvent) {
223        if (completionEvents == null) {
224            completionEvents = new HashSet<>();
225        }
226        completionEvents.add(completionEvent);
227        return this;
228    }
229
230    /*
231     * (non-Javadoc)
232     * 
233     * @see org.jgrapes.core.internal.EventBase#setRequiresResult(boolean)
234     */
235    @Override
236    public Event<T> setRequiresResult(boolean value) {
237        return (Event<T>) super.setRequiresResult(value);
238    }
239
240    /**
241     * Check if this event has completed. An event is completed
242     * if 
243     *  * all its handlers have been invoked (or the event has
244     *    been stopped or cancelled), 
245     *  * all events caused by it have completed,
246     *  * no {@link CompletionLock}s remain, and  
247     *  * a result has been set (only required if 
248     *    {@link #setRequiresResult(boolean)} has been called with `true`).
249     * 
250     * @return the completed state
251     */
252    @Override
253    public boolean isDone() {
254        return completed;
255    }
256
257    /**
258     * Invoked after all handlers for the event have been executed. 
259     * May be overridden by derived classes to cause some immediate effect
260     * (instead of e.g. waiting for the completion event). The default 
261     * implementation does nothing. This method is invoked by the event 
262     * handler thread and must not block.
263     */
264    protected void handled() {
265        // Default is to do nothing.
266    }
267
268    /**
269     * Can be called during the execution of an event handler to indicate
270     * that the event should not be processed further. All remaining 
271     * handlers for this event will be skipped.
272     * 
273     * @return the object for easy chaining
274     */
275    public Event<T> stop() {
276        stopped = true;
277        return this;
278    }
279
280    /**
281     * Returns <code>true</code> if {@link #stop} has been called.
282     * 
283     * @return the stopped state
284     */
285    public boolean isStopped() {
286        return stopped;
287    }
288
289    /**
290     * Prevents the invocation of further handlers (like {@link #stop()} 
291     * and (in addition) the invocation of any added completed events.
292     * 
293     * @param mayInterruptIfRunning ignored
294     * @return `false` if the event has already been completed
295     * @see java.util.concurrent.Future#cancel(boolean)
296     */
297    @Override
298    public boolean cancel(boolean mayInterruptIfRunning) {
299        if (!completed && !cancelled) {
300            stop();
301            cancelled = true;
302            return true;
303        }
304        return false;
305    }
306
307    @Override
308    public boolean isCancelled() {
309        return cancelled;
310    }
311
312    /**
313     * Sets the result of handling this event. If this method is invoked 
314     * more then once, the various results are collected in a list. This
315     * can happen if the event is handled by several components. 
316     * 
317     * @param result the result to set
318     * @return the object for easy chaining
319     */
320    public Event<T> setResult(T result) {
321        synchronized (this) {
322            if (results == null) {
323                // Make sure that we have a valid result before
324                // calling decrementOpen
325                results = new ArrayList<>();
326                results.add(result);
327                firstResultAssigned();
328                return this;
329            }
330            results.add(result);
331            return this;
332        }
333    }
334
335    /**
336     * Allows access to the intermediate result before the 
337     * completion of the event. 
338     * 
339     * @return the intermediate results (which may be an empty list)
340     */
341    protected List<T> currentResults() {
342        return results == null ? Collections.emptyList()
343            : Collections.unmodifiableList(results);
344    }
345
346    /**
347     * Tie the result of this event to the result of the other event.
348     * Changes of either event's results will subsequently be applied
349     * to both events.
350     * <P>
351     * This is useful when an event is replaced by another event during
352     * handling like:
353     * {@code fire((new Event()).tieTo(oldEvent.stop()))}  
354     * 
355     * @param other the event to tie to
356     * @return the object for easy chaining
357     */
358    public Event<T> tieTo(Event<T> other) {
359        synchronized (this) {
360            if (other.results == null) {
361                other.results = new ArrayList<>();
362            }
363            results = other.results;
364            return this;
365        }
366    }
367
368    /**
369     * Waits for the event to be completed (see {@link #isDone()})
370     * and returns the first (or only) result.
371     * 
372     * @see Future#get()
373     */
374    @Override
375    public T get() throws InterruptedException {
376        while (true) {
377            synchronized (this) {
378                if (completed) {
379                    return results == null || results.isEmpty()
380                        ? null
381                        : results.get(0);
382                }
383                wait();
384            }
385        }
386    }
387
388    /**
389     * Causes the invoking thread to wait until the processing of the 
390     * event has been completed (see {@link #isDone()}) or the given 
391     * timeout has expired and returns the first (or only) result. 
392     * 
393     * @return the result
394     * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
395     */
396    @Override
397    public T get(long timeout, TimeUnit unit)
398            throws InterruptedException, TimeoutException {
399        synchronized (this) {
400            if (completed) {
401                return results == null || results.isEmpty()
402                    ? null
403                    : results.get(0);
404            }
405            wait(unit.toMillis(timeout));
406        }
407        if (completed) {
408            return results == null || results.isEmpty()
409                ? null
410                : results.get(0);
411        }
412        throw new TimeoutException();
413    }
414
415    /**
416     * Waits for the event to be completed (see {@link #isDone()})
417     * and returns the list of results (which may be empty if the
418     * event's result type is {@link Void}).
419     * 
420     * @return the results
421     * @see Future#get()
422     */
423    public List<T> results() throws InterruptedException {
424        while (true) {
425            synchronized (this) {
426                if (completed) {
427                    return results == null ? Collections.emptyList()
428                        : Collections.unmodifiableList(results);
429                }
430                wait();
431            }
432        }
433    }
434
435    /**
436     * Causes the invoking thread to wait until the processing of the 
437     * event has been completed (see {@link #isDone()}) or given timeout 
438     * has expired and returns the list of results (which may be empty
439     * if the event's result type is {@link Void}). 
440     * 
441     * @return the results
442     * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
443     */
444    public List<T> results(long timeout, TimeUnit unit)
445            throws InterruptedException, TimeoutException {
446        synchronized (this) {
447            if (completed) {
448                return results == null ? Collections.emptyList()
449                    : Collections.unmodifiableList(results);
450            }
451            wait(unit.toMillis(timeout));
452        }
453        if (completed) {
454            return results == null ? Collections.emptyList()
455                : Collections.unmodifiableList(results);
456        }
457        throw new TimeoutException();
458    }
459
460    @Override
461    @SuppressWarnings("PMD.ShortVariable")
462    public Event<T> setAssociated(Object by, Object with) {
463        if (contextData == null) {
464            contextData = new ConcurrentHashMap<>();
465        }
466        if (with == null) {
467            contextData.remove(by);
468        } else {
469            contextData.put(by, with);
470        }
471        return this;
472    }
473
474    @Override
475    @SuppressWarnings("PMD.ShortVariable")
476    public <V> Optional<V> associated(Object by, Class<V> type) {
477        if (contextData == null) {
478            return Optional.empty();
479        }
480        return Optional.ofNullable(contextData.get(by))
481            .filter(found -> type.isAssignableFrom(found.getClass()))
482            .map(match -> type.cast(match));
483    }
484
485    /*
486     * (non-Javadoc)
487     * 
488     * @see java.lang.Object#toString()
489     */
490    @Override
491    public String toString() {
492        StringBuilder builder = new StringBuilder();
493        builder.append(Components.objectName(this))
494            .append(" [");
495        if (channels != null) {
496            builder.append("channels=");
497            builder.append(Channel.toString(channels));
498        }
499        builder.append(']');
500        return builder.toString();
501    }
502
503}