这是官方样例:
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%starrocks%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"timezone" = "Africa/Abidjan"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg",
"property.client.id" = "my_client_id"
);
官方样例中指出:
只支持衍生例和where,是否支持在数据写入starRocks表之前,进行一遍sql过滤,就类似于sqoop里面的sql过滤功能…使数据的提取更加灵活