koukiblog

たぶんweb系の話題

GCPのPubSubをGoでsubscribeするときにgraceful shutdownのようなことをする

GCPのPubSubのTopicをGoでsubscribeするとき、ライブラリ( https://github.com/googleapis/google-cloud-go ) を使えば下記のように簡単に記述できます

client, err := pubsub.NewClient(ctx, "project-id")
if err != nil {
    log.Fatal(err)
}

sub := client.Subscription("subscription1")
err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
    fmt.Println(m.Data) // process message 
    m.Ack() // Acknowledge that we've consumed the message.
})
if err != nil {
    log.Println(err)
}

しかしこれではTopicのメッセージを処理中に、コンテナが停止するなど何らかの理由でサーバーが停止するときに処理は行なっているもののAckを返していない、という状態になってしまいます。

シグナルハンドリングを行い、停止までに一定の猶予を設けることで、webサーバでよくあるgraceful shutdownのような挙動にするには、こんな感じにします

cctx, cancel := context.WithCancel(ctx)
sub := client.Subscription("subscription1")
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
  fmt.Println(m.Data) // process message 
  m.Ack() // Acknowledge that we've consumed the message.
}

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit
// Start shutdown process
cancel()
// Waiting pubsub receive shutdown 2 seconds..
time.Sleep(2000 * time.Millisecond)

Receive関数渡したcontextを、SIGINTをハンドリング待ってから、cancelし、その完了を一定期間待つことで実現できます。この例では2秒待っていますがどのぐらい待つのかは処理内容によって考える必要があります。 メッセージの処理に時間かかり、処理中に設定したタイムアウト期間を越えることが想定される場合は、Receive関数に渡す無名関数内で必要に応じてcontext.Doneを確認するとよいと思います(あまりないと思いますが)