001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io; 020 021import java.nio.ByteBuffer; 022import java.nio.CharBuffer; 023import org.jgrapes.core.Channel; 024import org.jgrapes.core.Component; 025import org.jgrapes.core.Components; 026import org.jgrapes.core.Event; 027import org.jgrapes.core.EventPipeline; 028import org.jgrapes.core.Subchannel; 029import org.jgrapes.io.util.ManagedBuffer; 030import org.jgrapes.io.util.ManagedBufferPool; 031 032/** 033 * Represents a subchannel for grouping input and output events related 034 * to an I/O resource such as an opened file or a network connection. 035 * 036 * An I/O subchannel has an initiator that creates and manages the subchannel. 037 * Events fired by the initiator are said to flow downstream on the channel. 038 * Events fired by components in response are said to flow upstream. 039 * 040 * Upstream and downstream events are usually handled by two different pipelines 041 * managed by the initiator. One pipeline, accessible only to the initiator, 042 * handles the downstream events. The other, made available as a property of the 043 * I/O subchannel (see {@link #responsePipeline()} and {@link #respond(Event)}), 044 * handles the upstream events. Of course, any pipeline can be 045 * used to send events upstream to the initiator component. However, using 046 * arbitrary pipelines holds the risk that events aren't delivered in the 047 * intended order. 048 * 049 * An I/O subchannel also provides associated buffer pools for byte buffers 050 * and character buffers. Buffers used in responses (upstream events) 051 * should be acquired from these pools only. The initiator should initialize 052 * the pools in such a way that it suits its needs. 053 */ 054public interface IOSubchannel extends Subchannel { 055 056 /** 057 * Gets the {@link EventPipeline} that can be used for events going back to 058 * the initiator of this connection. Consistently using this event pipeline 059 * for response events ensures that the events are written in proper 060 * sequence. 061 * 062 * @return the event pipeline 063 */ 064 EventPipeline responsePipeline(); 065 066 /** 067 * Get the subchannel's byte buffer pool. 068 * 069 * @return the buffer pool 070 */ 071 ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> 072 byteBufferPool(); 073 074 /** 075 * Get the subchannel's char buffer pool. 076 * 077 * @return the buffer pool 078 */ 079 ManagedBufferPool<ManagedBuffer<CharBuffer>, CharBuffer> 080 charBufferPool(); 081 082 /** 083 * Fires the given event on this subchannel using the subchannel's response 084 * pipeline. Effectively, {@code fire(someEvent)} is a shortcut for 085 * {@code getResponsePipeline.add(someEvent, this)}. 086 * 087 * @param <T> the event's type 088 * @param event 089 * the event to fire 090 * @return the event (for easy chaining) 091 */ 092 default <T extends Event<?>> T respond(T event) { 093 return responsePipeline().fire(event, this); 094 } 095 096 /** 097 * Creates a new subchannel of the given component's channel with the 098 * given event pipeline and a buffer pool with two buffers sized 4096. 099 * 100 * @param component the component used to get the main channel 101 * @param responsePipeline the response pipeline 102 * @return the subchannel 103 */ 104 static IOSubchannel create( 105 Component component, EventPipeline responsePipeline) { 106 return new DefaultIOSubchannel(component.channel(), responsePipeline); 107 } 108 109 /** 110 * A simple implementation of {@link IOSubchannel}. 111 */ 112 @SuppressWarnings("PMD.DataClass") 113 class DefaultIOSubchannel extends Subchannel.DefaultSubchannel 114 implements IOSubchannel { 115 private final EventPipeline responsePipeline; 116 private ManagedBufferPool<ManagedBuffer<ByteBuffer>, 117 ByteBuffer> byteBufferPool; 118 private ManagedBufferPool<ManagedBuffer<CharBuffer>, 119 CharBuffer> charBufferPool; 120 121 /** 122 * Creates a new instance with the given main channel and response 123 * pipeline. 124 * 125 * @param mainChannel the main channel 126 * @param responsePipeline the response pipeline to use 127 * 128 */ 129 public DefaultIOSubchannel( 130 Channel mainChannel, EventPipeline responsePipeline) { 131 super(mainChannel); 132 this.responsePipeline = responsePipeline; 133 } 134 135 protected void setByteBufferPool( 136 ManagedBufferPool<ManagedBuffer<ByteBuffer>, 137 ByteBuffer> bufferPool) { 138 this.byteBufferPool = bufferPool; 139 } 140 141 protected void setCharBufferPool( 142 ManagedBufferPool<ManagedBuffer<CharBuffer>, 143 CharBuffer> bufferPool) { 144 this.charBufferPool = bufferPool; 145 } 146 147 /* 148 * (non-Javadoc) 149 * 150 * @see org.jgrapes.io.IOSubchannel#responsePipeline() 151 */ 152 @Override 153 public EventPipeline responsePipeline() { 154 return responsePipeline; 155 } 156 157 /** 158 * Returns the buffer pool set. If no buffer pool has been set, a 159 * buffer pool with with two buffers of size 4096 is created. 160 */ 161 @Override 162 public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> 163 byteBufferPool() { 164 if (byteBufferPool == null) { 165 byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new, 166 () -> { 167 return ByteBuffer.allocate(4096); 168 }, 2) 169 .setName(Components.objectName(this) 170 + ".upstream.byteBuffers"); 171 } 172 return byteBufferPool; 173 } 174 175 /** 176 * Returns the buffer pool set. If no buffer pool has been set, a 177 * buffer pool with with two buffers of size 4096 is created. 178 */ 179 @Override 180 public ManagedBufferPool<ManagedBuffer<CharBuffer>, CharBuffer> 181 charBufferPool() { 182 if (charBufferPool == null) { 183 charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new, 184 () -> { 185 return CharBuffer.allocate(4096); 186 }, 2) 187 .setName(Components.objectName(this) 188 + ".upstream.charBuffers"); 189 } 190 return charBufferPool; 191 } 192 193 } 194}