原先的配置
[INFO] StarRocksSourceBeReader [open Scan params.mem_limit 8589934592 B]
[INFO] StarRocksSourceBeReader [open Scan params.query-timeout-s 600 s]
[INFO] StarRocksSourceBeReader [open Scan params.keep-alive-min 100 min]
[INFO] StarRocksSourceBeReader [open Scan params.batch_size 1000]
错误
程序读取starrocks跑10分钟左右报错
Caused by: java.lang.RuntimeException: Failed to get next from be -> ip:[172.24.5.172] CANCELLED msg:[Set cancelled by MemoryScratchSinkOperator]
解决方案
原因:是因为参数params.query-timeout-s 设置600秒,导致未读取完数据,直接取消了。
修改后的配置
[open Scan params.mem_limit 8589934592 B]
[open Scan params.query-timeout-s 6000 s]
[open Scan params.keep-alive-min 100 min]
[open Scan params.batch_size 1000]
各个参数说明
/**
* StarRocks Source
* @return
*/
public static StarRocksSourceOptions createStarRocksSourceOptions(String db,String tableName){
StarRocksSourceOptions.Builder builder = StarRocksSourceOptions.builder()
.withProperty("connector", SR_SOURCE_CONNECTOR)
.withProperty("scan-url", SR_SOURCE_SCAN_URL)
.withProperty("jdbc-url", SR_SOURCE_JDBC_URL)
.withProperty("username", SR_SOURCE_USERNAME)
.withProperty("password", SR_SOURCE_PASSWORD)
//BE 节点中单个查询的内存上限。单位:字节。默认值:1073741824(即 1 GB)。104857600:100M
.withProperty("scan.params.mem-limit-byte","10737418240")
//数据读取任务的超时时间,在任务执行过程中进行检查。单位:秒。默认值:600。如果超过该时间,仍未返回读取结果,则停止数据读取任务。
.withProperty("scan.params.query-timeout-s","2592000")
//Flink 连接器连接 StarRocks 集群的时间上限。单位:毫秒。默认值:1000。超过该时间上限,则数据读取任务会报错。
.withProperty("scan.connect.timeout-ms","2592000")
//数据读取任务的保活时间,通过轮询机制定期检查。单位:分钟。默认值:10。建议取值大于等于 5。
.withProperty("scan.params.keep-alive-min","8")
//数据读取失败时的最大重试次数。默认值:1。超过该数量上限,则数据读取任务报错。
.withProperty("scan.max-retries","100")
.withProperty("table-name",tableName)
.withProperty("database-name",db);
return builder.build();