消息隊列?是消息的鏈接表,存儲內(nèi)核中,由消息標(biāo)識符標(biāo)識。?--《UNIX環(huán)境高級編程》
簡單理解,消息隊列就是一堆消息的有序集合,并緩存于內(nèi)核中。如此一來,多個進(jìn)程就可通過訪問內(nèi)核來實現(xiàn)多個進(jìn)程之間的通信。目前存在的消息隊列有POSIX與System V標(biāo)準(zhǔn)的接口,本篇主要介紹System V接口的使用。
簡介消息隊列的本質(zhì)是位于內(nèi)核空間的鏈表,其中每個節(jié)點都是一個獨立的消息,每個消息都有類型,相同類型的消息組成一個鏈表。
當(dāng)各種各樣的消息發(fā)出時,就如同下圖所示排列在內(nèi)核空間中。形狀看成消息的類型,相同的形狀則表示相同的消息類型。
這些看似雜亂無章的消息,通過消息隊列發(fā)出來后,根據(jù)其發(fā)送的類型與發(fā)送的時間,在接收端中則是有規(guī)律的排序。
如上圖,內(nèi)核中雜亂無章的消息,接收端可通過消息類型與發(fā)送的順序來逐一接收處理??赏ㄟ^消息類型查看指定類型的消息,若指定類型為0,則按時間順序輸出所有接收到的消息。
接口主要用到msgget、msgsnd、msgrcv和msgctl四個接口。其使用方式man手冊說明的比較清晰了,這里簡單描述一下函數(shù)形式及功能。
msgget
#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>
int msgget(key_t key, int msgflg);
主要功能是根據(jù)key值獲取一個消息隊列的ID。msgflag主要有兩個值IPC_CREAT 和IPC_EXC,指的是需要新創(chuàng)建消息隊列ID。
msgsnd、msgrcv
#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
msgsnd與msgrcv主要用于消息隊列的發(fā)送與接收。這里需要注意的是發(fā)送的msgp一般定義為結(jié)構(gòu)體,首個成員為long型,表示消息的類型。如此msgrcv通過指定msgtype來篩選出需要的消息。
msgctl
#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msgctl是用來控制消息隊列的,其中cmd指進(jìn)行的操作,buf記錄了消息隊列的信息。cmd:
- IPC_STAT: 將msg相關(guān)的內(nèi)核信息存儲到buf指向的msqid_ds 結(jié)構(gòu)體中。調(diào)用者需擁有閱讀權(quán)限才可讀取。
- IPC_SET:該命令用來設(shè)置消息隊列的屬性,要設(shè)置的屬性存儲在buf指向的msqid結(jié)構(gòu)中;可設(shè)置屬性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes,同時,也影響msg_ctime成員。
- IPC_RMID:刪除msqid標(biāo)識的消息隊列
buf:
struct msqid_ds { struct ipc_perm msg_perm; /* Ownership and permissions */ time_t msg_stime; /* Time of last msgsnd(2) */ time_t msg_rtime; /* Time of last msgrcv(2) */ time_t msg_ctime; /* Time of last change */ unsigned long __msg_cbytes; /* Current number of bytes in
queue (nonstandard) */ msgqnum_t msg_qnum; /* Current number of messages
in queue */ msglen_t msg_qbytes; /* Maximum number of bytes
allowed in queue */ pid_t msg_lspid; /* PID of last msgsnd(2) */ pid_t msg_lrpid; /* PID of last msgrcv(2) */};
如上信息可看到buf中存儲了與消息隊列相關(guān)的屬性,設(shè)置cmd后,可通過buf拿到這些信息。
實例演示功能:?用消息隊列實現(xiàn)server接client的數(shù)據(jù),server可篩選顯示指定消息類型的數(shù)據(jù)。
效果:server接收所有消息:
server 篩選消息類型為2的數(shù)據(jù):
注:代碼里可將消息類型封裝成枚舉,此demo作為演示不做過多封裝。
總結(jié)消息隊列在進(jìn)程間通信的優(yōu)勢總結(jié)起來有以下幾點:
- 緩存:數(shù)據(jù)較大的消息處理起來時間較長,此時將其寫入消息隊列更快,待系統(tǒng)空閑時再處理。提高系統(tǒng)任務(wù)執(zhí)行效率。
- 送達(dá):消息隊列存儲的消息,會一直保留在隊列中直到消息被處理,且被取走后就會被隊列釋放。因此無論多少個進(jìn)程在獲取,每個消息僅會被處理一次。
- 排序:消息在隊列中一直按照“先入先出”的順序來執(zhí)行。因此任務(wù)被處理的時序不會錯亂。
- 異步:消息隊列因為會緩存消息,且順序處理不會丟失。因此多個進(jìn)程可通過消息隊列實現(xiàn)異步通信,互不阻塞。
client.cpp
/*
********************************************************************************
* Copyright (C) 2021, xiang.D <dx_65535@163.com>.
* All right reserved.
*
* File Name : client.cpp
* Author :
* Version : V1.0
* Description :
* Journal : 2021-03-19 init v1.0
* Brief : Blog address:
* Others :
********************************************************************************
*/
#include <stdio.h>#include <unistd.h>#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>#include <iostream>#include <errno.h>#include <string.h>#include "common.h"
const char TEXT[2][50] = {"this is client1!", "this is client2!"};
int main(int argc, char *argv[]){ int msg_id, key, ret = 0; struct MsgFrame msg_buf = {0, {0}};
if (argc < 2) { PRINT_ERR("usage: %s [msgid] ", argv[0]); goto exit; }
if (strspn(argv[1], "0123456789") != strlen(argv[1])) { PRINT_ERR("Params invalid! "); goto exit; }
/* Obtain the standard key according to the file path */ key = ftok(MSGQ_FILE_PATH, MSGQ_ID); if (key < 0) { PRINT_ERR("ftok failed! errno = %d(%s) ", errno, strerror(errno)); goto exit; }
msg_id = msgget(key, IPC_EXCL); if (msg_id < 0) { PRINT_ERR("msgget failed! errno = %d(%s) ", errno, strerror(errno)); goto exit; }
do { memset(&msg_buf, 0x00, sizeof(msg_buf));#if 0 if (fgets(msg_buf.buffer, sizeof(msg_buf.buffer), stdin) == NULL) { PRINT_ERR("scanf failed! errno = %d(%s) ", errno, strerror(errno)); goto exit_msgid; }#else msg_buf.type = atoi(argv[1]); strncpy(msg_buf.buffer, TEXT[msg_buf.type - 1], strlen(TEXT[msg_buf.type-1]));#endif ret = msgsnd(msg_id, &msg_buf, sizeof(msg_buf.buffer), IPC_NOWAIT); if (ret < 0) { PRINT_ERR("msgnd failed! errno = %d(%s) ", errno, strerror(errno)); goto exit_msgid; } else { PRINT_INFO("[Send %ld %ld] %s ", msg_buf.type, strlen(msg_buf.buffer), msg_buf.buffer); } sleep(1); } while (strncmp(msg_buf.buffer, "end", msg_buf.type) != 0);
exit_msgid: ret = msgctl(msg_id, IPC_RMID, 0); if (ret < 0) { PRINT_ERR("msgctl failed! errno = %d(%s) ", errno, strerror(errno)); }exit: return 0;}
server.cpp
/*
********************************************************************************
* Copyright (C) 2021, xiang.D <dx_65535@163.com>.
* All right reserved.
*
* File Name : server.cpp
* Author :
* Version : V1.0
* Description :
* Journal : 2021-03-21 init v1.0
* Brief : Blog address:
* Others :
********************************************************************************
*/
#include <stdio.h>#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>#include <iostream>#include <signal.h>#include <errno.h>#include <string.h>#include "common.h"
static int msg_id;
static void SignalHandler(int sig){ switch (sig) { case SIGSTOP: if (msg_id != 0) { msgctl(msg_id, IPC_RMID, 0); } break; default: break; }}
int main(int argc, char *argv[]){ int key, ret = 0; long msg_type = 0; struct MsgFrame msg_recv = {0, {0}};
signal(SIGSTOP, SignalHandler);
if (argc == 1) { msg_type = 0; } else if (argc == 2) { if (strspn(argv[1], "0123456789") != strlen(argv[1])) { PRINT_ERR("Params invalid! "); goto exit; } else { msg_type = atoi(argv[1]); //PRINT_INFO("Receive msg type is %ld ", msg_type); } } else { PRINT_ERR("Params invalid "); goto exit; }
key = ftok(MSGQ_FILE_PATH, MSGQ_ID); if (key < 0) { PRINT_ERR("ftok failed! errno = %d(%s) ", errno, strerror(errno)); goto exit; }
msg_id = msgget(key, IPC_CREAT|0666); if (msg_id < 0) { PRINT_ERR("msgget failed! errno = %d(%s) ", errno, strerror(errno)); goto exit; }
do { memset(msg_recv.buffer, 0x00, MAX_SIZE); ret = msgrcv(msg_id,(void*)&msg_recv, MAX_SIZE, msg_type, 0); if (ret != -1) { PRINT_INFO("[Receive %ld %ld] %s ", msg_recv.type, strlen(msg_recv.buffer), msg_recv.buffer); } else { PRINT_ERR("msgrcv failed! "); } } while (1);
exit: PRINT_INFO("Exit from %s ", __func__); return 0;}
最后
用心感悟,認(rèn)真記錄,寫好每一篇文章,分享每一框干貨。愿每一篇文章不負(fù)自己,不負(fù)看客!
?猜你喜歡
? ????C++編程風(fēng)格指南??
? ? ??Linux進(jìn)程間通信?管道??
??????Linux進(jìn)程間通信?信號??
更多文章內(nèi)容包括但不限于C/C++、Linux、開發(fā)常用神器等,可進(jìn)入開源519公眾號聊天界面回復(fù)“文章目錄” 或者 菜單欄選擇“文章目錄”查看。
本文摘自 :https://blog.51cto.com/u