001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core;
020
021import java.lang.reflect.Array;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.TimeoutException;
034import java.util.function.BiConsumer;
035import org.jgrapes.core.events.HandlingError;
036import org.jgrapes.core.internal.EventBase;
037
038/**
039 * This class is the base class for all events.
040 *  
041 * By default (i.e. as implemented by this class), the event's kind is
042 * represented by its Java class and the eligibility is based on
043 * the "is a" relationship between classes. An event is eligible if its class
044 * is equal to or a super class of the class used as criterion. 
045 * This default behavior can be changed by overriding the
046 * methods from {@link Eligible}. See {@link NamedEvent} as an example.
047 * 
048 * @param <T>
049 *            the result type of the event. Use {@link Void} if handling the
050 *            event does not produce a result
051 */
052@SuppressWarnings({ "PMD.GodClass", "PMD.TooManyMethods" })
053public class Event<T> extends EventBase<T> {
054
055    /** The channels that this event is to be fired on if no
056     * channels are specified explicitly when firing. */
057    private Channel[] channels;
058    /** Indicates that the event should not processed further. */
059    private boolean stopped;
060    /** The results of handling the event (if any). */
061    private List<T> results;
062    /** Context data. */
063    private Map<Object, Object> contextData;
064    private boolean cancelled;
065
066    /**
067     * Creates a new event. Passing channels is equivalent to first
068     * creating the event and then calling {@link #setChannels(Channel...)}
069     * with the given channels.
070     * 
071     * @param channels the channels to set
072     */
073    public Event(Channel... channels) {
074        super();
075        this.channels = Arrays.copyOf(channels, channels.length);
076    }
077
078    /**
079     * Returns the class of this event as representation of its kind.
080     * 
081     * @return the class of this event
082     * 
083     * @see org.jgrapes.core.Eligible#defaultCriterion()
084     */
085    @Override
086    public Object defaultCriterion() {
087        return getClass();
088    }
089
090    /**
091     * Returns <code>true</code> if the `criterion` 
092     * is of the same class or a base class of this event's class.
093     * 
094     * @see org.jgrapes.core.Eligible#isEligibleFor(java.lang.Object)
095     */
096    @Override
097    public boolean isEligibleFor(Object criterion) {
098        return Class.class.isInstance(criterion)
099            && ((Class<?>) criterion).isAssignableFrom(getClass());
100    }
101
102    /**
103     * Return the event pipeline that currently processes the event
104     * (if any).
105     * 
106     * @return the event pipeline if the event is being processed
107     */
108    @SuppressWarnings({ "PMD.UselessOverridingMethod",
109        "PMD.AvoidDuplicateLiterals" })
110    @Override
111    public Optional<EventPipeline> processedBy() {
112        return super.processedBy();
113    }
114
115    /**
116     * Implements the default behavior for handling events thrown
117     * by a handler. Fires a {@link HandlingError handling error} event
118     * for this event and the given throwable.
119     * 
120     * @see HandlingError
121     */
122    @Override
123    protected void handlingError(
124            EventPipeline eventProcessor, Throwable throwable) {
125        if (invokedFor != null) {
126            eventProcessor.fire(new HandlingError(this, throwable), invokedFor);
127        } else {
128            eventProcessor.fire(new HandlingError(this, throwable), channels());
129        }
130    }
131
132    /**
133     * Sets the channels that the event is fired on if no channels
134     * are specified explicitly when firing the event
135     * (see {@link org.jgrapes.core.Manager#fire(Event, Channel...)}).
136     * 
137     * @param channels the channels to set
138     * @return the object for easy chaining
139     * 
140     * @throws IllegalStateException if the method is called after
141     * this event has been fired
142     */
143    public Event<T> setChannels(Channel... channels) {
144        if (enqueued()) {
145            throw new IllegalStateException(
146                "Channels cannot be changed after fire");
147        }
148        this.channels = Arrays.copyOf(channels, channels.length);
149        return this;
150    }
151
152    /**
153     * Returns the channels associated with the event. Before an
154     * event has been fired, this returns the channels set with
155     * {@link #setChannels(Channel[])}. After an event has been
156     * fired, this returns the channels that the event has
157     * effectively been fired on 
158     * (see {@link Manager#fire(Event, Channel...)}).
159     * 
160     * @return the channels (never `null`, but may be empty)
161     */
162    @Override
163    public Channel[] channels() {
164        return Arrays.copyOf(channels, channels.length);
165    }
166
167    /**
168     * Returns the subset of channels that are assignable to the given type.
169     * 
170     * @param <C> the given type's class
171     * @param type the class to look for
172     * @return the filtered channels
173     * @see #channels()
174     */
175    @SuppressWarnings({ "unchecked", "PMD.ShortVariable",
176        "PMD.AvoidDuplicateLiterals" })
177    public <C> C[] channels(Class<C> type) {
178        return Arrays.stream(channels)
179            .filter(c -> type.isAssignableFrom(c.getClass())).toArray(
180                size -> (C[]) Array.newInstance(type, size));
181    }
182
183    /**
184     * Execute the given handler for all channels of the given type.
185     * 
186     * @param <E> the type of the event
187     * @param <C> the type of the channel
188     * @param type the channel type
189     * @param handler the handler
190     */
191    @SuppressWarnings({ "unchecked", "PMD.ShortVariable" })
192    public <E extends EventBase<?>, C extends Channel> void forChannels(
193            Class<C> type, BiConsumer<E, C> handler) {
194        Arrays.stream(channels)
195            .filter(c -> type.isAssignableFrom(c.getClass()))
196            .forEach(c -> handler.accept((E) this, (C) c));
197    }
198
199    /**
200     * Returns the events to be thrown when this event has completed
201     * (see {@link #isDone()}).
202     * 
203     * @return the completed events
204     */
205    public Set<Event<?>> completionEvents() {
206        return completionEvents == null ? Collections.emptySet()
207            : Collections.unmodifiableSet(completionEvents);
208    }
209
210    /**
211     * {@inheritDoc}
212     */
213    @Override
214    public Event<T> addCompletionEvent(Event<?> completionEvent) {
215        if (completionEvents == null) {
216            completionEvents = new HashSet<>();
217        }
218        completionEvents.add(completionEvent);
219        return this;
220    }
221
222    /*
223     * (non-Javadoc)
224     * 
225     * @see org.jgrapes.core.internal.EventBase#setRequiresResult(boolean)
226     */
227    @Override
228    public Event<T> setRequiresResult(boolean value) {
229        return (Event<T>) super.setRequiresResult(value);
230    }
231
232    /**
233     * Check if this event has completed. An event is completed
234     * if 
235     *  * all its handlers have been invoked (or the event has
236     *    been stopped or cancelled), 
237     *  * all events caused by it have completed,
238     *  * no {@link CompletionLock}s remain, and  
239     *  * a result has been set (only required if 
240     *    {@link #setRequiresResult(boolean)} has been called with `true`).
241     * 
242     * @return the completed state
243     */
244    @Override
245    public boolean isDone() {
246        return completed;
247    }
248
249    /**
250     * Invoked after all handlers for the event have been executed. 
251     * May be overridden by derived classes to cause some immediate effect
252     * (instead of e.g. waiting for the completion event). The default 
253     * implementation does nothing. This method is invoked by the event 
254     * handler thread and must not block.
255     */
256    protected void handled() {
257        // Default is to do nothing.
258    }
259
260    /**
261     * {@inheritDoc}
262     */
263    @Override
264    @SuppressWarnings("PMD.UselessOverridingMethod")
265    public void suspendHandling() {
266        super.suspendHandling();
267    }
268
269    /**
270     * {@inheritDoc}
271     */
272    @Override
273    @SuppressWarnings("PMD.UselessOverridingMethod")
274    public void suspendHandling(Runnable whenResumed) {
275        super.suspendHandling(whenResumed);
276    }
277
278    /**
279     * {@inheritDoc}
280     */
281    @Override
282    @SuppressWarnings("PMD.UselessOverridingMethod")
283    public void resumeHandling() {
284        super.resumeHandling();
285    }
286
287    /**
288     * Can be called during the execution of an event handler to indicate
289     * that the event should not be processed further. All remaining 
290     * handlers for this event will be skipped.
291     * 
292     * @return the object for easy chaining
293     */
294    public Event<T> stop() {
295        stopped = true;
296        // Just in case.
297        resumeHandling();
298        return this;
299    }
300
301    /**
302     * Returns <code>true</code> if {@link #stop} has been called.
303     * 
304     * @return the stopped state
305     */
306    public boolean isStopped() {
307        return stopped;
308    }
309
310    /**
311     * Prevents the invocation of further handlers (like {@link #stop()} 
312     * and (in addition) the invocation of any added completed events.
313     * 
314     * @param mayInterruptIfRunning ignored
315     * @return `false` if the event has already been completed
316     * @see java.util.concurrent.Future#cancel(boolean)
317     */
318    @Override
319    public boolean cancel(boolean mayInterruptIfRunning) {
320        if (!completed && !cancelled) {
321            stop();
322            cancelled = true;
323            return true;
324        }
325        return false;
326    }
327
328    @Override
329    public boolean isCancelled() {
330        return cancelled;
331    }
332
333    /**
334     * Sets the result of handling this event. If this method is invoked 
335     * more then once, the various results are collected in a list. This
336     * can happen if the event is handled by several components. 
337     * 
338     * @param result the result to set
339     * @return the object for easy chaining
340     */
341    public Event<T> setResult(T result) {
342        synchronized (this) {
343            if (results == null) {
344                // Make sure that we have a valid result before
345                // calling decrementOpen
346                results = new ArrayList<>();
347                results.add(result);
348                firstResultAssigned();
349                return this;
350            }
351            results.add(result);
352            return this;
353        }
354    }
355
356    /**
357     * Allows access to the intermediate result before the 
358     * completion of the event. 
359     * 
360     * @return the intermediate results (which may be an empty list)
361     */
362    protected List<T> currentResults() {
363        return results == null ? Collections.emptyList()
364            : Collections.unmodifiableList(results);
365    }
366
367    /**
368     * Tie the result of this event to the result of the other event.
369     * Changes of either event's results will subsequently be applied
370     * to both events.
371     * <P>
372     * This is useful when an event is replaced by another event during
373     * handling like:
374     * {@code fire((new Event()).tieTo(oldEvent.stop()))}  
375     * 
376     * @param other the event to tie to
377     * @return the object for easy chaining
378     */
379    public Event<T> tieTo(Event<T> other) {
380        synchronized (this) {
381            if (other.results == null) {
382                other.results = new ArrayList<>();
383            }
384            results = other.results;
385            return this;
386        }
387    }
388
389    /**
390     * Waits for the event to be completed (see {@link #isDone()})
391     * and returns the first (or only) result.
392     * 
393     * @see Future#get()
394     */
395    @Override
396    public T get() throws InterruptedException {
397        while (true) {
398            synchronized (this) {
399                if (completed) {
400                    return results == null || results.isEmpty()
401                        ? null
402                        : results.get(0);
403                }
404                wait();
405            }
406        }
407    }
408
409    /**
410     * Causes the invoking thread to wait until the processing of the 
411     * event has been completed (see {@link #isDone()}) or the given 
412     * timeout has expired and returns the first (or only) result. 
413     * 
414     * @return the result
415     * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
416     */
417    @Override
418    public T get(long timeout, TimeUnit unit)
419            throws InterruptedException, TimeoutException {
420        synchronized (this) {
421            if (completed) {
422                return results == null || results.isEmpty()
423                    ? null
424                    : results.get(0);
425            }
426            wait(unit.toMillis(timeout));
427        }
428        if (completed) {
429            return results == null || results.isEmpty()
430                ? null
431                : results.get(0);
432        }
433        throw new TimeoutException();
434    }
435
436    /**
437     * Waits for the event to be completed (see {@link #isDone()})
438     * and returns the list of results (which may be empty if the
439     * event's result type is {@link Void}).
440     * 
441     * @return the results
442     * @see Future#get()
443     */
444    public List<T> results() throws InterruptedException {
445        while (true) {
446            synchronized (this) {
447                if (completed) {
448                    return results == null ? Collections.emptyList()
449                        : Collections.unmodifiableList(results);
450                }
451                wait();
452            }
453        }
454    }
455
456    /**
457     * Causes the invoking thread to wait until the processing of the 
458     * event has been completed (see {@link #isDone()}) or given timeout 
459     * has expired and returns the list of results (which may be empty
460     * if the event's result type is {@link Void}). 
461     * 
462     * @return the results
463     * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
464     */
465    public List<T> results(long timeout, TimeUnit unit)
466            throws InterruptedException, TimeoutException {
467        synchronized (this) {
468            if (completed) {
469                return results == null ? Collections.emptyList()
470                    : Collections.unmodifiableList(results);
471            }
472            wait(unit.toMillis(timeout));
473        }
474        if (completed) {
475            return results == null ? Collections.emptyList()
476                : Collections.unmodifiableList(results);
477        }
478        throw new TimeoutException();
479    }
480
481    @Override
482    @SuppressWarnings({ "PMD.ShortVariable", "unchecked" })
483    public <A extends Associator> A setAssociated(Object by, Object with) {
484        if (contextData == null) {
485            contextData = new ConcurrentHashMap<>();
486        }
487        if (with == null) {
488            contextData.remove(by);
489        } else {
490            contextData.put(by, with);
491        }
492        return (A) this;
493    }
494
495    @Override
496    @SuppressWarnings("PMD.ShortVariable")
497    public <V> Optional<V> associated(Object by, Class<V> type) {
498        if (contextData == null) {
499            return Optional.empty();
500        }
501        return Optional.ofNullable(contextData.get(by))
502            .filter(found -> type.isAssignableFrom(found.getClass()))
503            .map(type::cast);
504    }
505
506    /*
507     * (non-Javadoc)
508     * 
509     * @see java.lang.Object#toString()
510     */
511    @Override
512    public String toString() {
513        StringBuilder builder = new StringBuilder();
514        builder.append(Components.objectName(this))
515            .append(" [");
516        if (channels != null) {
517            builder.append("channels=");
518            builder.append(Channel.toString(channels));
519        }
520        builder.append(']');
521        return builder.toString();
522    }
523
524}