Custom GPSS Client Development Process
- 1. What Is GPSS
- 2. Architecture
- 3. Downloading and Installing the Component
- 4. Custom Client
- 4.1 GPSS Batch Data API Service Definition
- 4.2 Setting up a Java Development Environment
- 4.3 Generating the Batch Data API Client Classes
- 4.4 Coding the GPSS Batch Data Client
- 4.4.1 Connect to the GPSS Server
- 4.4.2 Connect to Greenplum Database
- 4.4.3 Retrieve Greenplum schema and table information
- 4.4.4 Prepare a Greenplum table for writing
- 4.4.5 Write data to the Greenplum table
- 5. Summary
1. What Is GPSS
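Greenplum Stream Server (GPSS) is an ETL (extract, transform, load) tool. A GPSS server instance receives streaming data from one or more clients and, using Greenplum Database readable external tables, transforms the data and inserts it into a target Greenplum table. The data source and data format are client-specific: the client specifies them.
- Greenplum Stream Server includes the gpss command-line utility. Running gpss starts a GPSS instance, which waits indefinitely for client data.
- Greenplum Stream Server also includes the gpsscli command-line utility, a client tool for submitting data load jobs to a GPSS instance and managing those jobs.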
2. Architecture
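Greenplum Stream Server is a gRPC server. The GPSS gRPC service definition includes the operations and message formats required to connect to Greenplum Database and examine Greenplum metadata, as well as those required to write data from a client into a Greenplum Database table. For more about gRPC, see https://grpc.io/docs/
The gpsscli command-line utility is a gRPC client of Greenplum Stream Server; it is also used to operate the Greenplum-Kafka Integration and the Greenplum-Informatica Connector. You can use the GPSS API to develop your own GPSS gRPC client. The Greenplum Stream Server architecture is shown in the figure below: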
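Greenplum Stream Server executes an ETL job as follows:
- A user starts one or more ETL load jobs through a client application.
- The client application uses the gRPC protocol to submit and start the data load jobs on a running GPSS instance.
- The GPSS instance submits each load request transaction to the master of the Greenplum cluster and creates, or reuses, external tables to hold the data.
- The GPSS instance writes the data submitted by the client directly to the segments of the Greenplum cluster.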
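Seen from the client, this flow implies a fixed call order within one session: Connect, then one or more Open/Write/Close cycles, then Disconnect. The sketch below is a hypothetical client-side guard (GpssCallOrder is not part of the GPSS API) that rejects out-of-order calls before they reach the server. It assumes a session may open a table again after Close, and it leaves out the metadata RPCs (ListSchema, ListTable, DescribeTable) for brevity.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical client-side guard for the order of GPSS write RPCs in one session. */
final class GpssCallOrder {
    enum Call { CONNECT, OPEN, WRITE, CLOSE, DISCONNECT }

    // For each call, the calls that may follow it (assumption: Open may be repeated after Close)
    private static final Map<Call, Set<Call>> NEXT = new EnumMap<>(Call.class);

    static {
        NEXT.put(Call.CONNECT, EnumSet.of(Call.OPEN, Call.DISCONNECT));
        NEXT.put(Call.OPEN, EnumSet.of(Call.WRITE, Call.CLOSE, Call.DISCONNECT));
        NEXT.put(Call.WRITE, EnumSet.of(Call.WRITE, Call.CLOSE, Call.DISCONNECT));
        NEXT.put(Call.CLOSE, EnumSet.of(Call.OPEN, Call.DISCONNECT));
        NEXT.put(Call.DISCONNECT, EnumSet.noneOf(Call.class));
    }

    private Call last; // null until CONNECT has been recorded

    /** Records the call, or throws IllegalStateException if it is out of order. */
    void check(Call call) {
        boolean allowed = (last == null) ? call == Call.CONNECT : NEXT.get(last).contains(call);
        if (!allowed) {
            throw new IllegalStateException(call + " is not allowed after " + last);
        }
        last = call;
    }
}
```

A client would call check(...) immediately before each corresponding stub call, so a bug such as writing after Close fails locally with a clear message.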
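Known GPSS issue
When several clients write to the same table at the same time through the same Greenplum Stream Server instance, the client connections become unstable, so additional handling is required. The problem does not occur when only one client writes to a table.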
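One possible workaround, assuming all writers for a table live in the same JVM, is to serialize loads per table so that only one Open/Write/Close cycle targets a given table at a time. TableWriteLocks below is a hypothetical helper, not part of GPSS; it does nothing for concurrent writers running in separate processes, which would need coordination outside the client.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical helper: serializes GPSS loads that target the same table within one JVM. */
final class TableWriteLocks {
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Returns the single lock shared by all callers for "schema.table". */
    ReentrantLock lockFor(String schema, String table) {
        return locks.computeIfAbsent(schema + "." + table, k -> new ReentrantLock());
    }

    /** Runs one load (e.g. an Open/Write/Close cycle) while holding the table's lock. */
    <T> T withTableLock(String schema, String table, Supplier<T> load) {
        ReentrantLock lock = lockFor(schema, table);
        lock.lock();
        try {
            return load.get();
        } finally {
            lock.unlock(); // released even if the load throws
        }
    }
}
```

Loads into different tables still run in parallel; only loads into the same table wait for each other.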
3. Downloading and Installing the Component
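Download the component from the official website (link), choosing the package that matches your installed database version. This article uses the gpss-gpdb6-1.6.0-rhel7-x86_64.gppkg package as an example. First check the database version: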
-- Query version information
SELECT version();
-- Result
PostgreSQL 9.4.24
(Greenplum Database 6.13.0 build commit:4f1adf8e247a9685c19ea02bcaddfdc200937ecd Open Source)
on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Dec 18 2020 22:31:16
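To pick the matching package programmatically, the major version can be read from the version() output. The sketch below is a hypothetical helper; it assumes the output contains "Greenplum Database X.Y.Z" as in the result above, and that package names follow the gpss-gpdb&lt;major&gt; pattern of the package used in this article.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the Greenplum version from the output of SELECT version(). */
final class GreenplumVersion {
    private static final Pattern GP = Pattern.compile("Greenplum Database (\\d+)\\.(\\d+)\\.(\\d+)");

    /** Returns {major, minor, patch}, or empty if the text has no Greenplum version. */
    static Optional<int[]> parse(String versionOutput) {
        Matcher m = GP.matcher(versionOutput);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new int[] {
            Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)), Integer.parseInt(m.group(3))
        });
    }

    /** Package-name prefix, assuming the naming seen in gpss-gpdb6-1.6.0-rhel7-x86_64.gppkg. */
    static String gppkgPrefix(int major) {
        return "gpss-gpdb" + major;
    }
}
```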
GPSS installation is not covered here; see the article "GPSS Extension Installation, Enabling, Configuration, and Instance Startup".
4. Custom Client
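The official GPSS developer documentation no longer offers online pages for version 1.6 and earlier; download the PDF version instead. (In testing, the online 1.7 documentation is the same as 1.6.)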
4.1 GPSS Batch Data API Service Definition
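The GPSS Batch Data API service definition is shown below. Copy/paste the contents into a file named gpss.proto and note its location in the file system.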
syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
package api;
option java_multiple_files = true;
// Connect service Request message
message ConnectRequest {
string Host = 1; // Host address of Greenplum master; must be accessible from gpss server system
int32 Port = 2; // Greenplum master port
string Username = 3; // User or role name that gpss uses to access Greenplum
string Password = 4; // User password
string DB = 5; // Database name
bool UseSSL = 6; // Use SSL or not; ignored, use the gpss config file to config SSL
}
// Connect service Response message
message Session {
string ID = 1; // Id of client connection to gpss
}
// Operation mode
enum Operation {
Insert = 0; // Insert all data into table; behavior of duplicate key or data depends upon the constraints of the target table.
Merge = 1; // Insert and Update
Update = 2; // Update the value of "UpdateColumns" if "MatchColumns" match
Read = 3; // Not supported
}
// Required parameters of the Insert operation
message InsertOption {
repeated string InsertColumns = 1; // Names of the target table columns the insert operation should update; used in 'INSERT INTO', useful for partial loading
bool TruncateTable = 2; // Truncate table before loading?
int64 ErrorLimitCount = 4; // Error limit count; used by external table
int32 ErrorLimitPercentage = 5; // Error limit percentage; used by external table
}
// Required parameters of the Update operation
message UpdateOption {
repeated string MatchColumns = 1; // Names of the target table columns to compare when determining to update or not
repeated string UpdateColumns = 2; // Names of the target table columns to update if MatchColumns match
string Condition = 3; // Optional additional match condition; SQL syntax and used after the 'WHERE' clause
int64 ErrorLimitCount = 4; // Error limit count; used by external table
int32 ErrorLimitPercentage = 5; // Error limit percentage; used by external table
}
// Required parameters of the Merge operation
// Merge operation creates a session-level temp table in StagingSchema
message MergeOption {
repeated string InsertColumns = 1;
repeated string MatchColumns = 2;
repeated string UpdateColumns = 3;
string Condition = 4;
int64 ErrorLimitCount = 5;
int32 ErrorLimitPercentage = 6;
}
// Open service Request message
message OpenRequest {
Session Session = 1; // Session ID returned by Connect
string SchemaName = 2; // Name of the Greenplum Database schema
string TableName = 3; // Name of the Greenplum Database table
string PreSQL = 4; // SQL to execute before gpss loads the data
string PostSQL = 5; // SQL to execute after gpss loads the data
int32 Timeout = 6; // Time to wait before aborting the operation (seconds); not supported
string Encoding = 7; // Encoding of text data; not supported
string StagingSchema = 8; // Schema in which gpss creates external and temp tables; default is to create these tables in the same schema as the target table
oneof Option { // Identify the type of write operation to perform
InsertOption InsertOption = 100;
UpdateOption UpdateOption = 101;
MergeOption MergeOption = 102;
}
}
message DBValue {
oneof DBType {
int32 Int32Value = 1;
int64 Int64Value = 2;
float Float32Value = 5;
double Float64Value = 6;
string StringValue = 7; // Includes types whose values are presented as string but are not a real string type in Greenplum; for example: macaddr, time with time zone, box, etc.
bytes BytesValue = 8;
google.protobuf.Timestamp TimeStampValue = 10; // Time without timezone
google.protobuf.NullValue NullValue = 11;
}
}
message Row {
repeated DBValue Columns = 1;
}
message RowData {
bytes Data = 1; // A single protobuf-encoded Row
}
// Write service Request message
message WriteRequest {
Session Session = 1;
repeated RowData Rows = 2; // The data to load into the target table
}
// Close service Response message
message TransferStats { // Status of the data load operation
int64 SuccessCount = 1; // Number of rows successfully loaded
int64 ErrorCount = 2; // Number of error lines if Errorlimit is not reached
repeated string ErrorRows = 3; // Number of rows with incorrectly-formatted data; not supported
}
// Close service Request message
message CloseRequest {
Session session = 1;
int32 MaxErrorRows = 2; // -1: returns all, 0: nothing, >0: max rows
bool Abort = 3;
}
// ListSchema service request message
message ListSchemaRequest {
Session Session = 1;
}
message Schema {
string Name = 1;
string Owner = 2;
}
// ListSchema service response message
message Schemas {
repeated Schema Schemas = 1;
}
// ListTable service request message
message ListTableRequest {
Session Session = 1;
string Schema = 2; // 'public' is the default if no Schema is provided
}
// DescribeTable service request message
message DescribeTableRequest {
Session Session = 1;
string SchemaName = 2;
string TableName = 3;
}
enum RelationType {
Table = 0;
View = 1;
Index = 2;
Sequence = 3;
Special = 4;
Other = 255;
}
message TableInfo {
string Name = 1;
RelationType Type = 2;
}
// ListTable service response message
message Tables {
repeated TableInfo Tables = 1;
}
// DescribeTable service response message
message Columns {
repeated ColumnInfo Columns = 1;
}
message ColumnInfo {
string Name = 1; // Column name
string DatabaseType = 2; // Greenplum data type
bool HasLength = 3; // Contains length information?
int64 Length = 4; // Length if HasLength is true
bool HasPrecisionScale = 5; // Contains precision or scale information?
int64 Precision = 6;
int64 Scale = 7;
bool HasNullable = 8; // Contains Nullable constraint?
bool Nullable = 9;
}
service Gpss {
// Establish a connection to Greenplum Database; returns a Session object
rpc Connect(ConnectRequest) returns (Session) {}
// Disconnect, freeing all resources allocated for a session
rpc Disconnect(Session) returns (google.protobuf.Empty) {}
// Prepare and open a table for write
rpc Open(OpenRequest) returns(google.protobuf.Empty) {}
// Write data to table
rpc Write(WriteRequest) returns(google.protobuf.Empty) {}
// Close a write operation
rpc Close(CloseRequest) returns(TransferStats) {}
// List all available schemas in a database
rpc ListSchema(ListSchemaRequest) returns (Schemas) {}
// List all tables and views in a schema
rpc ListTable(ListTableRequest) returns (Tables) {}
// Describe table metadata (column name and column type)
rpc DescribeTable(DescribeTableRequest) returns (Columns) {}
}
The official documentation provides a table mapping Greenplum data types to DBValue types.
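As a rough illustration of how DescribeTable's DatabaseType might be mapped onto the DBValue oneof, here is a hypothetical sketch. The type names (INT2, INT4, INT8, FLOAT4, FLOAT8, BYTEA, TIMESTAMP, VARCHAR) are assumed to be upper-case internal names, consistent with the VARCHAR and INT2 checks used later in GPSSUtil; the fallback to StringValue follows the proto comment that StringValue covers types presented as strings. Check every entry against the official table before relying on it.

```java
import java.util.Locale;

/** Hypothetical sketch: which DBValue oneof field to fill for a DescribeTable type name. */
final class GpTypeMapping {
    enum DbValueField { INT32, INT64, FLOAT32, FLOAT64, STRING, BYTES, TIMESTAMP }

    static DbValueField fieldFor(String databaseType) {
        switch (databaseType.toUpperCase(Locale.ROOT)) {
            case "INT2":
            case "INT4":
                return DbValueField.INT32;     // protobuf has no 16-bit integer type
            case "INT8":
                return DbValueField.INT64;
            case "FLOAT4":
                return DbValueField.FLOAT32;
            case "FLOAT8":
                return DbValueField.FLOAT64;
            case "BYTEA":
                return DbValueField.BYTES;
            case "TIMESTAMP":
                return DbValueField.TIMESTAMP; // TimeStampValue is "time without timezone"
            default:
                return DbValueField.STRING;    // assumed: VARCHAR, TEXT, NUMERIC, MACADDR, ...
        }
    }
}
```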
4.2 Setting up a Java Development Environment
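The Java development environment consists mainly of a JDK and the gRPC code-generation plugin. Maven is used here; alternatively, the code can be pulled from the git repository referenced in the official documentation: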
<!-- Versions used by GPSS (inside <properties>) -->
<grpc.version>1.16.1</grpc.version>
<protobuf.version>3.6.1</protobuf.version>
<os.maven.plugin>1.6.2</os.maven.plugin>
<protobuf.maven.plugin>0.6.1</protobuf.maven.plugin>
<!-- Dependencies used by GPSS (inside <dependencies>) -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
<!-- Build extension used by GPSS (inside <build><extensions>); it defines ${os.detected.classifier} -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>${os.maven.plugin}</version>
</extension>
<!-- Plugin used by GPSS (inside <build><plugins>); protoc and the gRPC generator match the runtime versions above -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf.maven.plugin}</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
4.3 Generating the Batch Data API Client Classes
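Build with the dependencies and plugin from 4.2 to generate the API classes: put gpss.proto in src/main/proto (the default source directory of protobuf-maven-plugin) and run mvn compile. The message classes and the GpssGrpc service stub are generated under target/generated-sources/protobuf, in the Java package api declared by package api in the .proto file.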
4.4 Coding the GPSS Batch Data Client
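This is the actual coding stage. Note that after the GPSS extension is installed, the GPSS server (gpss) must be started before any client can connect.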
4.4.1 Connect to the GPSS Server
Configuration class:
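import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
@Configuration
@PropertySource(value = "classpath:gpss.properties")
public class GPSSConfiguration {
    // GPSS server (gRPC) address
    @Value("${gpss_host}") public String gpssHost;
    @Value("${gpss_port}") public int gpssPort;
    // Greenplum master settings read by GPSSManagement.getSession(); these key names are assumed
    @Value("${gp_host}") public String gpHost;
    @Value("${gp_port}") public int gpPort;
    @Value("${gp_username}") public String gpUsername;
    @Value("${gp_password}") public String gpPassword;
    @Value("${gp_database}") public String gpDatabase;
}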
Connection utility class:
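import api.ConnectRequest;
import api.GpssGrpc;
import api.Session;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class GPSSManagement {
    @Autowired
    private GPSSConfiguration configuration;
    private ManagedChannel channel = null;
    private GpssGrpc.GpssBlockingStub bStub = null;
    /**
     * Get the blocking stub. The channel is created on first use and recreated
     * after closeChannel(), instead of leaking a new channel on every call.
     *
     * @return GpssGrpc.GpssBlockingStub object
     */
    public synchronized GpssGrpc.GpssBlockingStub getBlockingStub() {
        if (channel == null || channel.isShutdown()) {
            channel = ManagedChannelBuilder.forAddress(configuration.gpssHost, configuration.gpssPort)
                    .usePlaintext() // plaintext gRPC; usePlaintext(true) is deprecated
                    .build();
            bStub = GpssGrpc.newBlockingStub(channel);
        }
        return bStub;
    }
    /**
     * Get a session (Connect RPC: gpss connects to the Greenplum master)
     *
     * @return Session object
     */
    public Session getSession() {
        ConnectRequest connRequest = ConnectRequest.newBuilder()
                .setHost(configuration.gpHost)
                .setPort(configuration.gpPort)
                .setUsername(configuration.gpUsername)
                .setPassword(configuration.gpPassword)
                .setDB(configuration.gpDatabase)
                .setUseSSL(false)
                .build();
        // getBlockingStub() ensures the stub exists even if the caller did not fetch it first
        return getBlockingStub().connect(connRequest);
    }
    /**
     * Disconnect, freeing all resources gpss allocated for the session
     *
     * @param session the session to disconnect
     */
    public void closeConnection(Session session) {
        getBlockingStub().disconnect(session);
    }
    /**
     * Shut down the channel
     *
     * @param time seconds to wait for termination (at least 10)
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized void closeChannel(long time) throws InterruptedException {
        if (channel == null) {
            return;
        }
        channel.shutdown().awaitTermination(Math.max(time, 10L), TimeUnit.SECONDS);
    }
}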
4.4.2 Connect to Greenplum Database
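Below is the official code snippet; in this article the database connection logic is folded into GPSSManagement.getSession(), which GPSSUtil calls: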
Session mSession = null;
String gpMasterHost = "localhost";
Integer gpMasterPort = 15432;
String gpRoleName = "gpadmin";
String gpPasswd = "changeme";
String dbname = "testdb";
// create a connect request builder
ConnectRequest connReq = ConnectRequest.newBuilder()
.setHost(gpMasterHost)
.setPort(gpMasterPort)
.setUsername(gpRoleName)
.setPassword(gpPasswd)
.setDB(dbname)
.setUseSSL(false)
.build();
// use the blocking stub to call the Connect service
mSession = bStub.connect(connReq);
// (placeholder) do greenplum stuff here
// use the blocking stub to call the Disconnect service
bStub.disconnect(mSession);
4.4.3 Retrieve Greenplum schema and table information
Below is the official code snippet; the code for retrieving schema and table information is consolidated into the GPSSUtil class. Get all schemas:
import java.util.ArrayList;
import java.util.List;
// create a list schema request builder
ListSchemaRequest lsReq = ListSchemaRequest.newBuilder()
.setSession(mSession)
.build();
// use the blocking stub to call the ListSchema service
List<Schema> listSchema = bStub.listSchema(lsReq).getSchemasList();
// extract the name of each schema and save in an array
ArrayList<String> schemaNameList = new ArrayList<String>();
for(Schema s : listSchema) {
schemaNameList.add(s.getName());
}
Get the tables in a schema:
// use the first schema name returned in the ListSchema code excerpt
String schemaName = schemaNameList.get(0);
// create a list table request builder
ListTableRequest ltReq = ListTableRequest.newBuilder()
.setSession(mSession)
.setSchema(schemaName)
.build();
// use the blocking stub to call the ListTable service
List<TableInfo> tblList = bStub.listTable(ltReq).getTablesList();
// extract the name of each table only and save in an array
ArrayList<String> tblNameList = new ArrayList<String>();
for(TableInfo ti : tblList) {
if(ti.getTypeValue() == RelationType.Table_VALUE) {
tblNameList.add(ti.getName());
}
}
Get a table's column information (the client code must validate and convert the data according to this column information):
// the name of the first table returned in the ListTable code excerpt
String tableName = tblNameList.get(0);
// create a describe table request builder
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
.setSession(mSession)
.setSchemaName(schemaName)
.setTableName(tableName)
.build();
// use the blocking stub to call the DescribeTable service
List<ColumnInfo> columnList = bStub.describeTable(dtReq).getColumnsList();
// print the name and type of each column
for(ColumnInfo ci : columnList) {
String colname = ci.getName();
String dbtype = ci.getDatabaseType();
// display the column name and type to stdout
System.out.println( "column " + colname + " type: " + dbtype );
}
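The validation mentioned above can be done before any row is written. The sketch below is hypothetical: Column stands in for the generated ColumnInfo, and a column would be treated as nullable when getHasNullable() is false or getNullable() is true. It reports NOT NULL columns that have no value and map keys that are not columns of the table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-write check; Column stands in for the generated ColumnInfo. */
final class RowValidator {
    static final class Column {
        final String name;
        final boolean nullable;

        Column(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }
    }

    /** Returns the problems found in one row; an empty list means the row can be written. */
    static List<String> validate(List<Column> columns, Map<String, Object> row) {
        List<String> problems = new ArrayList<>();
        // every NOT NULL column needs a value
        for (Column c : columns) {
            if (row.get(c.name) == null && !c.nullable) {
                problems.add("column " + c.name + " is NOT NULL but has no value");
            }
        }
        // every key must name a real column
        for (String key : row.keySet()) {
            boolean known = columns.stream().anyMatch(c -> c.name.equals(key));
            if (!known) {
                problems.add("unknown column " + key);
            }
        }
        return problems;
    }
}
```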
4.4.4 Prepare a Greenplum table for writing
Below is the official code snippet; the code that prepares a table for writing is consolidated into the GPSSUtil class:
Integer errLimit = 25;
Integer errPct = 25;
// create an insert option builder
InsertOption iOpt = InsertOption.newBuilder()
.setErrorLimitCount(errLimit)
.setErrorLimitPercentage(errPct)
.setTruncateTable(false)
.addInsertColumns("loantitle")
.addInsertColumns("riskscore")
.addInsertColumns("d2iratio")
.build();
// create an open request builder
OpenRequest oReq = OpenRequest.newBuilder()
.setSession(mSession)
.setSchemaName(schemaName)
.setTableName(tableName)
//.setPreSQL("")
//.setPostSQL("")
//.setEncoding("")
.setTimeout(5)
//.setStagingSchema("")
.setInsertOption(iOpt)
.build();
// use the blocking stub to call the Open service; it returns nothing
bStub.open(oReq);
// (placeholder) write data here
// create a close request builder
TransferStats tStats = null;
CloseRequest cReq = CloseRequest.newBuilder()
.setSession(mSession)
//.setMaxErrorRows(15)
//.setAbort(true)
.build();
// use the blocking stub to call the Close service
tStats = bStub.close(cReq);
// display the result to stdout
System.out.println( "CloseRequest tStats: " + tStats.toString() );
4.4.5 Write data to the Greenplum table
Below is the official code snippet; the table-writing code is consolidated into the GPSSUtil class, which also contains the merge and update operations:
// create an array of rows
ArrayList<RowData> rows = new ArrayList<>();
for (int row = 0; row < 2; row++) {
// create a row builder
api.Row.Builder builder = api.Row.newBuilder();
// create builders for each column, in order, and set values - text, int, text
api.DBValue.Builder colbuilder1 = api.DBValue.newBuilder();
colbuilder1.setStringValue("xxx");
builder.addColumns(colbuilder1.build());
api.DBValue.Builder colbuilder2 = api.DBValue.newBuilder();
colbuilder2.setInt32Value(77);
builder.addColumns(colbuilder2.build());
api.DBValue.Builder colbuilder3 = api.DBValue.newBuilder();
colbuilder3.setStringValue("yyy");
builder.addColumns(colbuilder3.build());
// build the row
RowData.Builder rowbuilder = RowData.newBuilder().setData(builder.build().toByteString());
// add the row
rows.add(rowbuilder.build());
}
// create a write request builder
WriteRequest wReq = WriteRequest.newBuilder()
.setSession(mSession)
.addAllRows(rows)
.build();
// use the blocking stub to call the Write service; it returns nothing
bStub.write(wReq);
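The snippet sends all rows in a single WriteRequest. For large loads, one option is to split the rows into batches and call Write once per batch between Open and Close, assuming the server accepts several Write calls per Open (the separate Write RPC suggests it, but this article does not confirm it). gRPC's default maximum inbound message size is 4 MB; whether the GPSS server changes that limit is not stated here. RowBatches is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits rows into fixed-size batches, one WriteRequest per batch. */
final class RowBatches {
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += batchSize) {
            int to = Math.min(from + batchSize, rows.size());
            // copy so each batch is independent of the source list
            batches.add(new ArrayList<>(rows.subList(from, to)));
        }
        return batches;
    }
}
```

Each batch would then take the place of rows in addAllRows(...), inside a loop of bStub.write(...) calls placed between Open and Close.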
GPSSUtil class code:
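import api.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class GPSSUtil {
    @Autowired
    private GPSSManagement gpssManagement;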
    // Error limit count and error limit percentage, passed to the external table
    private static final int ERR_LIMIT = 20;
    private static final int ERR_PCT = 20;
    private static final String SCHEMA_NAME = "public";

    /** Insert every row into all columns of the table. */
    public void insertData(String insertTableName, List<Map<Object, Object>> data) {
        load(insertTableName, data, (openBuilder, columnList) -> {
            InsertOption.Builder insertOption = InsertOption.newBuilder()
                    .setErrorLimitCount(ERR_LIMIT)
                    .setErrorLimitPercentage(ERR_PCT)
                    .setTruncateTable(false);
            columnList.forEach(columnInfo -> insertOption.addInsertColumns(columnInfo.getName()));
            openBuilder.setInsertOption(insertOption.build());
        });
    }

    /** Update rows whose "id" matches; all other columns are updated. */
    public void updateData(String updateTableName, List<Map<Object, Object>> data) {
        load(updateTableName, data, (openBuilder, columnList) -> {
            UpdateOption.Builder updateOption = UpdateOption.newBuilder()
                    .setErrorLimitCount(ERR_LIMIT)
                    .setErrorLimitPercentage(ERR_PCT);
            columnList.forEach(columnInfo -> {
                if ("id".equalsIgnoreCase(columnInfo.getName())) {
                    updateOption.addMatchColumns(columnInfo.getName());
                } else {
                    updateOption.addUpdateColumns(columnInfo.getName());
                }
            });
            openBuilder.setUpdateOption(updateOption.build());
        });
    }

    /** Merge: insert new rows and update existing rows matched on "id". */
    public void mergeData(String mergeTableName, List<Map<Object, Object>> data) {
        load(mergeTableName, data, (openBuilder, columnList) -> {
            MergeOption.Builder mergeOption = MergeOption.newBuilder()
                    .setErrorLimitCount(ERR_LIMIT)
                    .setErrorLimitPercentage(ERR_PCT)
                    .addMatchColumns("id");
            columnList.forEach(columnInfo -> {
                mergeOption.addInsertColumns(columnInfo.getName());
                // the match column itself is not updated
                if (!"id".equalsIgnoreCase(columnInfo.getName())) {
                    mergeOption.addUpdateColumns(columnInfo.getName());
                }
            });
            openBuilder.setMergeOption(mergeOption.build());
        });
    }

    /** Shared DescribeTable -> Open -> Write -> Close sequence; the caller supplies the write option. */
    private void load(String tableName, List<Map<Object, Object>> data,
                      java.util.function.BiConsumer<OpenRequest.Builder, List<ColumnInfo>> optionSetter) {
        long start = System.currentTimeMillis();
        // Get the stub and a session
        GpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();
        Session session = gpssManagement.getSession();
        try {
            // Look up the target table's columns
            DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
                    .setSession(session)
                    .setSchemaName(SCHEMA_NAME)
                    .setTableName(tableName)
                    .build();
            List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
            // Open the table; Timeout and Encoding are marked "not supported" in gpss.proto, so they are not set
            OpenRequest.Builder openBuilder = OpenRequest.newBuilder()
                    .setSession(session)
                    .setSchemaName(SCHEMA_NAME)
                    .setTableName(tableName);
            optionSetter.accept(openBuilder, columnList);
            gpssStub.open(openBuilder.build());
            // Convert the rows and write them; Write returns nothing
            List<RowData> rows = new ArrayList<>();
            formatRows(rows, columnList, data);
            gpssStub.write(WriteRequest.newBuilder().setSession(session).addAllRows(rows).build());
            // Close the write operation and report the load statistics
            TransferStats tStats = gpssStub.close(CloseRequest.newBuilder().setSession(session).build());
            long use = System.currentTimeMillis() - start;
            System.out.println("CloseRequest tStats: " + tStats + " Greenplum load: " + data.size()
                    + " rows, total time: " + use + " ms.");
        } finally {
            // Release the session and the channel even if one of the calls above failed
            try {
                gpssManagement.closeConnection(session);
            } finally {
                try {
                    gpssManagement.closeChannel(20L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    /**
     * Format data. Row columns are matched to table columns by position, so every
     * column in columnList must produce exactly one DBValue, in table order.
     *
     * @param rows       formatted output rows
     * @param columnList names and types of the target table's columns
     * @param data       rows to load, keyed by column name
     */
    private void formatRows(List<RowData> rows, List<ColumnInfo> columnList, List<Map<Object, Object>> data) {
        for (Map<Object, Object> insertRow : data) {
            Row.Builder builder = Row.newBuilder();
            for (ColumnInfo columnInfo : columnList) {
                Object valueObject = insertRow.get(columnInfo.getName());
                String databaseType = columnInfo.getDatabaseType();
                DBValue.Builder value = DBValue.newBuilder();
                if (valueObject == null) {
                    // send SQL NULL rather than "" or -1 placeholders
                    value.setNullValue(com.google.protobuf.NullValue.NULL_VALUE);
                } else if ("VARCHAR".equalsIgnoreCase(databaseType)) {
                    value.setStringValue(valueObject.toString());
                } else if ("INT2".equalsIgnoreCase(databaseType)) {
                    value.setInt32Value(Integer.parseInt(valueObject.toString()));
                } else {
                    // silently skipping a column would shift every later value into the wrong column
                    throw new IllegalArgumentException("Unsupported type " + databaseType
                            + " for column " + columnInfo.getName());
                }
                builder.addColumns(value);
            }
            // serialize the row and add it
            rows.add(RowData.newBuilder().setData(builder.build().toByteString()).build());
        }
    }
}
5. Summary
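The official documentation is fairly detailed, but it only gives the core code; a client for real use still requires a fair amount of additional code.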