領収書の一括発行機能をGoのイテレータできれいに実装できた

こんにちは、ソフトウェア開発統括部の伊藤です。 フルスタックエンジニアとしてアプリとバックエンドそれぞれのチームに参加して開発をしています。

タクシーアプリ『GO』では、2025年12月のアップデートで領収書の発行機能を大きくアップデートしました。 確定申告の時期だったということもあり、多くの方にご利用いただいておりますが、その裏側にはいくつかの技術的ハードルがありました。

今回、領収書の発行機能をアップデートするにあたって、Go 1.23で導入されたイテレータを活用することで複雑な処理を効率的に記述できたので紹介したいと思います。

領収書の発行機能アップデート

今回の領収書の発行機能アップデートにあたり、重要視されたポイントの1つに、一度に大量の領収書を発行するニーズを満たすということがありました。

これまでの『GO』アプリでは、発行したい履歴を一つずつ個別に選択する必要があり、選択可能な上限数も十分ではありませんでした。そのため、たくさんご利用いただいている方や、確定申告で大量の発行が必要な方にとって、作業に手間がかかる点が課題となっていました。

今回のアップデートにより、1ヶ月単位でまとめて一括選択したり、絞り込み結果を元に一括選択したり、一括選択した履歴を一部対象外にするといった細かい操作をすることができるようになりました。また、絞り込み条件から最大1年分の領収書を直接一括発行することもできるようになっています。

発行処理の概要

領収書発行はこれまで、アプリからのリクエスト時に、領収書生成サービスへリクエストしてPDFを生成、メール送信までをアプリからの処理に対して同期的に行っていました。

今回のアップデートでは、まずこれをCloud Tasksを使用してバックグラウンド処理化しました。

バックグラウンド処理は大きく分けて「対象特定」「発行」「集約・送信」の3ステップに分けて実行しています。

図1: 発行処理の全体フロー

  • 対象特定: 発行する領収書のIDを特定する。発行するIDを振り分け、その後の発行ステップを一定バッチごとに複数立ち上げる。
  • 発行: 領収書生成サービスを呼び出し、PDFを生成し一時保存する。処理量に応じて複数起動する。
  • 集約・送信: 発行ステップで発行された領収書を集約し、添付ファイルやダウンロードページ用のZIPファイルを作成し、メール送信する。

領収書はPDFを添付ファイルにしたメール送信を基本としていますが、メールのサイズが一定量を超えるとメール容量の上限にあたる場合があります。 そのため、一度に一定数以上の領収書が作成される場合は発行された領収書をZIPにまとめて、ダウンロードページからダウンロードしていただく仕組みも用意しました。

月末・月初や確定申告時期など、領収書発行の負荷が高まる時期に備え、Cloud Tasksキューは専用のものを用意し同時実行数制限を設定することで、負荷が高まりすぎないように制御しています。

対象特定での課題

まず今回問題になったのは、領収書発行対象となる履歴のIDを特定することです。 『GO』アプリでは、通常のタクシー注文のほかに『GOエコノミー』やEXCLUSIVE MEMBERSHIPといった異なる系統の履歴をまとめて1つの履歴として集約しています。 この処理自体はUNIONを使って各系統のテーブルを1つのクエリにまとめることで一括取得を実現しています。

ただ、『GO』アプリのバックエンドではO/Rマッパーが使用されていて、複数行の結果が返る時は配列として一括して読み込まれます。1年分などを指定して実行した場合、1年分のIDが一括で読み込まれてしまい、メモリが不足することが容易に想像できました。

そのため、逐次読み込みする仕組みを作るため、当初以下のようなシグネチャのメソッドを作りました。

// メソッドシグネチャ
RawQueryInBatches(model any, batchSize int, batchFn func(models any) error, query string, args ...any) error

// こういう書き味
err := repo.RawQueryInBatches(result, 10, func(models any) error {
    items := models.([]*model.Item)

    // ここでitemsを使って処理をする
 
    return nil
}, query, param1, param2 /* ... */)

これを書きながら「悪くはないんだけど何か違和感あるなー」と感じていました。元のRawQueryがRawQuery(models any, query string, args ...any) errorであることから、大きいブロックの後にqueryやparamsが来るのが違和感の主な要因でしょうか。 このメソッドのPull Requestを出した時に「明日、Goを1.24にアップデートするのでイテレータが使えるようになって、ちょっとシュッとかけるかも」とアドバイスをもらいました。なんとタイムリーな。

Goのイテレータ

Goのイテレータ(range over func)は、簡単にいうと for item := range ... のrangeの部分にfuncを指定できるようになったものです。

range の部分には iter.Seq[V any] iter.Seq2[K, V any] のシグネチャをもつfuncを使用することができます(以後この記事では、iter.Seqを返す、rangeに指定できるfuncを、iter関数と呼ぶことにします)。

  • iter関数は、yieldと呼ばれるfuncを引数に持つfuncを戻り値にします。
  • iter関数内からデータが揃った時にyieldを呼ぶことで、forブロック内の処理に値を渡すことができます。
  • forブロック内でbreakやreturnなどでループが中断された場合は、yieldの戻り値がfalseになります。この戻り値を元にiter関数側の処理を中断させることができます。

これにより、大量のデータをストリーミングする処理を書きやすくできます。

// メソッド定義
func SampleItems() iter.Seq[int] {
    return func(yield func(int) bool) {
        if !yield(1) {
            fmt.Printf("break in 1\n")
            return
        }
        if !yield(2) {
            fmt.Printf("break in 2\n")
            return
        }
        if !yield(3){
            fmt.Printf("break in 3\n")
            return
        }
    }
}

// 記述例
for item := range SampleItems() {
    fmt.Printf("Item: %d\n", item)
    if item == 2 {
        break // iter関数で呼び出したyieldがfalseを返すので、後続処理を停止することができる
    }
}

/**
実行結果:

Item: 1
Item: 2
break in 2
*/

今回は、iter.Seq2[K, V any] の第2引数をerrorにすることで、ループ処理中のエラーをrangeの中で受け取ることができそうだなと思い、一旦そのように使うようにしています。

対象特定でのイテレータ

早速、別ブランチで進んでいたGo 1.24アップデートを作業ブランチに取り込んで、以下のようなシグネチャに書き換えてみました。

// メソッドシグネチャ
RawQueryInBatches(model any, batchSize int, query string, args ...any) iter.Seq2[any, error]

// こういう書き味
for batch, err := range repo.RawQueryInBatches(&model.Item{}, batchSize, query, param1, param2) {
    if err != nil {
        return err
    }
 
    items := batch.([]*model.Item)

    // ここでitemsを使って処理をする
}

元のRawQueryのシグネチャと一致したこともあり、だいぶすっきりしたように感じます。

ダウンロード用ZIPファイルの作成

イテレータの強力さをより強く感じたのはダウンロード用ZIPファイルの作成時でした。

図1: 発行処理の全体フローで説明した通り、「発行」ステップでは、領収書生成サービスにリクエストして領収書のPDFを作成します。一度のリクエストで複数の領収書を発行でき、tarで集約されて保存されます。一度に発行できる上限数は以前の同期実行の時から変更されていないので、大量に領収書を発行する場合は処理を分割して並列で発行処理を実行しています。

「集約・送信」ステップには、一定以上の領収書が一度に発行された場合にダウンロードページを作成するため、複数の「発行」ステップで作られたPDFを、1つのZIPファイルにまとめる処理があります。

ここで問題になるのは、tarファイルからPDFファイルを読み取りながら、zipファイルをS3に保存する処理です。 簡単に作るなら、一度tarファイルをローカルに保存して展開し、それをzipファイルにまとめ直す方法がとれますが、この方法では実行環境に一時保存のためのストレージが必要となります。 今回は取り扱うファイル数やサイズが大きく、一時保存のストレージを用意するのはコストパフォーマンス観点で難しいので、S3から読み込んだものをそのまま書き込んでいく逐次処理で実現することにしました。

この処理を愚直に書くとネストが深くなりがちですが、ここでイテレータを適用することで最終的に以下のような形ですっきり記述することができるようになりました。

// 書き込み先のZIPファイルを開いた上で
err := i.StreamWritePDFFilesToZip(ctx, zipFileKey, func(writeFile func(file PDFFile) error) error {
    // 読み込みするtarファイルのリストからPDFを順次読み込むイテレータで順次取得して、書き込んでいく
    for pdfFile, err := range i.StreamPDFFilesFromStorage(ctx, tarFiles) {
        if err != nil {
            return fmt.Errorf("failed to stream PDF files from storage: %w", err)
        }

        if err := writeFile(pdfFile); err != nil {
            return fmt.Errorf("failed to write PDF file to zip: %w", err)
        }
    }
 
    return nil
})

読み込み処理:iter関数を使ったtarファイル内からのPDFファイルの列挙

各メソッドの処理の詳細を見ていきましょう。

まずは読み込みの部分です。PDFはtarファイルとしてまとまっているので、S3のGetObjectの戻り値のBodyをそのままtar.Readerに渡して、直接展開しながらダウンロードすることができます。ファイルを1つ得るごとにイテレータをyieldしていきます。

// StreamPDFFilesFromStorage は、指定されたkeysにある複数のtarファイルに入っているPDFファイルを順次返すイテレータを返す
func (i *ReceiptInteractor) StreamPDFFilesFromStorage(ctx context.Context, keys []string) iter.Seq2[PDFFile, error] {
    return func(yield func(PDFFile, error) bool) {
        for _, key := range keys {
            // 取得するファイルの情報
            input := &s3.GetObjectInput{
                Bucket: aws.String(i.s3Bucket),
                Key:    aws.String(key),
            }

            // S3のオブジェクト取得リクエスト、内容の読み取りはdata.Bodyでio.ReadCloserからストリームできる
            data, err := i.client.GetObjectWithContext(ctx, input)
            if err != nil {
                yield(nil, fmt.Errorf("failed to download tar file from s3: %w", err))
                return
            }
         
            // tarからPDFを読み込んで、yieldに流していく(falseが返ったら中断する)
            if !i. yieldPDFsFromTar(data.Body, yield) {
                return
            }
        }
    }
}

// yieldPDFsFromTar は、tarファイルのストリームから読み取ったPDFをyieldに流す。渡されたio.ReadCloserは処理が終わったら閉じる。中断するときはfalseを返す。
func (i *ReceiptInteractor) yieldPDFsFromTar(body io.ReadCloser, yield func(PDFFile, error) bool) bool {
    defer body.Close()

    // tar.Readerを作成、中からファイルを列挙する
    tarReader := tar.NewReader(body)
    for {
        // 次のファイルのヘッダを読み取り。ファイル終端になってたらループ終了する。
        header, err := tarReader.Next()
        if err == io.EOF {
            // End of archive
            break
        }
        if err != nil {
            yield(nil, fmt.Errorf("failed to read header from tar: %w", err))
            return false
        }
     
        // 今回はシステム間連携でファイル形式がPDFとしてわかっているので省略されているが、PDF以外のファイルが入る余地があるのであればここでチェックの必要がある

        // ファイル内容を読み取る
        b, err := io.ReadAll(tarReader)
        if err != nil {
            yield(nil, fmt.Errorf("failed to read file from tar: %w", err))
            return false
        }

        // 得られたファイル内容をyieldに渡す
        if !yield(PDFFile{
            FileName: header.Name,
            Bytes: b,
        }, nil) {
            return false
        }
    }

    return true
}

書き込み処理: S3へのマルチパートアップロード

次に書き込みです。S3へのマルチパートアップロードを処理するストリームにzip.Writerを被せて、渡されたファイルを順次書き込んでいきます。

// StreamWritePDFFilesToZip は、指定されたfileKeyにZIPファイルを直接書き込みます。
func (i *ReceiptInteractor) StreamWritePDFFilesToZip(ctx context.Context, fileKey string, writeFile func(func(file PDFFile) error) error) error {
    // マルチパートアップロードを開始
    return i.MultipartUpload(ctx, &s3.CreateMultipartUploadInput{
        Bucket:      aws.String(i.s3Bucket),
        Key:         aws.String(fileKey),
        ContentType: aws.String("application/zip"),
    }, func(writer io.Writer) error {
        // マルチパートアップロードのwriterに、zip.Writerを被せてZIPを書き込めるようにする
        zipWriter := zip.NewWriter(writer)
        defer zipWriter.Close()

        // ZIPファイルに保存するファイルのタイムスタンプを決めておく
        now := time.Now()

        // 引数で渡される処理関数を起動し、渡されたファイルを順次書き込んでいく
        err := writeFile(func(file PDFFile) error {
            // ZIPファイルのヘッダを書き込む
            header := &zip.FileHeader{
                Name:     filepath.Base(file.FileName),
                Method:   zip.Deflate,
                Modified: now,
            }
            f, err := zipWriter.CreateHeader(header)
            if err != nil {
                return fmt.Errorf("failed to create zip entry: %w", err)
            }

            // ファイルの内容を書き込み
            _, err = f.Write(file.Bytes)
            if err != nil {
                return fmt.Errorf("failed to write zip entry: %w", err)
            }
            return nil
        })
        if err != nil {
            return fmt.Errorf("failed to write receipt files to zip: %w", err)
        }

        return nil
    })
}

マルチパートアップロードの本体処理では、io.Pipeを用いて書き込まれた内容がそのままS3側に書き込むようにしています。1

func (i *ReceiptInteractor) MultipartUpload(ctx context.Context, input *s3.CreateMultipartUploadInput, writerFunc func(writer io.Writer) error) error {
    const (
        // マルチパートアップロードの最小パートサイズは5MB
        minUploadPartSize = 5 * 1024 * 1024
    )

    destBucket := input.Bucket
    destKey := input.Key

    // S3にマルチパートアップロードの開始をリクエスト
    createOutput, err := i.client.CreateMultipartUploadWithContext(ctx, input)
    if err != nil {
        return fmt.Errorf("failed to create multipart upload: %w", err)
    }
    uploadId := createOutput.UploadId

    // 処理失敗時のクリーンアップ
    // 失敗時はマルチパートアップロードを中止をS3にリクエストする
    var writerErr error
    var uploadErr error
    defer func() {
        if err != nil || writerErr != nil || uploadErr != nil {
            cleanupCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
            _, _ = i.client.AbortMultipartUploadWithContext(cleanupCtx, &s3.AbortMultipartUploadInput{
                Bucket:   destBucket,
                Key:      destKey,
                UploadId: uploadId,
            })
        }
    }()

    // io.Pipeでwriterの出力をS3に直接ストリーム
    // 呼び出し元からの書き込みストリームと、S3へ書き込みをする読み込みストリームの2つができるので、書き込みと読み込みそれぞれの処理をゴルーチンで並走させる
    reader, writer := io.Pipe()
    var wg sync.WaitGroup

    // 書き込みするゴルーチン
    // 関数で渡される書き込み処理を呼び出す
    wg.Add(1)
    go func() {
        defer func() {
            _ = writer.CloseWithError(writerErr)
            wg.Done()
        }()

        writerErr = writerFunc(writer)
    }()

    // S3にアップロードするゴルーチン
    var completedParts []*s3.CompletedPart
    var partNum int64 = 1

    wg.Add(1)
    go func() {
        defer func() {
            _ = reader.CloseWithError(uploadErr)
            wg.Done()
        }()

        buf := make([]byte, minUploadPartSize)

        for {
            // ストリーム末尾になるまで、1ループごとにマルチパートのサイズで読み込む
            n, err := io.ReadFull(reader, buf)
            if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) && !errors.Is(err, io.EOF) {
                uploadErr = fmt.Errorf("failed to read from pipe: %w", err)
                return
            }
            if n > 0 {
                // マルチパートのうち1つのパートをS3にアップロードする
                uploadPartOutput, err := i.client.UploadPartWithContext(ctx, &s3.UploadPartInput{
                    Bucket:     destBucket,
                    Key:        destKey,
                    UploadId:   uploadId,
                    PartNumber: aws.Int64(partNum),
                    Body:       bytes.NewReader(buf[:n]),
                })
                if err != nil {
                    uploadErr = fmt.Errorf("failed to upload part %d: %w", partNum, err)
                    return
                }
                completedParts = append(completedParts, &s3.CompletedPart{
                    PartNumber: aws.Int64(partNum),
                    ETag:       uploadPartOutput.ETag,
                })
                partNum++
            }
            if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
                break
            }
        }
    }()

    wg.Wait()

    if writerErr != nil && uploadErr != nil {
        return fmt.Errorf("writer/upload error: %w", errors.Join(writerErr, uploadErr))
    }
    if uploadErr != nil {
        return fmt.Errorf("upload error: %w", uploadErr)
    }
    if writerErr != nil {
        return fmt.Errorf("writer error: %w", writerErr)
    }

    // S3にマルチパートアップロードの完了処理をリクエストする
    _, err = i.client.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
        Bucket:          destBucket,
        Key:             destKey,
        UploadId:        uploadId,
        MultipartUpload: &s3.CompletedMultipartUpload{Parts: completedParts},
    })
    if err != nil {
        return fmt.Errorf("failed to complete multipart upload: %w", err)
    }

    return nil
}

こうすることで、複数のtarファイルに入っているPDFファイルを、1つのzipファイルに集約する処理をすっきりと書くことができました。

今後の改善ポイント

今回のコードではyieldPDFsFromTarにあるとおり、ファイル単位でio.ReadAllでバイト列として読み込んでおり、ファイル単位での逐次処理にはできています。ただ、PDFをそのままメモリに載せてしまうためその分のメモリが必要になります。 これは既存コードとの兼ね合いで今回はこのように実装しましたが、yieldする際にバイト列として読み込まずio.ReaderとしてPDFFileに載せることでより完全なストリーム処理とすることができると考えています。

おわりに

今回は『GO』アプリの領収書発行機能でGoのイテレータを活用した例をご紹介しました。特に2つめのZIPファイルへの集約の処理ではイテレータによってループ処理の記述改善をより強く感じることができました。


  1. 今回のコードでは書き込みと読み込みそれぞれの処理に対してゴルーチンを起動させて並走させています。しかし、書き込み部分だけゴルーチンを立ち上げ、結果をチャネルから受け取ることでsync.WaitGroupによる待ち合わせが不要になり、よりシンプルに書くことができます。