001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.nio.channels.AsynchronousCloseException;
024import java.nio.channels.AsynchronousFileChannel;
025import java.nio.channels.ClosedChannelException;
026import java.nio.channels.CompletionHandler;
027import java.nio.channels.SeekableByteChannel;
028import java.nio.file.Files;
029import java.nio.file.OpenOption;
030import java.nio.file.Path;
031import java.nio.file.StandardOpenOption;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.Map;
035import java.util.WeakHashMap;
036import java.util.stream.Collectors;
037import org.jgrapes.core.Channel;
038import org.jgrapes.core.Component;
039import org.jgrapes.core.Components;
040import org.jgrapes.core.Event;
041import org.jgrapes.core.annotation.Handler;
042import org.jgrapes.core.events.Stop;
043import org.jgrapes.io.events.Close;
044import org.jgrapes.io.events.Closed;
045import org.jgrapes.io.events.FileOpened;
046import org.jgrapes.io.events.IOError;
047import org.jgrapes.io.events.Input;
048import org.jgrapes.io.events.OpenFile;
049import org.jgrapes.io.events.Opening;
050import org.jgrapes.io.events.Output;
051import org.jgrapes.io.events.SaveInput;
052import org.jgrapes.io.events.SaveOutput;
053import org.jgrapes.io.events.StreamFile;
054import org.jgrapes.io.util.ManagedBuffer;
055import org.jgrapes.io.util.ManagedBufferPool;
056
057/**
058 * A component that reads from or writes to a file.
059 */
060@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.CouplingBetweenObjects" })
061public class FileStorage extends Component {
062
063    private int bufferSize;
064
065    @SuppressWarnings("PMD.UseConcurrentHashMap")
066    private final Map<Channel, Writer> inputWriters = Collections
067        .synchronizedMap(new WeakHashMap<>());
068    @SuppressWarnings("PMD.UseConcurrentHashMap")
069    private final Map<Channel, Writer> outputWriters = Collections
070        .synchronizedMap(new WeakHashMap<>());
071
072    /**
073     * Create a new instance using the given size for the read buffers.
074     * 
075     * @param channel the component's channel. Used for sending {@link Output}
076     * events and receiving {@link Input} events 
077     * @param bufferSize the size of the buffers used for reading
078     */
079    public FileStorage(Channel channel, int bufferSize) {
080        super(channel);
081        this.bufferSize = bufferSize;
082    }
083
084    /**
085     * Create a new instance using the default buffer size of 8192.
086     * 
087     * @param channel the component's channel. Used for sending {@link Output}
088     * events and receiving {@link Input} events 
089     */
090    public FileStorage(Channel channel) {
091        this(channel, 8192);
092    }
093
094    /**
095     * Opens a file for reading using the properties of the event and streams
096     * its content as a sequence of {@link Output} events with the 
097     * end of record flag set in the last event. All generated events are 
098     * considered responses to this event and therefore fired using the event 
099     * processor from the event's I/O subchannel.
100     * 
101     * @param event the event
102     * @throws InterruptedException if the execution was interrupted
103     */
104    @Handler
105    @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
106        "PMD.AccessorClassGeneration", "PMD.AvoidDuplicateLiterals" })
107    public void onStreamFile(StreamFile event)
108            throws InterruptedException {
109        if (Arrays.asList(event.options())
110            .contains(StandardOpenOption.WRITE)) {
111            throw new IllegalArgumentException(
112                "Cannot stream file opened for writing.");
113        }
114        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
115            if (inputWriters.containsKey(channel)) {
116                channel.respond(new IOError(event,
117                    new IllegalStateException("File is already open.")));
118            } else {
119                new FileStreamer(event, channel);
120            }
121        }
122    }
123
124    /**
125     * A file streamer.
126     */
127    private final class FileStreamer {
128
129        private final IOSubchannel channel;
130        private final Path path;
131        @SuppressWarnings("PMD.ImmutableField")
132        private AsynchronousFileChannel ioChannel;
133        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
134                ByteBuffer> ioBuffers;
135        private long offset;
136        private final CompletionHandler<Integer,
137                ManagedBuffer<ByteBuffer>> readCompletionHandler
138                    = new ReadCompletionHandler();
139
140        private FileStreamer(StreamFile event, IOSubchannel channel)
141                throws InterruptedException {
142            this.channel = channel;
143            path = event.path();
144            offset = 0;
145            try {
146                try {
147                    ioChannel = AsynchronousFileChannel
148                        .open(event.path(), event.options());
149                } catch (UnsupportedOperationException e) {
150                    runReaderThread(event);
151                    return;
152                }
153            } catch (IOException e) {
154                channel.respond(new IOError(event, e));
155                return;
156            }
157            registerAsGenerator();
158            // Reading from file
159            ioBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
160                () -> {
161                    return ByteBuffer.allocateDirect(bufferSize);
162                }, 2);
163            ManagedBuffer<ByteBuffer> buffer = ioBuffers.acquire();
164            // (1) Opening, (2) FileOpened, (3) Output events
165            channel.respond(Event
166                .onCompletion(new Opening<OpenFile>().setResult(event), e -> {
167                    channel.respond(new FileOpened(event));
168                    // Start reading.
169                    synchronized (ioChannel) {
170                        ioChannel.read(buffer.backingBuffer(), offset, buffer,
171                            readCompletionHandler);
172                    }
173                }));
174        }
175
176        /**
177         * The read completion handler.
178         */
179        private final class ReadCompletionHandler implements
180                CompletionHandler<Integer, ManagedBuffer<ByteBuffer>> {
181            @Override
182            @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
183                "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals" })
184            public void completed(
185                    Integer result, ManagedBuffer<ByteBuffer> buffer) {
186                if (result >= 0) {
187                    offset += result;
188                    boolean eof = true;
189                    try {
190                        eof = offset == ioChannel.size();
191                    } catch (IOException e1) {
192                        // Handled like true
193                    }
194                    channel.respond(Output.fromSink(buffer, eof));
195                    if (!eof) {
196                        try {
197                            ManagedBuffer<ByteBuffer> nextBuffer
198                                = ioBuffers.acquire();
199                            nextBuffer.clear();
200                            synchronized (ioChannel) {
201                                ioChannel.read(nextBuffer.backingBuffer(),
202                                    offset,
203                                    nextBuffer, readCompletionHandler);
204                            }
205                        } catch (InterruptedException e) {
206                            // Results in empty buffer
207                        }
208                        return;
209                    }
210                }
211                IOException ioExc = null;
212                try {
213                    ioChannel.close();
214                } catch (ClosedChannelException e) {
215                    // Can be ignored
216                } catch (IOException e) {
217                    ioExc = e;
218                }
219                channel.respond(new Closed<Void>(ioExc));
220                unregisterAsGenerator();
221            }
222
223            @Override
224            public void failed(
225                    Throwable exc, ManagedBuffer<ByteBuffer> context) {
226                channel.respond(new Closed<Void>(exc));
227                unregisterAsGenerator();
228            }
229        }
230
231        /**
232         * Stream file that doesn't support asynchronous I/O.
233         * 
234         * @param event
235         * @throws IOException
236         */
237        private void runReaderThread(StreamFile event)
238                throws IOException {
239            ioBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
240                () -> {
241                    return ByteBuffer.allocateDirect(bufferSize);
242                }, 2);
243            @SuppressWarnings("PMD.CloseResource")
244            final SeekableByteChannel ioChannel
245                = Files.newByteChannel(event.path(), event.options());
246            activeEventPipeline().executorService().submit(new Runnable() {
247                @Override
248                @SuppressWarnings("PMD.EmptyCatchBlock")
249                public void run() {
250                    // Reading from file
251                    IOException ioExc = null;
252                    try {
253                        long size = ioChannel.size();
254                        while (ioChannel.position() < size) {
255                            ManagedBuffer<ByteBuffer> buffer
256                                = ioBuffers.acquire();
257                            buffer.fillFromChannel(ioChannel);
258                            channel.respond(Output.fromSink(buffer,
259                                ioChannel.position() == size));
260                        }
261                        ioChannel.close();
262                    } catch (InterruptedException e) {
263                        return;
264                    } catch (ClosedChannelException e) {
265                        // Can be ignored
266                    } catch (IOException e) {
267                        ioExc = e;
268                    }
269                    channel.respond(new Closed<Void>(ioExc));
270                }
271            });
272        }
273
274        /*
275         * (non-Javadoc)
276         * 
277         * @see java.lang.Object#toString()
278         */
279        @Override
280        public String toString() {
281            StringBuilder builder = new StringBuilder(50);
282            builder.append("FileStreamer [");
283            if (channel != null) {
284                builder.append("channel=");
285                builder.append(Channel.toString(channel));
286                builder.append(", ");
287            }
288            if (path != null) {
289                builder.append("path=").append(path).append(", ");
290            }
291            builder.append("offset=")
292                .append(offset)
293                .append(']');
294            return builder.toString();
295        }
296
297    }
298
299    /**
300     * Opens a file for writing using the properties of the event. All data from
301     * subsequent {@link Input} events is written to the file.
302     * The end of record flag is ignored.
303     * 
304     * @param event the event
305     * @throws InterruptedException if the execution was interrupted
306     */
307    @Handler
308    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
309    public void onSaveInput(SaveInput event) throws InterruptedException {
310        if (!Arrays.asList(event.options())
311            .contains(StandardOpenOption.WRITE)) {
312            throw new IllegalArgumentException(
313                "File must be opened for writing.");
314        }
315        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
316            if (inputWriters.containsKey(channel)) {
317                channel.respond(new IOError(event,
318                    new IllegalStateException("File is already open.")));
319            } else {
320                new Writer(event, channel);
321            }
322        }
323    }
324
325    /**
326     * Handle input by writing it to the file, if a channel exists.
327     *
328     * @param event the event
329     * @param channel the channel
330     */
331    @Handler
332    public void onInput(Input<ByteBuffer> event, Channel channel) {
333        Writer writer = inputWriters.get(channel);
334        if (writer != null) {
335            writer.write(event.buffer());
336        }
337    }
338
339    /**
340     * Opens a file for writing using the properties of the event. All data from
341     * subsequent {@link Output} events is written to the file. 
342     * The end of record flag is ignored.
343     * 
344     * @param event the event
345     * @throws InterruptedException if the execution was interrupted
346     */
347    @Handler
348    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
349    public void onSaveOutput(SaveOutput event) throws InterruptedException {
350        if (!Arrays.asList(event.options())
351            .contains(StandardOpenOption.WRITE)) {
352            throw new IllegalArgumentException(
353                "File must be opened for writing.");
354        }
355        for (IOSubchannel channel : event.channels(IOSubchannel.class)) {
356            if (outputWriters.containsKey(channel)) {
357                channel.respond(new IOError(event,
358                    new IllegalStateException("File is already open.")));
359            } else {
360                new Writer(event, channel);
361            }
362        }
363    }
364
365    /**
366     * Handle {@link Output} events by writing them to the file, if
367     * a channel exists.
368     *
369     * @param event the event
370     * @param channel the channel
371     */
372    @Handler
373    public void onOutput(Output<ByteBuffer> event, Channel channel) {
374        Writer writer = outputWriters.get(channel);
375        if (writer != null) {
376            writer.write(event.buffer());
377        }
378    }
379
380    /**
381     * Handle close by closing the file associated with the channel.
382     *
383     * @param event the event
384     * @param channel the channel
385     * @throws InterruptedException the interrupted exception
386     */
387    @Handler
388    public void onClose(Close event, Channel channel)
389            throws InterruptedException {
390        Writer writer = inputWriters.get(channel);
391        if (writer != null) {
392            writer.close(event);
393        }
394        writer = outputWriters.get(channel);
395        if (writer != null) {
396            writer.close(event);
397        }
398    }
399
400    /**
401     * Handle stop by closing all files.
402     *
403     * @param event the event
404     * @throws InterruptedException the interrupted exception
405     */
406    @Handler(priority = -1000)
407    public void onStop(Stop event) throws InterruptedException {
408        while (!inputWriters.isEmpty()) {
409            Writer handler = inputWriters.entrySet().iterator().next()
410                .getValue();
411            handler.close(event);
412        }
413        while (!outputWriters.isEmpty()) {
414            Writer handler = outputWriters.entrySet().iterator().next()
415                .getValue();
416            handler.close(event);
417        }
418    }
419
420    /**
421     * A writer.
422     */
423    private class Writer {
424
425        private final IOSubchannel channel;
426        private Path path;
427        private AsynchronousFileChannel ioChannel;
428        private long offset;
429        private final CompletionHandler<Integer,
430                WriteContext> writeCompletionHandler
431                    = new WriteCompletionHandler();
432        private int outstandingAsyncs;
433
434        /**
435         * The write context needs to be finer grained than the general file
436         * connection context because an asynchronous write may be only
437         * partially successful, i.e. not all data provided by the write event
438         * may successfully be written in one asynchronous write invocation.
439         */
440        private class WriteContext {
441            public final ManagedBuffer<ByteBuffer>.ByteBufferView reader;
442            public final long pos;
443
444            /**
445             * Instantiates a new write context.
446             *
447             * @param reader the reader
448             * @param pos the pos
449             */
450            public WriteContext(
451                    ManagedBuffer<ByteBuffer>.ByteBufferView reader, long pos) {
452                this.reader = reader;
453                this.pos = pos;
454            }
455        }
456
457        /**
458         * Instantiates a new writer.
459         *
460         * @param event the event
461         * @param channel the channel
462         * @throws InterruptedException the interrupted exception
463         */
464        public Writer(SaveInput event, IOSubchannel channel)
465                throws InterruptedException {
466            this(event, event.path(), event.options(), channel);
467            inputWriters.put(channel, this);
468            channel.respond(new FileOpened(event));
469        }
470
471        /**
472         * Instantiates a new writer.
473         *
474         * @param event the event
475         * @param channel the channel
476         * @throws InterruptedException the interrupted exception
477         */
478        public Writer(SaveOutput event, IOSubchannel channel)
479                throws InterruptedException {
480            this(event, event.path(), event.options(), channel);
481            outputWriters.put(channel, this);
482            channel.respond(new FileOpened(event));
483        }
484
485        private Writer(Event<?> event, Path path, OpenOption[] options,
486                IOSubchannel channel) throws InterruptedException {
487            this.channel = channel;
488            this.path = path;
489            offset = 0;
490            try {
491                ioChannel = AsynchronousFileChannel.open(path, options);
492            } catch (IOException e) {
493                channel.respond(new IOError(event, e));
494            }
495        }
496
497        /**
498         * Write the buffer.
499         *
500         * @param buffer the buffer
501         */
502        public void write(ManagedBuffer<ByteBuffer> buffer) {
503            int written = buffer.remaining();
504            if (written == 0) {
505                return;
506            }
507            buffer.lockBuffer();
508            synchronized (ioChannel) {
509                if (outstandingAsyncs == 0) {
510                    registerAsGenerator();
511                }
512                outstandingAsyncs += 1;
513                ManagedBuffer<ByteBuffer>.ByteBufferView reader
514                    = buffer.newByteBufferView();
515                ioChannel.write(reader.get(), offset,
516                    new WriteContext(reader, offset),
517                    writeCompletionHandler);
518            }
519            offset += written;
520        }
521
522        /**
523         * A write completion handler.
524         */
525        private final class WriteCompletionHandler
526                implements CompletionHandler<Integer, WriteContext> {
527
528            @Override
529            public void completed(Integer result, WriteContext context) {
530                ManagedBuffer<ByteBuffer>.ByteBufferView reader
531                    = context.reader;
532                if (reader.get().hasRemaining()) {
533                    ioChannel.write(reader.get(),
534                        context.pos + reader.get().position(),
535                        context, writeCompletionHandler);
536                    return;
537                }
538                reader.managedBuffer().unlockBuffer();
539                handled();
540            }
541
542            @Override
543            public void failed(Throwable exc, WriteContext context) {
544                try {
545                    if (!(exc instanceof AsynchronousCloseException)) {
546                        channel.respond(new IOError(null, exc));
547                    }
548                } finally {
549                    handled();
550                }
551            }
552
553            @SuppressWarnings("PMD.AssignmentInOperand")
554            private void handled() {
555                synchronized (ioChannel) {
556                    if (--outstandingAsyncs == 0) {
557                        unregisterAsGenerator();
558                        ioChannel.notifyAll();
559                    }
560                }
561            }
562        }
563
564        /**
565         * Close.
566         *
567         * @param event the event
568         * @throws InterruptedException the interrupted exception
569         */
570        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
571            "PMD.EmptyCatchBlock" })
572        public void close(Event<?> event)
573                throws InterruptedException {
574            IOException ioExc = null;
575            try {
576                synchronized (ioChannel) {
577                    while (outstandingAsyncs > 0) {
578                        ioChannel.wait();
579                    }
580                    ioChannel.close();
581                }
582            } catch (ClosedChannelException e) {
583                // Can be ignored
584            } catch (IOException e) {
585                ioExc = e;
586            }
587            channel.respond(new Closed<Void>(ioExc));
588            inputWriters.remove(channel);
589            outputWriters.remove(channel);
590        }
591
592        /*
593         * (non-Javadoc)
594         * 
595         * @see java.lang.Object#toString()
596         */
597        @Override
598        public String toString() {
599            StringBuilder builder = new StringBuilder(50);
600            builder.append("FileConnection [");
601            if (channel != null) {
602                builder.append("channel=")
603                    .append(Channel.toString(channel))
604                    .append(", ");
605            }
606            if (path != null) {
607                builder.append("path=")
608                    .append(path)
609                    .append(", ");
610            }
611            builder.append("offset=")
612                .append(offset)
613                .append(']');
614            return builder.toString();
615        }
616
617    }
618
619    /*
620     * (non-Javadoc)
621     * 
622     * @see java.lang.Object#toString()
623     */
624    @Override
625    public String toString() {
626        StringBuilder builder = new StringBuilder();
627        builder.append(Components.objectName(this))
628            .append(" [");
629        if (inputWriters != null) {
630            builder.append(inputWriters.values().stream()
631                .map(chnl -> Components.objectName(chnl))
632                .collect(Collectors.toList()));
633        }
634        builder.append(']');
635        return builder.toString();
636    }
637}