ラボの設定手順と要件
アカウントと進行状況を保護します。このラボを実行するには、常にシークレット ブラウジング ウィンドウとラボの認証情報を使用してください。

Cloud Composer 3 の概要

ラボ 1時間 30分 universal_currency_alt クレジット: 5 show_chart 入門
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
このコンテンツはまだモバイル デバイス向けに最適化されていません。
快適にご利用いただくには、メールで送信されたリンクを使用して、デスクトップ パソコンでアクセスしてください。

概要

ワークフローはデータ分析における一般的なテーマのひとつで、データの取り込み、変換、分析によってデータ内で有益な情報を見つけるために使用されます。Google Cloud には、ワークフローをホストするためのツールとして Cloud Composer が用意されています。これは、広く利用されているオープンソース ワークフロー ツールである Apache Airflow のホスト型バージョンです。

このラボでは、Google Cloud コンソールを使用して Cloud Composer 環境を作成し、Cloud Composer を使ってシンプルなワークフローを実行します。このワークフローは、データファイルの存在を確認し、Cloud Dataproc クラスタを作成して Apache Hadoop ワードカウント ジョブを実行した後、クラスタを削除します。

演習内容

  • Google Cloud コンソールを使用して Cloud Composer 環境を作成する

  • Airflow ウェブ インターフェースで DAG(有向非巡回グラフ)を表示して実行する

  • 保存されたワードカウント ジョブの結果を表示する

設定と要件

ラボの設定

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

Google Cloud Shell の有効化

Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。

Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。

    ハイライト表示された Cloud Shell アイコン

  2. [続行] をクリックします。

環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。

Cloud Shell ターミナルでハイライト表示されたプロジェクト ID

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  • 次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list

出力:

Credentialed accounts: - @.com (active)

出力例:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project =

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: gcloud ドキュメントの全文については、 gcloud CLI の概要ガイド をご覧ください。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセスを許可] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

タスク 1. 環境のタスク

  1. Google Cloud コンソールのタイトルバーで、[Cloud Shell をアクティブにする] をクリックします。プロンプトが表示されたら、[続行] をクリックします。

  2. 次のコマンドを実行して、Compute デベロッパー サービス アカウントに Composer ワーカー ロールを割り当てます。

export PROJECT_ID=$(gcloud config get-value project) export PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)") gcloud projects add-iam-policy-binding {{{project_0.project_id}}} \ --member=serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=roles/composer.worker
  1. 次のコマンドを実行して、プロジェクトで必要な API がスムーズに有効化されるようにします。
gcloud services disable composer.googleapis.com gcloud services disable artifactregistry.googleapis.com gcloud services disable container.googleapis.com gcloud services enable artifactregistry.googleapis.com gcloud services enable container.googleapis.com gcloud services enable composer.googleapis.com

タスク 2. Cloud Composer 環境を作成する

このセクションでは、Cloud Composer 環境を作成します。

注: 次に進む前に必ずここまでの手順をすべて完了し、必要な API を有効にしてください。これらが完了していないと、Cloud Composer 環境は作成できません。
  1. Google Cloud コンソールのタイトルバーにある検索フィールドに「Composer」と入力し、[プロダクトとページ] セクションの [Composer] をクリックします。

  2. [環境を作成] をクリックして [Composer 3] を選択します。環境を以下のように設定します。

プロパティ
名前 highcpu
ロケーション
イメージのバージョン composer-3-airflow-n.n.n-build.n(注: 利用可能なイメージの中で最も大きい番号のイメージを選択)
サービス アカウント xxxxx-compute@developer.gserviceaccount.com
復元力モード 標準的な復元力
Airflow データベース ゾーン
  1. [環境リソース] で [] を選択します。

その他の設定はすべてデフォルトのままにします。

  1. [作成] をクリックします。

コンソールの [環境] ページで、環境の名前の左側に緑色のチェックマークが表示されていれば、環境の作成プロセスは完了しています。

設定が完了するまでには、15~30 分かかることがあります。その間にラボの作業を進めておいてください。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

Cloud Composer 環境を作成する

Cloud Storage バケットを作成する

プロジェクトで Cloud Storage バケットを作成します。このバケットは、Dataproc の Hadoop ジョブの出力として使用されます。

  1. ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、[+ 作成] をクリックします。

  2. ユニバーサルに一意の名前(プロジェクト ID など)をバケットに付けてから、[作成] をクリックします。[公開アクセスの防止] というメッセージが表示されたら、[確認] をクリックします。

この Cloud Storage バケット名は後ほど Airflow 変数として使用するため、メモしておいてください。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

Cloud Storage バケットを作成する

タスク 3. Airflow と基本コンセプト

Composer 環境が作成されるまでの間に、Airflow で使用される用語を確認しておきましょう。

Airflow とは、ワークフローの作成、スケジュール設定、モニタリングをプログラムで実行するためのプラットフォームです。

Airflow は、ワークフローをタスクの有向非循環グラフ(DAG)として作成するために使用します。Airflow スケジューラは、指定された依存関係に従って一連のワーカーでタスクを実行します。

基本コンセプト

DAG

有向非巡回グラフとは、実行するすべてのタスクの集まりであり、それらの関係や依存状態を反映するように編成されます。

演算子

単一のタスクの記述であり、通常はアトミックです。たとえば、BashOperator は bash コマンドの実行に使用されます。

タスク

演算子のパラメータ化されたインスタンスであり、DAG 内のノードです。

タスク インスタンス

タスクの特定の実行であり、DAG、タスク、時点を表します。これには、実行中、成功、失敗、スキップなどの状態があります。

コンセプトの詳細については、基本コンセプトに関するドキュメントをご覧ください。

タスク 4. ワークフローを定義する

これから使用するワークフローについて確認しておきましょう。Cloud Composer ワークフローは、DAG で構成されます。DAG は、Airflow の DAG_FOLDER に配置される標準の Python ファイルで定義されます。Airflow は各ファイル内のコードを実行して、動的に DAG オブジェクトをビルドします。任意の数のタスクを記述した DAG を必要なだけ作成できます。一般に、DAG と論理ワークフローが 1 対 1 で対応している必要があります。

以下に、hadoop_tutorial.py ワークフロー コード(DAG とも呼ばれる)を示します。

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop wordcount example, and deletes the cluster. This DAG relies on three Airflow variables https://airflow.apache.org/concepts.html#variables * gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster. * gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be created. * gce_region - Google Compute Engine region where Cloud Dataproc cluster should be created. * gcs_bucket - Google Cloud Storage bucket to used as output for the Hadoop jobs from Dataproc. See https://cloud.google.com/storage/docs/creating-buckets for creating a bucket. """ import datetime import os from airflow import models from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, DataprocDeleteClusterOperator, DataprocSubmitJobOperator, ) from airflow.utils.trigger_rule import TriggerRule # 毎日実行する cron 式を使用して DAG のスケジュールを定義する DAILY_SCHEDULE = '0 0 * * *' # 毎日午前 0 時に実行 # Cloud Dataproc ジョブの出力ファイル output_file = os.path.join( models.Variable.get('gcs_bucket'), 'wordcount', datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep # すべての Dataproc クラスタで利用可能な Hadoop の wordcount の例へのパス WORDCOUNT_JAR = ( 'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar' ) # Cloud Dataproc ジョブに渡す引数 wordcount_args = ['wordcount', 'gs://pub/shakespeare/rose.txt', output_file] yesterday = datetime.datetime.combine( datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()) default_dag_args = { # 開始日を前日にすると、Cloud Storage バケットで # 検出されるとすぐに DAG が開始する 'start_date': yesterday, # 失敗時や再試行時にメールを送信するには、「email」引数を自分のメール アドレスに設定して # メールを有効にする 'email_on_failure': False, 'email_on_retry': False, # タスクが失敗した場合は、5 分以上待ってから 1 回再試行する 'retries': 1, 'retry_delay': datetime.timedelta(minutes=5), 'project_id': models.Variable.get('gcp_project') } with models.DAG( dag_id='composer_hadoop_tutorial', # 「dag_id」を明示的に使用することが推奨される schedule_interval=DAILY_SCHEDULE, # 新しい cron 式 default_args=default_dag_args) を DAG として使用 # Cloud Dataproc クラスタを作成する create_dataproc_cluster = DataprocCreateClusterOperator( task_id='create_dataproc_cluster', # スケジュール設定された日付を追加して、クラスタに一意の名前を付ける # https://airflow.apache.org/code.html#default-variables を参照 cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', num_workers=2, region=models.Variable.get('gce_region'), zone=models.Variable.get('gce_zone'), image_version='2.0', master_machine_type='e2-standard-2', worker_machine_type='e2-standard-2') # Cloud Dataproc クラスタのマスターノードにインストールされている # Hadoop のワードカウントの例を実行する run_dataproc_hadoop = DataprocSubmitJobOperator( task_id='run_dataproc_hadoop', region=models.Variable.get('gce_region'), job={ "placement": {"cluster_name": 'composer-hadoop-tutorial-cluster-{{ ds_nodash }}'}, "hadoop_job": { "main_jar_file_uri": WORDCOUNT_JAR, "args": wordcount_args } } ) # Cloud Dataproc クラスタを削除する delete_dataproc_cluster = DataprocDeleteClusterOperator( task_id='delete_dataproc_cluster', project_id=models.Variable.get('gcp_project'), region=models.Variable.get('gce_region'), cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', # trigger_rule を ALL_DONE に設定すると、Dataproc ジョブが失敗した場合でも # クラスタが削除される trigger_rule=TriggerRule.ALL_DONE) # DAG の依存関係を定義する create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

3 つのワークフロー タスクをオーケストレーションするために、DAG は次の演算子をインポートします。

  1. DataprocCreateClusterOperator: Cloud Dataproc クラスタを作成します。
  2. DataprocSubmitJobOperator: Hadoop ワードカウント ジョブを送信し、結果を Cloud Storage バケットに書き込みます。
  3. DataprocDeleteClusterOperator: Compute Engine の利用料金が発生しないようにするために、クラスタを削除します。

タスクは順番に実行されます。これは、ファイルの次の部分で確認できます。

# DAG の依存関係を定義する create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

この DAG の名前は composer_hadoop_tutorial で、1 日に 1 回実行されます。

with models.DAG( 'composer_hadoop_tutorial', # 1 日に 1 回の DAG の実行を続行する schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag:

default_dag_args に渡される start_dateyesterday に設定されているため、Cloud Composer ではワークフローの開始が DAG のアップロード直後に設定されます。

タスク 5. 環境情報を表示する

  1. [Composer] に戻り、環境のステータスを確認します。

  2. 環境が作成されたら、環境の名前(highcpu)をクリックして詳細を表示します。

[環境の構成] タブで、Airflow ウェブ UI の URL、GKE クラスタ、バケットに保存されている DAG のフォルダへのリンクなどの情報を確認できます。

注: Cloud Composer によってスケジュールが設定されるのは、/dags フォルダ内のワークフローのみです。

タスク 6. Airflow UI を使用する

コンソールで Airflow ウェブ インターフェースにアクセスするには:

  1. [環境] ページに戻ります。
  2. 環境の [Airflow ウェブサーバー] 列で、[Airflow] をクリックします。
  3. ラボの認証情報をクリックします。
  4. 新しいブラウザ ウィンドウで Airflow ウェブ インターフェースが開きます。

タスク 7. Airflow 変数を設定する

Airflow 変数は Airflow 固有のコンセプトであり、環境変数とは異なります。

  1. Airflow インターフェースのメニューバーで [Admin] > [Variables] を選択します。

  2. + アイコンをクリックして新しいレコードを追加します。

変数の追加

  1. gcp_projectgcs_bucketgce_zonegce_region という Airflow 変数を作成します。 変数を 1 つ作成するごとに [Save] をクリックします。
Key Val Details
gcp_project このラボで使用している Google Cloud Platform プロジェクト。
gcs_bucket gs:// Dataproc の Hadoop ジョブの出力を保存するバケット。
gce_zone Cloud Dataproc クラスタを作成する Compute Engine ゾーン。
gce_region Cloud Dataproc クラスタを作成する Compute Engine リージョン。

[Save] をクリックします。1 つ目の変数を追加したら、2 つ目と 3 つ目の変数でも同じ手順を繰り返します。完了すると、変数テーブルは次のようになります。

変数リスト

タスク 8. DAG を Cloud Storage にアップロードする

DAG をアップロードするには:

  1. Cloud Shell で次のコマンドを実行し、環境の作成時に自動的に作成された Cloud Storage バケットに hadoop_tutorial.py ファイルのコピーをアップロードします。

  2. 次のコマンドの <DAGs_folder_path> を DAG のフォルダへのパスに置き換えます。

gcloud storage cp gs://cloud-training/datawarehousing/lab_assets/hadoop_tutorial.py <DAGs_folder_path>
  • このパスは [Composer] で確認できます。
  • 前の手順で作成した環境をクリックし、[環境の構成] タブをクリックして環境の詳細を確認します。
  • [DAG のフォルダ] を見つけて、パスをコピーします。

DAG のフォルダのパス

ファイルをアップロードするための変更後のコマンドは、次のようになります。

gcloud storage cp gs://cloud-training/datawarehousing/lab_assets/hadoop_tutorial.py gs://{{{project_0.default_region|REGION}}}-highcpu-0682d8c0-bucket/dags
  1. ファイルが DAG ディレクトリにアップロードされた後にバケットで dags フォルダを開くと、[バケットの詳細] の [オブジェクト] タブが表示されます。

バケットの詳細

DAG ファイルが DAG のフォルダに追加されると、Cloud Composer によって DAG が Airflow に追加され、自動的にスケジュールが設定されます。DAG の変更は 3~5 分以内に行われます。

composer_hadoop_tutorial DAG のタスク ステータスは、Airflow ウェブ インターフェースで確認できます。

注: インターフェースに「The scheduler does not appear to be running...」などのメッセージが表示されても無視してかまいません。 Airflow ウェブ インターフェースは、DAG の進行状況に応じて更新されます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

DAG を Cloud Storage にアップロードする

DAG の実行状況を確認する

DAG ファイルを Cloud Storage の dags フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。エラーが見つからなかったら、このワークフローの名前が DAG のリストに表示され、ワークフローがすぐに実行されるようキューに追加されます。

  1. 必ず、Airflow ウェブ インターフェースの [DAGs] タブを開いておきます。このプロセスが完了するまでには数分かかります。ブラウザを更新して、最新情報が表示されるかどうかを確認してください。

  2. Airflow で composer_hadoop_tutorial をクリックして DAG の詳細ページを開きます。このページには、ワークフローのタスクと依存関係がいくつか示されています。

  3. ツールバーで [グラフ] をクリックします。各タスクのグラフィックにカーソルを合わせると、そのタスクのステータスが表示されます。タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。

  4. [更新] をクリックして最新情報を表示すると、プロセスを囲む線の色がステータスに応じて変わるのを確認できます。

注: Dataproc クラスタがすでに存在している場合は、create_dataproc_cluster グラフィックをクリックしてもう一度ワークフローを実行します。ステータスが [成功] になったら、[クリア] をクリックして 3 つのタスクをリセットし、[OK] をクリックして確定します。
  1. create_dataproc_cluster のステータスが [実行中] に変わったら、ナビゲーション メニュー > [Dataproc] に移動して次の操作を行います。

    • [クラスタ] で、クラスタの作成と削除をモニタリングします。ワークフローによって作成されたクラスタは一時的なものです。ワークフローの実行中にのみ存在し、最後のワークフローのタスクで削除されます。
    • [ジョブ] で、Apache Hadoop ワードカウント ジョブをモニタリングします。ジョブ ID をクリックすると、ジョブのログ出力を確認できます。
  2. Dataproc のステータスが [実行中] になったら、Airflow に戻って [更新] をクリックすると、クラスタが完成したことを確認できます。

run_dataproc_hadoop プロセスが完了したら、ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、バケット名をクリックして wordcount フォルダでワードカウントの結果を確認します。

  1. DAG ですべての手順が完了すると、それぞれを囲む線の色が濃い緑になります。また、作成した Dataproc クラスタが削除されます。

お疲れさまでした

Cloud Composer ワークフローを実行しました。

次のステップ

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2025 Google LLC All rights reserved. Google および Google のロゴは、Google LLC の商標です。その他すべての社名および製品名は、それぞれ該当する企業の商標である可能性があります。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。