001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2023 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public 013 * License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io.process; 020 021import java.io.FileDescriptor; 022import java.io.IOException; 023import java.nio.ByteBuffer; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.logging.Level; 031import org.jgrapes.core.Channel; 032import org.jgrapes.core.Component; 033import org.jgrapes.core.Components; 034import org.jgrapes.core.Event; 035import org.jgrapes.core.EventPipeline; 036import org.jgrapes.core.Manager; 037import org.jgrapes.core.annotation.Handler; 038import org.jgrapes.core.events.Stop; 039import org.jgrapes.io.IOSubchannel; 040import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel; 041import org.jgrapes.io.events.Close; 042import org.jgrapes.io.events.Closed; 043import org.jgrapes.io.events.Input; 044import org.jgrapes.io.events.Opening; 045import org.jgrapes.io.events.Output; 046import org.jgrapes.io.events.ProcessExited; 047import org.jgrapes.io.events.ProcessStarted; 048import org.jgrapes.io.events.StartProcess; 049import org.jgrapes.io.events.StartProcessError; 050import org.jgrapes.io.util.InputStreamPipeline; 051import org.jgrapes.io.util.ManagedBuffer; 052import org.jgrapes.io.util.ManagedBufferPool; 053 054/** 055 * Provides a component that executes processes. A process is started 056 * by firing a {@link StartProcess} event. In response, the 057 * {@link ProcessManager} starts the process and creates a 058 * {@link ProcessChannel} (i.e. an {@link IOSubchannel) for communication 059 * with the process. It fires an {@link Opening} and {@link ProcessStarted} 060 * event on the newly created channel. 061 * 062 * Data may be sent to the process's stdin by firing {@link Output} 063 * events on the {@link ProcessChannel}. As usual, these events should 064 * be fired using the channels {@link IOSubchannel#responsePipeline() 065 * response pipeline}. Data generated by the process is provided by 066 * {@link Input} events. In order to distinguish between stdout and stderr, 067 * the events have an association with class {@link FileDescriptor} as 068 * key and an associated value of 1 (stdout) or 2 (stderr). 069 * 070 * When the process terminated, three {@link Closed} events are fired on 071 * the {@link ProcessChannel} one each for stdout and stderr (with the 072 * same association as was used for the {@link Input} events) and a 073 * as third event a {@link ProcessExited} (specialized {@link Closed}) 074 * with the process's exit value. Note that the sequence in which these 075 * events are sent is undefined. 076 */ 077@SuppressWarnings("PMD.DataflowAnomalyAnalysis") 078public class ProcessManager extends Component { 079 080 private ExecutorService executorService 081 = Components.defaultExecutorService(); 082 private final Set<ProcessChannel> channels 083 = Collections.synchronizedSet(new HashSet<>()); 084 085 /** 086 * Creates a new connector, using itself as component channel. 087 */ 088 public ProcessManager() { 089 this(Channel.SELF); 090 } 091 092 /** 093 * Create a new instance using the given channel. 094 * 095 * @param componentChannel the component channel 096 */ 097 public ProcessManager(Channel componentChannel) { 098 super(componentChannel); 099 } 100 101 /** 102 * Sets an executor service to be used by the event pipelines 103 * that forward data from the process. 104 * 105 * @param executorService the executorService to set 106 * @return the process manager for easy chaining 107 * @see Manager#newEventPipeline(ExecutorService) 108 */ 109 public ProcessManager setExecutorService(ExecutorService executorService) { 110 this.executorService = executorService; 111 return this; 112 } 113 114 /** 115 * Start a new process using the data from the event. 116 * 117 * @param event the event 118 */ 119 @Handler 120 public void onStartProcess(StartProcess event) { 121 var pbd = new ProcessBuilder(event.command()); 122 if (event.directory() != null) { 123 pbd.directory(event.directory()); 124 } 125 if (event.environment() != null) { 126 Map<String, String> env = pbd.environment(); 127 for (var entry : event.environment().entrySet()) { 128 if (entry.getValue() == null) { 129 env.remove(entry.getValue()); 130 continue; 131 } 132 env.put(entry.getKey(), entry.getValue()); 133 } 134 } 135 try { 136 Process proc; 137 new ProcessChannel(event, proc = pbd.start()); 138 logger.fine(() -> "Started process pid=" + proc.toHandle().pid()); 139 } catch (IOException e) { 140 fire(new StartProcessError(event, "Failed to start process.", e)); 141 } 142 } 143 144 /** 145 * Writes the data passed in the event. 146 * 147 * The end of record flag is used to determine if a channel is 148 * eligible for purging. If the flag is set and all output has 149 * been processed, the channel is purgeable until input is 150 * received or another output event causes the state to be 151 * reevaluated. 152 * 153 * @param event the event 154 * @param channel the channel 155 * @throws InterruptedException the interrupted exception 156 * @throws IOException 157 */ 158 @Handler 159 public void onOutput(Output<ByteBuffer> event, 160 ProcessChannel channel) throws InterruptedException, IOException { 161 if (channels.contains(channel)) { 162 channel.write(event); 163 } 164 } 165 166 /** 167 * Closes the output to the process (the process's stdin). 168 * 169 * If the event has an association with key {@link Process}, 170 * the event additionally causes the process to be "closed", 171 * i.e. to be terminated (see {@link ProcessHandle#destroy}). 172 * 173 * @param event the event 174 * @throws IOException if an I/O exception occurred 175 * @throws InterruptedException if the execution was interrupted 176 */ 177 @Handler 178 public void onClose(Close event) { 179 for (Channel channel : event.channels()) { 180 if (channel instanceof ProcessChannel 181 && channels.contains(channel)) { 182 ((ProcessChannel) channel).close(event); 183 } 184 } 185 } 186 187 /** 188 * Stop all running processes. 189 * 190 * @param event 191 */ 192 @Handler 193 public void onStop(Stop event) { 194 Set<ProcessChannel> copy; 195 synchronized (channels) { 196 copy = new HashSet<>(channels); 197 } 198 for (var channel : copy) { 199 channel.doClose(true); 200 } 201 } 202 203 /** 204 * Handles closed events from stdout and stderr. 205 * 206 * @param event the event 207 * @throws IOException if an I/O exception occurred 208 * @throws InterruptedException if the execution was interrupted 209 */ 210 @Handler(priority = -100) 211 @SuppressWarnings("PMD.DataflowAnomalyAnalysis") 212 public void onClosed(Closed<?> event) 213 throws IOException, InterruptedException { 214 for (Channel channel : event.channels()) { 215 if (channel instanceof ProcessChannel 216 && channels.contains(channel)) { 217 ((ProcessChannel) channel).closed(event); 218 } 219 } 220 } 221 222 /** 223 * The Class ProcessChannel. 224 */ 225 public final class ProcessChannel extends DefaultIOSubchannel { 226 227 private final StartProcess startEvent; 228 private final Process process; 229 private final EventPipeline downPipeline; 230 private boolean running = true; 231 private final AtomicBoolean closing = new AtomicBoolean(); 232 private final AtomicBoolean terminating = new AtomicBoolean(); 233 private boolean outOpen; 234 private boolean errOpen; 235 236 /** 237 * Instantiates a new process channel. 238 * 239 * @param startEvent the start event 240 * @param process the process 241 */ 242 private ProcessChannel(StartProcess startEvent, Process process) { 243 super(channel(), newEventPipeline()); 244 this.startEvent = startEvent; 245 this.process = process; 246 247 // Register 248 synchronized (ProcessManager.this) { 249 if (channels.isEmpty()) { 250 registerAsGenerator(); 251 } 252 channels.add(this); 253 } 254 255 // Using the channel for two streams requires more buffers. 256 setByteBufferPool(new ManagedBufferPool<>(ManagedBuffer::new, 257 () -> { 258 return ByteBuffer.allocate(4096); 259 }, 4).setName(Components.objectName(this) 260 + ".upstream.byteBuffers")); 261 262 if (executorService == null) { 263 downPipeline = newEventPipeline(); 264 } else { 265 downPipeline = newEventPipeline(executorService); 266 } 267 268 // (1) Opening, (2) ProcessStarted(Opened), (3) process I/O 269 downPipeline().fire(Event.onCompletion(new Opening<Void>(), 270 o -> downPipeline().fire(Event.onCompletion( 271 new ProcessStarted(startEvent), s -> startIO()), this)), 272 this); 273 } 274 275 /** 276 * Write the given data to the process (to its stdin). 277 * 278 * @param event the event 279 * @throws IOException Signals that an I/O exception has occurred. 280 */ 281 private void write(Output<ByteBuffer> event) throws IOException { 282 var source = event.buffer().backingBuffer(); 283 process.getOutputStream().write(source.array(), source.position(), 284 source.remaining()); 285 } 286 287 private void startIO() { 288 // Regrettably, the streams cannot be used with nio select. 289 outOpen = true; 290 executorService.submit( 291 new InputStreamPipeline(process.getInputStream(), this, 292 downPipeline()).sendInputEvents().setEventAssociations( 293 Map.of(FileDescriptor.class, 1))); 294 errOpen = true; 295 executorService.submit( 296 new InputStreamPipeline(process.getErrorStream(), this, 297 downPipeline()).sendInputEvents().setEventAssociations( 298 Map.of(FileDescriptor.class, 2))); 299 process.onExit().thenAccept(p -> { 300 logger.fine(() -> "Process pid=" + p.toHandle().pid() 301 + " has exited with: " + p.exitValue()); 302 downPipeline() 303 .fire(new ProcessExited(startEvent, p.exitValue()), this); 304 running = false; 305 maybeUnregister(); 306 }); 307 } 308 309 /** 310 * Close the stream to the process (its stdin) and optionally 311 * terminates the process. 312 * 313 * @param event the event 314 * @throws IOException Signals that an I/O exception has occurred. 315 */ 316 private void close(Close event) { 317 doClose(event.associated(Process.class, Object.class).isPresent()); 318 } 319 320 private void doClose(boolean terminate) { 321 if (!closing.getAndSet(true)) { 322 try { 323 process.getOutputStream().close(); 324 } catch (IOException e) { 325 // Just trying to be nice 326 logger.log(Level.FINE, e, () -> "Failed to close pipe" 327 + " to process (ignored): " + e.getMessage()); 328 } 329 } 330 if (terminate && !terminating.getAndSet(true)) { 331 process.toHandle().destroy(); 332 } 333 } 334 335 /** 336 * Handles closed events from the process's output stream. 337 * 338 * @param event the event 339 */ 340 private void closed(Closed<?> event) { 341 switch (event.associated(FileDescriptor.class, Integer.class) 342 .orElse(-1)) { 343 case 1: 344 outOpen = false; 345 break; 346 case 2: 347 errOpen = false; 348 break; 349 default: 350 return; 351 } 352 maybeUnregister(); 353 } 354 355 private void maybeUnregister() { 356 if (!running && !outOpen && !errOpen) { 357 synchronized (channels) { 358 channels.remove(this); 359 if (channels.isEmpty()) { 360 unregisterAsGenerator(); 361 } 362 } 363 } 364 } 365 366 /** 367 * Return the event that caused this channel to be created. 368 * 369 * @return the start event 370 */ 371 public StartProcess startEvent() { 372 return startEvent; 373 } 374 375 /** 376 * Return the {@link Process} associated with this channel. 377 * 378 * @return the process 379 */ 380 public Process process() { 381 return process; 382 } 383 384 /** 385 * Gets the down pipeline. 386 * 387 * @return the downPipeline 388 */ 389 public EventPipeline downPipeline() { 390 return downPipeline; 391 } 392 } 393}