001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.nio.channels.AsynchronousCloseException;
024import java.nio.channels.AsynchronousFileChannel;
025import java.nio.channels.ClosedChannelException;
026import java.nio.channels.CompletionHandler;
027import java.nio.channels.SeekableByteChannel;
028import java.nio.file.Files;
029import java.nio.file.OpenOption;
030import java.nio.file.Path;
031import java.nio.file.StandardOpenOption;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.Map;
035import java.util.WeakHashMap;
036import java.util.stream.Collectors;
037
038import org.jgrapes.core.Channel;
039import org.jgrapes.core.Component;
040import org.jgrapes.core.Components;
041import org.jgrapes.core.Event;
042import org.jgrapes.core.annotation.Handler;
043import org.jgrapes.core.events.Stop;
044import org.jgrapes.io.events.Close;
045import org.jgrapes.io.events.Closed;
046import org.jgrapes.io.events.FileOpened;
047import org.jgrapes.io.events.IOError;
048import org.jgrapes.io.events.Input;
049import org.jgrapes.io.events.Output;
050import org.jgrapes.io.events.SaveInput;
051import org.jgrapes.io.events.SaveOutput;
052import org.jgrapes.io.events.StreamFile;
053import org.jgrapes.io.util.ManagedBuffer;
054import org.jgrapes.io.util.ManagedBufferPool;
055
056/**
057 * A component that reads from or writes to a file.
058 */
059@SuppressWarnings("PMD.ExcessiveImports")
060public class FileStorage extends Component {
061
062    private int bufferSize;
063
064    @SuppressWarnings("PMD.UseConcurrentHashMap")
065    private final Map<Channel, Writer> inputWriters = Collections
066        .synchronizedMap(new WeakHashMap<>());
067    @SuppressWarnings("PMD.UseConcurrentHashMap")
068    private final Map<Channel, Writer> outputWriters = Collections
069        .synchronizedMap(new WeakHashMap<>());
070
071    /**
072     * Create a new instance using the given size for the read buffers.
073     * 
074     * @param channel the component's channel. Used for sending {@link Output}
075     * events and receiving {@link Input} events 
076     * @param bufferSize the size of the buffers used for reading
077     */
078    public FileStorage(Channel channel, int bufferSize) {
079        super(channel);
080        this.bufferSize = bufferSize;
081    }
082
083    /**
084     * Create a new instance using the default buffer size of 8192.
085     * 
086     * @param channel the component's channel. Used for sending {@link Output}
087     * events and receiving {@link Input} events 
088     */
089    public FileStorage(Channel channel) {
090        this(channel, 8192);
091    }
092
093    /**
094     * Opens a file for reading using the properties of the event and streams
095     * its content as a sequence of {@link Output} events with the 
096     * end of record flag set in the last event. All generated events are 
097     * considered responses to this event and therefore fired using the event 
098     * processor from the event's I/O subchannel.
099     * 
100     * @param event the event
101     * @throws InterruptedException if the execution was interrupted
102     */
103    @Handler
104    @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
105        "PMD.AccessorClassGeneration", "PMD.AvoidDuplicateLiterals" })
106    public void onStreamFile(StreamFile event)
107            throws InterruptedException {
108        if (Arrays.asList(event.options())
109            .contains(StandardOpenOption.WRITE)) {
110            throw new IllegalArgumentException(
111                "Cannot stream file opened for writing.");
112        }
113        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
114            if (inputWriters.containsKey(channel)) {
115                channel.respond(new IOError(event,
116                    new IllegalStateException("File is already open.")));
117            } else {
118                new FileStreamer(event, channel);
119            }
120        }
121    }
122
123    /**
124     * A file streamer.
125     */
126    private class FileStreamer {
127
128        private final IOSubchannel channel;
129        private final Path path;
130        private AsynchronousFileChannel ioChannel;
131        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
132                ByteBuffer> ioBuffers;
133        private long offset;
134        private final CompletionHandler<Integer,
135                ManagedBuffer<ByteBuffer>> readCompletionHandler
136                    = new ReadCompletionHandler();
137
138        private FileStreamer(StreamFile event, IOSubchannel channel)
139                throws InterruptedException {
140            this.channel = channel;
141            path = event.path();
142            offset = 0;
143            try {
144                try {
145                    ioChannel = AsynchronousFileChannel
146                        .open(event.path(), event.options());
147                } catch (UnsupportedOperationException e) {
148                    runReaderThread(event);
149                    return;
150                }
151            } catch (IOException e) {
152                channel.respond(new IOError(event, e));
153                return;
154            }
155            registerAsGenerator();
156            channel.respond(new FileOpened(event.path(), event.options()));
157            // Reading from file
158            ioBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
159                () -> {
160                    return ByteBuffer.allocateDirect(bufferSize);
161                }, 2);
162            ManagedBuffer<ByteBuffer> buffer = ioBuffers.acquire();
163            synchronized (ioChannel) {
164                ioChannel.read(buffer.backingBuffer(), offset, buffer,
165                    readCompletionHandler);
166            }
167        }
168
169        /**
170         * The read completion handler.
171         */
172        private class ReadCompletionHandler
173                implements
174                CompletionHandler<Integer, ManagedBuffer<ByteBuffer>> {
175
176            @Override
177            @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
178                "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals" })
179            public void completed(
180                    Integer result, ManagedBuffer<ByteBuffer> buffer) {
181                if (result >= 0) {
182                    offset += result;
183                    boolean eof = true;
184                    try {
185                        eof = offset == ioChannel.size();
186                    } catch (IOException e1) {
187                        // Handled like true
188                    }
189                    channel.respond(Output.fromSink(buffer, eof));
190                    if (!eof) {
191                        try {
192                            ManagedBuffer<ByteBuffer> nextBuffer
193                                = ioBuffers.acquire();
194                            nextBuffer.clear();
195                            synchronized (ioChannel) {
196                                ioChannel.read(nextBuffer.backingBuffer(),
197                                    offset,
198                                    nextBuffer, readCompletionHandler);
199                            }
200                        } catch (InterruptedException e) {
201                            // Results in empty buffer
202                        }
203                        return;
204                    }
205                }
206                IOException ioExc = null;
207                try {
208                    ioChannel.close();
209                } catch (ClosedChannelException e) {
210                    // Can be ignored
211                } catch (IOException e) {
212                    ioExc = e;
213                }
214                channel.respond(new Closed(ioExc));
215                unregisterAsGenerator();
216            }
217
218            @Override
219            public void failed(
220                    Throwable exc, ManagedBuffer<ByteBuffer> context) {
221                channel.respond(new Closed(exc));
222                unregisterAsGenerator();
223            }
224        }
225
226        /**
227         * Stream file that doesn't support asynchronous I/O.
228         * 
229         * @param event
230         * @throws IOException
231         */
232        private void runReaderThread(StreamFile event)
233                throws IOException {
234            ioBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
235                () -> {
236                    return ByteBuffer.allocateDirect(bufferSize);
237                }, 2);
238            final SeekableByteChannel ioChannel
239                = Files.newByteChannel(event.path(), event.options());
240            activeEventPipeline().executorService().submit(new Runnable() {
241                @Override
242                @SuppressWarnings("PMD.EmptyCatchBlock")
243                public void run() {
244                    // Reading from file
245                    IOException ioExc = null;
246                    try {
247                        long size = ioChannel.size();
248                        while (ioChannel.position() < size) {
249                            ManagedBuffer<ByteBuffer> buffer
250                                = ioBuffers.acquire();
251                            buffer.fillFromChannel(ioChannel);
252                            channel.respond(Output.fromSink(buffer,
253                                ioChannel.position() == size));
254                        }
255                        ioChannel.close();
256                    } catch (InterruptedException e) {
257                        return;
258                    } catch (ClosedChannelException e) {
259                        // Can be ignored
260                    } catch (IOException e) {
261                        ioExc = e;
262                    }
263                    channel.respond(new Closed(ioExc));
264                }
265            });
266        }
267
268        /*
269         * (non-Javadoc)
270         * 
271         * @see java.lang.Object#toString()
272         */
273        @Override
274        public String toString() {
275            StringBuilder builder = new StringBuilder(50);
276            builder.append("FileStreamer [");
277            if (channel != null) {
278                builder.append("channel=");
279                builder.append(Channel.toString(channel));
280                builder.append(", ");
281            }
282            if (path != null) {
283                builder.append("path=");
284                builder.append(path);
285                builder.append(", ");
286            }
287            builder.append("offset=")
288                .append(offset)
289                .append(']');
290            return builder.toString();
291        }
292
293    }
294
295    /**
296     * Opens a file for writing using the properties of the event. All data from
297     * subsequent {@link Input} events is written to the file.
298     * The end of record flag is ignored.
299     * 
300     * @param event the event
301     * @throws InterruptedException if the execution was interrupted
302     */
303    @Handler
304    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
305    public void onSaveInput(SaveInput event) throws InterruptedException {
306        if (!Arrays.asList(event.options())
307            .contains(StandardOpenOption.WRITE)) {
308            throw new IllegalArgumentException(
309                "File must be opened for writing.");
310        }
311        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
312            if (inputWriters.containsKey(channel)) {
313                channel.respond(new IOError(event,
314                    new IllegalStateException("File is already open.")));
315            } else {
316                new Writer(event, channel);
317            }
318        }
319    }
320
321    /**
322     * Handle input by writing it to the file, if a channel exists.
323     *
324     * @param event the event
325     * @param channel the channel
326     */
327    @Handler
328    public void onInput(Input<ByteBuffer> event, Channel channel) {
329        Writer writer = inputWriters.get(channel);
330        if (writer != null) {
331            writer.write(event.buffer());
332        }
333    }
334
335    /**
336     * Opens a file for writing using the properties of the event. All data from
337     * subsequent {@link Output} events is written to the file. 
338     * The end of record flag is ignored.
339     * 
340     * @param event the event
341     * @throws InterruptedException if the execution was interrupted
342     */
343    @Handler
344    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
345    public void onSaveOutput(SaveOutput event) throws InterruptedException {
346        if (!Arrays.asList(event.options())
347            .contains(StandardOpenOption.WRITE)) {
348            throw new IllegalArgumentException(
349                "File must be opened for writing.");
350        }
351        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
352            if (outputWriters.containsKey(channel)) {
353                channel.respond(new IOError(event,
354                    new IllegalStateException("File is already open.")));
355            } else {
356                new Writer(event, channel);
357            }
358        }
359    }
360
361    /**
362     * Handle {@link Output} events by writing them to the file, if
363     * a channel exists.
364     *
365     * @param event the event
366     * @param channel the channel
367     */
368    @Handler
369    public void onOutput(Output<ByteBuffer> event, Channel channel) {
370        Writer writer = outputWriters.get(channel);
371        if (writer != null) {
372            writer.write(event.buffer());
373        }
374    }
375
376    /**
377     * Handle close by closing the file associated with the channel.
378     *
379     * @param event the event
380     * @param channel the channel
381     * @throws InterruptedException the interrupted exception
382     */
383    @Handler
384    public void onClose(Close event, Channel channel)
385            throws InterruptedException {
386        Writer writer = inputWriters.get(channel);
387        if (writer != null) {
388            writer.close(event);
389        }
390        writer = outputWriters.get(channel);
391        if (writer != null) {
392            writer.close(event);
393        }
394    }
395
396    /**
397     * Handle stop by closing all files.
398     *
399     * @param event the event
400     * @throws InterruptedException the interrupted exception
401     */
402    @Handler(priority = -1000)
403    public void onStop(Stop event) throws InterruptedException {
404        while (!inputWriters.isEmpty()) {
405            Writer handler = inputWriters.entrySet().iterator().next()
406                .getValue();
407            handler.close(event);
408        }
409        while (!outputWriters.isEmpty()) {
410            Writer handler = outputWriters.entrySet().iterator().next()
411                .getValue();
412            handler.close(event);
413        }
414    }
415
416    /**
417     * A writer.
418     */
419    private class Writer {
420
421        private final IOSubchannel channel;
422        private Path path;
423        private AsynchronousFileChannel ioChannel;
424        private long offset;
425        private final CompletionHandler<Integer,
426                WriteContext> writeCompletionHandler
427                    = new WriteCompletionHandler();
428        private int outstandingAsyncs;
429
430        /**
431         * The write context needs to be finer grained than the general file
432         * connection context because an asynchronous write may be only
433         * partially successful, i.e. not all data provided by the write event
434         * may successfully be written in one asynchronous write invocation.
435         */
436        private class WriteContext {
437            public ManagedBuffer<ByteBuffer>.ByteBufferView reader;
438            public long pos;
439
440            /**
441             * Instantiates a new write context.
442             *
443             * @param reader the reader
444             * @param pos the pos
445             */
446            public WriteContext(
447                    ManagedBuffer<ByteBuffer>.ByteBufferView reader, long pos) {
448                this.reader = reader;
449                this.pos = pos;
450            }
451        }
452
453        /**
454         * Instantiates a new writer.
455         *
456         * @param event the event
457         * @param channel the channel
458         * @throws InterruptedException the interrupted exception
459         */
460        public Writer(SaveInput event, IOSubchannel channel)
461                throws InterruptedException {
462            this(event, event.path(), event.options(), channel);
463            inputWriters.put(channel, this);
464            channel.respond(new FileOpened(path, event.options()));
465        }
466
467        /**
468         * Instantiates a new writer.
469         *
470         * @param event the event
471         * @param channel the channel
472         * @throws InterruptedException the interrupted exception
473         */
474        public Writer(SaveOutput event, IOSubchannel channel)
475                throws InterruptedException {
476            this(event, event.path(), event.options(), channel);
477            outputWriters.put(channel, this);
478            channel.respond(new FileOpened(path, event.options()));
479        }
480
481        private Writer(Event<?> event, Path path, OpenOption[] options,
482                IOSubchannel channel) throws InterruptedException {
483            this.channel = channel;
484            this.path = path;
485            offset = 0;
486            try {
487                ioChannel = AsynchronousFileChannel.open(path, options);
488            } catch (IOException e) {
489                channel.respond(new IOError(event, e));
490                return;
491            }
492        }
493
494        /**
495         * Write the buffer.
496         *
497         * @param buffer the buffer
498         */
499        public void write(ManagedBuffer<ByteBuffer> buffer) {
500            int written = buffer.remaining();
501            if (written == 0) {
502                return;
503            }
504            buffer.lockBuffer();
505            synchronized (ioChannel) {
506                if (outstandingAsyncs == 0) {
507                    registerAsGenerator();
508                }
509                outstandingAsyncs += 1;
510                ManagedBuffer<ByteBuffer>.ByteBufferView reader
511                    = buffer.newByteBufferView();
512                ioChannel.write(reader.get(), offset,
513                    new WriteContext(reader, offset),
514                    writeCompletionHandler);
515            }
516            offset += written;
517        }
518
519        /**
520         * A write completion handler.
521         */
522        private class WriteCompletionHandler
523                implements CompletionHandler<Integer, WriteContext> {
524
525            @Override
526            public void completed(Integer result, WriteContext context) {
527                ManagedBuffer<ByteBuffer>.ByteBufferView reader
528                    = context.reader;
529                if (reader.get().hasRemaining()) {
530                    ioChannel.write(reader.get(),
531                        context.pos + reader.get().position(),
532                        context, writeCompletionHandler);
533                    return;
534                }
535                reader.managedBuffer().unlockBuffer();
536                handled();
537            }
538
539            @Override
540            public void failed(Throwable exc, WriteContext context) {
541                try {
542                    if (!(exc instanceof AsynchronousCloseException)) {
543                        channel.respond(new IOError(null, exc));
544                    }
545                } finally {
546                    handled();
547                }
548            }
549
550            @SuppressWarnings("PMD.AssignmentInOperand")
551            private void handled() {
552                synchronized (ioChannel) {
553                    if (--outstandingAsyncs == 0) {
554                        unregisterAsGenerator();
555                        ioChannel.notifyAll();
556                    }
557                }
558            }
559        }
560
561        /**
562         * Close.
563         *
564         * @param event the event
565         * @throws InterruptedException the interrupted exception
566         */
567        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
568            "PMD.EmptyCatchBlock" })
569        public void close(Event<?> event)
570                throws InterruptedException {
571            IOException ioExc = null;
572            try {
573                synchronized (ioChannel) {
574                    while (outstandingAsyncs > 0) {
575                        ioChannel.wait();
576                    }
577                    ioChannel.close();
578                }
579            } catch (ClosedChannelException e) {
580                // Can be ignored
581            } catch (IOException e) {
582                ioExc = e;
583            }
584            channel.respond(new Closed(ioExc));
585            inputWriters.remove(channel);
586            outputWriters.remove(channel);
587        }
588
589        /*
590         * (non-Javadoc)
591         * 
592         * @see java.lang.Object#toString()
593         */
594        @Override
595        public String toString() {
596            StringBuilder builder = new StringBuilder(50);
597            builder.append("FileConnection [");
598            if (channel != null) {
599                builder.append("channel=")
600                    .append(Channel.toString(channel))
601                    .append(", ");
602            }
603            if (path != null) {
604                builder.append("path=")
605                    .append(path)
606                    .append(", ");
607            }
608            builder.append("offset=")
609                .append(offset)
610                .append(']');
611            return builder.toString();
612        }
613
614    }
615
616    /*
617     * (non-Javadoc)
618     * 
619     * @see java.lang.Object#toString()
620     */
621    @Override
622    public String toString() {
623        StringBuilder builder = new StringBuilder();
624        builder.append(Components.objectName(this))
625            .append(" [");
626        if (inputWriters != null) {
627            builder.append(inputWriters.values().stream()
628                .map(chnl -> Components.objectName(chnl))
629                .collect(Collectors.toList()));
630        }
631        builder.append(']');
632        return builder.toString();
633    }
634}