001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2024 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.http; 020 021import java.io.IOException; 022import java.net.InetSocketAddress; 023import java.net.SocketAddress; 024import java.nio.Buffer; 025import java.nio.ByteBuffer; 026import java.nio.CharBuffer; 027import java.util.Optional; 028import java.util.concurrent.Callable; 029import org.jdrupes.httpcodec.ClientEngine; 030import org.jdrupes.httpcodec.Codec; 031import org.jdrupes.httpcodec.Decoder; 032import org.jdrupes.httpcodec.MessageHeader; 033import org.jdrupes.httpcodec.ProtocolException; 034import org.jdrupes.httpcodec.protocols.http.HttpField; 035import org.jdrupes.httpcodec.protocols.http.HttpResponse; 036import org.jdrupes.httpcodec.protocols.http.client.HttpRequestEncoder; 037import org.jdrupes.httpcodec.protocols.http.client.HttpResponseDecoder; 038import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame; 039import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader; 040import org.jdrupes.httpcodec.types.Converters; 041import org.jgrapes.core.Channel; 042import org.jgrapes.core.ClassChannel; 043import org.jgrapes.core.Component; 044import org.jgrapes.core.Components; 045import org.jgrapes.core.Components.PoolingIndex; 046import org.jgrapes.core.EventPipeline; 047import org.jgrapes.core.annotation.Handler; 048import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements; 049import org.jgrapes.http.events.HostUnresolved; 050import org.jgrapes.http.events.HttpConnected; 051import org.jgrapes.http.events.Request; 052import org.jgrapes.http.events.Response; 053import org.jgrapes.http.events.WebSocketClose; 054import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel; 055import org.jgrapes.io.events.Close; 056import org.jgrapes.io.events.Closed; 057import org.jgrapes.io.events.IOError; 058import org.jgrapes.io.events.Input; 059import org.jgrapes.io.events.OpenSocketConnection; 060import org.jgrapes.io.events.Output; 061import org.jgrapes.io.util.ManagedBuffer; 062import org.jgrapes.io.util.ManagedBufferPool; 063import org.jgrapes.net.SocketIOChannel; 064import org.jgrapes.net.events.ClientConnected; 065 066/** 067 * A converter component that receives and sends web application 068 * layer messages and byte buffers on associated network channels. 069 */ 070@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.CouplingBetweenObjects" }) 071public class HttpConnector extends Component { 072 073 private int applicationBufferSize = -1; 074 private final Channel netMainChannel; 075 private final Channel netSecureChannel; 076 private final PoolingIndex<SocketAddress, SocketIOChannel> pooled 077 = new PoolingIndex<>(); 078 079 /** 080 * Denotes the network channel in handler annotations. 081 */ 082 private static final class NetworkChannel extends ClassChannel { 083 } 084 085 /** 086 * Create a new connector that uses the {@code networkChannel} for network 087 * level I/O. 088 * 089 * @param appChannel 090 * this component's channel 091 * @param networkChannel 092 * the channel for network level I/O 093 * @param secureChannel 094 * the channel for secure network level I/O 095 */ 096 public HttpConnector(Channel appChannel, Channel networkChannel, 097 Channel secureChannel) { 098 super(appChannel, ChannelReplacements.create() 099 .add(NetworkChannel.class, networkChannel, secureChannel)); 100 this.netMainChannel = networkChannel; 101 this.netSecureChannel = secureChannel; 102 } 103 104 /** 105 * Create a new connector that uses the {@code networkChannel} for network 106 * level I/O. 107 * 108 * @param appChannel 109 * this component's channel 110 * @param networkChannel 111 * the channel for network level I/O 112 */ 113 public HttpConnector(Channel appChannel, Channel networkChannel) { 114 super(appChannel, ChannelReplacements.create() 115 .add(NetworkChannel.class, networkChannel)); 116 this.netMainChannel = networkChannel; 117 this.netSecureChannel = null; 118 } 119 120 /** 121 * Sets the size of the buffers used for {@link Input} events 122 * on the application channel. Defaults to the upstream buffer size 123 * minus 512 (estimate for added protocol overhead). 124 * 125 * @param applicationBufferSize the size to set 126 * @return the http server for easy chaining 127 */ 128 public HttpConnector setApplicationBufferSize(int applicationBufferSize) { 129 this.applicationBufferSize = applicationBufferSize; 130 return this; 131 } 132 133 /** 134 * Returns the size of the application side (receive) buffers. 135 * 136 * @return the value or -1 if not set 137 */ 138 public int applicationBufferSize() { 139 return applicationBufferSize; 140 } 141 142 /** 143 * Starts the processing of a request from the application layer. 144 * When a network connection has been established, the application 145 * layer will be informed by a {@link HttpConnected} event, fired 146 * on a subchannel that is created for the processing of this 147 * request. 148 * 149 * @param event the request 150 * @throws InterruptedException if processing is interrupted 151 * @throws IOException Signals that an I/O exception has occurred. 152 */ 153 @Handler 154 public void onRequest(Request.Out event) 155 throws InterruptedException, IOException { 156 new WebAppMsgChannel(event); 157 } 158 159 /** 160 * Handles output from the application. This may be the payload 161 * of e.g. a POST or data to be transferes on a websocket connection. 162 * 163 * @param event the event 164 * @param appChannel the application layer channel 165 * @throws InterruptedException the interrupted exception 166 */ 167 @Handler 168 @SuppressWarnings({ "PMD.CompareObjectsWithEquals", 169 "PMD.AvoidDuplicateLiterals" }) 170 public void onOutput(Output<?> event, WebAppMsgChannel appChannel) 171 throws InterruptedException { 172 if (appChannel.httpConnector() == this) { 173 appChannel.handleAppOutput(event); 174 } 175 } 176 177 /** 178 * Called when the network connection is established. Triggers the 179 * further processing of the initial request. 180 * 181 * @param event the event 182 * @param netConnChannel the network layer channel 183 * @throws InterruptedException if the execution is interrupted 184 * @throws IOException Signals that an I/O exception has occurred. 185 */ 186 @Handler(channels = NetworkChannel.class) 187 @SuppressWarnings("PMD.DataflowAnomalyAnalysis") 188 public void onConnected(ClientConnected event, 189 SocketIOChannel netConnChannel) 190 throws InterruptedException, IOException { 191 // Check if this is a response to our request 192 var appChannel = event.openEvent().associated(WebAppMsgChannel.class) 193 .filter(c -> c.httpConnector() == this); 194 if (appChannel.isPresent()) { 195 appChannel.get().connected(netConnChannel); 196 } 197 } 198 199 /** 200 * Handles I/O error events from the network layer. 201 * 202 * @param event the event 203 * @throws IOException Signals that an I/O exception has occurred. 204 */ 205 @Handler(channels = NetworkChannel.class) 206 @SuppressWarnings("PMD.CompareObjectsWithEquals") 207 public void onIoError(IOError event) throws IOException { 208 for (Channel channel : event.channels()) { 209 if (channel instanceof SocketIOChannel netConnChannel) { 210 // Error while using established network connection 211 Optional<WebAppMsgChannel> appChannel 212 = netConnChannel.associated(WebAppMsgChannel.class) 213 .filter(c -> c.httpConnector() == this); 214 if (appChannel.isPresent()) { 215 // Error while using a network connection 216 appChannel.get().handleIoError(event, netConnChannel); 217 continue; 218 } 219 // Just in case... 220 pooled.remove(netConnChannel.remoteAddress(), netConnChannel); 221 continue; 222 } 223 224 // Error while trying to establish the network connection 225 if (event.event() instanceof OpenSocketConnection connEvent) { 226 connEvent.associated(WebAppMsgChannel.class) 227 .filter(c -> c.httpConnector() == this).ifPresent(c -> { 228 c.openError(event); 229 }); 230 } 231 } 232 } 233 234 /** 235 * Processes any input from the network layer. 236 * 237 * @param event the event 238 * @param netConnChannel the network layer channel 239 * @throws InterruptedException if the thread is interrupted 240 * @throws ProtocolException if the protocol is violated 241 */ 242 @Handler(channels = NetworkChannel.class) 243 @SuppressWarnings("PMD.CompareObjectsWithEquals") 244 public void onInput(Input<ByteBuffer> event, SocketIOChannel netConnChannel) 245 throws InterruptedException, ProtocolException { 246 Optional<WebAppMsgChannel> appChannel 247 = netConnChannel.associated(WebAppMsgChannel.class) 248 .filter(c -> c.httpConnector() == this); 249 if (appChannel.isPresent()) { 250 appChannel.get().handleNetInput(event, netConnChannel); 251 } 252 } 253 254 /** 255 * Called when the network connection is closed. 256 * 257 * @param event the event 258 * @param netConnChannel the net conn channel 259 */ 260 @Handler(channels = NetworkChannel.class) 261 public void onClosed(Closed<?> event, SocketIOChannel netConnChannel) { 262 netConnChannel.associated(WebAppMsgChannel.class) 263 .filter(c -> c.httpConnector() == this).ifPresent( 264 appChannel -> appChannel.handleClosed(event)); 265 pooled.remove(netConnChannel.remoteAddress(), netConnChannel); 266 } 267 268 /** 269 * Handles a close event from the application channel. Such an 270 * event may only be fired if the connection has been upgraded 271 * to a websocket connection. 272 * 273 * @param event the event 274 * @param appChannel the application channel 275 */ 276 @Handler 277 @SuppressWarnings("PMD.CompareObjectsWithEquals") 278 public void onClose(Close event, WebAppMsgChannel appChannel) { 279 if (appChannel.httpConnector() == this) { 280 appChannel.handleClose(event); 281 } 282 } 283 284 /** 285 * An application layer channel. 286 */ 287 private class WebAppMsgChannel extends DefaultIOSubchannel { 288 // Starts as ClientEngine<HttpRequest,HttpResponse> but may change 289 private final ClientEngine<?, ?> engine 290 = new ClientEngine<>(new HttpRequestEncoder(), 291 new HttpResponseDecoder()); 292 private final InetSocketAddress serverAddress; 293 private final Request.Out request; 294 private ManagedBuffer<ByteBuffer> outBuffer; 295 private ManagedBufferPool<ManagedBuffer<ByteBuffer>, 296 ByteBuffer> byteBufferPool; 297 private ManagedBufferPool<ManagedBuffer<CharBuffer>, 298 CharBuffer> charBufferPool; 299 private ManagedBufferPool<?, ?> currentPool; 300 private SocketIOChannel netConnChannel; 301 private final EventPipeline downPipeline; 302 private WsMessageHeader currentWsMessage; 303 304 /** 305 * Instantiates a new channel. 306 * 307 * @param event the event 308 * @param netChannel the net channel 309 * @throws InterruptedException 310 * @throws IOException 311 */ 312 @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") 313 public WebAppMsgChannel(Request.Out event) 314 throws InterruptedException, IOException { 315 super(channel(), newEventPipeline()); 316 317 // Downstream pipeline, needed even if connection fails 318 downPipeline = newEventPipeline(); 319 320 // Extract request data and check host 321 request = event; 322 var uri = request.requestUri(); 323 var port = uri.getPort(); 324 if (port == -1) { 325 if ("https".equalsIgnoreCase(uri.getScheme())) { 326 port = 443; 327 } else if ("http".equalsIgnoreCase(uri.getScheme())) { 328 port = 80; 329 } 330 } 331 serverAddress = new InetSocketAddress(uri.getHost(), port); 332 if (serverAddress.isUnresolved()) { 333 downPipeline.fire(new HostUnresolved(event, 334 "Host cannot be resolved."), this); 335 return; 336 } 337 338 // Re-use network connection, if possible 339 SocketIOChannel recycled = pooled.poll(serverAddress); 340 if (recycled != null) { 341 connected(recycled); 342 return; 343 } 344 345 // Fire on network channel (targeting the network connector) 346 // as a follow up event (using the current pipeline). 347 var useSecure = uri.getScheme().equalsIgnoreCase("https") 348 && netSecureChannel != null; 349 fire(new OpenSocketConnection(serverAddress) 350 .setAssociated(WebAppMsgChannel.class, this), 351 useSecure ? netSecureChannel : netMainChannel); 352 } 353 354 private HttpConnector httpConnector() { 355 return HttpConnector.this; 356 } 357 358 /** 359 * Error in response to trying to open a new TCP connection. 360 * 361 * @param event the event 362 */ 363 public void openError(IOError event) { 364 // Already removed from connecting by caller, simply forward. 365 downPipeline.fire(IOError.duplicate(event), this); 366 } 367 368 /** 369 * Error from established TCP connection. 370 * 371 * @param event the event 372 * @param netConnChannel the network channel 373 */ 374 public void handleIoError(IOError event, 375 SocketIOChannel netConnChannel) { 376 downPipeline.fire(IOError.duplicate(event), this); 377 } 378 379 /** 380 * Sets the network connection channel for this application channel. 381 * 382 * @param netConnChannel the net conn channel 383 * @throws InterruptedException the interrupted exception 384 * @throws IOException Signals that an I/O exception has occurred. 385 */ 386 @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") 387 public final void connected(SocketIOChannel netConnChannel) 388 throws InterruptedException, IOException { 389 // Associate the network channel with this application channel 390 this.netConnChannel = netConnChannel; 391 netConnChannel.setAssociated(WebAppMsgChannel.class, this); 392 request.connectedCallback().ifPresent( 393 consumer -> consumer.accept(request, netConnChannel)); 394 395 // Estimate "good" application buffer size 396 int bufferSize = applicationBufferSize; 397 if (bufferSize <= 0) { 398 bufferSize = netConnChannel.byteBufferPool().bufferSize() - 512; 399 if (bufferSize < 4096) { 400 bufferSize = 4096; 401 } 402 } 403 String channelName = Components.objectName(HttpConnector.this) 404 + "." + Components.objectName(this); 405 byteBufferPool().setName(channelName + ".upstream.byteBuffers"); 406 charBufferPool().setName(channelName + ".upstream.charBuffers"); 407 // Allocate downstream buffer pools. Note that decoding WebSocket 408 // network packets may result in several WS frames that are each 409 // delivered in independent events. Therefore provide some 410 // additional buffers. 411 final int bufSize = bufferSize; 412 byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new, 413 () -> { 414 return ByteBuffer.allocate(bufSize); 415 }, 2, 100) 416 .setName(channelName + ".downstream.byteBuffers"); 417 charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new, 418 () -> { 419 return CharBuffer.allocate(bufSize); 420 }, 2, 100) 421 .setName(channelName + ".downstream.charBuffers"); 422 423 sendMessageUpstream(request.httpRequest(), netConnChannel); 424 425 // Forward Connected event downstream to e.g. start preparation 426 // of output events for payload data. 427 downPipeline.fire(new HttpConnected(request, 428 netConnChannel.localAddress(), netConnChannel.remoteAddress()), 429 this); 430 } 431 432 @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 433 "PMD.CognitiveComplexity", "PMD.AvoidDuplicateLiterals" }) 434 private void sendMessageUpstream(MessageHeader message, 435 SocketIOChannel netConnChannel) { 436 // Now send request as if it came from downstream (to 437 // avoid confusion with output events that may be 438 // generated in parallel, see below). 439 responsePipeline().submit("SynchronizedResponse", 440 new Callable<Void>() { 441 442 @Override 443 @SuppressWarnings({ "PMD.CommentRequired", 444 "PMD.AvoidBranchingStatementAsLastInLoop", 445 "PMD.AvoidDuplicateLiterals", 446 "PMD.AvoidInstantiatingObjectsInLoops" }) 447 public Void call() throws InterruptedException { 448 @SuppressWarnings("unchecked") 449 ClientEngine<MessageHeader, MessageHeader> untypedEngine 450 = (ClientEngine<MessageHeader, 451 MessageHeader>) engine; 452 untypedEngine.encode(message); 453 boolean hasBody = message.hasPayload(); 454 while (true) { 455 outBuffer 456 = netConnChannel.byteBufferPool().acquire(); 457 Codec.Result result 458 = engine.encode(Codec.EMPTY_IN, 459 outBuffer.backingBuffer(), !hasBody); 460 if (result.isOverflow()) { 461 netConnChannel 462 .respond(Output.fromSink(outBuffer, false)); 463 continue; 464 } 465 if (hasBody) { 466 // Keep buffer with incomplete request to be 467 // further 468 // filled by subsequent Output events 469 break; 470 } 471 // Request is completely encoded 472 if (outBuffer.position() > 0) { 473 netConnChannel 474 .respond(Output.fromSink(outBuffer, true)); 475 } else { 476 outBuffer.unlockBuffer(); 477 } 478 outBuffer = null; 479 if (result.closeConnection()) { 480 netConnChannel.respond(new Close()); 481 } 482 break; 483 } 484 return null; 485 } 486 }); 487 } 488 489 @SuppressWarnings({ "PMD.CommentRequired", "PMD.CyclomaticComplexity", 490 "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops", 491 "PMD.AvoidDuplicateLiterals", "PMD.CognitiveComplexity" }) 492 public void handleAppOutput(Output<?> event) 493 throws InterruptedException { 494 Buffer eventData = event.data(); 495 Buffer input; 496 if (eventData instanceof ByteBuffer) { 497 input = ((ByteBuffer) eventData).duplicate(); 498 } else if (eventData instanceof CharBuffer) { 499 input = ((CharBuffer) eventData).duplicate(); 500 } else { 501 return; 502 } 503 if (engine.switchedTo().equals(Optional.of("websocket")) 504 && currentWsMessage == null) { 505 // When switched to WebSockets, we only have Input and Output 506 // events. Add header automatically. 507 @SuppressWarnings("unchecked") 508 ClientEngine<MessageHeader, ?> wsEngine 509 = (ClientEngine<MessageHeader, ?>) engine; 510 currentWsMessage = new WsMessageHeader( 511 event.buffer().backingBuffer() instanceof CharBuffer, 512 true); 513 wsEngine.encode(currentWsMessage); 514 } 515 while (input.hasRemaining() || event.isEndOfRecord()) { 516 if (outBuffer == null) { 517 outBuffer = netConnChannel.byteBufferPool().acquire(); 518 } 519 Codec.Result result = engine.encode(input, 520 outBuffer.backingBuffer(), event.isEndOfRecord()); 521 if (result.isOverflow()) { 522 netConnChannel.respond(Output.fromSink(outBuffer, false)); 523 outBuffer = netConnChannel.byteBufferPool().acquire(); 524 continue; 525 } 526 if (event.isEndOfRecord() || result.closeConnection()) { 527 if (outBuffer.position() > 0) { 528 netConnChannel 529 .respond(Output.fromSink(outBuffer, true)); 530 } else { 531 outBuffer.unlockBuffer(); 532 } 533 outBuffer = null; 534 if (result.closeConnection()) { 535 netConnChannel.respond(new Close()); 536 } 537 break; 538 } 539 } 540 if (engine.switchedTo().equals(Optional.of("websocket")) 541 && event.isEndOfRecord()) { 542 currentWsMessage = null; 543 } 544 } 545 546 @SuppressWarnings({ "PMD.CommentRequired", 547 "PMD.DataflowAnomalyAnalysis", "PMD.CognitiveComplexity" }) 548 public void handleNetInput(Input<ByteBuffer> event, 549 SocketIOChannel netConnChannel) 550 throws InterruptedException, ProtocolException { 551 // Send the data from the event through the decoder. 552 ByteBuffer inData = event.data(); 553 // Don't unnecessary allocate a buffer, may be header only message 554 ManagedBuffer<?> bodyData = null; 555 boolean wasOverflow = false; 556 Decoder.Result<?> result; 557 while (inData.hasRemaining()) { 558 if (wasOverflow) { 559 // Message has (more) body 560 bodyData = currentPool.acquire(); 561 } 562 result = engine.decode(inData, 563 bodyData == null ? null : bodyData.backingBuffer(), 564 event.isEndOfRecord()); 565 if (result.response().isPresent()) { 566 sendMessageUpstream(result.response().get(), 567 netConnChannel); 568 if (result.isResponseOnly()) { 569 maybeReleaseConnection(result); 570 continue; 571 } 572 } 573 if (result.isHeaderCompleted()) { 574 MessageHeader header 575 = engine.responseDecoder().header().get(); 576 if (!handleResponseHeader(header)) { 577 maybeReleaseConnection(result); 578 break; 579 } 580 } 581 if (bodyData != null) { 582 if (bodyData.position() > 0) { 583 boolean eor 584 = !result.isOverflow() && !result.isUnderflow(); 585 downPipeline.fire(Input.fromSink(bodyData, eor), this); 586 } else { 587 bodyData.unlockBuffer(); 588 } 589 bodyData = null; 590 } 591 maybeReleaseConnection(result); 592 wasOverflow = result.isOverflow(); 593 } 594 } 595 596 @SuppressWarnings("PMD.CognitiveComplexity") 597 private boolean handleResponseHeader(MessageHeader response) { 598 if (response instanceof HttpResponse) { 599 HttpResponse httpResponse = (HttpResponse) response; 600 if (httpResponse.hasPayload()) { 601 if (httpResponse.findValue( 602 HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE) 603 .map(type -> "text" 604 .equalsIgnoreCase(type.value().topLevelType())) 605 .orElse(false)) { 606 currentPool = charBufferPool; 607 } else { 608 currentPool = byteBufferPool; 609 } 610 } 611 downPipeline.fire(new Response(httpResponse), this); 612 } else if (response instanceof WsMessageHeader) { 613 WsMessageHeader wsMessage = (WsMessageHeader) response; 614 if (wsMessage.hasPayload()) { 615 if (wsMessage.isTextMode()) { 616 currentPool = charBufferPool; 617 } else { 618 currentPool = byteBufferPool; 619 } 620 } 621 } else if (response instanceof WsCloseFrame) { 622 downPipeline.fire( 623 new WebSocketClose((WsCloseFrame) response, this)); 624 } 625 return true; 626 } 627 628 private void maybeReleaseConnection(Decoder.Result<?> result) { 629 if (result.isOverflow() || result.isUnderflow()) { 630 // Data remains to be processed 631 return; 632 } 633 MessageHeader header 634 = engine.responseDecoder().header().get(); 635 // Don't release if something follows 636 if (header instanceof HttpResponse 637 && ((HttpResponse) header).statusCode() % 100 == 1) { 638 return; 639 } 640 if (engine.switchedTo().equals(Optional.of("websocket"))) { 641 if (!result.closeConnection()) { 642 return; 643 } 644 // Is web socket close, inform application layer 645 downPipeline.fire(new Closed<Void>(), this); 646 } 647 netConnChannel.setAssociated(WebAppMsgChannel.class, null); 648 if (!result.closeConnection()) { 649 // May be reused 650 pooled.add(serverAddress, netConnChannel); 651 } 652 netConnChannel = null; 653 } 654 655 @SuppressWarnings("PMD.CommentRequired") 656 public void handleClose(Close event) { 657 if (engine.switchedTo().equals(Optional.of("websocket"))) { 658 sendMessageUpstream(new WsCloseFrame(null, null), 659 netConnChannel); 660 } 661 } 662 663 @SuppressWarnings("PMD.CommentRequired") 664 public void handleClosed(Closed<?> event) { 665 if (engine.switchedTo().equals(Optional.of("websocket"))) { 666 downPipeline.fire(new Closed<Void>(), this); 667 } 668 } 669 670 } 671 672}