Greenplum Stream Server (GPSS)是一个ETL(提取、转换、加载)工具。GPSS服务器的一个实例从一个或多个客户端接收流数据,使用Greenplum数据库的可读外部表将数据转换并插入到目标Greenplum表中。数据源和数据格式因客户端而异,由客户端指定。
- Greenplum Stream Server包括gpss命令行工具。运行gpss时,会启动一个gpss实例,此实例无限期地等待客户端数据。
- Greenplum Stream Server还包括gpsscli命令行工具,这是一个客户端工具,用于向GPSS实例提交数据加载作业并管理这些作业。
Greenplum Stream Server是一个gRPC服务器。GPSS gRPC服务定义的内容包括:连接到Greenplum数据库和检查Greenplum元数据所需的操作和消息格式;将数据从客户端写入Greenplum数据库表所需的操作和消息格式。有关gRPC的更多内容,请参考:https://grpc.io/docs/
gpsscli命令行工具是Greenplum Stream Server的gRPC客户端工具,也可以用于操作Greenplum-Kafka 集成和Greenplum-Informatica连接器。可以使用GPSS API开发自己的GPSS gRPC客户端。Greenplum Stream Server架构如下图:
Greenplum Stream Server 处理ETL任务的执行流程如下所示:
- 用户通过客户端应用程序启动一个或多个ETL加载作业;
- 客户端应用程序使用gRPC协议向正在运行的GPSS服务实例提交和启动数据加载作业;
- GPSS服务实例将每个加载请求以事务方式提交给Greenplum集群的Master节点,并创建新的外部表或重用已存在的外部表来存储数据。
- GPSS服务实例将客户端提交的数据直接写到Greenplum集群Segment节点中。
对于同一个Greenplum Stream Server实例,当多个客户端同时向同一张表写入数据时,客户端可能出现连接不稳定的情况,需要额外做处理;只有一个客户端向一张表写入数据时,不存在该问题。
-- 版本信息查询
SELECT "version"()
-- 结果数据
PostgreSQL 9.4.24
(Greenplum Database 6.13.0 build commit:4f1adf8e247a9685c19ea02bcaddfdc200937ecd Open Source)
on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Dec 18 2020 22:31:16
4.1 GPSS Batch Data API Service Definition
syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
package api;
option java_multiple_files = true;
// Connect service request message: identifies the Greenplum Database that
// the gpss server should connect to on behalf of the client.
message ConnectRequest {
  // Host address of the Greenplum master; must be accessible from the gpss
  // server system.
  string Host = 1;
  // Greenplum master port.
  int32 Port = 2;
  // User or role name that gpss uses to access Greenplum.
  string Username = 3;
  // Password of that user.
  string Password = 4;
  // Database name.
  string DB = 5;
  // Use SSL or not. Ignored: SSL is configured through the gpss config file.
  bool UseSSL = 6;
}
// Connect service response message. The other request messages in this file
// carry a Session to identify the connection they operate on.
message Session {
  // Id of the client connection to gpss.
  string ID = 1;
}
// Operation mode.
// NOTE: the numeric values are part of the wire contract; do not renumber.
enum Operation {
  // Insert all data into the table; behavior on duplicate key or data depends
  // upon the constraints of the target table.
  Insert = 0;
  // Insert and Update.
  Merge = 1;
  // Update the value of "UpdateColumns" if "MatchColumns" match.
  Update = 2;
  // Not supported.
  Read = 3;
}
// Required parameters of the Insert operation.
// NOTE(review): field number 3 is not used in this definition; check the
// upstream GPSS schema before assigning it.
message InsertOption {
  // Names of the target table columns the insert operation should update;
  // used in 'INSERT INTO', useful for partial loading.
  repeated string InsertColumns = 1;
  // Truncate the table before loading?
  bool TruncateTable = 2;
  // Error limit count; used by the external table.
  int64 ErrorLimitCount = 4;
  // Error limit percentage; used by the external table.
  int32 ErrorLimitPercentage = 5;
}
// Required parameters of the Update operation.
message UpdateOption {
  // Names of the target table columns to compare when determining whether to
  // update or not.
  repeated string MatchColumns = 1;
  // Names of the target table columns to update if MatchColumns match.
  repeated string UpdateColumns = 2;
  // Optional additional match condition; SQL syntax, used after the 'WHERE'
  // clause.
  string Condition = 3;
  // Error limit count; used by the external table.
  int64 ErrorLimitCount = 4;
  // Error limit percentage; used by the external table.
  int32 ErrorLimitPercentage = 5;
}
// Required parameters of the Merge operation.
// The Merge operation creates a session-level temp table in StagingSchema.
// NOTE(review): the per-field descriptions below mirror the same-named fields
// of the Insert and Update options; confirm against the upstream GPSS docs.
message MergeOption {
  // Names of the target table columns to insert into.
  repeated string InsertColumns = 1;
  // Names of the target table columns to compare when determining whether to
  // update or not.
  repeated string MatchColumns = 2;
  // Names of the target table columns to update if MatchColumns match.
  repeated string UpdateColumns = 3;
  // Optional additional match condition (SQL syntax).
  string Condition = 4;
  // Error limit count.
  int64 ErrorLimitCount = 5;
  // Error limit percentage.
  int32 ErrorLimitPercentage = 6;
}
// Open service request message: prepares a target table for a write
// operation of the type selected in Option.
message OpenRequest {
  // Session ID returned by Connect.
  Session Session = 1;
  // Name of the Greenplum Database schema.
  string SchemaName = 2;
  // Name of the Greenplum Database table.
  string TableName = 3;
  // SQL to execute before gpss loads the data.
  string PreSQL = 4;
  // SQL to execute after gpss loads the data.
  string PostSQL = 5;
  // Time to wait before aborting the operation (seconds); not supported.
  int32 Timeout = 6;
  // Encoding of text data; not supported.
  string Encoding = 7;
  // Schema in which gpss creates external and temp tables; default is to
  // create these tables in the same schema as the target table.
  string StagingSchema = 8;
  // Identifies the type of write operation to perform; exactly one of the
  // members can be set.
  oneof Option {
    InsertOption InsertOption = 100;
    UpdateOption UpdateOption = 101;
    MergeOption MergeOption = 102;
  }
}
// A single column value; exactly one member of DBType is set.
// NOTE(review): field numbers 3, 4 and 9 are not used in this definition;
// check the upstream GPSS schema before assigning them.
message DBValue {
  oneof DBType {
    // 32-bit integer value.
    int32 Int32Value = 1;
    // 64-bit integer value.
    int64 Int64Value = 2;
    // Single-precision floating point value.
    float Float32Value = 5;
    // Double-precision floating point value.
    double Float64Value = 6;
    // String value. Includes types whose values are presented as string but
    // are not a real string type in Greenplum; for example: macaddr, time
    // with time zone, box, etc.
    string StringValue = 7;
    // Raw bytes value.
    bytes BytesValue = 8;
    // Time without timezone.
    google.protobuf.Timestamp TimeStampValue = 10;
    // Null marker (google.protobuf.NullValue is declared in struct.proto).
    google.protobuf.NullValue NullValue = 11;
  }
}
// One row of data, expressed as a list of column values.
message Row {
  // The column values of the row.
  // NOTE(review): presumably ordered like the columns being written; confirm.
  repeated DBValue Columns = 1;
}
// Wrapper that carries one serialized Row.
message RowData {
  // A single protobuf-encoded Row.
  bytes Data = 1;
}
// Write service request message.
message WriteRequest {
  // Session the write belongs to.
  Session Session = 1;
  // The data to load into the target table.
  repeated RowData Rows = 2;
}
// Close service response message: status of the data load operation.
message TransferStats {
  // Number of rows successfully loaded.
  int64 SuccessCount = 1;
  // Number of error lines if the error limit is not reached.
  int64 ErrorCount = 2;
  // Rows with incorrectly-formatted data (a list of strings, not a count);
  // not supported.
  repeated string ErrorRows = 3;
}
// Close service request message.
message CloseRequest {
  // Session whose write operation is being closed. The lowercase field name
  // is kept as-is; renaming it would change generated code and JSON names.
  Session session = 1;
  // Limit on returned error rows: -1 returns all, 0 returns nothing,
  // >0 is the maximum number of rows.
  int32 MaxErrorRows = 2;
  // NOTE(review): presumably aborts the write operation instead of completing
  // it when true; confirm against the upstream GPSS docs.
  bool Abort = 3;
}
// ListSchema service request message.
message ListSchemaRequest {
  // Session identifying the connection to query.
  Session Session = 1;
}
// Description of one database schema, as listed in Schemas.
message Schema {
  // Schema name.
  string Name = 1;
  // Schema owner.
  string Owner = 2;
}
// ListSchema service response message.
message Schemas {
  // The schemas returned by the ListSchema call.
  repeated Schema Schemas = 1;
}
// ListTable service request message.
message ListTableRequest {
  // Session identifying the connection to query.
  Session Session = 1;
  // Schema to list; 'public' is the default if no Schema is provided.
  string Schema = 2;
}
// DescribeTable service request message.
message DescribeTableRequest {
  // Session identifying the connection to query.
  Session Session = 1;
  // Name of the schema that contains the table.
  string SchemaName = 2;
  // Name of the table to describe.
  string TableName = 3;
}
// Kind of relation reported in TableInfo.Type.
// NOTE: the numeric values are part of the wire contract; do not renumber.
enum RelationType {
  Table = 0;
  View = 1;
  Index = 2;
  Sequence = 3;
  Special = 4;
  // Any relation kind not covered by the values above.
  Other = 255;
}
// Name and kind of one relation, as listed in Tables.
message TableInfo {
  // Relation name.
  string Name = 1;
  // Relation kind (table, view, ...).
  RelationType Type = 2;
}
// ListTable service response message.
message Tables {
  // The relations returned by the ListTable call.
  repeated TableInfo Tables = 1;
}
// DescribeTable service response message.
message Columns {
  // Metadata for the columns of the described table.
  repeated ColumnInfo Columns = 1;
}
// Metadata of a single table column.
message ColumnInfo {
  // Column name.
  string Name = 1;
  // Greenplum data type.
  string DatabaseType = 2;
  // Contains length information?
  bool HasLength = 3;
  // Length if HasLength is true.
  int64 Length = 4;
  // Contains precision or scale information?
  bool HasPrecisionScale = 5;
  // NOTE(review): presumably meaningful only if HasPrecisionScale is true.
  int64 Precision = 6;
  // NOTE(review): presumably meaningful only if HasPrecisionScale is true.
  int64 Scale = 7;
  // Contains Nullable constraint?
  bool HasNullable = 8;
  // NOTE(review): presumably meaningful only if HasNullable is true.
  bool Nullable = 9;
}
// The GPSS Batch Data API service.
service Gpss {
  // Establish a connection to Greenplum Database; returns a Session object.
  rpc Connect(ConnectRequest) returns (Session) {}

  // Disconnect, freeing all resources allocated for a session.
  rpc Disconnect(Session) returns (google.protobuf.Empty) {}

  // Prepare and open a table for write.
  rpc Open(OpenRequest) returns (google.protobuf.Empty) {}

  // Write data to table.
  rpc Write(WriteRequest) returns (google.protobuf.Empty) {}

  // Close a write operation.
  rpc Close(CloseRequest) returns (TransferStats) {}

  // List all available schemas in a database.
  rpc ListSchema(ListSchemaRequest) returns (Schemas) {}

  // List all tables and views in a schema.
  rpc ListTable(ListTableRequest) returns (Tables) {}

  // Describe table metadata (column name and column type).
  rpc DescribeTable(DescribeTableRequest) returns (Columns) {}
}
4.2 Setting up a Java Development Environment
Java 开发环境主要是 JDK 和 gRPC 的代码生成插件,这里以 Maven 为例,也可以从官方文档给出的 git 地址进行拉取:
<!--以下为 GPSS 使用依赖的版本-->
<!--以下为 GPSS 使用的依赖-->
<!--以下为 GPSS 使用的插件-->
4.3 Generating the Batch Data API Client Classes
4.4 Coding the GPSS Batch Data Client
4.4.1 Connect to the GPSS Server
// Holds the address of the GPSS server; the annotation below points at
// classpath:gpss.properties.
// NOTE(review): this excerpt is truncated - the closing brace and whatever binds
// the properties to the fields are not visible here; confirm against the full source.
@PropertySource(value = "classpath:gpss.properties")
public class GPSSConfiguration {
// Host name or address of the GPSS server.
public String gpssHost;
// Port of the GPSS server.
public int gpssPort;
// Wraps the gRPC channel and the blocking stub used to talk to a GPSS server.
// NOTE(review): this excerpt is truncated - closing braces, the Javadoc
// delimiters around the lines starting with "*", and the tails of the builder
// chains are missing, so it does not compile as shown.
public class GPSSManagement {
// Supplies gpssHost / gpssPort for the channel; how it is injected is not visible here.
private GPSSConfiguration configuration;
// gRPC channel; assigned in getBlockingStub() and shut down in closeChannel().
private ManagedChannel channel = null;
// Blocking stub created on top of channel; assigned in getBlockingStub(), used by getSession().
private GpssGrpc.GpssBlockingStub bStub = null;
// (Javadoc remnant) Get the blocking stub; returns the GpssGrpc.GpssBlockingStub object.
* 获取bStub
* @return GpssGrpc.GpssBlockingStub 对象
public GpssGrpc.GpssBlockingStub getBlockingStub() {
// NOTE(review): in the visible lines every call reassigns channel and bStub;
// confirm whether a previously created channel is shut down first.
channel = ManagedChannelBuilder.forAddress(configuration.gpssHost, configuration.gpssPort)
bStub = GpssGrpc.newBlockingStub(channel);
return bStub;
// (Javadoc remnant) Get a session; returns the Session object.
* 获取session
* @return Session对象
public Session getSession() {
// NOTE(review): the setters of this connect request are missing from the excerpt.
ConnectRequest connRequest = ConnectRequest.newBuilder()
// Uses the bStub field, so getBlockingStub() has to be called before this method.
return bStub.connect(connRequest);
// (Javadoc remnant) Disconnect; session is the session object to disconnect.
* 断开连接
* @param session 要断开的session对象
// NOTE(review): the body of this method is missing from the excerpt.
public void closeConnection(Session session) {
// (Javadoc remnant) Shut down the channel; time is the time to wait for the
// shutdown; InterruptedException is the exception that may be thrown.
* 断开通道
* @param time 断开等待时间
* @throws InterruptedException 可能出现的异常
public void closeChannel(long time) throws InterruptedException {
// Values below 10 are raised to 10; the unit is seconds (see TimeUnit.SECONDS below).
if (time < 10) {
time = 10L;
channel.shutdown().awaitTermination(time, TimeUnit.SECONDS);
4.4.2 Connect to Greenplum Database
Session mSession = null;
String gpMasterHost = "localhost";
Integer gpMasterPort = 15432;
String gpRoleName = "gpadmin";
String gpPasswd = "changeme";
String dbname = "testdb";
// create a connect request builder
ConnectRequest connReq = ConnectRequest.newBuilder()
// use the blocking stub to call the Connect service
mSession = bStub.connect(connReq);
// (placeholder) do greenplum stuff here
// use the blocking stub to call the Disconnect service
4.4.3 Retrieve Greenplum schema and table information
import java.util.ArrayList;
import java.util.List;
// create a list schema request builder
ListSchemaRequest lsReq = ListSchemaRequest.newBuilder()
// use the blocking stub to call the ListSchema service
List<Schema> listSchema = bStub.listSchema(lsReq).getSchemasList();
// extract the name of each schema and save in an array
ArrayList<String> schemaNameList = new ArrayList<String>();
for(Schema s : listSchema) {
// use the first schema name returned in the ListSchema code excerpt
String schemaName = schemaNameList.get(0);
// create a list table request builder
ListTableRequest ltReq = ListTableRequest.newBuilder()
// use the blocking stub to call the ListTable service
List<TableInfo> tblList = bStub.listTable(ltReq).getTablesList();
// extract the name of each table only and save in an array
ArrayList<String> tblNameList = new ArrayList<String>();
for(TableInfo ti : tblList) {
if(ti.getTypeValue() == RelationType.Table_VALUE) {
// the name of the first table returned in the ListTable code excerpt
String tableName = tblNameList.get(0);
// create a describe table request builder
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
// use the blocking stub to call the DescribeTable service
List<ColumnInfo> columnList = bStub.describeTable(dtReq).getColumnsList();
// print the name and type of each column
for(ColumnInfo ci : columnList) {
String colname = ci.getName();
String dbtype = ci.getDatabaseType();
// display the column name and type to stdout
System.out.println( "column " + colname + " type: " + dbtype );
4.4.4 Prepare a Greenplum table for writing
Integer errLimit = 25;
Integer errPct = 25;
// create an insert option builder
InsertOption iOpt = InsertOption.newBuilder()
// create an open request builder
OpenRequest oReq = OpenRequest.newBuilder()
// use the blocking stub to call the Open service; it returns nothing
// (placeholder) write data here
// create a close request builder
TransferStats tStats = null;
CloseRequest cReq = CloseRequest.newBuilder()
// use the blocking stub to call the Close service
tStats = bStub.close(cReq);
// display the result to stdout
System.out.println( "CloseRequest tStats: " + tStats.toString() );
4.4.5 Write data to the Greenplum table
// create an array of rows
ArrayList<RowData> rows = new ArrayList<>();
for (int row = 0; row < 2; row++) {
// create a row builder
api.Row.Builder builder = api.Row.newBuilder();
// create builders for each column, in order, and set values - text, int, text
api.DBValue.Builder colbuilder1 = api.DBValue.newBuilder();
api.DBValue.Builder colbuilder2 = api.DBValue.newBuilder();
api.DBValue.Builder colbuilder3 = api.DBValue.newBuilder();
// build the row
RowData.Builder rowbuilder = RowData.newBuilder().setData(builder.build().toByteString());
// add the row
// create a write request builder
WriteRequest wReq = WriteRequest.newBuilder()
// use the blocking stub to call the Write service; it returns nothing
public class GPSSUtil {
private GPSSManagement gpssManagement;
// Loads the rows in data into table insertTableName of schema "public" through
// the GPSS insert flow visible below: describe table, build InsertOption, open,
// format rows, write, close; finally prints the transfer statistics and the elapsed time.
// NOTE(review): this excerpt is truncated - builder setters, the open/write
// calls, closing braces and the try body are missing; confirm against the full source.
public void insertData(String insertTableName, List<Map<Object, Object>> data) {
String schemaName = "public";
int size = data.size();
long start = System.currentTimeMillis();
// get the stub and a session
GpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();
Session session = gpssManagement.getSession();
// fetch the structure of the target table by its name
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());
// assemble the load options and open the request
// error count limit, error percentage limit
int errLimit = 20, errPct = 20;
InsertOption.Builder builderInsert = InsertOption.newBuilder()
// columnNameList.forEach(builderInsert::addInsertColumns);
// NOTE(review): the body of this lambda is missing from the excerpt apart from the commented-out line.
columnList.forEach(columnInfo -> {
// builderInsert.addExpressions("(jdata->>'" + columnInfo.getName() + "')::" + columnInfo.getDatabaseType());
InsertOption insertOpt = builderInsert.build();
// open the request
OpenRequest openReq = OpenRequest.newBuilder()
// .setEncoding("utf-8")
// format the input objects into rows
List<RowData> rows = new ArrayList<>();
formatRows(rows, columnList, data);
// create a write request builder
WriteRequest writeReq = WriteRequest.newBuilder()
// use the blocking stub to call the Write service; it returns nothing
// create a close request builder
TransferStats tStats = null;
CloseRequest closeReq = CloseRequest.newBuilder()
// use the blocking stub to call the Close service
tStats = gpssStub.close(closeReq);
// NOTE(review): the try body is missing; presumably it shut down the channel
// (GPSSManagement.closeChannel declares InterruptedException) - confirm.
try {
} catch (InterruptedException e) {
long end = System.currentTimeMillis();
long use = end - start;
System.out.println("CloseRequest tStats: " + tStats.toString() + "GP数据库入库:" + "-" + size + "条数据总耗时:" + use + "毫秒。");
// Applies the rows in data to table insertTableName of schema "public" through
// the GPSS update flow visible below: describe table, build UpdateOption, open,
// format rows, write, close; finally prints the transfer statistics and the elapsed time.
// NOTE(review): this excerpt is truncated - builder setters, the open/write
// calls, closing braces and the try body are missing; confirm against the full source.
public void updateData(String insertTableName, List<Map<Object, Object>> data) {
String schemaName = "public";
int size = data.size();
long start = System.currentTimeMillis();
// get the stub and a session
GpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();
Session session = gpssManagement.getSession();
// fetch the structure of the target table by its name
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());
// assemble the load options and open the request
// error count limit, error percentage limit
int errLimit = 20, errPct = 20;
UpdateOption.Builder updateOption = UpdateOption.newBuilder()
// The column named "id" (case-insensitive) is handled differently from all other columns.
// NOTE(review): both branch bodies are missing; presumably "id" becomes a match
// column and the others become update columns - confirm.
columnNameList.forEach(columnName -> {
if ("id".equalsIgnoreCase(columnName)) {
} else {
UpdateOption updateOpt = updateOption.build();
// open the request
OpenRequest openReq = OpenRequest.newBuilder()
// format the input objects into rows
List<RowData> rows = new ArrayList<>();
formatRows(rows, columnList, data);
// create a write request builder
WriteRequest writeReq = WriteRequest.newBuilder()
// use the blocking stub to call the Write service; it returns nothing
// create a close request builder
TransferStats tStats = null;
CloseRequest closeReq = CloseRequest.newBuilder()
// use the blocking stub to call the Close service
tStats = gpssStub.close(closeReq);
// NOTE(review): the try body is missing; presumably it shut down the channel
// (GPSSManagement.closeChannel declares InterruptedException) - confirm.
try {
} catch (InterruptedException e) {
long end = System.currentTimeMillis();
long use = end - start;
System.out.println("CloseRequest tStats: " + tStats.toString() + "GP数据库入库:" + "-" + size + "条数据总耗时:" + use + "毫秒。");
// Merges the rows in data into table insertTableName of schema "public" through
// the GPSS merge flow visible below: describe table, build MergeOption, open,
// format rows, write, close; finally prints the transfer statistics and the elapsed time.
// NOTE(review): this excerpt is truncated - builder setters, the open/write
// calls, closing braces and the try body are missing; confirm against the full source.
public void mergeData(String insertTableName, List<Map<Object, Object>> data) {
String schemaName = "public";
int size = data.size();
long start = System.currentTimeMillis();
// get the stub and a session
GpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();
Session session = gpssManagement.getSession();
// fetch the structure of the target table by its name
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());
// assemble the load options and open the request
// error count limit, error percentage limit
int errLimit = 20, errPct = 20;
MergeOption.Builder mergeOption = MergeOption.newBuilder()
// NOTE(review): the body of this lambda is missing from the excerpt; presumably
// it registers each column name on the merge option builder - confirm.
columnNameList.forEach(columnName -> {
MergeOption mergeOpt = mergeOption.build();
// open the request
OpenRequest openReq = OpenRequest.newBuilder()
// format the input objects into rows
List<RowData> rows = new ArrayList<>();
formatRows(rows, columnList, data);
// create a write request builder
WriteRequest writeReq = WriteRequest.newBuilder()
// use the blocking stub to call the Write service; it returns nothing
// create a close request builder
TransferStats tStats = null;
CloseRequest closeReq = CloseRequest.newBuilder()
// use the blocking stub to call the Close service
tStats = gpssStub.close(closeReq);
// NOTE(review): the try body is missing; presumably it shut down the channel
// (GPSSManagement.closeChannel declares InterruptedException) - confirm.
try {
} catch (InterruptedException e) {
long end = System.currentTimeMillis();
long use = end - start;
System.out.println("CloseRequest tStats: " + tStats.toString() + "GP数据库入库:" + "-" + size + "条数据总耗时:" + use + "毫秒。");
* 格式化数据
* @param rows 格式化后的数据
* @param columnList 目标表的字段名称和类型
* @param data 入库数据
private void formatRows(List<RowData> rows, List<ColumnInfo> columnList, List<Map<Object, Object>> data) {
// addColumns
for (Map<Object, Object> insertRow : data) {
// builder
Row.Builder builder = Row.newBuilder();
// format
columnList.forEach(columnInfo -> {
String name = columnInfo.getName();
Object valueObject = insertRow.get(name);
String databaseType = columnInfo.getDatabaseType();
if (valueObject != null) {
String value = valueObject.toString();
if ("VARCHAR".equals(databaseType)) {
} else if ("INT2".equalsIgnoreCase(databaseType)) {
} else {
if ("VARCHAR".equals(databaseType)) {
} else if ("INT2".equalsIgnoreCase(databaseType)) {
// builder the row
RowData.Builder rowBuilder = RowData.newBuilder().setData(builder.build().toByteString());
// add the row