Kafka To StarRocks

【详述】python 生成数据通过kafkaproducer 发送 数据 StarRocks解析数据 错误
【背景】做过哪些操作?
【业务影响】
【StarRocks版本】2.1.3

Ds.py
#--coding:utf-8--
import datetime
import random
import json
import time
import string

from kafka import KafkaProducer
from kafka.errors import KafkaError
producer=KafkaProducer(bootstrap_servers=[‘10.240.102.4:9092’,‘10.240.102.6:9092’,‘10.240.102.10:9092’],value_serializer=lambda m:json.dumps
(m).encode(‘GBK’))
topic=‘kafka’

def simulate_passenger():
#模拟客流数据,关键字段有车站id,车站名,发生时间,设备id,客流量,进站还是出站
try:
n=0
while True:
sample_string = ‘abcdefghigjlmnopqrstuvwxtz’
dic={}
dic[‘stationId’]=n
dic[‘time’]=datetime.datetime.now().strftime("%Y%m%d")
dic[‘stationName’]=‘S1’
dic[‘equipmentId’]=random.randint(100,400)
dic[‘passengerNum’]=random.randint(0,100)
dic[‘inOrOut’]=random.randint(0,1)
producer.send(topic=topic,value=dic)
time.sleep(2)
n=n+1
print(dic)
print(n)
except KafkaError as e:
print(e)
finally:
producer.close()

if name==‘main’:
simulate_passenger()

生成json 数据无误


StarRocks 消费出错!

CREATE TABLE ods_user_log (
stationId varchar(65533) NOT NULL COMMENT “车站ID”,
time datetime NOT NULL COMMENT “到站时间”,
stationName bigint(20) NOT NULL COMMENT “车站名称”,
equipmentId varchar(65533) NULL COMMENT “设备ID”,
passengerNum varchar(65533) NULL COMMENT “客流量”,
inOrOut varchar(65533) NULL COMMENT “进站还是出站”
) ENGINE=OLAP
PRIMARY KEY(stationId, time)
COMMENT “OLAP”
DISTRIBUTED BY HASH(stationId) BUCKETS 10
PROPERTIES (
“replication_num” = “3”,
“in_memory” = “false”,
“storage_format” = “DEFAULT”
);

CREATE ROUTINE LOAD load_kafka_sr_k ON ods_user_log
COLUMNS(stationId,time,stationName,equipmentId,passengerNum,inOrOut)
PROPERTIES
(
“desired_concurrent_number”=“3”,
“max_batch_interval” = “20”,
“max_batch_rows” = “300000”,
“max_batch_size” = “209715200”,
“strict_mode” = “false”,
“format” = “json”
)FROM KAFKA
(
“kafka_broker_list”= “10.240.102.4:9092,10.240.102.6:9092,10.240.102.10:9092”,
“kafka_topic” = “kafka”,
“property.kafka_default_offsets” = “OFFSET_END”,
“property.enable.auto.commit”=“True”

你好,我们是starrocks哈,如果是starrocks问题,请把标题和问题中的doris字段修改,谢谢。
报错是因为stationName定义的bigint,但是数据源是字符串,类型强制转换为Null了。

改了,用得是StarRocks 习惯性写了 Ds了 哈哈哈
但是为啥又解析多出一个0 呢

你分析得没有错误 确实似乎SR字段类型不匹配强制转换导致后面一系列问题。提个建议 这种错误类型 直接跟数据库一样抛出 字段类型不匹配 能节省很多问题排查时间,这个问题花了一个下午都没找出来,异常提示明确点 使用成本会低很多

感谢建议,主要是strict_mode设置为false了,设置为true的情况下应该会报错类型不匹配。

最后一个字段是表示主键模型的写入语义,默认upsert为0,delete为1