StarRocks的慢SQL换行处理
--2022-03-16 春雷
1、前言
StarRocks的慢SQL有的出现了换行符的情况,之前采取 流式慢SQL采集 的时候,没有注意到此情况,导致SQL采集结果进行了截断,需要兼容此种SQL换行的情况。
注:
如果需要查看流水采集的实现方式,请查看之前的文章:
2、具体
2.1、问题具体现象
流式采集的慢SQL存在SQL阶段的问题
【慢SQL文件里面的表现为】:
2.2、处理
【更改filebeat的配置文件】:
/xxx/conf/filebeat.yml
filebeat.inputs:
-
type: log
enabled: true
ignore_older: 5m
multiline.pattern: ‘^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}’
multiline.negate: true
multiline.match: after
include_lines: [‘slow_query’]
paths:- /xxx/fe/log/fe.audit.log
fields:
log_topics: xxx_slow_log
processors:
-
script:
lang: javascript
id: my_filter
tag: enable
source: >
function process(event) {
var str = event.Get(“message”);
var slow_time = str.substr(0, 19);
var query_type = str.substr(25,10);
var detail_query = str.substr(38);
var js_arr = detail_query.split("|");
var Client_tmp = js_arr[0];
var Client_tmp2 = Client_tmp.replace(‘Client=’,’’);
var Client_tmp3 = Client_tmp2.replace(‘t=’,’’);
var Client_arr = Client_tmp3.split(":");
var Client = Client_arr[0];
var User_tmp = js_arr[1];
var User = User_tmp.replace(‘User=default_cluster:’,’’);
var Db_tmp = js_arr[2];
var Db = Db_tmp.replace(‘Db=default_cluster:’,’’);
var State_tmp = js_arr[3];
var State = State_tmp.replace(‘State=’,’’);
var Time_tmp = js_arr[4];
var Time = Time_tmp.replace(‘Time=’,’’);
var ScanBytes_tmp = js_arr[5];
var ScanBytes = ScanBytes_tmp.replace(‘ScanBytes=’,’’);
var ScanRows_tmp = js_arr[6];
var ScanRows = ScanRows_tmp.replace(‘ScanRows=’,’’);
var ReturnRows_tmp = js_arr[7];
var ReturnRows = ReturnRows_tmp.replace(‘ReturnRows=’,’’);
var StmtId_tmp = js_arr[8];
var StmtId = StmtId_tmp.replace(‘StmtId=’,’’);
var QueryId_tmp = js_arr[9];
var QueryId = QueryId_tmp.replace(‘QueryId=’,’’);
var IsQuery_tmp = js_arr[10];
var IsQuery = IsQuery_tmp.replace(‘IsQuery=’,’’);
var feIp_tmp = js_arr[11];
var feIp = feIp_tmp.replace(‘feIp=’,’’);
var Stmt_tmp = js_arr[12];
var Stmt = Stmt_tmp.replace(‘Stmt=’,’’);
var Stmt = Stmt.substring(0,65530);
var Digest_tmp = js_arr[13];
var Digest = Digest_tmp.replace(‘Digest=’,’’);
event.Put(“query_type”,query_type);
event.Put(“slow_time”,slow_time);
event.Put(“Client”,Client);
event.Put(“User”,User);
event.Put(“Db”,Db);
event.Put(“State”,State);
event.Put(“Time”,Time);
event.Put(“ScanBytes”,ScanBytes);
event.Put(“ScanRows”,ScanRows);
event.Put(“ReturnRows”,ReturnRows);
event.Put(“StmtId”,StmtId);
event.Put(“QueryId”,QueryId);
event.Put(“IsQuery”,IsQuery);
event.Put(“feIp”,feIp);
event.Put(“Stmt”,Stmt);
event.Put(“Digest”,Digest);
event.Put(“starrocks_fe_ip”,‘feip’);
}
-
drop_fields:
fields: [“ecs”,“agent”,“message”,“log”,“host”]
【解释】:
-
multiline.pattern
- 指定用于匹配多行的正则表达式
-
multiline.negate:
- 定义模式是否被否定。默认false
-
multiline.match:
- 指定Filebeat如何把多行合并成一个事件。可选的值是 after 或者 before
【查看kafka数据】:
【查看结果】: