Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.
For more information, visit the Dataplex product documentation.
Create a task
Before you create a Dataplex task, you need to define its body. For more information about the fields you can pass when creating a task, see the Dataplex create task API.
A simple task configuration can look as follows:
EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
With this configuration, we can create the task both synchronously and asynchronously using DataplexCreateTaskOperator:
create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)
create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)
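If you generate task bodies for several DAGs, a small helper can build the same dictionary and catch obvious mistakes before the API call. The sketch below is hypothetical: `make_task_body` is not part of Airflow or the Dataplex client. It assumes the trigger types `ON_DEMAND` and `RECURRING`, and it assumes that a recurring trigger needs a cron `schedule`, as described in the Dataplex `TriggerSpec` reference. Check the current API docs before relying on these assumptions.

```python
from typing import Optional

# Trigger types assumed from the Dataplex TriggerSpec reference.
VALID_TRIGGER_TYPES = {"ON_DEMAND", "RECURRING"}


def make_task_body(
    trigger_type: str,
    service_account: str,
    python_script_file: str,
    schedule: Optional[str] = None,
) -> dict:
    """Hypothetical helper returning a body shaped like EXAMPLE_TASK_BODY."""
    if trigger_type not in VALID_TRIGGER_TYPES:
        raise ValueError(f"unsupported trigger type: {trigger_type!r}")
    if not python_script_file.endswith(".py"):
        raise ValueError("python_script_file must point to a .py file")

    trigger_spec = {"type_": trigger_type}
    if trigger_type == "RECURRING":
        # Assumption: a recurring trigger requires a cron schedule.
        if not schedule:
            raise ValueError("RECURRING triggers need a cron schedule")
        trigger_spec["schedule"] = schedule

    return {
        "trigger_spec": trigger_spec,
        "execution_spec": {"service_account": service_account},
        "spark": {"python_script_file": python_script_file},
    }


# Placeholder values for illustration, not real resources.
body = make_task_body(
    "ON_DEMAND",
    "etl-runner@my-project.iam.gserviceaccount.com",
    "gs://my-bucket/jobs/main.py",
)
print(body["trigger_spec"])  # {'type_': 'ON_DEMAND'}
```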
Delete a task
To delete a task, you can use DataplexDeleteTaskOperator:
delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)
List tasks
To list tasks, you can use DataplexListTasksOperator:
list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)
Get a task
To get a task, you can use DataplexGetTaskOperator:
get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)
Wait for a task
To wait for a task created asynchronously, you can use DataplexTaskStateSensor:
dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    # ID of the task created by create_dataplex_task_async
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="dataplex_task_state",
)
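Conceptually, a sensor such as DataplexTaskStateSensor re-checks the task state at an interval until the task reaches the state the sensor is waiting for. The sketch below models that loop in plain Python. It is not the sensor's implementation. The following are all illustrative assumptions:

- the `wait_for_state` name and its parameters;
- treating `ACTIVE` as ready and `DELETING` as a failure;
- `get_state`, which stands in for whatever fetches the current task state.

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    target: str = "ACTIVE",
    failure_states: Iterable[str] = ("DELETING",),
    poke_interval: float = 30.0,
    timeout: float = 3600.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until it returns target, a failure state, or time runs out."""
    failures = set(failure_states)
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state == target:
            return state
        if state in failures:
            raise RuntimeError(f"task entered failure state {state!r}")
        if clock() >= deadline:
            raise TimeoutError(f"task still in {state!r} after {timeout}s")
        sleep(poke_interval)


# Usage with a fake state source: CREATING twice, then ACTIVE.
states = iter(["CREATING", "CREATING", "ACTIVE"])
result = wait_for_state(lambda: next(states), sleep=lambda _: None)
print(result)  # ACTIVE
```

Injecting `clock` and `sleep` keeps the loop testable without real waiting.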
Create a lake
Before you create a Dataplex lake, you need to define its body.
For more information about the fields you can pass when creating a lake, see the Dataplex create lake API.
A simple lake configuration can look as follows:
EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    # labels is a key/value map, so an empty dict means "no labels"
    "labels": {},
    "description": "test_description",
    "metastore": {"service": ""},
}
With this configuration, we can create the lake using DataplexCreateLakeOperator:
create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_LAKE_BODY,
    lake_id=LAKE_ID,
    task_id="create_lake",
)
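The lake_id, like the zone, asset, and task IDs, has to follow Dataplex's resource-ID rules. As documented for the create-lake API at the time of writing, an ID:

- may contain only lowercase letters, digits, and hyphens;
- must start with a letter;
- must end with a letter or digit;
- must be 1 to 63 characters long.

The hypothetical helper below checks those rules locally before a DAG run. Treat it as a convenience, not a substitute for the API's own validation.

```python
import re

# One lowercase letter, optionally followed by up to 61 letters/digits/hyphens
# and a final letter or digit: 1 to 63 characters in total.
_DATAPLEX_ID = re.compile(r"[a-z](?:[a-z0-9-]{0,61}[a-z0-9])?")


def is_valid_dataplex_id(resource_id: str) -> bool:
    """Return True if resource_id satisfies the documented Dataplex ID rules."""
    return _DATAPLEX_ID.fullmatch(resource_id) is not None


for candidate in ["test-lake-1", "1-lake", "lake-", "Lake", "a"]:
    print(candidate, is_valid_dataplex_id(candidate))
```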
Delete a lake
To delete a lake, you can use DataplexDeleteLakeOperator:
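from airflow.utils.trigger_rule import TriggerRule

delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    # Run the cleanup even if upstream tasks failed.
    trigger_rule=TriggerRule.ALL_DONE,
)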
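delete_lake uses TriggerRule.ALL_DONE so that the lake is cleaned up even when earlier tasks fail. Under Airflow's default ALL_SUCCESS rule, a failed upstream task would instead leave it marked upstream_failed, and it would not run. The toy model below contrasts the two rules. It is a simplification that ignores other states Airflow tracks, and the function names are illustrative, not Airflow APIs.

```python
from typing import List

# Simplified set of "finished" task-instance states (Airflow tracks more).
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def all_done(upstream_states: List[str]) -> bool:
    """ALL_DONE: run once every upstream task has finished, whatever the outcome."""
    return all(state in FINISHED_STATES for state in upstream_states)


def all_success(upstream_states: List[str]) -> bool:
    """ALL_SUCCESS (the default): run only if every upstream task succeeded."""
    return all(state == "success" for state in upstream_states)


upstream = ["success", "failed"]
print(all_done(upstream), all_success(upstream))  # True False
```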
Create or update a Data Quality scan
Before you create a Dataplex Data Quality scan, you need to define its body. For more information about the fields you can pass when creating a Data Quality scan, see the Dataplex create data quality API.
A simple Data Quality scan configuration can look as follows:
from google.cloud import dataplex_v1

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
# "entity" and "resource" are alternatives in the same oneof of DataSource,
# so only the field assigned last ("resource") is kept.
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = {
    "rules": [
        {
            # Rows pass when the "value" column lies between 0 and 10000.
            "range_expectation": {
                "min_value": "0",
                "max_value": "10000",
            },
            "column": "value",
            "dimension": "VALIDITY",
        }
    ],
}
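The data source of a scan can be given either as a Dataplex entity name or as a BigQuery table resource name. The hypothetical helpers below only format those two strings, following exactly the patterns used in EXAMPLE_DATA_SCAN. They do not check that the resources exist.

```python
def dataplex_entity_name(project: str, region: str, lake: str, zone: str, entity: str) -> str:
    """Format a Dataplex entity name as used for DataScan.data.entity."""
    return f"projects/{project}/locations/{region}/lakes/{lake}/zones/{zone}/entities/{entity}"


def bigquery_table_resource(project: str, dataset: str, table: str) -> str:
    """Format a BigQuery table resource name as used for DataScan.data.resource."""
    return f"//bigquery.googleapis.com/projects/{project}/datasets/{dataset}/tables/{table}"


# Placeholder identifiers for illustration only.
print(bigquery_table_resource("my-project", "my_dataset", "orders"))
# //bigquery.googleapis.com/projects/my-project/datasets/my_dataset/tables/orders
```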
With this configuration, we can create or update the Data Quality scan using DataplexCreateOrUpdateDataQualityScanOperator:
create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)
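To reason about what the range_expectation rule in EXAMPLE_DATA_SCAN checks, the sketch below evaluates an equivalent rule over a local list of values. It is a local approximation, not how Dataplex executes scans. It rests on these assumptions:

- bounds are inclusive unless marked strict;
- null values count as failures unless `ignore_null` is set;
- the rule passes when the ratio of passing rows reaches `threshold`, taken as 1.0 (every evaluated row must pass) when not set.

The function name and signature are hypothetical.

```python
from typing import Iterable, Optional, Tuple


def evaluate_range_rule(
    values: Iterable[Optional[float]],
    min_value: Optional[str] = None,
    max_value: Optional[str] = None,
    strict_min: bool = False,
    strict_max: bool = False,
    ignore_null: bool = False,
    threshold: float = 1.0,
) -> Tuple[bool, float]:
    """Return (rule_passed, pass_ratio) for a range expectation over values."""
    # Bounds are strings in the scan spec ("0", "10000"), so convert them.
    low = float(min_value) if min_value is not None else None
    high = float(max_value) if max_value is not None else None

    evaluated = passed = 0
    for value in values:
        if value is None:
            if not ignore_null:
                evaluated += 1  # a null row counts as a failure
            continue
        evaluated += 1
        ok = True
        if low is not None:
            ok = ok and (value > low if strict_min else value >= low)
        if high is not None:
            ok = ok and (value < high if strict_max else value <= high)
        if ok:
            passed += 1

    # Assumption: with no rows to evaluate, treat the rule as passing.
    ratio = passed / evaluated if evaluated else 1.0
    return ratio >= threshold, ratio


print(evaluate_range_rule([5, 10000, 20000, None], "0", "10000"))  # (False, 0.5)
```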
Get a Data Quality scan
To get a Data Quality scan, you can use DataplexGetDataQualityScanOperator:
get_data_scan = DataplexGetDataQualityScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
Delete a Data Quality scan
To delete a Data Quality scan, you can use DataplexDeleteDataQualityScanOperator:
delete_data_scan = DataplexDeleteDataQualityScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
Run a Data Quality scan
You can run a Dataplex Data Quality scan in asynchronous mode and check its status later with a sensor. To do so, use DataplexRunDataQualityScanOperator with asynchronous=True:
run_data_scan_async = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)
To check that the running Dataplex Data Quality scan succeeded, you can use DataplexDataQualityJobStatusSensor:
get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    # Job ID returned by run_data_scan_async, pulled from XCom
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
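The sensor's job_id comes from the XCom value returned by run_data_scan_async. If you instead have a full Data Quality scan job resource name, the short job ID is its last path segment. The helper below is hypothetical and assumes the name format `projects/{project}/locations/{location}/dataScans/{data_scan_id}/jobs/{job_id}`.

```python
import re

# Assumed format of a data scan job resource name.
_JOB_NAME = re.compile(
    r"projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/dataScans/(?P<data_scan>[^/]+)/jobs/(?P<job_id>[^/]+)"
)


def parse_data_scan_job_name(name: str) -> dict:
    """Split a data scan job name into project, location, data_scan, and job_id."""
    match = _JOB_NAME.fullmatch(name)
    if match is None:
        raise ValueError(f"not a data scan job name: {name!r}")
    return match.groupdict()


parts = parse_data_scan_job_name(
    "projects/my-project/locations/us-central1/dataScans/my-scan/jobs/abc123"
)
print(parts["job_id"])  # abc123
```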
Get a Data Quality scan job
To get the result of a Data Quality scan job, you can use DataplexGetDataQualityScanResultOperator:
get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
Create a zone
Before you create a Dataplex zone, you need to define its body.
For more information about the fields you can pass when creating a zone, see the Dataplex create zone API.
A simple zone configuration can look as follows:
EXAMPLE_ZONE = {
    "type_": "RAW",
    "resource_spec": {"location_type": "SINGLE_REGION"},
}
With this configuration, we can create a zone using DataplexCreateZoneOperator:
create_zone = DataplexCreateZoneOperator(
    task_id="create_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_ZONE,
    zone_id=ZONE_ID,
)
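EXAMPLE_ZONE creates a RAW zone in a single region. The hypothetical helper below builds the same dictionary but restricts the two enum fields to values listed in the Dataplex Zone reference:

- RAW or CURATED for the zone type;
- SINGLE_REGION or MULTI_REGION for the location type.

Verify these values against the current API before relying on them.

```python
ZONE_TYPES = {"RAW", "CURATED"}
LOCATION_TYPES = {"SINGLE_REGION", "MULTI_REGION"}


def make_zone_body(zone_type: str = "RAW", location_type: str = "SINGLE_REGION") -> dict:
    """Hypothetical helper returning a body shaped like EXAMPLE_ZONE."""
    if zone_type not in ZONE_TYPES:
        raise ValueError(f"zone type must be one of {sorted(ZONE_TYPES)}")
    if location_type not in LOCATION_TYPES:
        raise ValueError(f"location type must be one of {sorted(LOCATION_TYPES)}")
    return {"type_": zone_type, "resource_spec": {"location_type": location_type}}


print(make_zone_body())  # {'type_': 'RAW', 'resource_spec': {'location_type': 'SINGLE_REGION'}}
```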
Delete a zone
To delete a zone, you can use DataplexDeleteZoneOperator:
delete_zone = DataplexDeleteZoneOperator(
    task_id="delete_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
)
Create an asset
Before you create a Dataplex asset, you need to define its body.
For more information about the fields you can pass when creating an asset, see the Dataplex create asset API.
A simple asset configuration can look as follows:
EXAMPLE_ASSET = {
    "resource_spec": {
        "name": f"projects/{PROJECT_ID}/datasets/{DATASET_NAME}",
        "type_": "BIGQUERY_DATASET",
    },
    "discovery_spec": {"enabled": True},
}
With this configuration, we can create the asset using DataplexCreateAssetOperator:
create_asset = DataplexCreateAssetOperator(
    task_id="create_asset",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_ASSET,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)
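An asset points at either a BigQuery dataset or a Cloud Storage bucket. The hypothetical helper below builds a body shaped like EXAMPLE_ASSET for either case. It assumes the resource type values BIGQUERY_DATASET and STORAGE_BUCKET and the name patterns `projects/{project}/datasets/{dataset}` and `projects/{project}/buckets/{bucket}`. The Dataplex Asset reference shows a project number in these names, while the example above uses PROJECT_ID, so check which form your setup expects.

```python
# Assumed mapping from resource type to its collection in the resource name.
_COLLECTIONS = {"BIGQUERY_DATASET": "datasets", "STORAGE_BUCKET": "buckets"}


def make_asset_body(project: str, resource_type: str, resource_id: str, discovery: bool = True) -> dict:
    """Hypothetical helper returning a body shaped like EXAMPLE_ASSET."""
    if resource_type not in _COLLECTIONS:
        raise ValueError(f"unsupported resource type: {resource_type!r}")
    name = f"projects/{project}/{_COLLECTIONS[resource_type]}/{resource_id}"
    return {
        "resource_spec": {"name": name, "type_": resource_type},
        "discovery_spec": {"enabled": discovery},
    }


print(make_asset_body("my-project", "STORAGE_BUCKET", "raw-landing")["resource_spec"]["name"])
# projects/my-project/buckets/raw-landing
```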
Delete an asset
To delete an asset, you can use DataplexDeleteAssetOperator:
delete_asset = DataplexDeleteAssetOperator(
    task_id="delete_asset",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)
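When tearing down, delete resources from the inside out. According to the Dataplex API reference at the time of writing, a zone's assets must be deleted before the zone, and a lake's zones before the lake. The hypothetical sketch below derives that order from parent links, deepest resources first. For the resources on this page, that means running delete_asset, then delete_zone, then delete_lake.

```python
from typing import Dict, List, Optional


def deletion_order(parents: Dict[str, Optional[str]]) -> List[str]:
    """Order resources so every child is deleted before its parent.

    `parents` maps each resource to its parent (None for a top-level resource);
    the links are assumed to form a tree, with no cycles.
    """

    def depth(resource: str) -> int:
        level = 0
        parent = parents[resource]
        while parent is not None:
            level += 1
            parent = parents[parent]
        return level

    # Deepest resources first; sorted() is stable, so ties keep input order.
    return sorted(parents, key=depth, reverse=True)


order = deletion_order({"lake": None, "zone": "lake", "asset": "zone"})
print(order)  # ['asset', 'zone', 'lake']
```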