gRPC streaming is the right answer for naturally-streaming data. Used wrong, it leaks goroutines and buffers. This post is the working set for Go gRPC streaming in production.
Server streaming
rpc StreamLogs(LogRequest) returns (stream LogLine);
func (s *server) StreamLogs(req *pb.LogRequest, stream pb.LogService_StreamLogsServer) error {
	sub, err := s.broker.Subscribe(stream.Context(), req.Topic)
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()
	for {
		select {
		case <-stream.Context().Done():
			return stream.Context().Err()
		case msg := <-sub.Messages():
			if err := stream.Send(&pb.LogLine{Body: msg.Body}); err != nil {
				return err
			}
		}
	}
}
stream.Send returns an error once the client has disconnected, so the loop exits. The deferred sub.Unsubscribe() runs on every return path and releases the subscription.
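One case worth thinking about in a loop of this shape is the source channel being closed: a receive from a closed channel returns the zero value immediately, so a loop with no ok check would spin and send empty messages. Here is a minimal, stdlib-only sketch of the same loop with that check added. The lineSender interface, fakeStream, and runForward are hypothetical stand-ins I made up for illustration; they are not part of grpc-go or the generated code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// lineSender is a hypothetical stand-in for the Send half of a generated
// server stream; it is not part of grpc-go.
type lineSender interface {
	Send(body string) error
}

// fakeStream records sent lines and fails once failAfter lines were sent,
// imitating a client that disconnects mid-stream.
type fakeStream struct {
	sent      []string
	failAfter int
}

var errDisconnected = errors.New("client disconnected")

func (f *fakeStream) Send(body string) error {
	if len(f.sent) >= f.failAfter {
		return errDisconnected
	}
	f.sent = append(f.sent, body)
	return nil
}

// forward copies messages to the stream until the context ends, the source
// channel is closed, or Send fails.
func forward(ctx context.Context, msgs <-chan string, out lineSender) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-msgs:
			if !ok {
				return nil // source closed: end the stream cleanly
			}
			if err := out.Send(msg); err != nil {
				return err
			}
		}
	}
}

// runForward pushes n messages through forward into a stream that accepts
// at most failAfter of them, and reports how many arrived.
func runForward(n, failAfter int) (int, error) {
	msgs := make(chan string, n)
	for i := 0; i < n; i++ {
		msgs <- fmt.Sprintf("line %d", i)
	}
	close(msgs)
	fs := &fakeStream{failAfter: failAfter}
	err := forward(context.Background(), msgs, fs)
	return len(fs.sent), err
}

func main() {
	fmt.Println(runForward(3, 10)) // 3 <nil>
	fmt.Println(runForward(3, 2))  // 2 client disconnected
}
```

Whether the ok check matters in your service depends on whether the broker ever closes the channel; if it does, returning nil ends the RPC with an OK status.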
Client streaming
rpc UploadChunks(stream Chunk) returns (UploadResponse);
func (s *server) UploadChunks(stream pb.UploadService_UploadChunksServer) error {
	var size int64
	h := sha256.New()
	for {
		chunk, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.UploadResponse{
				Bytes: size, Sha256: hex.EncodeToString(h.Sum(nil)),
			})
		}
		if err != nil {
			return err
		}
		size += int64(len(chunk.Data))
		h.Write(chunk.Data)
	}
}
Recv returns io.EOF once the client has finished sending; respond with SendAndClose.
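A client-streaming handler accepts as many chunks as the client cares to send, so in production you probably want a cap on the total size. Below is a stdlib-only sketch of the receive loop with such a cap. The chunkReceiver interface, sliceReceiver, errTooLarge, and the maxBytes parameter are all assumptions for illustration; in a real handler the limit error would more likely be a status with codes.ResourceExhausted.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
)

// chunkReceiver is a hypothetical stand-in for the Recv half of a
// client-streaming server stream.
type chunkReceiver interface {
	Recv() ([]byte, error)
}

// sliceReceiver replays fixed chunks, then reports io.EOF like a client
// that has finished sending.
type sliceReceiver struct{ chunks [][]byte }

func (r *sliceReceiver) Recv() ([]byte, error) {
	if len(r.chunks) == 0 {
		return nil, io.EOF
	}
	c := r.chunks[0]
	r.chunks = r.chunks[1:]
	return c, nil
}

var errTooLarge = errors.New("upload exceeds limit")

// receiveAll hashes chunks until io.EOF and rejects uploads over maxBytes.
func receiveAll(r chunkReceiver, maxBytes int64) (int64, string, error) {
	var size int64
	h := sha256.New()
	for {
		chunk, err := r.Recv()
		if errors.Is(err, io.EOF) {
			return size, hex.EncodeToString(h.Sum(nil)), nil
		}
		if err != nil {
			return 0, "", err
		}
		size += int64(len(chunk))
		if size > maxBytes {
			return 0, "", errTooLarge
		}
		h.Write(chunk) // hash.Hash.Write never returns an error
	}
}

// sha256Hex hashes a whole buffer at once, for comparison with the
// incremental result.
func sha256Hex(b []byte) string {
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}

func main() {
	r := &sliceReceiver{chunks: [][]byte{[]byte("he"), []byte("llo")}}
	size, sum, err := receiveAll(r, 1024)
	fmt.Println(size, sum == sha256Hex([]byte("hello")), err) // 5 true <nil>
}
```

Checking the limit before hashing means an oversized upload is rejected at the first chunk that crosses the line rather than after the whole stream has been read.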
Bidi streaming
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
func (s *server) Chat(stream pb.Chat_ChatServer) error {
	g, ctx := errgroup.WithContext(stream.Context())
	incoming := make(chan *pb.ChatMessage, 16)
	// Reader
	g.Go(func() error {
		defer close(incoming)
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				return nil
			}
			if err != nil {
				return err
			}
			select {
			case incoming <- msg:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
	// Writer + processor
	g.Go(func() error {
		for msg := range incoming {
			reply, err := s.processChat(ctx, msg)
			if err != nil {
				return err
			}
			if err := stream.Send(reply); err != nil {
				return err
			}
		}
		return nil
	})
	return g.Wait()
}
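Two goroutines: one reads, one writes. errgroup returns the first error and cancels ctx, and the buffered channel gives bounded backpressure between the two. One caveat: cancelling ctx does not interrupt a Recv that is already blocked. Recv returns only when a message arrives or the stream itself ends, so if processChat fails while the client is idle, g.Wait() keeps waiting for the reader until the client sends again or disconnects.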
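If the writer side of a handler like this fails while the reader is parked in Recv, g.Wait() has to wait for the reader too, and the reader only wakes up when the stream ends. One way around that, sketched here with the standard library only, is to run just the reader in a goroutine and do the processing on the handler goroutine, so the handler can return immediately and let the runtime cancel the stream. fakeBidi, chat, and runChat are hypothetical names; runChat imitates the part where gRPC cancels the stream context after the handler returns.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// fakeBidi is a hypothetical stand-in for a bidi server stream. As with a
// real stream, Recv blocks until a message arrives or the context ends.
type fakeBidi struct {
	ctx context.Context
	in  chan string
}

func (f *fakeBidi) Recv() (string, error) {
	select {
	case m, ok := <-f.in:
		if !ok {
			return "", io.EOF // client finished sending
		}
		return m, nil
	case <-f.ctx.Done():
		return "", f.ctx.Err()
	}
}

var errProcess = errors.New("process failed")

// chat reads in a background goroutine and processes on the handler
// goroutine. It returns as soon as process fails, without waiting for a
// reader that may still be blocked in Recv.
func chat(s *fakeBidi, process func(string) error) error {
	incoming := make(chan string, 16)
	readErr := make(chan error, 1) // buffered: the reader never blocks on it
	go func() {
		defer close(incoming)
		for {
			msg, err := s.Recv()
			if errors.Is(err, io.EOF) {
				return
			}
			if err != nil {
				readErr <- err
				return
			}
			select {
			case incoming <- msg:
			case <-s.ctx.Done():
				readErr <- s.ctx.Err()
				return
			}
		}
	}()
	for msg := range incoming {
		if err := process(msg); err != nil {
			return err
		}
	}
	select {
	case err := <-readErr:
		return err
	default:
		return nil
	}
}

// runChat imitates the gRPC runtime by cancelling the stream context once
// the handler returns, which releases a reader blocked in Recv.
func runChat(msgs []string, failOn string, clientDone bool) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s := &fakeBidi{ctx: ctx, in: make(chan string, len(msgs))}
	for _, m := range msgs {
		s.in <- m
	}
	if clientDone {
		close(s.in) // half-close: Recv reports io.EOF after the last message
	}
	return chat(s, func(m string) error {
		if m == failOn {
			return errProcess
		}
		return nil
	})
}

func main() {
	fmt.Println(runChat([]string{"a", "b"}, "b", false)) // process failed
	fmt.Println(runChat([]string{"a", "b"}, "", true))   // <nil>
}
```

The trade-off is that you give up errgroup's bookkeeping and carry the reader's error over a small channel instead. In a real handler, process would include the stream.Send call.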
Cancellation
stream.Context() is cancelled when:
- Client disconnects.
- Deadline expires.
- Either side cancels.
- The handler returns (server side).
Always select on <-ctx.Done() in your loops:
select {
case <-ctx.Done():
	return ctx.Err()
case msg := <-source:
	// ...
}
Without this, the goroutine leaks when the client disconnects.
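A cheap way to convince yourself a loop really exits on cancellation is to run it against a source that never produces anything and cancel the context. This is a small stdlib-only sketch; pump and exitsOnCancel are hypothetical names, and the one-second timeout is an arbitrary choice.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pump drains source until the context is cancelled or the source is
// closed, and closes done when it exits.
func pump(ctx context.Context, source <-chan int, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case <-ctx.Done():
			return
		case _, ok := <-source:
			if !ok {
				return
			}
		}
	}
}

// exitsOnCancel reports whether pump returns within a second of its
// context being cancelled.
func exitsOnCancel() bool {
	ctx, cancel := context.WithCancel(context.Background())
	source := make(chan int) // never written to, like an idle stream
	done := make(chan struct{})
	go pump(ctx, source, done)
	cancel()
	select {
	case <-done:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println(exitsOnCancel()) // true
}
```

Drop the ctx.Done() case from pump and the same check times out, which is exactly the leak described above.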
Backpressure
Send blocks when the peer’s receive buffer is full (HTTP/2 flow control). You don’t need to manage buffers:
// This naturally backpressures: Send blocks if the client is slow
for msg := range source {
	if err := stream.Send(msg); err != nil {
		return err
	}
}
The bug to avoid:
// BAD: unbounded buffer
go func() {
	for msg := range source {
		bigBuffer = append(bigBuffer, msg) // OOM if client is slow
	}
}()
Don’t buffer in your own goroutine. Let Send block.
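Sometimes the source cannot be allowed to block, for example a fan-out broker that feeds many streams. In that case a bounded queue with an explicit drop policy is one option that still avoids unbounded memory. This is a sketch under that assumption, not something the handlers above do; offer and fill are hypothetical helpers.

```go
package main

import "fmt"

// offer enqueues msg without blocking and reports whether it was accepted.
// When the queue is full the message is dropped instead of buffered.
func offer(q chan<- string, msg string) bool {
	select {
	case q <- msg:
		return true
	default:
		return false
	}
}

// fill offers n messages to a queue of the given capacity that nobody is
// reading, and counts what was accepted and what was dropped.
func fill(capacity, n int) (accepted, dropped int) {
	q := make(chan string, capacity)
	for i := 0; i < n; i++ {
		if offer(q, fmt.Sprintf("msg %d", i)) {
			accepted++
		} else {
			dropped++
		}
	}
	return accepted, dropped
}

func main() {
	fmt.Println(fill(2, 5)) // 2 3
}
```

Memory stays capped at the queue capacity; the cost is lost messages, so this only fits data where dropping is acceptable (metrics, live cursors) and you should count the drops.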
Error handling
import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
if err != nil {
	return status.Errorf(codes.Internal, "process failed: %v", err)
}
Use gRPC status codes. The client receives codes.Internal along with the message and can act on it.
If the handler returns an error mid-stream, the client sees it as the error returned by Recv.
Client side
stream, err := client.StreamLogs(ctx, &pb.LogRequest{Topic: "events"})
if err != nil {
	return err
}
for {
	msg, err := stream.Recv()
	if err == io.EOF {
		return nil
	}
	if err != nil {
		return err
	}
	process(msg)
}
Same pattern: loop on Recv until EOF or error.
For client streaming:
stream, err := client.UploadChunks(ctx)
if err != nil {
	return err
}
for _, chunk := range chunks {
	if err := stream.Send(&pb.Chunk{Data: chunk}); err != nil {
		return err
	}
}
resp, err := stream.CloseAndRecv()
if err != nil {
	return err
}
CloseAndRecv half-closes the stream to tell the server we’re done, then waits for its response.
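The upload loop ranges over chunks without showing where they come from. grpc-go's default maximum receive message size is 4 MiB, so each chunk has to stay well under that. Here is a minimal sketch of a splitter; splitChunks is a hypothetical helper and the 64 KiB size in main is just an assumption, not a recommendation from the library.

```go
package main

import "fmt"

// splitChunks cuts data into consecutive slices of at most size bytes.
// The chunks share memory with data; nothing is copied.
func splitChunks(data []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	var chunks [][]byte
	for len(data) > 0 {
		n := size
		if len(data) < n {
			n = len(data)
		}
		chunks = append(chunks, data[:n])
		data = data[n:]
	}
	return chunks
}

func main() {
	payload := make([]byte, 150*1024)
	chunks := splitChunks(payload, 64*1024) // assumed chunk size
	for _, c := range chunks {
		fmt.Println(len(c)) // 65536, 65536, 22528
	}
}
```

For files, reading into a fixed buffer with io.Reader and sending each filled portion does the same job without holding the whole payload in memory.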
Bidi client
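// Derive the errgroup context first and open the stream with it, so a
// failure in either goroutine cancels the stream and unblocks the other.
g, ctx := errgroup.WithContext(ctx)
stream, err := client.Chat(ctx)
if err != nil {
	return err
}
g.Go(func() error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-outgoing:
			if !ok {
				return stream.CloseSend()
			}
			if err := stream.Send(msg); err != nil {
				return err
			}
		}
	}
})
g.Go(func() error {
	for {
		reply, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		process(reply)
	}
})
return g.Wait()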
Symmetric to server.
Connection management
For long-lived streams, set keepalive:
import "google.golang.org/grpc/keepalive"
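server := grpc.NewServer(
	grpc.KeepaliveParams(keepalive.ServerParameters{
		Time:    30 * time.Second, // ping the client after 30s without activity
		Timeout: 5 * time.Second,  // close the connection if the ping is not acked within 5s
	}),
	grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
		MinTime:             10 * time.Second, // reject clients that ping more often than this
		PermitWithoutStream: true,             // allow client pings when no RPC is active
	}),
)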
Without keepalive, idle TCP connections get dropped silently by load balancers and NAT devices, and the streams on them hang or break.
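The server settings have a client-side counterpart. A sketch of matching dial options follows, assuming the server policy shown above; keepaliveDialOptions is a hypothetical helper name. The one constraint to respect is that the client's Time should not be shorter than the server's MinTime, otherwise the server treats the pings as abuse and closes the connection with a GOAWAY.

```go
package main

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// keepaliveDialOptions returns client keepalive settings meant to pair with
// a server whose enforcement policy has MinTime of 10s or less.
func keepaliveDialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                30 * time.Second, // ping after 30s without activity; keep >= server MinTime
			Timeout:             5 * time.Second,  // give up on the connection if the ping is not acked in 5s
			PermitWithoutStream: true,             // keep pinging even with no active RPC
		}),
	}
}

func main() {
	// Pass these to grpc.NewClient, or grpc.Dial on older grpc-go versions.
	_ = keepaliveDialOptions()
}
```

PermitWithoutStream on the client only works if the server's enforcement policy permits it as well, which the server config above does.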
Common mistakes
1. Goroutines without context check
go func() { for { ... } }() runs forever after the stream ends. Always check stream.Context().
2. Buffering in own goroutine
OOM under slow consumers. Let Send block.
3. Ignoring the error from Recv
msg, _ := stream.Recv() // BAD
Recv returns errors that matter (EOF, network, server error). Always check.
4. Sending nil
stream.Send(nil) // panic
Validate messages before sending, or set the required fields explicitly.
5. Forgetting CloseSend on client streaming
Server waits forever for more data. Client must CloseSend (or CloseAndRecv) when done.
When NOT to use streaming
- Single response: unary is simpler.
- Small bounded list: unary with full response.
- Need browser support without proxy: use Connect, not pure gRPC streaming.
What I’d ship today
For Go gRPC services:
- Server streaming for log/event tailing.
- Client streaming for chunked uploads.
- Bidi for chat / collaborative.
- errgroup for cleanup discipline.
- Keepalive for long-lived streams.
- OTEL interceptors for tracing.
- Connect protocol for browser clients.
Read this next
If you want my Go gRPC streaming reference (server / client / bidi templates), it’s at rajpoot.dev.