001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public 
013 * License for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License 
016 * along with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.process;
020
021import java.io.FileDescriptor;
022import java.io.IOException;
023import java.nio.ByteBuffer;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.logging.Level;
031import org.jgrapes.core.Channel;
032import org.jgrapes.core.Component;
033import org.jgrapes.core.Components;
034import org.jgrapes.core.Event;
035import org.jgrapes.core.EventPipeline;
036import org.jgrapes.core.Manager;
037import org.jgrapes.core.annotation.Handler;
038import org.jgrapes.core.events.Stop;
039import org.jgrapes.io.IOSubchannel;
040import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel;
041import org.jgrapes.io.events.Close;
042import org.jgrapes.io.events.Closed;
043import org.jgrapes.io.events.Input;
044import org.jgrapes.io.events.Opening;
045import org.jgrapes.io.events.Output;
046import org.jgrapes.io.events.ProcessExited;
047import org.jgrapes.io.events.ProcessStarted;
048import org.jgrapes.io.events.StartProcess;
049import org.jgrapes.io.events.StartProcessError;
050import org.jgrapes.io.util.InputStreamPipeline;
051import org.jgrapes.io.util.ManagedBuffer;
052import org.jgrapes.io.util.ManagedBufferPool;
053
054/**
055 * Provides a component that executes processes. A process is started
056 * by firing a {@link StartProcess} event. In response, the
057 * {@link ProcessManager} starts the process and creates a 
 * {@link ProcessChannel} (i.e. an {@link IOSubchannel}) for communication
059 * with the process. It fires an {@link Opening} and {@link ProcessStarted} 
060 * event on the newly created channel.
061 * 
062 * Data may be sent to the process's stdin by firing {@link Output}
063 * events on the {@link ProcessChannel}. As usual, these events should
 * be fired using the channel's {@link IOSubchannel#responsePipeline()
065 * response pipeline}. Data generated by the process is provided by
066 * {@link Input} events. In order to distinguish between stdout and stderr,
067 * the events have an association with class {@link FileDescriptor} as
068 * key and an associated value of 1 (stdout) or 2 (stderr).
069 * 
 * When the process has terminated, three {@link Closed} events are fired
 * on the {@link ProcessChannel}: one each for stdout and stderr (with the
 * same association as was used for the {@link Input} events) and, as
 * third event, a {@link ProcessExited} (specialized {@link Closed})
 * with the process's exit value. Note that the sequence in which these
 * events are sent is undefined.
076 */
077@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
078public class ProcessManager extends Component {
079
    /**
     * The executor service used for reading the processes' output and
     * passed to the event pipelines created for the channels. Can be
     * replaced with {@link #setExecutorService(ExecutorService)}.
     */
    private ExecutorService executorService
        = Components.defaultExecutorService();
    /**
     * The channels of the processes that are currently managed by this
     * component. Wrapped as synchronized set; compound operations
     * synchronize on the set itself.
     */
    private final Set<ProcessChannel> channels
        = Collections.synchronizedSet(new HashSet<>());
084
    /**
     * Creates a new process manager, using itself as component channel. 
     */
    public ProcessManager() {
        this(Channel.SELF);
    }
091
    /**
     * Creates a new process manager using the given channel as
     * component channel.
     *
     * @param componentChannel the component channel
     */
    public ProcessManager(Channel componentChannel) {
        super(componentChannel);
    }
100
    /**
     * Sets an executor service to be used by the event pipelines
     * that forward data from the process. The new value is picked up
     * by channels created after the call.
     * 
     * @param executorService the executorService to set
     * @return the process manager for easy chaining
     * @see Manager#newEventPipeline(ExecutorService)
     */
    public ProcessManager setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
        return this;
    }
113
114    /**
115     * Start a new process using the data from the event.
116     *
117     * @param event the event
118     */
119    @Handler
120    public void onStartProcess(StartProcess event) {
121        var pbd = new ProcessBuilder(event.command());
122        if (event.directory() != null) {
123            pbd.directory(event.directory());
124        }
125        if (event.environment() != null) {
126            Map<String, String> env = pbd.environment();
127            for (var entry : event.environment().entrySet()) {
128                if (entry.getValue() == null) {
129                    env.remove(entry.getValue());
130                    continue;
131                }
132                env.put(entry.getKey(), entry.getValue());
133            }
134        }
135        try {
136            Process proc;
137            new ProcessChannel(event, proc = pbd.start());
138            logger.fine(() -> "Started process pid=" + proc.toHandle().pid());
139        } catch (IOException e) {
140            fire(new StartProcessError(event, "Failed to start process.", e));
141        }
142    }
143
144    /**
145     * Writes the data passed in the event. 
146     * 
147     * The end of record flag is used to determine if a channel is 
148     * eligible for purging. If the flag is set and all output has 
149     * been processed, the channel is purgeable until input is 
150     * received or another output event causes the state to be 
151     * reevaluated. 
152     *
153     * @param event the event
154     * @param channel the channel
155     * @throws InterruptedException the interrupted exception
156     * @throws IOException 
157     */
158    @Handler
159    public void onOutput(Output<ByteBuffer> event,
160            ProcessChannel channel) throws InterruptedException, IOException {
161        if (channels.contains(channel)) {
162            channel.write(event);
163        }
164    }
165
166    /**
167     * Closes the output to the process (the process's stdin).
168     * 
169     * If the event has an association with key {@link Process},
170     * the event additionally causes the process to be "closed",
171     * i.e. to be terminated (see {@link ProcessHandle#destroy}).
172     *
173     * @param event the event
174     * @throws IOException if an I/O exception occurred
175     * @throws InterruptedException if the execution was interrupted
176     */
177    @Handler
178    public void onClose(Close event) {
179        for (Channel channel : event.channels()) {
180            if (channel instanceof ProcessChannel
181                && channels.contains(channel)) {
182                ((ProcessChannel) channel).close(event);
183            }
184        }
185    }
186
187    /**
188     * Stop all running processes.
189     * 
190     * @param event
191     */
192    @Handler
193    public void onStop(Stop event) {
194        Set<ProcessChannel> copy;
195        synchronized (channels) {
196            copy = new HashSet<>(channels);
197        }
198        for (var channel : copy) {
199            channel.doClose(true);
200        }
201    }
202
203    /**
204     * Handles closed events from stdout and stderr.
205     *
206     * @param event the event
207     * @throws IOException if an I/O exception occurred
208     * @throws InterruptedException if the execution was interrupted
209     */
210    @Handler(priority = -100)
211    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
212    public void onClosed(Closed<?> event)
213            throws IOException, InterruptedException {
214        for (Channel channel : event.channels()) {
215            if (channel instanceof ProcessChannel
216                && channels.contains(channel)) {
217                ((ProcessChannel) channel).closed(event);
218            }
219        }
220    }
221
    /**
     * The channel used for communicating with a process that has
     * been started by the enclosing {@link ProcessManager}.
     */
225    public final class ProcessChannel extends DefaultIOSubchannel {
226
227        private final StartProcess startEvent;
228        private final Process process;
229        private final EventPipeline downPipeline;
230        private boolean running = true;
231        private final AtomicBoolean closing = new AtomicBoolean();
232        private final AtomicBoolean terminating = new AtomicBoolean();
233        private boolean outOpen;
234        private boolean errOpen;
235
236        /**
237         * Instantiates a new process channel.
238         *
239         * @param startEvent the start event
240         * @param process the process
241         */
242        private ProcessChannel(StartProcess startEvent, Process process) {
243            super(channel(), newEventPipeline());
244            this.startEvent = startEvent;
245            this.process = process;
246
247            // Register
248            synchronized (ProcessManager.this) {
249                if (channels.isEmpty()) {
250                    registerAsGenerator();
251                }
252                channels.add(this);
253            }
254
255            // Using the channel for two streams requires more buffers.
256            setByteBufferPool(new ManagedBufferPool<>(ManagedBuffer::new,
257                () -> {
258                    return ByteBuffer.allocate(4096);
259                }, 4).setName(Components.objectName(this)
260                    + ".upstream.byteBuffers"));
261
262            if (executorService == null) {
263                downPipeline = newEventPipeline();
264            } else {
265                downPipeline = newEventPipeline(executorService);
266            }
267
268            // (1) Opening, (2) ProcessStarted(Opened), (3) process I/O
269            downPipeline().fire(Event.onCompletion(new Opening<Void>(),
270                o -> downPipeline().fire(Event.onCompletion(
271                    new ProcessStarted(startEvent), s -> startIO()), this)),
272                this);
273        }
274
275        /**
276         * Write the given data to the process (to its stdin).
277         *
278         * @param event the event
279         * @throws IOException Signals that an I/O exception has occurred.
280         */
281        private void write(Output<ByteBuffer> event) throws IOException {
282            var source = event.buffer().backingBuffer();
283            process.getOutputStream().write(source.array(), source.position(),
284                source.remaining());
285        }
286
287        private void startIO() {
288            // Regrettably, the streams cannot be used with nio select.
289            outOpen = true;
290            executorService.submit(
291                new InputStreamPipeline(process.getInputStream(), this,
292                    downPipeline()).sendInputEvents().setEventAssociations(
293                        Map.of(FileDescriptor.class, 1)));
294            errOpen = true;
295            executorService.submit(
296                new InputStreamPipeline(process.getErrorStream(), this,
297                    downPipeline()).sendInputEvents().setEventAssociations(
298                        Map.of(FileDescriptor.class, 2)));
299            process.onExit().thenAccept(p -> {
300                logger.fine(() -> "Process pid=" + p.toHandle().pid()
301                    + " has exited with: " + p.exitValue());
302                downPipeline()
303                    .fire(new ProcessExited(startEvent, p.exitValue()), this);
304                running = false;
305                maybeUnregister();
306            });
307        }
308
309        /**
310         * Close the stream to the process (its stdin) and optionally
311         * terminates the process.
312         *
313         * @param event the event
314         * @throws IOException Signals that an I/O exception has occurred.
315         */
316        private void close(Close event) {
317            doClose(event.associated(Process.class, Object.class).isPresent());
318        }
319
320        private void doClose(boolean terminate) {
321            if (!closing.getAndSet(true)) {
322                try {
323                    process.getOutputStream().close();
324                } catch (IOException e) {
325                    // Just trying to be nice
326                    logger.log(Level.FINE, e, () -> "Failed to close pipe"
327                        + " to process (ignored): " + e.getMessage());
328                }
329            }
330            if (terminate && !terminating.getAndSet(true)) {
331                process.toHandle().destroy();
332            }
333        }
334
335        /**
336         * Handles closed events from the process's output stream. 
337         *
338         * @param event the event
339         */
340        private void closed(Closed<?> event) {
341            switch (event.associated(FileDescriptor.class, Integer.class)
342                .orElse(-1)) {
343            case 1:
344                outOpen = false;
345                break;
346            case 2:
347                errOpen = false;
348                break;
349            default:
350                return;
351            }
352            maybeUnregister();
353        }
354
355        private void maybeUnregister() {
356            if (!running && !outOpen && !errOpen) {
357                synchronized (channels) {
358                    channels.remove(this);
359                    if (channels.isEmpty()) {
360                        unregisterAsGenerator();
361                    }
362                }
363            }
364        }
365
        /**
         * Returns the event that caused this channel to be created,
         * i.e. the event passed to the constructor.
         *
         * @return the start event
         */
        public StartProcess startEvent() {
            return startEvent;
        }
374
        /**
         * Returns the {@link Process} associated with this channel.
         *
         * @return the process
         */
        public Process process() {
            return process;
        }
383
        /**
         * Returns the pipeline that is used to fire the events 
         * generated for the process (e.g. {@link Opening}, 
         * {@link ProcessStarted} and {@link ProcessExited}) on 
         * this channel.
         *
         * @return the downPipeline
         */
        public EventPipeline downPipeline() {
            return downPipeline;
        }
392    }
393}