/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016-2018 Michael N. Lipp
 * 
 * This program is free software; you can redistribute it and/or modify it 
 * under the terms of the GNU Affero General Public License as published by 
 * the Free Software Foundation; either version 3 of the License, or 
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but 
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
 * for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License along 
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.stream.Collectors;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Stop;
import org.jgrapes.io.events.Close;
import org.jgrapes.io.events.Closed;
import org.jgrapes.io.events.FileOpened;
import org.jgrapes.io.events.IOError;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.OpenFile;
import org.jgrapes.io.events.Opening;
import org.jgrapes.io.events.Output;
import org.jgrapes.io.events.SaveInput;
import org.jgrapes.io.events.SaveOutput;
import org.jgrapes.io.events.StreamFile;
import org.jgrapes.io.util.ManagedBuffer;
import org.jgrapes.io.util.ManagedBufferPool;

/**
 * A component that reads from or writes to a file.
 * 
 * Reading is triggered by {@link StreamFile} events, writing by
 * {@link SaveInput} or {@link SaveOutput} events. Writers are tracked
 * per channel in two separate maps, one for data arriving as 
 * {@link Input} events and one for data arriving as {@link Output} events.
 */
@SuppressWarnings("PMD.ExcessiveImports")
public class FileStorage extends Component {

    /** The capacity of the direct buffers allocated for reading. */
    private final int bufferSize;

    /** Writers that save the data from {@link Input} events, by channel. */
    @SuppressWarnings("PMD.UseConcurrentHashMap")
    private final Map<Channel, Writer> inputWriters = Collections
        .synchronizedMap(new WeakHashMap<>());
    /** Writers that save the data from {@link Output} events, by channel. */
    @SuppressWarnings("PMD.UseConcurrentHashMap")
    private final Map<Channel, Writer> outputWriters = Collections
        .synchronizedMap(new WeakHashMap<>());

    /**
     * Create a new instance using the given size for the read buffers.
     * 
     * @param channel the component's channel. Used for sending {@link Output}
     * events and receiving {@link Input} events 
     * @param bufferSize the size of the buffers used for reading
     */
    public FileStorage(Channel channel, int bufferSize) {
        super(channel);
        this.bufferSize = bufferSize;
    }

    /**
     * Create a new instance using the default buffer size of 8192.
     * 
     * @param channel the component's channel. Used for sending {@link Output}
     * events and receiving {@link Input} events 
     */
    public FileStorage(Channel channel) {
        this(channel, 8192);
    }

    /**
     * Opens a file for reading using the properties of the event and streams
     * its content as a sequence of {@link Output} events with the 
     * end of record flag set in the last event. All generated events are 
     * considered responses to this event and therefore fired using the event 
     * processor from the event's I/O subchannel.
     * 
     * @param event the event
     * @throws InterruptedException if the execution was interrupted
     * @throws IllegalArgumentException if the event's options include
     * {@link StandardOpenOption#WRITE}
     */
    @Handler
    @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
        "PMD.AccessorClassGeneration", "PMD.AvoidDuplicateLiterals" })
    public void onStreamFile(StreamFile event)
            throws InterruptedException {
        if (Arrays.asList(event.options())
            .contains(StandardOpenOption.WRITE)) {
            throw new IllegalArgumentException(
                "Cannot stream file opened for writing.");
        }
        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
            if (inputWriters.containsKey(channel)) {
                channel.respond(new IOError(event,
                    new IllegalStateException("File is already open.")));
            } else {
                // The streamer starts its work from within the constructor.
                new FileStreamer(event, channel);
            }
        }
    }

    /**
     * A file streamer. Reads a file and sends its content as 
     * {@link Output} events on the subchannel passed to the constructor.
     */
    private class FileStreamer {

        private final IOSubchannel channel;
        private final Path path;
        private AsynchronousFileChannel ioChannel;
        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
                ByteBuffer> ioBuffers;
        private long offset;
        private final CompletionHandler<Integer,
                ManagedBuffer<ByteBuffer>> readCompletionHandler
                    = new ReadCompletionHandler();

        /**
         * Opens the file and starts streaming it. If the file system 
         * does not support asynchronous channels, reading is delegated
         * to {@link #runReaderThread(StreamFile)}. If the file cannot 
         * be opened, an {@link IOError} is sent as response.
         *
         * @param event the event that requested the streaming
         * @param channel the subchannel used for the responses
         * @throws InterruptedException if interrupted while acquiring
         * the first buffer
         */
        private FileStreamer(StreamFile event, IOSubchannel channel)
                throws InterruptedException {
            this.channel = channel;
            path = event.path();
            offset = 0;
            try {
                try {
                    ioChannel = AsynchronousFileChannel
                        .open(event.path(), event.options());
                } catch (UnsupportedOperationException e) {
                    // No asynchronous I/O for this path, read synchronously
                    runReaderThread(event);
                    return;
                }
            } catch (IOException e) {
                channel.respond(new IOError(event, e));
                return;
            }
            // NOTE(review): presumably keeps the framework from 
            // considering itself idle while reads are pending -- confirm 
            // against the Component documentation.
            registerAsGenerator();
            // Reading from file
            ioBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
                () -> {
                    return ByteBuffer.allocateDirect(bufferSize);
                }, 2);
            ManagedBuffer<ByteBuffer> buffer = ioBuffers.acquire();
            // (1) Opening, (2) FileOpened, (3) Output events
            channel.respond(Event
                .onCompletion(new Opening<OpenFile>().setResult(event), e -> {
                    channel.respond(new FileOpened(event));
                    // Start reading.
                    synchronized (ioChannel) {
                        ioChannel.read(buffer.backingBuffer(), offset, buffer,
                            readCompletionHandler);
                    }
                }));
        }

        /**
         * The read completion handler. Forwards the data read as 
         * {@link Output} event and issues the next read until the
         * end of the file has been reached. 
         */
        private class ReadCompletionHandler
                implements
                CompletionHandler<Integer, ManagedBuffer<ByteBuffer>> {

            /**
             * Forwards the data and either starts the next read or,
             * at the end of the file (or if no further read can be 
             * started), closes the file, sends {@link Closed} and 
             * unregisters the component as generator.
             *
             * @param result the number of bytes read, negative at end 
             * of file
             * @param buffer the buffer that was filled
             */
            @Override
            @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
                "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals" })
            public void completed(
                    Integer result, ManagedBuffer<ByteBuffer> buffer) {
                boolean interrupted = false;
                if (result >= 0) {
                    offset += result;
                    boolean eof = true;
                    try {
                        eof = offset == ioChannel.size();
                    } catch (IOException e1) {
                        // Handled like true
                    }
                    channel.respond(Output.fromSink(buffer, eof));
                    if (!eof) {
                        try {
                            ManagedBuffer<ByteBuffer> nextBuffer
                                = ioBuffers.acquire();
                            nextBuffer.clear();
                            synchronized (ioChannel) {
                                ioChannel.read(nextBuffer.backingBuffer(),
                                    offset,
                                    nextBuffer, readCompletionHandler);
                            }
                            return;
                        } catch (InterruptedException e) {
                            // No buffer, no further read. Must not simply
                            // return here, else the file remains open and 
                            // the component remains registered as 
                            // generator. Clean up below instead.
                            interrupted = true;
                        }
                    }
                }
                IOException ioExc = null;
                try {
                    ioChannel.close();
                } catch (ClosedChannelException e) {
                    // Can be ignored
                } catch (IOException e) {
                    ioExc = e;
                }
                channel.respond(new Closed<Void>(ioExc));
                unregisterAsGenerator();
                if (interrupted) {
                    // Restore the flag only after the clean up is done
                    Thread.currentThread().interrupt();
                }
            }

            /**
             * Closes the file, reports the failure with a {@link Closed} 
             * event and unregisters the component as generator.
             *
             * @param exc the cause of the failure
             * @param context the buffer used for the failed read
             */
            @Override
            public void failed(
                    Throwable exc, ManagedBuffer<ByteBuffer> context) {
                try {
                    // Don't leave the file open after a failed read
                    ioChannel.close();
                } catch (IOException e) {
                    // The read failure remains the primary error
                    exc.addSuppressed(e);
                }
                channel.respond(new Closed<Void>(exc));
                unregisterAsGenerator();
            }
        }

        /**
         * Stream file that doesn't support asynchronous I/O. The file
         * is read by a task submitted to the executor service of the 
         * active event pipeline. The task closes the file when it 
         * terminates, no matter how.
         * 
         * @param event the event that requested the streaming
         * @throws IOException if the file cannot be opened
         */
        private void runReaderThread(StreamFile event)
                throws IOException {
            ioBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
                () -> {
                    return ByteBuffer.allocateDirect(bufferSize);
                }, 2);
            @SuppressWarnings("PMD.CloseResource")
            final SeekableByteChannel syncChannel
                = Files.newByteChannel(event.path(), event.options());
            activeEventPipeline().executorService().submit(new Runnable() {
                @Override
                @SuppressWarnings("PMD.EmptyCatchBlock")
                public void run() {
                    // Reading from file
                    IOException ioExc = null;
                    // try-with-resources closes the file on every exit
                    // path, including I/O errors and interruption
                    try (SeekableByteChannel source = syncChannel) {
                        long size = source.size();
                        while (source.position() < size) {
                            ManagedBuffer<ByteBuffer> buffer
                                = ioBuffers.acquire();
                            buffer.fillFromChannel(source);
                            channel.respond(Output.fromSink(buffer,
                                source.position() == size));
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } catch (ClosedChannelException e) {
                        // Can be ignored
                    } catch (IOException e) {
                        ioExc = e;
                    }
                    channel.respond(new Closed<Void>(ioExc));
                }
            });
        }

        /*
         * (non-Javadoc)
         * 
         * @see java.lang.Object#toString()
         */
        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder(50);
            builder.append("FileStreamer [");
            if (channel != null) {
                builder.append("channel=");
                builder.append(Channel.toString(channel));
                builder.append(", ");
            }
            if (path != null) {
                builder.append("path=");
                builder.append(path);
                builder.append(", ");
            }
            builder.append("offset=")
                .append(offset)
                .append(']');
            return builder.toString();
        }

    }

    /**
     * Opens a file for writing using the properties of the event. All data from
     * subsequent {@link Input} events is written to the file.
     * The end of record flag is ignored.
     * 
     * @param event the event
     * @throws InterruptedException if the execution was interrupted
     * @throws IllegalArgumentException if the event's options do not 
     * include {@link StandardOpenOption#WRITE}
     */
    @Handler
    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
    public void onSaveInput(SaveInput event) throws InterruptedException {
        if (!Arrays.asList(event.options())
            .contains(StandardOpenOption.WRITE)) {
            throw new IllegalArgumentException(
                "File must be opened for writing.");
        }
        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
            if (inputWriters.containsKey(channel)) {
                channel.respond(new IOError(event,
                    new IllegalStateException("File is already open.")));
            } else {
                // The writer registers itself in inputWriters
                new Writer(event, channel);
            }
        }
    }

    /**
     * Handle input by writing it to the file, if a channel exists.
     *
     * @param event the event
     * @param channel the channel
     */
    @Handler
    public void onInput(Input<ByteBuffer> event, Channel channel) {
        Writer writer = inputWriters.get(channel);
        if (writer != null) {
            writer.write(event.buffer());
        }
    }

    /**
     * Opens a file for writing using the properties of the event. All data from
     * subsequent {@link Output} events is written to the file. 
     * The end of record flag is ignored.
     * 
     * @param event the event
     * @throws InterruptedException if the execution was interrupted
     * @throws IllegalArgumentException if the event's options do not 
     * include {@link StandardOpenOption#WRITE}
     */
    @Handler
    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
    public void onSaveOutput(SaveOutput event) throws InterruptedException {
        if (!Arrays.asList(event.options())
            .contains(StandardOpenOption.WRITE)) {
            throw new IllegalArgumentException(
                "File must be opened for writing.");
        }
        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
            if (outputWriters.containsKey(channel)) {
                channel.respond(new IOError(event,
                    new IllegalStateException("File is already open.")));
            } else {
                // The writer registers itself in outputWriters
                new Writer(event, channel);
            }
        }
    }

    /**
     * Handle {@link Output} events by writing them to the file, if
     * a channel exists.
     *
     * @param event the event
     * @param channel the channel
     */
    @Handler
    public void onOutput(Output<ByteBuffer> event, Channel channel) {
        Writer writer = outputWriters.get(channel);
        if (writer != null) {
            writer.write(event.buffer());
        }
    }

    /**
     * Handle close by closing the file(s) associated with the channel.
     * Both the writer for {@link Input} events and the writer for 
     * {@link Output} events are closed, if they exist.
     *
     * @param event the event
     * @param channel the channel
     * @throws InterruptedException the interrupted exception
     */
    @Handler
    public void onClose(Close event, Channel channel)
            throws InterruptedException {
        Writer writer = inputWriters.get(channel);
        if (writer != null) {
            writer.close(event);
        }
        writer = outputWriters.get(channel);
        if (writer != null) {
            writer.close(event);
        }
    }

    /**
     * Handle stop by closing all files.
     *
     * @param event the event
     * @throws InterruptedException the interrupted exception
     */
    @Handler(priority = -1000)
    public void onStop(Stop event) throws InterruptedException {
        // Closing a writer removes it from its map, hence no iteration
        // but repeatedly picking the "first" entry.
        while (!inputWriters.isEmpty()) {
            Writer handler = inputWriters.entrySet().iterator().next()
                .getValue();
            handler.close(event);
        }
        while (!outputWriters.isEmpty()) {
            Writer handler = outputWriters.entrySet().iterator().next()
                .getValue();
            handler.close(event);
        }
    }

    /**
     * A writer. Writes the data passed to {@link #write(ManagedBuffer)}
     * to a file using asynchronous I/O. A writer registers itself in
     * the map of input or output writers only if the file has 
     * been opened successfully.
     */
    private class Writer {

        private final IOSubchannel channel;
        private Path path;
        /** The file channel, {@code null} if opening the file failed. */
        private AsynchronousFileChannel ioChannel;
        /** The file position for the next write. */
        private long offset;
        private final CompletionHandler<Integer,
                WriteContext> writeCompletionHandler
                    = new WriteCompletionHandler();
        /** Number of started writes that have not yet completed or failed. */
        private int outstandingAsyncs;

        /**
         * The write context needs to be finer grained than the general file
         * connection context because an asynchronous write may be only
         * partially successful, i.e. not all data provided by the write event
         * may successfully be written in one asynchronous write invocation.
         */
        private class WriteContext {
            public ManagedBuffer<ByteBuffer>.ByteBufferView reader;
            public long pos;

            /**
             * Instantiates a new write context.
             *
             * @param reader the view on the buffer with the data to write
             * @param pos the file position at which the write started
             */
            public WriteContext(
                    ManagedBuffer<ByteBuffer>.ByteBufferView reader, long pos) {
                this.reader = reader;
                this.pos = pos;
            }
        }

        /**
         * Instantiates a new writer for data from {@link Input} events. 
         * If the file cannot be opened, only an {@link IOError} is 
         * sent; the writer is not registered and no {@link FileOpened}
         * event is sent.
         *
         * @param event the event
         * @param channel the channel
         * @throws InterruptedException the interrupted exception
         */
        public Writer(SaveInput event, IOSubchannel channel)
                throws InterruptedException {
            this(event, event.path(), event.options(), channel);
            if (ioChannel == null) {
                // Opening failed, the error has already been reported.
                // Registering would cause NullPointerExceptions later.
                return;
            }
            inputWriters.put(channel, this);
            channel.respond(new FileOpened(event));
        }

        /**
         * Instantiates a new writer for data from {@link Output} events.
         * If the file cannot be opened, only an {@link IOError} is 
         * sent; the writer is not registered and no {@link FileOpened}
         * event is sent.
         *
         * @param event the event
         * @param channel the channel
         * @throws InterruptedException the interrupted exception
         */
        public Writer(SaveOutput event, IOSubchannel channel)
                throws InterruptedException {
            this(event, event.path(), event.options(), channel);
            if (ioChannel == null) {
                // Opening failed, the error has already been reported.
                // Registering would cause NullPointerExceptions later.
                return;
            }
            outputWriters.put(channel, this);
            channel.respond(new FileOpened(event));
        }

        /**
         * Opens the file. On failure, an {@link IOError} is sent as
         * response and {@code ioChannel} remains {@code null}.
         *
         * @param event the event that caused the file to be opened
         * @param path the file's path
         * @param options the options for opening the file
         * @param channel the channel
         * @throws InterruptedException the interrupted exception
         */
        private Writer(Event<?> event, Path path, OpenOption[] options,
                IOSubchannel channel) throws InterruptedException {
            this.channel = channel;
            this.path = path;
            offset = 0;
            try {
                ioChannel = AsynchronousFileChannel.open(path, options);
            } catch (IOException e) {
                channel.respond(new IOError(event, e));
                return;
            }
        }

        /**
         * Write the buffer. The buffer is locked until the data has 
         * been written (or writing has failed). The component is 
         * registered as generator while writes are outstanding.
         *
         * @param buffer the buffer
         */
        public void write(ManagedBuffer<ByteBuffer> buffer) {
            int written = buffer.remaining();
            if (written == 0) {
                return;
            }
            buffer.lockBuffer();
            synchronized (ioChannel) {
                if (outstandingAsyncs == 0) {
                    registerAsGenerator();
                }
                outstandingAsyncs += 1;
                ManagedBuffer<ByteBuffer>.ByteBufferView reader
                    = buffer.newByteBufferView();
                ioChannel.write(reader.get(), offset,
                    new WriteContext(reader, offset),
                    writeCompletionHandler);
            }
            offset += written;
        }

        /**
         * A write completion handler. Continues partially successful
         * writes and keeps track of the outstanding writes.
         */
        private class WriteCompletionHandler
                implements CompletionHandler<Integer, WriteContext> {

            /**
             * Issues another write if data remains in the buffer, else
             * unlocks the buffer and marks the write as handled.
             *
             * @param result the number of bytes written
             * @param context the write's context
             */
            @Override
            public void completed(Integer result, WriteContext context) {
                ManagedBuffer<ByteBuffer>.ByteBufferView reader
                    = context.reader;
                if (reader.get().hasRemaining()) {
                    ioChannel.write(reader.get(),
                        context.pos + reader.get().position(),
                        context, writeCompletionHandler);
                    return;
                }
                reader.managedBuffer().unlockBuffer();
                handled();
            }

            /**
             * Reports the failure with an {@link IOError} event unless 
             * it is caused by closing the channel. In any case, the 
             * buffer is unlocked and the write is marked as handled.
             *
             * @param exc the cause of the failure
             * @param context the write's context
             */
            @Override
            public void failed(Throwable exc, WriteContext context) {
                try {
                    if (!(exc instanceof AsynchronousCloseException)) {
                        channel.respond(new IOError(null, exc));
                    }
                } finally {
                    try {
                        // Balance the lockBuffer() from write(), there
                        // won't be another attempt to write the data
                        context.reader.managedBuffer().unlockBuffer();
                    } finally {
                        handled();
                    }
                }
            }

            /**
             * Decrements the number of outstanding writes. When it
             * drops to zero, the component is unregistered as generator 
             * and threads waiting in {@link Writer#close(Event)} are 
             * notified.
             */
            @SuppressWarnings("PMD.AssignmentInOperand")
            private void handled() {
                synchronized (ioChannel) {
                    if (--outstandingAsyncs == 0) {
                        unregisterAsGenerator();
                        ioChannel.notifyAll();
                    }
                }
            }
        }

        /**
         * Close. Waits until all outstanding writes have been handled, 
         * closes the file, sends a {@link Closed} event and removes 
         * this writer from the writer maps. Another writer that has 
         * been registered for the same channel is left untouched.
         *
         * @param event the event
         * @throws InterruptedException the interrupted exception
         */
        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
            "PMD.EmptyCatchBlock" })
        public void close(Event<?> event)
                throws InterruptedException {
            IOException ioExc = null;
            try {
                synchronized (ioChannel) {
                    while (outstandingAsyncs > 0) {
                        ioChannel.wait();
                    }
                    ioChannel.close();
                }
            } catch (ClosedChannelException e) {
                // Can be ignored
            } catch (IOException e) {
                ioExc = e;
            }
            channel.respond(new Closed<Void>(ioExc));
            // Remove only this writer. The channel may be associated with 
            // a second writer in the other map, which must remain 
            // registered until it is closed itself.
            inputWriters.remove(channel, this);
            outputWriters.remove(channel, this);
        }

        /*
         * (non-Javadoc)
         * 
         * @see java.lang.Object#toString()
         */
        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder(50);
            builder.append("FileConnection [");
            if (channel != null) {
                builder.append("channel=")
                    .append(Channel.toString(channel))
                    .append(", ");
            }
            if (path != null) {
                builder.append("path=")
                    .append(path)
                    .append(", ");
            }
            builder.append("offset=")
                .append(offset)
                .append(']');
            return builder.toString();
        }

    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append(Components.objectName(this))
            .append(" [");
        if (inputWriters != null) {
            builder.append(inputWriters.values().stream()
                .map(chnl -> Components.objectName(chnl))
                .collect(Collectors.toList()));
        }
        builder.append(']');
        return builder.toString();
    }
}