EventBus源码解析

简介

EventBus是一款由GreenRebot开发的事件发布-订阅总线,它简化了组件之间通信的复杂度。

三个角色

  • Event:事件,它可以是任意类型,EventBus会根据事件的类型,进行全局的事件分发。
  • Subsciber:事件订阅者,在EventBus3.0之前,我们必须定义以onEvent开头的方法,onEvent、onEventMainThread、onEventBackgroundThread和onEventAsync,而在3.0之后事件的方法名字可以随意,不过要加上@Subscribe注解。并且指定线程模型,默认是POSTING。
  • Publisher:事件发布者,可以在任意线程发布事件。

四种线程模型

  • POSTING:默认,表示事件处理函数的线程跟发布事件的线程在同一个线程。
  • MAIN:表示事件处理函数的线程在主线程(UI)线程,因此在这里不能进行耗时操作。
  • BACKGROUND:表示事件处理函数的线程在后台线程,因此不能进行UI操作。如果发布事件的线程是主线程(UI线程),那么事件处理函数将会开启一个后台线程,如果果发布事件的线程是在后台线程,那么事件处理函数就使用该线程。
  • ASYNC:表示无论事件发布的线程是哪一个,事件处理函数始终会新建一个子线程运行,同样不能进行UI操作。

基本用法

定义事件

1
2
3
4
5
6
7
8
9
10
11
public class MsgEvent {
private String msg;

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}
}

注册事件

1
EventBus.getDefault().register(subscriber);

处理时间

1
2
3
4
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMsgEvent(Object subscriber){
//TODO
}

源码

获取实例

在创建EventBus实例的时候,一种方式是按照我们上面的形式,通过EventBus的静态方法getDefault来获取一个实例。getDefault本身会调用其内部的构造方法,通过传入一个默认的EventBusBuilder来创建EventBus。此外,我们还可以直接通过EventBus的builder()方法获取一个EventBusBuilder的实例,然后通过该构建者模式来个性化地定制自己的EventBus。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 静态的单例实例
static volatile EventBus defaultInstance;

// 默认的构建者
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();

// 实际上使用了DCL双检锁机制,这里简化了一下
public static EventBus getDefault() {
if (defaultInstance == null) defaultInstance = new EventBus();
return defaultInstance;
}

public EventBus() {
this(DEFAULT_BUILDER);
}

// 调用getDefault的时候,最终会调用该方法,使用DEFAULT_BUILDER创建一个实例
EventBus(EventBusBuilder builder) {
// ...
}

// 也可以使用下面的方法获取一个构建者,然后使用它来个性化定制EventBus
public static EventBusBuilder builder() {
return new EventBusBuilder();
}

注册

当调用EventBus实例的register方法的时候,会执行下面的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void register(Object subscriber) {
// 首席会获取注册的对象的类型
Class<?> subscriberClass = subscriber.getClass();
// 然后获取注册的对象的订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
// 对当前实例加锁,并不断执行监听的逻辑
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
// 对订阅方法进行注册
subscribe(subscriber, subscriberMethod);
}
}
}

这里的SubscriberMethod封装了订阅方法(使用@Subscribe注解的方法)类型的信息,它的定义如下所示。从下面可以的代码中我们可以看出,实际上该类就是通过几个字段来存储@Subscribe注解中指定的类型信息,以及一个方法的类型变量。

1
2
3
4
5
6
7
8
9
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;

// ...
}

register方法通过subscriberMethodFinder实例的findSubscriberMethods方法来获取该观察者类型中的所有订阅方法,然后将所有的订阅方法分别进行订阅。下面我们先看下查找订阅者的方法。

查找订阅者的订阅方法

下面是SubscriberMethodFinder中的findSubscriberMethods方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 这里首先从缓存当中尝试去取该订阅者的订阅方法
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

// 当缓存中没有找到该观察者的订阅方法的时候使用下面的两种方法获取方法信息
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException(...);
} else {
// 将获取到的订阅方法放置到缓存当中
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

这里我们先从缓存当中尝试获取某个观察者中的所有订阅方法,如果没有可用缓存的话就从该类中查找订阅方法,并在返回结果之前将这些方法信息放置到缓存当中。这里的ignoreGeneratedIndex参数表示是否忽略注解器生成的MyEventBusIndex,该值默认为false。然后,我们会进入到下面的方法中获取订阅方法信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// 这里通过FindState对象来存储找到的方法信息
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
// 这里是一个循环操作,会从当前类开始遍历该类的所有父类
while (findState.clazz != null) {
// 获取订阅者信息
findState.subscriberInfo = getSubscriberInfo(findState); // 1
if (findState.subscriberInfo != null) {
// 如果使用了MyEventBusIndex,将会进入到这里并获取订阅方法信息
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 未使用MyEventBusIndex将会进入这里使用反射获取方法信息
findUsingReflectionInSingleClass(findState); // 2
}
// 将findState.clazz设置为当前的findState.clazz的父类
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

在上面的代码中,会从当前订阅者类开始直到它最顶层的父类进行遍历来获取订阅方法信息。这里在循环的内部会根据我们是否使用了MyEventBusIndex走两条路线,对于我们没有使用它的,会直接使用反射来获取订阅方法信息,即进入2处。

下面是使用反射从订阅者中得到订阅方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 获取该类中声明的所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
// 对方法进行遍历判断
for (Method method : methods) {
int modifiers = method.getModifiers();
// 这里会对方法的修饰符进行校验
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
// 这里对方法的输入参数进行校验
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
// 获取方法的注解,用来从注解中获取注解的声明信息
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
// 获取该方法的第一个参数
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
// 最终将封装之后的方法塞入到列表中
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(...);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(...);
}
}
}

这里会对当前类中声明的所有方法进行校验,并将符合要求的方法的信息封装成一个SubscriberMethod对象塞到列表中。

注册订阅方法

直到了如何拿到所有的订阅方法之后,我们回到之前的代码,看下订阅过程中的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
// 将所有的观察者和订阅方法封装成一个Subscription对象
Subscription newSubscription = new Subscription(subscriber, subscriberMethod); // 1
// 尝试从缓存中根据事件类型来获取所有的Subscription对象
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); // 2
if (subscriptions == null) {
// 指定的事件类型没有对应的观察对象的时候
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException(...);
}
}

// 这里会根据新加入的方法的优先级决定插入到队列中的位置
int size = subscriptions.size(); // 2
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

// 这里又会从“订阅者-事件类型”列表中尝试获取该订阅者对应的所有事件类型
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); // 3
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);

// 如果是黏性事件还要进行如下的处理
if (subscriberMethod.sticky) { // 4
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
// 这里会向该观察者通知所有的黏性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

这里涉及到了几个集合,它们是用来做缓存的,还有就是来维护观察者、事件类型和订阅方法之间的关系的。注册观察的方法比较长,我们可以一点一点来看。首先,会在代码1处将观察者和订阅方法封装成一个Subscription对象。然后,在2处用到了CopyOnWriteArrayList这个集合,它是一种适用于多读写少场景的数据结构,是一种线程安全的数组型的数据结构,主要用来存储一个事件类型所对应的全部的Subscription对象。EventBus在这里通过一个Map<Class<?>, CopyOnWriteArrayList>类型的哈希表来维护这个映射关系。然后,我们的程序执行到2处,在这里会对Subscription对象的列表进行遍历,并根据订阅方法的优先级,为当前的Subscription对象寻找一个合适的位置。3的地方主要的逻辑是获取指定的观察者对应的全部的观察事件类型,这里也是通过一个哈希表来维护这种映射关系的。然后,在代码4处,程序会根据当前的订阅方法是否是黏性的,来决定是否将当前缓存中的信息发送给新订阅的方法。这里会通过checkPostStickyEventToSubscription方法来发送信息,它内部的实现的逻辑和post方法类似,我们不再进行说明。

取消注册的逻辑比较比较简单,基本上就是注册操作反过来——将当前订阅方法的信息从缓存中踢出来,我们不再进行分分析。下面我们分析另一个比较重要的地方,即发送事件相关的逻辑。

通知

通知的逻辑相对来说会比较复杂一些,因为这里面涉及一些线程之间的操作。我们看下下面的代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void post(Object event) {
// 这里从线程局部变量中取出当前线程的状态信息
PostingThreadState postingState = currentPostingThreadState.get();
// 这里是以上线程局部变量内部维护的一个事件队列
List<Object> eventQueue = postingState.eventQueue;
// 将当前要发送的事件加入到队列中
eventQueue.add(event);

if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 不断循环来发送事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState); // 1
}
} finally {
// 恢复当前线程的信息
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

这里的currentPostingThreadState是一个ThreadLocal类型的变量,其中存储了对应于当前线程的PostingThreadState对象,该对象中存储了当前线程对应的事件列表和线程的状态信息等。从上面的代码中可以看出,post方法会在1处不断从当前线程对应的队列中取出事件并进行发布。下面我们看以下这里的postSingleEvent方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
// 这里向上查找该事件的所有父类
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 对上面的事件进行处理
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 找不到该事件的异常处理
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent
&& eventClass != NoSubscriberEvent.class
&& eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

在上面的代码中,我们会根据eventInheritance的值决定是否要同时遍历当前事件的所有父类的事件信息并进行分发。如果设置为true就将执行这一操作,并最终使用postSingleEventForEventType对每个事件类型进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
// 获取指定的事件对应的所有的观察对象
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
// 遍历观察对象,并最终执行事件的分发操作
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

在上面的代码中,我们会通过传入的事件类型到缓存中取寻找它对应的全部的Subscription,然后对得到的Subscription列表进行遍历,并依次调用postToSubscription方法执行事件的发布操作。下面是postToSubscription方法的代码,这里我们会根据订阅方法指定的threadMode信息来执行不同的发布策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException(...);
}
}

在上面的方法中,会根据当前的线程状态和订阅方法指定的threadMode信息来决定合适触发方法。这里的invokeSubscriber会在当前线程中立即调用反射来触发指定的观察者的订阅方法。否则会根据具体的情况将事件加入到不同的队列中进行处理。这里的mainThreadPoster最终继承自Handler,当调用它的enqueue方法的时候,它会发送一个事件并在它自身的handleMessage方法中从队列中取值并进行处理,从而达到在主线程中分发事件的目的。这里的backgroundPoster实现了Runnable接口,它会在调用enqueue方法的时候,拿到EventBus的ExecutorService实例,并使用它来执行自己。在它的run方法中会从队列中不断取值来进行执行。

总结

以上就是Android中的EventBus的源码分析,这里我们回答之前提出的几个问题来作结:

  1. 在EventBus中,使用@Subscribe注解的时候指定的ThreadMode是如何实现在不同线程间传递数据的?

要求主线程中的事件通过Handler来实现在主线程中执行,非主线程的方法会使用EventBus内部的ExecutorService来执行。实际在触发方法的时候会根据当前线程的状态和订阅方法的ThreadMode指定的线程状态来决定何时触发方法。非主线程的逻辑会在post的时候加入到一个队列中被随后执行。

  1. 使用注解和反射的时候的效率问题,是否会像Guava的EventBus一样有缓存优化?

内部使用了缓存,确切来说就是维护了一些映射的关系。但是它的缓存没有像Guava一样使用软引用之类方式进行优化,即一直是强引用类型的。

  1. 黏性事件是否是通过内部维护了之前发布的数据来实现的,是否使用了缓存?

黏性事件会通过EventBus内部维护的一个事件类型-黏性事件的哈希表存储,当注册一个观察者的时候,如果发现了它内部有黏性事件监听,会执行post类似的逻辑将事件立即发送给该观察者。

您的支持是我原创的动力