001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.nio.channels.AsynchronousCloseException;
024import java.nio.channels.AsynchronousFileChannel;
025import java.nio.channels.ClosedChannelException;
026import java.nio.channels.CompletionHandler;
027import java.nio.channels.SeekableByteChannel;
028import java.nio.file.Files;
029import java.nio.file.OpenOption;
030import java.nio.file.Path;
031import java.nio.file.StandardOpenOption;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.Map;
035import java.util.WeakHashMap;
036import java.util.stream.Collectors;
037import org.jgrapes.core.Channel;
038import org.jgrapes.core.Component;
039import org.jgrapes.core.Components;
040import org.jgrapes.core.Event;
041import org.jgrapes.core.annotation.Handler;
042import org.jgrapes.core.events.Stop;
043import org.jgrapes.io.events.Close;
044import org.jgrapes.io.events.Closed;
045import org.jgrapes.io.events.FileOpened;
046import org.jgrapes.io.events.IOError;
047import org.jgrapes.io.events.Input;
048import org.jgrapes.io.events.OpenFile;
049import org.jgrapes.io.events.Opening;
050import org.jgrapes.io.events.Output;
051import org.jgrapes.io.events.SaveInput;
052import org.jgrapes.io.events.SaveOutput;
053import org.jgrapes.io.events.StreamFile;
054import org.jgrapes.io.util.ManagedBuffer;
055import org.jgrapes.io.util.ManagedBufferPool;
056
057/**
058 * A component that reads from or writes to a file.
059 */
060@SuppressWarnings("PMD.ExcessiveImports")
061public class FileStorage extends Component {
062
063    private int bufferSize;
064
065    @SuppressWarnings("PMD.UseConcurrentHashMap")
066    private final Map<Channel, Writer> inputWriters = Collections
067        .synchronizedMap(new WeakHashMap<>());
068    @SuppressWarnings("PMD.UseConcurrentHashMap")
069    private final Map<Channel, Writer> outputWriters = Collections
070        .synchronizedMap(new WeakHashMap<>());
071
    /**
     * Create a new instance using the given size for the read buffers.
     * The size is only stored here; it is used later as the capacity of
     * the direct byte buffers that are allocated when a file is streamed.
     * 
     * @param channel the component's channel. Used for sending {@link Output}
     * events and receiving {@link Input} events 
     * @param bufferSize the size (in bytes) of the buffers used for reading
     */
    public FileStorage(Channel channel, int bufferSize) {
        super(channel);
        this.bufferSize = bufferSize;
    }
083
    /**
     * Create a new instance using the default buffer size of 8192.
     * Delegates to {@link #FileStorage(Channel, int)}.
     * 
     * @param channel the component's channel. Used for sending {@link Output}
     * events and receiving {@link Input} events 
     */
    public FileStorage(Channel channel) {
        this(channel, 8192);
    }
093
094    /**
095     * Opens a file for reading using the properties of the event and streams
096     * its content as a sequence of {@link Output} events with the 
097     * end of record flag set in the last event. All generated events are 
098     * considered responses to this event and therefore fired using the event 
099     * processor from the event's I/O subchannel.
100     * 
101     * @param event the event
102     * @throws InterruptedException if the execution was interrupted
103     */
104    @Handler
105    @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
106        "PMD.AccessorClassGeneration", "PMD.AvoidDuplicateLiterals" })
107    public void onStreamFile(StreamFile event)
108            throws InterruptedException {
109        if (Arrays.asList(event.options())
110            .contains(StandardOpenOption.WRITE)) {
111            throw new IllegalArgumentException(
112                "Cannot stream file opened for writing.");
113        }
114        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
115            if (inputWriters.containsKey(channel)) {
116                channel.respond(new IOError(event,
117                    new IllegalStateException("File is already open.")));
118            } else {
119                new FileStreamer(event, channel);
120            }
121        }
122    }
123
124    /**
125     * A file streamer.
126     */
127    private class FileStreamer {
128
129        private final IOSubchannel channel;
130        private final Path path;
131        private AsynchronousFileChannel ioChannel;
132        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
133                ByteBuffer> ioBuffers;
134        private long offset;
135        private final CompletionHandler<Integer,
136                ManagedBuffer<ByteBuffer>> readCompletionHandler
137                    = new ReadCompletionHandler();
138
139        private FileStreamer(StreamFile event, IOSubchannel channel)
140                throws InterruptedException {
141            this.channel = channel;
142            path = event.path();
143            offset = 0;
144            try {
145                try {
146                    ioChannel = AsynchronousFileChannel
147                        .open(event.path(), event.options());
148                } catch (UnsupportedOperationException e) {
149                    runReaderThread(event);
150                    return;
151                }
152            } catch (IOException e) {
153                channel.respond(new IOError(event, e));
154                return;
155            }
156            registerAsGenerator();
157            // Reading from file
158            ioBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
159                () -> {
160                    return ByteBuffer.allocateDirect(bufferSize);
161                }, 2);
162            ManagedBuffer<ByteBuffer> buffer = ioBuffers.acquire();
163            // (1) Opening, (2) FileOpened, (3) Output events
164            channel.respond(Event
165                .onCompletion(new Opening<OpenFile>().setResult(event), e -> {
166                    channel.respond(new FileOpened(event));
167                    // Start reading.
168                    synchronized (ioChannel) {
169                        ioChannel.read(buffer.backingBuffer(), offset, buffer,
170                            readCompletionHandler);
171                    }
172                }));
173        }
174
175        /**
176         * The read completion handler.
177         */
178        private class ReadCompletionHandler
179                implements
180                CompletionHandler<Integer, ManagedBuffer<ByteBuffer>> {
181
182            @Override
183            @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
184                "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals" })
185            public void completed(
186                    Integer result, ManagedBuffer<ByteBuffer> buffer) {
187                if (result >= 0) {
188                    offset += result;
189                    boolean eof = true;
190                    try {
191                        eof = offset == ioChannel.size();
192                    } catch (IOException e1) {
193                        // Handled like true
194                    }
195                    channel.respond(Output.fromSink(buffer, eof));
196                    if (!eof) {
197                        try {
198                            ManagedBuffer<ByteBuffer> nextBuffer
199                                = ioBuffers.acquire();
200                            nextBuffer.clear();
201                            synchronized (ioChannel) {
202                                ioChannel.read(nextBuffer.backingBuffer(),
203                                    offset,
204                                    nextBuffer, readCompletionHandler);
205                            }
206                        } catch (InterruptedException e) {
207                            // Results in empty buffer
208                        }
209                        return;
210                    }
211                }
212                IOException ioExc = null;
213                try {
214                    ioChannel.close();
215                } catch (ClosedChannelException e) {
216                    // Can be ignored
217                } catch (IOException e) {
218                    ioExc = e;
219                }
220                channel.respond(new Closed<Void>(ioExc));
221                unregisterAsGenerator();
222            }
223
224            @Override
225            public void failed(
226                    Throwable exc, ManagedBuffer<ByteBuffer> context) {
227                channel.respond(new Closed<Void>(exc));
228                unregisterAsGenerator();
229            }
230        }
231
232        /**
233         * Stream file that doesn't support asynchronous I/O.
234         * 
235         * @param event
236         * @throws IOException
237         */
238        private void runReaderThread(StreamFile event)
239                throws IOException {
240            ioBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
241                () -> {
242                    return ByteBuffer.allocateDirect(bufferSize);
243                }, 2);
244            @SuppressWarnings("PMD.CloseResource")
245            final SeekableByteChannel ioChannel
246                = Files.newByteChannel(event.path(), event.options());
247            activeEventPipeline().executorService().submit(new Runnable() {
248                @Override
249                @SuppressWarnings("PMD.EmptyCatchBlock")
250                public void run() {
251                    // Reading from file
252                    IOException ioExc = null;
253                    try {
254                        long size = ioChannel.size();
255                        while (ioChannel.position() < size) {
256                            ManagedBuffer<ByteBuffer> buffer
257                                = ioBuffers.acquire();
258                            buffer.fillFromChannel(ioChannel);
259                            channel.respond(Output.fromSink(buffer,
260                                ioChannel.position() == size));
261                        }
262                        ioChannel.close();
263                    } catch (InterruptedException e) {
264                        return;
265                    } catch (ClosedChannelException e) {
266                        // Can be ignored
267                    } catch (IOException e) {
268                        ioExc = e;
269                    }
270                    channel.respond(new Closed<Void>(ioExc));
271                }
272            });
273        }
274
275        /*
276         * (non-Javadoc)
277         * 
278         * @see java.lang.Object#toString()
279         */
280        @Override
281        public String toString() {
282            StringBuilder builder = new StringBuilder(50);
283            builder.append("FileStreamer [");
284            if (channel != null) {
285                builder.append("channel=");
286                builder.append(Channel.toString(channel));
287                builder.append(", ");
288            }
289            if (path != null) {
290                builder.append("path=");
291                builder.append(path);
292                builder.append(", ");
293            }
294            builder.append("offset=")
295                .append(offset)
296                .append(']');
297            return builder.toString();
298        }
299
300    }
301
302    /**
303     * Opens a file for writing using the properties of the event. All data from
304     * subsequent {@link Input} events is written to the file.
305     * The end of record flag is ignored.
306     * 
307     * @param event the event
308     * @throws InterruptedException if the execution was interrupted
309     */
310    @Handler
311    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
312    public void onSaveInput(SaveInput event) throws InterruptedException {
313        if (!Arrays.asList(event.options())
314            .contains(StandardOpenOption.WRITE)) {
315            throw new IllegalArgumentException(
316                "File must be opened for writing.");
317        }
318        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
319            if (inputWriters.containsKey(channel)) {
320                channel.respond(new IOError(event,
321                    new IllegalStateException("File is already open.")));
322            } else {
323                new Writer(event, channel);
324            }
325        }
326    }
327
328    /**
329     * Handle input by writing it to the file, if a channel exists.
330     *
331     * @param event the event
332     * @param channel the channel
333     */
334    @Handler
335    public void onInput(Input<ByteBuffer> event, Channel channel) {
336        Writer writer = inputWriters.get(channel);
337        if (writer != null) {
338            writer.write(event.buffer());
339        }
340    }
341
342    /**
343     * Opens a file for writing using the properties of the event. All data from
344     * subsequent {@link Output} events is written to the file. 
345     * The end of record flag is ignored.
346     * 
347     * @param event the event
348     * @throws InterruptedException if the execution was interrupted
349     */
350    @Handler
351    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
352    public void onSaveOutput(SaveOutput event) throws InterruptedException {
353        if (!Arrays.asList(event.options())
354            .contains(StandardOpenOption.WRITE)) {
355            throw new IllegalArgumentException(
356                "File must be opened for writing.");
357        }
358        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
359            if (outputWriters.containsKey(channel)) {
360                channel.respond(new IOError(event,
361                    new IllegalStateException("File is already open.")));
362            } else {
363                new Writer(event, channel);
364            }
365        }
366    }
367
368    /**
369     * Handle {@link Output} events by writing them to the file, if
370     * a channel exists.
371     *
372     * @param event the event
373     * @param channel the channel
374     */
375    @Handler
376    public void onOutput(Output<ByteBuffer> event, Channel channel) {
377        Writer writer = outputWriters.get(channel);
378        if (writer != null) {
379            writer.write(event.buffer());
380        }
381    }
382
383    /**
384     * Handle close by closing the file associated with the channel.
385     *
386     * @param event the event
387     * @param channel the channel
388     * @throws InterruptedException the interrupted exception
389     */
390    @Handler
391    public void onClose(Close event, Channel channel)
392            throws InterruptedException {
393        Writer writer = inputWriters.get(channel);
394        if (writer != null) {
395            writer.close(event);
396        }
397        writer = outputWriters.get(channel);
398        if (writer != null) {
399            writer.close(event);
400        }
401    }
402
403    /**
404     * Handle stop by closing all files.
405     *
406     * @param event the event
407     * @throws InterruptedException the interrupted exception
408     */
409    @Handler(priority = -1000)
410    public void onStop(Stop event) throws InterruptedException {
411        while (!inputWriters.isEmpty()) {
412            Writer handler = inputWriters.entrySet().iterator().next()
413                .getValue();
414            handler.close(event);
415        }
416        while (!outputWriters.isEmpty()) {
417            Writer handler = outputWriters.entrySet().iterator().next()
418                .getValue();
419            handler.close(event);
420        }
421    }
422
423    /**
424     * A writer.
425     */
426    private class Writer {
427
428        private final IOSubchannel channel;
429        private Path path;
430        private AsynchronousFileChannel ioChannel;
431        private long offset;
432        private final CompletionHandler<Integer,
433                WriteContext> writeCompletionHandler
434                    = new WriteCompletionHandler();
435        private int outstandingAsyncs;
436
437        /**
438         * The write context needs to be finer grained than the general file
439         * connection context because an asynchronous write may be only
440         * partially successful, i.e. not all data provided by the write event
441         * may successfully be written in one asynchronous write invocation.
442         */
443        private class WriteContext {
444            public ManagedBuffer<ByteBuffer>.ByteBufferView reader;
445            public long pos;
446
447            /**
448             * Instantiates a new write context.
449             *
450             * @param reader the reader
451             * @param pos the pos
452             */
453            public WriteContext(
454                    ManagedBuffer<ByteBuffer>.ByteBufferView reader, long pos) {
455                this.reader = reader;
456                this.pos = pos;
457            }
458        }
459
460        /**
461         * Instantiates a new writer.
462         *
463         * @param event the event
464         * @param channel the channel
465         * @throws InterruptedException the interrupted exception
466         */
467        public Writer(SaveInput event, IOSubchannel channel)
468                throws InterruptedException {
469            this(event, event.path(), event.options(), channel);
470            inputWriters.put(channel, this);
471            channel.respond(new FileOpened(event));
472        }
473
474        /**
475         * Instantiates a new writer.
476         *
477         * @param event the event
478         * @param channel the channel
479         * @throws InterruptedException the interrupted exception
480         */
481        public Writer(SaveOutput event, IOSubchannel channel)
482                throws InterruptedException {
483            this(event, event.path(), event.options(), channel);
484            outputWriters.put(channel, this);
485            channel.respond(new FileOpened(event));
486        }
487
488        private Writer(Event<?> event, Path path, OpenOption[] options,
489                IOSubchannel channel) throws InterruptedException {
490            this.channel = channel;
491            this.path = path;
492            offset = 0;
493            try {
494                ioChannel = AsynchronousFileChannel.open(path, options);
495            } catch (IOException e) {
496                channel.respond(new IOError(event, e));
497                return;
498            }
499        }
500
501        /**
502         * Write the buffer.
503         *
504         * @param buffer the buffer
505         */
506        public void write(ManagedBuffer<ByteBuffer> buffer) {
507            int written = buffer.remaining();
508            if (written == 0) {
509                return;
510            }
511            buffer.lockBuffer();
512            synchronized (ioChannel) {
513                if (outstandingAsyncs == 0) {
514                    registerAsGenerator();
515                }
516                outstandingAsyncs += 1;
517                ManagedBuffer<ByteBuffer>.ByteBufferView reader
518                    = buffer.newByteBufferView();
519                ioChannel.write(reader.get(), offset,
520                    new WriteContext(reader, offset),
521                    writeCompletionHandler);
522            }
523            offset += written;
524        }
525
526        /**
527         * A write completion handler.
528         */
529        private class WriteCompletionHandler
530                implements CompletionHandler<Integer, WriteContext> {
531
532            @Override
533            public void completed(Integer result, WriteContext context) {
534                ManagedBuffer<ByteBuffer>.ByteBufferView reader
535                    = context.reader;
536                if (reader.get().hasRemaining()) {
537                    ioChannel.write(reader.get(),
538                        context.pos + reader.get().position(),
539                        context, writeCompletionHandler);
540                    return;
541                }
542                reader.managedBuffer().unlockBuffer();
543                handled();
544            }
545
546            @Override
547            public void failed(Throwable exc, WriteContext context) {
548                try {
549                    if (!(exc instanceof AsynchronousCloseException)) {
550                        channel.respond(new IOError(null, exc));
551                    }
552                } finally {
553                    handled();
554                }
555            }
556
557            @SuppressWarnings("PMD.AssignmentInOperand")
558            private void handled() {
559                synchronized (ioChannel) {
560                    if (--outstandingAsyncs == 0) {
561                        unregisterAsGenerator();
562                        ioChannel.notifyAll();
563                    }
564                }
565            }
566        }
567
568        /**
569         * Close.
570         *
571         * @param event the event
572         * @throws InterruptedException the interrupted exception
573         */
574        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
575            "PMD.EmptyCatchBlock" })
576        public void close(Event<?> event)
577                throws InterruptedException {
578            IOException ioExc = null;
579            try {
580                synchronized (ioChannel) {
581                    while (outstandingAsyncs > 0) {
582                        ioChannel.wait();
583                    }
584                    ioChannel.close();
585                }
586            } catch (ClosedChannelException e) {
587                // Can be ignored
588            } catch (IOException e) {
589                ioExc = e;
590            }
591            channel.respond(new Closed<Void>(ioExc));
592            inputWriters.remove(channel);
593            outputWriters.remove(channel);
594        }
595
596        /*
597         * (non-Javadoc)
598         * 
599         * @see java.lang.Object#toString()
600         */
601        @Override
602        public String toString() {
603            StringBuilder builder = new StringBuilder(50);
604            builder.append("FileConnection [");
605            if (channel != null) {
606                builder.append("channel=")
607                    .append(Channel.toString(channel))
608                    .append(", ");
609            }
610            if (path != null) {
611                builder.append("path=")
612                    .append(path)
613                    .append(", ");
614            }
615            builder.append("offset=")
616                .append(offset)
617                .append(']');
618            return builder.toString();
619        }
620
621    }
622
623    /*
624     * (non-Javadoc)
625     * 
626     * @see java.lang.Object#toString()
627     */
628    @Override
629    public String toString() {
630        StringBuilder builder = new StringBuilder();
631        builder.append(Components.objectName(this))
632            .append(" [");
633        if (inputWriters != null) {
634            builder.append(inputWriters.values().stream()
635                .map(chnl -> Components.objectName(chnl))
636                .collect(Collectors.toList()));
637        }
638        builder.append(']');
639        return builder.toString();
640    }
641}