001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.core; 020 021import java.lang.reflect.Array; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.Future; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.TimeoutException; 034import java.util.function.BiConsumer; 035import org.jgrapes.core.events.HandlingError; 036import org.jgrapes.core.internal.EventBase; 037 038/** 039 * This class is the base class for all events. 040 * 041 * By default (i.e. as implemented by this class), the event's kind is 042 * represented by its Java class and the eligibility is based on 043 * the "is a" relationship between classes. An event is eligible if its class 044 * is equal to or a super class of the class used as criterion. 045 * This default behavior can be changed by overriding the 046 * methods from {@link Eligible}. See {@link NamedEvent} as an example. 047 * 048 * @param <T> 049 * the result type of the event. Use {@link Void} if handling the 050 * event does not produce a result 051 */ 052@SuppressWarnings({ "PMD.GodClass", "PMD.TooManyMethods" }) 053public class Event<T> extends EventBase<T> { 054 055 /** The channels that this event is to be fired on if no 056 * channels are specified explicitly when firing. */ 057 private Channel[] channels; 058 /** Indicates that the event should not processed further. */ 059 private boolean stopped; 060 /** The results of handling the event (if any). */ 061 private List<T> results; 062 /** Context data. */ 063 private Map<Object, Object> contextData; 064 private boolean cancelled; 065 066 /** 067 * Creates a new event. Passing channels is equivalent to first 068 * creating the event and then calling {@link #setChannels(Channel...)} 069 * with the given channels. 070 * 071 * @param channels the channels to set 072 */ 073 public Event(Channel... channels) { 074 super(); 075 this.channels = Arrays.copyOf(channels, channels.length); 076 } 077 078 /** 079 * Returns the class of this event as representation of its kind. 080 * 081 * @return the class of this event 082 * 083 * @see org.jgrapes.core.Eligible#defaultCriterion() 084 */ 085 @Override 086 public Object defaultCriterion() { 087 return getClass(); 088 } 089 090 /** 091 * Returns <code>true</code> if the `criterion` 092 * is of the same class or a base class of this event's class. 093 * 094 * @see org.jgrapes.core.Eligible#isEligibleFor(java.lang.Object) 095 */ 096 @Override 097 public boolean isEligibleFor(Object criterion) { 098 return Class.class.isInstance(criterion) 099 && ((Class<?>) criterion).isAssignableFrom(getClass()); 100 } 101 102 /** 103 * Return the event pipeline that currently processes the event 104 * (if any). 105 * 106 * @return the event pipeline if the event is being processed 107 */ 108 @SuppressWarnings({ "PMD.UselessOverridingMethod", 109 "PMD.AvoidDuplicateLiterals" }) 110 @Override 111 public Optional<EventPipeline> processedBy() { 112 return super.processedBy(); 113 } 114 115 /** 116 * Implements the default behavior for handling events thrown 117 * by a handler. Fires a {@link HandlingError handling error} event 118 * for this event and the given throwable. 119 * 120 * @see HandlingError 121 */ 122 @Override 123 protected void handlingError( 124 EventPipeline eventProcessor, Throwable throwable) { 125 eventProcessor.fire( 126 new HandlingError(this, throwable), channels()); 127 } 128 129 /** 130 * Sets the channels that the event is fired on if no channels 131 * are specified explicitly when firing the event 132 * (see {@link org.jgrapes.core.Manager#fire(Event, Channel...)}). 133 * 134 * @param channels the channels to set 135 * @return the object for easy chaining 136 * 137 * @throws IllegalStateException if the method is called after 138 * this event has been fired 139 */ 140 public Event<T> setChannels(Channel... channels) { 141 if (enqueued()) { 142 throw new IllegalStateException( 143 "Channels cannot be changed after fire"); 144 } 145 this.channels = Arrays.copyOf(channels, channels.length); 146 return this; 147 } 148 149 /** 150 * Returns the channels associated with the event. Before an 151 * event has been fired, this returns the channels set with 152 * {@link #setChannels(Channel[])}. After an event has been 153 * fired, this returns the channels that the event has 154 * effectively been fired on 155 * (see {@link Manager#fire(Event, Channel...)}). 156 * 157 * @return the channels (never `null`, but may be empty) 158 */ 159 @Override 160 public Channel[] channels() { 161 return Arrays.copyOf(channels, channels.length); 162 } 163 164 /** 165 * Returns the subset of channels that are assignable to the given type. 166 * 167 * @param <C> the given type's class 168 * @param type the class to look for 169 * @return the filtered channels 170 * @see #channels() 171 */ 172 @SuppressWarnings({ "unchecked", "PMD.ShortVariable", 173 "PMD.AvoidDuplicateLiterals" }) 174 public <C> C[] channels(Class<C> type) { 175 return Arrays.stream(channels) 176 .filter(c -> type.isAssignableFrom(c.getClass())).toArray( 177 size -> (C[]) Array.newInstance(type, size)); 178 } 179 180 /** 181 * Execute the given handler for all channels of the given type. 182 * 183 * @param <E> the type of the event 184 * @param <C> the type of the channel 185 * @param type the channel type 186 * @param handler the handler 187 */ 188 @SuppressWarnings({ "unchecked", "PMD.ShortVariable" }) 189 public <E extends EventBase<?>, C extends Channel> void forChannels( 190 Class<C> type, BiConsumer<E, C> handler) { 191 Arrays.stream(channels) 192 .filter(c -> type.isAssignableFrom(c.getClass())) 193 .forEach(c -> handler.accept((E) this, (C) c)); 194 } 195 196 /** 197 * Returns the events to be thrown when this event has completed 198 * (see {@link #isDone()}). 199 * 200 * @return the completed events 201 */ 202 public Set<Event<?>> completionEvents() { 203 return completionEvents == null ? Collections.emptySet() 204 : Collections.unmodifiableSet(completionEvents); 205 } 206 207 /** 208 * {@inheritDoc} 209 */ 210 @Override 211 public Event<T> addCompletionEvent(Event<?> completionEvent) { 212 if (completionEvents == null) { 213 completionEvents = new HashSet<>(); 214 } 215 completionEvents.add(completionEvent); 216 return this; 217 } 218 219 /* 220 * (non-Javadoc) 221 * 222 * @see org.jgrapes.core.internal.EventBase#setRequiresResult(boolean) 223 */ 224 @Override 225 public Event<T> setRequiresResult(boolean value) { 226 return (Event<T>) super.setRequiresResult(value); 227 } 228 229 /** 230 * Check if this event has completed. An event is completed 231 * if 232 * * all its handlers have been invoked (or the event has 233 * been stopped or cancelled), 234 * * all events caused by it have completed, 235 * * no {@link CompletionLock}s remain, and 236 * * a result has been set (only required if 237 * {@link #setRequiresResult(boolean)} has been called with `true`). 238 * 239 * @return the completed state 240 */ 241 @Override 242 public boolean isDone() { 243 return completed; 244 } 245 246 /** 247 * Invoked after all handlers for the event have been executed. 248 * May be overridden by derived classes to cause some immediate effect 249 * (instead of e.g. waiting for the completion event). The default 250 * implementation does nothing. This method is invoked by the event 251 * handler thread and must not block. 252 */ 253 protected void handled() { 254 // Default is to do nothing. 255 } 256 257 /** 258 * {@inheritDoc} 259 */ 260 @Override 261 @SuppressWarnings("PMD.UselessOverridingMethod") 262 public void suspendHandling() { 263 super.suspendHandling(); 264 } 265 266 /** 267 * {@inheritDoc} 268 */ 269 @Override 270 @SuppressWarnings("PMD.UselessOverridingMethod") 271 public void suspendHandling(Runnable whenResumed) { 272 super.suspendHandling(whenResumed); 273 } 274 275 /** 276 * {@inheritDoc} 277 */ 278 @Override 279 @SuppressWarnings("PMD.UselessOverridingMethod") 280 public void resumeHandling() { 281 super.resumeHandling(); 282 } 283 284 /** 285 * Can be called during the execution of an event handler to indicate 286 * that the event should not be processed further. All remaining 287 * handlers for this event will be skipped. 288 * 289 * @return the object for easy chaining 290 */ 291 public Event<T> stop() { 292 stopped = true; 293 // Just in case. 294 resumeHandling(); 295 return this; 296 } 297 298 /** 299 * Returns <code>true</code> if {@link #stop} has been called. 300 * 301 * @return the stopped state 302 */ 303 public boolean isStopped() { 304 return stopped; 305 } 306 307 /** 308 * Prevents the invocation of further handlers (like {@link #stop()} 309 * and (in addition) the invocation of any added completed events. 310 * 311 * @param mayInterruptIfRunning ignored 312 * @return `false` if the event has already been completed 313 * @see java.util.concurrent.Future#cancel(boolean) 314 */ 315 @Override 316 public boolean cancel(boolean mayInterruptIfRunning) { 317 if (!completed && !cancelled) { 318 stop(); 319 cancelled = true; 320 return true; 321 } 322 return false; 323 } 324 325 @Override 326 public boolean isCancelled() { 327 return cancelled; 328 } 329 330 /** 331 * Sets the result of handling this event. If this method is invoked 332 * more then once, the various results are collected in a list. This 333 * can happen if the event is handled by several components. 334 * 335 * @param result the result to set 336 * @return the object for easy chaining 337 */ 338 public Event<T> setResult(T result) { 339 synchronized (this) { 340 if (results == null) { 341 // Make sure that we have a valid result before 342 // calling decrementOpen 343 results = new ArrayList<>(); 344 results.add(result); 345 firstResultAssigned(); 346 return this; 347 } 348 results.add(result); 349 return this; 350 } 351 } 352 353 /** 354 * Allows access to the intermediate result before the 355 * completion of the event. 356 * 357 * @return the intermediate results (which may be an empty list) 358 */ 359 protected List<T> currentResults() { 360 return results == null ? Collections.emptyList() 361 : Collections.unmodifiableList(results); 362 } 363 364 /** 365 * Tie the result of this event to the result of the other event. 366 * Changes of either event's results will subsequently be applied 367 * to both events. 368 * <P> 369 * This is useful when an event is replaced by another event during 370 * handling like: 371 * {@code fire((new Event()).tieTo(oldEvent.stop()))} 372 * 373 * @param other the event to tie to 374 * @return the object for easy chaining 375 */ 376 public Event<T> tieTo(Event<T> other) { 377 synchronized (this) { 378 if (other.results == null) { 379 other.results = new ArrayList<>(); 380 } 381 results = other.results; 382 return this; 383 } 384 } 385 386 /** 387 * Waits for the event to be completed (see {@link #isDone()}) 388 * and returns the first (or only) result. 389 * 390 * @see Future#get() 391 */ 392 @Override 393 public T get() throws InterruptedException { 394 while (true) { 395 synchronized (this) { 396 if (completed) { 397 return results == null || results.isEmpty() 398 ? null 399 : results.get(0); 400 } 401 wait(); 402 } 403 } 404 } 405 406 /** 407 * Causes the invoking thread to wait until the processing of the 408 * event has been completed (see {@link #isDone()}) or the given 409 * timeout has expired and returns the first (or only) result. 410 * 411 * @return the result 412 * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 413 */ 414 @Override 415 public T get(long timeout, TimeUnit unit) 416 throws InterruptedException, TimeoutException { 417 synchronized (this) { 418 if (completed) { 419 return results == null || results.isEmpty() 420 ? null 421 : results.get(0); 422 } 423 wait(unit.toMillis(timeout)); 424 } 425 if (completed) { 426 return results == null || results.isEmpty() 427 ? null 428 : results.get(0); 429 } 430 throw new TimeoutException(); 431 } 432 433 /** 434 * Waits for the event to be completed (see {@link #isDone()}) 435 * and returns the list of results (which may be empty if the 436 * event's result type is {@link Void}). 437 * 438 * @return the results 439 * @see Future#get() 440 */ 441 public List<T> results() throws InterruptedException { 442 while (true) { 443 synchronized (this) { 444 if (completed) { 445 return results == null ? Collections.emptyList() 446 : Collections.unmodifiableList(results); 447 } 448 wait(); 449 } 450 } 451 } 452 453 /** 454 * Causes the invoking thread to wait until the processing of the 455 * event has been completed (see {@link #isDone()}) or given timeout 456 * has expired and returns the list of results (which may be empty 457 * if the event's result type is {@link Void}). 458 * 459 * @return the results 460 * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 461 */ 462 public List<T> results(long timeout, TimeUnit unit) 463 throws InterruptedException, TimeoutException { 464 synchronized (this) { 465 if (completed) { 466 return results == null ? Collections.emptyList() 467 : Collections.unmodifiableList(results); 468 } 469 wait(unit.toMillis(timeout)); 470 } 471 if (completed) { 472 return results == null ? Collections.emptyList() 473 : Collections.unmodifiableList(results); 474 } 475 throw new TimeoutException(); 476 } 477 478 @Override 479 @SuppressWarnings("PMD.ShortVariable") 480 public Event<T> setAssociated(Object by, Object with) { 481 if (contextData == null) { 482 contextData = new ConcurrentHashMap<>(); 483 } 484 if (with == null) { 485 contextData.remove(by); 486 } else { 487 contextData.put(by, with); 488 } 489 return this; 490 } 491 492 @Override 493 @SuppressWarnings("PMD.ShortVariable") 494 public <V> Optional<V> associated(Object by, Class<V> type) { 495 if (contextData == null) { 496 return Optional.empty(); 497 } 498 return Optional.ofNullable(contextData.get(by)) 499 .filter(found -> type.isAssignableFrom(found.getClass())) 500 .map(match -> type.cast(match)); 501 } 502 503 /* 504 * (non-Javadoc) 505 * 506 * @see java.lang.Object#toString() 507 */ 508 @Override 509 public String toString() { 510 StringBuilder builder = new StringBuilder(); 511 builder.append(Components.objectName(this)) 512 .append(" ["); 513 if (channels != null) { 514 builder.append("channels="); 515 builder.append(Channel.toString(channels)); 516 } 517 builder.append(']'); 518 return builder.toString(); 519 } 520 521}