جریان هوا で BigQuery 操作 し て み


Translating…

こんにちは、みかみです。

Airflow から BigQuery に対して、いろいろ操作してみたい。

ということで。

やりたいこと

Airflow の DAG から、BigQuery に以下の処理を実行してみたい。

  • データセット一覧を取得
  • テーブル一覧を取得
  • テーブルデータを取得
  • テーブルにデータを insert
  • テーブルデータを update
  • テーブルデータを delete
  • テーブルデータを GCS に export

オペレータの実装内容を確認

やりたいことが実現できそうなオペレータは既存でもいろいろあるようですが、自分で拡張が必要になる場合に備えて、まずは既存オペレータの実装内容を確認してみます。

bigquery と名がつくオペレータをざっと見てみましたが、ほとんどのオペレータクラスは、コンストラクタと execute() メソッドを持っているようです。

DAG ファイルではオペレータクラスの作成と依存関係定義しかしていないことから、クラスインスタンスを作成してあげれば Airflow フレームワークが execute() メソッドを実行してくれるようです。

BigQueryGetDatasetOperator の処理内容を確認

もう少し詳細に、/airflow/contrib/operators/bigquery_operator.py に実装されている、BigQueryGetDatasetOperator のコードを確認してみます。

execute() メソッドが実行されると、BigQueryHook インスタンスが作成され、connection を作り、cursor を取得して get_dataset() メソッドを実行します。

bigquery_operator.py

(省略)  from airflow.contrib.hooks.bigquery_hook import BigQueryHook  (省略)  class BigQueryGetDatasetOperator(BaseOperator):  (省略)        template_fields = ('dataset_id', 'project_id')      ui_color = '#f0eee4'        @apply_defaults      def __init__(self,                   dataset_id,                   project_id=None,                   gcp_conn_id='google_cloud_default',                   delegate_to=None,                   *args, **kwargs):          self.dataset_id = dataset_id          self.project_id = project_id          self.gcp_conn_id = gcp_conn_id          self.delegate_to = delegate_to          super(BigQueryGetDatasetOperator, self).__init__(*args, **kwargs)        def execute(self, context):          bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,                                 delegate_to=self.delegate_to)          conn = bq_hook.get_conn()          cursor = conn.cursor()            self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id)            return cursor.get_dataset(              dataset_id=self.dataset_id,              project_id=self.project_id)  (省略)  

BigQueryHook の中身も確認してみます。

実体は /airflow/contrib/hooks/bigquery_hook.py に実装されています。

bigquery_hook.pygoogleapiclient.discovery が import されていることから、どうやらここから googleapiclient を使用して GCP にアクセスしているようです。

BigQueryHookget_conn() メソッドでは、BigQueryConnection インスタンスを作成して返却しています。 BigQueryConnectioncursor() メソッドでは、BigQueryCursor インスタンスを返却。 BigQueryCursor クラスは BigQueryBaseCursor クラスを継承していました。

bigquery_hook.py

(省略)  from googleapiclient.discovery import build  (省略)  class BigQueryHook(GoogleCloudBaseHook, DbApiHook):      """      Interact with BigQuery. This hook uses the Google Cloud Platform      connection.      """      conn_name_attr = 'bigquery_conn_id'        def __init__(self,                   bigquery_conn_id='bigquery_default',                   delegate_to=None,                   use_legacy_sql=True,                   location=None):          super(BigQueryHook, self).__init__(              gcp_conn_id=bigquery_conn_id, delegate_to=delegate_to)          self.use_legacy_sql = use_legacy_sql          self.location = location        def get_conn(self):          """          Returns a BigQuery PEP 249 connection object.          """          service = self.get_service()          project = self._get_field('project')          return BigQueryConnection(              service=service,              project_id=project,              use_legacy_sql=self.use_legacy_sql,              location=self.location,              num_retries=self.num_retries          )  (省略)  class BigQueryConnection(object):      """      BigQuery does not have a notion of a persistent connection. Thus, these      objects are small stateless factories for cursors, which do all the real      work.      """        def __init__(self, *args, **kwargs):          self._args = args          self._kwargs = kwargs        def close(self):          """ BigQueryConnection does not have anything to close. """        def commit(self):          """ BigQueryConnection does not support transactions. """        def cursor(self):          """ Return a new :py:class:`Cursor` object using the connection. """          return BigQueryCursor(*self._args, **self._kwargs)        def rollback(self):          raise NotImplementedError(              "BigQueryConnection does not have transactions")  (省略)  class BigQueryCursor(BigQueryBaseCursor):      """      A very basic BigQuery PEP 249 cursor implementation. The PyHive PEP 249      implementation was used as a reference:        https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py      https://github.com/dropbox/PyHive/blob/master/pyhive/common.py      """        def __init__(self, service, project_id, use_legacy_sql=True, location=None, num_retries=5):          super(BigQueryCursor, self).__init__(              service=service,              project_id=project_id,              use_legacy_sql=use_legacy_sql,              location=location,              num_retries=num_retries          )          self.buffersize = None          self.page_token = None          self.job_id = None          self.buffer = []          self.all_pages_loaded = False  (省略)  

継承元の BigQueryBaseCursor のコードを確認してみると、get_dataset() メソッドで datasets().get() を実行していることが確認できました。

bigquery_hook.py

(省略)  class BigQueryBaseCursor(LoggingMixin):      """      The BigQuery base cursor contains helper methods to execute queries against      BigQuery. The methods can be used directly by operators, in cases where a      PEP 249 cursor isn't needed.      """        def __init__(self,                   service,                   project_id,                   use_legacy_sql=True,                   api_resource_configs=None,                   location=None,                   num_retries=5):            self.service = service          self.project_id = project_id          self.use_legacy_sql = use_legacy_sql          if api_resource_configs:              _validate_value("api_resource_configs", api_resource_configs, dict)          self.api_resource_configs = api_resource_configs               if api_resource_configs else {}          self.running_job_id = None          self.location = location          self.num_retries = num_retries  (省略)      def get_dataset(self, dataset_id, project_id=None):          """          Method returns dataset_resource if dataset exist          and raised 404 error if dataset does not exist            :param dataset_id: The BigQuery Dataset ID          :type dataset_id: str          :param project_id: The GCP Project ID          :type project_id: str          :return: dataset_resource                .. seealso::                  For more information, see Dataset Resource content:                  https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource          """            if not dataset_id or not isinstance(dataset_id, str):              raise ValueError("dataset_id argument must be provided and has "                               "a type 'str'. You provided: {}".format(dataset_id))            dataset_project_id = project_id if project_id else self.project_id            try:              dataset_resource = self.service.datasets().get(                  datasetId=dataset_id, projectId=dataset_project_id).execute(num_retries=self.num_retries)              self.log.info("Dataset Resource: %s", dataset_resource)          except HttpError as err:              raise AirflowException(                  'BigQuery job failed. Error was: {}'.format(err.content))            return dataset_resource  (省略)  

データセット一覧を取得

データセット一覧は、datasets().list() API で取得できそうです。

BigQueryBaseCursor クラスに、datasets().list() API をコールする get_datasets_list() メソッドがありました。

bigquery_hook.py

(省略)  class BigQueryBaseCursor(LoggingMixin):  (省略)      def get_datasets_list(self, project_id=None):          """          Method returns full list of BigQuery datasets in the current project            .. seealso::              For more information, see:              https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list            :param project_id: Google Cloud Project for which you              try to get all datasets          :type project_id: str          :return: datasets_list                Example of returned datasets_list: ::                       {                        "kind":"bigquery#dataset",                        "location":"US",                        "id":"your-project:dataset_2_test",                        "datasetReference":{                           "projectId":"your-project",                           "datasetId":"dataset_2_test"                        }                     },                     {                        "kind":"bigquery#dataset",                        "location":"US",                        "id":"your-project:dataset_1_test",                        "datasetReference":{                           "projectId":"your-project",                           "datasetId":"dataset_1_test"                        }                     }                  ]          """          dataset_project_id = project_id if project_id else self.project_id            try:              datasets_list = self.service.datasets().list(                  projectId=dataset_project_id).execute(num_retries=self.num_retries)['datasets']              self.log.info("Datasets List: %s", datasets_list)            except HttpError as err:              raise AirflowException(                  'BigQuery job failed. Error was: {}'.format(err.content))            return datasets_list  (省略)  

既存コードを grep してみましたが、get_datasets_list() を使っているオペレータはなさそうなので、他のオペレータクラスを参考にして、新規で作成してみます。

以下のクラスを /airflow/contrib/operators/bigquery_operator.py に追加しました。

※コード管理やメンテナンス性を考えると、拡張コードは別ディレクトリ、別ファイルに分けた方が良いと思いますが、今回は簡易性を優先して既存ファイルを更新しています。

bigquery_operator.py

class BigQueryGetDatasetListOperator(BaseOperator):        template_fields = ('project_id',)      ui_color = '#f0eee4'        @apply_defaults      def __init__(self,                   project_id=None,                   gcp_conn_id='google_cloud_default',                   delegate_to=None,                   *args, **kwargs):          self.project_id = project_id          self.gcp_conn_id = gcp_conn_id          self.delegate_to = delegate_to          super(BigQueryGetDatasetListOperator, self).__init__(*args, **kwargs)        def execute(self, context):          bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,                                 delegate_to=self.delegate_to)          conn = bq_hook.get_conn()          cursor = conn.cursor()            self.log.info('Start getting dataset list: %s', self.project_id)            return cursor.get_datasets_list(project_id=self.project_id)  

追加したオペレータを実行する DAG ファイルも追加しました。

example_bigquery_getdatasetlist_operator.py

# -*- coding: utf-8 -*-    from typing import Any    import airflow  from airflow import models    bigquery_operator = None  # type: Any  try:      from airflow.contrib.operators import bigquery_operator  except ImportError:      pass    if bigquery_operator is not None:      args = {          'owner': 'Airflow',          'start_date': airflow.utils.dates.days_ago(2)      }        dag = models.DAG(          dag_id='example_bigquery_getdatasetlist_operator', default_args=args,          schedule_interval=None)        get_dataset = bigquery_operator.BigQueryGetDatasetListOperator(          task_id='bigquery_getdatasetlist_example',          project_id='cm-da-mikami-yuki-258308',          dag=dag)  

現在、GCP の 対象プロジェクトには、airflow_testtest_s3 の2つのデータセットがある状態です。

DAG ファイルを dags ディレクトリに配置して、実行してみます。

正常に実行できたようです。

ログを確認してみると、取得したデータセットのリストが出力されています。

*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_getdatasetlist_operator/bigquery_getdatasetlist_example/2020-03-17T08:11:54.852025 00:00/1.log  [2020-03-17 08:12:05,305] {taskinstance.py:655} INFO - Dependencies all met for   [2020-03-17 08:12:05,312] {taskinstance.py:655} INFO - Dependencies all met for   [2020-03-17 08:12:05,312] {taskinstance.py:866} INFO -   --------------------------------------------------------------------------------  [2020-03-17 08:12:05,312] {taskinstance.py:867} INFO - Starting attempt 1 of 1  [2020-03-17 08:12:05,312] {taskinstance.py:868} INFO -   --------------------------------------------------------------------------------  [2020-03-17 08:12:05,322] {taskinstance.py:887} INFO - Executing  on 2020-03-17T08:11:54.852025 00:00  [2020-03-17 08:12:05,323] {standard_task_runner.py:53} INFO - Started process 12393 to run task  [2020-03-17 08:12:05,382] {logging_mixin.py:112} INFO - Running %s on host %s  ip-10-0-43-239.ap-northeast-1.compute.internal  [2020-03-17 08:12:05,399] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,399] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.  [2020-03-17 08:12:05,402] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,402] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest  [2020-03-17 08:12:05,701] {bigquery_operator.py:851} INFO - Start getting dataset list: cm-da-mikami-yuki-258308  [2020-03-17 08:12:05,703] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,703] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets?alt=json  [2020-03-17 08:12:06,214] {logging_mixin.py:112} INFO - [2020-03-17 08:12:06,213] {bigquery_hook.py:1809} INFO - Datasets List: [{'kind': 'bigquery#dataset', 'id': 'cm-da-mikami-yuki-258308:airflow_test', 'datasetReference': {'datasetId': 'airflow_test', 'projectId': 'cm-da-mikami-yuki-258308'}, 'location': 'US'}, {'kind': 'bigquery#dataset', 'id': 'cm-da-mikami-yuki-258308:test_s3', 'datasetReference': {'datasetId': 'test_s3', 'projectId': 'cm-da-mikami-yuki-258308'}, 'location': 'asia-northeast1'}]  [2020-03-17 08:12:06,225] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_getdatasetlist_operator, task_id=bigquery_getdatasetlist_example, execution_date=20200317T081154, start_date=20200317T081205, end_date=20200317T081206  [2020-03-17 08:12:15,303] {logging_mixin.py:112} INFO - [2020-03-17 08:12:15,303] {local_task_job.py:103} INFO - Task exited with return code 0  

以下、レスポンス部分の抜粋です。

Datasets List: [  {  'kind': 'bigquery#dataset',   'id': 'cm-da-mikami-yuki-258308:airflow_test',   'datasetReference': {  'datasetId': 'airflow_test',   'projectId': 'cm-da-mikami-yuki-258308'  },   'location': 'US'  },   {  'kind': 'bigquery#dataset',   'id': 'cm-da-mikami-yuki-258308:test_s3',   'datasetReference': {  'datasetId': 'test_s3',   'projectId': 'cm-da-mikami-yuki-258308'  },   'location': 'asia-northeast1'  }  ]  

データセットが正常に取得できました。

テーブル一覧を取得

テーブル一覧は、tables().list() API で取得できるようです。

既存コードを確認したところ、テーブル一覧取得用のメソッドはないようです。

BigQueryBaseCursor クラスに、tables().list() API をコールする get_table_list() メソッドを追加しました。

bigquery_hook.py

def get_table_list(self, dataset_id, project_id=None):            table_project_id = project_id if project_id else self.project_id            try:              tables_list = self.service.tables().list(                  projectId=table_project_id, datasetId=dataset_id).execute(num_retries=self.num_retries)              self.log.info("Tables List: %s", tables_list)            except HttpError as err:              raise AirflowException(                  'BigQuery job failed. Error was: {}'.format(err.content))            return tables_li

さらに、テーブルリスト取得用のオペレータと、オペレータを実行する DAG ファイルも追加します。

bigquery_operator.py

class BigQueryGetTableListOperator(BaseOperator):      template_fields = ('dataset_id', 'project_id')      ui_color = '#f0eee4'        @apply_defaults      def __init__(self,                   dataset_id,                   project_id=None,                   gcp_conn_id='google_cloud_default',                   delegate_to=None,                   *args, **kwargs):          self.dataset_id = dataset_id          self.project_id = project_id          self.gcp_conn_id = gcp_conn_id          self.delegate_to = delegate_to          super(BigQueryGetTableListOperator, self).__init__(*args, **kwargs)        def execute(self, context):          bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,                                 delegate_to=self.delegate_to)          conn = bq_hook.get_conn()          cursor = conn.cursor()            self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id)            return cursor.get_table_list(              dataset_id=self.dataset_id,              project_id=self.project_id)  

example_bigquery_gettablelist_operator.py

# -*- coding: utf-8 -*-    from typing import Any    import airflow  from airflow import models    bigquery_operator = None  # type: Any  try:      from airflow.contrib.operators import bigquery_operator  except ImportError:      pass    if bigquery_operator is not None:      args = {          'owner': 'Airflow',          'start_date': airflow.utils.dates.days_ago(2)      }        dag = models.DAG(          dag_id='example_bigquery_gettablelist_operator', default_args=args,          schedule_interval=None)        get_dataset = bigquery_operator.BigQueryGetTableListOperator(          task_id='bigquery_gettablelist_example',          dataset_id='airflow_test',          project_id='cm-da-mikami-yuki-258308',          dag=dag)  

取得対象の airflow_test データセットには、name_[YYYY] テーブルが3つある状態です。

DAG を実行してログを確認してみます。

*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_gettablelist_operator/bigquery_gettablelist_example/2020-03-17T08:58:24.271847 00:00/1.log  [2020-03-17 08:58:27,986] {taskinstance.py:655} INFO - Dependencies all met for   [2020-03-17 08:58:27,994] {taskinstance.py:655} INFO - Dependencies all met for   [2020-03-17 08:58:27,994] {taskinstance.py:866} INFO -   --------------------------------------------------------------------------------  [2020-03-17 08:58:27,994] {taskinstance.py:867} INFO - Starting attempt 1 of 1  [2020-03-17 08:58:27,994] {taskinstance.py:868} INFO -   --------------------------------------------------------------------------------  [2020-03-17 08:58:28,003] {taskinstance.py:887} INFO - Executing  on 2020-03-17T08:58:24.271847 00:00  [2020-03-17 08:58:28,005] {standard_task_runner.py:53} INFO - Started process 14010 to run task  [2020-03-17 08:58:28,079] {logging_mixin.py:112} INFO - Running %s on host %s  ip-10-0-43-239.ap-northeast-1.compute.internal  [2020-03-17 08:58:28,106] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,105] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.  [2020-03-17 08:58:28,111] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,111] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest  [2020-03-17 08:58:28,417] {bigquery_operator.py:879} INFO - Start getting dataset: cm-da-mikami-yuki-258308:airflow_test  [2020-03-17 08:58:28,421] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,421] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables?alt=json  [2020-03-17 08:58:28,789] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,789] {bigquery_hook.py:1437} INFO - Tables List: {'kind': 'bigquery#tableList', 'etag': '4rK75YI14CTPa28HhVLyEQ==', 'tables': [{'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_1880', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_1880'}, 'type': 'TABLE', 'creationTime': '1584434061049'}, {'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2000', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_2000'}, 'type': 'TABLE', 'creationTime': '1584102254437'}, {'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2018', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_2018'}, 'type': 'TABLE', 'creationTime': '1584434135714'}], 'totalItems': 3}  [2020-03-17 08:58:28,801] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_gettablelist_operator, task_id=bigquery_gettablelist_example, execution_date=20200317T085824, start_date=20200317T085827, end_date=20200317T085828  

取得したテーブルリストは以下です。

Tables List: {  'kind': 'bigquery#tableList',   'etag': '4rK75YI14CTPa28HhVLyEQ==',   'tables': [  {  'kind': 'bigquery#table',   'id': 'cm-da-mikami-yuki-258308:airflow_test.name_1880',   'tableReference': {  'projectId': 'cm-da-mikami-yuki-258308',   'datasetId': 'airflow_test',   'tableId': 'name_1880'  },   'type': 'TABLE',   'creationTime': '1584434061049'  },   {  'kind': 'bigquery#table',   'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2000',   'tableReference': {  'projectId': 'cm-da-mikami-yuki-258308',   'datasetId': 'airflow_test',   'tableId': 'name_2000'  },   'type': 'TABLE',   'creationTime': '1584102254437'  },   {  'kind': 'bigquery#table',   'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2018',   'tableReference': {  'projectId': 'cm-da-mikami-yuki-258308',   'datasetId': 'airflow_test',   'tableId': 'name_2018'  },   'type': 'TABLE',   'creationTime': '1584434135714'  }  ],   'totalItems': 3  }  

期待通り、3つのテーブルが取得できました。

テーブルデータを取得

テーブルデータを取得するためには、tabledata().list() をコールすれば良さそうです。

既存の /airflow/contrib/operators/bigquery_get_data.py ファイルにある BigQueryGetDataOperator が使えそうです。

DAG ファイルを追加して動作確認してみます。

example_bigquery_getdata_operator.py

# -*- coding: utf-8 -*-    from typing import Any    import airflow  from airflow import models    bigquery_get_data = None  # type: Any  try:      from airflow.contrib.operators import bigquery_get_data  except ImportError:      pass    if bigquery_get_data is not None:      args = {          'owner': 'Airflow',          'start_date': airflow.utils.dates.days_ago(2)      }        dag = models.DAG(          dag_id='example_bigquery_getdata_operator', default_args=args,          schedule_interval=None)        get_dataset = bigquery_get_data.BigQueryGetDataOperator(          task_id='bigquery_getdata_example',          dataset_id='airflow_test',          table_id='name_2000',          max_results='10',          dag=dag)  

完了したようなので、ログを確認してみると。

*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_                         getdata_operator / bigquery_getdata_example / 2020-03-17T09: 40: 56.917329   00: 00 / 1.log  [2020-03-17 09: 41: 06،239] {taskinstance.py:655} INFO - وابستگی های همه برای  ملاقات کرد  [2020-03-17 09: 41: 06،246] {taskinstance.py:655} INFO - وابستگی های همه برای   [2020-03-17 09: 41: 06،246] {taskinstance.py:866 F INFO -  -------------------------------------------------- ------------------------------  [2020-03-17 09: 41: 06،246] {taskinstance.py:867} INFO - تلاش شروع 1 از 1  [2020-03-17 09: 41: 06،247] {taskinstance.py:868} INFO -  -------------------------------------------------- ------------------------------  [2020-03-17 09: 41: 06،255] {taskinstance.py:887} INFO - اعدام  on 2020-03-17T09: 40: 56.917329   00: 00  [2020-03-17 09: 41: 06،257] {standard_task_runner.py:53} INFO - فرایند 15511 برای اجرای کار شروع شد  [2020-03-17 09: 41: 06،316] {logging_mixin.py:112} INFO -٪ s در حال اجرا٪ s میزبان  ip-10-0-43-239.ap-شمال شرقی-1.compute.internal  [2020-03-17 09: 41: 06،330] {bigquery_get_data.py:92} INFO - واکشی داده ها از:  [2020-03-17 09: 41: 06،330] {bigquery_get_data.py:94} INFO - بانک اطلاعاتی: airflow_test؛ جدول: name_2000؛ حداکثر نتایج: 10  [2020-03-17 09: 41: 06،335] {logging_mixin.py:112} INFO - [2020-03-17 09: 41: 06،335] {کشف.py:275} INFO - URL درخواست شده: دریافت https: / /www.googleapis.com/discovery/v1/apis/bigquery/v2/rest  [2020-03-17 09: 41: 06،676] {logging_mixin.py:112} INFO - [2020-03-17 09: 41: 06،676] {کشف.py:894} INFO - URL درخواست شده: دریافت https: / /bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables/name_2000/data؟maxResults=10&alt=json  [2020-03-17 09: 41: 07،286] {bigquery_get_data.py:106} INFO - تعداد ردیف های استخراج شده: 29772  [2020-03-17 09: 41: 07،297] {taskinstance.py:1048} INFO - علامت گذاری کار به عنوان SUCCESS.dag_id = مثال_bigquery_getdata_operator، task_id = bigquery_getdata_example، اجرای_date = 20200317T094056، start_date = 20200  [2020-03-17 09: 41: 16،236] {logging_mixin.py:112} INFO - [2020-03-17 09: 41: 16،236] {local_task_job.py:103} INFO - کار با کد بازگشت 0 خارج شد  کد / 

行 数 の 取得 は 実 ー ブ と と 一致 し て る る の 、 、 正常 る 実。。

در が، デ ー タ の 中 身 も 確認 し た い .. の で، متن BigQueryGetDataOperator tt> متن に 取得 し た デ ー タ を 出力 す る ロ グ を 追加 し て، 再度 実 行 し て み ま す. P>

bigquery_get_data.py

  (省略)  کلاس BigQueryGetDataOperator (BaseOperator):  省略)      اجرای اجرا (خود ، زمینه):          self.log.info ("واگذاری داده ها از:")          self.log.info ('بانک اطلاعات:٪ s؛ جدول:٪ s؛ حداکثر نتایج:٪ s'،                        self.dataset_id ، self.table_id ، self.max_results)            قلاب = BigQueryHook (bigquery_conn_id = self.bigquery_conn_id ،                              delegate_to = self.delegate_to)            conn = hook.get_conn ()          مکان نما = conn.cursor ()          پاسخ = cursor.get_tabledata (مجموعه داده_آد = self.dataset_id ،                                          table_id = self.table_id ،                                          max_results = self.max_results ،                                          select_fields = self.selected_fields)            self.log.info ('تعداد ردیف های استخراج شده:٪ s' ، پاسخ ['totalRows'])          ردیف = پاسخ ['ردیف']  # میکامی را اضافه کنید          self.log.info ('داده:٪ s' ، ردیف)            Table_data = []          برای نوع_ ردیف در ردیف ها:              single_row = []              برای زمینه های در dict_row ['f']:                  single_row.append (زمینه ها ['v'])              table_data.append (single_row)            جدول بازگشت  省略)  کد / 

今 度 は ロ グ に 取得 し デ ー タ も 出力 さ れ ま し。

  *** خواندن پرونده محلی: /home/ec2-user/airflow/logs/example_bigquery_getdata_operator/bigquery_getdata_example/2020-03-17T09:53:54.223409 00:00/1 .لاگ  [2020-03-17 09: 55: 10،380] {taskinstance.py:655} INFO - وابستگی های همه برای  ملاقات کرد  [2020-03-17 09: 55: 10،387] {taskinstance.py:655 F INFO - وابستگی های همه برای  ملاقات کرد  [2020-03-17 09: 55: 10،387] {taskinstance.py:866} INFO -  -------------------------------------------------- ------------------------------  [2020-03-17 09: 55: 10،387] {taskinstance.py:867} INFO - تلاش شروع 1 از 1  [2020-03-17 09: 55: 10،387] {taskinstance.py:868} INFO -  -------------------------------------------------- ------------------------------  [2020-03-17 09: 55: 10،396] {taskinstance.py:887} INFO - اعدام  on 2020-03-17T09: 53: 54.223409   00: 00  [2020-03-17 09: 55: 10،398] {standard_task_runner.py:53} INFO - فرایند 16017 را برای اجرای کار آغاز کرد  [2020-03-17 09: 55: 10،456] {logging_mixin.py:112} INFO -٪ s در حال اجرا٪ s میزبان  ip-10-0-43-239.ap-شمال شرقی-1.compute.internal  [2020-03-17 09: 55: 10،469] {bigquery_get_data.py:92} INFO - واکشی داده ها از:  [2020-03-17 09: 55: 10،469] {bigquery_get_data.py:94} INFO - بانک اطلاعاتی: airflow_test؛ جدول: name_2000؛ حداکثر نتایج: 10  [2020-03-17 09: 55: 10،474] {logging_mixin.py:112} INFO - [2020-03-17 09: 55: 10،474] {کشف.py:275} INFO - URL درخواست شده: دریافت https: / /www.googleapis.com/discovery/v1/apis/bigquery/v2/rest  [2020-03-17 09: 55: 10،796] {logging_mixin.py:112} INFO - [2020-03-17 09: 55: 10،796] {کشف.py:894} INFO - URL درخواست شده: دریافت https: / /bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables/name_2000/data؟maxResults=10&alt=json  [2020-03-17 09: 55: 12،168] {bigquery_get_data.py:106} INFO - تعداد کل ردیف های استخراج شده: 29772  [2020-03-17 09: 55: 12،168] {bigquery_get_data.py:109} INFO - Data: [{'f': [{'v': 'Emily'}، {'v': 'F'}، {'v': '25956'}]} ، {'f': [{'v': 'Hannah'}، {'v': 'F'}، {'v': '23082'}]}، {'f': [{'v': 'Madison'}، {'v': 'F'}، {'v': '19968'}]}، {'f': [{'v': ' اشلی '} ، {' v ':' F '} ، {' v ':' 17997 '}]} ، {' f ': [{' v ':' Sarah '}، {' v ':' F ' } ، {'v': '17702'}]} ، f f ': [{' v ':' Alexis '}، {' v ':' F '}، {' v ':' 17629 '}] } ، {'f': [{'v': 'Samantha'} ، {'v': 'F'}، {'v': '17265'}]}، {'f': [{'v' : 'Jessica'}، {'v': 'F'}، {'v': '15709'}]}، {'f': [{'v': 'Elizabeth'}، {'v': ' F '}، {' v ':' 15099 '}]}، f f': [{'v': 'Taylor'}، {'v': 'F'}، {v ':' 15078 ' }]}]  [2020-03-17 09: 55: 12،180] {taskinstance.py:1048} INFO - علامت گذاری وظیفه به عنوان SUCCESS.dag_id = مثال_bigquery_getdata_operator، task_id = bigquery_getdata_example، اجرای_date = 20200317T095354، start_date = 20200317T  کد / 
  data: [  {'f': [{'v': 'Emily'}، {'v': 'F'}، {'v': '25956'}]}،  {'f': [{'v': 'Hannah'}، {'v': 'F'}، {'v': '23082'}]}،  {'f': [{'v': 'Madison'}، {'v': 'F'}، {'v': '19968'}]}،  {'f': [{'v': 'Ashley'}، {'v': 'F'}، {'v': '17997'}]}،  {'f': [{'v': 'Sarah'}، {'v': 'F'}، {'v': '17702'}]}،  {'f': [{'v': 'Alexis'}، {'v': 'F'}، {'v': '17629'}]}،  {'f': [{'v': 'Samantha'}، {'v': 'F'}، {'v': '17265'}]}،  {'f': [{'v': 'Jessica'}، {'v': 'F'}، {'v': '15709'}]}،  {'f': [{'v': 'Elizabeth'}، {'v': 'F'}، {'v': '15099'}]}،  {'f': [{'v': 'Taylor'}، {'v': 'F'}، {'v': '15078'}]  ]  کد / 

こ の API は カ ラ ム 名 で は よ き が 、 で す が 、 ひ と ま ず 、 テ

در نتیجه ردیف های تکراری بازنمایی مبتنی بر REST از این داده ، یک سری از اشیاء JSON f ، v را برای نشان دادن زمینه ها و مقادیر به کار می برد.

テ ー ブ ル に デ ー タ を insert / update / حذف

BigQuery 対 し て 、 DAG か ら 自由 に SQL ク エ リ を 実 行 す る に は ど う す る の と 思 っ て 調 た と こ こ tt BigQueryOperator SQL を 渡 し て あ げ ば 実 行 で き で で す。 p

name_2000 テ ー ブ ル tt name = 'Mikami'

example_bigquery_sql_operator.py

  # - * - کدگذاری: utf-8 - * -    از تایپ واردات    واردات جریان هوا  از مدل های واردات جریان هوا    bigquery_operator = هیچ یک از انواع: هر نوع  تلاش كردن:      از airflow.contrib.operators وارد bigquery_operator می شود  به غیر از ImportError:      عبور    اگر bigquery_operator موجود نیست:      استدلال = {          "صاحب": "جریان هوا" ،          'start_date': airflow.utils.dates.days_ago (2)      }        dag = مدل.DAG (          dag_id = 'shembull_bigquery_sql_operator'، default_args = آرگومان،          sched_interval = هیچ کدام)        query = "INSERT INTO` cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami'، 'F'، 1) "      bigquery_sql = bigquery_operator.BigQueryOperator (          task_id = 'bigquery_sql_example'،          sql = پرس و جو ،          use_legacy_sql = نادرست ،          خنجر = خنجر)  کد / 

正常 終了 し た よ う GCP の 管理 コ ン ソ ー ル か ら デ ー タ を し し て み。

ち ゃ ん と DAG ら レ コ ー ド の 追加 が 行 わ れ ま し た。

続 い て 先 ほ ど ど درج し た レ コ ー tt を 、 name = 'Yuki' で بروزرسانی し て み ま す。

DAG を ァ イ ル の SQL を بروزرسانی ク エ リ に 変 更 し て 実 行 し て ま ま す。 p

example_bigquery_sql_operator.py

  (省略)      dag = مدل.DAG (          dag_id = 'shembull_bigquery_sql_operator'، default_args = آرگومان،          sched_interval = هیچکدام)    # query = "INSERT INTO` cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami'، 'F'، 1) "      query = "UPDATE` cm-da-mikami-yuki-258308.airflow_test.name_2000` SET name = "Yuki" WHERE name = "Mikami" and جنسیت = "F" و تعداد = 1 "      bigquery_sql = bigquery_operator.BigQueryOperator (          task_id = 'bigquery_sql_example'،          sql = پرس و جو ،          use_legacy_sql = نادرست ،          خنجر = خنجر)  کد / 

DAG 、 正常 終了 し た の で 、 GCP 管理 コ ン ソ ー ル か ら را انتخاب کنید ク エ リ 実 し し て デ ー タ て み。 p p。

正常 に بروزرسانی さ れ た こ と が で き ま し た。

さ ら に 、 先 ほ ど ど حذف و به روز رسانی し た レ コ ー ド を حذف し ま す。

example_bigquery_sql_operator.py

  (省略)      dag = مدل.DAG (          dag_id = 'shembull_bigquery_sql_operator'، default_args = آرگومان،          sched_interval = هیچکدام)    # query = "INSERT INTO` cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami'، 'F'، 1) "  # query = "به روز رسانی` cm-da-mikami-yuki-258308.airflow_test.name_2000` SET name = 'Yuki' WHERE name = 'Mikami' and جنسیت = 'F' and count = 1 "      query = "DELETE FROM" cm-da-mikami-yuki-258308.airflow_test.name_2000` WHERE name = 'Yuki' and جنسیت = 'F' and count = 1 "      bigquery_sql = bigquery_operator.BigQueryOperator (          task_id = 'bigquery_sql_example'،          sql = پرس و جو ،          use_legacy_sql = نادرست ،          خنجر = خنجر)  کد / 

DAG 再度 行 後 、 再度 GCP 管理 コ ン ソ ー か ら ク エ エ 実 行 し て デ ー タ を し て デ ー タ を 確認 し て。 p

حذف も 、 期待 通 り 実 行 で き ま し。。

テ ー ブ ル デ ー タ を GCS に eksport す る

最後 に 、 テ ー ブ ル C ー タ を GCS に フ ァ イ ル 出力 し て み ま す。 p

/airflow/contrib/operators/bigquery_to_gcs.py BigQueryToCloudStorageOperator 、 使 え ば 、 DAG だ け 実 行 で で p p p

name_2000 テ ー ブ ル の デ ー タ を tt gs: //test-cm-mikami/test_export/exp_name_2000.txt に صادرات す る 、 の の AG ル ル、 実 行 し て み す す。 p

example_bigquery_to_gcs.py

  # - * - کدگذاری: utf-8 - * -    از تایپ واردات    واردات جریان هوا  از مدل های واردات جریان هوا    bigquery_to_gcs = نوع # هیچکدام: هر  تلاش كردن:      از airflow.contrib.operators bigquery_to_gcs را وارد می کنید  به غیر از ImportError:      عبور    اگر bigquery_to_gcs نباشد:      استدلال = {          "صاحب": "جریان هوا" ،          'start_date': airflow.utils.dates.days_ago (2)      }        dag = مدل.DAG (          dag_id = 'shembull_bigquery_to_gcs_operator'، default_args = آرگومان،          sched_interval = هیچکدام)        get_dataset = bigquery_to_gcs.BigQueryToCloudStorageOperator (          task_id = 'bigquery_to_gcs_example'،          Source_project_dataset_table = 'cm-da-mikami-yuki-258308.airflow_test.name_2000'،          destination_cloud_storage_uris = 'gs: //test-cm-mikami/test_export/exp_name_2000.txt'،          خنجر = خنجر)  کد / 

実 行 前 、 GCS の 対 象 パ ス に は 何 も フ ァ ァ イ ル が い い 状態 で す。

DAG 行 が 正常 に 完了 し こ と を 確認 し て

  *** خواندن پرونده محلی: /home/ec2-user/airflow/logs/example_bigquery_to_gcs_operator/bigquery_to_gcs_example/2020-03-17T11:34:24.397858 00:00/1 .لاگ  [2020-03-17 11: 34: 42،238] {taskinstance.py:655} INFO - وابستگی های همه برای  on 2020-03-17T11: 34: 24.397858   00: 00  [2020-03-17 11: 34: 42،257] {standard_task_runner.py:53} INFO - فرایند 19929 را برای اجرای کار آغاز کرد  [2020-03-17 11: 34: 42،315] {logging_mixin.py:112} INFO -٪ s در حال اجرا٪ s میزبان  ip-10-0-43-239.ap-شمال شرقی-1.compute.internal  [2020-03-17 11: 34: 42،329] {bigquery_to_gcs.py:93} INFO - اجرای عصاره cm-da-mikami-yuki-258308.airflow_test.name_2000 به: gs: // test-cm-mikami / test_export / ارزان_name_2000.txt  [2020-03-17 11: 34: 42،334] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 42،334] {کشف.py:275} INFO - URL درخواست شده: دریافت https: / /www.googleapis.com/discovery/v1/apis/bigquery/v2/rest  [2020-03-17 11: 34: 42،645] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 42،644] {کشف.py:894 F INFO - URL درخواست شده: POST https: / /bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs؟alt=json  [2020-03-17 11: 34: 43،677] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 43،677] {کشف.py:894} INFO - URL درخواست شده: دریافت https: / /bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs/job_NptEvLRIZj6NzF6O8fy2mgtoMlKY؟location=US&alt=json  [2020-03-17 11: 34: 44،198] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 44،198] {bigquery_hook.py:1347} INFO - در انتظار کار برای تکمیل: cm- da-mikami-yuki-258308، job_NptEvLRIZj6NzF6O8fy2mgtoMlKY  [2020-03-17 11: 34: 49،204] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 49،203] {کشف.py:894} INFO - URL مورد درخواست: GET https: / /bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs/job_NptEvLRIZj6NzF6O8fy2mgtoMlKY؟location=US&alt=json  [2020-03-17 11: 34: 49،493] {taskinstance.py:1048} INFO - علامت گذاری وظیفه به عنوان SUCCESS.dag_id = مثال_bigquery_to_gcs_operator، task_id = bigquery_to_gcs_example، operation_date = 20200317T113424، start_date = 20200317T113424، start_date = 20200317T113424، start2date_ 203  [2020-03-17 11: 34: 52،237] {logging_mixin.py:112} INFO - [2020-03-17 11: 34: 52،236] {local_task_job.py:103} INFO - کار با کد بازگشت 0 خارج شد  کد / 

GCP ら コ ン ソ ー ル か ら GCS の フ ァ イ ル を 確認 し て み と と

フ ァ イ ル が 出力 さ れ て い ま す。

出力 フ ァ イ ル を ダ し ロ ー ド し て 中 身 を 確認 し て み と と

  نام ، جنس ، تعداد  لیزت ، اف ، 256  مونسرات ، ف ، 256  کالین ، ف ، 256  جولیان ، ف ، 512  تایلر ، اف ، 512  کیلا ، ف ، 13312  مدیسون ، اف ، 19968  کیرا ، ف ، 257  رندی ، ف ، 257  Reanna، F، 257  اینگرید ، ف ، 257  ارین ، ف ، 257  الکسوس ، اف ، 1281  省略)  کد / 

صادرات p ー ー ブ ル ー タ が 正常 に صادرات さ れ た こ と が 確認 で き ま し。。

ま と め (所 感 h

در جریان هوا の BigQuery مشاهده 関 連 の オ ペ レ ー タ で は، BigQuery مشاهده API を 使 っ て BigQuery مشاهده に ア ク セ ス し て い て، 実 際 に ど の API を コ ー ル し て い る の か は متن bigquery_hook.py tt> متن を 確認 す れ ば 良 さ そ うで す。

ま た 、 既存 に は な い 処理 も 、 BigQuery API の 仕 様 を 確認 し な が ら 自由 に 実 装 で き。 ppp

他 に tt BashOperator gcloud CLI も 簡 単 に 実 行 で き る の 、 fl جریان هوا 経 由 で の BigQuery 関 連 の 操作 に る こ こ p p p p p p

参考

٪٪ مورد_read_more_button ٪٪