使用内存映射 I/O 实现 Posix 消息队列

《UNIX网络编程卷2:进程间通信》第五章Posix 消息队列的示例代码实现了一种基于文件内存映射创建共享内存实现的消息队列,为了避免不同线程的访问冲突而引入了Posix 互斥锁和Posix 条件变量。本文主要记录一下实现过程中的几个关键点,以备不时之需。

主要函数说明

  • mymq_open() 创建或打开一个消息队列
  • mymq_getattr() 获取消息队列属性,比如最大长度、每个消息的长度等
  • mymq_setattr() 设置消息队列属性
  • mymq_notify() 注册或注销消息队列的调用进程
  • mymq_send() 放置一个消息到消息队列中
  • mymq_receive() 从消息队列中取一个消息
  • mymq_close() 取消文件的内存映射,释放内存空间
  • mymq_unlink() 删除消息队列相关联的文件名

创建消息队列

消息队列的数据结构比较简单清晰,一个消息队列的头部结构,包含了消息总数、当前消息数、队列属性、互斥锁和条件变量等;一个消息队列内可以有多个具体的消息,因此需要具体消息的头部结构,包含该消息的类型、优先级、长度和具体内容;所有的消息在队列内以链表的形式存放。

不同的进程或线程并发访问该消息队列的话,就需要该消息队列是共享的,在此示例中使用了系统函数mmap() 来将一个特定的文件映射到内存中,以使得所有有权限的线程都可以访问。所以书中示例实质是通过了该共享内存实现了IPC 进程间通信,消息队列的数据结构存于共享内存中,并提供了上述接口,是为了更加方便的实现通信交互,并加入了互斥锁竞争机制,屏蔽了调用者对于细节的考虑。

互斥锁的初始化

调用 pthread_mutex_init() 函数就可以完成互斥锁的初始化,函数的第二个入参用来设置互斥锁的属性,多进程共享就需要设置属性为 PTHREAD_PROCESS_SHARED ,记得及时销毁该入参以防内存泄漏。

pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);
pthread_mutexattr_destroy(&mattr);	/* be sure to destroy */

条件变量的初始化

调用 pthread_cond_init() 函数就可以完成条件变量的初始化,该函数也需要设置条件变量的属性,多进程共享需要设置为 PTHREAD_PROCESS_SHARED,同样也需要及时销毁防止内存泄漏。

if ( (i = pthread_condattr_init(&cattr)) != 0)
	goto pthreaderr;
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);
pthread_condattr_destroy(&cattr);	/* be sure to destroy */

发送消息到消息队列 mymq_send()

往消息队列内加新消息的套路基本就是先加锁,将传入的消息内容拷贝到空闲的消息结构内,最后退出的时候解锁。但有几个需要留意的地方,比如消息队列满了如何处理,加入新消息如何通知接收进程。

消息队列满了

若是非阻塞模式,消息队列满了的话就立即返回,并设置 errno 为 EAGAIN 做提示;若是阻塞模式,则需要阻塞等待条件变量,pthread_cond_wait() 睡眠之前会释放互斥锁,然后睡眠,若等到了条件变量(即有了空位置可以放新消息)则再次尝试获取互斥锁以继续。

/* 4wait for room for one message on the queue */
while (attr->mq_curmsgs >= attr->mq_maxmsg)
    pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);

新消息通知到接收进程

某进程加入该新消息时,可能已经有另一进程在阻塞等待新消息,那么这个时候可以通过条件变量通知该阻塞的进程起来收消息了,该阻塞的进程醒了但又阻塞在消息队列的互斥锁(pthread_cond_signal() 不会释放锁)加锁上,等到mymq_send() 函数释放锁后它就会获取到锁得到新消息。当然也有可能没有进程阻塞等待新消息,做一下 pthread_cond_signal() 也无伤大雅。

/* 4wake up anyone blocked in mq_receive waiting for a message */
if (attr->mq_curmsgs == 0)
    pthread_cond_signal(&mqhdr->mqh_wait);
attr->mq_curmsgs++;

pthread_mutex_unlock(&mqhdr->mqh_lock);
return(0);

从消息队列接收消息 mymq_receive()

套路一致,基本思路就是加锁取消息最后解锁。当然,也有几点需要注意的地方。

消息队列为空时

若是非阻塞模式,那函数立即返回;若是阻塞模式,则会阻塞等待新消息,pthread_cond_wait() 睡前释放互斥锁等待条件变量唤醒,被唤醒了则重新加锁取消息。

/* 4wait for a message to be placed onto queue */
mqhdr->mqh_nwait++;
while (attr->mq_curmsgs == 0)
    pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);
mqhdr->mqh_nwait--;

空位置通知

若在取消息之前该消息队列是满的,即没有空位置放置新消息,那么很有可能有进城阻塞等待空位置放新消息,这时我们取走了一个消息就有了空位置,就可以通过条件变量通知一下,那么阻塞的进城就可以放置新消息了。即使没有进程阻塞等待空位置,也无所谓。

通知使用 pthread_cond_signal() 函数即可。

/* 4wake up anyone blocked in mq_send waiting for room */
if (attr->mq_curmsgs == attr->mq_maxmsg)
    pthread_cond_signal(&mqhdr->mqh_wait);
attr->mq_curmsgs--;

示例代码

本书提供了示例代码的下载,可在以下任意一个位置下载该书的代码。

Advertisements

发表评论

Fill in your details below or click an icon to log in:

WordPress.com 徽标

You are commenting using your WordPress.com account. Log Out /  更改 )

Google photo

You are commenting using your Google account. Log Out /  更改 )

Twitter picture

You are commenting using your Twitter account. Log Out /  更改 )

Facebook photo

You are commenting using your Facebook account. Log Out /  更改 )

Connecting to %s

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理