简介 进程间通信(IPC,Inter-Process Communication)指至少两个进程或线程间传送数据或信号的一些技术或方法。
IPC 分类 graph TB
A[进程间通信 IPC] --> B[Unix IPC]
A --> C[System V IPC]
A --> D[Posix IPC]
A --> E[网络IPC]
B --> B1[管道 Pipe]
B --> B2[有名管道 FIFO]
B --> B3[信号 Signal]
C --> C1[System V消息队列]
C --> C2[System V信号量]
C --> C3[System V共享内存]
D --> D1[Posix消息队列]
D --> D2[Posix信号量]
D --> D3[Posix共享内存]
E --> E1[套接字 Socket]
E --> E2[TCP/UDP]
style A fill:#ffcccc
style B fill:#ccffcc
style C fill:#ccccff
style D fill:#ffffcc
style E fill:#ffccff
IPC 特点
生命周期 :IPC 的生命周期都与内核相同,除非显式删除
权限控制 :大多数 IPC 机制都支持权限控制
性能差异 :不同 IPC 方式的性能差异很大
IPC 性能对比
IPC 方式
速度
容量
持久性
适用场景
管道
快
小
无
父子进程通信
消息队列
中
中
有
进程间消息传递
共享内存
最快
大
有
大数据量传输
信号量
-
-
有
同步控制
套接字
慢
大
无
网络通信
管道(Pipe) 管道(Pipe)是一种半双工的通信方式,数据只能单向流动,且只能在具有亲缘关系的进程间使用。
特点
单向通信 :数据只能在一个方向上流动
亲缘关系 :只能用于父子进程或兄弟进程之间
缓冲区限制 :管道有固定大小的缓冲区(通常 64KB)
阻塞读写 :当管道满时写操作阻塞,空时读操作阻塞
工作原理 sequenceDiagram
participant 父进程 as 父进程
participant 管道 as 管道缓冲区
participant 子进程 as 子进程
父进程->>管道: 创建管道 pipe()
父进程->>子进程: fork() 创建子进程
父进程->>管道: 写入数据 write()
管道->>子进程: 数据流动
子进程->>管道: 读取数据 read()
Go 实现示例 匿名管道 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package mainimport ( "fmt" "io" "os" "os/exec" ) func main () { reader, writer, err := os.Pipe() if err != nil { panic (err) } cmd := exec.Command("cat" ) cmd.Stdin = reader cmd.Stdout = os.Stdout if err := cmd.Start(); err != nil { panic (err) } go func () { defer writer.Close() for i := 0 ; i < 10 ; i++ { fmt.Fprintf(writer, "Message %d\n" , i) } }() cmd.Wait() }
命名管道(FIFO) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package mainimport ( "fmt" "os" "syscall" ) func main () { fifoPath := "/tmp/myfifo" err := syscall.Mkfifo(fifoPath, 0666 ) if err != nil && !os.IsExist(err) { panic (err) } file, err := os.OpenFile(fifoPath, os.O_WRONLY, os.ModeNamedPipe) if err != nil { panic (err) } defer file.Close() fmt.Fprintf(file, "Hello from FIFO\n" ) }
使用场景
父子进程间的简单数据传递
命令行工具链(如 ls | grep)
进程间单向数据流
注意事项
管道是阻塞的,需要协调读写操作
管道缓冲区有限,不适合大数据量传输
命名管道需要文件系统支持
信号(Signal) 信号是 Unix/Linux 系统中用于进程间通信的一种机制,用于通知进程发生了某种事件。
常见信号
信号
值
说明
默认动作
SIGHUP
1
挂起信号
终止
SIGINT
2
中断信号(Ctrl+C)
终止
SIGQUIT
3
退出信号
终止+核心转储
SIGKILL
9
强制终止
终止
SIGTERM
15
终止信号
终止
SIGUSR1
10
用户自定义信号1
终止
SIGUSR2
12
用户自定义信号2
终止
信号处理流程 flowchart TD
A[进程A] -->|发送信号| B[内核]
B -->|传递信号| C[进程B]
C --> D{信号处理方式?}
D -->|默认处理| E[执行默认动作]
D -->|忽略信号| F[忽略信号]
D -->|自定义处理| G[执行信号处理函数]
style A fill:#ffcccc
style C fill:#ccffcc
style G fill:#ccccff
Go 实现示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package mainimport ( "fmt" "os" "os/signal" "syscall" "time" ) func main () { sigChan := make (chan os.Signal, 1 ) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1) go func () { for { fmt.Println("Working..." ) time.Sleep(1 * time.Second) } }() sig := <-sigChan fmt.Printf("Received signal: %v\n" , sig) fmt.Println("Cleaning up..." ) time.Sleep(1 * time.Second) fmt.Println("Exiting..." ) }
发送信号 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package mainimport ( "fmt" "os" "syscall" ) func main () { pid := 12345 process, err := os.FindProcess(pid) if err != nil { panic (err) } err = process.Signal(syscall.SIGTERM) if err != nil { panic (err) } fmt.Printf("Sent SIGTERM to process %d\n" , pid) }
使用场景
优雅关闭程序(SIGTERM)
重新加载配置(SIGHUP)
进程间通知(SIGUSR1/SIGUSR2)
调试和监控
注意事项
SIGKILL 和 SIGSTOP 不能被捕获或忽略
信号处理函数应该是可重入的
信号可能丢失,不适合用于关键数据传递
消息队列(Message Queue) 消息队列是消息的链接表,存储在系统内核中。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。
特点
异步通信 :发送者和接收者不需要同时存在
消息格式 :支持结构化消息
优先级 :可以设置消息优先级
持久性 :消息队列在内核中持久存在
消息队列工作流程 sequenceDiagram
participant 进程A as 进程A 发送者
participant 队列 as 消息队列
participant 进程B as 进程B 接收者
进程A->>队列: msgsnd() 发送消息
队列->>队列: 消息入队
Note over 队列: 消息存储在队列中
进程B->>队列: msgrcv() 接收消息
队列->>进程B: 返回消息
队列->>队列: 消息出队
Go 实现示例 System V 消息队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package mainimport ( "fmt" "syscall" "unsafe" ) const ( IPC_CREAT = 00001000 IPC_EXCL = 00002000 MSGMAX = 8192 ) type msgbuf struct { mtype int64 mtext [MSGMAX]byte } func main () { key := 1234 msgflg := IPC_CREAT | 0666 msqid, _, errno := syscall.Syscall(syscall.SYS_MSGGET, uintptr (key), uintptr (msgflg), 0 ) if errno != 0 { panic (fmt.Sprintf("msgget failed: %v" , errno)) } msg := msgbuf{ mtype: 1 , mtext: [MSGMAX]byte {}, } copy (msg.mtext[:], []byte ("Hello from message queue" )) _, _, errno = syscall.Syscall6(syscall.SYS_MSGSND, uintptr (msqid), uintptr (unsafe.Pointer(&msg)), uintptr (len ("Hello from message queue" )), 0 , 0 , 0 ) if errno != 0 { panic (fmt.Sprintf("msgsnd failed: %v" , errno)) } fmt.Printf("Message sent to queue %d\n" , msqid) }
使用第三方库(推荐) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package mainimport ( "fmt" "github.com/zeromq/goczmq" ) func main () { pusher, err := goczmq.NewPush("inproc://example" ) if err != nil { panic (err) } defer pusher.Destroy() puller, err := goczmq.NewPull("inproc://example" ) if err != nil { panic (err) } defer puller.Destroy() message := []byte ("Hello from ZMQ" ) err = pusher.Send(message, goczmq.FlagNone) if err != nil { panic (err) } received, err := puller.Recv() if err != nil { panic (err) } fmt.Printf("Received: %s\n" , string (received)) }
使用场景
进程间异步消息传递
解耦生产者和消费者
需要消息持久化的场景
多对多通信模式
注意事项
消息队列有大小限制
需要手动管理消息队列的生命周期
消息可能丢失(取决于实现)
信号量(Semaphore) 信号量的本质是一种数据操作锁,用来负责数据操作过程中的互斥、同步等功能。信号量用来管理临界资源,它本身只是一种外部资源的标识,不具有数据交换功能。
信号量操作 信号量通过 PV 操作来控制:
P 操作 (wait/sleep/down):减少信号量,如果信号量为 0 则阻塞
V 操作 (signal/wake-up/up):增加信号量,唤醒等待的进程
PV 操作流程 flowchart TD
A[进程请求资源] --> B[执行P操作]
B --> C{信号量 > 0?}
C -->|是| D[信号量减1]
C -->|否| E[进程阻塞等待]
D --> F[获得资源]
E --> G[等待V操作]
G --> H[被唤醒]
H --> D
F --> I[使用资源]
I --> J[释放资源]
J --> K[执行V操作]
K --> L[信号量加1]
L --> M[唤醒等待进程]
style A fill:#ffcccc
style F fill:#ccffcc
style M fill:#ccccff
Go 实现示例 System V 信号量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 package mainimport ( "fmt" "syscall" "unsafe" ) const ( IPC_CREAT = 00001000 IPC_EXCL = 00002000 GETVAL = 12 SETVAL = 16 SEM_UNDO = 0x1000 ) type sembuf struct { sem_num uint16 sem_op int16 sem_flg int16 } func main () { key := 5678 semflg := IPC_CREAT | 0666 semid, _, errno := syscall.Syscall(syscall.SYS_SEMGET, uintptr (key), uintptr (1 ), uintptr (semflg)) if errno != 0 { panic (fmt.Sprintf("semget failed: %v" , errno)) } arg := uintptr (1 ) _, _, errno = syscall.Syscall(syscall.SYS_SEMCTL, uintptr (semid), 0 , SETVAL, arg) if errno != 0 { panic (fmt.Sprintf("semctl SETVAL failed: %v" , errno)) } sbuf := sembuf{ sem_num: 0 , sem_op: -1 , sem_flg: SEM_UNDO, } _, _, errno = syscall.Syscall(syscall.SYS_SEMOP, uintptr (semid), uintptr (unsafe.Pointer(&sbuf)), 1 ) if errno != 0 { panic (fmt.Sprintf("semop P failed: %v" , errno)) } fmt.Println("Critical section" ) sbuf.sem_op = 1 _, _, errno = syscall.Syscall(syscall.SYS_SEMOP, uintptr (semid), uintptr (unsafe.Pointer(&sbuf)), 1 ) if errno != 0 { panic (fmt.Sprintf("semop V failed: %v" , errno)) } fmt.Println("Released" ) }
使用 Go 标准库 sync 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package mainimport ( "fmt" "sync" "time" ) var ( semaphore = make (chan struct {}, 3 ) wg sync.WaitGroup ) func worker (id int ) { defer wg.Done() semaphore <- struct {}{} defer func () { <-semaphore }() fmt.Printf("Worker %d: Working...\n" , id) time.Sleep(2 * time.Second) fmt.Printf("Worker %d: Done\n" , id) } func main () { for i := 1 ; i <= 10 ; i++ { wg.Add(1 ) go worker(i) } wg.Wait() fmt.Println("All workers completed" ) }
使用场景
控制并发访问数量
实现互斥锁
同步多个进程
资源池管理
注意事项
避免死锁(确保 P 和 V 操作配对)
注意信号量的初始值设置
System V 信号量需要手动清理
共享内存(Shared Memory) 共享内存是最高效的 IPC 方式,因为数据不需要在进程间复制,而是直接映射到多个进程的地址空间。
工作原理 graph TB
A[进程A] --> C[共享内存区域]
B[进程B] --> C
D[进程C] --> C
C --> E[内核空间]
A -.->|直接访问| C
B -.->|直接访问| C
D -.->|直接访问| C
style C fill:#ccffcc
style E fill:#ccccff
特点
最高性能 :数据不需要复制,直接访问
大容量 :可以共享大量数据
需要同步 :需要配合信号量或锁使用
持久性 :共享内存区域在内核中持久存在
Go 实现示例 System V 共享内存 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package mainimport ( "fmt" "syscall" "unsafe" ) const ( IPC_CREAT = 00001000 IPC_EXCL = 00002000 SHM_RDONLY = 010000 ) func main () { key := 9999 size := 4096 shmflg := IPC_CREAT | 0666 shmid, _, errno := syscall.Syscall(syscall.SYS_SHMGET, uintptr (key), uintptr (size), uintptr (shmflg)) if errno != 0 { panic (fmt.Sprintf("shmget failed: %v" , errno)) } shmaddr, _, errno := syscall.Syscall(syscall.SYS_SHMAT, uintptr (shmid), 0 , 0 ) if errno != 0 { panic (fmt.Sprintf("shmat failed: %v" , errno)) } shmBytes := (*[4096 ]byte )(unsafe.Pointer(shmaddr)) message := "Hello from shared memory" copy (shmBytes[:], []byte (message)) fmt.Printf("Written to shared memory: %s\n" , message) fmt.Printf("Shared memory ID: %d\n" , shmid) }
使用 mmap(推荐) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package mainimport ( "fmt" "os" "syscall" "unsafe" ) func main () { file, err := os.CreateTemp("" , "shm_*" ) if err != nil { panic (err) } defer os.Remove(file.Name()) defer file.Close() size := 4096 file.Truncate(int64 (size)) data, err := syscall.Mmap(int (file.Fd()), 0 , size, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED) if err != nil { panic (err) } defer syscall.Munmap(data) message := "Hello from mmap" copy (data, []byte (message)) syscall.Msync(data, syscall.MS_SYNC) fmt.Printf("Written: %s\n" , string (data[:len (message)])) }
使用场景
大数据量传输
高性能要求的场景
多个进程需要访问相同数据
实时数据共享
注意事项
必须配合同步机制(信号量、锁)使用
需要注意内存对齐和字节序
共享内存需要手动清理
可能存在安全风险(需要权限控制)
套接字(Socket) 套接字是最通用的 IPC 方式,不仅可以在同一台机器上的进程间通信,还可以通过网络在不同机器间通信。
套接字类型 graph TB
A[套接字 Socket] --> B[Unix域套接字]
A --> C[网络套接字]
B --> B1[流式套接字 SOCK_STREAM]
B --> B2[数据报套接字 SOCK_DGRAM]
C --> C1[TCP 流式]
C --> C2[UDP 数据报]
C --> C3[原始套接字]
style A fill:#ffcccc
style B fill:#ccffcc
style C fill:#ccccff
Unix 域套接字 Unix 域套接字用于同一台机器上的进程间通信,性能比网络套接字更好。
Go 实现示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 package mainimport ( "fmt" "net" "os" "os/signal" "syscall" "time" ) func server () { socketPath := "/tmp/unix.sock" os.Remove(socketPath) listener, err := net.Listen("unix" , socketPath) if err != nil { panic (err) } defer listener.Close() fmt.Println("Server listening on" , socketPath) conn, err := listener.Accept() if err != nil { panic (err) } defer conn.Close() buf := make ([]byte , 1024 ) n, err := conn.Read(buf) if err != nil { panic (err) } fmt.Printf("Server received: %s\n" , string (buf[:n])) conn.Write([]byte ("Hello from server" )) } func client () { socketPath := "/tmp/unix.sock" conn, err := net.Dial("unix" , socketPath) if err != nil { panic (err) } defer conn.Close() conn.Write([]byte ("Hello from client" )) buf := make ([]byte , 1024 ) n, err := conn.Read(buf) if err != nil { panic (err) } fmt.Printf("Client received: %s\n" , string (buf[:n])) } func main () { go server() time.Sleep(100 * time.Millisecond) go client() sigChan := make (chan os.Signal, 1 ) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan os.Remove("/tmp/unix.sock" ) }
TCP/UDP 套接字 TCP/UDP 套接字用于网络通信,Go 标准库提供了完善的网络编程支持。
TCP 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 package mainimport ( "fmt" "net" "time" ) func tcpServer () { listener, err := net.Listen("tcp" , ":8080" ) if err != nil { panic (err) } defer listener.Close() fmt.Println("TCP Server listening on :8080" ) for { conn, err := listener.Accept() if err != nil { continue } go handleTCPConnection(conn) } } func handleTCPConnection (conn net.Conn) { defer conn.Close() buf := make ([]byte , 1024 ) n, err := conn.Read(buf) if err != nil { return } fmt.Printf("Received: %s\n" , string (buf[:n])) conn.Write([]byte ("Echo: " + string (buf[:n]))) } func tcpClient () { conn, err := net.Dial("tcp" , "localhost:8080" ) if err != nil { panic (err) } defer conn.Close() conn.Write([]byte ("Hello TCP" )) buf := make ([]byte , 1024 ) n, err := conn.Read(buf) if err != nil { panic (err) } fmt.Printf("Response: %s\n" , string (buf[:n])) } func main () { go tcpServer() time.Sleep(100 * time.Millisecond) tcpClient() }
使用场景
网络通信
跨机器进程通信
客户端-服务器架构
分布式系统
注意事项
需要考虑网络延迟和错误处理
TCP 是可靠的,UDP 是不可靠的
需要注意端口占用和防火墙设置
IPC 方式对比与选择 性能对比 graph LR
A[IPC性能] --> B[共享内存 最快]
A --> C[管道 快]
A --> D[消息队列 中等]
A --> E[套接字 较慢]
style B fill:#00ff00
style C fill:#90ff90
style D fill:#ffff90
style E fill:#ff9090
选择指南 flowchart TD
A[选择IPC方式] --> B{需要跨机器?}
B -->|是| C[套接字 Socket]
B -->|否| D{数据量大?}
D -->|是| E{需要持久化?}
D -->|否| F{需要同步?}
E -->|是| G[共享内存+信号量]
E -->|否| H[共享内存]
F -->|是| I[消息队列]
F -->|否| J{父子进程?}
J -->|是| K[管道 Pipe]
J -->|否| L[命名管道 FIFO]
style C fill:#ffcccc
style G fill:#ccffcc
style I fill:#ccccff
style K fill:#ffffcc
最佳实践
性能优先 :使用共享内存
简单通信 :使用管道或命名管道
异步消息 :使用消息队列
同步控制 :使用信号量
网络通信 :使用套接字
跨平台 :优先使用 Posix IPC
常见问题 1. 共享内存同步问题 问题 :多个进程同时访问共享内存可能导致数据竞争。
解决 :使用信号量或互斥锁进行同步。
2. 消息队列满 问题 :消息队列满了导致发送失败。
解决 :增加队列大小或使用非阻塞模式。
3. 管道阻塞 问题 :管道读写操作可能永久阻塞。
解决 :使用非阻塞 I/O 或设置超时。
4. 信号丢失 问题 :信号可能丢失,不适合关键数据传递。
解决 :使用其他 IPC 方式传递数据,信号仅用于通知。
总结 Go 语言支持多种 IPC 方式,每种方式都有其适用场景:
管道 :适合简单的父子进程通信
信号 :适合进程控制和通知
消息队列 :适合异步消息传递
信号量 :适合同步和互斥控制
共享内存 :适合高性能大数据量传输
套接字 :适合网络通信和跨机器通信
选择合适的 IPC 方式需要考虑性能、复杂度、持久性、同步需求等因素。
参考文献