/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016-2018 Michael N. Lipp
 * 
 * This program is free software; you can redistribute it and/or modify it 
 * under the terms of the GNU Affero General Public License as published by 
 * the Free Software Foundation; either version 3 of the License, or 
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but 
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
 * for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License along 
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */
018
019package org.jgrapes.core;
020
021import java.lang.reflect.Array;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.TimeoutException;
034import java.util.function.BiConsumer;
035import org.jgrapes.core.events.HandlingError;
036import org.jgrapes.core.internal.EventBase;
037
038/**
039 * This class is the base class for all events.
040 *  
041 * By default (i.e. as implemented by this class), the event's kind is
042 * represented by its Java class and the eligibility is based on
043 * the "is a" relationship between classes. An event is eligible if its class
044 * is equal to or a super class of the class used as criterion. 
045 * This default behavior can be changed by overriding the
046 * methods from {@link Eligible}. See {@link NamedEvent} as an example.
047 * 
048 * @param <T>
049 *            the result type of the event. Use {@link Void} if handling the
050 *            event does not produce a result
051 */
052@SuppressWarnings({ "PMD.GodClass", "PMD.TooManyMethods" })
053public class Event<T> extends EventBase<T> {
054
055    /** The channels that this event is to be fired on if no
056     * channels are specified explicitly when firing. */
057    private Channel[] channels;
058    /** Indicates that the event should not processed further. */
059    private boolean stopped;
060    /** The results of handling the event (if any). */
061    private List<T> results;
062    /** Context data. */
063    private Map<Object, Object> contextData;
064    private boolean cancelled;
065
066    /**
067     * Creates a new event. Passing channels is equivalent to first
068     * creating the event and then calling {@link #setChannels(Channel...)}
069     * with the given channels.
070     * 
071     * @param channels the channels to set
072     */
073    public Event(Channel... channels) {
074        super();
075        this.channels = Arrays.copyOf(channels, channels.length);
076    }
077
078    /**
079     * Returns the class of this event as representation of its kind.
080     * 
081     * @return the class of this event
082     * 
083     * @see org.jgrapes.core.Eligible#defaultCriterion()
084     */
085    @Override
086    public Object defaultCriterion() {
087        return getClass();
088    }
089
090    /**
091     * Returns <code>true</code> if the `criterion` 
092     * is of the same class or a base class of this event's class.
093     * 
094     * @see org.jgrapes.core.Eligible#isEligibleFor(java.lang.Object)
095     */
096    @Override
097    public boolean isEligibleFor(Object criterion) {
098        return Class.class.isInstance(criterion)
099            && ((Class<?>) criterion).isAssignableFrom(getClass());
100    }
101
102    /**
103     * Return the event pipeline that currently processes the event
104     * (if any).
105     * 
106     * @return the event pipeline if the event is being processed
107     */
108    @SuppressWarnings({ "PMD.UselessOverridingMethod",
109        "PMD.AvoidDuplicateLiterals" })
110    @Override
111    public Optional<EventPipeline> processedBy() {
112        return super.processedBy();
113    }
114
115    /**
116     * Implements the default behavior for handling events thrown
117     * by a handler. Fires a {@link HandlingError handling error} event
118     * for this event and the given throwable.
119     * 
120     * @see HandlingError
121     */
122    @Override
123    protected void handlingError(
124            EventPipeline eventProcessor, Throwable throwable) {
125        eventProcessor.fire(
126            new HandlingError(this, throwable), channels());
127    }
128
129    /**
130     * Sets the channels that the event is fired on if no channels
131     * are specified explicitly when firing the event
132     * (see {@link org.jgrapes.core.Manager#fire(Event, Channel...)}).
133     * 
134     * @param channels the channels to set
135     * @return the object for easy chaining
136     * 
137     * @throws IllegalStateException if the method is called after
138     * this event has been fired
139     */
140    public Event<T> setChannels(Channel... channels) {
141        if (enqueued()) {
142            throw new IllegalStateException(
143                "Channels cannot be changed after fire");
144        }
145        this.channels = Arrays.copyOf(channels, channels.length);
146        return this;
147    }
148
149    /**
150     * Returns the channels associated with the event. Before an
151     * event has been fired, this returns the channels set with
152     * {@link #setChannels(Channel[])}. After an event has been
153     * fired, this returns the channels that the event has
154     * effectively been fired on 
155     * (see {@link Manager#fire(Event, Channel...)}).
156     * 
157     * @return the channels (never `null`, but may be empty)
158     */
159    @Override
160    public Channel[] channels() {
161        return Arrays.copyOf(channels, channels.length);
162    }
163
164    /**
165     * Returns the subset of channels that are assignable to the given type.
166     * 
167     * @param <C> the given type's class
168     * @param type the class to look for
169     * @return the filtered channels
170     * @see #channels()
171     */
172    @SuppressWarnings({ "unchecked", "PMD.ShortVariable",
173        "PMD.AvoidDuplicateLiterals" })
174    public <C> C[] channels(Class<C> type) {
175        return Arrays.stream(channels)
176            .filter(c -> type.isAssignableFrom(c.getClass())).toArray(
177                size -> (C[]) Array.newInstance(type, size));
178    }
179
180    /**
181     * Execute the given handler for all channels of the given type.
182     * 
183     * @param <E> the type of the event
184     * @param <C> the type of the channel
185     * @param type the channel type
186     * @param handler the handler
187     */
188    @SuppressWarnings({ "unchecked", "PMD.ShortVariable" })
189    public <E extends EventBase<?>, C extends Channel> void forChannels(
190            Class<C> type, BiConsumer<E, C> handler) {
191        Arrays.stream(channels)
192            .filter(c -> type.isAssignableFrom(c.getClass()))
193            .forEach(c -> handler.accept((E) this, (C) c));
194    }
195
196    /**
197     * Returns the events to be thrown when this event has completed
198     * (see {@link #isDone()}).
199     * 
200     * @return the completed events
201     */
202    public Set<Event<?>> completionEvents() {
203        return completionEvents == null ? Collections.emptySet()
204            : Collections.unmodifiableSet(completionEvents);
205    }
206
207    /**
208     * {@inheritDoc}
209     */
210    @Override
211    public Event<T> addCompletionEvent(Event<?> completionEvent) {
212        if (completionEvents == null) {
213            completionEvents = new HashSet<>();
214        }
215        completionEvents.add(completionEvent);
216        return this;
217    }
218
219    /*
220     * (non-Javadoc)
221     * 
222     * @see org.jgrapes.core.internal.EventBase#setRequiresResult(boolean)
223     */
224    @Override
225    public Event<T> setRequiresResult(boolean value) {
226        return (Event<T>) super.setRequiresResult(value);
227    }
228
229    /**
230     * Check if this event has completed. An event is completed
231     * if 
232     *  * all its handlers have been invoked (or the event has
233     *    been stopped or cancelled), 
234     *  * all events caused by it have completed,
235     *  * no {@link CompletionLock}s remain, and  
236     *  * a result has been set (only required if 
237     *    {@link #setRequiresResult(boolean)} has been called with `true`).
238     * 
239     * @return the completed state
240     */
241    @Override
242    public boolean isDone() {
243        return completed;
244    }
245
246    /**
247     * Invoked after all handlers for the event have been executed. 
248     * May be overridden by derived classes to cause some immediate effect
249     * (instead of e.g. waiting for the completion event). The default 
250     * implementation does nothing. This method is invoked by the event 
251     * handler thread and must not block.
252     */
253    protected void handled() {
254        // Default is to do nothing.
255    }
256
257    /**
258     * {@inheritDoc}
259     */
260    @Override
261    @SuppressWarnings("PMD.UselessOverridingMethod")
262    public void suspendHandling() {
263        super.suspendHandling();
264    }
265
266    /**
267     * {@inheritDoc}
268     */
269    @Override
270    @SuppressWarnings("PMD.UselessOverridingMethod")
271    public void suspendHandling(Runnable whenResumed) {
272        super.suspendHandling(whenResumed);
273    }
274
275    /**
276     * {@inheritDoc}
277     */
278    @Override
279    @SuppressWarnings("PMD.UselessOverridingMethod")
280    public void resumeHandling() {
281        super.resumeHandling();
282    }
283
284    /**
285     * Can be called during the execution of an event handler to indicate
286     * that the event should not be processed further. All remaining 
287     * handlers for this event will be skipped.
288     * 
289     * @return the object for easy chaining
290     */
291    public Event<T> stop() {
292        stopped = true;
293        // Just in case.
294        resumeHandling();
295        return this;
296    }
297
298    /**
299     * Returns <code>true</code> if {@link #stop} has been called.
300     * 
301     * @return the stopped state
302     */
303    public boolean isStopped() {
304        return stopped;
305    }
306
307    /**
308     * Prevents the invocation of further handlers (like {@link #stop()} 
309     * and (in addition) the invocation of any added completed events.
310     * 
311     * @param mayInterruptIfRunning ignored
312     * @return `false` if the event has already been completed
313     * @see java.util.concurrent.Future#cancel(boolean)
314     */
315    @Override
316    public boolean cancel(boolean mayInterruptIfRunning) {
317        if (!completed && !cancelled) {
318            stop();
319            cancelled = true;
320            return true;
321        }
322        return false;
323    }
324
325    @Override
326    public boolean isCancelled() {
327        return cancelled;
328    }
329
330    /**
331     * Sets the result of handling this event. If this method is invoked 
332     * more then once, the various results are collected in a list. This
333     * can happen if the event is handled by several components. 
334     * 
335     * @param result the result to set
336     * @return the object for easy chaining
337     */
338    public Event<T> setResult(T result) {
339        synchronized (this) {
340            if (results == null) {
341                // Make sure that we have a valid result before
342                // calling decrementOpen
343                results = new ArrayList<>();
344                results.add(result);
345                firstResultAssigned();
346                return this;
347            }
348            results.add(result);
349            return this;
350        }
351    }
352
353    /**
354     * Allows access to the intermediate result before the 
355     * completion of the event. 
356     * 
357     * @return the intermediate results (which may be an empty list)
358     */
359    protected List<T> currentResults() {
360        return results == null ? Collections.emptyList()
361            : Collections.unmodifiableList(results);
362    }
363
364    /**
365     * Tie the result of this event to the result of the other event.
366     * Changes of either event's results will subsequently be applied
367     * to both events.
368     * <P>
369     * This is useful when an event is replaced by another event during
370     * handling like:
371     * {@code fire((new Event()).tieTo(oldEvent.stop()))}  
372     * 
373     * @param other the event to tie to
374     * @return the object for easy chaining
375     */
376    public Event<T> tieTo(Event<T> other) {
377        synchronized (this) {
378            if (other.results == null) {
379                other.results = new ArrayList<>();
380            }
381            results = other.results;
382            return this;
383        }
384    }
385
386    /**
387     * Waits for the event to be completed (see {@link #isDone()})
388     * and returns the first (or only) result.
389     * 
390     * @see Future#get()
391     */
392    @Override
393    public T get() throws InterruptedException {
394        while (true) {
395            synchronized (this) {
396                if (completed) {
397                    return results == null || results.isEmpty()
398                        ? null
399                        : results.get(0);
400                }
401                wait();
402            }
403        }
404    }
405
406    /**
407     * Causes the invoking thread to wait until the processing of the 
408     * event has been completed (see {@link #isDone()}) or the given 
409     * timeout has expired and returns the first (or only) result. 
410     * 
411     * @return the result
412     * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
413     */
414    @Override
415    public T get(long timeout, TimeUnit unit)
416            throws InterruptedException, TimeoutException {
417        synchronized (this) {
418            if (completed) {
419                return results == null || results.isEmpty()
420                    ? null
421                    : results.get(0);
422            }
423            wait(unit.toMillis(timeout));
424        }
425        if (completed) {
426            return results == null || results.isEmpty()
427                ? null
428                : results.get(0);
429        }
430        throw new TimeoutException();
431    }
432
433    /**
434     * Waits for the event to be completed (see {@link #isDone()})
435     * and returns the list of results (which may be empty if the
436     * event's result type is {@link Void}).
437     * 
438     * @return the results
439     * @see Future#get()
440     */
441    public List<T> results() throws InterruptedException {
442        while (true) {
443            synchronized (this) {
444                if (completed) {
445                    return results == null ? Collections.emptyList()
446                        : Collections.unmodifiableList(results);
447                }
448                wait();
449            }
450        }
451    }
452
453    /**
454     * Causes the invoking thread to wait until the processing of the 
455     * event has been completed (see {@link #isDone()}) or given timeout 
456     * has expired and returns the list of results (which may be empty
457     * if the event's result type is {@link Void}). 
458     * 
459     * @return the results
460     * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
461     */
462    public List<T> results(long timeout, TimeUnit unit)
463            throws InterruptedException, TimeoutException {
464        synchronized (this) {
465            if (completed) {
466                return results == null ? Collections.emptyList()
467                    : Collections.unmodifiableList(results);
468            }
469            wait(unit.toMillis(timeout));
470        }
471        if (completed) {
472            return results == null ? Collections.emptyList()
473                : Collections.unmodifiableList(results);
474        }
475        throw new TimeoutException();
476    }
477
478    @Override
479    @SuppressWarnings("PMD.ShortVariable")
480    public Event<T> setAssociated(Object by, Object with) {
481        if (contextData == null) {
482            contextData = new ConcurrentHashMap<>();
483        }
484        if (with == null) {
485            contextData.remove(by);
486        } else {
487            contextData.put(by, with);
488        }
489        return this;
490    }
491
492    @Override
493    @SuppressWarnings("PMD.ShortVariable")
494    public <V> Optional<V> associated(Object by, Class<V> type) {
495        if (contextData == null) {
496            return Optional.empty();
497        }
498        return Optional.ofNullable(contextData.get(by))
499            .filter(found -> type.isAssignableFrom(found.getClass()))
500            .map(match -> type.cast(match));
501    }
502
503    /*
504     * (non-Javadoc)
505     * 
506     * @see java.lang.Object#toString()
507     */
508    @Override
509    public String toString() {
510        StringBuilder builder = new StringBuilder();
511        builder.append(Components.objectName(this))
512            .append(" [");
513        if (channels != null) {
514            builder.append("channels=");
515            builder.append(Channel.toString(channels));
516        }
517        builder.append(']');
518        return builder.toString();
519    }
520
521}