Handler消息机制

一、基本介绍

所有线程的消息队列管理
github

二、几大主要类

1.Handler

我们在使用的时候基本都是和Handler做接触,消息发送、消息接收处理等。都是从Handler开始的。

1.发送消息 SendMessage(Message msg)

可以看到所有的消息发送方法都会汇聚到一点

  ...
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}

/**
* Enqueue a message at the front of the message queue, to be processed on
* the next iteration of the message loop. You will receive it in
* {@link #handleMessage}, in the thread attached to this handler.
* <b>This method is only for use in very special circumstances -- it
* can easily starve the message queue, cause ordering problems, or have
* other unexpected side-effects.</b>
*
* @return Returns true if the message was successfully placed in to the
* message queue. Returns false on failure, usually because the
* looper processing the message queue is exiting.
*/
public final boolean sendMessageAtFrontOfQueue(Message msg) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, 0);
}
...

也就是这个enqueueMessage的方法,进入这个方法看一下:

...
// 从Handler中将Message推送给MessageQueue
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
// 设置这个 message 的target = Handler.this,以后在looper读取分发后需要确定哪个handler分发
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
...

可以看到,在得到消息后首先将此时的Handler对象保存在msg的target参数中,也是为了后续在分发Message消息的时候找到对应的Handler。
第二个是这里的mAsynchronous参数,会根据这个参数来设定该消息的类型:true为异步,false为同步。

这里的queue则是MessageQueue的对象,在一般使用的情况下,都是从Looper类中创建并获取的:

...
public Handler(Callback callback, boolean async) {
...
// 获取 Looper
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
...

可以看到Looper.myLooper()是获取Looper对象的一个方法,Looper是一个工厂模式。获取后就可以得到内部的MessageQueue了。
那么Looper是在哪里被创建呢?ActivityThread:
...
// 主要的启动点,Activity的启动方法
public static void main(String[] args) {
...
// 创建 MainLooper 就是我们需要的主进程的Handler所对应的Looper
Looper.prepareMainLooper();

ActivityThread thread = new ActivityThread();
thread.attach(false);

if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}

if (false) {
Looper.myLooper().setMessageLogging(new
LogPrinter(Log.DEBUG, "ActivityThread"));
}

// End of event ActivityThreadMain.
Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);

// 打开这个Looper功能,也就是开启for循环
Looper.loop();

throw new RuntimeException("Main thread loop unexpectedly exited");
}

可以看到这里使用了Looper的两个方法:prepareMainLooper()和loop()。前者为Looper对象的创建,后者为开启Looper分发的功能开关。

那么整个过程其实是先通过prepareXXXX()方法创建Looper,然后使用loop()方法进行分发。

从刚刚我们看到了Handler类中的enqueueMessage()方法中调用了MessageQueue中的enqueueMessage()方法发送Message,看一下MessageQueue中:

	...
/**
* 入队列方法,Message从Handler进入MessageQueue的入口点
*/
boolean enqueueMessage(Message msg, long when) {
// 加入链表的时候按时间顺序从小到大排序,然后判读是否需要唤醒,nativeWake() 用来唤醒之前等待的线程
synchronized (this) {
Message p = mMessages;//头部消息
boolean needWake;
//如果队列中没有消息,或者当前进入的消息比消息队列中的消息等待时间短,那么就放在消息队列的头部
if (p == null || when == 0 || when < p.when) {
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// 判断唤醒条件
needWake = mBlocked && p.target == null && msg.isAsynchronous();
// 提交至native层,需要 nextPollTimeoutMillis 时间阻塞
Message prev;
// 根据时间顺序插入该条消息Message
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
//将消息插入合适的位置
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
// 唤醒代码
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
...

这里是将Message数据添入队列中的重要位置,从MessageQueue来看,Message从这里进入队列中。队列使用链表的形式。
when为Message执行时间,这个队列的排列顺序就是根据执行顺序排列的。
当我们拿到待处理消息Message和队列头Message的时候,首先需要判断,待处理消息是否需要即可处理,即when为0或者when值小于队列头Message的时间。是,则插入到队列头部;否,则使用for循环去查找到响应位置,后插入队列中。

需要注意:nativeWake()函数,牵扯至Native层的阻塞和唤醒,后续描述。

2.接收消息loop()

正如上述所讲,Looper.loop()就是打开这个消息分发的开关。

..
/**
* 轮询这个线程中的MessageQueue
*/
public static void loop() {
final Looper me = myLooper();
// 获取 Queue 队列
final MessageQueue queue = me.mQueue;

// 确保这个线程的标识是本地进程的标识,并跟踪这个标识的符号
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();

for (;;) {
// 获取下一个消息 message
Message msg = queue.next(); // might block

// 调用 message 中存储的 handler 对象来调用其 dispatchMessage 方法
msg.target.dispatchMessage(msg);

// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
// 确保在调度过程中,线程的标识没有被破坏
final long newIdent = Binder.clearCallingIdentity();

msg.recycleUnchecked();
}
}
...

开启loop()后就会打开一个for的死循环,该循环通过MessageQueue的next()方法获取下一个Message消息。
首先看一下MessageQueue的next()方法:
	...
/**
* 获取下一个消息,Message从MessageQueue推出的出口点
*/
Message next() {
final long ptr = mPtr;
if (ptr == 0) {
return null;
}

int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {

// 操作 native 层的 MessageQueue 阻塞 nextPollTimeoutMills 毫秒时间。
// nextPollTimeoutMills = -1,一直阻塞不会超时
// nextPollTimeoutMills = 0,不会阻塞,立即返回
// nextPollTimeoutMills > 0,最长阻塞 nextPollTimeoutMillis 毫秒,如果期间有程序唤醒会立即返回。
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}

// 提交至native层,需要 nextPollTimeoutMillis 时间阻塞
nativePollOnce(ptr, nextPollTimeoutMillis);

synchronized (this) {
// 获取系统开机到现在的时间,如果使用System.currentMillis()会有误差
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 当 msg.target == null 时是在调用 postSyncBarrier 函数时阻塞,具体场景有 ViewRootImpl 中的 scheduleTraversals() 函数和 unscheduleTraversals() 函数
// 他会使所有的同步消息忽略,循环找出第一个异步消息
// 可以通过 setAsynchronous 设置为异步消息
if (msg != null && msg.target == null) {
// do while 循环,得到下一个message
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
// 得到消息后比较时间
if (msg != null) {
// 当前消息比某个时间要小
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
// 下一条消息的等待时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 不需要等待,直接返回消息
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// 没有消息
nextPollTimeoutMillis = -1;
}

// 获取空闲时处理任务的handler 用于发现线程何时阻塞等待更多消息的回调接口。
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
// 如果空闲时处理任务的handler个数为0,继续让线程阻塞
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}

// 判断当前空闲时处理任务的handler是否是为空
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}

//
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler

boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}

if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}

pendingIdleHandlerCount = 0;

nextPollTimeoutMillis = 0;
}
}
...

此时MessageQueue会去寻找下一个Message给Looper,当队列中没有下一条消息时,它会是线程进入等待状态;在有消息的时候,如果消息的执行时间没有到(未到执行时间),线程也会进入等待状态,否则就会返回消息Message给Looper。

需要注意:nativePollOnce()函数牵扯到Native层的阻塞和唤醒,后续描述。
注释中可以看到nextPollTimeoutMillis

-1:表示一直阻塞不会超时
n(n>0):最长阻塞 nextPollTimeoutMillis 毫秒,如果期间有程序唤醒会立即返回。
0:不会阻塞,立即返回

返回消息后Looper.loop()就会处理进来的消息,msg.target.dispatchMessage(msg);这里就会通过消息中的target(handler)发送给dispatchMessage(msg)


/**
* Handle system messages here.
*
* handler消息分发
*/
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
// 如果有 Runable,则直接执行他的 run 方法
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
// 如果有实现 Callback 接口,则直接使用
return;
}
}
// 如果没有实现则使用自己的 handleMessage
handleMessage(msg);
}
}

public void handleMessage(Message msg) {
}


可以看到,在Message没有定义callback的时候就会调用handlerMessage(msg),而这个方法我们很常见,也即在使用handler的时候重写的方法。

Handler总结

这样整个就完成了一个从消息发送,到消息接收的一个主路线。但是这样仅仅是主路线,支撑主路线的也很重要。
在这里Handler主要完成数据的接收,数据的发送的对外总节点。隐藏其他类的细节。也大致知道了Looper和MessageQueue的主要部分代码。

Looper

Looper主要完成Handler在多线程中的线程隔离和消息分发的关键作用,他会创建MessageQueue,用来管理消息。

1.创建


/**
* 创建一个可以退出的 Looper
*/
public static void prepare() {
prepare(true);
}

/**
* quitAllowed 为该 MessageQueue 为可退出
* 创建一个 Looper
*/
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}

/**
* 创建 MainLooper, 在 ActivityThread 中调用
*/
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}

/**
* 返回 MainLooper
*/
public static Looper getMainLooper() {
synchronized (Looper.class) {
return sMainLooper;
}
}


prepareMainLooper(),prepare(boolean quitAllowed),prepare()

可以看到这里有一个Main单词,就知道他是和主启动有关。在上述也看到了这个方法,即开启App的时候会调用,来初始化这个Looper对象。
代码中可以看到他调用了prepare(boolean quit)方法,然后将这个Looper对象赋值到Looper类中,也就是sMainLooper

这里需要注意的是,sThreadLocal属性:

// 管理 Looper 中的所有 Looper 对象,保存在一个 ThreadLocal 中,使其具有线程域
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

使用ThreadLocal来保存我们所有的Looper对象。因为在创建好Looper之后,他们都会存放在sThreadLocal的一个Key-Value结构中:

sThreadLocal.set(new Looper(quitAllowed));

ThreadLocal 本地线程缓存

位置:…/libcore/luni/src/main/java/java/lang/ThreadLocal.javal
功能:解决线程上下文中变量的传递
在源码上,不知道是因为下载版本的问题,有点不同:

1.ThreadLocal -> ThreadLocalMap -> Entry -> Object
2.ThreadLocal -> Values -> Object

很奇怪我竟然会是第二种情况的源码… 不过从原理上都差不多。
他们都会很像一个Map,存储一个线程对象,和对应与该对象的泛型结构T,也即上述的Object类型对象。这个泛型结构T会被层层封装成一个统一结构ThreadLocalMap(Values)。继而有了该数据对象,有了存储位置Thread对象,那么可以发现会将该数据传入thread对象中的threadLocals(localValues),进入Thread源码中发现,原来Thread类中已经准备好了该属性threadLocals(localValues)

ThreadLocal.ThreadLocalMap threadLocals = null;

此时就知道了ThreadLocal的大致走向:

1.Thread存在ThreadLocalMap的属性,用于存储我们需要保存的该类型对象。
2.Looper在调用ThreadLocal的时候会通过Thread中的threadLocals属性去寻找Entry数据
3.ThreadLocalMap内部会有一个Entry数组,用于存储我们真正保存的数据
4.Entry继承WeakReference,因此具有Key-Value的结构,这里也即:ThreadLocal-Object

github

当然这只是整体的大致流程,存储部分细节牵扯值Hash算法。
每个ThreadLocal对象都有一个threadLocalHashCode,用于生成指定的数组下标。

class ThreadLocal {
// 当前的HashCode
private final int threadLocalHashCode = nextHashCode();
// Integer的原子性管理类
private static AtomicInteger nextHashCode = new AtomicInteger();
// 让Hash码能均匀分布在2的n次方的数组里
private static final int HASH_INCREMENT = 0x61c88647;
// 获取下一个HashCode
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}

class ThreadLocalMap {
// 初识容量,必须是2的幂
private static final int INITIAL_CAPACITY = 16;
// 存放table,长度必须是2的幂
private Entry[] table;
// entry个数
private int size = 0;
// 容量阀值
private int threshold;

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 新建tables
table = new Entry[INITIAL_CAPACITY];
// 新建下标索引
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 添加值table至数组中
table[i] = new Entry(firstKey, firstValue);
// 初始化个数为1
size = 1;
// 获得阀值
setThreshold(INITIAL_CAPACITY);
}

// 容量阀值限定在 长度的2/3
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
}
}

即代码中int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);来获取下标,需要注意这里的INITIAL_CAPACITY必须是2的幂,进而table的长度也必须是2的幂,才可以使用这个方法来获取下标索引。

2.构造函数

在构造函数中我们会看到,此时将创建消息队列MessageQueue和保存当前所在的线程环境mThread。

/**
* Looper 的构造函数,这里创建了此 Looper 下的 MessageQueue 和
*/
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}

而在我们需要得到这个Looper对象时,就可以通过myLooper获取当前线程的当去Looper对象,也即Looper对象中的MessageQueue:

  /**
* 返回与当前线程有关联的 Looper 对象,没有则为 null。
*/
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}

3.开启循环loop()

在Looper中最重要的就是loop()方法了,上面也说过,消息机制会通过这个方法开启一个读取MessageQueue队列的循环,取出Message,分发Message的功能。

MessageQueue

MessageQueue主要完成消息队列的职责,做好入队列和出队列,队列管理的作用。

入队列 enqueueMessge

上面讲解Handler中也大致提了入队列时会调用MessageQueue的enqueueMessage方法,使Message根据策略添加至MessageQueue中。

出队列 next

在获取下一个Message时,loop()方法会调用MessageQueue的next方法,获取下一条Message消息

Message

消息对象,由于消息队列是以链表的形式,所以内部会有一个next参数来指向下一条Message。

存储

一般我们会使用arg1、arg2来存储int数据
而一些对象类型需要使用obj来存储
what可以对消息进行分类等
when则是上述所说的当前消息应该何时处理的时间值
target则会在Handler对象接收到Message时将自身赋值给他
而Message的队列,就是一个链表结构 github

总结

大致讲解重要的几个大类:Handler、Looper、ThreadLocal、MessageQueue、Message
github

三、native层

可以从源码中看出在Java层的下面还隐含这C++的jni层调用。
github

MessageQueue:Message的入口

MessageQueue.java文件中的jni函数列表:

public final class MessageQueue {
// 在android_os_MessageQueue.cpp文件下
private native static long nativeInit();
private native static void nativeDestroy(long ptr);
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
}

文件:android_os_MessageQueue.cpp

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
// 创建NativeMessageQueue
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}

nativeMessageQueue->incStrong(env);
// 将指针返回给Java层的mPtr
return reinterpret_cast<jlong>(nativeMessageQueue);
}

NativeMessageQueue

在创建NativeMessageQueue后,将指针返回给Java层的mPtr,可以看到,这里已经将引用传给了Java层的mPtr:

/**
* MessageQueue 的构造函数
*/
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;

// 使用 native 层创建 MessageQueue,并返回其引用地址
mPtr = nativeInit();
}

那么接着看到Native层MessageQueue中创建了NativeMessageQueue:

NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
// 通过tls,类似java的ThreadLocal获取looper
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
// 把looper放入线程中
Looper::setForThread(mLooper);
}
}

可以看到熟悉的Looper类,这里调用getForThread()方法,来获取当前线程下的Looper对象。如果不存在,则创建新的Looper并使用setForThread()传递给Looper保存。

Native层的Looper

那么我们看一下在Looper中如何处理这些对象:

void Looper::setForThread(const sp<Looper>& looper) {
sp<Looper> old = getForThread(); // also has side-effect of initializing TLS

if (looper != NULL) {
looper->incStrong((void*)threadDestructor);
}

pthread_setspecific(gTLSKey, looper.get());

if (old != NULL) {
old->decStrong((void*)threadDestructor);
}
}

sp<Looper> Looper::getForThread() {
int result = pthread_once(& gTLSOnce, initTLSKey);

return (Looper*)pthread_getspecific(gTLSKey);
}

可以看到,在get和set中存在两个函数:pthread_getspecificpthread_setspecific。两个函数和ThreadLocal类的set、get很像。都是使Looper和Thread进行绑定,达到类似key-value的存储结构。

阻塞与唤醒:nativePollOnce与nativeWake

在之前的MessageQueue的next()函数中会通过nativePollOnce(ptr, nextPollTimeoutMillis);函数通过native层调用进行阻塞。

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

可以看到调用的是NativeMessageQueue的pollOnce函数:
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;

if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}

发现调用的是Looper的pollOnce函数:
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}

if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}

result = pollInner(timeoutMillis);
}
}

最后我们看到timeoutMillis参数被pollInner函数调用了:

int Looper::pollInner(int timeoutMillis) {
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}

// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;

// We are about to idle.
mPolling = true;

struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

// No longer idling.
mPolling = false;

// Acquire lock.
mLock.lock();

// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}

// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = POLL_ERROR;
goto Done;
}

// Check for poll timeout.
if (eventCount == 0) {
result = POLL_TIMEOUT;
goto Done;
}

for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;

// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();

handler->handleMessage(message);
}

mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}

mLock.unlock();

for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}

response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}

在java层的MessageQueue中mMessage保存这Message链表的第一个引用,而Native层保存在Looper中mMessageEnvelopes。可以说这里有两个Message队列,并且他们都是通过时间顺序排列。timeOutMillis表示Java层下个 要执行的消息还要多久执行,mNextMessageUpdate表示Native层下个要执行的消息还要多久执行。
其次就是关于epoll_wait()函数的细节,关于epoll的运作。

epoll

在Unix进程间通信中,管道就是一个文件,在管道两端,分别是两个打开文件文件描述符,这两个打开文件描述符都是对应同一个文件,其中一个是用来读的,别一个是用来写的,一般的使用方式就是,一个线程通过读文件描述符中来读管道的内容,当管道没有内容时,这个线程就会进入等待状态,而另外一个线程通过写文件描述符来向管道中写入内容,写入内容的时候,如果另一端正有线程正在等待管道中的内容,那么这个线程就会被唤醒。等待和唤醒的操作就是借助epoll机制。

原理

epoll是Linux下多路复用IO接口select/poll的增强版本,能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它不会复用文件描述符集合来传递结果而迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll 那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

优点

支持一个进程打开大数目的Socket
效率高

工作模式

模式分为LT和ET

LT(level-triggered):是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
ET(edge-triggered):在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。

接口

1.int epoll_create(int size)
创建一个epoll的句柄,size为监听的数目一共有多大。他会占用一个fd值,在使用完epoll后,必须调用close()关闭,否则可能会导致fd被耗尽。

2.int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event)
epoll的事件注册函数,他会注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作类型:

EPOLL_CTL_ADD:注册新的fd到epfd中
EPOLL_CTL_MOD:修改已经注册的fd的监听事件
EPOLL_CTL_DEL:从epfd中删除一个fd

第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事件:

typedef union epoll_data {
void* ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events;
epoll_data_t data
};

events可以是以下几个宏的集合:

EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT:表示对应的文件描述符可以写
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂断
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

3.int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout)
等待事件的产生。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

回到问题:Looper与epoll

上面说了这么多,就是知道了几个函数和epoll的原理。
那么这些在Looper这里是如何使用的呢?
首先需要看一下Native的Looper的构造函数:

Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {

int wakeFds[2];
int result = pipe(wakeFds);

mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);

result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);

// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);

struct epoll_event eventItem;
// zero out unused members of data field union
memset(& eventItem, 0, sizeof(epoll_event));
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
}

首先通过管道pipe创建了读端与写端两个文件描述符,最后通过epoll_create创建epoll专用文件描述符,最后通过epoll_ctl告诉mEpollFd需要监控mWakeReadPipeFd描述符的EPOLLIN事件。当管道中有内容可读时,就唤醒当前正在管道中的内容线程。
然后,在pollInner通过epoll_wait函数来看epoll专用文件描述符mEpollFd所监控的文件描述符是否有IO事件发生,超时时间为timeOutMillis(-1则永久阻塞)。

最后,当我们需要唤醒epoll阻塞时,调用nativeWake函数来唤醒线程,看到最后还是通过NativeMessageQueue来调用Looper下的wake函数。

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {  
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
return nativeMessageQueue->wake();
}

class NativeMessageQueue : public MessageQueue, public LooperCallback {
void NativeMessageQueue::wake() {
mLooper->wake();
}
}

wake函数下可以看到,通过打开文件描述符mWakeWritePipeFd向管道中写入一个W字符串,这里向管道里写入内容的目的就是为了唤醒应用的主程序:

void Looper::wake() {  

ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);

}

这里还有一个awoken方法,该方法将管道中的内容清空:

void Looper::awoken() {
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

这里为什么要引入epoll的机制来完成一个同步阻塞的功能呢?实际上Looper的功能是强大的,它提供了addfd的方法,外部可调用该方法动态添加需要监控的描述符与回调。