【详述】Python 生成的数据通过 KafkaProducer 发送到 Kafka 后,StarRocks 解析(消费)数据出错
【背景】做过哪些操作?
【业务影响】
【StarRocks版本】2.1.3
Ds.py
# -*- coding: utf-8 -*-
import datetime
import random
import json
import time
import string
from kafka import KafkaProducer
from kafka.errors import KafkaError
# Shared Kafka producer. Every message value is serialized to JSON and
# encoded as UTF-8 (the encoding JSON consumers expect; the original
# pasted code used GBK, which only matches UTF-8 for pure-ASCII payloads).
producer = KafkaProducer(
    bootstrap_servers=[
        '10.240.102.4:9092',
        '10.240.102.6:9092',
        '10.240.102.10:9092',
    ],
    value_serializer=lambda m: json.dumps(m).encode('utf-8'),
)

# Name of the Kafka topic the simulated records are published to.
topic = 'kafka'
def simulate_passenger():
    """Publish simulated passenger-flow records to Kafka until stopped.

    Each record holds the station id, the current date, the station name,
    an equipment id, a passenger count and an in/out flag. One record is
    sent every two seconds via the module-level ``producer`` to the
    module-level ``topic``. The loop never terminates on its own; a
    ``KafkaError`` is printed and ends the loop. The producer is closed
    on every exit path.
    """
    try:
        n = 0
        while True:
            dic = {
                'stationId': n,  # running counter, starts at 0
                'time': datetime.datetime.now().strftime("%Y%m%d"),
                'stationName': 'S1',
                'equipmentId': random.randint(100, 400),
                'passengerNum': random.randint(0, 100),
                'inOrOut': random.randint(0, 1),
            }
            producer.send(topic=topic, value=dic)
            time.sleep(2)
            n = n + 1
            print(dic)
            # Printed after the increment, i.e. the number of records sent so far.
            print(n)
    except KafkaError as e:
        print(e)
    finally:
        # Always release the producer, including on KeyboardInterrupt.
        producer.close()
# Run the simulator only when this file is executed as a script.
if __name__ == '__main__':
    simulate_passenger()
生成的 JSON 数据无误,
但 StarRocks 消费出错!
-- Target table for the passenger-flow records consumed from Kafka.
-- stationName is a string column: the producer script in this file sends
-- 'S1' for it, which cannot be stored in the original BIGINT NOT NULL
-- column (presumably the cause of the reported load errors).
-- Quotes are plain ASCII; identifiers are backtick-quoted so that names
-- such as `time` are never parsed as keywords.
CREATE TABLE `ods_user_log` (
  `stationId` varchar(65533) NOT NULL COMMENT "车站ID",
  `time` datetime NOT NULL COMMENT "到站时间",
  `stationName` varchar(65533) NOT NULL COMMENT "车站名称",
  `equipmentId` varchar(65533) NULL COMMENT "设备ID",
  `passengerNum` varchar(65533) NULL COMMENT "客流量",
  `inOrOut` varchar(65533) NULL COMMENT "进站还是出站"
) ENGINE=OLAP
PRIMARY KEY(`stationId`, `time`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`stationId`) BUCKETS 10
PROPERTIES (
  "replication_num" = "3",
  "in_memory" = "false",
  "storage_format" = "DEFAULT"
);
CREATE ROUTINE LOAD load_kafka_sr_k ON ods_user_log
COLUMNS(stationId,time,stationName,equipmentId,passengerNum,inOrOut)
PROPERTIES
(
“desired_concurrent_number”=“3”,
“max_batch_interval” = “20”,
“max_batch_rows” = “300000”,
“max_batch_size” = “209715200”,
“strict_mode” = “false”,
“format” = “json”
)FROM KAFKA
(
“kafka_broker_list”= “10.240.102.4:9092,10.240.102.6:9092,10.240.102.10:9092”,
“kafka_topic” = “kafka”,
“property.kafka_default_offsets” = “OFFSET_END”,
“property.enable.auto.commit”=“True”

