/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016-2018 Michael N. Lipp
 * 
 * This program is free software; you can redistribute it and/or modify it 
 * under the terms of the GNU Affero General Public License as published by 
 * the Free Software Foundation; either version 3 of the License, or 
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but 
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
 * for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License along 
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.http;

import java.lang.ref.WeakReference;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import org.jdrupes.httpcodec.Codec;
import org.jdrupes.httpcodec.Decoder;
import org.jdrupes.httpcodec.MessageHeader;
import org.jdrupes.httpcodec.ProtocolException;
import org.jdrupes.httpcodec.ServerEngine;
import org.jdrupes.httpcodec.protocols.http.HttpConstants.HttpStatus;
import org.jdrupes.httpcodec.protocols.http.HttpField;
import org.jdrupes.httpcodec.protocols.http.HttpRequest;
import org.jdrupes.httpcodec.protocols.http.HttpResponse;
import org.jdrupes.httpcodec.protocols.http.server.HttpRequestDecoder;
import org.jdrupes.httpcodec.protocols.http.server.HttpResponseEncoder;
import org.jdrupes.httpcodec.protocols.websocket.WsCloseFrame;
import org.jdrupes.httpcodec.protocols.websocket.WsMessageHeader;
import org.jdrupes.httpcodec.types.Converters;
import org.jdrupes.httpcodec.types.StringList;
import org.jgrapes.core.Channel;
import org.jgrapes.core.ClassChannel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements;
import org.jgrapes.core.internal.EventProcessor;
import org.jgrapes.http.events.ProtocolSwitchAccepted;
import org.jgrapes.http.events.Request;
import org.jgrapes.http.events.Response;
import org.jgrapes.http.events.Upgraded;
import org.jgrapes.http.events.WebSocketClose;
import org.jgrapes.io.IOSubchannel;
import org.jgrapes.io.events.Close;
import org.jgrapes.io.events.Closed;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.Output;
import org.jgrapes.io.events.Purge;
import org.jgrapes.io.util.LinkedIOSubchannel;
import org.jgrapes.io.util.ManagedBuffer;
import org.jgrapes.io.util.ManagedBufferPool;
import org.jgrapes.net.SocketServer;
import org.jgrapes.net.events.Accepted;

/**
 * A converter component that receives and sends byte buffers on a 
 * network channel and web application layer messages on
 * {@link IOSubchannel}s of its channel. 
 * 
 * Each {@link IOSubchannel} represents a connection established by 
 * the browser. The {@link HttpServer} fires {@link Request} events 
 * (and {@link Input} events, if there is associated data) on the 
 * subchannels. Web application components (short "weblets") handle 
 * these events and use 
 * {@link LinkedIOSubchannel#respond(org.jgrapes.core.Event)}
 * to send {@link Response} events and, if applicable, {@link Output}
 * events with data belonging to the response.
 * 
 * Events must be fired by weblets while handling the {@link Request}
 * or {@link Input} events only (to be precise: while handling events
 * processed by the associated {@link EventProcessor}) to ensure
 * that responses and their associated data do not interleave. 
 */
@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.CouplingBetweenObjects" })
public class HttpServer extends Component {

    // Only used to pass the network channel from the delegated-to
    // constructor back to the constructor that creates the SocketServer.
    @SuppressWarnings("PMD.SingularField")
    private final WeakReference<Channel> networkChannelPassBack;
    // Request types for which a "Not Found" fall back response is sent.
    private final List<Class<? extends Request.In>> providedFallbacks;
    private int matchLevels = 1;
    private boolean acceptNoSni;
    // -1 (or any value <= 0) means "derive from the network buffer size".
    private int applicationBufferSize = -1;

    /**
     * Denotes the network channel in handler annotations.
     */
    private static final class NetworkChannel extends ClassChannel {
    }

    /**
     * Create a new server that uses the {@code networkChannel} for network
     * level I/O.
     * <P>
     * As a convenience the server can provide fall back handlers for the
     * specified types of requests. The fall back handler simply returns 404 (
     * "Not found").
     * 
     * @param appChannel
     *            this component's channel
     * @param networkChannel
     *            the channel for network level I/O
     * @param fallbacks
     *            the requests for which a fall back handler is provided
     */
    @SafeVarargs
    public HttpServer(Channel appChannel, Channel networkChannel,
            Class<? extends Request.In>... fallbacks) {
        super(appChannel, ChannelReplacements.create()
            .add(NetworkChannel.class, networkChannel));
        networkChannelPassBack = new WeakReference<>(networkChannel);
        this.providedFallbacks = Arrays.asList(fallbacks);
    }

    /**
     * Create a new server that creates its own {@link SocketServer} with 
     * the given address and uses it for network level I/O.
     * 
     * @param appChannel
     *            this component's channel
     * @param serverAddress the address to listen on
     * @param fallbacks fall backs
     */
    @SafeVarargs
    public HttpServer(Channel appChannel, InetSocketAddress serverAddress,
            Class<? extends Request.In>... fallbacks) {
        this(appChannel, new SocketServer().setServerAddress(serverAddress),
            fallbacks);
        // The socket server created above is only reachable through the
        // pass back reference; make it a child of this component.
        attach((SocketServer) networkChannelPassBack.get());
    }

    /**
     * Returns the number of request path elements used in the match value
     * of the generated events.
     *
     * @return the matchLevels
     */
    public int matchLevels() {
        return matchLevels;
    }

    /**
     * Sets the number of elements from the request path used in the match value
     * of the generated events (see {@link Request#defaultCriterion()}), defaults
     * to 1.
     * 
     * @param matchLevels the matchLevels to set
     * @return the http server for easy chaining
     */
    public HttpServer setMatchLevels(int matchLevels) {
        this.matchLevels = matchLevels;
        return this;
    }

    /**
     * Sets the size of the buffers used for {@link Output} events
     * on the application channel. Defaults to the upstream buffer size
     * minus 512 (estimate for added protocol overhead), but at
     * least 4096.
     * 
     * @param applicationBufferSize the size to set
     * @return the http server for easy chaining
     */
    public HttpServer setApplicationBufferSize(int applicationBufferSize) {
        this.applicationBufferSize = applicationBufferSize;
        return this;
    }

    /**
     * Returns the size of the application side (receive) buffers.
     * 
     * @return the value or -1 if not set
     */
    public int applicationBufferSize() {
        return applicationBufferSize;
    }

    /**
     * Determines if requests from secure (TLS) connections without
     * SNI are accepted.
     *  
     * Secure (TLS) requests usually transfer the name of the server that
     * they want to connect to during handshake. The HTTP server checks
     * that the `Host` header field of decoded requests matches the
     * name used to establish the connection. If, however, the connection
     * is made using the IP-address, the client does not have a host name.
     * If such connections are to be accepted, this flag, which
     * defaults to `false`, must be set.
     * 
     * Note that in requests accepted without SNI, the `Host` header field
     * will be modified to contain the IP-address of the indicated host
     * to prevent accidental matching with virtual host names.  
     * 
     * @param acceptNoSni the value to set
     * @return the http server for easy chaining
     */
    public HttpServer setAcceptNoSni(boolean acceptNoSni) {
        this.acceptNoSni = acceptNoSni;
        return this;
    }

    /**
     * Returns if secure (TLS) requests without SNI are allowed.
     * 
     * @return the result
     */
    public boolean acceptNoSni() {
        return acceptNoSni;
    }

    /**
     * Creates a new downstream connection as {@link LinkedIOSubchannel} 
     * of the network connection, a {@link HttpRequestDecoder} and a
     * {@link HttpResponseEncoder}.
     * 
     * @param event
     *            the accepted event
     * @param netChannel the net channel
     */
    @Handler(channels = NetworkChannel.class)
    public void onAccepted(Accepted event, IOSubchannel netChannel) {
        // The new channel is linked to the net channel by the
        // LinkedIOSubchannel constructor, no reference has to be kept.
        new WebAppMsgChannel(event, netChannel);
    }

    /**
     * Handles data from the client (from upstream). The data is sent through 
     * the {@link HttpRequestDecoder} and events are sent downstream according
     * to the decoding results.
     * 
     * @param event the event
     * @param netChannel the net channel
     * @throws ProtocolException if a protocol exception occurs
     * @throws InterruptedException if the execution was interrupted
     */
    @Handler(channels = NetworkChannel.class)
    public void onInput(
            Input<ByteBuffer> event, IOSubchannel netChannel)
            throws ProtocolException, InterruptedException {
        // Use the typed lookup (as in onClosed/onPurge) instead of an
        // unchecked cast. A lambda cannot be used here because
        // handleNetInput throws checked exceptions.
        final Optional<WebAppMsgChannel> appChannel
            = LinkedIOSubchannel.downstreamChannel(this, netChannel,
                WebAppMsgChannel.class);
        if (appChannel.isPresent()) {
            appChannel.get().handleNetInput(event);
        }
    }

    /**
     * Forwards a {@link Closed} event to the application channel. 
     *
     * @param event the event
     * @param netChannel the net channel
     */
    @Handler(channels = NetworkChannel.class)
    public void onClosed(Closed<?> event, IOSubchannel netChannel) {
        LinkedIOSubchannel.downstreamChannel(this, netChannel,
            WebAppMsgChannel.class).ifPresent(appChannel -> {
                appChannel.handleClosed(event);
            });
    }

    /**
     * Forwards a {@link Purge} event to the application channel.
     *
     * @param event the event
     * @param netChannel the net channel
     */
    @Handler(channels = NetworkChannel.class)
    public void onPurge(Purge event, IOSubchannel netChannel) {
        LinkedIOSubchannel.downstreamChannel(this, netChannel,
            WebAppMsgChannel.class).ifPresent(appChannel -> {
                appChannel.handlePurge(event);
            });
    }

    /**
     * Handles a response event from downstream by sending it through an
     * {@link HttpResponseEncoder} that generates the data (encoded information)
     * and sends it upstream with {@link Output} events. Depending on whether 
     * the response has a body, subsequent {@link Output} events can
     * follow.
     * 
     * @param event
     *            the response event
     * @param appChannel the application channel
     * @throws InterruptedException if the execution was interrupted
     */
    @Handler
    public void onResponse(Response event, WebAppMsgChannel appChannel)
            throws InterruptedException {
        appChannel.handleResponse(event);
    }

    /**
     * Receives the message body of a response. A {@link Response} event that
     * has a message body can be followed by one or more {@link Output} events
     * from downstream that contain the data. An {@code Output} event
     * with the end of record flag set signals the end of the message body.
     * 
     * @param event
     *            the event with the data
     * @param appChannel the application channel
     * @throws InterruptedException if the execution was interrupted
     */
    @Handler
    public void onOutput(Output<?> event, WebAppMsgChannel appChannel)
            throws InterruptedException {
        appChannel.handleAppOutput(event);
    }

    /**
     * Handles a close event from downstream by closing the upstream
     * connections.
     * 
     * @param event
     *            the close event
     * @param appChannel the application channel
     * @throws InterruptedException if the execution was interrupted
     */
    @Handler
    public void onClose(Close event, WebAppMsgChannel appChannel)
            throws InterruptedException {
        appChannel.handleClose(event);
    }

    /**
     * Checks whether the request has been handled (value of {@link Request}
     * event set to `true`) or the status code in the prepared response
     * is no longer "Not Implemented". If not, but a fall back has been set, 
     * send a "Not Found" response. If this isn't the case either, send 
     * the default response ("Not implemented") to the client.
     * 
     * @param event
     *            the request completed event
     * @param appChannel the application channel 
     * @throws InterruptedException if the execution was interrupted
     */
    @Handler
    public void onRequestCompleted(
            Request.In.Completed event, IOSubchannel appChannel)
            throws InterruptedException {
        final Request.In requestEvent = event.event();
        // A check that also works with null.
        if (Boolean.TRUE.equals(requestEvent.get())
            || requestEvent.httpRequest().response().map(
                response -> response.statusCode() != HttpStatus.NOT_IMPLEMENTED
                    .statusCode())
                .orElse(false)) {
            // Some other component has taken care
            return;
        }

        // Check if "Not Found" should be sent. The list of fall backs
        // is always initialized by the constructor.
        if (providedFallbacks.contains(requestEvent.getClass())) {
            ResponseCreationSupport.sendResponse(
                requestEvent.httpRequest(), appChannel, HttpStatus.NOT_FOUND);
            return;
        }

        // Last resort
        ResponseCreationSupport.sendResponse(requestEvent.httpRequest(),
            appChannel, HttpStatus.NOT_IMPLEMENTED);
    }

    /**
     * Provides a fallback handler for an OPTIONS request with asterisk. Simply
     * responds with "OK".
     * 
     * @param event the event
     * @param appChannel the application channel
     */
    @Handler(priority = Integer.MIN_VALUE)
    public void onOptions(Request.In.Options event, IOSubchannel appChannel) {
        // NOTE(review): identity comparison, presumably because
        // ASTERISK_REQUEST is used as a sentinel instance -- confirm
        // before changing to equals().
        if (event.requestUri() == HttpRequest.ASTERISK_REQUEST) {
            HttpResponse response = event.httpRequest().response().get();
            response.setStatus(HttpStatus.OK);
            appChannel.respond(new Response(response));
            event.setResult(true);
            event.stop();
        }
    }

    /**
     * Sends the response indicating that the protocol switch was accepted
     * and causes subsequent data to be handled as {@link Input} and
     * {@link Output} events on the channel.
     * 
     * As a convenience, the channel is associated with the URI that
     * was used to request the protocol switch using {@link URI} as key.
     * 
     * @param event the event
     * @param appChannel the channel
     */
    @Handler
    public void onProtocolSwitchAccepted(
            ProtocolSwitchAccepted event, WebAppMsgChannel appChannel) {
        appChannel.handleProtocolSwitchAccepted(event, appChannel);
    }

    /**
     * An application layer channel.
     */
    @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
    private class WebAppMsgChannel extends LinkedIOSubchannel {
        // Starts as ServerEngine<HttpRequest,HttpResponse> but may change
        private final ServerEngine<?, ?> engine;
        // Partially filled upstream buffer, kept between a Response and
        // the Output events that carry its body.
        private ManagedBuffer<ByteBuffer> outBuffer;
        private final boolean secure;
        private List<String> snis = Collections.emptyList();
        private final ManagedBufferPool<ManagedBuffer<ByteBuffer>,
                ByteBuffer> byteBufferPool;
        private final ManagedBufferPool<ManagedBuffer<CharBuffer>,
                CharBuffer> charBufferPool;
        // Pool to take body buffers from, selected per message.
        private ManagedBufferPool<?, ?> currentPool;
        private final EventPipeline downPipeline;
        // Set when a protocol switch has been accepted, consumed when the
        // corresponding response has been encoded.
        private Upgraded pendingUpgraded;
        private WsMessageHeader currentWsMessage;

        /**
         * Instantiates a new channel.
         *
         * @param event the event
         * @param netChannel the net channel
         */
        @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
        public WebAppMsgChannel(Accepted event, IOSubchannel netChannel) {
            super(HttpServer.this, channel(), netChannel, newEventPipeline());
            engine = new ServerEngine<>(
                new HttpRequestDecoder(), new HttpResponseEncoder());
            secure = event.isSecure();
            if (secure) {
                // Remember the host names requested during the handshake
                snis = new ArrayList<>();
                for (SNIServerName sni : event.requestedServerNames()) {
                    if (sni instanceof SNIHostName) {
                        snis.add(((SNIHostName) sni).getAsciiName());
                    }
                }
            }

            // Calculate "good" application buffer size
            int bufferSize = applicationBufferSize;
            if (bufferSize <= 0) {
                bufferSize = netChannel.byteBufferPool().bufferSize() - 512;
                if (bufferSize < 4096) {
                    bufferSize = 4096;
                }
            }

            String channelName = Components.objectName(HttpServer.this)
                + "." + Components.objectName(this);
            // These are the inherited accessor methods, not the
            // (still unassigned) fields of the same name.
            byteBufferPool().setName(channelName + ".upstream.byteBuffers");
            charBufferPool().setName(channelName + ".upstream.charBuffers");
            // Allocate downstream buffer pools. Note that decoding WebSocket
            // network packets may result in several WS frames that are each
            // delivered in independent events. Therefore provide some
            // additional buffers.
            final int bufSize = bufferSize;
            byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
                () -> {
                    return ByteBuffer.allocate(bufSize);
                }, 2, 100)
                    .setName(channelName + ".downstream.byteBuffers");
            charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
                () -> {
                    return CharBuffer.allocate(bufSize);
                }, 2, 100)
                    .setName(channelName + ".downstream.charBuffers");

            // Downstream pipeline
            downPipeline = newEventPipeline();
        }

        /**
         * Handle {@link Input} events from the network.
         *
         * @param event the event
         * @throws ProtocolException the protocol exception
         * @throws InterruptedException the interrupted exception
         */
        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
            "PMD.AvoidInstantiatingObjectsInLoops",
            "PMD.AvoidDeeplyNestedIfStmts", "PMD.CollapsibleIfStatements",
            "PMD.CognitiveComplexity", "PMD.AvoidDuplicateLiterals" })
        public void handleNetInput(Input<ByteBuffer> event)
                throws ProtocolException, InterruptedException {
            // Send the data from the event through the decoder.
            ByteBuffer inData = event.data();
            // Don't unnecessarily allocate a buffer, may be header only
            // message
            ManagedBuffer<?> bodyData = null;
            boolean wasOverflow = false;
            while (inData.hasRemaining()) {
                if (wasOverflow) {
                    // Message has (more) body
                    bodyData = currentPool.acquire();
                }
                Decoder.Result<?> result = engine.decode(inData,
                    bodyData == null ? null : bodyData.backingBuffer(),
                    event.isEndOfRecord());
                if (result.response().isPresent()) {
                    // Feedback required, send it "in sync", even if
                    // event source is not the regular one.
                    responsePipeline().overrideRestriction().fire(
                        new Response(result.response().get()), this);
                    if (result.isResponseOnly()) {
                        maybeCloseConnection(result);
                        continue;
                    }
                }
                if (result.isHeaderCompleted()) {
                    if (!handleRequestHeader(engine.currentRequest().get())) {
                        maybeCloseConnection(result);
                        break;
                    }
                }
                if (bodyData != null) {
                    if (bodyData.position() > 0) {
                        // End of record only if the decoder neither needs
                        // more output space nor more input.
                        downPipeline.fire(Input.fromSink(
                            bodyData, !result.isOverflow()
                                && !result.isUnderflow()),
                            this);
                    } else {
                        bodyData.unlockBuffer();
                    }
                    bodyData = null;
                }
                maybeCloseConnection(result);
                wasOverflow = result.isOverflow();
            }
        }

        /**
         * Fires a {@link Close} event on this channel if the decoder
         * result requests that the connection be closed.
         *
         * @param result the decoder result
         */
        private void maybeCloseConnection(Decoder.Result<?> result) {
            if (result.closeConnection()) {
                // Send close "in sync", even if event source is unexpected.
                responsePipeline().overrideRestriction()
                    .fire(new Close(), this);
            }
        }

        /**
         * Processes a completely decoded message header. Selects the
         * buffer pool for a subsequent body and fires the matching
         * event downstream.
         *
         * @param request the decoded header
         * @return `false` if an error response has been sent and the
         * processing of the current input should stop
         */
        @SuppressWarnings({ "PMD.CollapsibleIfStatements",
            "PMD.CognitiveComplexity" })
        private boolean handleRequestHeader(MessageHeader request) {
            if (request instanceof HttpRequest) {
                HttpRequest httpRequest = (HttpRequest) request;
                if (httpRequest.hasPayload()) {
                    // Text bodies are delivered in char buffers,
                    // everything else in byte buffers.
                    if (httpRequest.findValue(
                        HttpField.CONTENT_TYPE, Converters.MEDIA_TYPE)
                        .map(type -> "text"
                            .equalsIgnoreCase(type.value().topLevelType()))
                        .orElse(false)) {
                        currentPool = charBufferPool;
                    } else {
                        currentPool = byteBufferPool;
                    }
                }
                if (secure) {
                    if (!snis.contains(httpRequest.host())) {
                        if (acceptNoSni && snis.isEmpty()) {
                            convertHostToNumerical(httpRequest);
                        } else {
                            ResponseCreationSupport.sendResponse(httpRequest,
                                this, 421, "Misdirected Request");
                            return false;
                        }
                    }
                }
                try {
                    downPipeline.fire(Request.In.fromHttpRequest(httpRequest,
                        secure, matchLevels), this);
                } catch (URISyntaxException e) {
                    ResponseCreationSupport.sendResponse(httpRequest, this, 400,
                        "Bad Request");
                    return false;
                }
            } else if (request instanceof WsMessageHeader) {
                WsMessageHeader wsMessage = (WsMessageHeader) request;
                if (wsMessage.hasPayload()) {
                    if (wsMessage.isTextMode()) {
                        currentPool = charBufferPool;
                    } else {
                        currentPool = byteBufferPool;
                    }
                }
            } else if (request instanceof WsCloseFrame) {
                downPipeline.fire(
                    new WebSocketClose((WsCloseFrame) request, this));
            }
            return true;
        }

        /**
         * Replaces the host in the request with its numerical
         * (IP-address) representation. If the host cannot be resolved,
         * the loopback address is used.
         *
         * @param request the request to modify
         */
        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
            "PMD.UseStringBufferForStringAppends" })
        private void convertHostToNumerical(HttpRequest request) {
            int port = request.port();
            String host;
            try {
                InetAddress addr = InetAddress.getByName(
                    request.host());
                host = addr.getHostAddress();
                if (!(addr instanceof Inet4Address)) {
                    // Non-IPv4 literals are put in brackets
                    host = "[" + host + "]";
                }
            } catch (UnknownHostException e) {
                host = InetAddress.getLoopbackAddress().getHostAddress();
            }
            request.setHostAndPort(host, port);
        }

        /**
         * Handle a response event from the application layer.
         *
         * @param event the event
         * @throws InterruptedException the interrupted exception
         */
        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
            "PMD.AvoidBranchingStatementAsLastInLoop",
            "PMD.CognitiveComplexity" })
        public void handleResponse(Response event) throws InterruptedException {
            // Ignore responses that the current encoder cannot handle
            if (!engine.encoding()
                .isAssignableFrom(event.response().getClass())) {
                return;
            }
            final MessageHeader response = event.response();
            // Start sending the response
            @SuppressWarnings("unchecked")
            ServerEngine<?, MessageHeader> msgEngine
                = (ServerEngine<?, MessageHeader>) engine;
            msgEngine.encode(response);
            if (pendingUpgraded != null) {
                // Only announce the upgrade if the response (still) is
                // "101 Switching Protocols". Comparing with the exact
                // status avoids false positives for codes such as
                // 201, 301, 401 or 501.
                if (response instanceof HttpResponse
                    && ((HttpResponse) response)
                        .statusCode() == HttpStatus.SWITCHING_PROTOCOLS
                            .statusCode()) {
                    downPipeline.fire(pendingUpgraded, this);
                }
                pendingUpgraded = null;
            }
            boolean hasBody = response.hasPayload();
            while (true) {
                outBuffer = upstreamChannel().byteBufferPool().acquire();
                final ManagedBuffer<ByteBuffer> buffer = outBuffer;
                Codec.Result result = engine.encode(
                    Codec.EMPTY_IN, buffer.backingBuffer(), !hasBody);
                if (result.isOverflow()) {
                    // Buffer is full, send it and continue with a new one
                    upstreamChannel().respond(Output.fromSink(buffer, false));
                    continue;
                }
                if (hasBody) {
                    // Keep buffer with incomplete response to be further
                    // filled by subsequent Output events
                    break;
                }
                // Response is complete
                if (buffer.position() > 0) {
                    upstreamChannel().respond(Output.fromSink(buffer, true));
                } else {
                    buffer.unlockBuffer();
                }
                outBuffer = null;
                if (result.closeConnection()) {
                    upstreamChannel().respond(new Close());
                }
                break;
            }

        }

        /**
         * Handle a {@link ProtocolSwitchAccepted} event from the
         * application layer.
         *
         * @param event the event
         * @param appChannel the app channel
         */
        public void handleProtocolSwitchAccepted(
                ProtocolSwitchAccepted event, WebAppMsgChannel appChannel) {
            appChannel.setAssociated(URI.class,
                event.requestEvent().requestUri());
            final HttpResponse response = event.requestEvent()
                .httpRequest().response().get()
                .setStatus(HttpStatus.SWITCHING_PROTOCOLS)
                .setField(HttpField.UPGRADE,
                    new StringList(event.protocol()));
            // We send the Upgraded event only after the response has
            // successfully been encoded (and thus checked).
            pendingUpgraded = new Upgraded(event.resourceName(),
                event.protocol());
            respond(new Response(response));
        }

        /**
         * Handle output from the application layer.
         *
         * @param event the event
         * @throws InterruptedException the interrupted exception
         */
        @SuppressWarnings({ "PMD.CyclomaticComplexity", "PMD.NcssCount",
            "PMD.NPathComplexity", "PMD.AvoidInstantiatingObjectsInLoops",
            "PMD.CognitiveComplexity" })
        public void handleAppOutput(Output<?> event)
                throws InterruptedException {
            Buffer eventData = event.data();
            Buffer input;
            // Work on a duplicate, leaving the event's buffer state
            // untouched.
            if (eventData instanceof ByteBuffer) {
                input = ((ByteBuffer) eventData).duplicate();
            } else if (eventData instanceof CharBuffer) {
                input = ((CharBuffer) eventData).duplicate();
            } else {
                return;
            }
            if (engine.switchedTo().equals(Optional.of("websocket"))
                && currentWsMessage == null) {
                // When switched to WebSockets, we only have Input and Output
                // events. Add header automatically.
                @SuppressWarnings("unchecked")
                ServerEngine<?, MessageHeader> wsEngine
                    = (ServerEngine<?, MessageHeader>) engine;
                currentWsMessage = new WsMessageHeader(
                    event.buffer().backingBuffer() instanceof CharBuffer,
                    true);
                wsEngine.encode(currentWsMessage);
            }
            while (input.hasRemaining() || event.isEndOfRecord()) {
                if (outBuffer == null) {
                    outBuffer = upstreamChannel().byteBufferPool().acquire();
                }
                Codec.Result result = engine.encode(input,
                    outBuffer.backingBuffer(), event.isEndOfRecord());
                if (result.isOverflow()) {
                    // Buffer is full, send it and continue with a new one
                    upstreamChannel()
                        .respond(Output.fromSink(outBuffer, false));
                    outBuffer = upstreamChannel().byteBufferPool().acquire();
                    continue;
                }
                if (event.isEndOfRecord() || result.closeConnection()) {
                    if (outBuffer.position() > 0) {
                        upstreamChannel()
                            .respond(Output.fromSink(outBuffer, true));
                    } else {
                        outBuffer.unlockBuffer();
                    }
                    outBuffer = null;
                    if (result.closeConnection()) {
                        upstreamChannel().respond(new Close());
                    }
                    break;
                }
            }
            if (engine.switchedTo().equals(Optional.of("websocket"))
                && event.isEndOfRecord()) {
                // Next Output event starts a new WebSocket message
                currentWsMessage = null;
            }
        }

        /**
         * Handle a {@link Close} event from the application layer.
         *
         * @param event the event
         * @throws InterruptedException the interrupted exception
         */
        public void handleClose(Close event) throws InterruptedException {
            if (engine.switchedTo().equals(Optional.of("websocket"))) {
                // With WebSockets, a close frame is sent as response
                // instead of closing the network connection directly.
                fire(new Response(new WsCloseFrame(null, null)), this);
                return;
            }
            upstreamChannel().respond(new Close());
        }

        /**
         * Handle a {@link Closed} event from the network by forwarding
         * it to the application layer.
         *
         * @param event the event
         */
        public void handleClosed(Closed<?> event) {
            downPipeline.fire(new Closed<Void>(), this);
        }

        /**
         * Handle a {@link Purge} event by forwarding it to the
         * application layer.
         *
         * @param event the event
         */
        public void handlePurge(Purge event) {
            downPipeline.fire(new Purge(), this);
        }

    }

}