【GO】基于共享内存的跨进程通信方案

使用Golang实现了基于共享内存的跨进程通信,高效通信

【GO】基于共享内存的跨进程通信方案

By img Microanswer Create at:Mar 24, 2025, 2:34:27 PM 

Tags: go 跨进程通信 IPC 共享内存

使用Golang实现了基于共享内存的跨进程通信,高效通信


一、背景

最近遇到一新需求,需要将一个功能模块单独提取出来作为一个项目实现,然后这个功能模块可以被编译为一个单独运行的应用,且此应用中的功能可以被其它应用调用,其它应用可以是任意语言实现的。

我希望此功能模块能够高效的为其它应用提供交互能力,为此我不愿使用web、socket 等方案来实现,这不仅效率低,延迟还高。于是我打算使用基于共享内存的方法实现。

二、原理

在 golang 中,我们可以通过 syscall.CreateFileMappingsyscall.MapViewOfFile 这两个函数来实现共享内存的创建和映射到应用中读取内存地址。基于这两个函数就可以开始我们的共享内存实现跨进程通信了。

通过共享内存来实现通信,本质上就是两个不同的应用,都可以拿到这块内存的地址,然后读取里面的内容。要实现相互之间的通信,就必须制定一个相互之间的读写沟通规范,从而实现你读我写或我读你写的这么一个状态。下面简单的描述一下本代码实现的沟通规范:

// 口口口口口口口口 | 口 | 口口口口口口口口 | 口 | 口口口口 | 口口口口口口口......口口口口口口口
// 时间戳8字节|读写状态1字节|数据体大小8字节|数据体状态1字节|预留4字节|数据体区(任意字节)

上面一个“口”表示一个字节,这些所有“口”构成一整个共享内存区域的内容。其中被划分为了几个功能区域。

  1. 时间戳区域,占用8个字节:我们定义每10毫秒将发送方的当前时间写入该8个字节中,这样接收方可以通过判断这8个字节读取出来的时间戳和当前时间戳是否小于10毫秒,如果大于了,说明发送方没有运行或断线了。
  2. 读写状态区域,占用1个字节:这里只需要保存 0 和 1 两个值,因此使用1个字节搓搓有余了,0 表示后面的“数据体”的内容无意义,里面的数据没用。1 表示里面的数据是发送方写进去的,并且等待接收方将其读取,接收方读取完成后将该区域的值又设置为 0。发送方发现是0了,认为接收方已读取,并重新写入新值。
  3. 数据体大小区域,占用8字节:这里保存了数据体区域的实际数据长度。
  4. 数据体状态区域,占用1字节:这里保存了一个非常有用的状态,1=数据体的数据是否要发送的数据的部分数据;2=数据体的数据是否是要发送的数据结束部分;这在发送相当大的内容时非常有用,接收方和发送方都会多次分片发送来确保一个很大的数据能够完整被发送和接受。

三、代码实现

下面,是整个代码实现上面的逻辑。

此代码适用于Windows环境,如果是Linux下,那么创建共享内存的实现方式有所不同,你如果要用,需要自己修改那部分代码。
import (
	"encoding/binary"
	"fmt"
	"slices"
	"syscall"
	"time"
	"unsafe"
)

// 口口口口口口口口|口|口口口口口口口口|口|口口口口|口口口口口口口口口口口口口口口口口口口口口口口口口口口口口口口口口
// 时间戳|读写状态|数据体大小|数据体状态|预留4byte|数据体区

const (
	RW_STATUS_NOTHING = 0
	RW_STATUS_WAITING = 1

	DATA_TYPE_THUNK = 1
	DATA_TYPE_FULL  = 2
)

// uint642bytes uint64 转为
func int642bytes(i int64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(i))
	return b
}
func bytes2int64(b []byte) int64 {
	return int64(binary.BigEndian.Uint64(b))
}

// 时间戳 偏移量
func _timePosition() (int64, int64) {
	return 0, 8
}

// 读写状态区 偏移量
func _RWstatusPosition() (int64, int64) {
	_, end := _timePosition()
	return end, end + 1
}

// 数据体大小区 偏移量
func _DataLenPosition() (int64, int64) {
	_, end := _RWstatusPosition()
	return end, end + 8
}

// 数据体状态区 偏移量
func _DataStatusPosition() (int64, int64) {
	_, end := _DataLenPosition()
	return end, end + 1
}

// 预留去空白区 偏移量
func _EmptyPosition() (int64, int64) {
	_, end := _DataStatusPosition()
	return end, end + 4
}

type MemSender struct {
	ID           string // 共享内存的 ID,一般这个应该唯一,多个Sender不能用同一个ID。
	DataAreaSize int64  // 数据体区大小

	shareMemSize int64 // 整个共享内存区大小。
	isWatching   bool
	handle       syscall.Handle
	memAddr      uintptr

	needSendData chan []byte

	destoryed bool
}

// Send 发送数据
func (s *MemSender) Send(data []byte) {
	s.needSendData <- slices.Clone(data)
}

func (s *MemSender) Close() {
	s.destoryed = true

	err := syscall.UnmapViewOfFile(s.memAddr)
	if err != nil {
		fmt.Println(err)
	}
	err = syscall.CloseHandle(s.handle)
	if err != nil {
		fmt.Println(err)
	}

}

// 数据体区 偏移量
func (s *MemSender) _DataPosition() (int64, int64) {
	_, end := _EmptyPosition()
	return end, s.shareMemSize
}

// 进行时间戳更新
func (s *MemSender) _update() {
	data := (*[1 << 30]byte)(unsafe.Pointer(s.memAddr))[:s.shareMemSize]
	for !s.destoryed {
		now := time.Now().UnixMilli()

		// 第一位,写入当前时间
		start, end := _timePosition()
		copy(data[start:end], int642bytes(now))

		//fmt.Printf("%v\r", now)

		// 每 10 毫秒进行一次。
		time.Sleep(10 * time.Millisecond)
	}
}

// 进行数据检测并发送
func (s *MemSender) _run() {
	data := (*[1 << 30]byte)(unsafe.Pointer(s.memAddr))[:s.shareMemSize]

	// 开始运行时,将数据体内的数据设置为无意义。
	rwStatusStart, rwStatusEnd := _RWstatusPosition()
	data[rwStatusStart:rwStatusEnd][0] = RW_STATUS_NOTHING

	for !s.destoryed {

		sendData := <-s.needSendData

		dataPositionStart, _ := s._DataPosition()
		total := int64(len(sendData))

		processedLen := int64(0)
		startOfNeedData := int64(0)
		//fmt.Printf("来数据了:%v\n", sendData)

		// 还没发完,就持续发送。
		for processedLen < total && !s.destoryed {

			// 先看数据体的读写状态,如果是待读取的,那么不要做任何事情,等待
			if data[rwStatusStart:rwStatusEnd][0] == RW_STATUS_WAITING {
				time.Sleep(1 * time.Millisecond)
				continue
			}

			// 不是待读取,说明数据已经被读取,此时写入新数据
			nextData := sendData[startOfNeedData:min(startOfNeedData+s.DataAreaSize, total)]
			currentLength := int64(len(nextData))
			copy(data[dataPositionStart:dataPositionStart+currentLength], nextData)

			// 写入本次数据体的大小
			lenPositionStart, lenPositionEnd := _DataLenPosition()
			copy(data[lenPositionStart:lenPositionEnd], int642bytes(currentLength))

			processedLen += currentLength
			startOfNeedData += currentLength

			// 写入 数据体是否到达尾部
			isEnd := processedLen >= total
			isEndStart, isEndEnd := _DataStatusPosition()
			if isEnd {
				data[isEndStart:isEndEnd][0] = DATA_TYPE_FULL
			} else {
				data[isEndStart:isEndEnd][0] = DATA_TYPE_THUNK
			}

			// 标记数据体的数据待读取
			data[rwStatusStart:rwStatusEnd][0] = RW_STATUS_WAITING

			//fmt.Printf("已写出%v:%v\n", currentLength, nextData)
		}

		//fmt.Printf("已发送完一波:%v\n", processedLen)
	}
}

func (s *MemSender) Watching() error {
	if s.isWatching {
		return nil
	}

	s.needSendData = make(chan []byte)
	s.shareMemSize = 8 /*时间戳区*/ + 1 /*读写状态区*/ + 8 /* 1个数据大小区战8字节 */ + 1 /*数据体状态区*/ + 4 /*预留4byte*/ + s.DataAreaSize /* 数据区 */

	file, err := syscall.UTF16PtrFromString("Local\\" + s.ID)

	if err != nil {
		return err
	}

	s.handle, err = syscall.CreateFileMapping(0, nil, syscall.PAGE_READWRITE, 0, uint32(s.shareMemSize), file)

	if err != nil {
		return err
	}

	s.memAddr, err = syscall.MapViewOfFile(s.handle, syscall.FILE_MAP_WRITE, 0, 0, uintptr(s.shareMemSize))

	if err != nil {
		return err
	}

	s.isWatching = true
	go s._update()
	go s._run()

	return nil
}

func NewMemSender(ID string) *MemSender {
	s := &MemSender{
		ID:           ID,
		DataAreaSize: 1024 * 1024, // // 默认 1mb 数据区大小
	}

	err := s.Watching()

	if err != nil {
		panic(err)
	}

	return s
}

type MemReceiver struct {
	ID           string // 指定要从哪个ID的共享内存发送器进行接收
	DataAreaSize int64

	shareMemSize int64
	isPending    bool
	receiveChan  chan []byte

	handle  syscall.Handle
	memAddr uintptr

	senderOffLine bool
	destoryed     bool
}

// 数据体区 偏移量
func (s *MemReceiver) _DataPosition() (int64, int64) {
	_, end := _EmptyPosition()
	return end, s.shareMemSize
}

func (s *MemReceiver) IsSenderOffLine() bool {
	return s.senderOffLine
}

func (s *MemReceiver) Pending() error {
	if s.isPending {
		return nil
	}
	s.receiveChan = make(chan []byte)
	s.shareMemSize = 8 /*时间戳区*/ + 1 /*读写状态区*/ + 8 /* 1个数据大小区战8字节 */ + 1 /*数据体状态区*/ + 4 /*预留4byte*/ + s.DataAreaSize /* 数据区 */

	file, err := syscall.UTF16PtrFromString("Local\\" + s.ID)

	if err != nil {
		return err
	}

	s.handle, err = syscall.CreateFileMapping(0, nil, syscall.PAGE_READWRITE, 0, uint32(s.shareMemSize), file)

	if err != nil {
		return err
	}

	s.memAddr, err = syscall.MapViewOfFile(s.handle, syscall.FILE_MAP_WRITE, 0, 0, uintptr(s.shareMemSize))

	if err != nil {
		return err
	}

	s.isPending = true

	go s._update()
	go s._run()

	return nil
}

func (s *MemReceiver) GetData() []byte {
	return <-s.receiveChan
}

func (s *MemReceiver) Close() {
	s.destoryed = true
	err := syscall.UnmapViewOfFile(s.memAddr)
	if err != nil {
		fmt.Println(err)
	}

	err = syscall.CloseHandle(s.handle)
	if err != nil {
		fmt.Println(err)
	}

}

func (s *MemReceiver) _update() {

	data := (*[1 << 30]byte)(unsafe.Pointer(s.memAddr))[:s.shareMemSize]
	for !s.destoryed {
		// 第一位,获取记录的是发送器写入的当前时间,将其获取出来,如果这个时间和此时超过了10毫秒,说明发送器可能已被关闭。
		start, end := _timePosition()

		senderLastUpdateTime := bytes2int64(data[start:end])

		s.senderOffLine = time.Now().UnixMilli()-senderLastUpdateTime > 10

		// 每 10 毫秒进行一次。
		time.Sleep(10 * time.Millisecond)
	}

}
func (s *MemReceiver) _run() {
	data := (*[1 << 30]byte)(unsafe.Pointer(s.memAddr))[:s.shareMemSize]

	// 开始接受数据
	rwStatusStart, rwStatusEnd := _RWstatusPosition()

	for !s.destoryed {
		// 先看数据体的读写状态,如果是无意义的,那么不要做任何事情,等待
		if data[rwStatusStart:rwStatusEnd][0] == RW_STATUS_NOTHING {
			time.Sleep(1 * time.Millisecond)
			continue
		}
		// 读取数据体大小
		lenPositionStart, lenPositionEnd := _DataLenPosition()

		// 定义一个变量保存收到的数据
		recData := make([]byte, 0)

		// 读取数据是否达到结尾
		isEndStart, isEndEnd := _DataStatusPosition()
		for !s.destoryed {

			// 再看数据体的读写状态,如果是无意义的,那么不要做任何事情,等待
			if data[rwStatusStart:rwStatusEnd][0] == RW_STATUS_NOTHING {
				time.Sleep(10 * time.Millisecond)
				continue
			}

			dataLength := bytes2int64(data[lenPositionStart:lenPositionEnd])
			//fmt.Printf("dataLength: %v\n", dataLength)

			dataPositionStart, _ := s._DataPosition()

			recData = append(recData, data[dataPositionStart:dataPositionStart+dataLength]...)

			// 读取完成数据,将读写状态区,修改为 无意义,这样 发送器就会发送下一个数据。
			data[rwStatusStart:rwStatusEnd][0] = RW_STATUS_NOTHING

			if data[isEndStart:isEndEnd][0] == DATA_TYPE_FULL {
				// 接收完成,退出循环
				break
			}

		}

		s.receiveChan <- recData
	}
}

func NewMemReceiver(ID string) *MemReceiver {
	mr := &MemReceiver{
		ID:           ID,
		DataAreaSize: 1024 * 1024,
	}

	err := mr.Pending()

	if err != nil {
		panic(err)
	}

	return mr
}

以上代码适用于Windows环境,如果是Linux下,那么创建共享内存的实现方式有所不同,你如果要用,需要自己修改那部分代码。

四、使用方法

这个实现使得我们在实现跨进程通信时变得非常简单:

A 应用中的代码

sender := NewMemSender("com_myapp_a")
sender.Send([]byte("Hello App B"))
time.Sleep(60 * time.Second)

B 应用中的代码

// com_myapp_a 这个值必须和发送方保持一致,相当于是大家一起在这个频道上进行通信。
receiver := NewMemReceiver("com_myapp_a")
data := receiver.GetData()
fmt.Println(string(data)) // 输出:Hello App B

如果你启动 A 应用后,在启动 B 应用,那么你将会看到B应用输出对应发过来的消息。

如果你只启动了B,没启动A,那么 B 会卡在 GetData 这里等待发送消息。

如果发送方多次调用 Send 函数发送数据,而接收方没有读取出数据,那么发送方会在第二次发送时卡住,应为第一次发送的内容还没被读取。

Full text complete, Reproduction please indicate the source. Help you? Not as good as one:
Comment(Comments need to be logged in. You are not logged in.)
You need to log in before you can comment.

Comments (0 Comments)