Firestoreの書き込み速度の検証

結論

1つのcollectionに大量のdocumentを一括書き込みする際、500件のバッチ書き込みを20並列で実行したところ、以下の時間で完了した。

----------2022-05-11-10.11.13

Firestoreの書き込み制限

Firestoreは書き込み時にいくつかの制限があります。詳しくはhttps://firebase.google.com/docs/firestore/quotas にあります。

今回気になったのは4番目の制限です。ある条件下では1つのcollectionに対するdocumentの書き込みは 1秒間に500件が上限、という事です。

※2022/05/11 追記

制限について、以下のように変更されていました。「できなくは無いがパフォーマンスが悪くなるかも」という閾値のようです。

----------2022-05-11-10.14.04

あるシステムのパフォーマンス調査をしていて、この上限のせいで遅くなってるかも?という話になりました。書き込み速度について調べていると以下の記事を見つけました。

Firestoreに負荷試験(Loadroid)してみた+補足 - Qiita
こちらの記事は,Firebase.yebisu #1のLTに補足解説を加えたものになります.発表スライド当日お話できなかった,実施する際の注意などもありますので,ごさしゅーください. tl;dr 高頻度のドキュメント作成(自...

この記事では10秒間で70,000件のdocumentの登録に成功されたようです。負荷を少しずつ上げていけば 500/s の制限を超えられるみたいです。少しずつ負荷を上げるとスケールできることは以下に書いてあります。

Cloud Firestore のベスト プラクティス | Firebase

どのような条件の時に 500/s の制限に引っかかるのか気になったので試してみることにしました。

環境

  • Google Compute Engine、e2-medium (2 vCPU、4GB メモリ)
  • Ubuntu 20.04 LTS
  • Docker 19.03.13
  • Golang 1.14.9
  • cloud.google.com/go/firestore 1.3.0

負荷を徐々に上げてみる

上の記事を参考に、以下のような処理を書いて試してみました。

500件のバッチ書き込みを実行
=> 500件のバッチ書き込みを2並列で実行
=> 500件のバッチ書き込みを4並列で実行
=> 500件のバッチ書き込みを8並列で実行
=> (10,000件まで繰り返す)

コード全体は以下になります。3つの変数(一度のバッチ書き込みの件数、登録件数、初回書き込み時の並列数)を変えられるように実装しました。

package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"strconv"
"sync"
"time"
"cloud.google.com/go/firestore"
)
var (
size = flag.Int("size", 0, "chunk size")
total = flag.Int("total", 0, "total count")
multi = flag.Int("multi", 0, "multi")
)
func main() {
flag.Parse()
ctx := context.Background()
client, err := firestore.NewClient(ctx, os.Getenv("PROJECT_ID"))
if err != nil {
log.Fatal(err)
}
data := generateDocumentData(client)
fmt.Println("create")
exec(ctx, client, data, create)
fmt.Println("update")
exec(ctx, client, data, update)
// exec(ctx, client, data, updateIncrement)
// exec(ctx, client, data, updateArrayUnion)
// exec(ctx, client, data, updateServerTimestamp)
fmt.Println("read")
read(ctx, client.Collection("articles"))
// fmt.Println("delete")
// exec(ctx, client, data, delete)
}
func panicIf(err error) {
if err != nil {
panic(err)
}
}
func generateDocumentData(client *firestore.Client) []map[*firestore.DocumentRef]map[string]interface{} {
limit := *size
totalCount := *total
count := totalCount/limit + 1
data := make([]map[*firestore.DocumentRef]map[string]interface{}, 0, count)
col := client.Collection("articles")
for j := 0; j < count; j++ {
min := limit * j
max := limit * (j + 1)
if max > totalCount {
max = totalCount
}
if min == max {
break
}
results := make(map[*firestore.DocumentRef]map[string]interface{}, limit)
for i := min; i < max; i++ {
doc := col.NewDoc()
results[doc] = map[string]interface{}{
"title": "title_" + strconv.FormatInt(int64(i), 10),
"tags": []string{"tag_1", "tag_2", "tag_3"},
"author_id": "author_id_" + strconv.FormatInt(int64(i), 10),
"view_count": i,
"is_published": true,
"text": "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789",
// "update_timestamp": firestore.ServerTimestamp,
}
}
data = append(data, results)
}
return data
}
func exec(ctx context.Context, client *firestore.Client, data []map[*firestore.DocumentRef]map[string]interface{},
execFunc func(ctx context.Context, client *firestore.Client, col *firestore.CollectionRef, articles map[*firestore.DocumentRef]map[string]interface{})) {
col := client.Collection("articles")
fmt.Println("start")
start := time.Now()
multi := *multi
dataLen := len(data)
current := 0
for {
wg := &sync.WaitGroup{}
for i := 0; i < multi; i++ {
articles := data[current]
wg.Add(1)
go func() {
execFunc(ctx, client, col, articles)
wg.Done()
}()
current++
if dataLen == current {
break
}
}
wg.Wait()
if dataLen == current {
break
}
multi *= 2
}
fmt.Println(time.Now().Sub(start))
fmt.Println("end")
last, err := col.OrderBy("view_count", firestore.Desc).Limit(1).Documents(ctx).GetAll()
panicIf(err)
if len(last) > 0 {
fmt.Println(last[0].Data()["view_count"])
}
}
func create(ctx context.Context, client *firestore.Client, col *firestore.CollectionRef, articles map[*firestore.DocumentRef]map[string]interface{}) {
batch := client.Batch()
for doc, article := range articles {
batch.Create(doc, article)
}
_, err := batch.Commit(ctx)
panicIf(err)
}
func update(ctx context.Context, client *firestore.Client, col *firestore.CollectionRef, articles map[*firestore.DocumentRef]map[string]interface{}) {
batch := client.Batch()
for col, article := range articles {
batch.Update(col, []firestore.Update{
{Path: "title", Value: article["title"].(string) + "_update"},
{Path: "view_count", Value: article["view_count"].(int) + 10000},
})
}
_, err := batch.Commit(ctx)
panicIf(err)
}
func updateIncrement(ctx context.Context, client *firestore.Client, col *firestore.CollectionRef, articles map[*firestore.DocumentRef]map[string]interface{}) {
batch := client.Batch()
for col := range articles {
batch.Update(col, []firestore.Update{
{Path: "view_count", Value: firestore.Increment(10000)},
})
}
_, err := batch.Commit(ctx)
panicIf(err)
}
func updateArrayUnion(ctx context.Context, client *firestore.Client, col *firestore.CollectionRef, articles map[*firestore.DocumentRef]map[string]interface{}) {
batch := client.Batch()
for col := range articles {
batch.Update(col, []firestore.Update{
{Path: "tags", Value: firestore.ArrayUnion("tag_3", "tag_4")},
})
}
_, err := batch.Commit(ctx)
panicIf(err)
}
func updateServerTimestamp(ctx context.Context, client *firestore.Client, col *firestore.CollectionRef, articles map[*firestore.DocumentRef]map[string]interface{}) {
batch := client.Batch()
for col := range articles {
batch.Update(col, []firestore.Update{
{Path: "update_timestamp", Value: firestore.ServerTimestamp},
})
}
_, err := batch.Commit(ctx)
panicIf(err)
}
func delete(ctx context.Context, client *firestore.Client, col *firestore.CollectionRef, articles map[*firestore.DocumentRef]map[string]interface{}) {
batch := client.Batch()
for col := range articles {
batch.Delete(col)
}
_, err := batch.Commit(ctx)
panicIf(err)
}
func read(ctx context.Context, col *firestore.CollectionRef) {
fmt.Println("start")
start := time.Now()
last, err := col.Where("view_count", ">=", 10000).Documents(ctx).GetAll()
fmt.Println(time.Now().Sub(start))
fmt.Println("end")
fmt.Println("count: ", len(last))
panicIf(err)
}

3つの変数のうち登録件数は10,000件で固定し、一度のバッチ書き込みの件数、初回書き込み時の並列数を適当に変えて時間を測ってみました。結果は以下になります。

----------2022-05-11-10.05.15

500/s を大きく超えて最大 9407/s 出てました。実はそんなに気にしなくても大丈夫なのでしょうか。連続で試行したので既に十分スケールしていたのかもしれません。

500/s に引っかかるのはどんな条件?

公式ドキュメントには条件として以下のように書かれています。

Maximum write rate to a collection in which documents contain sequential values in an indexed field

Increment 関数を使った更新が影響するのかと思い試しました。ついでにArrayUnion や ServerTimestamp も試してみました。登録件数は10,000件で固定です。結果は以下になります。

----------2022-05-11-10.05.58

殆ど変わりませんでした。逆にどうやったら引っかかるのでしょうか?

そもそも負荷を徐々にあげる必要があるのか?

色々試したくなってきたので、最初から負荷を高めで固定してみました。

func exec2(ctx context.Context, client *firestore.Client, data []map[*firestore.DocumentRef]map[string]interface{},
execFunc func(ctx context.Context, client *firestore.Client, col *firestore.CollectionRef, articles map[*firestore.DocumentRef]map[string]interface{})) {
col := client.Collection("articles")
fmt.Println("start")
start := time.Now()
multi := *multi
limit := make(chan struct{}, multi)
wg := &sync.WaitGroup{}
for _, articles := range data {
limit <- struct{}{}
wg.Add(1)
go func(articles map[*firestore.DocumentRef]map[string]interface{}) {
execFunc(ctx, client, col, articles)
<-limit
wg.Done()
}(articles)
}
wg.Wait()
fmt.Println(time.Now().Sub(start))
fmt.Println("end")
last, err := col.OrderBy("view_count", firestore.Desc).Limit(1).Documents(ctx).GetAll()
panicIf(err)
if len(last) > 0 {
fmt.Println(last[0].Data()["view_count"])
}
}

単純な並列化に変更しました。こちらも登録件数は10,000件で固定しています。結果は以下になります。

----------2022-05-11-10.07.12

さらに速くなりました。本当に計測方法が合っているのか不安になってきました。一応更新後に件数を確認し、更新されていることを確認しました。こちらも連続で試行していたので、既に十分スケールしていたのかもしれません。

以後の計測は負荷を徐々に上げず、単純な並列化で試しています。

削除の速度も測りました。あまり重視していないので1条件だけです。(最初から測定しておけばよかった)

----------2022-05-11-10.08.29

更新より少し遅いくらいでした。

最大何件まで成功するか

登録件数を変えていくつか試してみました。効果があるか分かりませんが、1条件ごとに2,3時間おいて試しました。結果は以下になります。

----------2022-05-11-10.09.36

40,000件は成功、50,000件は登録中にエラーとなりました。エラーが起きた後に確認しましたが、49,000件くらいまでは登録できていました。

リクエスト数を2倍にしてみる

最後にインスタンスをもう一台立て、2つのインスタンスでほぼ同時に実行してみました。

----------2022-05-11-10.10.16

40,000件 * 2 までは遅くなっているものの成功、50,000件 * 2 の更新で両方エラーになりました。思ったよりだいぶ耐えました。

まとめ

予想以上に書き込み可能件数が多かったです。自分で調べておいて何ですが合っているか不安なので、どこか間違ってたら教えていただけると非常に助かります。