Contents
- Introduction
- 1. Starting the SQL Gateway
- 2. Opening a session
- 3. Executing Flink SQL
- 4. Viewing the execution result
- 5. Getting the status of an operationHandle
- 6. Notes
- 7. Official links
Introduction
Flink 1.16 introduced a new feature, the SQL Gateway. This post tries it out hands-on.
1. Starting the SQL Gateway
./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
2. Opening a session
curl --request POST http://localhost:8083/v1/sessions
{"sessionHandle":"94588a47-3509-42b7-98f9-0ff89cc6dbd5"}
// Send a heartbeat to tell the server that the client is still active; this keeps the session alive for the configured timeout.
curl --request POST http://localhost:8083/v1/sessions/${sessionHandle}/heartbeat
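Every later request needs the session handle, so it helps to capture it in a shell variable rather than copy it by hand. The following is a minimal sketch, assuming the response is the small single-line JSON object shown above; `json_field` and `open_session` are hypothetical helper names introduced for this example and are not part of Flink.

```shell
# Print the value of a string field from a small JSON object read on stdin.
# Hypothetical helper: a sed-based sketch, not a general JSON parser.
json_field() {
  sed -n "s/.*\"$1\"[[:space:]]*:[[:space:]]*\"\([^\"]*\)\".*/\1/p"
}

# Open a session and print its handle.
# Assumes the gateway from step 1 is listening on localhost:8083.
open_session() {
  curl --silent --request POST http://localhost:8083/v1/sessions \
    | json_field sessionHandle
}
```

With this in place, `sessionHandle=$(open_session)` sets the variable that the heartbeat command above expects.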
3. Executing Flink SQL
curl --request POST http://localhost:8083/v1/sessions/046f8ffc-1ec6-4126-ab4a-667c9725554f/statements/ --data '{"statement": "SET execution.checkpointing.interval = 3s"}'
{"operationHandle":"24ae4332-1f9c-4bd7-9684-b9150dae4108"}
curl --request POST http://localhost:8083/v1/sessions/046f8ffc-1ec6-4126-ab4a-667c9725554f/statements/ --data "{\"statement\": \"CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector'='mysql-cdc','hostname'='localhost','port'='3306','username'='root','password'='123456','database-name'='mydb','table-name'='products' ,'server-time-zone' = 'Asia/Shanghai')\"}"
{"operationHandle":"63fa431d-632e-4b52-82fd-666d575c26b1"}
curl --request POST http://localhost:8083/v1/sessions/046f8ffc-1ec6-4126-ab4a-667c9725554f/statements/ --data "{\"statement\": \"CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456', 'database-name' = 'mydb','table-name' = 'orders','server-time-zone' = 'Asia/Shanghai')\"}"
{"operationHandle":"b32dd3c3-2c28-442a-8f96-ea5a160f2144"}
curl --request POST http://localhost:8083/v1/sessions/046f8ffc-1ec6-4126-ab4a-667c9725554f/statements/ --data "{\"statement\": \"CREATE TABLE shipments (shipment_id INT,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (shipment_id) NOT ENFORCED) WITH ('connector' = 'postgres-cdc','hostname' = 'localhost','port' = '5432','username' = 'postgres','password' = 'postgres','database-name' = 'postgres','schema-name' = 'public','table-name' = 'shipments')\"}"
{"operationHandle":"c0802732-04ea-424a-a154-43e6ff69f00f"}
curl --request POST http://localhost:8083/v1/sessions/046f8ffc-1ec6-4126-ab4a-667c9725554f/statements/ --data "{\"statement\": \"CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,shipment_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'enriched_orders' )\"}"
{"operationHandle":"299a1096-710f-4c6b-a3da-3d9f2beed178"}
curl --request POST http://localhost:8083/v1/sessions/046f8ffc-1ec6-4126-ab4a-667c9725554f/statements/ --data "{\"statement\": \"INSERT INTO enriched_orders SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived FROM orders AS o LEFT JOIN products AS p ON o.product_id = p.id LEFT JOIN shipments AS s ON o.order_id = s.order_id\"}"
{"operationHandle":"398cf8c4-87df-4306-963e-3b54e7779af7"}
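The commands above escape every inner double quote by hand, which is easy to get wrong. As a hedged alternative, the sketch below builds the JSON body from a plain SQL string. `statement_payload` and `submit_statement` are hypothetical helper names for this example; the sketch assumes a single-line statement (newlines are not escaped) and that `$sessionHandle` is already set.

```shell
# Wrap a single-line SQL string in the {"statement": "..."} body.
# Backslashes are escaped first, then double quotes, so the result is valid JSON.
statement_payload() {
  printf '{"statement": "%s"}' \
    "$(printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')"
}

# Submit one statement to the session in $sessionHandle.
# Assumes the gateway is listening on localhost:8083, as in the steps above.
submit_statement() {
  curl --silent --request POST \
    "http://localhost:8083/v1/sessions/${sessionHandle}/statements/" \
    --data "$(statement_payload "$1")"
}
```

For example, `submit_statement "SET execution.checkpointing.interval = 3s"` sends the same body as the first command in this section. Single quotes inside the SQL need no escaping because they have no special meaning in JSON.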
4. Viewing the execution result
curl --request GET http://localhost:8083/v1/sessions/${sessionHandle}/operations/${operationHandle}/result/0
5. Getting the status of an operationHandle
curl --request GET http://localhost:8083/v1/sessions/${sessionHandle}/operations/${operationHandle}/status
{"status":"FINISHED"}
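Because statements run asynchronously, a script usually has to wait for an operation to leave its in-flight states before reading the result. The sketch below polls the status endpoint shown above. The helper names and the `POLL_SECONDS` variable are hypothetical, and treating `INITIALIZED`, `PENDING` and `RUNNING` as the in-flight states is an assumption based on Flink's operation status names; only `FINISHED` appears in this post.

```shell
# Fetch the raw status JSON for one operation handle.
# Assumes $sessionHandle is set and the gateway listens on localhost:8083.
fetch_status() {
  curl --silent --request GET \
    "http://localhost:8083/v1/sessions/${sessionHandle}/operations/$1/status"
}

# Pull the status string out of a response such as {"status":"FINISHED"}.
parse_status() {
  sed -n 's/.*"status"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Poll until the operation is no longer in flight, then print its last status.
# POLL_SECONDS is a knob for this sketch only, not a gateway setting.
wait_for_operation() {
  while :; do
    status=$(fetch_status "$1" | parse_status)
    case "$status" in
      INITIALIZED|PENDING|RUNNING) sleep "${POLL_SECONDS:-1}" ;;
      *) printf '%s\n' "$status"; return 0 ;;
    esac
  done
}
```

Calling `wait_for_operation "$operationHandle"` prints the status that ended the wait. An empty line means no status could be parsed, for example because the gateway was unreachable.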
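6. Notes
- Only one SQL statement can be executed per request. To submit a complex statement, wrap it in the JSON body and escape the inner double quotes: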
curl --request POST http://localhost:8083/v1/sessions/${sessionHandle}/statements/ --data "{\"statement\": \"XXXXXX\"}"
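- When fetching the result of an operation by its operationHandle, check nextResultUri in the response: if it is not null, more results can be fetched from that URI. For example: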
// First fetch of the execution result
{
  "results": {
    "columns": [
      {
        "name": "result",
        "logicalType": {
          "type": "VARCHAR",
          "nullable": true,
          "length": 2147483647
        },
        "comment": null
      }
    ],
    "data": [
      {
        "kind": "INSERT",
        "fields": [
          "OK"
        ]
      }
    ]
  },
  "resultType": "PAYLOAD",
  "nextResultUri": "/v1/sessions/046f8ffc-1ec6-4126-ab4a-667c9725554f/operations/ba3c15b0-3ed4-4505-9d32-74942536b557/result/1"
}
// Result of fetching again, because nextResultUri was not null
// curl --request GET http://localhost:8083/v1/sessions/046f8ffc-1ec6-4126-ab4a-667c9725554f/operations/ba3c15b0-3ed4-4505-9d32-74942536b557/result/1
{
  "results": {
    "columns": [
      {
        "name": "result",
        "logicalType": {
          "type": "VARCHAR",
          "nullable": true,
          "length": 2147483647
        },
        "comment": null
      }
    ],
    "data": []
  },
  "resultType": "EOS",
  "nextResultUri": null
}
- Configuration options can be changed when the SQL Gateway is started, for example with -D flags as in step 1.
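- The most important option is the session timeout. If you drive the gateway by hand with curl, set it to a longer value. According to the Flink 1.16 documentation, sql-gateway.session.idle-timeout defaults to 10 minutes, and idle sessions are checked every 1 minute (sql-gateway.session.check-interval).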
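- When you check a statement's result through the REST API and the response contains an error, the error message comes back unformatted and is hard to read. In that case, look at the SQL Gateway log directly.
- In version 1.16, Flink SQL can only be executed one statement at a time.
- When passing parameters with curl, pay attention to single versus double quotes.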
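The note on nextResultUri can be turned into a small loop that keeps fetching until the URI comes back null. This is a minimal sketch under the assumptions already used in this post (gateway on localhost:8083, responses shaped like the two examples above); `fetch_uri`, `next_result_uri` and `fetch_all_results` are hypothetical helper names.

```shell
# GET a gateway-relative URI such as /v1/sessions/.../operations/.../result/0.
fetch_uri() {
  curl --silent --request GET "http://localhost:8083$1"
}

# Print the nextResultUri of a result page read on stdin.
# Prints nothing when the field is null or missing.
next_result_uri() {
  tr -d '\n' \
    | sed -n 's/.*"nextResultUri"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Print every result page of an operation, starting at result token 0.
# Arguments: session handle, operation handle.
fetch_all_results() {
  uri="/v1/sessions/$1/operations/$2/result/0"
  while [ -n "$uri" ]; do
    page=$(fetch_uri "$uri")
    printf '%s\n' "$page"
    uri=$(printf '%s' "$page" | next_result_uri)
  done
}
```

A call such as `fetch_all_results "$sessionHandle" "$operationHandle"` prints one page per iteration and stops at the page whose nextResultUri is null. The loop does not pause between requests, so it suits short operations like the DDL statements above; for a long-running query a real client would probably want a delay and a stop condition.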
7. Official links
Flink Confluence
Flink official documentation
Flink CDC