EventBus源码分析

基本用法

  1. 通过@Subscribe注解声明订阅方法,参数为定义好的事件类型
1
2
3
4
@Subscribe(threadMode = ThreadMode.MAIN)
public void onRecieve(StringEvent event) {
Toast.makeText(this, event.val, Toast.LENGTH_SHORT).show();
}
  1. 注册订阅者
1
EventBus.getDefault().register(this);
  1. 发送事件,事件将会被传递给第一步创建的订阅方法
1
EventBus.getDefault().post(new StringEvent("Hello"));
  1. 解除订阅
1
EventBus.getDefault().unregister(this);

注册订阅者

注册阶段从使用上来看主要调用了两个方法:

  • public static EventBus getDefault():获取默认的EventBus对象
  • public void register(Object subscriber):对所有订阅方法进行订阅

获取EventBus对象

EventBus使用了典型的DCL单例模式获取默认EventBus:

1
2
3
4
5
6
7
8
9
10
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}

订阅类中所有订阅方法

此阶段主要分为两个两个步骤:

  1. subscriberMethodFinder.findSubscriberMethods(subscriberClass):查找类中所有的订阅方法

  2. subscribe(subscriber, subscriberMethod):对所有订阅方法进行订阅

1
2
3
4
5
6
7
8
9
10
11
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
//查找订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//进行订阅
subscribe(subscriber, subscriberMethod);
}
}
}

查找订阅方法

SubscriberMethodFinder首先会从缓存(METHOD_CACHE)中获取。如果没有缓存,则根据ignoreGeneratedIndex的值选择反射对应类获取或者直接从注解处理器生成的信息中获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//从缓存中获取
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

if (ignoreGeneratedIndex) {
//通过反射获取
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//通过注解处理器生成的信息查找
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

ignoreGeneratedIndex为true时会通过反射的方式查找类中所有符合条件的方法(public的非static,非abstract,非bridge,非synthetic且含有Subscribe注解的方法)

1
2
3
4
5
6
7
8
9
10
11
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
//查找该类以及它的父类
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
//返回订阅方法集合
return getMethodsAndRelease(findState);
}

ignoreGeneratedIndex为false时,参考后文

订阅类中的所有方法

所有订阅方法都会被封装成一个SubscriberMethod对象,register()方法的最后一个步骤即为:遍历SubscribeMethod集合,使用subscibe()方法将Subscriber与SubscribeMethod建立订阅关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//按照事件类型分组,存储所有Subscription
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//按照订阅方法的优先级排序
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//按照订阅者分组存储所有事件
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);

if (subscriberMethod.sticky) {
//省略粘性事件。。。
}
}

订阅关系被描述为一个Subscription对象,包含了订阅者以及它的所有订阅方法。所有的Subscription按照事件类型(订阅方法的参数)分组,存储在Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType中,组内按照订阅方法的优先级进行排序。同时,所有的事件会按照Subscriber分组存储在Map<Object, List<Class<?>>> typesBySubscriber中。对于粘性事件的处理参考后文

事件传递

当调用EventBus.getDefault().post(new Event())后,事件将按照类型传递给注册的订阅者。过程如下:

  • 通过ThreadLocal获取当前线程的状态(PostingThreadState);
  • 将事件放入当前的事件队列(PostingThreadState.eventQueue)中;
  • 遍历事件队列,将所有事件传递给postSingleEvent()方法。

该方法主要负责取出当前线程的所有事件,并一个个交给postSingleEvent()处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void post(Object event) {
//使用ThreadLocal获取当前线程的PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

postSingleEvent()方法主要负责确定所有需要传递的事件类型,当subscriptionFound为true时(默认为true),EventBus会寻找EventType的所有接口和超类,并通过postSingleEventForEventType()依次传递给相应的订阅者。如果为false,则仅传递当前事件类型。如果该事件没有订阅者,则会将该事件传递给NoSubscriberEvent的订阅者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

postSingleEventForEventType()会通过传递过来的事件类型,从subscriptionsByEventType中取出对应的Subscription并交给postToSubscription()方法处理,直到处理完所有的Subscription或被取消:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

之前的所有方法总的来说都是对事件的细化,事件最总会由postToSubscription()方法进行传递。根据SubscriptionthreadMode,会执行不同的传递逻辑:

  • POSTING(默认模式):直接通过反射调用订阅方法,在这种模式下订阅方法的执行一定要快,否则将会阻塞剩余事件的传递;
  • ASYNC:进入AsyncPoster的事件队列(PendingPostQueue),通过一个异步线程去调用订阅方法;

剩余的ThreadMode和环境有关,对于在Android中的情景

  • MAIN:如果当前为UI线程,则直接反射调用订阅方法,否则进入mainThreadPosterPendingPostQueue并通过Handler切换到UI线程去调用订阅方法;
  • MAIN_ORDERED:总是进入mainThreadPosterPendingPostQueue并通过Handler切换到UI线程去调用订阅方法;
  • BACKGROUND:如果当前为UI线程,则进入BackgroundPosterPendingPostQueue,并通过一个异步线程去调用订阅方法,否则同POSTING

对于不在Android中的情景

  • MAIN、MAIN_ORDERED:同POSTING
  • BACKGROUND:进入BackgroundPosterPendingPostQueue,并通过一个异步线程去调用订阅方法。

对于ASYNC和BACKGROUND都会有异步线程执行的情况,但是执行过程有区别,前者对于每个事件会使用不同的线程去执行,而后者会在一个线程中执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

对于Poster,这里只分析mainThreadPosterHandlerPosterHandlerPoster继承自Handler,并实现了Poster接口,沿用了Android消息机制中的HandlerLooper,但消息队列是自己维护了一个PendingPostQueue(链表结构)。当有事件传递过来时,将该事件放入队列中,并通过Handler发送一个空消息:

1
2
3
4
5
6
7
8
9
10
11
12
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

空消息经过Android的消息机制的传递,会交给HandlerPosterhandlerMessage()方法处理。这里通过一个死循环不断从队列中取出事件,然后通过反射调用订阅方法,值得注意的是,为了避免事件太多阻塞UI线程,当执行时间大于maxMillisInsideHandleMessage时,会结束事件的执行,并重新发送一条空消息,等到重新传递到这里时再继续处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}

至于AsyncPosterBackgroundPoster执行过程类似,只是将Handler换成了线程池Executors.newCachedThreadPool()

解除订阅

在订阅者不再使用时需要调用EventBus.unregister()方法解除订阅,否则会造成内存泄漏。解除订阅只是简单的将订阅时添加到typesBySubscribersubscriptionsByEventType中的相应数据移除:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

性能优化

粘性事件

感悟

对象池

设计模式