Background
Real-time statistics are served by computing on the data stream as it arrives.
Data flow
Kafka -> Spark Streaming -> Stream Load -> StarRocks
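The last hop of the flow above, Stream Load, is an HTTP PUT against the StarRocks frontend. The sketch below only builds such a request and does not send it. It is a minimal illustration, not the demo's actual code: the database name demo_db, the root user with an empty password, the default FE HTTP port 8030, and the row layout are all assumptions made for this example.

```python
import base64
import urllib.request


def build_stream_load_request(fe_host, db, table, rows, label,
                              user="root", password="", http_port=8030):
    """Build (but do not send) a Stream Load request carrying CSV rows."""
    url = "http://%s:%d/api/%s/%s/_stream_load" % (fe_host, http_port, db, table)
    # One CSV line per row, comma-separated.
    body = "\n".join(",".join(str(v) for v in row) for row in rows).encode("utf-8")
    token = base64.b64encode(("%s:%s" % (user, password)).encode("utf-8")).decode("ascii")
    req = urllib.request.Request(url, data=body, method="PUT")
    req.add_header("Authorization", "Basic " + token)
    req.add_header("Expect", "100-continue")
    req.add_header("label", label)            # a label identifies one load job
    req.add_header("column_separator", ",")
    return req


# Hypothetical row; the real column mapping used by the demo is not shown here.
req = build_stream_load_request(
    "127.0.0.1", "demo_db", "demo1_spark_tb0",
    [("https://docs.starrocks.com/", "2021-07-22", 19, 56, 42)],
    label="demo-batch-0001",
)
print(req.get_method(), req.full_url)
```

Sending the request is left out on purpose: the frontend normally redirects the load to a backend node, so a real client has to follow that redirect while re-sending the body.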
Environment preparation
- Create the topic spark_demo1_src
./bin/kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 4 --topic spark_demo1_src
- Scripts for generating mock data
demo1_data_gen.py
#!/usr/bin/env python
# Copyright (c) 2020 Beijing Dingshi Zongheng Technology Co., Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import random
import time


def genUid(s=10000):
    # Random user id in [1, s].
    return random.randint(1, s)


def getSite():
    # Pick one of the three demo sites at random.
    site_scope = ['https://www.starrocks.com/', 'https://trial.starrocks.com/', 'https://docs.starrocks.com/']
    idx = random.randint(0, len(site_scope) - 1)
    return site_scope[idx]


def getTm():
    # Current Unix time, shifted back by up to 3 * 1800 seconds to simulate late events.
    delay_jitter = random.randint(-1800, 0)
    chance = random.randint(0, 3)
    # int() replaces the Python 2-only long() so the script runs on Python 2 and 3.
    return int(time.time() + delay_jitter * chance)


# Example record:
# { "uid":1, "site": "https://www.starrocks.com/", "time": 1621410635 }
def gen():
    data = """{ "uid":%d, "site": "%s", "time": %s } """ % (genUid(), getSite(), getTm())
    return data


def main():
    # Emit a random number of records, between 1 and the count passed as argv[1].
    lines = random.randint(1, int(sys.argv[1]))
    for x in range(lines):
        data = gen()
        print(data)


if __name__ == '__main__':
    main()
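To make the link between one generated record and the columns queried later (site, date, hour, minute) concrete, the following sketch parses a record and splits its timestamp into those fields. It is an illustration only: the function name record_to_row is hypothetical, UTC is an assumed time zone, and the actual transformation inside the Spark job is not shown in this document.

```python
import json
from datetime import datetime, timezone


def record_to_row(line):
    """Turn one JSON record from demo1_data_gen.py into (site, date, hour, minute, uid)."""
    rec = json.loads(line)
    # Assumption: interpret the epoch seconds in UTC so the result is deterministic.
    ts = datetime.fromtimestamp(int(rec["time"]), tz=timezone.utc)
    return (rec["site"], ts.strftime("%Y-%m-%d"), ts.hour, ts.minute, rec["uid"])


sample = '{ "uid":1, "site": "https://www.starrocks.com/", "time": 1621410635 } '
row = record_to_row(sample)
print(row)
```

A job running in another time zone would produce different date, hour, and minute values for the same record, so the time zone should match whatever the downstream reports expect.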
gen_data.sh
#!/bin/bash
# Copyright (c) 2020 Beijing Dingshi Zongheng Technology Co., Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
echo "Usage: $0 interval lines topicName"
# Defaults: send every 15 seconds to the topic created above.
interval=${1:-15}
lines=$2
topic=${3:-spark_demo1_src}
echo "Sending time data to ${topic} every ${interval} seconds..."
while true ; do
    # Use the topic argument instead of a hard-coded topic name.
    python demo1_data_gen.py "$lines" | ./bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic "$topic"
    sleep "$interval"
done
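- Install Zeppelin
This test uses Docker; for a non-Docker deployment, refer to the Zeppelin documentation.
# Pull the image; configure a proxy to speed this up if needed
docker pull apache/zeppelin:0.9.0
# Starting with local directories mounted never worked in this test; the attempted command is kept for reference:
# docker run -p 8089:8080 -v /Users/dmzgxl/opt/zep:/opt/zeppelin -v /Users/dmzgxl/opt/zep/logs:/logs -v /Users/dmzgxl/opt/zep/notebooks:/notebook -e ZEPPELIN_LOG_DIR='/logs' -e ZEPPELIN_NOTEBOOK_DIR='/notebook' -v /etc/localtime:/etc/localtime -v /Users/dmzgxl/opt/zep/deps:/deps --rm -d --name zeppelin apache/zeppelin:0.9.0; sleep 10; open http://localhost:8089
# Start Zeppelin without mounts. Mounting a local directory would make it easier to load dependencies,
# but the container failed to start with mounts, so the jar is copied in with docker cp instead.
docker run -d --name zeppelin -p 8089:8080 apache/zeppelin:0.9.0
# Zeppelin needs the MySQL JDBC driver jar to connect over the MySQL protocol; download it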
wget https://cdn.mysql.com//archives/mysql-connector-java-5.1/mysql-connector-java-5.1.46.zip
unzip mysql-connector-java-5.1.46.zip
cd mysql-connector-java-5.1.46
# Copy the jar into the container ("zeppelin" is the name given with --name above; the container ID also works)
docker cp mysql-connector-java-5.1.46-bin.jar zeppelin:/opt/zeppelin
Building the code
Fork https://github.com/StarRocks/demo.git into your own account, for example https://github.com/LittleBeeBee/demo.git
Method 1: the host can access the Internet
mkdir project
cd project
git clone https://github.com/LittleBeeBee/demo.git
cd demo/SparkDemo
# Build; this produces target/SparkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean scala:compile compile package
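Method 2: the host cannot access the Internet
Local host: your own computer. Server: the production machine.
Build and package on the local host, then upload the result to the server.
- Create a bare repo on the server
mkdir -p project/git/starrocks-demo.git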
cd project/git/starrocks-demo.git
git init --bare
- On the local host, link the local clone to the remote git repo
git clone https://github.com/LittleBeeBee/demo.git
cd demo/
git checkout -b dev # create a new dev branch
# Name the remote "upstream" so that the push below can refer to it
git remote add upstream ssh://jingdan@doris-sandbox04:/home/disk2/jingdan/project/git/starrocks-demo.git
cd SparkDemo
mvn clean scala:compile compile package
git add *
git commit -m "compile"
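# The commit was made on the local dev branch, so push dev to the server's master branch
git push upstream dev:master
- Pull the code on the server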
cd project
git clone ssh://jingdan@starrocks-sandbox04:/home/disk2/jingdan/project/git/starrocks-demo.git
cd starrocks-demo/SparkDemo
Generating test data
Start the data generator
cd /home/disk1/starrocks/thirdparty/kafka_2.12-2.5.0/script
sh gen_data.sh 2 10 spark_demo1_src
Start the Spark program
cd starrocks-demo/SparkDemo
java -cp target/SparkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.starrocks.spark.SparkStreaming2starrocks
Check that the data has been written to StarRocks
mysql> select * from demo1_spark_tb0 limit 10;
+-----------------------------+------------+------+--------+------+
| site                        | date       | hour | minute | uv   |
+-----------------------------+------------+------+--------+------+
| https://docs.starrocks.com/ | 2021-07-22 |   19 |     56 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   19 |     58 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      1 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      4 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      5 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      7 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      8 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      9 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |     11 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |     12 | NULL |
+-----------------------------+------------+------+--------+------+
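The column names suggest that uv is a unique-visitor measure per site and minute. How the demo stores and computes it is not shown in this document, so the sketch below is only a plain-Python illustration of that idea: it counts distinct uid values per (site, date, hour, minute) key. The function name count_uv and the sample rows are hypothetical.

```python
from collections import defaultdict


def count_uv(rows):
    """Count distinct uids per (site, date, hour, minute) bucket."""
    seen = defaultdict(set)
    for site, date, hour, minute, uid in rows:
        seen[(site, date, hour, minute)].add(uid)
    return {key: len(uids) for key, uids in seen.items()}


# Hypothetical rows: uid 1 visits twice in the same minute and is counted once.
rows = [
    ("https://docs.starrocks.com/", "2021-07-22", 19, 56, 1),
    ("https://docs.starrocks.com/", "2021-07-22", 19, 56, 1),
    ("https://docs.starrocks.com/", "2021-07-22", 19, 56, 2),
    ("https://docs.starrocks.com/", "2021-07-22", 19, 58, 3),
]
uv = count_uv(rows)
print(uv)
```

Keeping a set of uids per bucket is the simplest exact approach; it trades memory for accuracy, which is fine for a demo-sized stream.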
Visualization demo
Open http://localhost:8089