Linux 下的進程間通信:共享存儲

Linux C語言 程序員 Flock 通信 Linux技術 2019-06-02
Linux 下的進程間通信:共享存儲

學習在 Linux 中進程是如何與其他進程進行同步的。

-- Marty Kalin

本篇是 Linux 下 進程間通信 (IPC)系列的第一篇文章。這個系列將使用 C 語言代碼示例來闡明以下 IPC 機制:

  • 共享文件
  • 共享內存(使用信號量)
  • 管道(命名的或非命名的管道)
  • 消息隊列
  • 套接字
  • 信號

在聚焦上面提到的共享文件和共享內存這兩個機制之前,這篇文章將帶你回顧一些核心的概念。

核心概念

進程是運行著的程序,每個進程都有著它自己的地址空間,這些空間由進程被允許訪問的內存地址組成。進程有一個或多個執行線程,而線程是一系列執行指令的集合:單線程進程就只有一個線程,而多線程的進程則有多個線程。一個進程中的線程共享各種資源,特別是地址空間。另外,一個進程中的線程可以直接通過共享內存來進行通信,儘管某些現代語言(例如 Go)鼓勵一種更有序的方式,例如使用線程安全的通道。當然對於不同的進程,默認情況下,它們不能共享內存。

有多種方法啟動之後要進行通信的進程,下面所舉的例子中主要使用了下面的兩種方法:

  • 一個終端被用來啟動一個進程,另外一個不同的終端被用來啟動另一個。
  • 在一個進程(父進程)中調用系統函數 fork,以此生髮另一個進程(子進程)。

第一個例子採用了上面使用終端的方法。這些 代碼示例 的 ZIP 壓縮包可以從我的網站下載到。

共享文件

程序員對文件訪問應該都已經很熟識了,包括許多坑(不存在的文件、文件權限損壞等等),這些問題困擾著程序對文件的使用。儘管如此,共享文件可能是最為基礎的 IPC 機制了。考慮一下下面這樣一個相對簡單的例子,其中一個進程(生產者 producer)創建和寫入一個文件,然後另一個進程(消費者 consumer)從這個相同的文件中進行讀取:

writes +-----------+ reads

producer-------->| disk file |<-------consumer

+-----------+

在使用這個 IPC 機制時最明顯的挑戰是競爭條件可能會發生:生產者和消費者可能恰好在同一時間訪問該文件,從而使得輸出結果不確定。為了避免競爭條件的發生,該文件在處於讀或寫狀態時必須以某種方式處於被鎖狀態,從而阻止在寫操作執行時和其他操作的衝突。在標準系統庫中與鎖相關的 API 可以被總結如下:

  • 生產者應該在寫入文件時獲得一個文件的排斥鎖。一個排斥鎖最多被一個進程所擁有。這樣就可以排除掉競爭條件的發生,因為在鎖被釋放之前沒有其他的進程可以訪問這個文件。
  • 消費者應該在從文件中讀取內容時得到至少一個共享鎖。多個讀取者可以同時保有一個共享鎖,但是沒有寫入者可以獲取到文件內容,甚至在當只有一個讀取者保有一個共享鎖時。

共享鎖可以提升效率。假如一個進程只是讀入一個文件的內容,而不去改變它的內容,就沒有什麼原因阻止其他進程來做同樣的事。但如果需要寫入內容,則很顯然需要文件有排斥鎖。

標準的 I/O 庫中包含一個名為 fcntl 的實用函數,它可以被用來檢查或者操作一個文件上的排斥鎖和共享鎖。該函數通過一個文件描述符(一個在進程中的非負整數值)來標記一個文件(在不同的進程中不同的文件描述符可能標記同一個物理文件)。對於文件的鎖定, Linux 提供了名為 flock 的庫函數,它是 fcntl 的一個精簡包裝。第一個例子中使用 fcntl 函數來暴露這些 API 細節。

示例 1. 生產者程序

#include <stdio.h>

#include <stdlib.h>

#include <fcntl.h>

#include <unistd.h>

#define FileName "data.dat"

void report_and_exit(const char* msg) {

[perror][4](msg);

[exit][5](-1); /* EXIT_FAILURE */

}

int main() {

struct flock lock;

lock.l_type = F_WRLCK; /* read/write (exclusive) lock */

lock.l_whence = SEEK_SET; /* base for seek offsets */

lock.l_start = 0; /* 1st byte in file */

lock.l_len = 0; /* 0 here means 'until EOF' */

lock.l_pid = getpid(); /* process id */

int fd; /* file descriptor to identify a file within a process */

if ((fd = open(FileName, O_RDONLY)) < 0) /* -1 signals an error */

report_and_exit("open to read failed...");

/* If the file is write-locked, we can't continue. */

fcntl(fd, F_GETLK, &lock); /* sets lock.l_type to F_UNLCK if no write lock */

if (lock.l_type != F_UNLCK)

report_and_exit("file is still write locked...");

lock.l_type = F_RDLCK; /* prevents any writing during the reading */

if (fcntl(fd, F_SETLK, &lock) < 0)

report_and_exit("can't get a read-only lock...");

/* Read the bytes (they happen to be ASCII codes) one at a time. */

int c; /* buffer for read bytes */

while (read(fd, &c, 1) > 0) /* 0 signals EOF */

write(STDOUT_FILENO, &c, 1); /* write one byte to the standard output */

/* Release the lock explicitly. */

lock.l_type = F_UNLCK;

if (fcntl(fd, F_SETLK, &lock) < 0)

report_and_exit("explicit unlocking failed...");

close(fd);

return 0;

}

上面生產者程序的主要步驟可以總結如下:

  • 這個程序首先聲明瞭一個類型為 struct flock 的變量,它代表一個鎖,並對它的 5 個域做了初始化。第一個初始化

lock.l_type = F_WRLCK; /* exclusive lock */

  • 使得這個鎖為排斥鎖(read-write)而不是一個共享鎖(read-only)。假如生產者獲得了這個鎖,則其他的進程將不能夠對文件做讀或者寫操作,直到生產者釋放了這個鎖,或者顯式地調用 fcntl,又或者隱式地關閉這個文件。(當進程終止時,所有被它打開的文件都會被自動關閉,從而釋放了鎖)
  • 上面的程序接著初始化其他的域。主要的效果是整個文件都將被鎖上。但是,有關鎖的 API 允許特別指定的字節被上鎖。例如,假如文件包含多個文本記錄,則單個記錄(或者甚至一個記錄的一部分)可以被鎖,而其餘部分不被鎖。
  • 第一次調用 fcntl

if (fcntl(fd, F_SETLK, &lock) < 0)

  • 嘗試排斥性地將文件鎖住,並檢查調用是否成功。一般來說, fcntl 函數返回 -1 (因此小於 0)意味著失敗。第二個參數 F_SETLK 意味著 fcntl 的調用不是堵塞的;函數立即做返回,要麼獲得鎖,要麼顯示失敗了。假如替換地使用 F_SETLKW(末尾的 W 代指等待),那麼對 fcntl 的調用將是阻塞的,直到有可能獲得鎖的時候。在調用 fcntl 函數時,它的第一個參數 fd 指的是文件描述符,第二個參數指定了將要採取的動作(在這個例子中,F_SETLK 指代設置鎖),第三個參數為鎖結構的地址(在本例中,指的是 &lock)。
  • 假如生產者獲得了鎖,這個程序將向文件寫入兩個文本記錄。
  • 在向文件寫入內容後,生產者改變鎖結構中的 l_type 域為 unlock 值:

lock.l_type = F_UNLCK;

  • 並調用 fcntl 來執行解鎖操作。最後程序關閉了文件並退出。

示例 2. 消費者程序

#include <stdio.h>

#include <stdlib.h>

#include <fcntl.h>

#include <unistd.h>

#define FileName "data.dat"

void report_and_exit(const char* msg) {

[perror][4](msg);

[exit][5](-1); /* EXIT_FAILURE */

}

int main() {

struct flock lock;

lock.l_type = F_WRLCK; /* read/write (exclusive) lock */

lock.l_whence = SEEK_SET; /* base for seek offsets */

lock.l_start = 0; /* 1st byte in file */

lock.l_len = 0; /* 0 here means 'until EOF' */

lock.l_pid = getpid(); /* process id */

int fd; /* file descriptor to identify a file within a process */

if ((fd = open(FileName, O_RDONLY)) < 0) /* -1 signals an error */

report_and_exit("open to read failed...");

/* If the file is write-locked, we can't continue. */

fcntl(fd, F_GETLK, &lock); /* sets lock.l_type to F_UNLCK if no write lock */

if (lock.l_type != F_UNLCK)

report_and_exit("file is still write locked...");

lock.l_type = F_RDLCK; /* prevents any writing during the reading */

if (fcntl(fd, F_SETLK, &lock) < 0)

report_and_exit("can't get a read-only lock...");

/* Read the bytes (they happen to be ASCII codes) one at a time. */

int c; /* buffer for read bytes */

while (read(fd, &c, 1) > 0) /* 0 signals EOF */

write(STDOUT_FILENO, &c, 1); /* write one byte to the standard output */

/* Release the lock explicitly. */

lock.l_type = F_UNLCK;

if (fcntl(fd, F_SETLK, &lock) < 0)

report_and_exit("explicit unlocking failed...");

close(fd);

return 0;

}

相比於鎖的 API,消費者程序會相對複雜一點兒。特別的,消費者程序首先檢查文件是否被排斥性的被鎖,然後才嘗試去獲得一個共享鎖。相關的代碼為:

lock.l_type = F_WRLCK;

...

fcntl(fd, F_GETLK, &lock); /* sets lock.l_type to F_UNLCK if no write lock */

if (lock.l_type != F_UNLCK)

report_and_exit("file is still write locked...");

在 fcntl 調用中的 F_GETLK 操作指定檢查一個鎖,在本例中,上面代碼的聲明中給了一個 F_WRLCK 的排斥鎖。假如特指的鎖不存在,那麼 fcntl 調用將會自動地改變鎖類型域為 F_UNLCK 以此來顯示當前的狀態。假如文件是排斥性地被鎖,那麼消費者將會終止。(一個更健壯的程序版本或許應該讓消費者睡會兒,然後再嘗試幾次。)

假如當前文件沒有被鎖,那麼消費者將嘗試獲取一個共享(read-only)鎖(F_RDLCK)。為了縮短程序,fcntl 中的 F_GETLK 調用可以丟棄,因為假如其他進程已經保有一個讀寫鎖,F_RDLCK 的調用就可能會失敗。重新調用一個只讀鎖能夠阻止其他進程向文件進行寫的操作,但可以允許其他進程對文件進行讀取。簡而言之,共享鎖可以被多個進程所保有。在獲取了一個共享鎖後,消費者程序將立即從文件中讀取字節數據,然後在標準輸出中打印這些字節的內容,接著釋放鎖,關閉文件並終止。

下面的 % 為命令行提示符,下面展示的是從相同終端開啟這兩個程序的輸出:

% ./producer

Process 29255 has written to data file...

% ./consumer

Now is the winter of our discontent

Made glorious summer by this sun of York

在本次的代碼示例中,通過 IPC 傳輸的數據是文本:它們來自莎士比亞的戲劇《理查三世》中的兩行臺詞。然而,共享文件的內容還可以是紛繁複雜的,任意的字節數據(例如一個電影)都可以,這使得文件共享變成了一個非常靈活的 IPC 機制。但它的缺點是文件獲取速度較慢,因為文件的獲取涉及到讀或者寫。同往常一樣,編程總是伴隨著折中。下面的例子將通過共享內存來做 IPC,而不是通過共享文件,在性能上相應的有極大的提升。

共享內存

對於共享內存,Linux 系統提供了兩類不同的 API:傳統的 System V API 和更新一點的 POSIX API。在單個應用中,這些 API 不能混用。但是,POSIX 方式的一個壞處是它的特性仍在發展中,並且依賴於安裝的內核版本,這非常影響代碼的可移植性。例如,默認情況下,POSIX API 用內存映射文件來實現共享內存:對於一個共享的內存段,系統為相應的內容維護一個備份文件。在 POSIX 規範下共享內存可以被配置為不需要備份文件,但這可能會影響可移植性。我的例子中使用的是帶有備份文件的 POSIX API,這既結合了內存獲取的速度優勢,又獲得了文件存儲的持久性。

下面的共享內存例子中包含兩個程序,分別名為 memwriter 和 memreader,並使用信號量來調整它們對共享內存的獲取。在任何時候當共享內存進入一個寫入者場景時,無論是多進程還是多線程,都有遇到基於內存的競爭條件的風險,所以,需要引入信號量來協調(同步)對共享內存的獲取。

memwriter 程序應當在它自己所處的終端首先啟動,然後 memreader 程序才可以在它自己所處的終端啟動(在接著的十幾秒內)。memreader 的輸出如下:

This is the way the world ends...

在每個源程序的最上方註釋部分都解釋了在編譯它們時需要添加的鏈接參數。

首先讓我們複習一下信號量是如何作為一個同步機制工作的。一般的信號量也被叫做一個計數信號量,因為帶有一個可以增加的值(通常初始化為 0)。考慮一家租用自行車的商店,在它的庫存中有 100 輛自行車,還有一個供職員用於租賃的程序。每當一輛自行車被租出去,信號量就增加 1;當一輛自行車被還回來,信號量就減 1。在信號量的值為 100 之前都還可以進行租賃業務,但如果等於 100 時,就必須停止業務,直到至少有一輛自行車被還回來,從而信號量減為 99。

二元信號量是一個特例,它只有兩個值:0 和 1。在這種情況下,信號量的表現為互斥量(一個互斥的構造)。下面的共享內存示例將把信號量用作互斥量。當信號量的值為 0 時,只有 memwriter 可以獲取共享內存,在寫操作完成後,這個進程將增加信號量的值,從而允許 memreader 來讀取共享內存。

示例 3. memwriter 進程的源程序

/** Compilation: gcc -o memwriter memwriter.c -lrt -lpthread **/

#include <stdio.h>

#include <stdlib.h>

#include <sys/mman.h>

#include <sys/stat.h>

#include <fcntl.h>

#include <unistd.h>

#include <semaphore.h>

#include <string.h>

#include "shmem.h"

void report_and_exit(const char* msg) {

[perror][4](msg);

[exit][5](-1);

}

int main() {

int fd = shm_open(BackingFile, /* name from smem.h */

O_RDWR | O_CREAT, /* read/write, create if needed */

AccessPerms); /* access permissions (0644) */

if (fd < 0) report_and_exit("Can't open shared mem segment...");

ftruncate(fd, ByteSize); /* get the bytes */

caddr_t memptr = mmap(NULL, /* let system pick where to put segment */

ByteSize, /* how many bytes */

PROT_READ | PROT_WRITE, /* access protections */

MAP_SHARED, /* mapping visible to other processes */

fd, /* file descriptor */

0); /* offset: start at 1st byte */

if ((caddr_t) -1 == memptr) report_and_exit("Can't get segment...");

[fprintf][7](stderr, "shared mem address: %p [0..%d]\n", memptr, ByteSize - 1);

[fprintf][7](stderr, "backing file: /dev/shm%s\n", BackingFile );

/* semahore code to lock the shared mem */

sem_t* semptr = sem_open(SemaphoreName, /* name */

O_CREAT, /* create the semaphore */

AccessPerms, /* protection perms */

0); /* initial value */

if (semptr == (void*) -1) report_and_exit("sem_open");

[strcpy][8](memptr, MemContents); /* copy some ASCII bytes to the segment */

/* increment the semaphore so that memreader can read */

if (sem_post(semptr) < 0) report_and_exit("sem_post");

sleep(12); /* give reader a chance */

/* clean up */

munmap(memptr, ByteSize); /* unmap the storage */

close(fd);

sem_close(semptr);

shm_unlink(BackingFile); /* unlink from the backing file */

return 0;

}

下面是 memwriter 和 memreader 程序如何通過共享內存來通信的一個總結:

  • 上面展示的 memwriter 程序調用 shm_open 函數來得到作為系統協調共享內存的備份文件的文件描述符。此時,並沒有內存被分配。接下來調用的是令人誤解的名為 ftruncate 的函數

ftruncate(fd, ByteSize); /* get the bytes */

  • 它將分配 ByteSize 字節的內存,在該情況下,一般為大小適中的 512 字節。memwriter 和 memreader 程序都只從共享內存中獲取數據,而不是從備份文件。系統將負責共享內存和備份文件之間數據的同步。
  • 接著 memwriter 調用 mmap 函數:

caddr_t memptr = mmap(NULL, /* let system pick where to put segment */

ByteSize, /* how many bytes */

PROT_READ | PROT_WRITE, /* access protections */

MAP_SHARED, /* mapping visible to other processes */

fd, /* file descriptor */

0); /* offset: start at 1st byte */

  • 來獲得共享內存的指針。(memreader 也做一次類似的調用。) 指針類型 caddr_t 以 c 開頭,它代表 calloc,而這是動態初始化分配的內存為 0 的一個系統函數。memwriter 通過庫函數 strcpy(字符串複製)來獲取後續寫操作的 memptr。
  • 到現在為止,memwriter 已經準備好進行寫操作了,但首先它要創建一個信號量來確保共享內存的排斥性。假如 memwriter 正在執行寫操作而同時 memreader 在執行讀操作,則有可能出現競爭條件。假如調用 sem_open 成功了:

sem_t* semptr = sem_open(SemaphoreName, /* name */

O_CREAT, /* create the semaphore */

AccessPerms, /* protection perms */

0); /* initial value */

  • 那麼,接著寫操作便可以執行。上面的 SemaphoreName(任意一個唯一的非空名稱)用來在 memwriter 和 memreader 識別信號量。初始值 0 將會傳遞給信號量的創建者,在這個例子中指的是 memwriter 賦予它執行寫操作的權利。
  • 在寫操作完成後,memwriter* 通過調用sem_post` 函數將信號量的值增加到 1:

if (sem_post(semptr) < 0) ..

  • 增加信號了將釋放互斥鎖,使得 memreader 可以執行它的讀操作。為了更好地測量,memwriter 也將從它自己的地址空間中取消映射,

munmap(memptr, ByteSize); /* unmap the storage *

  • 這將使得 memwriter 不能進一步地訪問共享內存。

示例 4. memreader 進程的源代碼

/** Compilation: gcc -o memreader memreader.c -lrt -lpthread **/

#include <stdio.h>

#include <stdlib.h>

#include <sys/mman.h>

#include <sys/stat.h>

#include <fcntl.h>

#include <unistd.h>

#include <semaphore.h>

#include <string.h>

#include "shmem.h"

void report_and_exit(const char* msg) {

[perror][4](msg);

[exit][5](-1);

}

int main() {

int fd = shm_open(BackingFile, O_RDWR, AccessPerms); /* empty to begin */

if (fd < 0) report_and_exit("Can't get file descriptor...");

/* get a pointer to memory */

caddr_t memptr = mmap(NULL, /* let system pick where to put segment */

ByteSize, /* how many bytes */

PROT_READ | PROT_WRITE, /* access protections */

MAP_SHARED, /* mapping visible to other processes */

fd, /* file descriptor */

0); /* offset: start at 1st byte */

if ((caddr_t) -1 == memptr) report_and_exit("Can't access segment...");

/* create a semaphore for mutual exclusion */

sem_t* semptr = sem_open(SemaphoreName, /* name */

O_CREAT, /* create the semaphore */

AccessPerms, /* protection perms */

0); /* initial value */

if (semptr == (void*) -1) report_and_exit("sem_open");

/* use semaphore as a mutex (lock) by waiting for writer to increment it */

if (!sem_wait(semptr)) { /* wait until semaphore != 0 */

int i;

for (i = 0; i < [strlen][6](MemContents); i++)

write(STDOUT_FILENO, memptr + i, 1); /* one byte at a time */

sem_post(semptr);

}

/* cleanup */

munmap(memptr, ByteSize);

close(fd);

sem_close(semptr);

unlink(BackingFile);

return 0;

}

memwriter 和 memreader 程序中,共享內存的主要著重點都在 shm_open 和 mmap 函數上:在成功時,第一個調用返回一個備份文件的文件描述符,而第二個調用則使用這個文件描述符從共享內存段中獲取一個指針。它們對 shm_open 的調用都很相似,除了 memwriter 程序創建共享內存,而 `memreader 只獲取這個已經創建的內存:

int fd = shm_open(BackingFile, O_RDWR | O_CREAT, AccessPerms); /* memwriter */

int fd = shm_open(BackingFile, O_RDWR, AccessPerms); /* memreader */

有了文件描述符,接著對 mmap 的調用就是類似的了:

caddr_t memptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

mmap 的第一個參數為 NULL,這意味著讓系統自己決定在虛擬內存地址的哪個地方分配內存,當然也可以指定一個地址(但很有技巧性)。MAP_SHARED 標誌著被分配的內存在進程中是共享的,最後一個參數(在這個例子中為 0 ) 意味著共享內存的偏移量應該為第一個字節。size 參數特別指定了將要分配的字節數目(在這個例子中是 512);另外的保護參數(AccessPerms)暗示著共享內存是可讀可寫的。

當 memwriter 程序執行成功後,系統將創建並維護備份文件,在我的系統中,該文件為 /dev/shm/shMemEx,其中的 shMemEx 是我為共享存儲命名的(在頭文件 shmem.h 中給定)。在當前版本的 memwriter 和 memreader 程序中,下面的語句

shm_unlink(BackingFile); /* removes backing file */

將會移除備份文件。假如沒有 unlink 這個語句,則備份文件在程序終止後仍然持久地保存著。

memreader 和 memwriter 一樣,在調用 sem_open 函數時,通過信號量的名字來獲取信號量。但 memreader 隨後將進入等待狀態,直到 memwriter 將初始值為 0 的信號量的值增加。

if (!sem_wait(semptr)) { /* wait until semaphore != 0 */

一旦等待結束,memreader 將從共享內存中讀取 ASCII 數據,然後做些清理工作並終止。

共享內存 API 包括顯式地同步共享內存段和備份文件。在這次的示例中,這些操作都被省略了,以免文章顯得雜亂,好讓我們專注於內存共享和信號量的代碼。

即便在信號量代碼被移除的情況下,memwriter 和 memreader 程序很大機率也能夠正常執行而不會引入競爭條件:memwriter 創建了共享內存段,然後立即向它寫入;memreader 不能訪問共享內存,直到共享內存段被創建好。然而,當一個寫操作處於混合狀態時,最佳實踐需要共享內存被同步。信號量 API 足夠重要,值得在代碼示例中著重強調。

總結

上面共享文件和共享內存的例子展示了進程是怎樣通過共享存儲來進行通信的,前者通過文件而後者通過內存塊。這兩種方法的 API 相對來說都很直接。這兩種方法有什麼共同的缺點嗎?現代的應用經常需要處理流數據,而且是非常大規模的數據流。共享文件或者共享內存的方法都不能很好地處理大規模的流數據。按照類型使用管道會更加合適一些。所以這個系列的第二部分將會介紹管道和消息隊列,同樣的,我們將使用 C 語言寫的代碼示例來輔助講解。


via: https://opensource.com/article/19/4/interprocess-communication-linux-storage

作者: Marty Kalin 選題: lujun9972 譯者: FSSlc 校對: wxy

本文由 LCTT 原創編譯, Linux中國 榮譽推出

點擊“瞭解更多”可訪問文內鏈接

相關推薦

推薦中...