Android 消息机制(三)Native层消息机制

之前的两篇文章讲解了java层消息机制的过程以及使用,中间省略了比较多的代码就是在native层实现的消息机制中使用的。这篇文章就对这一部分进行讲解。

起点

我们从第一篇文章知道,Looper.loop()方法被调用后,会启动一个无限循环,而在这个循环中,调用了MessageQueuenext()方法以获取下一条消息,而next()方法中会首先调用nativePollOnce()方法,这个方法的作用在之前说过是阻塞,达到超时时间或有新的消息到达时得到eventFd的通知再唤醒消息队列,其实这个方法也是native消息处理的开始。

进入Native层

android_os_MessageQueue_nativePollOnce()

1
2
3
4
5
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
1
2
3
4
5
6
7
8
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
...
}

调用了Native LooperpollOnce()方法:

pollOnce()

1
2
3
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;

if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}

if (result != 0) {
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}

result = pollInner(timeoutMillis);
}
}

我们之前直接跳过了26行之前的内容,现在我们还是不能理解这一段的意义,从程序上看,我们从mResponse容器中取出了一个response并把他的内容放入了传入的地址参数中返回。首先,这个调用中没有传入地址参数,其次,这个mResponse数组是什么呢?

我们继续先往下看,下面的pollInner()方法比较长也是native消息机制的核心,我们拆成几个部分看。

pollInner()

Request 与 Response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
   int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
mPolling = true;

struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
mPolling = false;
mLock.lock();
...
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}

当第7行系统调用epoll_wait()返回时,说明因注册的fd有消息或达到超时,在第11行就对收到的唤醒events进行遍历,首先判断有消息的fd是不是用于唤醒的mWakeEventFd,如果不是的话,说明是系统调用addFd()方法设置的自定义fd(后面会讲)。那么我们需要对这个事件作出响应。

第21到28行就对这个event做处理,首先,我们以这个fdkeymRequests中找到他的索引,这个mRequests是我们在addFd()方法一并注册的以fdkeyRequestvalue的映射表。找到request之后,28行调用pushResponse()方法去建立response

1
2
3
4
5
6
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}

现在我们要处理的任务已经被封装成了一个Response对象,等待被处理,那么真正的处理在哪里呢?

在上面的代码与处理response的代码中间夹着的是处理MessageEnvelope的代码,我们后面再讲这段,现在到处理response的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
response.request.callback.clear();
result = POLL_CALLBACK;
}
}

遍历所有response对象,取出之前注册的request对象的信息,然后调用了request.callback->handleEvent()方法进行回调,如果该回调返回0,则调用removeFd()方法取消这个fd的注册。

再梳理一遍这个过程:注册的自定义fd被消息唤醒,从mRequests中以fdkey找到对应的注册好的request对象然后生成response对象,在MessageEnvelop处理完毕之后处理response,调用request中的callbackhandleEvent()方法。

那么addFd()注册自定义fdremoveFd()取消注册是如何实现的呢?

addFd()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
...
{ // acquire lock
AutoMutex _l(mLock);

Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

struct epoll_event eventItem;
request.initEventItem(&eventItem);

ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
if (epollResult < 0) {
if (errno == ENOENT) {
epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error modifying or adding epoll events for fd %d: %s",
fd, strerror(errno));
return -1;
}
scheduleEpollRebuildLocked();
} else {
ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}

第6-13行使用传入的参数初始化了request对象,然后16行由request来初始化注册epoll使用的event。19行根据mRequests.indexOfKey()方法取出的值来判断fd是否已经注册,如果未注册,则在20行进行系统调用epoll_ctl()注册新监听并在25行将fdrequest存入mRequest,如果已注册,则在27行更新注册并在42行更新request

这就是自定义fd设置的过程:保存request并使用epoll_ctl系统调用注册fd的监听。

removeFd()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int Looper::removeFd(int fd, int seq) {
{ // acquire lock
AutoMutex _l(mLock);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
return 0;
}
if (seq != -1 && mRequests.valueAt(requestIndex).seq != seq) {
return 0;
}
mRequests.removeItemsAt(requestIndex);

int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
if (epollResult < 0) {
if (seq != -1 && (errno == EBADF || errno == ENOENT)) {
scheduleEpollRebuildLocked();
} else {
ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno));
scheduleEpollRebuildLocked();
return -1;
}
}
} // release lock
return 1;
}

解除的过程相反,在第11行删除mRequests中的键值对,然后在第13行系统调用epoll_ctl()解除fdepoll注册。

MessageEnvelop消息处理

之前说到,在request生成responseresponse的处理中间有一段代码执行了MessageEnvelop消息的处理,这个顺序保证了MessageEnvelop优先于fd引起的request的处理。

现在我们来看这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
handler->handleMessage(message);
} // release handler

mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}

可以看到mMessageEnvelopes容器中存储了所有的消息,第4行从首位置取出一条消息,随后进行时间判断,如果时间到达,先移出容器,与java层比较相似都是调用了handlerhandleMessage()来进行消息的处理。

那么MessageEnvelope是如何添加的呢?

Native Looper提供了一套与javaMessageQueue类似的方法,用于添加MessageEnvelope

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);

size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}

MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

if (mSendingMessage) {
return;
}
} // release lock
if (i == 0) {
wake();
}
}

这段添加MessageEnvelope的代码不需要太多的解释。

小结

现在我们看到,其实Native中的消息机制有两个方面,一方面是通过addFd()注册的自定义fd触发消息处理,通过mRequests保存的request对象中的callback进行消息处理。另一方面是通过与java层类似的MessageEnvelop消息对象进行处理,调用的是该对象handler域的handleMessage()方法,与java层非常类似。优先级是先处理MessageEnvelop再处理request

一些思考

现在消息机制全部内容分析下来,我们可以看到android的消息机制不算复杂,分为nativejava两个部分,这两个部分分别有自己的消息处理机制,其中关键的超时与唤醒部分是借助了linux系统epoll机制来实现的。

连接javanative层消息处理过程的是next()方法中的nativePollOnce()java层消息循环先调用它,自身阻塞,进入native的消息处理,在native消息处理完毕后返回,再进行java层的消息处理,正是因为如此,如果我们在处理java层消息的时候执行了耗时或阻塞的任务(甚至阻塞了整个主线程),整个java层的消息循环就会阻塞,也无法进入native层的消息处理,也就无法响应例如触摸事件这样的消息,导致ANR的发生。这也就是我们不应在主线程中执行这类任务的原因。