
Translating…
こんにちは、みかみです。
Airflow から BigQuery に対して、いろいろ操作してみたい。
ということで。
やりたいこと
Airflow の DAG から、BigQuery に以下の処理を実行してみたい。
- データセット一覧を取得
- テーブル一覧を取得
- テーブルデータを取得
- テーブルにデータを insert
- テーブルデータを update
- テーブルデータを delete
- テーブルデータを GCS に export
オペレータの実装内容を確認
やりたいことが実現できそうなオペレータは既存でもいろいろあるようですが、自分で拡張が必要になる場合に備えて、まずは既存オペレータの実装内容を確認してみます。
bigquery と名がつくオペレータをざっと見てみましたが、ほとんどのオペレータクラスは、コンストラクタと execute() メソッドを持っているようです。
DAG ファイルではオペレータクラスの作成と依存関係定義しかしていないことから、クラスインスタンスを作成してあげれば Airflow フレームワークが execute() メソッドを実行してくれるようです。
BigQueryGetDatasetOperator の処理内容を確認
もう少し詳細に、/airflow/contrib/operators/bigquery_operator.py に実装されている、BigQueryGetDatasetOperator のコードを確認してみます。
execute() メソッドが実行されると、BigQueryHook インスタンスが作成され、connection を作り、cursor を取得して get_dataset() メソッドを実行します。
bigquery_operator.py
(省略) from airflow.contrib.hooks.bigquery_hook import BigQueryHook (省略) class BigQueryGetDatasetOperator(BaseOperator): (省略) template_fields = ('dataset_id', 'project_id') ui_color = '#f0eee4' @apply_defaults def __init__(self, dataset_id, project_id=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs): self.dataset_id = dataset_id self.project_id = project_id self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to super(BigQueryGetDatasetOperator, self).__init__(*args, **kwargs) def execute(self, context): bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) conn = bq_hook.get_conn() cursor = conn.cursor() self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id) return cursor.get_dataset( dataset_id=self.dataset_id, project_id=self.project_id) (省略)
BigQueryHook の中身も確認してみます。
実体は /airflow/contrib/hooks/bigquery_hook.py に実装されています。
bigquery_hook.py で googleapiclient.discovery が import されていることから、どうやらここから googleapiclient を使用して GCP にアクセスしているようです。
BigQueryHook の get_conn() メソッドでは、BigQueryConnection インスタンスを作成して返却しています。 BigQueryConnection の cursor() メソッドでは、BigQueryCursor インスタンスを返却。 BigQueryCursor クラスは BigQueryBaseCursor クラスを継承していました。
bigquery_hook.py
(省略) from googleapiclient.discovery import build (省略) class BigQueryHook(GoogleCloudBaseHook, DbApiHook): """ Interact with BigQuery. This hook uses the Google Cloud Platform connection. """ conn_name_attr = 'bigquery_conn_id' def __init__(self, bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=True, location=None): super(BigQueryHook, self).__init__( gcp_conn_id=bigquery_conn_id, delegate_to=delegate_to) self.use_legacy_sql = use_legacy_sql self.location = location def get_conn(self): """ Returns a BigQuery PEP 249 connection object. """ service = self.get_service() project = self._get_field('project') return BigQueryConnection( service=service, project_id=project, use_legacy_sql=self.use_legacy_sql, location=self.location, num_retries=self.num_retries ) (省略) class BigQueryConnection(object): """ BigQuery does not have a notion of a persistent connection. Thus, these objects are small stateless factories for cursors, which do all the real work. """ def __init__(self, *args, **kwargs): self._args = args self._kwargs = kwargs def close(self): """ BigQueryConnection does not have anything to close. """ def commit(self): """ BigQueryConnection does not support transactions. """ def cursor(self): """ Return a new :py:class:`Cursor` object using the connection. """ return BigQueryCursor(*self._args, **self._kwargs) def rollback(self): raise NotImplementedError( "BigQueryConnection does not have transactions") (省略) class BigQueryCursor(BigQueryBaseCursor): """ A very basic BigQuery PEP 249 cursor implementation. The PyHive PEP 249 implementation was used as a reference: https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py https://github.com/dropbox/PyHive/blob/master/pyhive/common.py """ def __init__(self, service, project_id, use_legacy_sql=True, location=None, num_retries=5): super(BigQueryCursor, self).__init__( service=service, project_id=project_id, use_legacy_sql=use_legacy_sql, location=location, num_retries=num_retries ) self.buffersize = None self.page_token = None self.job_id = None self.buffer = [] self.all_pages_loaded = False (省略)
継承元の BigQueryBaseCursor のコードを確認してみると、get_dataset() メソッドで datasets().get() を実行していることが確認できました。
bigquery_hook.py
(省略) class BigQueryBaseCursor(LoggingMixin): """ The BigQuery base cursor contains helper methods to execute queries against BigQuery. The methods can be used directly by operators, in cases where a PEP 249 cursor isn't needed. """ def __init__(self, service, project_id, use_legacy_sql=True, api_resource_configs=None, location=None, num_retries=5): self.service = service self.project_id = project_id self.use_legacy_sql = use_legacy_sql if api_resource_configs: _validate_value("api_resource_configs", api_resource_configs, dict) self.api_resource_configs = api_resource_configs if api_resource_configs else {} self.running_job_id = None self.location = location self.num_retries = num_retries (省略) def get_dataset(self, dataset_id, project_id=None): """ Method returns dataset_resource if dataset exist and raised 404 error if dataset does not exist :param dataset_id: The BigQuery Dataset ID :type dataset_id: str :param project_id: The GCP Project ID :type project_id: str :return: dataset_resource .. seealso:: For more information, see Dataset Resource content: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource """ if not dataset_id or not isinstance(dataset_id, str): raise ValueError("dataset_id argument must be provided and has " "a type 'str'. You provided: {}".format(dataset_id)) dataset_project_id = project_id if project_id else self.project_id try: dataset_resource = self.service.datasets().get( datasetId=dataset_id, projectId=dataset_project_id).execute(num_retries=self.num_retries) self.log.info("Dataset Resource: %s", dataset_resource) except HttpError as err: raise AirflowException( 'BigQuery job failed. Error was: {}'.format(err.content)) return dataset_resource (省略)
データセット一覧を取得
データセット一覧は、datasets().list() API で取得できそうです。
BigQueryBaseCursor クラスに、datasets().list() API をコールする get_datasets_list() メソッドがありました。
bigquery_hook.py
(省略) class BigQueryBaseCursor(LoggingMixin): (省略) def get_datasets_list(self, project_id=None): """ Method returns full list of BigQuery datasets in the current project .. seealso:: For more information, see: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list :param project_id: Google Cloud Project for which you try to get all datasets :type project_id: str :return: datasets_list Example of returned datasets_list: :: { "kind":"bigquery#dataset", "location":"US", "id":"your-project:dataset_2_test", "datasetReference":{ "projectId":"your-project", "datasetId":"dataset_2_test" } }, { "kind":"bigquery#dataset", "location":"US", "id":"your-project:dataset_1_test", "datasetReference":{ "projectId":"your-project", "datasetId":"dataset_1_test" } } ] """ dataset_project_id = project_id if project_id else self.project_id try: datasets_list = self.service.datasets().list( projectId=dataset_project_id).execute(num_retries=self.num_retries)['datasets'] self.log.info("Datasets List: %s", datasets_list) except HttpError as err: raise AirflowException( 'BigQuery job failed. Error was: {}'.format(err.content)) return datasets_list (省略)
既存コードを grep してみましたが、get_datasets_list() を使っているオペレータはなさそうなので、他のオペレータクラスを参考にして、新規で作成してみます。
以下のクラスを /airflow/contrib/operators/bigquery_operator.py に追加しました。
※コード管理やメンテナンス性を考えると、拡張コードは別ディレクトリ、別ファイルに分けた方が良いと思いますが、今回は簡易性を優先して既存ファイルを更新しています。
bigquery_operator.py
class BigQueryGetDatasetListOperator(BaseOperator): template_fields = ('project_id',) ui_color = '#f0eee4' @apply_defaults def __init__(self, project_id=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs): self.project_id = project_id self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to super(BigQueryGetDatasetListOperator, self).__init__(*args, **kwargs) def execute(self, context): bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) conn = bq_hook.get_conn() cursor = conn.cursor() self.log.info('Start getting dataset list: %s', self.project_id) return cursor.get_datasets_list(project_id=self.project_id)
追加したオペレータを実行する DAG ファイルも追加しました。
example_bigquery_getdatasetlist_operator.py
# -*- coding: utf-8 -*- from typing import Any import airflow from airflow import models bigquery_operator = None # type: Any try: from airflow.contrib.operators import bigquery_operator except ImportError: pass if bigquery_operator is not None: args = { 'owner': 'Airflow', 'start_date': airflow.utils.dates.days_ago(2) } dag = models.DAG( dag_id='example_bigquery_getdatasetlist_operator', default_args=args, schedule_interval=None) get_dataset = bigquery_operator.BigQueryGetDatasetListOperator( task_id='bigquery_getdatasetlist_example', project_id='cm-da-mikami-yuki-258308', dag=dag)
現在、GCP の 対象プロジェクトには、airflow_test と test_s3 の2つのデータセットがある状態です。
DAG ファイルを dags ディレクトリに配置して、実行してみます。
正常に実行できたようです。
ログを確認してみると、取得したデータセットのリストが出力されています。
*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_getdatasetlist_operator/bigquery_getdatasetlist_example/2020-03-17T08:11:54.852025 00:00/1.log [2020-03-17 08:12:05,305] {taskinstance.py:655} INFO - Dependencies all met for [2020-03-17 08:12:05,312] {taskinstance.py:655} INFO - Dependencies all met for [2020-03-17 08:12:05,312] {taskinstance.py:866} INFO - -------------------------------------------------------------------------------- [2020-03-17 08:12:05,312] {taskinstance.py:867} INFO - Starting attempt 1 of 1 [2020-03-17 08:12:05,312] {taskinstance.py:868} INFO - -------------------------------------------------------------------------------- [2020-03-17 08:12:05,322] {taskinstance.py:887} INFO - Executing on 2020-03-17T08:11:54.852025 00:00 [2020-03-17 08:12:05,323] {standard_task_runner.py:53} INFO - Started process 12393 to run task [2020-03-17 08:12:05,382] {logging_mixin.py:112} INFO - Running %s on host %s ip-10-0-43-239.ap-northeast-1.compute.internal [2020-03-17 08:12:05,399] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,399] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook. [2020-03-17 08:12:05,402] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,402] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest [2020-03-17 08:12:05,701] {bigquery_operator.py:851} INFO - Start getting dataset list: cm-da-mikami-yuki-258308 [2020-03-17 08:12:05,703] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,703] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets?alt=json [2020-03-17 08:12:06,214] {logging_mixin.py:112} INFO - [2020-03-17 08:12:06,213] {bigquery_hook.py:1809} INFO - Datasets List: [{'kind': 'bigquery#dataset', 'id': 'cm-da-mikami-yuki-258308:airflow_test', 'datasetReference': {'datasetId': 'airflow_test', 'projectId': 'cm-da-mikami-yuki-258308'}, 'location': 'US'}, {'kind': 'bigquery#dataset', 'id': 'cm-da-mikami-yuki-258308:test_s3', 'datasetReference': {'datasetId': 'test_s3', 'projectId': 'cm-da-mikami-yuki-258308'}, 'location': 'asia-northeast1'}] [2020-03-17 08:12:06,225] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_getdatasetlist_operator, task_id=bigquery_getdatasetlist_example, execution_date=20200317T081154, start_date=20200317T081205, end_date=20200317T081206 [2020-03-17 08:12:15,303] {logging_mixin.py:112} INFO - [2020-03-17 08:12:15,303] {local_task_job.py:103} INFO - Task exited with return code 0
以下、レスポンス部分の抜粋です。
Datasets List: [ { 'kind': 'bigquery#dataset', 'id': 'cm-da-mikami-yuki-258308:airflow_test', 'datasetReference': { 'datasetId': 'airflow_test', 'projectId': 'cm-da-mikami-yuki-258308' }, 'location': 'US' }, { 'kind': 'bigquery#dataset', 'id': 'cm-da-mikami-yuki-258308:test_s3', 'datasetReference': { 'datasetId': 'test_s3', 'projectId': 'cm-da-mikami-yuki-258308' }, 'location': 'asia-northeast1' } ]
データセットが正常に取得できました。
テーブル一覧を取得
テーブル一覧は、tables().list() API で取得できるようです。
既存コードを確認したところ、テーブル一覧取得用のメソッドはないようです。
BigQueryBaseCursor クラスに、tables().list() API をコールする get_table_list() メソッドを追加しました。
bigquery_hook.py
def get_table_list(self, dataset_id, project_id=None): table_project_id = project_id if project_id else self.project_id try: tables_list = self.service.tables().list( projectId=table_project_id, datasetId=dataset_id).execute(num_retries=self.num_retries) self.log.info("Tables List: %s", tables_list) except HttpError as err: raise AirflowException( 'BigQuery job failed. Error was: {}'.format(err.content)) return tables_li
さらに、テーブルリスト取得用のオペレータと、オペレータを実行する DAG ファイルも追加します。
bigquery_operator.py
class BigQueryGetTableListOperator(BaseOperator): template_fields = ('dataset_id', 'project_id') ui_color = '#f0eee4' @apply_defaults def __init__(self, dataset_id, project_id=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs): self.dataset_id = dataset_id self.project_id = project_id self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to super(BigQueryGetTableListOperator, self).__init__(*args, **kwargs) def execute(self, context): bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) conn = bq_hook.get_conn() cursor = conn.cursor() self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id) return cursor.get_table_list( dataset_id=self.dataset_id, project_id=self.project_id)
example_bigquery_gettablelist_operator.py
# -*- coding: utf-8 -*- from typing import Any import airflow from airflow import models bigquery_operator = None # type: Any try: from airflow.contrib.operators import bigquery_operator except ImportError: pass if bigquery_operator is not None: args = { 'owner': 'Airflow', 'start_date': airflow.utils.dates.days_ago(2) } dag = models.DAG( dag_id='example_bigquery_gettablelist_operator', default_args=args, schedule_interval=None) get_dataset = bigquery_operator.BigQueryGetTableListOperator( task_id='bigquery_gettablelist_example', dataset_id='airflow_test', project_id='cm-da-mikami-yuki-258308', dag=dag)
取得対象の airflow_test データセットには、name_[YYYY] テーブルが3つある状態です。
DAG を実行してログを確認してみます。
*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_gettablelist_operator/bigquery_gettablelist_example/2020-03-17T08:58:24.271847 00:00/1.log [2020-03-17 08:58:27,986] {taskinstance.py:655} INFO - Dependencies all met for [2020-03-17 08:58:27,994] {taskinstance.py:655} INFO - Dependencies all met for [2020-03-17 08:58:27,994] {taskinstance.py:866} INFO - -------------------------------------------------------------------------------- [2020-03-17 08:58:27,994] {taskinstance.py:867} INFO - Starting attempt 1 of 1 [2020-03-17 08:58:27,994] {taskinstance.py:868} INFO - -------------------------------------------------------------------------------- [2020-03-17 08:58:28,003] {taskinstance.py:887} INFO - Executing on 2020-03-17T08:58:24.271847 00:00 [2020-03-17 08:58:28,005] {standard_task_runner.py:53} INFO - Started process 14010 to run task [2020-03-17 08:58:28,079] {logging_mixin.py:112} INFO - Running %s on host %s ip-10-0-43-239.ap-northeast-1.compute.internal [2020-03-17 08:58:28,106] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,105] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook. [2020-03-17 08:58:28,111] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,111] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest [2020-03-17 08:58:28,417] {bigquery_operator.py:879} INFO - Start getting dataset: cm-da-mikami-yuki-258308:airflow_test [2020-03-17 08:58:28,421] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,421] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables?alt=json [2020-03-17 08:58:28,789] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,789] {bigquery_hook.py:1437} INFO - Tables List: {'kind': 'bigquery#tableList', 'etag': '4rK75YI14CTPa28HhVLyEQ==', 'tables': [{'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_1880', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_1880'}, 'type': 'TABLE', 'creationTime': '1584434061049'}, {'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2000', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_2000'}, 'type': 'TABLE', 'creationTime': '1584102254437'}, {'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2018', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_2018'}, 'type': 'TABLE', 'creationTime': '1584434135714'}], 'totalItems': 3} [2020-03-17 08:58:28,801] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_gettablelist_operator, task_id=bigquery_gettablelist_example, execution_date=20200317T085824, start_date=20200317T085827, end_date=20200317T085828
取得したテーブルリストは以下です。
Tables List: { 'kind': 'bigquery#tableList', 'etag': '4rK75YI14CTPa28HhVLyEQ==', 'tables': [ { 'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_1880', 'tableReference': { 'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_1880' }, 'type': 'TABLE', 'creationTime': '1584434061049' }, { 'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2000', 'tableReference': { 'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_2000' }, 'type': 'TABLE', 'creationTime': '1584102254437' }, { 'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2018', 'tableReference': { 'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_2018' }, 'type': 'TABLE', 'creationTime': '1584434135714' } ], 'totalItems': 3 }
期待通り、3つのテーブルが取得できました。
テーブルデータを取得
テーブルデータを取得するためには、tabledata().list() をコールすれば良さそうです。
既存の /airflow/contrib/operators/bigquery_get_data.py ファイルにある BigQueryGetDataOperator が使えそうです。
DAG ファイルを追加して動作確認してみます。
example_bigquery_getdata_operator.py
# -*- coding: utf-8 -*- from typing import Any import airflow from airflow import models bigquery_get_data = None # type: Any try: from airflow.contrib.operators import bigquery_get_data except ImportError: pass if bigquery_get_data is not None: args = { 'owner': 'Airflow', 'start_date': airflow.utils.dates.days_ago(2) } dag = models.DAG( dag_id='example_bigquery_getdata_operator', default_args=args, schedule_interval=None) get_dataset = bigquery_get_data.BigQueryGetDataOperator( task_id='bigquery_getdata_example', dataset_id='airflow_test', table_id='name_2000', max_results='10', dag=dag)
完了したようなので、ログを確認してみると。
*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_ getdata_operator / bigquery_getdata_example / 2020-03-17T09: 40: 56.917329 00: 00 / 1.log [2020-03-17 09: 41: 06،239] {taskinstance.py:655} INFO - وابستگی های همه برای ملاقات کرد [2020-03-17 09: 41: 06،246] {taskinstance.py:655} INFO - وابستگی های همه برای [2020-03-17 09: 41: 06،246] {taskinstance.py:866 F INFO - -------------------------------------------------- ------------------------------ [2020-03-17 09: 41: 06،246] {taskinstance.py:867} INFO - تلاش شروع 1 از 1 [2020-03-17 09: 41: 06،247] {taskinstance.py:868} INFO - -------------------------------------------------- ------------------------------ [2020-03-17 09: 41: 06،255] {taskinstance.py:887} INFO - اعدام on 2020-03-17T09: 40: 56.917329 00: 00 [2020-03-17 09: 41: 06،257] {standard_task_runner.py:53} INFO - فرایند 15511 برای اجرای کار شروع شد [2020-03-17 09: 41: 06،316] {logging_mixin.py:112} INFO -٪ s در حال اجرا٪ s میزبان ip-10-0-43-239.ap-شمال شرقی-1.compute.internal [2020-03-17 09: 41: 06،330] {bigquery_get_data.py:92} INFO - واکشی داده ها از: [2020-03-17 09: 41: 06،330] {bigquery_get_data.py:94} INFO - بانک اطلاعاتی: airflow_test؛ جدول: name_2000؛ حداکثر نتایج: 10 [2020-03-17 09: 41: 06،335] {logging_mixin.py:112} INFO - [2020-03-17 09: 41: 06،335] {کشف.py:275} INFO - URL درخواست شده: دریافت https: / /www.googleapis.com/discovery/v1/apis/bigquery/v2/rest [2020-03-17 09: 41: 06،676] {logging_mixin.py:112} INFO - [2020-03-17 09: 41: 06،676] {کشف.py:894} INFO - URL درخواست شده: دریافت https: / /bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables/name_2000/data؟maxResults=10&alt=json [2020-03-17 09: 41: 07،286] {bigquery_get_data.py:106} INFO - تعداد ردیف های استخراج شده: 29772 [2020-03-17 09: 41: 07،297] {taskinstance.py:1048} INFO - علامت گذاری کار به عنوان SUCCESS.dag_id = مثال_bigquery_getdata_operator، task_id = bigquery_getdata_example، اجرای_date = 20200317T094056، start_date = 20200 [2020-03-17 09: 41: 16،236] {logging_mixin.py:112} INFO - [2020-03-17 09: 41: 16،236] {local_task_job.py:103} INFO - کار با کد بازگشت 0 خارج شد کد /
行 数 の 取得 は 実 ー ブ と と 一致 し て る る の 、 、 正常 る 実。。
در が، デ ー タ の 中 身 も 確認 し た い .. の で، متن BigQueryGetDataOperator tt> متن に 取得 し た デ ー タ を 出力 す る ロ グ を 追加 し て، 再度 実 行 し て み ま す. P>
bigquery_get_data.py
(省略) کلاس BigQueryGetDataOperator (BaseOperator): 省略) اجرای اجرا (خود ، زمینه): self.log.info ("واگذاری داده ها از:") self.log.info ('بانک اطلاعات:٪ s؛ جدول:٪ s؛ حداکثر نتایج:٪ s'، self.dataset_id ، self.table_id ، self.max_results) قلاب = BigQueryHook (bigquery_conn_id = self.bigquery_conn_id ، delegate_to = self.delegate_to) conn = hook.get_conn () مکان نما = conn.cursor () پاسخ = cursor.get_tabledata (مجموعه داده_آد = self.dataset_id ، table_id = self.table_id ، max_results = self.max_results ، select_fields = self.selected_fields) self.log.info ('تعداد ردیف های استخراج شده:٪ s' ، پاسخ ['totalRows']) ردیف = پاسخ ['ردیف'] # میکامی را اضافه کنید self.log.info ('داده:٪ s' ، ردیف) Table_data = [] برای نوع_ ردیف در ردیف ها: single_row = [] برای زمینه های در dict_row ['f']: single_row.append (زمینه ها ['v']) table_data.append (single_row) جدول بازگشت 省略) کد /
今 度 は ロ グ に 取得 し デ ー タ も 出力 さ れ ま し。
*** خواندن پرونده محلی: /home/ec2-user/airflow/logs/example_bigquery_getdata_operator/bigquery_getdata_example/2020-03-17T09:53:54.223409 00:00/1 .لاگ [2020-03-17 09: 55: 10،380] {taskinstance.py:655} INFO - وابستگی های همه برای ملاقات کرد [2020-03-17 09: 55: 10،387] {taskinstance.py:655 F INFO - وابستگی های همه برای ملاقات کرد [2020-03-17 09: 55: 10،387] {taskinstance.py:866} INFO - -------------------------------------------------- ------------------------------ [2020-03-17 09: 55: 10،387] {taskinstance.py:867} INFO - تلاش شروع 1 از 1 [2020-03-17 09: 55: 10،387] {taskinstance.py:868} INFO - -------------------------------------------------- ------------------------------ [2020-03-17 09: 55: 10،396] {taskinstance.py:887} INFO - اعدام on 2020-03-17T09: 53: 54.223409 00: 00 [2020-03-17 09: 55: 10،398] {standard_task_runner.py:53} INFO - فرایند 16017 را برای اجرای کار آغاز کرد [2020-03-17 09: 55: 10،456] {logging_mixin.py:112} INFO -٪ s در حال اجرا٪ s میزبان ip-10-0-43-239.ap-شمال شرقی-1.compute.internal [2020-03-17 09: 55: 10،469] {bigquery_get_data.py:92} INFO - واکشی داده ها از: [2020-03-17 09: 55: 10،469] {bigquery_get_data.py:94} INFO - بانک اطلاعاتی: airflow_test؛ جدول: name_2000؛ حداکثر نتایج: 10 [2020-03-17 09: 55: 10،474] {logging_mixin.py:112} INFO - [2020-03-17 09: 55: 10،474] {کشف.py:275} INFO - URL درخواست شده: دریافت https: / /www.googleapis.com/discovery/v1/apis/bigquery/v2/rest [2020-03-17 09: 55: 10،796] {logging_mixin.py:112} INFO - [2020-03-17 09: 55: 10،796] {کشف.py:894} INFO - URL درخواست شده: دریافت https: / /bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables/name_2000/data؟maxResults=10&alt=json [2020-03-17 09: 55: 12،168] {bigquery_get_data.py:106} INFO - تعداد کل ردیف های استخراج شده: 29772 [2020-03-17 09: 55: 12،168] {bigquery_get_data.py:109} INFO - Data: [{'f': [{'v': 'Emily'}، {'v': 'F'}، {'v': '25956'}]} ، {'f': [{'v': 'Hannah'}، {'v': 'F'}، {'v': '23082'}]}، {'f': [{'v': 'Madison'}، {'v': 'F'}، {'v': '19968'}]}، {'f': [{'v': ' اشلی '} ، {' v ':' F '} ، {' v ':' 17997 '}]} ، {' f ': [{' v ':' Sarah '}، {' v ':' F ' } ، {'v': '17702'}]} ، f f ': [{' v ':' Alexis '}، {' v ':' F '}، {' v ':' 17629 '}] } ، {'f': [{'v': 'Samantha'} ، {'v': 'F'}، {'v': '17265'}]}، {'f': [{'v' : 'Jessica'}، {'v': 'F'}، {'v': '15709'}]}، {'f': [{'v': 'Elizabeth'}، {'v': ' F '}، {' v ':' 15099 '}]}، f f': [{'v': 'Taylor'}، {'v': 'F'}، {v ':' 15078 ' }]}] [2020-03-17 09: 55: 12،180] {taskinstance.py:1048} INFO - علامت گذاری وظیفه به عنوان SUCCESS.dag_id = مثال_bigquery_getdata_operator، task_id = bigquery_getdata_example، اجرای_date = 20200317T095354، start_date = 20200317T کد /
data: [ {'f': [{'v': 'Emily'}، {'v': 'F'}، {'v': '25956'}]}، {'f': [{'v': 'Hannah'}، {'v': 'F'}، {'v': '23082'}]}، {'f': [{'v': 'Madison'}، {'v': 'F'}، {'v': '19968'}]}، {'f': [{'v': 'Ashley'}، {'v': 'F'}، {'v': '17997'}]}، {'f': [{'v': 'Sarah'}، {'v': 'F'}، {'v': '17702'}]}، {'f': [{'v': 'Alexis'}، {'v': 'F'}، {'v': '17629'}]}، {'f': [{'v': 'Samantha'}، {'v': 'F'}، {'v': '17265'}]}، {'f': [{'v': 'Jessica'}، {'v': 'F'}، {'v': '15709'}]}، {'f': [{'v': 'Elizabeth'}، {'v': 'F'}، {'v': '15099'}]}، {'f': [{'v': 'Taylor'}، {'v': 'F'}، {'v': '15078'}] ] کد /
こ の API は カ ラ ム 名 で は よ き が 、 で す が 、 ひ と ま ず 、 テ
در نتیجه ردیف های تکراری بازنمایی مبتنی بر REST از این داده ، یک سری از اشیاء JSON f ، v را برای نشان دادن زمینه ها و مقادیر به کار می برد.
テ ー ブ ル に デ ー タ を insert / update / حذف
BigQuery 対 し て 、 DAG か ら 自由 に SQL ク エ リ を 実 行 す る に は ど う す る の と 思 っ て 調 た と こ こ tt BigQueryOperator SQL を 渡 し て あ げ ば 実 行 で き で で す。 p
name_2000 テ ー ブ ル tt name = 'Mikami'
example_bigquery_sql_operator.py
# - * - کدگذاری: utf-8 - * - از تایپ واردات واردات جریان هوا از مدل های واردات جریان هوا bigquery_operator = هیچ یک از انواع: هر نوع تلاش كردن: از airflow.contrib.operators وارد bigquery_operator می شود به غیر از ImportError: عبور اگر bigquery_operator موجود نیست: استدلال = { "صاحب": "جریان هوا" ، 'start_date': airflow.utils.dates.days_ago (2) } dag = مدل.DAG ( dag_id = 'shembull_bigquery_sql_operator'، default_args = آرگومان، sched_interval = هیچ کدام) query = "INSERT INTO` cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami'، 'F'، 1) " bigquery_sql = bigquery_operator.BigQueryOperator ( task_id = 'bigquery_sql_example'، sql = پرس و جو ، use_legacy_sql = نادرست ، خنجر = خنجر) کد /
正常 終了 し た よ う GCP の 管理 コ ン ソ ー ル か ら デ ー タ を し し て み。
ち ゃ ん と DAG ら レ コ ー ド の 追加 が 行 わ れ ま し た。
続 い て 先 ほ ど ど درج し た レ コ ー tt を 、 name = 'Yuki' で بروزرسانی し て み ま す。
DAG を ァ イ ル の SQL を بروزرسانی ク エ リ に 変 更 し て 実 行 し て ま ま す。 p
example_bigquery_sql_operator.py
(省略) dag = مدل.DAG ( dag_id = 'shembull_bigquery_sql_operator'، default_args = آرگومان، sched_interval = هیچکدام) # query = "INSERT INTO` cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami'، 'F'، 1) " query = "UPDATE` cm-da-mikami-yuki-258308.airflow_test.name_2000` SET name = "Yuki" WHERE name = "Mikami" and جنسیت = "F" و تعداد = 1 " bigquery_sql = bigquery_operator.BigQueryOperator ( task_id = 'bigquery_sql_example'، sql = پرس و جو ، use_legacy_sql = نادرست ، خنجر = خنجر) کد /
DAG 、 正常 終了 し た の で 、 GCP 管理 コ ン ソ ー ル か ら را انتخاب کنید ク エ リ 実 し し て デ ー タ て み。 p p。
正常 に بروزرسانی さ れ た こ と が で き ま し た。
さ ら に 、 先 ほ ど ど حذف و به روز رسانی し た レ コ ー ド を حذف し ま す。
example_bigquery_sql_operator.py
(省略) dag = مدل.DAG ( dag_id = 'shembull_bigquery_sql_operator'، default_args = آرگومان، sched_interval = هیچکدام) # query = "INSERT INTO` cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami'، 'F'، 1) " # query = "به روز رسانی` cm-da-mikami-yuki-258308.airflow_test.name_2000` SET name = 'Yuki' WHERE name = 'Mikami' and جنسیت = 'F' and count = 1 " query = "DELETE FROM" cm-da-mikami-yuki-258308.airflow_test.name_2000` WHERE name = 'Yuki' and جنسیت = 'F' and count = 1 " bigquery_sql = bigquery_operator.BigQueryOperator ( task_id = 'bigquery_sql_example'، sql = پرس و جو ، use_legacy_sql = نادرست ، خنجر = خنجر) کد /
DAG 再度 行 後 、 再度 GCP 管理 コ ン ソ ー か ら ク エ エ 実 行 し て デ ー タ を し て デ ー タ を 確認 し て。 p
حذف も 、 期待 通 り 実 行 で き ま し。。
テ ー ブ ル デ ー タ を GCS に eksport す る
最後 に 、 テ ー ブ ル C ー タ を GCS に フ ァ イ ル 出力 し て み ま す。 p
/airflow/contrib/operators/bigquery_to_gcs.py BigQueryToCloudStorageOperator 、 使 え ば 、 DAG だ け 実 行 で で p p p
name_2000 テ ー ブ ル の デ ー タ を tt gs: //test-cm-mikami/test_export/exp_name_2000.txt に صادرات す る 、 の の AG ル ル、 実 行 し て み す す。 p
example_bigquery_to_gcs.py
# - * - کدگذاری: utf-8 - * - از تایپ واردات واردات جریان هوا از مدل های واردات جریان هوا bigquery_to_gcs = نوع # هیچکدام: هر تلاش كردن: از airflow.contrib.operators bigquery_to_gcs را وارد می کنید به غیر از ImportError: عبور اگر bigquery_to_gcs نباشد: استدلال = { "صاحب": "جریان هوا" ، 'start_date': airflow.utils.dates.days_ago (2) } dag = مدل.DAG ( dag_id = 'shembull_bigquery_to_gcs_operator'، default_args = آرگومان، sched_interval = هیچکدام) get_dataset = bigquery_to_gcs.BigQueryToCloudStorageOperator ( task_id = 'bigquery_to_gcs_example'، Source_project_dataset_table = 'cm-da-mikami-yuki-258308.airflow_test.name_2000'، destination_cloud_storage_uris = 'gs: //test-cm-mikami/test_export/exp_name_2000.txt'، خنجر = خنجر) کد /
実 行 前 、 GCS の 対 象 パ ス に は 何 も フ ァ ァ イ ル が い い 状態 で す。
DAG 行 が 正常 に 完了 し こ と を 確認 し て
*** خواندن پرونده محلی: /home/ec2-user/airflow/logs/example_bigquery_to_gcs_operator/bigquery_to_gcs_example/2020-03-17T11:34:24.397858 00:00/1 .لاگ [2020-03-17 11: 34: 42،238] {taskinstance.py:655} INFO - وابستگی های همه برای on 2020-03-17T11: 34: 24.397858 00: 00 [2020-03-17 11: 34: 42،257] {standard_task_runner.py:53} INFO - فرایند 19929 را برای اجرای کار آغاز کرد [2020-03-17 11: 34: 42،315] {logging_mixin.py:112} INFO -٪ s در حال اجرا٪ s میزبان ip-10-0-43-239.ap-شمال شرقی-1.compute.internal [2020-03-17 11: 34: 42،329] {bigquery_to_gcs.py:93} INFO - اجرای عصاره cm-da-mikami-yuki-258308.airflow_test.name_2000 به: gs: // test-cm-mikami / test_export / ارزان_name_2000.txt [2020-03-17 11: 34: 42،334] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 42،334] {کشف.py:275} INFO - URL درخواست شده: دریافت https: / /www.googleapis.com/discovery/v1/apis/bigquery/v2/rest [2020-03-17 11: 34: 42،645] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 42،644] {کشف.py:894 F INFO - URL درخواست شده: POST https: / /bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs؟alt=json [2020-03-17 11: 34: 43،677] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 43،677] {کشف.py:894} INFO - URL درخواست شده: دریافت https: / /bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs/job_NptEvLRIZj6NzF6O8fy2mgtoMlKY؟location=US&alt=json [2020-03-17 11: 34: 44،198] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 44،198] {bigquery_hook.py:1347} INFO - در انتظار کار برای تکمیل: cm- da-mikami-yuki-258308، job_NptEvLRIZj6NzF6O8fy2mgtoMlKY [2020-03-17 11: 34: 49،204] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 49،203] {کشف.py:894} INFO - URL مورد درخواست: GET https: / /bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs/job_NptEvLRIZj6NzF6O8fy2mgtoMlKY؟location=US&alt=json [2020-03-17 11: 34: 49،493] {taskinstance.py:1048} INFO - علامت گذاری وظیفه به عنوان SUCCESS.dag_id = مثال_bigquery_to_gcs_operator، task_id = bigquery_to_gcs_example، operation_date = 20200317T113424، start_date = 20200317T113424، start_date = 20200317T113424، start2date_ 203 [2020-03-17 11: 34: 52،237] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 52،236] {local_task_job.py:103} INFO - کار با کد بازگشت 0 خارج شد کد /
GCP ら コ ン ソ ー ル か ら GCS の フ ァ イ ル を 確認 し て み と と
フ ァ イ ル が 出力 さ れ て い ま す。
出力 フ ァ イ ル を ダ し ロ ー ド し て 中 身 を 確認 し て み と と
نام ، جنس ، تعداد لیزت ، اف ، 256 مونسرات ، ف ، 256 کالین ، ف ، 256 جولیان ، ف ، 512 تایلر ، اف ، 512 کیلا ، ف ، 13312 مدیسون ، اف ، 19968 کیرا ، ف ، 257 رندی ، ف ، 257 Reanna، F، 257 اینگرید ، ف ، 257 ارین ، ف ، 257 الکسوس ، اف ، 1281 省略) کد /
صادرات p ー ー ブ ル ー タ が 正常 に صادرات さ れ た こ と が 確認 で き ま し。。
ま と め (所 感 h
در جریان هوا の BigQuery مشاهده 関 連 の オ ペ レ ー タ で は، BigQuery مشاهده API を 使 っ て BigQuery مشاهده に ア ク セ ス し て い て، 実 際 に ど の API を コ ー ル し て い る の か は متن bigquery_hook.py tt> متن を 確認 す れ ば 良 さ そ うで す。
ま た 、 既存 に は な い 処理 も 、 BigQuery API の 仕 様 を 確認 し な が ら 自由 に 実 装 で き。 ppp
他 に tt BashOperator gcloud CLI も 簡 単 に 実 行 で き る の 、 fl جریان هوا 経 由 で の BigQuery 関 連 の 操作 に る こ こ p p p p p p
参考
٪٪ مورد_read_more_button ٪٪