001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io; 020 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.nio.channels.AsynchronousCloseException; 024import java.nio.channels.AsynchronousFileChannel; 025import java.nio.channels.ClosedChannelException; 026import java.nio.channels.CompletionHandler; 027import java.nio.channels.SeekableByteChannel; 028import java.nio.file.Files; 029import java.nio.file.OpenOption; 030import java.nio.file.Path; 031import java.nio.file.StandardOpenOption; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.Map; 035import java.util.WeakHashMap; 036import java.util.stream.Collectors; 037import org.jgrapes.core.Channel; 038import org.jgrapes.core.Component; 039import org.jgrapes.core.Components; 040import org.jgrapes.core.Event; 041import org.jgrapes.core.annotation.Handler; 042import org.jgrapes.core.events.Stop; 043import org.jgrapes.io.events.Close; 044import org.jgrapes.io.events.Closed; 045import org.jgrapes.io.events.FileOpened; 046import org.jgrapes.io.events.IOError; 047import org.jgrapes.io.events.Input; 048import org.jgrapes.io.events.OpenFile; 049import org.jgrapes.io.events.Opening; 050import org.jgrapes.io.events.Output; 051import org.jgrapes.io.events.SaveInput; 052import org.jgrapes.io.events.SaveOutput; 053import org.jgrapes.io.events.StreamFile; 054import org.jgrapes.io.util.ManagedBuffer; 055import org.jgrapes.io.util.ManagedBufferPool; 056 057/** 058 * A component that reads from or writes to a file. 059 */ 060@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.CouplingBetweenObjects" }) 061public class FileStorage extends Component { 062 063 private int bufferSize; 064 065 @SuppressWarnings("PMD.UseConcurrentHashMap") 066 private final Map<Channel, Writer> inputWriters = Collections 067 .synchronizedMap(new WeakHashMap<>()); 068 @SuppressWarnings("PMD.UseConcurrentHashMap") 069 private final Map<Channel, Writer> outputWriters = Collections 070 .synchronizedMap(new WeakHashMap<>()); 071 072 /** 073 * Create a new instance using the given size for the read buffers. 074 * 075 * @param channel the component's channel. Used for sending {@link Output} 076 * events and receiving {@link Input} events 077 * @param bufferSize the size of the buffers used for reading 078 */ 079 public FileStorage(Channel channel, int bufferSize) { 080 super(channel); 081 this.bufferSize = bufferSize; 082 } 083 084 /** 085 * Create a new instance using the default buffer size of 8192. 086 * 087 * @param channel the component's channel. Used for sending {@link Output} 088 * events and receiving {@link Input} events 089 */ 090 public FileStorage(Channel channel) { 091 this(channel, 8192); 092 } 093 094 /** 095 * Opens a file for reading using the properties of the event and streams 096 * its content as a sequence of {@link Output} events with the 097 * end of record flag set in the last event. All generated events are 098 * considered responses to this event and therefore fired using the event 099 * processor from the event's I/O subchannel. 100 * 101 * @param event the event 102 * @throws InterruptedException if the execution was interrupted 103 */ 104 @Handler 105 @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops", 106 "PMD.AccessorClassGeneration", "PMD.AvoidDuplicateLiterals" }) 107 public void onStreamFile(StreamFile event) 108 throws InterruptedException { 109 if (Arrays.asList(event.options()) 110 .contains(StandardOpenOption.WRITE)) { 111 throw new IllegalArgumentException( 112 "Cannot stream file opened for writing."); 113 } 114 for (IOSubchannel channel : event.channels(IOSubchannel.class)) { 115 if (inputWriters.containsKey(channel)) { 116 channel.respond(new IOError(event, 117 new IllegalStateException("File is already open."))); 118 } else { 119 new FileStreamer(event, channel); 120 } 121 } 122 } 123 124 /** 125 * A file streamer. 126 */ 127 private final class FileStreamer { 128 129 private final IOSubchannel channel; 130 private final Path path; 131 @SuppressWarnings("PMD.ImmutableField") 132 private AsynchronousFileChannel ioChannel; 133 private ManagedBufferPool<ManagedBuffer<ByteBuffer>, 134 ByteBuffer> ioBuffers; 135 private long offset; 136 private final CompletionHandler<Integer, 137 ManagedBuffer<ByteBuffer>> readCompletionHandler 138 = new ReadCompletionHandler(); 139 140 private FileStreamer(StreamFile event, IOSubchannel channel) 141 throws InterruptedException { 142 this.channel = channel; 143 path = event.path(); 144 offset = 0; 145 try { 146 try { 147 ioChannel = AsynchronousFileChannel 148 .open(event.path(), event.options()); 149 } catch (UnsupportedOperationException e) { 150 runReaderThread(event); 151 return; 152 } 153 } catch (IOException e) { 154 channel.respond(new IOError(event, e)); 155 return; 156 } 157 registerAsGenerator(); 158 // Reading from file 159 ioBuffers = new ManagedBufferPool<>(ManagedBuffer::new, 160 () -> { 161 return ByteBuffer.allocateDirect(bufferSize); 162 }, 2); 163 ManagedBuffer<ByteBuffer> buffer = ioBuffers.acquire(); 164 // (1) Opening, (2) FileOpened, (3) Output events 165 channel.respond(Event 166 .onCompletion(new Opening<OpenFile>().setResult(event), e -> { 167 channel.respond(new FileOpened(event)); 168 // Start reading. 169 synchronized (ioChannel) { 170 ioChannel.read(buffer.backingBuffer(), offset, buffer, 171 readCompletionHandler); 172 } 173 })); 174 } 175 176 /** 177 * The read completion handler. 178 */ 179 private final class ReadCompletionHandler implements 180 CompletionHandler<Integer, ManagedBuffer<ByteBuffer>> { 181 @Override 182 @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 183 "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals" }) 184 public void completed( 185 Integer result, ManagedBuffer<ByteBuffer> buffer) { 186 if (result >= 0) { 187 offset += result; 188 boolean eof = true; 189 try { 190 eof = offset == ioChannel.size(); 191 } catch (IOException e1) { 192 // Handled like true 193 } 194 channel.respond(Output.fromSink(buffer, eof)); 195 if (!eof) { 196 try { 197 ManagedBuffer<ByteBuffer> nextBuffer 198 = ioBuffers.acquire(); 199 nextBuffer.clear(); 200 synchronized (ioChannel) { 201 ioChannel.read(nextBuffer.backingBuffer(), 202 offset, 203 nextBuffer, readCompletionHandler); 204 } 205 } catch (InterruptedException e) { 206 // Results in empty buffer 207 } 208 return; 209 } 210 } 211 IOException ioExc = null; 212 try { 213 ioChannel.close(); 214 } catch (ClosedChannelException e) { 215 // Can be ignored 216 } catch (IOException e) { 217 ioExc = e; 218 } 219 channel.respond(new Closed<Void>(ioExc)); 220 unregisterAsGenerator(); 221 } 222 223 @Override 224 public void failed( 225 Throwable exc, ManagedBuffer<ByteBuffer> context) { 226 channel.respond(new Closed<Void>(exc)); 227 unregisterAsGenerator(); 228 } 229 } 230 231 /** 232 * Stream file that doesn't support asynchronous I/O. 233 * 234 * @param event 235 * @throws IOException 236 */ 237 private void runReaderThread(StreamFile event) 238 throws IOException { 239 ioBuffers = new ManagedBufferPool<>(ManagedBuffer::new, 240 () -> { 241 return ByteBuffer.allocateDirect(bufferSize); 242 }, 2); 243 @SuppressWarnings("PMD.CloseResource") 244 final SeekableByteChannel ioChannel 245 = Files.newByteChannel(event.path(), event.options()); 246 activeEventPipeline().executorService().submit(new Runnable() { 247 @Override 248 @SuppressWarnings("PMD.EmptyCatchBlock") 249 public void run() { 250 // Reading from file 251 IOException ioExc = null; 252 try { 253 long size = ioChannel.size(); 254 while (ioChannel.position() < size) { 255 ManagedBuffer<ByteBuffer> buffer 256 = ioBuffers.acquire(); 257 buffer.fillFromChannel(ioChannel); 258 channel.respond(Output.fromSink(buffer, 259 ioChannel.position() == size)); 260 } 261 ioChannel.close(); 262 } catch (InterruptedException e) { 263 return; 264 } catch (ClosedChannelException e) { 265 // Can be ignored 266 } catch (IOException e) { 267 ioExc = e; 268 } 269 channel.respond(new Closed<Void>(ioExc)); 270 } 271 }); 272 } 273 274 /* 275 * (non-Javadoc) 276 * 277 * @see java.lang.Object#toString() 278 */ 279 @Override 280 public String toString() { 281 StringBuilder builder = new StringBuilder(50); 282 builder.append("FileStreamer ["); 283 if (channel != null) { 284 builder.append("channel="); 285 builder.append(Channel.toString(channel)); 286 builder.append(", "); 287 } 288 if (path != null) { 289 builder.append("path=").append(path).append(", "); 290 } 291 builder.append("offset=") 292 .append(offset) 293 .append(']'); 294 return builder.toString(); 295 } 296 297 } 298 299 /** 300 * Opens a file for writing using the properties of the event. All data from 301 * subsequent {@link Input} events is written to the file. 302 * The end of record flag is ignored. 303 * 304 * @param event the event 305 * @throws InterruptedException if the execution was interrupted 306 */ 307 @Handler 308 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 309 public void onSaveInput(SaveInput event) throws InterruptedException { 310 if (!Arrays.asList(event.options()) 311 .contains(StandardOpenOption.WRITE)) { 312 throw new IllegalArgumentException( 313 "File must be opened for writing."); 314 } 315 for (IOSubchannel channel : event.channels(IOSubchannel.class)) { 316 if (inputWriters.containsKey(channel)) { 317 channel.respond(new IOError(event, 318 new IllegalStateException("File is already open."))); 319 } else { 320 new Writer(event, channel); 321 } 322 } 323 } 324 325 /** 326 * Handle input by writing it to the file, if a channel exists. 327 * 328 * @param event the event 329 * @param channel the channel 330 */ 331 @Handler 332 public void onInput(Input<ByteBuffer> event, Channel channel) { 333 Writer writer = inputWriters.get(channel); 334 if (writer != null) { 335 writer.write(event.buffer()); 336 } 337 } 338 339 /** 340 * Opens a file for writing using the properties of the event. All data from 341 * subsequent {@link Output} events is written to the file. 342 * The end of record flag is ignored. 343 * 344 * @param event the event 345 * @throws InterruptedException if the execution was interrupted 346 */ 347 @Handler 348 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 349 public void onSaveOutput(SaveOutput event) throws InterruptedException { 350 if (!Arrays.asList(event.options()) 351 .contains(StandardOpenOption.WRITE)) { 352 throw new IllegalArgumentException( 353 "File must be opened for writing."); 354 } 355 for (IOSubchannel channel : event.channels(IOSubchannel.class)) { 356 if (outputWriters.containsKey(channel)) { 357 channel.respond(new IOError(event, 358 new IllegalStateException("File is already open."))); 359 } else { 360 new Writer(event, channel); 361 } 362 } 363 } 364 365 /** 366 * Handle {@link Output} events by writing them to the file, if 367 * a channel exists. 368 * 369 * @param event the event 370 * @param channel the channel 371 */ 372 @Handler 373 public void onOutput(Output<ByteBuffer> event, Channel channel) { 374 Writer writer = outputWriters.get(channel); 375 if (writer != null) { 376 writer.write(event.buffer()); 377 } 378 } 379 380 /** 381 * Handle close by closing the file associated with the channel. 382 * 383 * @param event the event 384 * @param channel the channel 385 * @throws InterruptedException the interrupted exception 386 */ 387 @Handler 388 public void onClose(Close event, Channel channel) 389 throws InterruptedException { 390 Writer writer = inputWriters.get(channel); 391 if (writer != null) { 392 writer.close(event); 393 } 394 writer = outputWriters.get(channel); 395 if (writer != null) { 396 writer.close(event); 397 } 398 } 399 400 /** 401 * Handle stop by closing all files. 402 * 403 * @param event the event 404 * @throws InterruptedException the interrupted exception 405 */ 406 @Handler(priority = -1000) 407 public void onStop(Stop event) throws InterruptedException { 408 while (!inputWriters.isEmpty()) { 409 Writer handler = inputWriters.entrySet().iterator().next() 410 .getValue(); 411 handler.close(event); 412 } 413 while (!outputWriters.isEmpty()) { 414 Writer handler = outputWriters.entrySet().iterator().next() 415 .getValue(); 416 handler.close(event); 417 } 418 } 419 420 /** 421 * A writer. 422 */ 423 private class Writer { 424 425 private final IOSubchannel channel; 426 private Path path; 427 private AsynchronousFileChannel ioChannel; 428 private long offset; 429 private final CompletionHandler<Integer, 430 WriteContext> writeCompletionHandler 431 = new WriteCompletionHandler(); 432 private int outstandingAsyncs; 433 434 /** 435 * The write context needs to be finer grained than the general file 436 * connection context because an asynchronous write may be only 437 * partially successful, i.e. not all data provided by the write event 438 * may successfully be written in one asynchronous write invocation. 439 */ 440 private class WriteContext { 441 public final ManagedBuffer<ByteBuffer>.ByteBufferView reader; 442 public final long pos; 443 444 /** 445 * Instantiates a new write context. 446 * 447 * @param reader the reader 448 * @param pos the pos 449 */ 450 public WriteContext( 451 ManagedBuffer<ByteBuffer>.ByteBufferView reader, long pos) { 452 this.reader = reader; 453 this.pos = pos; 454 } 455 } 456 457 /** 458 * Instantiates a new writer. 459 * 460 * @param event the event 461 * @param channel the channel 462 * @throws InterruptedException the interrupted exception 463 */ 464 public Writer(SaveInput event, IOSubchannel channel) 465 throws InterruptedException { 466 this(event, event.path(), event.options(), channel); 467 inputWriters.put(channel, this); 468 channel.respond(new FileOpened(event)); 469 } 470 471 /** 472 * Instantiates a new writer. 473 * 474 * @param event the event 475 * @param channel the channel 476 * @throws InterruptedException the interrupted exception 477 */ 478 public Writer(SaveOutput event, IOSubchannel channel) 479 throws InterruptedException { 480 this(event, event.path(), event.options(), channel); 481 outputWriters.put(channel, this); 482 channel.respond(new FileOpened(event)); 483 } 484 485 private Writer(Event<?> event, Path path, OpenOption[] options, 486 IOSubchannel channel) throws InterruptedException { 487 this.channel = channel; 488 this.path = path; 489 offset = 0; 490 try { 491 ioChannel = AsynchronousFileChannel.open(path, options); 492 } catch (IOException e) { 493 channel.respond(new IOError(event, e)); 494 } 495 } 496 497 /** 498 * Write the buffer. 499 * 500 * @param buffer the buffer 501 */ 502 public void write(ManagedBuffer<ByteBuffer> buffer) { 503 int written = buffer.remaining(); 504 if (written == 0) { 505 return; 506 } 507 buffer.lockBuffer(); 508 synchronized (ioChannel) { 509 if (outstandingAsyncs == 0) { 510 registerAsGenerator(); 511 } 512 outstandingAsyncs += 1; 513 ManagedBuffer<ByteBuffer>.ByteBufferView reader 514 = buffer.newByteBufferView(); 515 ioChannel.write(reader.get(), offset, 516 new WriteContext(reader, offset), 517 writeCompletionHandler); 518 } 519 offset += written; 520 } 521 522 /** 523 * A write completion handler. 524 */ 525 private final class WriteCompletionHandler 526 implements CompletionHandler<Integer, WriteContext> { 527 528 @Override 529 public void completed(Integer result, WriteContext context) { 530 ManagedBuffer<ByteBuffer>.ByteBufferView reader 531 = context.reader; 532 if (reader.get().hasRemaining()) { 533 ioChannel.write(reader.get(), 534 context.pos + reader.get().position(), 535 context, writeCompletionHandler); 536 return; 537 } 538 reader.managedBuffer().unlockBuffer(); 539 handled(); 540 } 541 542 @Override 543 public void failed(Throwable exc, WriteContext context) { 544 try { 545 if (!(exc instanceof AsynchronousCloseException)) { 546 channel.respond(new IOError(null, exc)); 547 } 548 } finally { 549 handled(); 550 } 551 } 552 553 @SuppressWarnings("PMD.AssignmentInOperand") 554 private void handled() { 555 synchronized (ioChannel) { 556 if (--outstandingAsyncs == 0) { 557 unregisterAsGenerator(); 558 ioChannel.notifyAll(); 559 } 560 } 561 } 562 } 563 564 /** 565 * Close. 566 * 567 * @param event the event 568 * @throws InterruptedException the interrupted exception 569 */ 570 @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 571 "PMD.EmptyCatchBlock" }) 572 public void close(Event<?> event) 573 throws InterruptedException { 574 IOException ioExc = null; 575 try { 576 synchronized (ioChannel) { 577 while (outstandingAsyncs > 0) { 578 ioChannel.wait(); 579 } 580 ioChannel.close(); 581 } 582 } catch (ClosedChannelException e) { 583 // Can be ignored 584 } catch (IOException e) { 585 ioExc = e; 586 } 587 channel.respond(new Closed<Void>(ioExc)); 588 inputWriters.remove(channel); 589 outputWriters.remove(channel); 590 } 591 592 /* 593 * (non-Javadoc) 594 * 595 * @see java.lang.Object#toString() 596 */ 597 @Override 598 public String toString() { 599 StringBuilder builder = new StringBuilder(50); 600 builder.append("FileConnection ["); 601 if (channel != null) { 602 builder.append("channel=") 603 .append(Channel.toString(channel)) 604 .append(", "); 605 } 606 if (path != null) { 607 builder.append("path=") 608 .append(path) 609 .append(", "); 610 } 611 builder.append("offset=") 612 .append(offset) 613 .append(']'); 614 return builder.toString(); 615 } 616 617 } 618 619 /* 620 * (non-Javadoc) 621 * 622 * @see java.lang.Object#toString() 623 */ 624 @Override 625 public String toString() { 626 StringBuilder builder = new StringBuilder(); 627 builder.append(Components.objectName(this)) 628 .append(" ["); 629 if (inputWriters != null) { 630 builder.append(inputWriters.values().stream() 631 .map(chnl -> Components.objectName(chnl)) 632 .collect(Collectors.toList())); 633 } 634 builder.append(']'); 635 return builder.toString(); 636 } 637}