001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io;
020
021import java.nio.ByteBuffer;
022import java.nio.CharBuffer;
023import org.jgrapes.core.Channel;
024import org.jgrapes.core.Component;
025import org.jgrapes.core.Components;
026import org.jgrapes.core.Event;
027import org.jgrapes.core.EventPipeline;
028import org.jgrapes.core.Subchannel;
029import org.jgrapes.io.util.ManagedBuffer;
030import org.jgrapes.io.util.ManagedBufferPool;
031
032/**
033 * Represents a subchannel for grouping input and output events related
034 * to an I/O resource such as an opened file or a network connection.
035 * 
036 * An I/O subchannel has an initiator that creates and manages the subchannel.
037 * Events fired by the initiator are said to flow downstream on the channel.
038 * Events fired by components in response are said to flow upstream.
039 * 
040 * Upstream and downstream events are usually handled by two different pipelines
041 * managed by the initiator. One pipeline, accessible only to the initiator,
042 * handles the downstream events. The other, made available as a property of the
043 * I/O subchannel (see {@link #responsePipeline()} and {@link #respond(Event)}), 
044 * handles the upstream events. Of course, any pipeline can be
045 * used to send events upstream to the initiator component. However, using
046 * arbitrary pipelines holds the risk that events aren't delivered in the
047 * intended order.
048 * 
049 * An I/O subchannel also provides associated buffer pools for byte buffers
050 * and character buffers. Buffers used in responses (upstream events)
051 * should be acquired from these pools only. The initiator should initialize 
052 * the pools in such a way that it suits its needs.
053 */
054public interface IOSubchannel extends Subchannel {
055
056    /**
057     * Gets the {@link EventPipeline} that can be used for events going back to
058     * the initiator of this connection. Consistently using this event pipeline
059     * for response events ensures that the events are written in proper
060     * sequence.
061     * 
062     * @return the event pipeline
063     */
064    EventPipeline responsePipeline();
065
066    /**
067     * Get the subchannel's byte buffer pool.
068     * 
069     * @return the buffer pool
070     */
071    ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
072            byteBufferPool();
073
074    /**
075     * Get the subchannel's char buffer pool.
076     * 
077     * @return the buffer pool
078     */
079    ManagedBufferPool<ManagedBuffer<CharBuffer>, CharBuffer>
080            charBufferPool();
081
082    /**
083     * Fires the given event on this subchannel using the subchannel's response
084     * pipeline. Effectively, {@code fire(someEvent)} is a shortcut for
085     * {@code getResponsePipeline.add(someEvent, this)}.
086     * 
087     * @param <T> the event's type
088     * @param event
089     *            the event to fire
090     * @return the event (for easy chaining)
091     */
092    default <T extends Event<?>> T respond(T event) {
093        return responsePipeline().fire(event, this);
094    }
095
096    /**
097     * Creates a new subchannel of the given component's channel with the
098     * given event pipeline and a buffer pool with two buffers sized 4096.
099     *
100     * @param component the component used to get the main channel
101     * @param responsePipeline the response pipeline
102     * @return the subchannel
103     */
104    static IOSubchannel create(
105            Component component, EventPipeline responsePipeline) {
106        return new DefaultIOSubchannel(component.channel(), responsePipeline);
107    }
108
109    /**
110     * A simple implementation of {@link IOSubchannel}.
111     */
112    @SuppressWarnings("PMD.DataClass")
113    class DefaultIOSubchannel extends Subchannel.DefaultSubchannel
114            implements IOSubchannel {
115        private final EventPipeline responsePipeline;
116        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
117                ByteBuffer> byteBufferPool;
118        private ManagedBufferPool<ManagedBuffer<CharBuffer>,
119                CharBuffer> charBufferPool;
120
121        /**
122         * Creates a new instance with the given main channel and response
123         * pipeline.  
124         * 
125         * @param mainChannel the main channel
126         * @param responsePipeline the response pipeline to use
127         * 
128         */
129        public DefaultIOSubchannel(
130                Channel mainChannel, EventPipeline responsePipeline) {
131            super(mainChannel);
132            this.responsePipeline = responsePipeline;
133        }
134
135        protected void setByteBufferPool(
136                ManagedBufferPool<ManagedBuffer<ByteBuffer>,
137                        ByteBuffer> bufferPool) {
138            this.byteBufferPool = bufferPool;
139        }
140
141        protected void setCharBufferPool(
142                ManagedBufferPool<ManagedBuffer<CharBuffer>,
143                        CharBuffer> bufferPool) {
144            this.charBufferPool = bufferPool;
145        }
146
147        /*
148         * (non-Javadoc)
149         * 
150         * @see org.jgrapes.io.IOSubchannel#responsePipeline()
151         */
152        @Override
153        public EventPipeline responsePipeline() {
154            return responsePipeline;
155        }
156
157        /**
158         * Returns the buffer pool set. If no buffer pool has been set, a
159         * buffer pool with with two buffers of size 4096 is created.
160         */
161        @Override
162        public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
163                byteBufferPool() {
164            if (byteBufferPool == null) {
165                byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
166                    () -> {
167                        return ByteBuffer.allocate(4096);
168                    }, 2)
169                        .setName(Components.objectName(this)
170                            + ".upstream.byteBuffers");
171            }
172            return byteBufferPool;
173        }
174
175        /**
176         * Returns the buffer pool set. If no buffer pool has been set, a
177         * buffer pool with with two buffers of size 4096 is created.
178         */
179        @Override
180        public ManagedBufferPool<ManagedBuffer<CharBuffer>, CharBuffer>
181                charBufferPool() {
182            if (charBufferPool == null) {
183                charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
184                    () -> {
185                        return CharBuffer.allocate(4096);
186                    }, 2)
187                        .setName(Components.objectName(this)
188                            + ".upstream.charBuffers");
189            }
190            return charBufferPool;
191        }
192
193    }
194}