001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2022,2023 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public 013 * License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.mail; 020 021import jakarta.mail.AuthenticationFailedException; 022import jakarta.mail.Authenticator; 023import jakarta.mail.Folder; 024import jakarta.mail.FolderClosedException; 025import jakarta.mail.Message; 026import jakarta.mail.MessagingException; 027import jakarta.mail.NoSuchProviderException; 028import jakarta.mail.PasswordAuthentication; 029import jakarta.mail.Session; 030import jakarta.mail.Store; 031import jakarta.mail.event.ConnectionEvent; 032import jakarta.mail.event.ConnectionListener; 033import jakarta.mail.event.MessageCountAdapter; 034import jakarta.mail.event.MessageCountEvent; 035import jakarta.mail.event.StoreEvent; 036import jakarta.mail.event.StoreListener; 037import java.io.IOException; 038import java.time.Duration; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.HashMap; 042import java.util.HashSet; 043import java.util.List; 044import java.util.Map; 045import java.util.Optional; 046import java.util.Properties; 047import java.util.Set; 048import java.util.function.Consumer; 049import java.util.logging.Level; 050import org.eclipse.angus.mail.imap.IMAPFolder; 051import org.eclipse.angus.mail.imap.IdleManager; 052import org.jgrapes.core.Channel; 053import org.jgrapes.core.Components; 054import org.jgrapes.core.Components.Timer; 055import org.jgrapes.core.Event; 056import org.jgrapes.core.EventPipeline; 057import org.jgrapes.core.Subchannel; 058import org.jgrapes.core.annotation.Handler; 059import org.jgrapes.io.events.Closed; 060import org.jgrapes.io.events.ConnectError; 061import org.jgrapes.io.events.IOError; 062import org.jgrapes.io.events.Opening; 063import org.jgrapes.mail.events.MailFoldersUpdated; 064import org.jgrapes.mail.events.MailMonitorOpened; 065import org.jgrapes.mail.events.OpenMailMonitor; 066import org.jgrapes.mail.events.UpdateMailFolders; 067import org.jgrapes.util.Password; 068 069/** 070 * A component that opens mail stores and monitors mail folders for 071 * mails. After establishing a connection to a store and selected 072 * folders (see {@link #onOpenMailMonitor(OpenMailMonitor, Channel)}), 073 * the existing and all subsequently arriving mails will be sent 074 * downstream using {@link MailFoldersUpdated} events. 075 * 076 * This implementation uses the {@link IdleManager}. The 077 * {@link IdleManager} works only if its {@link IdleManager#watch} 078 * method is invoked (for a folder) after any operation on that folder. 079 * Note that operations such as e.g. setting the deleted flag of 080 * a message is also an operation on a folder. 081 * 082 * Folders are updated in response to an {@link UpdateMailFolders} event 083 * or when the store signals the arrival of new messages. Information 084 * about the folders is delivered by a {@link MailFoldersUpdated} event. 085 * Folders may be freely used while handling the event, because the 086 * folders will be re-registered with the {@link IdleManager} 087 * when the {@link MailFoldersUpdated} event completes. 088 * Any usage of folders independent of handling the events mentioned 089 * will result in a loss of the monitor function. 090 * 091 * If required, the monitor function may be reestablished any time 092 * by firing a {@link UpdateMailFolders} event for the folders used. 093 */ 094@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 095 "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports", 096 "PMD.CouplingBetweenObjects" }) 097public class MailMonitor extends MailConnectionManager< 098 MailMonitor.MonitorChannel, OpenMailMonitor> { 099 100 private Duration maxIdleTime = Duration.ofMinutes(25); 101 private static IdleManager idleManager; 102 private final EventPipeline retrievals = newEventPipeline(); 103 104 /** 105 * Creates a new server using the given channel. 106 * 107 * @param componentChannel the component's channel 108 */ 109 public MailMonitor(Channel componentChannel) { 110 super(componentChannel); 111 } 112 113 @Override 114 protected boolean connectionsGenerate() { 115 return true; 116 } 117 118 /** 119 * Sets the maximum idle time. A running {@link IMAPFolder#idle()} 120 * is terminated and renewed after this time. Defaults to 25 minutes. 121 * 122 * @param maxIdleTime the new max idle time 123 */ 124 public MailMonitor setMaxIdleTime(Duration maxIdleTime) { 125 this.maxIdleTime = maxIdleTime; 126 return this; 127 } 128 129 /** 130 * Returns the max idle time. 131 * 132 * @return the duration 133 */ 134 public Duration maxIdleTime() { 135 return maxIdleTime; 136 } 137 138 /** 139 * Configure the component. Currently, only max idle time 140 * is supported. 141 * 142 * @param values the values 143 */ 144 @Override 145 protected void configureComponent(Map<String, String> values) { 146 Optional.ofNullable(values.get("maxIdleTime")) 147 .map(Integer::parseInt).map(Duration::ofSeconds) 148 .ifPresent(this::setMaxIdleTime); 149 } 150 151 /** 152 * Open a store as specified by the event and monitor the folders 153 * (also specified by the event). Information about all existing 154 * and all subsequently arriving mails will be signaled downstream 155 * using {@link MailFoldersUpdated} events. 156 * 157 * @param event the event 158 * @param channel the channel 159 */ 160 @Handler 161 public void onOpenMailMonitor(OpenMailMonitor event, Channel channel) { 162 Properties sessionProps = new Properties(mailProps); 163 sessionProps.putAll(event.mailProperties()); 164 sessionProps.put("mail.imap.usesocketchannels", true); 165 Session session = Session.getInstance(sessionProps, 166 // Workaround for class loading problem in OSGi with j.m. 2.1. 167 // Authenticator's classpath allows accessing provider's service. 168 // See https://github.com/eclipse-ee4j/mail/issues/631 169 new Authenticator() { 170 @Override 171 @SuppressWarnings("PMD.StringInstantiation") 172 protected PasswordAuthentication 173 getPasswordAuthentication() { 174 return new PasswordAuthentication( 175 sessionProps.getProperty("mail.user"), 176 new String(event.password().or(() -> password()) 177 .map(Password::password).orElse(new char[0]))); 178 } 179 }); 180 181 try { 182 synchronized (MailMonitor.class) { 183 // Cannot be created earlier, need session. 184 if (idleManager == null) { 185 idleManager = new IdleManager(session, 186 Components.defaultExecutorService()); 187 } 188 } 189 new MonitorChannel(event, channel, session.getStore(), 190 sessionProps.getProperty("mail.user"), 191 event.password().or(this::password).orElse(null)); 192 } catch (NoSuchProviderException e) { 193 fire(new ConnectError(event, "Cannot create store.", e)); 194 } catch (IOException e) { 195 fire(new IOError(event, "Cannot create resource.", e)); 196 } 197 } 198 199 /** 200 * Retrieves the folders specified in the event. 201 * 202 * @param event the event 203 * @param channel the channel 204 */ 205 @Handler 206 public void onUpdateFolders(UpdateMailFolders event, MailChannel channel) { 207 if (!connections.contains(channel)) { 208 return; 209 } 210 // This can take very long. 211 retrievals 212 .submit(() -> ((MonitorChannel) channel).onUpdateFolders(event)); 213 } 214 215 /** 216 * The Enum ChannelState. 217 */ 218 @SuppressWarnings("PMD.FieldNamingConventions") 219 private enum ChannelState { 220 Opening { 221 @Override 222 public boolean isOpening() { 223 return true; 224 } 225 }, 226 Open { 227 @Override 228 public boolean isOpen() { 229 return true; 230 } 231 }, 232 Reopening { 233 @Override 234 public boolean isOpening() { 235 return true; 236 } 237 }, 238 Reopened { 239 @Override 240 public boolean isOpen() { 241 return true; 242 } 243 }, 244 Closing, 245 Closed; 246 247 /** 248 * Checks if is open. 249 * 250 * @return true, if is open 251 */ 252 public boolean isOpen() { 253 return false; 254 } 255 256 /** 257 * Checks if is opening. 258 * 259 * @return true, if is opening 260 */ 261 public boolean isOpening() { 262 return false; 263 } 264 } 265 266 /** 267 * The specific implementation of the {@link MailChannel}. 268 */ 269 protected class MonitorChannel extends 270 MailConnectionManager<MailMonitor.MonitorChannel, 271 OpenMailMonitor>.AbstractMailChannel 272 implements ConnectionListener, StoreListener { 273 274 private final EventPipeline requestPipeline; 275 private ChannelState state = ChannelState.Opening; 276 private final Store store; 277 private final String user; 278 private final Password password; 279 private final String[] subscribed; 280 @SuppressWarnings("PMD.UseConcurrentHashMap") 281 private final Map<String, Folder> folderCache = new HashMap<>(); 282 private final Timer idleTimer; 283 284 /** 285 * Instantiates a new monitor channel. 286 * 287 * @param event the event that triggered the creation 288 * @param mainChannel the main channel (of this {@link Subchannel}) 289 * @param store the store 290 * @param user the user 291 * @param password the password 292 */ 293 public MonitorChannel(OpenMailMonitor event, Channel mainChannel, 294 Store store, String user, Password password) { 295 super(event, mainChannel); 296 this.store = store; 297 this.user = user; 298 this.password = password; 299 this.subscribed = event.folderNames(); 300 requestPipeline = event.processedBy().get(); 301 store.addConnectionListener(this); 302 store.addStoreListener(this); 303 idleTimer = Components.schedule(t -> { 304 requestPipeline.fire(new UpdateMailFolders(), this); 305 }, maxIdleTime); 306 connect( 307 t -> downPipeline().fire(new ConnectError(event, t), 308 mainChannel)); 309 } 310 311 /** 312 * Attempt connections until connected. Attempts are stopped 313 * if it is the first time that the connection is to be 314 * established and the error indicates that the connection 315 * will never succeed (e.g. due to an authentication 316 * problem). 317 * 318 * @param onOpenFailed the on open failed 319 */ 320 private void connect(Consumer<Throwable> onOpenFailed) { 321 synchronized (this) { 322 if (state.isOpen()) { 323 return; 324 } 325 activeEventPipeline().executorService().submit(() -> { 326 while (state.isOpening()) { 327 try { 328 attemptConnect(onOpenFailed); 329 } catch (InterruptedException e) { 330 break; 331 } 332 } 333 }); 334 } 335 } 336 337 /** 338 * Single connection attempt. 339 * 340 * @param onOpenFailed the on open failed 341 * @throws InterruptedException the interrupted exception 342 */ 343 @SuppressWarnings({ "PMD.AvoidInstanceofChecksInCatchClause", 344 "PMD.StringInstantiation" }) 345 private void attemptConnect(Consumer<Throwable> onOpenFailed) 346 throws InterruptedException { 347 try { 348 store.connect(user, new String(password.password())); 349 synchronized (this) { 350 if (state == ChannelState.Opening) { 351 state = ChannelState.Open; 352 } else { 353 state = ChannelState.Reopened; 354 } 355 } 356 } catch (MessagingException e) { 357 synchronized (this) { 358 if (state == ChannelState.Opening 359 && (e instanceof AuthenticationFailedException 360 || e instanceof NoSuchProviderException)) { 361 logger.log(Level.WARNING, 362 "Connecting to store failed, closing.", e); 363 state = ChannelState.Closed; 364 super.close(); 365 if (onOpenFailed != null) { 366 onOpenFailed.accept(e); 367 } 368 return; 369 } 370 } 371 logger.log(Level.WARNING, 372 "(Re)connecting to store failed, retrying.", e); 373 Thread.sleep(5000); 374 } 375 } 376 377 /** 378 * Close the connection to the store. 379 */ 380 @Override 381 public void close() { 382 synchronized (this) { 383 if (state == ChannelState.Closing 384 || state == ChannelState.Closed) { 385 return; 386 } 387 state = ChannelState.Closing; 388 } 389 390 idleTimer.cancel(); 391 try { 392 // Initiate close, callback will inform downstream components. 393 store.close(); 394 } catch (MessagingException e) { 395 // According to the documentation, the listeners should 396 // be invoked nevertheless. 397 logger.log(Level.WARNING, "Cannot close connection properly.", 398 e); 399 } 400 } 401 402 /** 403 * Callback from store.connect is the connection is successful. 404 * 405 * @param event the event 406 */ 407 @Override 408 @SuppressWarnings({ "PMD.GuardLogStatement", 409 "PMD.AvoidDuplicateLiterals" }) 410 public void opened(ConnectionEvent event) { 411 folderCache.clear(); 412 if (state == ChannelState.Reopened) { 413 // This is a re-open, only retrieve messages. 414 requestPipeline.fire(new UpdateMailFolders(), this); 415 return; 416 } 417 // (1) Opening, (2) Opened, (3) start retrieving mails 418 downPipeline().fire(Event.onCompletion(new Opening<Void>(), 419 o -> downPipeline().fire( 420 Event.onCompletion( 421 new MailMonitorOpened(openEvent(), store), 422 p -> requestPipeline 423 .fire(new UpdateMailFolders(), this)), 424 this)), 425 this); 426 } 427 428 /** 429 * According to the documentation, 430 * {@link ConnectionEvent#DISCONNECTED} is currently not 431 * used. It's implemented nevertheless and called explicitly. 432 * 433 * @param event the event or `null` if called explicitly 434 */ 435 @Override 436 public void disconnected(ConnectionEvent event) { 437 synchronized (this) { 438 folderCache.clear(); 439 if (state.isOpen()) { 440 state = ChannelState.Reopening; 441 connect(null); 442 } 443 } 444 } 445 446 /** 447 * Callback that indicates the connection close, 448 * can be called any time by jakarta mail. 449 * 450 * Whether closing is intended (callback after a call to 451 * {@link #close}) can be checked by looking at the state. 452 * 453 * @param event the event 454 */ 455 @Override 456 public void closed(ConnectionEvent event) { 457 // Ignore if already closed. 458 if (state == ChannelState.Closed) { 459 return; 460 } 461 462 // Handle involuntary close by reopening. 463 if (state != ChannelState.Closing) { 464 disconnected(event); 465 return; 466 } 467 468 // Cleanup and remove channel. 469 synchronized (this) { 470 state = ChannelState.Closed; 471 folderCache.clear(); 472 } 473 downPipeline().fire(new Closed<Void>(), this); 474 super.close(); 475 } 476 477 @Override 478 public void notification(StoreEvent event) { 479 if (event.getMessage().contains("SocketException")) { 480 logger.fine(() -> "Problem with store: " + event.getMessage()); 481 if (store.isConnected()) { 482 logger.fine(() -> "Updating folders to resume"); 483 requestPipeline.fire(new UpdateMailFolders(), this); 484 return; 485 } 486 logger.fine(() -> "Reconnecting to resume"); 487 disconnected(null); 488 } 489 } 490 491 /** 492 * Retrieve the new messages from the folders specified in the 493 * event. 494 * 495 * @param event 496 */ 497 @SuppressWarnings({ "PMD.CognitiveComplexity", 498 "PMD.AvoidInstantiatingObjectsInLoops", 499 "PMD.AvoidDuplicateLiterals" }) 500 public void onUpdateFolders(UpdateMailFolders event) { 501 List<Folder> folders = new ArrayList<>(); 502 List<Message> newMsgs = new ArrayList<>(); 503 if (store.isConnected()) { 504 Set<String> folderNames 505 = new HashSet<>(Arrays.asList(subscribed)); 506 if (event.folderNames().length > 0) { 507 folderNames.retainAll(Arrays.asList(event.folderNames())); 508 } 509 try { 510 for (var folderName : folderNames) { 511 @SuppressWarnings("PMD.CloseResource") 512 Folder folder = getFolder(folderName); 513 if (folder == null) { 514 continue; 515 } 516 folders.add(folder); 517 } 518 } catch (FolderClosedException e) { 519 disconnected(null); 520 } 521 } else { 522 disconnected(null); 523 } 524 event.setResult(folders); 525 Event.onCompletion(event, e -> downPipeline().fire(Event 526 .onCompletion(new MailFoldersUpdated(folders, newMsgs), 527 this::refreshWatches), 528 this)); 529 } 530 531 @SuppressWarnings({ "PMD.GuardLogStatement", 532 "PMD.AvoidRethrowingException", "PMD.CloseResource" }) 533 private Folder getFolder(String folderName) 534 throws FolderClosedException { 535 synchronized (folderCache) { 536 Folder folder = folderCache.get(folderName); 537 if (folder != null) { 538 return folder; 539 } 540 try { 541 folder = store.getFolder(folderName); 542 if (folder == null || !folder.exists()) { 543 logger.fine(() -> "No folder \"" + folderName 544 + "\" in store " + store); 545 return null; 546 } 547 folder.open(Folder.READ_WRITE); 548 folderCache.put(folderName, folder); 549 // Add MessageCountListener to listen for new messages. 550 folder.addMessageCountListener(new MessageCountAdapter() { 551 @Override 552 public void 553 messagesAdded(MessageCountEvent countEvent) { 554 retrievals.submit("UpdateFolder", 555 () -> updateFolders(countEvent)); 556 } 557 558 @Override 559 public void 560 messagesRemoved(MessageCountEvent countEvent) { 561 retrievals.submit("UpdateFolder", 562 () -> updateFolders(countEvent)); 563 } 564 }); 565 return folder; 566 } catch (FolderClosedException e) { 567 throw e; 568 } catch (MessagingException e) { 569 logger.log(Level.FINE, 570 "Cannot open folder: " + e.getMessage(), e); 571 } 572 return null; 573 } 574 } 575 576 @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops", 577 "PMD.GuardLogStatement" }) 578 private void updateFolders(MessageCountEvent event) { 579 List<Message> newMsgs = new ArrayList<>(); 580 if (event.getType() == MessageCountEvent.ADDED) { 581 newMsgs.addAll(Arrays.asList(event.getMessages())); 582 } else if (event.getType() != MessageCountEvent.REMOVED) { 583 return; 584 } 585 downPipeline().fire( 586 Event.onCompletion( 587 new MailFoldersUpdated( 588 new ArrayList<>(folderCache.values()), newMsgs), 589 this::refreshWatches), 590 this); 591 } 592 593 /** 594 * Registers the folders from which messages have been received 595 * with the {@link IdleManager}. 596 * 597 * @param event the event 598 */ 599 @SuppressWarnings({ "PMD.CloseResource", "PMD.UnusedPrivateMethod" }) 600 private void refreshWatches(MailFoldersUpdated event) { 601 if (!state.isOpen()) { 602 return; 603 } 604 for (Folder folder : event.folders()) { 605 try { 606 idleManager.watch(getFolder(folder.getFullName())); 607 } catch (MessagingException e) { 608 logger.log(Level.WARNING, "Cannot watch folder.", 609 e); 610 } 611 } 612 idleTimer.reschedule(maxIdleTime); 613 } 614 } 615 616 @Override 617 public String toString() { 618 return Components.objectName(this); 619 } 620 621}