001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.core.internal; 020 021import java.util.HashSet; 022import java.util.Iterator; 023import java.util.Queue; 024import java.util.Set; 025import java.util.concurrent.ConcurrentLinkedDeque; 026import java.util.concurrent.ConcurrentLinkedQueue; 027import java.util.concurrent.ExecutorService; 028import org.jgrapes.core.Channel; 029import org.jgrapes.core.Components; 030import org.jgrapes.core.Event; 031import org.jgrapes.core.EventPipeline; 032 033/** 034 * This class provides the default implementation of an {@link EventPipeline}. 035 */ 036@SuppressWarnings("PMD.DataflowAnomalyAnalysis") 037public class EventProcessor implements InternalEventPipeline, Runnable { 038 039 @SuppressWarnings("PMD.FieldNamingConventions") 040 protected static final ThreadLocal<EventBase<?>> newEventsParent 041 = new ThreadLocal<>(); 042 043 private final ExecutorService executorService; 044 private final ComponentTree componentTree; 045 private final EventPipeline asEventPipeline; 046 // Must not use synchronized in toString, leads to unexpected deadlock 047 protected final Queue<EventChannelsTuple> queue 048 = new ConcurrentLinkedQueue<>(); 049 private Iterator<HandlerReference> invoking; 050 // Used by this thread only. 051 private final Set<EventBase<?>> suspended = new HashSet<>(); 052 // Only this thread can remove, but others might add. 053 private final Queue<EventBase<?>> toBeResumed 054 = new ConcurrentLinkedDeque<>(); 055 private boolean isExecuting; 056 private final ThreadLocal<Thread> executor = new ThreadLocal<>(); 057 058 /** 059 * Instantiates a new event processor. 060 * 061 * @param tree the tree 062 */ 063 /* default */ EventProcessor(ComponentTree tree) { 064 this(tree, Components.defaultExecutorService()); 065 } 066 067 /* default */ EventProcessor(ComponentTree tree, 068 ExecutorService executorService) { 069 this.componentTree = tree; 070 this.executorService = executorService; 071 asEventPipeline = new CheckingPipelineFilter(tree, this); 072 } 073 074 /** 075 * Gets the component tree. 076 * 077 * @return the component tree 078 */ 079 protected ComponentTree tree() { 080 return componentTree; 081 } 082 083 /* default */ EventPipeline asEventPipeline() { 084 return asEventPipeline; 085 } 086 087 /** 088 * Called before adding completion events. The parent of 089 * a completion event is not the event that has completed but 090 * the event that generated the original event. 091 * 092 * @param parent the new parent 093 */ 094 /* default */ void updateNewEventsParent(EventBase<?> parent) { 095 newEventsParent.set(parent); 096 } 097 098 @Override 099 public <T extends Event<?>> T add(T event, Channel... channels) { 100 ((EventBase<?>) event).generatedBy(newEventsParent.get()); 101 ((EventBase<?>) event).processedBy(this); 102 synchronized (this) { 103 EventChannelsTuple.addTo(queue, event, channels); 104 if (!isExecuting) { 105 // Queue was initially empty, this starts it 106 GeneratorRegistry.instance().add(this); 107 isExecuting = true; 108 executorService.execute(this); 109 } 110 } 111 return event; 112 } 113 114 @SuppressWarnings("PMD.ConfusingTernary") 115 /* default */ void add(Queue<EventChannelsTuple> source) { 116 synchronized (this) { 117 while (true) { 118 EventChannelsTuple entry = source.poll(); 119 if (entry == null) { 120 break; 121 } 122 entry.event.processedBy(this); 123 queue.add(entry); 124 } 125 if (!isExecuting) { 126 GeneratorRegistry.instance().add(this); 127 isExecuting = true; 128 executorService.execute(this); 129 } 130 } 131 } 132 133 @Override 134 public void merge(InternalEventPipeline other) { 135 if (!(other instanceof BufferingEventPipeline)) { 136 throw new IllegalArgumentException( 137 "Can only merge events from an BufferingEventPipeline."); 138 } 139 add(((BufferingEventPipeline) other).retrieveEvents()); 140 } 141 142 @Override 143 @SuppressWarnings({ "PMD.AvoidDeeplyNestedIfStmts", 144 "PMD.CognitiveComplexity" }) 145 public void run() { 146 String origName = Thread.currentThread().getName(); 147 try { 148 Thread.currentThread().setName( 149 origName + " (P" + Components.objectId(this) + ")"); 150 executor.set(Thread.currentThread()); 151 componentTree.setDispatchingPipeline(this); 152 while (true) { 153 // No lock needed, only this thread can remove from resumed 154 var resumedEvent = toBeResumed.poll(); 155 if (resumedEvent != null) { 156 if (suspended.remove(resumedEvent)) { 157 resumedEvent.invokeWhenResumed(); 158 invokeHandlers(resumedEvent.clearSuspendedHandlers(), 159 resumedEvent); 160 } 161 continue; 162 } 163 164 EventChannelsTuple next; 165 synchronized (this) { 166 next = queue.poll(); 167 if (next == null) { 168 // Everything is done, though suspended handlers 169 // may cause this processor to be reactivated. 170 GeneratorRegistry.instance().remove(this); 171 isExecuting = false; 172 synchronized (executor) { 173 executor.notifyAll(); 174 } 175 break; 176 } 177 } 178 HandlerList handlers 179 = componentTree.getEventHandlers(next.event, next.channels); 180 invokeHandlers(handlers.iterator(), next.event); 181 } 182 } finally { 183 // This processor should now only be (strongly) referenced 184 // from suspended events (if any exist), the 185 // CheckingPipelineFilter (which is only referenced from this) 186 // and some component tree, if this is the tree's default 187 // processor. 188 newEventsParent.set(null); 189 componentTree.setDispatchingPipeline(null); 190 executor.set(null); 191 Thread.currentThread().setName(origName); 192 } 193 } 194 195 /** 196 * Invoke all (remaining) handlers with the given event as parameter. 197 * 198 * @param handlers the handlers 199 * @param event the event 200 */ 201 private void invokeHandlers(Iterator<HandlerReference> handlers, 202 EventBase<?> event) { 203 try { 204 invoking = handlers; 205 newEventsParent.set(event); 206 // invoking may be set to null by suspendHandling() 207 while (invoking != null && invoking.hasNext()) { 208 HandlerReference hdlr = invoking.next(); 209 try { 210 if (event.isStopped()) { 211 break; 212 } 213 hdlr.invoke(event); 214 } catch (AssertionError t) { 215 // JUnit support 216 CoreUtils.setAssertionError(t); 217 event.handlingError(asEventPipeline, t); 218 } catch (Error e) { // NOPMD 219 // Wouldn't have caught it, if it was possible. 220 throw e; 221 } catch (Throwable t) { // NOPMD 222 // Errors have been rethrown, so this should work. 223 event.handlingError(asEventPipeline, t); 224 } 225 } 226 } catch (AssertionError t) { 227 // JUnit support 228 CoreUtils.setAssertionError(t); 229 event.handlingError(asEventPipeline, t); 230 } catch (Error e) { // NOPMD 231 // Wouldn't have caught it, if it was possible. 232 throw e; 233 } catch (Throwable t) { // NOPMD 234 // Errors have been rethrown, so this should work. 235 event.handlingError(asEventPipeline, t); 236 } finally { // NOPMD 237 if (invoking != null) { 238 event.handled(); 239 invoking = null; 240 newEventsParent.get().decrementOpen(); 241 } 242 } 243 } 244 245 @SuppressWarnings("PMD.CompareObjectsWithEquals") 246 /* default */ void suspendHandling(EventBase<?> event) { 247 if (Thread.currentThread() != executor.get()) { 248 throw new IllegalStateException("May only be called from handler."); 249 } 250 if (!invoking.hasNext()) { 251 // Last anyway, nothing to be done 252 return; 253 } 254 event.setSuspendedHandlers(invoking); 255 invoking = null; 256 suspended.add(event); 257 // Just in case (might happen) 258 toBeResumed.remove(event); 259 } 260 261 /* default */ void resumeHandling(EventBase<?> event) { 262 toBeResumed.add(event); 263 synchronized (this) { 264 if (!isExecuting) { 265 // There were no more events, restart 266 GeneratorRegistry.instance().add(this); 267 isExecuting = true; 268 executorService.execute(this); 269 } 270 } 271 } 272 273 /* 274 * (non-Javadoc) 275 * 276 * @see org.jgrapes.core.internal.InternalEventPipeline#executorService() 277 */ 278 @Override 279 public ExecutorService executorService() { 280 return executorService; 281 } 282 283 @Override 284 public void awaitExhaustion() throws InterruptedException { 285 synchronized (executor) { 286 while (isExecuting) { 287 executor.wait(); 288 } 289 } 290 } 291 292 /* 293 * (non-Javadoc) 294 * 295 * @see java.lang.Object#toString() 296 */ 297 @Override 298 public String toString() { 299 StringBuilder builder = new StringBuilder(); 300 builder.append(Components.objectName(this)) 301 .append(" ["); 302 if (queue != null) { 303 builder.append("queue=").append(queue); 304 } 305 builder.append(']'); 306 return builder.toString(); 307 } 308 309}