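Last updated: the morning of December 9, 2022
Spring's event notification mechanism is an effective way to decouple code within a project. Let's look at how to use it synchronously and asynchronously, and at the source code behind each mode.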
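1. Spring event demo - synchronous mode
Define an event that extends ApplicationEvent. Then define an event publisher that publishes the event through ApplicationEventPublisher. Finally, define an event subscriber that implements the ApplicationListener interface to listen for the event.

    // MyEvent.java
    import org.springframework.context.ApplicationEvent;

    // Custom event that carries a text message.
    public class MyEvent extends ApplicationEvent {
        private String message;

        public MyEvent(Object source, String message) {
            super(source);
            this.message = message;
        }

        public String getMessage() {
            return message;
        }
    }

    // MyEventPublisher.java
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationEventPublisher;
    import org.springframework.stereotype.Component;

    @Component
    public class MyEventPublisher {
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;

        public void publishEvent(String message) {
            // Log the publishing thread so it can be compared with the listener's thread.
            System.out.println("Thread " + Thread.currentThread().getName() + ": Publish Event");
            applicationEventPublisher.publishEvent(new MyEvent(this, message));
        }
    }

    // MyEventSubscriber.java
    import org.springframework.context.ApplicationListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyEventSubscriber implements ApplicationListener<MyEvent> {
        @Override
        public void onApplicationEvent(MyEvent myEvent) {
            // Prefix the current thread's name to show which thread runs the listener.
            System.out.println(Thread.currentThread().getName() + ": Subscribed event - " + myEvent.getMessage());
        }
    }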
Output: the main thread publishes the event, and the same main thread then goes on to run the subscriber.
Thread main: Publish Event
main: Subscribed event - test
Source code analysis
The publishEvent method of AbstractApplicationContext hands the event to the container's applicationEventMulticaster, which multicasts it to the listeners.

    public abstract class AbstractApplicationContext extends DefaultResourceLoader
            implements ConfigurableApplicationContext {

        protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
            // Excerpt: the code that converts 'event' into 'applicationEvent' is omitted.

            if (this.earlyApplicationEvents != null) {
                // Listeners are not registered yet, so buffer the event for later.
                this.earlyApplicationEvents.add(applicationEvent);
            } else {
                // Hand the event to the multicaster, which dispatches it to the listeners.
                this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType);
            }

            // Also publish the event through the parent context, if there is one.
            if (this.parent != null) {
                if (this.parent instanceof AbstractApplicationContext) {
                    ((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
                } else {
                    this.parent.publishEvent(event);
                }
            }
        }
    }
SimpleApplicationEventMulticaster is the default multicaster in the Spring container. When multicastEvent runs, it checks whether an executor has been configured; if not, the current thread invokes each listener and runs the subscriber logic.

    public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

        public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
            ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
            Executor executor = this.getTaskExecutor();
            Iterator var5 = this.getApplicationListeners(event, type).iterator();

            while (var5.hasNext()) {
                ApplicationListener<?> listener = (ApplicationListener)var5.next();
                if (executor != null) {
                    // An executor is configured: invoke the listener on one of its threads.
                    executor.execute(() -> {
                        this.invokeListener(listener, event);
                    });
                } else {
                    // No executor: invoke the listener on the calling thread.
                    this.invokeListener(listener, event);
                }
            }
        }

        // Excerpt: invokeListener, which delegates to doInvokeListener, is omitted.

        private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
            try {
                listener.onApplicationEvent(event);
            } catch (ClassCastException var6) {
                String msg = var6.getMessage();
                if (msg != null && !this.matchesClassCastMessage(msg, event.getClass())) {
                    throw var6;
                }

                Log logger = LogFactory.getLog(this.getClass());
                if (logger.isTraceEnabled()) {
                    logger.trace("Non-matching event type for listener: " + listener, var6);
                }
            }
        }
    }
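To make the executor check concrete, here is a minimal JDK-only sketch of the same dispatch decision. It is not Spring's implementation: MiniMulticaster and DispatchDemo are hypothetical names invented for this illustration, and listeners are modelled as plain Consumer callbacks.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for SimpleApplicationEventMulticaster.
class MiniMulticaster<E> {
    private final List<Consumer<E>> listeners = new CopyOnWriteArrayList<>();
    private volatile Executor taskExecutor; // null means "dispatch on the caller's thread"

    void addListener(Consumer<E> listener) {
        listeners.add(listener);
    }

    void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    void multicast(E event) {
        Executor executor = this.taskExecutor;
        for (Consumer<E> listener : listeners) {
            if (executor != null) {
                executor.execute(() -> listener.accept(event)); // asynchronous path
            } else {
                listener.accept(event);                         // synchronous path
            }
        }
    }
}

class DispatchDemo {
    // Publishes one event and returns the name of the thread that ran the listener.
    static String listenerThread(boolean async) {
        MiniMulticaster<String> multicaster = new MiniMulticaster<>();
        String[] seen = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        multicaster.addListener(message -> {
            seen[0] = Thread.currentThread().getName();
            done.countDown();
        });
        ExecutorService pool = async
                ? Executors.newSingleThreadExecutor(r -> new Thread(r, "thread-1"))
                : null;
        try {
            if (pool != null) {
                multicaster.setTaskExecutor(pool);
            }
            multicaster.multicast("test");
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener did not run");
            }
            return seen[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (pool != null) {
                pool.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("sync listener ran on:  " + listenerThread(false));
        System.out.println("async listener ran on: " + listenerThread(true));
    }
}
```

Without an executor the listener runs on whichever thread called multicast; with one, it runs on the pool thread, which mirrors the two branches in multicastEvent.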
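2. Spring event demo - asynchronous mode
Add a configuration class that registers a SimpleApplicationEventMulticaster bean and sets its taskExecutor to a custom thread pool. When an event is published, the publishing thread hands each listener invocation to a thread from the pool, which invokes the listener and runs the subscriber logic.

    import java.util.concurrent.ThreadPoolExecutor;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.event.SimpleApplicationEventMulticaster;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    @Configuration
    public class EventConfig {

        // The bean must be named "applicationEventMulticaster" for the context to pick it up.
        @Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
        public SimpleApplicationEventMulticaster myEventMulticaster() {
            SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
            simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
            return simpleApplicationEventMulticaster;
        }

        @Bean
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(5);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(100);
            executor.setKeepAliveSeconds(300);
            executor.setThreadNamePrefix("thread-");
            // DiscardPolicy silently drops tasks once the pool and the queue are both full.
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            return executor;
        }
    }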
Output: the main thread publishes the event, while the subscriber runs on a thread from the pool.
Thread main: Publish Event
thread-1: Subscribed event - test
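One consequence of the configuration above is worth checking: with DiscardPolicy, a listener invocation that arrives while the pool and its queue are both full is dropped without any error. The JDK-only sketch below assumes a deliberately tiny pool (one thread, queue capacity one) so that the effect is easy to observe; DiscardPolicyDemo and executedCount are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DiscardPolicyDemo {
    // Submits 'submitted' tasks to a saturated pool and returns how many actually ran.
    static int executedCount(int submitted) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        AtomicInteger executed = new AtomicInteger();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            for (int i = 0; i < submitted; i++) {
                pool.execute(() -> {
                    try {
                        gate.await(); // hold the worker so the queue stays full
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    executed.incrementAndGet();
                });
            }
            gate.countDown(); // release the worker once everything has been submitted
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        // One task occupies the worker, one waits in the queue, the rest are discarded.
        System.out.println("submitted 5, executed " + executedCount(5));
    }
}
```

If every event must reach its listeners, a handler such as ThreadPoolExecutor.CallerRunsPolicy may be a better fit, at the cost of occasionally running a listener on the publishing thread.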
Source code analysis

    public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

        // Stores the executor that multicastEvent later reads through getTaskExecutor().
        public void setTaskExecutor(@Nullable Executor taskExecutor) {
            this.taskExecutor = taskExecutor;
        }
    }
3. How the notification mechanism is implemented in the Spring container
3.1 Initializing SimpleApplicationEventMulticaster and registering it in the container
While the application context is being refreshed, Spring initializes the ApplicationEventMulticaster. It first checks whether the local bean factory contains a bean named applicationEventMulticaster. If so, that custom bean is fetched and used as the context's multicaster; if not, a default SimpleApplicationEventMulticaster is created and registered in the container as a singleton.

    public abstract class AbstractApplicationContext extends DefaultResourceLoader
            implements ConfigurableApplicationContext {

        protected void initApplicationEventMulticaster() {
            ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();
            if (beanFactory.containsLocalBean("applicationEventMulticaster")) {
                // A user-defined multicaster bean exists: use it.
                this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
                }
            } else {
                // No such bean: create the default multicaster and register it as a singleton.
                this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
                beanFactory.registerSingleton("applicationEventMulticaster", this.applicationEventMulticaster);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("No 'applicationEventMulticaster' bean, using [" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
                }
            }
        }
    }
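Because the lookup is by bean name, a custom multicaster registered under any other name is ignored and the default one is used instead. The following JDK-only sketch models that lookup with a plain Map standing in for the bean factory; MulticasterLookupDemo, resolve, and resolveWith are hypothetical names, and the string values are placeholders for real multicaster instances.

```java
import java.util.HashMap;
import java.util.Map;

class MulticasterLookupDemo {
    static final String BEAN_NAME = "applicationEventMulticaster";

    // Returns the entry stored under BEAN_NAME, or registers and returns a default one.
    static String resolve(Map<String, String> beans) {
        if (beans.containsKey(BEAN_NAME)) {
            return beans.get(BEAN_NAME);
        }
        String fallback = "default SimpleApplicationEventMulticaster";
        beans.put(BEAN_NAME, fallback);
        return fallback;
    }

    // Registers a custom multicaster under the given name, then runs the lookup.
    static String resolveWith(String beanName) {
        Map<String, String> beans = new HashMap<>();
        beans.put(beanName, "custom async multicaster");
        return resolve(beans);
    }

    public static void main(String[] args) {
        System.out.println(resolveWith("applicationEventMulticaster"));
        System.out.println(resolveWith("myEventMulticaster"));
    }
}
```

This is why the asynchronous configuration names its bean with AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME rather than relying on the method name myEventMulticaster.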
3.2 Registering all listeners
Next, the Spring container registers all listeners:

    public abstract class AbstractApplicationContext extends DefaultResourceLoader
            implements ConfigurableApplicationContext {

        protected void registerListeners() {
            // Register the listeners that were added to the context directly.
            Iterator var1 = this.getApplicationListeners().iterator();

            while (var1.hasNext()) {
                ApplicationListener<?> listener = (ApplicationListener)var1.next();
                this.getApplicationEventMulticaster().addApplicationListener(listener);
            }

            // Register listener beans by name, without instantiating them here.
            String[] listenerBeanNames = this.getBeanNamesForType(ApplicationListener.class, true, false);
            String[] var7 = listenerBeanNames;
            int var3 = listenerBeanNames.length;

            for (int var4 = 0; var4 < var3; ++var4) {
                String listenerBeanName = var7[var4];
                this.getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
            }

            // Publish the events that were buffered before the multicaster was ready.
            Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
            this.earlyApplicationEvents = null;
            if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
                Iterator var9 = earlyEventsToProcess.iterator();

                while (var9.hasNext()) {
                    ApplicationEvent earlyEvent = (ApplicationEvent)var9.next();
                    this.getApplicationEventMulticaster().multicastEvent(earlyEvent);
                }
            }
        }
    }
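The last part of registerListeners, together with the earlyApplicationEvents check in publishEvent, amounts to a buffer-then-replay pattern. Here is a small JDK-only sketch of that pattern, assuming events are plain strings and listeners are Consumer callbacks; MiniContext and EarlyEventDemo are hypothetical names, not Spring classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, simplified model of a context that buffers early events.
class MiniContext {
    private Set<String> earlyEvents = new LinkedHashSet<>(); // non-null until listeners are registered
    private final List<Consumer<String>> listeners = new ArrayList<>();

    void publish(String event) {
        if (earlyEvents != null) {
            earlyEvents.add(event); // too early: keep the event for later
        } else {
            for (Consumer<String> listener : listeners) {
                listener.accept(event);
            }
        }
    }

    void registerListener(Consumer<String> listener) {
        listeners.add(listener);
        Set<String> pending = earlyEvents;
        earlyEvents = null; // from now on, events are dispatched immediately
        if (pending != null) {
            for (String event : pending) {
                publish(event); // replay what was buffered
            }
        }
    }
}

class EarlyEventDemo {
    // Publishes one event before and one after registration; returns what the listener saw.
    static List<String> deliveredEvents() {
        MiniContext context = new MiniContext();
        List<String> delivered = new ArrayList<>();
        context.publish("early");
        context.registerListener(delivered::add);
        context.publish("late");
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliveredEvents());
    }
}
```

The event published before registration is not lost: it is delivered as soon as the listener is registered, ahead of anything published afterwards.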
addApplicationListener stores the listener in the default retriever and clears the retriever cache. Matching listeners to event types happens later, when listeners are looked up for a published event:

    public abstract class AbstractApplicationEventMulticaster
            implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {

        public void addApplicationListener(ApplicationListener<?> listener) {
            synchronized (this.defaultRetriever) {
                // If the listener is a proxy, remove its target so it is not invoked twice.
                Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
                if (singletonTarget instanceof ApplicationListener) {
                    this.defaultRetriever.applicationListeners.remove(singletonTarget);
                }

                this.defaultRetriever.applicationListeners.add(listener);
                // Cached lookups may be stale now, so drop them.
                this.retrieverCache.clear();
            }
        }
    }
3.3 Multicasting the event when it occurs
Call the multicaster's multicastEvent method
As mentioned above, when an event occurs, the publishEvent method of AbstractApplicationContext is called, and it has the multicaster registered in the container run multicastEvent.
Find all listeners for this kind of event, based on the event type

    Iterator var5 = this.getApplicationListeners(event, type).iterator();

A closer look at the getApplicationListeners method:

    protected Collection<ApplicationListener<?>> getApplicationListeners(ApplicationEvent event, ResolvableType eventType) {
        Object source = event.getSource();
        Class<?> sourceType = source != null ? source.getClass() : null;
        // Lookups are cached per (event type, source type) pair.
        AbstractApplicationEventMulticaster.ListenerCacheKey cacheKey = new AbstractApplicationEventMulticaster.ListenerCacheKey(eventType, sourceType);
        AbstractApplicationEventMulticaster.CachedListenerRetriever newRetriever = null;
        AbstractApplicationEventMulticaster.CachedListenerRetriever existingRetriever = (AbstractApplicationEventMulticaster.CachedListenerRetriever)this.retrieverCache.get(cacheKey);
        if (existingRetriever == null && (this.beanClassLoader == null || ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
            newRetriever = new AbstractApplicationEventMulticaster.CachedListenerRetriever();
            existingRetriever = (AbstractApplicationEventMulticaster.CachedListenerRetriever)this.retrieverCache.putIfAbsent(cacheKey, newRetriever);
            if (existingRetriever != null) {
                // Another thread cached a retriever first, so use that one.
                newRetriever = null;
            }
        }

        if (existingRetriever != null) {
            Collection<ApplicationListener<?>> result = existingRetriever.getApplicationListeners();
            if (result != null) {
                return result;
            }
        }

        // Cache miss: resolve the matching listeners and populate the new retriever.
        return this.retrieveApplicationListeners(eventType, sourceType, newRetriever);
    }
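The interplay between the cached lookup here and the cache clearing in addApplicationListener can be sketched with JDK classes only. This is a simplified illustration, not Spring's code: ListenerRegistry and ListenerRegistryDemo are hypothetical names, the cache key is just the event class (Spring also includes the source type), and a listener is reduced to a name plus the event type it supports.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry that caches "which listeners match this event type".
class ListenerRegistry {
    static final class Listener {
        final String name;
        final Class<?> supportedType;

        Listener(String name, Class<?> supportedType) {
            this.name = name;
            this.supportedType = supportedType;
        }
    }

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private final Map<Class<?>, List<Listener>> cache = new ConcurrentHashMap<>();
    private final AtomicInteger scans = new AtomicInteger();

    void addListener(Listener listener) {
        listeners.add(listener);
        cache.clear(); // cached results may be stale after a registration
    }

    List<Listener> listenersFor(Class<?> eventType) {
        // Scan all listeners only on a cache miss.
        return cache.computeIfAbsent(eventType, type -> {
            scans.incrementAndGet();
            List<Listener> matches = new ArrayList<>();
            for (Listener listener : listeners) {
                if (listener.supportedType.isAssignableFrom(type)) {
                    matches.add(listener);
                }
            }
            return matches;
        });
    }

    int scanCount() {
        return scans.get();
    }
}

class ListenerRegistryDemo {
    // Returns {matches before add, scans before add, matches after add, scans after add}.
    static int[] run() {
        ListenerRegistry registry = new ListenerRegistry();
        registry.addListener(new ListenerRegistry.Listener("text", CharSequence.class));
        registry.addListener(new ListenerRegistry.Listener("number", Integer.class));
        int first = registry.listenersFor(String.class).size();
        registry.listenersFor(String.class); // served from the cache, no new scan
        int scansBeforeAdd = registry.scanCount();
        registry.addListener(new ListenerRegistry.Listener("any", Object.class));
        int second = registry.listenersFor(String.class).size();
        return new int[] {first, scansBeforeAdd, second, registry.scanCount()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

Repeated lookups for the same event type reuse the cached result, and registering a new listener forces the next lookup to rescan.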
For each listener found, invoke it

    while (var5.hasNext()) {
        ApplicationListener<?> listener = (ApplicationListener)var5.next();
        if (executor != null) {
            executor.execute(() -> {
                this.invokeListener(listener, event);
            });
        } else {
            this.invokeListener(listener, event);
        }
    }