Introduction
Unix Domain Sockets are an Interprocess Communication (IPC) mechanism that is available on Linux/OSX/BSD/Windows systems.
Go has support for unix sockets via the net.Dial() and net.Listen() calls. However, the standard library lacks higher level wrappers that provide:
- Authentication/Security
- Chunking
- Message streaming
- RPCs
As such I am releasing a set of packages to provide these on Linux/OSX systems.
TLDR
- Packages to do IPC via unix sockets made easy
- Raw sockets support
- Chunking support
- Streaming support (Protocol Buffers/JSON)
- RPC support (Protocol Buffers/JSON)
- Support OSX/Linux
- Provides authentication
- Benchmarked proto RPC client/server against gRPC
  - Faster in most cases
  - Way fewer allocations
- Find here: https://github.com/johnsiilver/golib/tree/master/ipc/uds
Table of Contents
- IPC Choices
- Examples
  a. Raw Stream
  b. Bi-Directional Streaming with JSON
  c. RPCs with Protocol Buffers
- Benchmarks
IPC Choices
There are a few flavors of IPC that a user can choose from:
- Unix Domain Sockets
- Various Message Queuing (like SystemV message queues)
- Shared memory (shm)
- Using the IP stack on loopbacks
- ...
If you want the highest speeds without the pain of shared memory, I'd recommend a message queue. I could not find many benchmarks from recent years, but the ones from around 10 years ago all point to shared memory being the fastest (by a large margin), followed by message queues, then unix sockets, and finally IP on loopback.
shm is painful to use and I have shied away from it. While I was working at Google, I would see notes in major packages about possibly using shm in the future. I noticed that never happened in any of those packages and I'm guessing it wasn't necessary and it was painful to implement. Maybe I just haven't found the best wrapper for shm yet, but using it always looks like "YIKES".
Message queues come in different flavors, but don't seem to have ubiquitous support. Linux and most BSDs provide sysv queues, but OSX and Windows don't. If speed is paramount and you are on supported systems, maybe message queues are the way to go.
Unix sockets work on all OSs (though not all features are shared) and are faster than the IP stack. In addition, with some local knowledge you can pull authentication information from connections in addition to file rights for security. With IP, you must deal with this on your own.
If unix sockets are fast enough for your internal processing and you are using Linux/OSX, this package is for you (I may add support for BSD and Windows).
Examples
Using the raw stream
Package: github.com/johnsiilver/golib/tree/master/ipc/uds
Example: github.com/johnsiilver/golib/tree/master/ipc/uds/example
Use the raw stream if you want to implement your own io where you just need io.ReadWriteCloser types. This is probably most helpful when forwarding content you aren't reasoning about. Most users will at least want to use the higher level chunking on top of this package.
An example server that sends the current UTC time, as UTF-8 text, every 10 seconds:
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
	"time"

	"github.com/google/uuid"
	"github.com/johnsiilver/golib/ipc/uds"
)

func main() {
	socketAddr := filepath.Join(os.TempDir(), uuid.New().String())

	cred, _, err := uds.Current()
	if err != nil {
		panic(err)
	}

	// This will set the socket file to have a uid and gid of whatever the
	// current user is. 0770 will be set for the file permissions (though on some
	// systems the sticky bit gets set, resulting in 1770).
	serv, err := uds.NewServer(socketAddr, cred.UID.Int(), cred.GID.Int(), 0770)
	if err != nil {
		panic(err)
	}

	fmt.Println("Listening on socket: ", socketAddr)

	// This listens for a client connecting and returns the connection object.
	for conn := range serv.Conn() {
		conn := conn

		// We spin off handling of this connection to its own goroutine and
		// go back to listening for another connection.
		go func() {
			// We are checking the client's user ID to make sure it's the same
			// user ID or we reject it. Cred objects give you the user's
			// uid/gid/pid for filtering.
			if conn.Cred.UID.Int() != cred.UID.Int() {
				log.Printf("unauthorized user uid %d attempted a connection", conn.Cred.UID.Int())
				conn.Close()
				return
			}

			// Write to the stream every 10 seconds until the connection closes.
			for {
				if _, err := conn.Write([]byte(fmt.Sprintf("%s\n", time.Now().UTC()))); err != nil {
					conn.Close()
					return
				}
				time.Sleep(10 * time.Second)
			}
		}()
	}
}
An example client that connects to the server and copies whatever the server sends to stdout:
package main

import (
	"flag"
	"fmt"
	"io"
	"os"

	"github.com/johnsiilver/golib/ipc/uds"
)

var (
	addr = flag.String("addr", "", "The path to the unix socket to dial")
)

func main() {
	flag.Parse()

	if *addr == "" {
		fmt.Println("did not pass --addr")
		os.Exit(1)
	}

	cred, _, err := uds.Current()
	if err != nil {
		panic(err)
	}

	// Connects to the server at socketAddr that must have the file uid/gid of
	// our current user and one of the os.FileMode specified.
	client, err := uds.NewClient(*addr, cred.UID.Int(), cred.GID.Int(), []os.FileMode{0770, 1770})
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	// client implements io.ReadWriteCloser and this will print to the screen
	// whatever the server sends until the connection is closed.
	io.Copy(os.Stdout, client)
}
The server will print out the socket it is listening on and you can pass it to the client via --addr.
Bi-Directional Streaming with JSON
Package: github.com/johnsiilver/golib/tree/master/ipc/uds/highlevel/json/stream
Example: github.com/johnsiilver/golib/tree/master/ipc/uds/highlevel/json/stream/example
Note: We also support bi-directional proto streaming, which works the same way but with protos.
Note: I'm going to drop the boilerplate from this example forward. Full code is provided in the links.
This example is a simple server where the client sends random words to the server, and after every 10 words the server returns them in a summary JSON message.
This is of course silly, but it shows Bi-Directional full duplex JSON streaming.
Here's the server:
udsServ, err := uds.NewServer(socketAddr, cred.UID.Int(), cred.GID.Int(), 0770)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}

fmt.Println("Listening on socket: ", socketAddr)

for conn := range udsServ.Conn() {
	conn := conn

	go func() {
		// Cred checks
		...

		// This is a summary message of the last 10 words we have received. We will
		// reuse this.
		sum := messages.Sum{
			Sum: make([]string, 0, 10),
		}

		// This wraps our conn in a stream client for reading/writing JSON.
		streamer, err := stream.New(conn)
		if err != nil {
			log.Println(err)
			conn.Close()
			return
		}

		// Receive 10 words from the client and then send back a list of the last 10
		// we got on this conn.
		for {
			m := messages.Word{}

			// Receive a JSON message from the stream.
			if err := streamer.Read(&m); err != nil {
				if err != io.EOF {
					log.Println(err)
				}
				conn.Close()
				return
			}

			// Add the contained word to our summary.
			sum.Sum = append(sum.Sum, m.Word)

			// Sends back the sum if we have 10 words sent to us. We don't wait for
			// the write, we immediately start waiting for our next values.
			if len(sum.Sum) == 10 {
				sendSum := sum
				sum = messages.Sum{Sum: make([]string, 0, 10)}
				go func() {
					if err := streamer.Write(sendSum); err != nil {
						conn.Close()
						return
					}
				}()
			}
		}
	}()
}
Here's the client:
// Connects to the server at socketAddr that must have the file uid/gid of
// our current user and one of the os.FileMode specified.
client, err := uds.NewClient(*addr, cred.UID.Int(), cred.GID.Int(), []os.FileMode{0770, 1770})
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}

streamer, err := stream.New(client)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}

wg := sync.WaitGroup{}
wg.Add(2)

babbler := babble.NewBabbler()

// Client writes.
go func() {
	defer wg.Done()
	for {
		m := messages.Word{Word: babbler.Babble()}
		if err := streamer.Write(m); err != nil {
			if err != io.EOF {
				log.Println(err)
			}
			return
		}
		time.Sleep(500 * time.Millisecond)
	}
}()

// Client reads.
go func() {
	defer wg.Done()
	sum := messages.Sum{}
	for {
		if err := streamer.Read(&sum); err != nil {
			if err != io.EOF {
				log.Println(err)
			}
			return
		}
		fmt.Printf("Sum message: %s\n", strings.Join(sum.Sum, " "))
	}
}()

wg.Wait()
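If you want to play with the full duplex pattern without a socket or any third-party import, the following is a self-contained sketch of the same idea. It is not the stream package: it assumes encoding/json Encoder/Decoder pairs over an in-memory net.Pipe, the Word and Sum types are local stand-ins for messages.Word and messages.Sum, and the helper names serve and exchange are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net"
)

// Word and Sum are local stand-ins for messages.Word and messages.Sum.
type Word struct{ Word string }
type Sum struct{ Sum []string }

// serve reads Word messages and writes a Sum after every batch words.
// batch must be greater than zero.
func serve(conn io.ReadWriter, batch int) error {
	dec := json.NewDecoder(conn)
	enc := json.NewEncoder(conn)
	sum := Sum{Sum: make([]string, 0, batch)}
	for {
		var w Word
		if err := dec.Decode(&w); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		sum.Sum = append(sum.Sum, w.Word)
		if len(sum.Sum) == batch {
			if err := enc.Encode(sum); err != nil {
				return err
			}
			sum = Sum{Sum: make([]string, 0, batch)}
		}
	}
}

// exchange streams words to an in-memory server while reading the
// summaries that come back on the same connection.
func exchange(words []string, batch int) ([]Sum, error) {
	clientConn, serverConn := net.Pipe()
	defer clientConn.Close()

	go func() {
		defer serverConn.Close()
		serve(serverConn, batch) // error ignored in this sketch
	}()

	// Client writes in its own goroutine so reads are never blocked on it.
	writeErr := make(chan error, 1)
	go func() {
		enc := json.NewEncoder(clientConn)
		for _, w := range words {
			if err := enc.Encode(Word{Word: w}); err != nil {
				writeErr <- err
				return
			}
		}
		writeErr <- nil
	}()

	// Client reads one Sum per complete batch.
	dec := json.NewDecoder(clientConn)
	var sums []Sum
	for i := 0; i < len(words)/batch; i++ {
		var s Sum
		if err := dec.Decode(&s); err != nil {
			return nil, err
		}
		sums = append(sums, s)
	}
	return sums, <-writeErr
}

func main() {
	sums, err := exchange([]string{"a", "b", "c", "d", "e"}, 2)
	if err != nil {
		panic(err)
	}
	for _, s := range sums {
		fmt.Println(s.Sum)
	}
}
```

net.Pipe is unbuffered, so this sketch deadlocks if the client tries to finish all of its writes before it starts reading. That is the reason reads and writes live in separate goroutines, just as in the client above.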
RPCs with Protocol Buffers
Package: github.com/johnsiilver/golib/tree/master/ipc/uds/highlevel/proto/rpc
Example: github.com/johnsiilver/golib/tree/master/ipc/uds/highlevel/proto/rpc/example
Note: We also support RPCs using JSON, which works the same way but with JSON.
This is a simple RPC mechanism. You call an RPC method along with a proto representing a request. The server responds by calling an internal method, which looks over the request message and responds with a protocol buffer.
Each call is synchronous, but you can make multiple calls at the same time, much like gRPC.
Our example will be a server that prints out one of several famous quotes it has when a client queries it.
Server code:
serv, err := rpc.NewServer(socketAddr, cred.UID.Int(), cred.GID.Int(), 0770)
if err != nil {
	panic(err)
}

fmt.Println("Listening on socket: ", socketAddr)

// We can reuse our requests to get better allocation performance.
reqPool := sync.Pool{
	New: func() interface{} {
		return &pb.QuoteReq{}
	},
}

// We can reuse our responses to get better allocation performance.
respPool := sync.Pool{
	New: func() interface{} {
		return &pb.QuoteResp{}
	},
}

// Register a method to handle calls for "quote". I did this inline, normally
// you do this in its own func block.
serv.RegisterMethod(
	"quote",
	func(ctx context.Context, req []byte) (resp []byte, err error) {
		reqpb := reqPool.Get().(*pb.QuoteReq)
		defer func() {
			reqpb.Reset()
			reqPool.Put(reqpb)
		}()

		// Get the request.
		if err := proto.Unmarshal(req, reqpb); err != nil {
			return nil, err
		}

		resppb := respPool.Get().(*pb.QuoteResp)
		defer func() {
			resppb.Reset()
			respPool.Put(resppb)
		}()

		resppb.Quote = quotes[rand.Intn(len(quotes))]
		return proto.Marshal(resppb)
	},
)

// This blocks until the server stops.
serv.Start()
Client code:
// Connects to the server at socketAddr that must have the file uid/gid of
// our current user and one of the os.FileMode specified.
client, err := rpc.New(*addr, cred.UID.Int(), cred.GID.Int(), []os.FileMode{0770, 1770})
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp := pb.QuoteResp{}
if err := client.Call(ctx, "quote", &pb.QuoteReq{}, &resp); err != nil {
	fmt.Println(err)
	os.Exit(1)
}

fmt.Println("Quote: ", resp.Quote)
Benchmarks
So I certainly haven't benched everything. But I did bench the proto RPC, as it uses the chunking rpc package, which uses chunked streams.
I compared this to gRPC as it provides a managed RPC mechanism over unix sockets. gRPC was my chosen mechanism for doing IPC on unix sockets in the past.
No matter what my internal settings, I would beat gRPC at 10 kB and nearly double its requests per second at the 102 kB size (gRPC stays slightly ahead at 1.0 kB in the run below). To get better performance at large sizes, I had to add some kernel buffer space over the defaults, which led to close to double the performance.
But the real killer here is allocations. In the runs below, this makes roughly 4.5x to 6x fewer heap allocations than gRPC. Keep this in mind for high performance applications. If you delve deep into making Go fast, everything points at one thing: keeping your allocations down. Once you leave the micro-benchmark world (like these benchmarks), your app starts crawling if the GC has to deal with lots of objects. The key to that is buffer reuse. gRPC's design for ease of use hurts that ability, though in fairness gRPC also supports a lot more pluggable behavior.
To be fair to gRPC on the speed, it is possible that you could make some buffer adjustments that would tune this. I didn't go looking.
Benchmark platform was OSX running on an 8-core Macbook Pro, circa 2019. You can guess that your Threadripper Linux box will do much better.
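The buffer reuse point is easy to see for yourself. The following sketch compares building a message in a fresh bytes.Buffer against borrowing one from a sync.Pool, using testing.AllocsPerRun as a quick measuring stick outside of a test. The function names and the 4 KiB payload are made up for this illustration, and the exact counts will vary with your Go version.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
	"testing"
)

// payload is an arbitrary 4 KiB message body for the comparison.
var payload = bytes.Repeat([]byte("x"), 4096)

var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// encodeFresh builds the message in a brand-new buffer on every call, so
// the backing array has to be allocated each time.
func encodeFresh() int {
	buf := new(bytes.Buffer)
	buf.Write(payload)
	return buf.Len()
}

// encodePooled borrows a buffer from the pool and hands it back when done,
// so a warmed-up buffer's backing array is reused across calls.
func encodePooled() int {
	buf := bufPool.Get().(*bytes.Buffer)
	defer bufPool.Put(buf)
	buf.Reset()
	buf.Write(payload)
	return buf.Len()
}

func main() {
	fresh := testing.AllocsPerRun(1000, func() { encodeFresh() })
	pooled := testing.AllocsPerRun(1000, func() { encodePooled() })
	fmt.Printf("allocs per call: fresh=%.1f pooled=%.1f\n", fresh, pooled)
}
```

The garbage collector is free to empty a sync.Pool, so pooling lowers the allocation rate rather than guaranteeing zero.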
Test Results(uds):
==========================================================================
[Speed]
[16 Users][10000 Requests][1.0 kB Bytes] - min 100.729µs/sec, max 10.029661ms/sec, avg 348.084µs/sec, rps 45751.37
[16 Users][10000 Requests][10 kB Bytes] - min 282.04µs/sec, max 8.067866ms/sec, avg 685.19µs/sec, rps 23269.75
[16 Users][10000 Requests][102 kB Bytes] - min 1.512654ms/sec, max 12.380839ms/sec, avg 2.528536ms/sec, rps 6314.61
[16 Users][10000 Requests][1.0 MB Bytes] - min 9.33996ms/sec, max 65.578487ms/sec, avg 20.282241ms/sec, rps 788.33
[Allocs]
[10000 Requests][1.0 kB Bytes] - allocs 330,858
[10000 Requests][10 kB Bytes] - allocs 354,272
[10000 Requests][102 kB Bytes] - allocs 415,754
[10000 Requests][1.0 MB Bytes] - allocs 523,738
Test Results(grpc):
==========================================================================
[Speed]
[16 Users][10000 Requests][1.0 kB Bytes] - min 59.624µs/sec, max 3.571806ms/sec, avg 305.171µs/sec, rps 51137.15
[16 Users][10000 Requests][10 kB Bytes] - min 93.19µs/sec, max 2.397846ms/sec, avg 875.864µs/sec, rps 18216.72
[16 Users][10000 Requests][102 kB Bytes] - min 1.221068ms/sec, max 8.495421ms/sec, avg 4.434272ms/sec, rps 3606.63
[16 Users][10000 Requests][1.0 MB Bytes] - min 21.448849ms/sec, max 54.920306ms/sec, avg 34.307985ms/sec, rps 466.28
[Allocs]
[10000 Requests][1.0 kB Bytes] - allocs 1,505,165
[10000 Requests][10 kB Bytes] - allocs 1,681,061
[10000 Requests][102 kB Bytes] - allocs 1,947,529
[10000 Requests][1.0 MB Bytes] - allocs 3,250,309
Benchmark Guide:
[# Users] = number of simultaneous clients
[# Requests] = the total number of requests we sent
[# Bytes] = the size of our input and output requests
min = the minimum seen RTT
max = the maximum seen RTT
avg = the average seen RTT
rps = requests per second
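To turn the tables above into ratios, a few lines of Go will do the arithmetic. The numbers are copied from the results above; the type and function names are just for this sketch.

```go
package main

import "fmt"

// result holds one payload size from the benchmark output above.
type result struct {
	size       string
	udsRPS     float64
	grpcRPS    float64
	udsAllocs  float64
	grpcAllocs float64
}

// results are copied from the uds and grpc test results above.
var results = []result{
	{"1.0 kB", 45751.37, 51137.15, 330858, 1505165},
	{"10 kB", 23269.75, 18216.72, 354272, 1681061},
	{"102 kB", 6314.61, 3606.63, 415754, 1947529},
	{"1.0 MB", 788.33, 466.28, 523738, 3250309},
}

// speedup is uds requests per second divided by gRPC requests per second.
// Values above 1 mean uds handled more requests per second.
func speedup(r result) float64 { return r.udsRPS / r.grpcRPS }

// allocRatio is gRPC allocations divided by uds allocations.
// Values above 1 mean gRPC allocated more.
func allocRatio(r result) float64 { return r.grpcAllocs / r.udsAllocs }

func main() {
	for _, r := range results {
		fmt.Printf("%-7s rps uds/grpc: %.2fx  allocs grpc/uds: %.2fx\n",
			r.size, speedup(r), allocRatio(r))
	}
}
```

Running it shows gRPC ahead on requests per second only at the 1.0 kB size, and allocating more than uds at every size.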