文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

一篇带给你EventBus原理解析

2024-11-30 18:41

关注

EventBus基于观察者模式,原理是大体也是将观察者与要观察的消息注册到一个map,当有消息发布时,从map中找到观察者,并调用观察者对应的方法,下面我们基于源码进行解析来看看是不是这样的原理

不废话,没有卖课,直接来

1、EventBus订阅

(1)订阅整体流程

EventBus的定义始于register方法:

public void register(Object subscriber) {
//获取订阅者/观察者的class对象
Class subscriberClass = subscriber.getClass();
//从订阅者class对象中找出所有订阅消息的方法列表
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//变量订阅消息的方法列表
//定义消息
subscribe(subscriber, subscriberMethod);
}
}
}

register方法主要是从订阅者class对象中找出所有订阅消息的方法列表,然后对每个订阅方法调用subscribe进行订阅:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//方法订阅的消息类型,即订阅方法的参数类型class
Class eventType = subscriberMethod.eventType;
//创建订阅者/观察者
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//从事件/订阅者map中,找出订阅事件的订阅者列表
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
//事件/订阅者map中没有事件相关的订阅者,则:
//新建订阅事件的订阅者列表
subscriptions = new CopyOnWriteArrayList<>();
//并将事件的class作为key,事件订阅者列表作为value存入事件/订阅者map
subscriptionsByEventType.put(eventType, subscriptions);
} else {
//
if (subscriptions.contains(newSubscription)) {
//已经订阅过,则抛出异常
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
//按优先级将订阅者加入对应的位置
subscriptions.add(i, newSubscription);
break;
}
}
//订阅者/事件map中找到订阅事件列表
List<Class> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
//如果订阅者/事件map没有订阅事件列表,则:
//新建订阅事件列表,并存入订阅者/事件map中,key是订阅者,value是要订阅的事件列表
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
//如果订阅事件是粘性事件,则:
if (eventInheritance) {
//事件允许发布到其父类或者父接口,则:
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List).
Set<Map.Entry<Class, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class, Object> entry : entries) {
//遍历粘性事件map
Class candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
//事件是粘性事件的父类、父接口,则:
Object stickyEvent = entry.getValue();
//发布粘性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
//发布粘性事件
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}


register方法中的subscriberMethodFinder.findSubscriberMethods(subscriberClass),它从订阅者class对象中找到要所有要订阅的事件,我们看看它是怎么实现的:

List<SubscriberMethod> findSubscriberMethods(Class subscriberClass) {
//从缓存中获取订阅者的所有事件方法列表
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//缓存汇总没有,则:
if (ignoreGeneratedIndex) {
//如果忽略从编译器期自动生成订阅者/事件关系文件中查找,则:
//通过反射订阅者class找到订阅事件方法列表
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//如果从编译器期自动生成订阅者/事件关系文件中查找,则:
//从订阅者/事件关系文件中查找订阅事件方法列表
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//将订阅者的订阅事件列表进行缓存
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

(2)反射查找订阅事件方法列表

private List<SubscriberMethod> findUsingReflection(Class subscriberClass) {
//从查找状态对象复用池中获取/创建一个新的查找状态对象
FindState findState = prepareFindState();
//初始化查找状态对象,设置订阅者class
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//通过反射查找查找状态对象当前class,最开始从订阅者class开始查找,接着递归其父类或者父接口查找
findUsingReflectionInSingleClass(findState);
//继续从其父类/父接口中查找
findState.moveToSuperclass();
}
//返回订阅事件方法列表,并将查找状态对象缓存到复用池中
return getMethodsAndRelease(findState);
}
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
//反射获取当前class定义的方法列表
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
//出现异常,则获取所有方法(包含所有父类/父接口)
methods = findState.clazz.getMethods();
//并标记不需要从父类/父接口汇总查找
findState.skipSuperClasses = true;
}
for (Method method : methods) {
//遍历方法列表
//获取方法访问性
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//方法是public、静态、抽象,则:
//反射获取方法的参数类型列表
Class[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
//方法参数只有一个参数,则:
//反射获取方法的Subscribe注解对象
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
//方法被Subscribe注解修饰,则:
//获取参数类型class
Class eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
//获取事件方法的线程模型
ThreadMode threadMode = subscribeAnnotation.threadMode();
//收集注解信息:线程模型、优先级、粘性,创建事件方法,并加入订阅者的订阅事件方法列表中
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
//如果方法严格校验并且被Subscribe注解修饰,则抛出方法参数多于1个的异常
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
//如果方法严格校验并且被Subscribe注解修饰,则抛出方法非public、非静态、非抽象的异常
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

findUsingReflectionInSingleClass通过反射从class中获取方法列表,并按Subscribe注解规则(规则是:方法被Subscribe修饰、是public/static/abstract、只有一个参数)进行过滤,找到匹配的事件方法,并加入订阅事件方法列表

(3)编译期注解处理器,从编译期生成的订阅者/事件关系文件中查找订阅方法列表

EventBusAnnotationProcessor 是EventBus的编译期注解处理器实现类,其核心源码如下:

//订阅者/事件方法列表关系列表
private final ListMap methodsByClass = new ListMap<>();
//被忽略的类
private final Set classesToSkip = new HashSet<>();
@Override
public boolean process(Set annotations, RoundEnvironment env) {
Messager messager = processingEnv.getMessager();
try {
//读取模块定义的订阅者/事件索引类名(含包名+类名)
String index = processingEnv.getOptions().get(OPTION_EVENT_BUS_INDEX);
if (index == null) {
messager.printMessage(Diagnostic.Kind.ERROR, "No option " + OPTION_EVENT_BUS_INDEX +
" passed to annotation processor");
return false;
}
verbose = Boolean.parseBoolean(processingEnv.getOptions().get(OPTION_VERBOSE));
int lastPeriod = index.lastIndexOf('.');
//订阅者/事件索引类包名
String indexPackage = lastPeriod != -1 ? index.substring(0, lastPeriod) : null;
//......
//通过注解收集订阅者/事件方法列表关系列表
collectSubscribers(annotations, env, messager);
//根据订阅语法规则,生成忽略被忽略的观察者列表
checkForSubscribersToSkip(messager, indexPackage);
if (!methodsByClass.isEmpty()) {
//生成订阅者/事件索引类文件
createInfoIndexFile(index);
} else {
messager.printMessage(Diagnostic.Kind.WARNING, "No @Subscribe annotations found");
}
writerRoundDone = true;
} catch (RuntimeException e) {
// IntelliJ does not handle exceptions nicely, so log and print a message
e.printStackTrace();
messager.printMessage(Diagnostic.Kind.ERROR, "Unexpected error in EventBusAnnotationProcessor: " + e);
}
return true;
}
//通过注解收集订阅者/事件方法列表关系列表
private void collectSubscribers(Set annotations, RoundEnvironment env, Messager messager) {
for (TypeElement annotation : annotations) {
//遍历注解类型列表
//从注解类型元信息中获取匹配的注解元信息列表
Set elements = env.getElementsAnnotatedWith(annotation);
for (Element element : elements) {
//遍历注解元信息列表
if (element instanceof ExecutableElement) {
//如果是方法元信息,则:
ExecutableElement method = (ExecutableElement) element;
if (checkHasNoErrors(method, messager)) {
//校验方法元信息,如果方法是public且只有一个参数,则:
//获取方法所属的类类型元信息,此类类型就是对应的订阅者,方法是消耗处理事件
TypeElement classElement = (TypeElement) method.getEnclosingElement();
//将订阅者/事件方法加入订阅者/事件方法列表map中,key是订阅者,value是事件方法列表
methodsByClass.putElement(classElement, method);
}
} else {
messager.printMessage(Diagnostic.Kind.ERROR, "@Subscribe is only valid for methods", element);
}
}
}
}
private void checkForSubscribersToSkip(Messager messager, String myPackage) {
for (TypeElement skipCandidate : methodsByClass.keySet()) {
//遍历订阅者/事件方法列表map
//订阅者class类类型元信息
TypeElement subscriberClass = skipCandidate;
while (subscriberClass != null) {
//循环:订阅者class不为空,则:
if (!isVisible(myPackage, subscriberClass)) {
//如果订阅者class不是public或者与要生成的索引类不是同包,则忽略此订阅则,将其加入忽略列表
boolean added = classesToSkip.add(skipCandidate);
//.......
break;
}
//取出订阅者要订阅的事件方法列表
List methods = methodsByClass.get(subscriberClass);
if (methods != null) {
for (ExecutableElement method : methods) {
//遍历订阅的事件方法列表
String skipReason = null;
VariableElement param = method.getParameters().get(0);
TypeMirror typeMirror = getParamTypeMirror(param, messager);
if (!(typeMirror instanceof DeclaredType) ||
!(((DeclaredType) typeMirror).asElement() instanceof TypeElement)) {
//参数类型无法处理(不是基类性、类/接口类型、数组类型、类型变量、空类型),则标间忽略原因,一般不会出现此情况
skipReason = "event type cannot be processed";
}
if (skipReason == null) {
TypeElement eventTypeElement = (TypeElement) ((DeclaredType) typeMirror).asElement();
if (!isVisible(myPackage, eventTypeElement)) {
//如果参数类型对应索引类不可访问:如不是public或者不在同包名等,则标间忽略原因
skipReason = "event type is not public";
}
}
if (skipReason != null) {
//如果被标记被忽略,则:
//加入忽略列表
boolean added = classesToSkip.add(skipCandidate);
//.......
break;
}
}
}
//获取订阅类的父类/父接口,继续递归过滤
subscriberClass = getSuperclass(subscriberClass);
}
}
}
//生成订阅者/事件方法列表索引类文件
private void createInfoIndexFile(String index) {
BufferedWriter writer = null;
try {
//生成订阅者/事件方法列表索引类文件,文件为索引类全限定名
JavaFileObject sourceFile = processingEnv.getFiler().createSourceFile(index);
int period = index.lastIndexOf('.');
//索引类包名
String myPackage = period > 0 ? index.substring(0, period) : null;
//索引类名
String clazz = index.substring(period + 1);
//创建文件读写器
writer = new BufferedWriter(sourceFile.openWriter());
if (myPackage != null) {
//文件写入类包名
writer.write("package " + myPackage + ";\n\n");
}
//文件写入引用的类
writer.write("import org.greenrobot.eventbus.meta.SimpleSubscriberInfo;\n");
writer.write("import org.greenrobot.eventbus.meta.SubscriberMethodInfo;\n");
writer.write("import org.greenrobot.eventbus.meta.SubscriberInfo;\n");
writer.write("import org.greenrobot.eventbus.meta.SubscriberInfoIndex;\n\n");
writer.write("import org.greenrobot.eventbus.ThreadMode;\n\n");
writer.write("import java.util.HashMap;\n");
writer.write("import java.util.Map;\n\n");
writer.write("\n");
//文件写入定义类并实现SubscriberInfoIndex 接口
writer.write("public class " + clazz + " implements SubscriberInfoIndex {\n");
//文件写入声明订阅者/事件方法列表map,key是订阅者class,value是SubscriberInfo订阅信息对象
writer.write(" private static final Map, SubscriberInfo> SUBSCRIBER_INDEX;\n\n");
//文件写入静态块
writer.write(" static {\n");
//文件写入创建订阅者/事件方法列表map
writer.write(" SUBSCRIBER_INDEX = new HashMap, SubscriberInfo>();\n\n");
//调用writeIndexLines完成将订阅者/事件方法列表写入map中
writeIndexLines(writer, myPackage);
writer.write(" }\n\n");
//文件写入putIndex方法,方法用于将订阅者要订阅的信息存入map中
writer.write(" private static void putIndex(SubscriberInfo info) {\n");
writer.write(" SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);\n");
writer.write(" }\n\n");
//文件写入实现SubscriberInfoIndex 接口的getSubscriberInfo方法,方法是根据订阅者class从map中取订阅者信息并返回
writer.write(" @Override\n");
writer.write(" public SubscriberInfo getSubscriberInfo(Class subscriberClass) {\n");
writer.write(" SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);\n");
writer.write(" if (info != null) {\n");
writer.write(" return info;\n");
writer.write(" } else {\n");
writer.write(" return null;\n");
writer.write(" }\n");
writer.write(" }\n");
writer.write("}\n");
} catch (IOException e) {
throw new RuntimeException("Could not write source for " + index, e);
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
//Silent
}
}
}
}
//完成将订阅者/事件方法列表写入map中
private void writeIndexLines(BufferedWriter writer, String myPackage) throws IOException {
for (TypeElement subscriberTypeElement : methodsByClass.keySet()) {
//遍历收集到的订阅者/事件方法列表
if (classesToSkip.contains(subscriberTypeElement)) {
//订阅者被忽略
continue;
}
//获取订阅者class
String subscriberClass = getClassString(subscriberTypeElement, myPackage);
if (isVisible(myPackage, subscriberTypeElement)) {
//调用索引类的putIndex方法,创建订阅信息类SimpleSubscriberInfo,作为参数,其作用是写入如下语句:
putIndex(new SimpleSubscriberInfo(subscriberClass, true, SubscriberMethodInfo[]))
writeLine(writer, 2,
"putIndex(new SimpleSubscriberInfo(" + subscriberClass + ".class,",
"true,", "new SubscriberMethodInfo[] {");
List methods = methodsByClass.get(subscriberTypeElement);
writeCreateSubscriberMethods(writer, methods, "new SubscriberMethodInfo", myPackage);
writer.write(" }));\n\n");
} else {
writer.write(" // Subscriber not visible to index: " + subscriberClass + "\n");
}
}
}
//创建SubscriberMethodInfo对象并写入索引文件
private void writeCreateSubscriberMethods(BufferedWriter writer, List methods,
String callPrefix, String myPackage) throws IOException {
for (ExecutableElement method : methods) {
//遍历事件方法列表
//获取方法参数列表
List parameters = method.getParameters();
//获取第一个参数类型对象
TypeMirror paramType = getParamTypeMirror(parameters.get(0), null);
//参数类型元对象
TypeElement paramElement = (TypeElement) processingEnv.getTypeUtils().asElement(paramType);
//获取方法名称
String methodName = method.getSimpleName().toString();
//获取参数类型类型,即事件类型
String eventClass = getClassString(paramElement, myPackage) + ".class";
//获取方法Subscribe注解对象
Subscribe subscribe = method.getAnnotation(Subscribe.class);
List parts = new ArrayList<>();
//创建SubscriberMethodInfo
parts.add(callPrefix + "(\"" + methodName + "\",");
String lineEnd = "),";
if (subscribe.priority() == 0 && !subscribe.sticky()) {
//如果注解信息中优先级为0并且非粘性事件,则
if (subscribe.threadMode() == ThreadMode.POSTING) {
//如果线程模式为POSTING,则:
//创建:new SubscriberMethodInfo(methodName, eventClass)
parts.add(eventClass + lineEnd);
} else {
//线程模式不是POSTING,则:
//创建:new SubscriberMethodInfo(methodName, eventClass, ThreadMode.xxx)
parts.add(eventClass + ",");
parts.add("ThreadMode." + subscribe.threadMode().name() + lineEnd);
}
} else {
//如果注解信息中优先级不是0或者是粘性事件,则
//创建:new SubscriberMethodInfo(methodName, eventClass, ThreadMode.xxx, priority, sticky)
parts.add(eventClass + ",");
parts.add("ThreadMode." + subscribe.threadMode().name() + ",");
parts.add(subscribe.priority() + ",");
parts.add(subscribe.sticky() + lineEnd);
}
writeLine(writer, 3, parts.toArray(new String[parts.size()]));
if (verbose) {
processingEnv.getMessager().printMessage(Diagnostic.Kind.NOTE, "Indexed @Subscribe at " +
method.getEnclosingElement().getSimpleName() + "." + methodName +
"(" + paramElement.getSimpleName() + ")");
}
}
}

编译器注解处理器其实做的是跟运行时反射获取订阅者class的订阅事件方法列表有点类似,都是从订阅者class收集subscribe注解修饰的订阅事件方法列表。

编译器注解处理器主要做:

(4)从编译期生成的订阅者/事件关系文件中查找订阅方法列表

从编译期生成的订阅者/事件索引类文件中查找订阅方法列表的入口是findUsingInfo:

private List findUsingInfo(Class subscriberClass) {
//从查找状态对象复用池中获取/创建一个新的查找状态对象
FindState findState = prepareFindState();
//初始化查找状态对象,设置订阅者class
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//从索引类中查找订阅者对应的定义信息对象
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
//如果订阅者订阅的订阅信息对象不为空,则:
//从订阅信息对象中获取订阅事件方法数组
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
//将数组转为list
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//如果索引类中没有匹配的,则通过运行时注解反射获取订阅事件方法列表
findUsingReflectionInSingleClass(findState);
}
//递归从父类/父接口继续查找
findState.moveToSuperclass();
}
//返回订阅事件方法列表
return getMethodsAndRelease(findState);
}

findUsingInfo方法先从索引类中找到订阅者class对应的订阅事件方法列表,如果没有则转为运行时注解通过反射查找

从索引类中找到订阅者class对应的订阅信息对象的源码如下:

private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
//订阅信息对象不为空,且有父级订阅信息对象,则:
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
//如果订阅者class与父级订阅信息对象的class一样,则返回父级订阅信息对象
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
//遍历所有的索引类,即遍历所有模块的索引类(一个模块一个索引类)
//从索引类订阅者/订阅信息map中查找订阅信息对象
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
//找到则返回
return info;
}
}
}
return null;
}

2、EventBus发布消息

EventBus基于观察者模式,所以其发布消息的流程大致为从观察者列表中找到匹配的观察者,并将消息传递给匹配的观察者处理。

EventBus是不是这样的处理逻辑呢,我们来看看其发布源码,EventBus的发布消息入口是post方法:

public void post(Object event) {
//从ThreadLocal获取当前线程的PostingThreadState对象
PostingThreadState postingState = currentPostingThreadState.get();
//发布消息队列
List eventQueue = postingState.eventQueue;
//将消息加入队列中
eventQueue.add(event);
if (!postingState.isPosting) {
//如果发布队列没有运行,则:
//标记发布状态所处的线程是否是主线程
postingState.isMainThread = isMainThread();
//标记正在发布消息
postingState.isPosting = true;
if (postingState.canceled) {
//已取消发布,则抛出异常
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
//遍历消息队列
//每次从队列删除首个消息,并进行发布
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

post主要做的是将消息加入队列中,并遍历队列对每个消息进行一一发布,我们继续看下postSingleEvent:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
//如果支持发布的消息的父级类型消息,则:
//从当前消息class找出其所有的父类/父接口类型列表,含本身
List> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
//遍历消息类型列表
Class clazz = eventTypes.get(h);
//对每一种消息类型进行发布消息
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//如果不支持发布的消息的父级类型消息,则:
//直接发布当前消息
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//......
}

postSingleEvent中分为两种情况,一种支持遍历当前消息的父级类型及本身作为消息,然后按照类型一一发布消息。

一种仅支持消息本身类型,则直接发布当前消息,我们继续往下看postSingleEventForEventType:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class eventClass) {
CopyOnWriteArrayList subscriptions;
synchronized (this) {
//根据消息类型class从消息/订阅者列表map中找到订阅此消息的订阅者列表
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
//遍历此消息的订阅者列表
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//调用postToSubscription将消息传递给订阅者进行处理
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

postSingleEventForEventType中根据消息类型class从消息/订阅者列表map中找到订阅者列表,然后遍历订阅者列表,将消息传递给每个订阅者进行消息的处理。

我们继续往下看postToSubscription:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
//如果订阅方法要求线程模式是POSTING,即在当前线程进行处理,则:
//直接调用invokeSubscriber方法回调订阅者的订阅方法处理消息
invokeSubscriber(subscription, event);
break;
case MAIN:
//如果订阅方法要求线程模式是MAIN,即要求在主线程进行处理,则:
if (isMainThread) {
//当前已是主线程,则直接调用invokeSubscriber方法回调订阅者的订阅方法处理消息
invokeSubscriber(subscription, event);
} else {
//当前不是主线程,则将使用handler转入主线程中处理
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
//如果订阅方法要求线程模式是MAIN_ORDERED,即要求在主线程进行串行处理,则:
if (mainThreadPoster != null) {
//如果设置了主线程handler,则使用handler转入主线程中处理
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
//如果没有设置主线程handler,则直接在当前线程处理
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
//如果订阅方法要求线程模式是BACKGROUND,即要求在子线程进行处理,则:
if (isMainThread) {
//当前是主线程,则将消息转入子线程进行处理
backgroundPoster.enqueue(subscription, event);
} else {
//当前已子线程,则直接在当前线程处理
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//如果订阅方法要求线程模式是ASYNC,即要求在异步子线程进行处理,则:
//将消息转入子线程进行处理
asyncPoster.enqueue(subscription, event);
break;
default:
//无效的线程模型,抛出异常
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

postToSubscription中主要是对线程模型进行处理:

  • POSTING:直接在当前线程处理。
  • MAIN:如果当前已是主线程,则直接处理消息;如果当前不是主线程,则通过handler转入主线程处理。
  • MAIN_ORDERED:主线程中串行处理,如果已经设置主线程Handler,则将消息加入消息队列等待loop进行调度;如果没有设置主线程Handler则直接在当前线程处理。
  • BACKGROUND:在子线程中处理,如果当前是主线程,则将消息转入子线程中再进行处理;如果当前已是子线程,则直接在当前线程处理。
  • *ASYNC:异步处理消息,不管当前是主线程还是子线程,都需要加入新的子线程中再进行处理。

直接处理消息的是通过invokeSubscriber方法,看名称就是表示反射调用订阅者的订阅方法处理消息,那是不是呢?我们看看其源码实现:

void invokeSubscriber(Subscription subscription, Object event) {
try {
//通过反射调用订阅者的订阅方法处理消息
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

就是通过反射调用订阅者的订阅方法,将消息传递给订阅方法处理的;

(1)切换到主线程(MAIN)/主线程串行(MAIN_ORDERED)处理消息:

不管MAIN还是MAIN_ORDERED都是通过mainThreadPoster,将消息放入mainThreadPoster的队列中,mainThreadPoster其实就是主线程中的Handler:

//使用mainThreadSupport创建主线程消息发布器
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
//创建HandlerPoster作为mainThreadPoster
return new HandlerPoster(eventBus, looper, 10);
}
}
}

mainThreadPoster默认是通过MainThreadSupport.AndroidHandlerMainThreadSupport.createPoster创建的,也就是mainThreadPoster默认是HandlerPoster的实例对象,我们看看HandlerPoster:

public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//将消息进行包装
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//将消息加入等待队列中
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
//发送一个空消息
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//死循环
//从等待队列中取出消息
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
//没有等待处理的消息,则退出循环
return;
}
}
}
//回调EventBus的invokeSubscriber处理消息
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
//没超过10毫秒重新发送空消息重新调度
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}

  • 切换到主线程主要使用的是主线程的Handler实现。
  • 每次将消息加入等待队列,然后Handler发送一个空消息。
  • 在Handler的handleMessage中遍历等待队列消息出队并进行处理。

(2)切换到子线程(BACKGROUND)处理消息:

切换到子线程处理消息是通过backgroundPoster的enqueue方法,backgroundPoster是BackgroundPoster的实现类,我们看看其内部实现:

final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//将消息进行包装
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//将消息加入等待队列中
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
//线程加入使用线程池中,等待回调run方法
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
//死循环
//从等待队列中获取等待处理的消息,1000表示如果队列为空等待1000毫秒
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
//如果消息为空,则退出循环
return;
}
}
}
//调用EventBus反射调用订阅者的订阅方法处理消息
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}

  • 切换到子线程处理消息主要是将消息加入等待队列,backgroundPoster本身是一个线程实现,将backgroundPoster本身加入线程池中运行。
  • backgroundPoster线程执行时遍历等待队列,从队列中取出等待处理的消息,通过eventBus.invokeSubscriber处理消息。

其中默认使用的线程池是缓存线程池:

executorService = builder.executorService;
public class EventBusBuilder {
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}

(3)异步消息(ASYNC):

异步消息是通过asyncPoster的enqueue方法实现,asyncPoster是AsyncPoster的实例对象,它本身就是一个线程,我们看看其实现:

class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
//将消息加入队列中
queue.enqueue(pendingPost);
//线程加入线程池中
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
//每次取出一个消息
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
//调用EventBus反射调用订阅者的订阅方法处理消息
eventBus.invokeSubscriber(pendingPost);
}
}

异步消息的实现主要是将消息加入等待队列,然后将线程加入队列,线程执行是每次去等待队列的一个消息(队尾,先进先出原则),最后调用EventBus反射调用订阅者的订阅方法处理消息。

异步消息使用的线程池跟BACKGROUND模式使用的线程池是同一个,都是缓存线程池,与BACKGROUND的区别是:

  • 异步消息每次都将线程加入线程池中。
  • 而BACKGROUND则不需要每次。
  • 异步消息线程执行每次只去队尾消息进行处理,而BACKGROUND线程执行时会处理队列中的所有消息。

总结:

  • EventBus基于观察者模式,其首先需要先将订阅者与消息类型进行注册到一个消息/订阅者列表map中,发布消息时从map找找到对应的订阅者并调用相应的订阅方法进行处理消息。
  • 注册时通过订阅者class通过反射收集subscribe注解修饰的方法列表及注解信息,最终注册到消息/订阅者列表map中,这个运行时注解是先对耗时的;所以EventBus提供了编译期注解处理器来做个收集的工作,降低性能损耗。
  • 发布消息时根据消息类型class从消息/订阅者列表map中找到对应的订阅者列表并进行一一处理,同时根据注解中的线程模式进行线程切换处理。


免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 资料下载
  • 历年真题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
咦!没有更多了?去看看其它编程学习网 内容吧