001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.core.internal;
020
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Optional;
025import java.util.Set;
026import java.util.concurrent.Future;
027import java.util.concurrent.atomic.AtomicInteger;
028
029import org.jgrapes.core.Associator;
030import org.jgrapes.core.Channel;
031import org.jgrapes.core.CompletionLock;
032import org.jgrapes.core.Eligible;
033import org.jgrapes.core.Event;
034import org.jgrapes.core.EventPipeline;
035
036/**
037 * Provides the implementations of methods to class {@link Event} that
038 * need access to classes or methods that are visible in the implementation
039 * package only. The class is not intended to be used as base class
040 * for any other class.
041 * 
042 * @param <T> the result type of the event. Use {@link Void} if handling
043 * the event does not produce a result
044 */
045public abstract class EventBase<T>
046        implements Eligible, Future<T>, Associator {
047
048    /** The event that caused this event. */
049    private EventBase<?> generatedBy;
050    /** Number of events that have to be processed until completion.
051     * This is one for the event itself and one more for each event
052     * that has this event as its cause. */
053    private final AtomicInteger openCount = new AtomicInteger(1);
054    /** Completion locks. */
055    private Set<CompletionLockBase> completionLocks;
056    /** Set when the event is enqueued, reset when it has been completed. */
057    private EventProcessor processedBy;
058    /** The events to be fired upon completion. Using this attribute
059     * provides a slightly faster access than invoking
060     * {@link Event#completionEvents()}, which wraps the result in
061     * an unmodifiable set. */
062    protected Set<Event<?>> completionEvents;
063    /** Set when the event has been completed. */
064    protected boolean completed;
065    private boolean requiresResult;
066
067    /**
068     * See {@link Event#channels()}.
069     *
070     * @return the channel[]
071     */
072    public abstract Channel[] channels();
073
074    /**
075     * See {@link Event#handled()}.
076     */
077    protected abstract void handled();
078
079    /**
080     * See {@link Event#isStopped()}.
081     */
082    public abstract boolean isStopped();
083
084    /**
085     * See {@link Event#currentResults()}.
086     */
087    protected abstract List<T> currentResults();
088
089    /**
090     * See {@link Event#setRequiresResult(boolean)}.
091     */
092    protected EventBase<T> setRequiresResult(boolean value) {
093        if (requiresResult == value) {
094            return this;
095        }
096        if (value) {
097            openCount.incrementAndGet();
098            requiresResult = true;
099        } else {
100            requiresResult = false;
101            decrementOpen();
102        }
103        return this;
104    }
105
106    /**
107     * See {@link Event#firstResultAssigned()}.
108     */
109    protected void firstResultAssigned() {
110        if (requiresResult) {
111            decrementOpen();
112        }
113    }
114
115    /**
116     * Returns <code>true</code> if the event has been enqueued in a pipeline.
117     * 
118     * @return the result
119     */
120    protected boolean enqueued() {
121        return processedBy != null || completed || isCancelled();
122    }
123
124    /**
125     * Invoked when an exception occurs while invoking a handler for an event.
126     * 
127     * @param eventProcessor the manager that has invoked the handler
128     * @param throwable the exception that has been thrown by the handler
129     */
130    protected abstract void handlingError(
131            EventPipeline eventProcessor, Throwable throwable);
132
133    /**
134     * If an event is fired while processing another event, note
135     * the event being processed. This allows us to track the cause
136     * of events to the "initial" (externally) generated event that
137     * triggered everything.
138     * 
139     * @param causingEvent the causing event to set
140     */
141    /* default */ void generatedBy(EventBase<?> causingEvent) {
142        generatedBy = causingEvent;
143        if (causingEvent != null) {
144            causingEvent.openCount.incrementAndGet();
145        }
146    }
147
148    /**
149     * Set the processor that will (eventually) process the event. 
150     * 
151     * @param processor the processor
152     */
153    /* default */ void processedBy(EventProcessor processor) {
154        this.processedBy = processor;
155    }
156
157    public Optional<EventPipeline> processedBy() {
158        return Optional.ofNullable(processedBy).map(
159            procBy -> procBy.asEventPipeline());
160    }
161
162    /**
163     * @param pipeline
164     */
165    /* default */ void decrementOpen() {
166        if (openCount.decrementAndGet() == 0 && !completed) {
167            synchronized (this) {
168                completed = true;
169                notifyAll();
170            }
171            if (completionEvents != null && !isCancelled()) {
172                processedBy.updateNewEventsParent(generatedBy);
173                for (Event<?> e : completionEvents) {
174                    Channel[] completeChannels = e.channels();
175                    if (completeChannels.length == 0) {
176                        // Note that channels() cannot be empty, as it is set
177                        // when firing the event and an event is never fired
178                        // on no channels.
179                        completeChannels = channels();
180                        e.setChannels(completeChannels);
181                    }
182                    processedBy.add(e, completeChannels);
183                }
184            }
185            if (generatedBy != null) {
186                generatedBy.decrementOpen();
187            }
188            processedBy = null; // No longer needed
189        }
190    }
191
192    /**
193     * Adds the given completion lock. 
194     * 
195     * @param lock the lock
196     * @see CompletionLock
197     */
198    /* default */ Event<T> addCompletionLock(CompletionLockBase lock) {
199        synchronized (this) {
200            if (completionLocks == null) {
201                completionLocks = Collections.synchronizedSet(new HashSet<>());
202            }
203        }
204        if (completionLocks.add(lock)) {
205            openCount.incrementAndGet();
206            lock.startTimer();
207        }
208        return (Event<T>) this;
209    }
210
211    /**
212     * Removes the given completion lock. 
213     * 
214     * @param lock the lock
215     * @see CompletionLock
216     */
217    /* default */ void removeCompletionLock(CompletionLockBase lock) {
218        if (completionLocks == null) {
219            return;
220        }
221        if (completionLocks.remove(lock)) {
222            decrementOpen();
223        }
224        lock.cancelTimer();
225    }
226
227}