-- 创建 ROUTINE LOAD 任务
-- Docs: https://docs.starrocks.io/zh/docs/introduction/StarRocks_intro/#routine-load
-- Template: create Routine Load job test_iceberg.test1, loading from an Iceberg
-- table into the StarRocks table test_iceberg. Replace the placeholder
-- database/table names before running.
CREATE ROUTINE LOAD test_iceberg.test1 ON test_iceberg
COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted),
-- WHERE is optional. It pre-filters Iceberg files and is also passed to the
-- StarRocks BE for filtering, so every column referenced here must exist in
-- both the StarRocks table and the Iceberg table.
WHERE event_time > "2022-01-01 00:00:00"
PROPERTIES
(
    "desired_concurrent_number" = "10",
    "max_error_number" = "1000"
)
FROM ICEBERG
(
    "iceberg_catalog_type" = "EXTERNAL_CATALOG",
    "iceberg_database" = "your_iceberg_database",
    "iceberg_table" = "your_iceberg_table",
    -- Optional. Pre-filters Iceberg files. Unlike WHERE, it is NOT passed to the
    -- StarRocks BE, so the columns only need to exist in the Iceberg table.
    "iceberg_where_expr" = "event_time > '2022-01-01 00:00:00'",
    -- Allowed values: FROM_EARLIEST or FROM_LATEST.
    "iceberg_consume_position" = "FROM_LATEST",
    -- Optional; takes effect when iceberg_consume_position = FROM_EARLIEST.
    -- Strongly recommended for Iceberg v2 tables unless the table really must
    -- be read from the very beginning.
    "property.read_iceberg_snapshots_after_timestamp" = "1673595411640",
    -- Optional; defaults to 2 GB (2147483648 bytes).
    "property.plan_split_size" = "2147483648"
)
-- Optional: to read through a broker, add  WITH BROKER "broker"  before the
-- terminating semicolon. When omitted, the BE accesses HDFS directly.
;

-- Concrete example: create Routine Load job xxxx.test_iceberg, loading
-- ams_diagnose_iceberg.targeting_search_track_log into the table xxx_iceberg.
CREATE ROUTINE LOAD xxxx.test_iceberg ON xxx_iceberg
-- partition_time is derived from process_time during the load.
-- NOTE(review): from_unixtime presumably expects epoch seconds; confirm the
-- unit of process_time.
COLUMNS (partition_time = from_unixtime(process_time), wuid, pv_id, pos_id, adgroup_id, process_time, ad_filters, filtered_flow_control_rules, position_id)
PROPERTIES
(
    "desired_concurrent_number" = "10",
    "max_error_number" = "1000"
)
FROM ICEBERG
(
    "iceberg_catalog_type" = "EXTERNAL_CATALOG",
    "iceberg_database" = "ams_diagnose_iceberg",
    "iceberg_table" = "targeting_search_track_log",
    "iceberg_consume_position" = "FROM_LATEST",
    -- NOTE(review): this property is described as taking effect only when
    -- iceberg_consume_position = FROM_EARLIEST, so with FROM_LATEST it is
    -- presumably ignored; confirm. The value looks like epoch milliseconds.
    "property.read_iceberg_snapshots_after_timestamp" = "1742392952000",
    -- Split size in bytes (2147483648 = 2 GB).
    "property.plan_split_size" = "2147483648"
);
-- 查询 ROUTINE LOAD 任务
-- Show the Routine Load jobs in the datacube database.
SHOW ROUTINE LOAD FROM datacube;

-- References:
-- https://iwiki.woa.com/p/4009120977?from=iWiki_search
-- https://iwiki.woa.com/p/4008662947
-- https://iwiki.woa.com/p/4007423099?from=iWiki_search
-- https://help.aliyun.com/zh/emr/emr-serverless-starrocks/user-guide/routine-load
-- 修改 ROUTINE LOAD 任务
-- Change the split size of job test_iceberg.test1 to 536870912 bytes (512 MB).
-- NOTE(review): upstream StarRocks only allows ALTER ROUTINE LOAD on a job in
-- the PAUSED state; presumably the same applies here, so pause the job first.
ALTER ROUTINE LOAD FOR test_iceberg.test1
FROM ICEBERG
(
    "property.plan_split_size" = "536870912"
);

-- Job lifecycle commands. These are independent examples, not a script to run
-- top to bottom.

-- Resume a paused job.
RESUME ROUTINE LOAD FOR datacube.test_kv_iceberg;

-- Pause a running job; it can be resumed later.
PAUSE ROUTINE LOAD FOR datacube.test_kv_iceberg;

-- Stop a job permanently. Per the StarRocks docs a stopped job cannot be
-- resumed, so double-check the job name before running.
STOP ROUTINE LOAD FOR datacube.test_iceberg_ts;