【データ準備入門】Cloud Functionsを用いたデータ準備・作成

データ準備・作成

なぜCloud Functionsが必要か

 データinputにおいてデータ連携を作成していく中で、とり得る手段の一つにCloudFunctions等を用いたプログラムによるデータ準備・作成があります。基本的にサービスDBなどから分析環境へ連携を行う場合はパイプラインで問題ありませんが、それでは対応できないような個別性の高い連携において必要になってきます。そういった背景からパイプラインなどと比較して利用頻度が少ない場合が多く、それによって費用面などからサーバレスとの相性が良い事が多いです。

ユースケース

 パイプラインで対応できない個別性の高いユースケースとしては以下のようなケースが考えられます。

  • 外部APIなどを用いたデータ取得(例:日本郵便株式会社のサイトから郵便番号を取得する)
  • クローリングなどデータ取得作業が必要
  • BQに直接データ連携ができず、前処理が必要となる場合(例:個人名が含まれており、社内規約でクラウドDBへの連携が認められていない)

プログラムで実装するため対応できるユースケースはとても多いですが、自由度が高いため管理が難しく乱立すると運用コストが上がっていまします。今回はそういった点に注意しつつ、クローリングデータをBigQueryに入れる場合を想定したサンプルをいくつか載せようと思います。

クローリングでデータを準備するケース

 実際にCloudFunctionsでクローリングを行う際は要件定義が必要になりますが、今回はサンプルということなので共通的に発生する注意点についてあげて、具体的なコードについて記載したいと思います。実際に利用する際は個別の要件に合わせて、カスタマイズして利用してください。

注意点1:個別性が高く技術負債になりやすい

 まず最初の注意点がユースケースの章においてもあげた自由度の高さがあります。例えばデータ連携を行うためにCloudFunctionsをコンソールから作成し、コードを書き始めるとすぐ連携を行うことができますが、「コード管理されていない・作成背景や要件が不明・バージョン管理できない…」など様々な面で技術負債化が進んでします。そのためGitでコード管理を行いつつIaCやCI/CDによる管理を行う必要があります。またその際に運用側ので最低限必要となる通知やリトライの実装も忘れないように行いましょう。

注意点2:DBによっては同時接続数を考慮する

 2つ目は接続先DBによってはDB連携を行う際に同時接続数やSQL同時実行数に気をつけて行う必要があるという点です。今回BigQueryへの接続となるため上限はが気になることは少ないですが、例えばAmazonRDSであれば設定によって低い同時接続数が定められている場合があります。他の連携への影響も及ぼしかねないため、同時実行を行ってスケールできるようにする場合はCloudFunctions自体の同時実行数に気をつけるようにしてください。

注意点3:障害時などを考慮し汎用性をもたせておく

 注意点1にある通り個別性が高くなりやすいため、ちょっとした変更などで改修が必要になると運用コストが高くなってしまいます。例えばクローリングの開始時間や対象ページのページ数、ログインユーザー名などが変わるたびにCloudFunctionsが必要だと、少しの変更でも大きく工数がかかってしまいます。そのためキック時に渡す環境変数で切り替えれるように設計したりある程度汎用性をもたせて作る必要があります。

実際のコード例

 今回サンプルで作成したものは以下のような構成で作成いたしました。CloudFunctionsにデプロイするコードはzipで圧縮しておく必要があるため、source配下にまとめています。

├── cloud_function.tf                # Cloud Functionsのterraformコード
├── source                           # Cloud Functionsで実行するコードを格納するフォルダ
│   ├── bin
│   │   ├── chromedriver             # クローリングで利用するドライバー
│   │   └── headless-chromium
│   ├── lib
│   │   ├── BigQuery.py              # BQ接続用ライブラリ
│   │   └── ChromeSelenium.py        # selenium利用用ライブラリ
│   ├── main.py                      # メインで実行する関数
│   └── requirements.txt
└── variable.tf

 cloud_function.tfは以下のようなものを準備しています。archive_fileを用いているため圧縮処理を別途書く必要がなく、またfilemd5を用いることでファイルの中身が変わった際に、再度CloudFunctionsがデプロイシノされるようにしています。そしてmax_instance_countを設定することで同時に大量のCloudFunctionが実行されないようにすると同時に、pubsubを用いてqueueに実行予定を貯めて行けるようになっています。

data "google_storage_bucket" "bucket" {
  name     = "XXXXXXXXXXXXXXXX"
}

data "archive_file" "function_archive" {
  type        = "zip"
  source_dir  = "${path.module}/source"
  output_path = "${path.module}/source/cloud_function.zip"
}

resource "google_storage_bucket_object" "archive" {
  name   = "cloud_function_${filemd5(data.archive_file.function_archive.output_path)}.zip"
  bucket = data.google_storage_bucket.bucket.name
  source = data.archive_file.function_archive.output_path
}

resource "google_cloudfunctions2_function" "function" {
  name        = replace(var.name, "_", "-")
  location    = "asia-northeast1"
  description = var.name

  build_config {
    runtime = "python39"
    entry_point = "start"  # Set the entry point 
    environment_variables = {
      PROJECT_ID = "XXXXXXXXXXXXXXXX"
      DATASET_ID = google_bigquery_dataset.XXXXXXXXXXXXXXXX.dataset_id
      TABLE_ID = google_bigquery_table.XXXXXXXXXXXXXXXX.table_id
    }
    source {
      storage_source {
        bucket = data.google_storage_bucket.bucket.name
        object = google_storage_bucket_object.archive.name
      }
    }
  }

  service_config {
    max_instance_count  = 1
    available_memory    = "1024M"
    timeout_seconds     = 540
  }

  labels = {
    zip_md5 = filemd5(data.archive_file.function_archive.output_path)
  }

  event_trigger {
    trigger_region = "us-central1"
    event_type     = "google.cloud.pubsub.topic.v1.messagePublished"
    pubsub_topic   = google_pubsub_topic.topic.id
    retry_policy   = "RETRY_POLICY_RETRY"
  }
}

resource "google_pubsub_topic" "topic" {
  name = var.name
}

 ChromeSelenium.pyは以下のようなコードにしてheadlessで実行できるようにしています。headlessを設定することで画面のないCloudFunctionsでもchromeドライバを用いたクローリングを行うことができます。

import os

from fake_useragent import UserAgent
from selenium import webdriver

import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

class ChromeSelenium:

    def __init__(self):
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        ...
        options.add_argument('user-agent='+UserAgent().random)
        self.driver = webdriver.Chrome(options=options)

 今回はCloudBuildにてデプロイを行っているため、terraformを実行する前にpythonのライブラリをインストールしておく必要があります。

substitutions:
  _TERRAFORM_VERSION_: 1.1.9
steps:

  - id: 'build functions'
    name: python
    entrypoint: pip
    args: ["install", "-r", "./source/requirements.txt", "-t", "./source/"]
  ...

timeout: 3600s

まとめ

 今回CloudFunctionsを用いてクローリングデータをBigQueryに連携するケースを想定し、注意点と共にいくつかのサンプルコードを記載させていただきました。サンプルで作成したものはCloudFunctionsをコンソール上で作成するよりも開発に時間がかかりましたが、インフラ含めすべてコードで管理できており、技術負債が残りにくい設計となっています。デプロイを自動化することで今後発生する可能性の高い画面変更によるクローリングコードの変更にも対応しやすくなっています。

 弊社ではパイプラインに限らず柔軟なデータinputのカタチに対応していますので、もし興味がある方は是非お問い合わせからご連絡いただけますと幸いです。最後まで読んでいただき誠にありがとうございます。

本記事内容をご利用される前にご確認お願いします

  • SinkCapitalギルドメンバーが記載したギルド内部向け記事を一般公開したものです
  • 記載内容の情報鮮度や正確性に問題があった場合も、本記事の内容を利用したことによるあらゆる問題には一切責任を負いかねます
  • 要望や指摘、意見等あれば気軽にコメントいただけると幸いです
  • 弊社にご興味のあるかたは是非弊社HP相談フォームを御覧ください

コメント

タイトルとURLをコピーしました