001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.core.internal; 020 021import java.util.Collections; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Optional; 026import java.util.Set; 027import java.util.concurrent.Future; 028import java.util.concurrent.atomic.AtomicInteger; 029import java.util.function.Consumer; 030import org.jgrapes.core.Associator; 031import org.jgrapes.core.Channel; 032import org.jgrapes.core.CompletionEvent; 033import org.jgrapes.core.CompletionLock; 034import org.jgrapes.core.Eligible; 035import org.jgrapes.core.Event; 036import org.jgrapes.core.EventPipeline; 037 038/** 039 * Provides the implementations of methods to class {@link Event} that 040 * need access to classes or methods that are visible in the implementation 041 * package only. The class is not intended to be used as base class 042 * for any other class. 043 * 044 * @param <T> the result type of the event. Use {@link Void} if handling 045 * the event does not produce a result 046 */ 047@SuppressWarnings("PMD.TooManyMethods") 048public abstract class EventBase<T> 049 implements Eligible, Future<T>, Associator { 050 051 /** The event that caused this event. */ 052 private EventBase<?> generatedBy; 053 /** Number of events that have to be processed until completion. 054 * This is one for the event itself and one more for each event 055 * that has this event as its cause. */ 056 private final AtomicInteger openCount = new AtomicInteger(1); 057 /** Completion locks. */ 058 private Set<CompletionLockBase> completionLocks; 059 /** Set when the event is enqueued, reset when it has been completed. */ 060 private EventProcessor processedBy; 061 /** The events to be fired upon completion. Using this attribute 062 * provides a slightly faster access than invoking 063 * {@link Event#completionEvents()}, which wraps the result in 064 * an unmodifiable set. */ 065 protected Set<Event<?>> completionEvents; 066 /** Set when the event has been completed. */ 067 protected boolean completed; 068 private boolean requiresResult; 069 /** Event is tracked by {@link VerboseHandlerReference}. */ 070 private boolean tracked = true; 071 /** Event handler to be invoked after resumeHandling. */ 072 private Iterator<HandlerReference> suspendedHandlers; 073 private Runnable whenResumed; 074 075 /** 076 * See {@link Event#channels()}. 077 * 078 * @return the channel[] 079 */ 080 public abstract Channel[] channels(); 081 082 /** 083 * See {@link Event#handled()}. 084 */ 085 protected abstract void handled(); 086 087 /** 088 * See {@link Event#isStopped()}. 089 */ 090 public abstract boolean isStopped(); 091 092 /** 093 * See {@link Event#currentResults()}. 094 */ 095 protected abstract List<T> currentResults(); 096 097 /** 098 * See {@link Event#setRequiresResult(boolean)}. 099 */ 100 protected EventBase<T> setRequiresResult(boolean value) { 101 if (requiresResult == value) { 102 return this; 103 } 104 if (value) { 105 openCount.incrementAndGet(); 106 requiresResult = true; 107 } else { 108 requiresResult = false; 109 decrementOpen(); 110 } 111 return this; 112 } 113 114 /** 115 * See {@link Event#firstResultAssigned()}. 116 */ 117 protected void firstResultAssigned() { 118 if (requiresResult) { 119 decrementOpen(); 120 } 121 } 122 123 /** 124 * Returns <code>true</code> if the event has been enqueued in a pipeline. 125 * 126 * @return the result 127 */ 128 protected boolean enqueued() { 129 return processedBy != null || completed || isCancelled(); 130 } 131 132 /** 133 * Invoked when an exception occurs while invoking a handler for an event. 134 * 135 * @param eventProcessor the manager that has invoked the handler 136 * @param throwable the exception that has been thrown by the handler 137 */ 138 protected abstract void handlingError( 139 EventPipeline eventProcessor, Throwable throwable); 140 141 /** 142 * If an event is fired while processing another event, note 143 * the event being processed. This allows us to track the cause 144 * of events to the "initial" (externally) generated event that 145 * triggered everything. 146 * 147 * @param causingEvent the causing event to set 148 */ 149 /* default */ void generatedBy(EventBase<?> causingEvent) { 150 generatedBy = causingEvent; 151 if (causingEvent != null) { 152 causingEvent.openCount.incrementAndGet(); 153 } 154 } 155 156 /** 157 * Set the processor that will (eventually) process the event. 158 * 159 * @param processor the processor 160 */ 161 /* default */ void processedBy(EventProcessor processor) { 162 this.processedBy = processor; 163 } 164 165 public Optional<EventPipeline> processedBy() { 166 return Optional.ofNullable(processedBy).map( 167 procBy -> procBy.asEventPipeline()); 168 } 169 170 /** 171 * Suspend the invocation of the remaining handlers for this event. 172 * May only be called in a handler for the event. Must be balanced 173 * by an invocation of {@link #resumeHandling()}. 174 */ 175 public void suspendHandling() { 176 suspendHandling(null); 177 } 178 179 /** 180 * Suspend the invocation of the remaining handlers for this event. 181 * May only be called in a handler for the event. Must be balanced 182 * by an invocation of {@link #resumeHandling()}. 183 * 184 * @param whenResumed some function to be executed when handling is resumed 185 */ 186 public void suspendHandling(Runnable whenResumed) { 187 if (processedBy == null) { 188 throw new IllegalStateException("May only be called from handler."); 189 } 190 this.whenResumed = whenResumed; 191 processedBy.suspendHandling(this); 192 } 193 194 /* default */ void invokeWhenResumed() { 195 if (whenResumed != null) { 196 whenResumed.run(); 197 whenResumed = null; 198 } 199 } 200 201 /** 202 * Resume the invocation of handlers for this event. 203 * 204 * @see #suspendHandling() 205 */ 206 public void resumeHandling() { 207 if (processedBy == null) { 208 throw new IllegalStateException("Lost processor."); 209 } 210 processedBy.resumeHandling(this); 211 } 212 213 /* default */ Iterator<HandlerReference> clearSuspendedHandlers() { 214 var result = suspendedHandlers; 215 suspendedHandlers = null; 216 return result; 217 } 218 219 /* default */ void setSuspendedHandlers( 220 Iterator<HandlerReference> suspendedHandlers) { 221 this.suspendedHandlers = suspendedHandlers; 222 } 223 224 /** 225 * @param pipeline 226 */ 227 @SuppressWarnings("PMD.CognitiveComplexity") 228 /* default */ void decrementOpen() { 229 if (openCount.decrementAndGet() == 0 && !completed) { 230 synchronized (this) { 231 completed = true; 232 notifyAll(); 233 } 234 if (completionEvents != null && !isCancelled()) { 235 processedBy.updateNewEventsParent(generatedBy); 236 for (Event<?> e : completionEvents) { 237 Channel[] completeChannels = e.channels(); 238 if (completeChannels.length == 0) { 239 // Note that channels() cannot be empty, as it is set 240 // when firing the event and an event is never fired 241 // on no channels. 242 completeChannels = channels(); 243 e.setChannels(completeChannels); 244 } 245 processedBy.add(e, completeChannels); 246 } 247 } 248 if (generatedBy != null) { 249 generatedBy.decrementOpen(); 250 } 251 processedBy = null; // No longer needed 252 } 253 } 254 255 /** 256 * Adds the given completion lock. 257 * 258 * @param lock the lock 259 * @see CompletionLock 260 */ 261 /* default */ Event<T> addCompletionLock(CompletionLockBase lock) { 262 synchronized (this) { 263 if (completionLocks == null) { 264 completionLocks = Collections.synchronizedSet(new HashSet<>()); 265 } 266 } 267 if (completionLocks.add(lock)) { 268 openCount.incrementAndGet(); 269 lock.startTimer(); 270 } 271 return (Event<T>) this; 272 } 273 274 /** 275 * Removes the given completion lock. 276 * 277 * @param lock the lock 278 * @see CompletionLock 279 */ 280 /* default */ void removeCompletionLock(CompletionLockBase lock) { 281 if (completionLocks == null) { 282 return; 283 } 284 if (completionLocks.remove(lock)) { 285 decrementOpen(); 286 } 287 lock.cancelTimer(); 288 } 289 290 /** 291 * Disables tracking for this event and all events generated 292 * when handling it. 293 */ 294 public Event<T> disableTracking() { 295 tracked = false; 296 return (Event<T>) this; 297 } 298 299 /** 300 * Whether the event (and all events generated when handling it) 301 * is tracked. 302 * 303 * @return `true` if event is tracked 304 */ 305 public boolean isTracked() { 306 return tracked; 307 } 308 309 @SuppressWarnings("PMD.UselessParentheses") 310 /* default */ boolean isTrackable() { 311 return generatedBy == null ? tracked 312 : (tracked && generatedBy.isTrackable()); 313 } 314 315 /** 316 * Adds the given event to the events to be thrown when this event 317 * has completed (see {@link #isDone()}). Such an event is called 318 * a "completion event". 319 * 320 * Completion events are considered to be caused by the event that 321 * caused the completed event. If an event *e1* caused an event 322 * *e2* which has a completion event *e2c*, *e1* is only put in 323 * state completed when *e2c* has been handled. 324 * 325 * Completion events are handled by the same {@link EventProcessor} 326 * as the event that has been completed. 327 * 328 * @param completionEvent the completion event to add 329 * @return the object for easy chaining 330 * @see #onCompletion(Event, Consumer) 331 */ 332 public abstract Event<T> addCompletionEvent(Event<?> completionEvent); 333 334 /** 335 * Invokes the consumer when the event is completed. This is 336 * a shortcut for registering a {@link CompletionEvent} and 337 * providing a handler for the completion event that invokes 338 * the consumer. 339 * 340 * The static form is required because otherwise the compiler cannot 341 * infer the type of the consumer's argument. 342 * 343 * @param <T> the result type of the event 344 * @param <E> the type of the event 345 * @param event the event 346 * @param consumer the consumer 347 * @return the event 348 */ 349 public static <T, E extends Event<T>> E onCompletion(E event, 350 Consumer<E> consumer) { 351 event.addCompletionEvent(new ActionEvent<Event<T>>( 352 event.getClass().getSimpleName() + "CompletionAction") { 353 @Override 354 public void execute() throws Exception { 355 consumer.accept(event); 356 } 357 }); 358 return event; 359 } 360}