GPSS【实践 01】Developing a Greenplum Streaming Server Client 自定义GPSS客户端开发实例

news2025/1/13 15:41:30

自定义GPSS客户端开发流程

    • 1.GPSS是什么
    • 2.架构
    • 3.组件下载安装
    • 4.自定义客户端
      • 4.1 GPSS Batch Data API Service Definition
      • 4.2 Setting up a Java Development Environment
      • 4.3 Generating the Batch Data API Client Classes
      • 4.4 Coding the GPSS Batch Data Client
        • 4.4.1 Connect to the GPSS Server
        • 4.4.2 Connect to Greenplum Database
        • 4.4.3 Retrieve Greenplum schema and table information
        • 4.4.4 Prepare a Greenplum table for writing
        • 4.4.5 Write data to the Greenplum table
    • 5.总结

1.GPSS是什么

Greenplum Stream Server (GPSS)是一个ETL(提取、转换、加载)工具。GPSS服务器的一个实例从一个或多个客户机接收流数据,使用Greenplum数据库可读的外部表将数据转换并插入到目标Greenplum表中。数据源和数据格式是特定于客户机的。数据源和数据格式由客户端指定。

  • Greenplum Stream Server包括gpss命令行工具。运行gpss时,会启动一个gpss实例,此实例无限期地等待客户端数据。
  • Greenplum Stream Server还包括gpsscli命令行工具,这是一个客户端工具,用于向GPSS实例提交数据加载作业并管理这些作业。

2.架构

Greenplum Stream Server是一个gRPC服务器。GPSS gRPC服务定义的内容包括:连接到Greenplum数据库和检查Greenplum元数据所需的操作和消息格式;数据从客户端写入greenplum数据库表所需的操作和消息格式。有关gRPC内容参考:https://grpc.io/docs/

gpsscli命令行工具是Greenplum Stream Server的gRPC客户端工具,也可以用于操作Greenplum-Kafka 集成和Greenplum-Informatica连接器。可以使用GPSS API开发自己的GPSS gRPC客户端。Greenplum Stream Server架构如下图:
在这里插入图片描述

Greenplum Stream Server 处理ETL任务的执行流程如下所示:

  • 用户通过客户端应用程序启动一个或多个ETL加载作业;
  • 客户端应用程序使用gRPC协议向正在运行的GPSS服务实例提交和启动数据加载作业;
  • GPSS服务实例将每个加载请求事务提交给Greenplum集群的Master节点,并创建或者重用已存在外部表来存储数据。
  • GPSS服务实例将客户端提交的数据直接写到Greenplum集群Segment节点中。

在这里插入图片描述

GPSS存在的问题
同一个Greenplum Stream Server实例,当有多个客户端同时向一张表写入数据时,客户端会出现连接不稳定情况,这里需要额外做处理。一个客户端向一张表写入数据,不会存在该问题。

3.组件下载安装

组件官网下载《地址》可根据安装的数据库版本进行下载,本次以gpss-gpdb6-1.6.0-rhel7-x86_64.gppkg安装包为例进行说明:

-- 版本信息查询
SELECT "version"()
-- 结果数据
PostgreSQL 9.4.24 
(Greenplum Database 6.13.0 build commit:4f1adf8e247a9685c19ea02bcaddfdc200937ecd Open Source) 
on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Dec 18 2020 22:31:16

在这里插入图片描述

GPSS安装这里不再赘述,可查看《GPSS扩展安装启用配置启动实例》。

4.自定义客户端

《官方GPSS开发文档》已不再提供1.6及以下版本的在线数据,可下载pdf格式查看【实测在线1.7跟1.6是一样的】:

在这里插入图片描述

4.1 GPSS Batch Data API Service Definition

GPSS批处理数据API服务定义如下。将内容复制/粘贴到一个名为gpss的文件中。并注意文件系统的位置。

syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

package api;

option java_multiple_files = true;

// Connect service Request message
message ConnectRequest {
  string Host = 1;      // Host address of Greenplum master; must be accessible from gpss server system
  int32 Port = 2;       // Greenplum master port
  string Username = 3;  // User or role name that gpss uses to access Greenplum 
  string Password = 4;  // User password
  string DB = 5;        // Database name
  bool UseSSL = 6;      // Use SSL or not; ignored, use the gpss config file to config SSL
}

// Connect service Response message
message Session {
  string ID = 1;  // Id of client connection to gpss
}

// Operation mode
enum Operation {
  Insert = 0;  // Insert all data into table; behavior of duplicate key or data depends upon the constraints of the target table.
  Merge = 1;   // Insert and Update
  Update = 2;  // Update the value of "UpdateColumns" if "MatchColumns" match
  Read = 3;    // Not supported
}

// Required parameters of the Insert operation
message InsertOption {
  repeated string InsertColumns = 1;    // Names of the target table columns the insert operation should update; used in 'INSERT INTO', useful for partial loading
  bool TruncateTable = 2;               // Truncate table before loading?
  int64 ErrorLimitCount = 4;            // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}

// Required parameters of the Update operation
message UpdateOption {
  repeated string MatchColumns = 1;     // Names of the target table columns to compare when determining to update or not
  repeated string UpdateColumns = 2;    // Names of the target table columns to update if MatchColumns match
  string Condition = 3;                 // Optional additional match condition; SQL syntax and used after the 'WHERE' clause
  int64 ErrorLimitCount = 4;            // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}

// Required parameters of the Merge operation
// Merge operation creates a session-level temp table in StagingSchema
message MergeOption {
  repeated string InsertColumns = 1;
  repeated string MatchColumns = 2;
  repeated string UpdateColumns = 3;
  string Condition = 4;
  int64 ErrorLimitCount = 5;
  int32 ErrorLimitPercentage = 6;
}

// Open service Request message
message OpenRequest {
  Session Session = 1;      // Session ID returned by Connect
  string SchemaName = 2;    // Name of the Greenplum Database schema
  string TableName = 3;     // Name of the Greenplum Database table
  string PreSQL = 4;        // SQL to execute before gpss loads the data
  string PostSQL = 5;       // SQL to execute after gpss loads the data
  int32 Timeout = 6;        // Time to wait before aborting the operation (seconds); not supported
  string Encoding = 7;      // Encoding of text data; not supported
  string StagingSchema = 8; // Schema in which gpss creates external and temp tables; default is to create these tables in the same schema as the target table

  oneof Option {            // Identify the type of write operation to perform
    InsertOption InsertOption = 100;
    UpdateOption UpdateOption = 101;
    MergeOption MergeOption = 102;
  }
}

message DBValue {
  oneof DBType {
    int32 Int32Value = 1;
    int64 Int64Value = 2;
    float Float32Value = 5;
    double Float64Value = 6;
    string StringValue = 7;  // Includes types whose values are presented as string but are not a real string type in Greenplum; for example: macaddr, time with time zone, box, etc.
    bytes BytesValue = 8;
    google.protobuf.Timestamp TimeStampValue = 10;  // Time without timezone
    google.protobuf.NullValue NullValue = 11;
  }
}

message Row {
  repeated DBValue Columns = 1;
}

message RowData {    
  bytes Data = 1;     // A single protobuf-encoded Row
}

// Write service Request message
message WriteRequest {
  Session Session = 1;
  repeated RowData Rows = 2;     // The data to load into the target table
}

// Close service Response message
message TransferStats {          // Status of the data load operation
  int64 SuccessCount = 1;        // Number of rows successfully loaded
  int64 ErrorCount = 2;          // Number of error lines if Errorlimit is not reached
  repeated string ErrorRows = 3; // Number of rows with incorrectly-formatted data; not supported
}

// Close service Request message
message CloseRequest {
  Session session = 1;
  int32 MaxErrorRows = 2;        // -1: returns all, 0: nothing, >0: max rows
  bool Abort = 3;
}

// ListSchema service request message
message ListSchemaRequest {
  Session Session = 1;
}

message Schema {
  string Name = 1;
  string Owner = 2;
}

// ListSchema service response message
message Schemas {
  repeated Schema Schemas = 1;
}

// ListTable service request message
message ListTableRequest {
  Session Session = 1;
  string Schema = 2;    // 'public' is the default if no Schema is provided
}

// DescribeTable service request message
message DescribeTableRequest {
  Session Session = 1;
  string SchemaName = 2;
  string TableName = 3;
}

enum RelationType {
  Table = 0;
  View = 1;
  Index = 2;
  Sequence = 3;
  Special = 4;
  Other = 255;
}

message TableInfo {
  string Name = 1;
  RelationType Type = 2;
}

// ListTable service response message
message Tables {
  repeated TableInfo Tables = 1;
}

// DescribeTable service response message
message Columns {
  repeated ColumnInfo Columns = 1;
}

message ColumnInfo {
  string Name = 1;            // Column name
  string DatabaseType = 2;    // Greenplum data type

  bool HasLength = 3;         // Contains length information?
  int64 Length = 4;           // Length if HasLength is true

  bool HasPrecisionScale = 5; // Contains precision or scale information?
  int64 Precision = 6;
  int64 Scale = 7;

  bool HasNullable = 8;       // Contains Nullable constraint?
  bool Nullable = 9;
}

service Gpss {
  // Establish a connection to Greenplum Database; returns a Session object
  rpc Connect(ConnectRequest) returns (Session) {}

  // Disconnect, freeing all resources allocated for a session
  rpc Disconnect(Session) returns (google.protobuf.Empty) {}

  // Prepare and open a table for write
  rpc Open(OpenRequest) returns(google.protobuf.Empty) {}

  // Write data to table
  rpc Write(WriteRequest) returns(google.protobuf.Empty) {}

  // Close a write operation
  rpc Close(CloseRequest) returns(TransferStats) {}

  // List all available schemas in a database
  rpc ListSchema(ListSchemaRequest) returns (Schemas) {}

  // List all tables and views in a schema
  rpc ListTable(ListTableRequest) returns (Tables) {}

  // Decribe table metadata(column name and column type)
  rpc DescribeTable(DescribeTableRequest) returns (Columns) {}
}

官方给出了数据类型的对应关系:

在这里插入图片描述

4.2 Setting up a Java Development Environment

Java 开发环境主要是 JDK 和 gRPC 的代码生成插件,这里以 Maven 为例,也可以从官方文档给出的 git 地址进行拉取:

        <!--以下为 GPSS 使用依赖的版本-->
        <grpc.version>1.16.1</grpc.version>
        <protobuf.version>3.6.1</protobuf.version>
        <os.maven.plugin>1.6.2</os.maven.plugin>
        <protobuf.maven.plugin>0.6.1</protobuf.maven.plugin>
        
		<!--以下为 GPSS 使用的依赖-->
		<dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-all</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
        <!--以下为 GPSS 使用的插件-->
           <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>${protobuf.maven.plugin}</version>
                <configuration>
                    <protocArtifact>
                        com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}
                    </protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>
                        io.grpc:protoc-gen-grpc-java:1.11.0:exe:${os.detected.classifier}
                    </pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

4.3 Generating the Batch Data API Client Classes

使用4.2的依赖和插件即可生成api类:

在这里插入图片描述

4.4 Coding the GPSS Batch Data Client

这里是实际的编程阶段,要注意的是GPSS插件安装之后要启动服务。

4.4.1 Connect to the GPSS Server

配置类:

@Configuration
@PropertySource(value = "classpath:gpss.properties")
public class GPSSConfiguration {
    @Value("${gpss_host}")
    public String gpssHost;
    @Value("${gpss_port}")
    public int gpssPort;
}

连接工具类:

@Component
public class GPSSManagement {
    @Autowired
    private GPSSConfiguration configuration;

    private ManagedChannel channel = null;
    private GpssGrpc.GpssBlockingStub bStub = null;

    /**
     * 获取bStub
     *
     * @return GpssGrpc.GpssBlockingStub 对象
     */
    public GpssGrpc.GpssBlockingStub getBlockingStub() {
        channel = ManagedChannelBuilder.forAddress(configuration.gpssHost, configuration.gpssPort)
                .usePlaintext(true)
                .build();
        bStub = GpssGrpc.newBlockingStub(channel);
        return bStub;
    }

    /**
     * 获取session
     *
     * @return Session对象
     */
    public Session getSession() {
        ConnectRequest connRequest = ConnectRequest.newBuilder()
                .setHost(configuration.gpHost)
                .setPort(configuration.gpPort)
                .setUsername(configuration.gpUsername)
                .setPassword(configuration.gpPassword)
                .setDB(configuration.gpDatabase)
                .setUseSSL(false)
                .build();
        return bStub.connect(connRequest);
    }

    /**
     * 断开连接
     *
     * @param session 要断开的session对象
     */
    public void closeConnection(Session session) {
        bStub.disconnect(session);
    }

    /**
     * 断开通道
     *
     * @param time 断开等待时间
     * @throws InterruptedException 可能出现的异常
     */
    public void closeChannel(long time) throws InterruptedException {
        if (time < 10) {
            time = 10L;
        }
        channel.shutdown().awaitTermination(time, TimeUnit.SECONDS);
    }
}

4.4.2 Connect to Greenplum Database

这里贴出官方的代码片段,连接数据库的代码会整合到GPSSUtill类内:

Session mSession = null;
String gpMasterHost = "localhost";
Integer gpMasterPort = 15432;
String gpRoleName = "gpadmin";
String gpPasswd = "changeme";
String dbname = "testdb";

// create a connect request builder
ConnectRequest connReq = ConnectRequest.newBuilder()
    .setHost(gpMasterHost)
    .setPort(gpMasterPort)
    .setUsername(gpRoleName)
    .setPassword(gpPasswd)
    .setDB(dbname)
    .setUseSSL(false)
  .build();

// use the blocking stub to call the Connect service
mSession = bStub.connect(connReq);

// (placeholder) do greenplum stuff here

// use the blocking stub to call the Disconnect service
bStub.disconnect(mSession);

4.4.3 Retrieve Greenplum schema and table information

这里贴出官方的代码片段,获取数据库schema和表信息的代码会整合到GPSSUtill类内,获取全部schema:

import java.util.ArrayList;
import java.util.List;

// create a list schema request builder
ListSchemaRequest lsReq = ListSchemaRequest.newBuilder()
    .setSession(mSession)
  .build();

// use the blocking stub to call the ListSchema service
List<Schema> listSchema = bStub.listSchema(lsReq).getSchemasList();

// extract the name of each schema and save in an array
ArrayList<String> schemaNameList = new ArrayList<String>();
for(Schema s : listSchema) {
  schemaNameList.add(s.getName());
} 

获取schema下的表:

// use the first schema name returned in the ListSchema code excerpt
String schemaName = schemaNameList.get(0);

// create a list table request builder
ListTableRequest ltReq = ListTableRequest.newBuilder()
    .setSession(mSession)
    .setSchema(schemaName)
  .build();

// use the blocking stub to call the ListTable service
List<TableInfo> tblList = bStub.listTable(ltReq).getTablesList();

// extract the name of each table only and save in an array
ArrayList<String> tblNameList = new ArrayList<String>();
for(TableInfo ti : tblList) {
  if(ti.getTypeValue() == RelationType.Table_VALUE) {
    tblNameList.add(ti.getName());
  }
}

获取表的字段信息【代码内要根据字段信息对数据进行校验和格式转换】:

// the name of the first table returned in the ListTable code excerpt
String tableName = tblNameList.get(0);

// create a describe table request builder
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
    .setSession(mSession)
    .setSchemaName(schemaName)
    .setTableName(tableName)
  .build();

// use the blocking stub to call the DescribeTable service
List<ColumnInfo> columnList = bStub.describeTable(dtReq).getColumnsList();

// print the name and type of each column
for(ColumnInfo ci : columnList) {
  String colname = ci.getName();
  String dbtype = ci.getDatabaseType();
  // display the column name and type to stdout
  System.out.println( "column " + colname + " type: " + dbtype );
}

4.4.4 Prepare a Greenplum table for writing

这里贴出官方的代码片段,写入表准备的代码会整合到GPSSUtill类内:

Integer errLimit = 25;
Integer errPct = 25;
// create an insert option builder
InsertOption iOpt = InsertOption.newBuilder()
    .setErrorLimitCount(errLimit)
    .setErrorLimitPercentage(errPct)
    .setTruncateTable(false)
    .addInsertColumns("loantitle")
    .addInsertColumns("riskscore")
    .addInsertColumns("d2iratio")
  .build();

// create an open request builder
OpenRequest oReq = OpenRequest.newBuilder()
    .setSession(mSession)
    .setSchemaName(schemaName)
    .setTableName(tableName)
    //.setPreSQL("")
    //.setPostSQL("")
    //.setEncoding("")
    .setTimeout(5)
    //.setStagingSchema("")
    .setInsertOption(iOpt)
  .build();

// use the blocking stub to call the Open service; it returns nothing
bStub.open(oReq);

// (placeholder) write data here

// create a close request builder
TransferStats tStats = null;
CloseRequest cReq = CloseRequest.newBuilder()
    .setSession(mSession)
    //.setMaxErrorRows(15)
    //.setAbort(true)
  .build();

// use the blocking stub to call the Close service
tStats = bStub.close(cReq);

// display the result to stdout
System.out.println( "CloseRequest tStats: " + tStats.toString() );

4.4.5 Write data to the Greenplum table

这里贴出官方的代码片段,写入表的代码会整合到GPSSUtill类内【包含meger和update操作代码】:

// create an array of rows
ArrayList<RowData> rows = new ArrayList<>();
for (int row = 0; row < 2; row++) {
  // create a row builder
  api.Row.Builder builder = api.Row.newBuilder();

  // create builders for each column, in order, and set values - text, int, text
  api.DBValue.Builder colbuilder1 = api.DBValue.newBuilder();
  colbuilder1.setStringValue("xxx");
  builder.addColumns(colbuilder1.build());
  api.DBValue.Builder colbuilder2 = api.DBValue.newBuilder();
  colbuilder2.setInt32Value(77);
  builder.addColumns(colbuilder2.build());
  api.DBValue.Builder colbuilder3 = api.DBValue.newBuilder();
  colbuilder3.setStringValue("yyy");
  builder.addColumns(colbuilder3.build());

  // build the row
  RowData.Builder rowbuilder = RowData.newBuilder().setData(builder.build().toByteString());

  // add the row
  rows.add(rowbuilder.build());
}

// create a write request builder
WriteRequest wReq = WriteRequest.newBuilder()
    .setSession(mSession)
    .addAllRows(rows)
  .build();

// use the blocking stub to call the Write service; it returns nothing
bStub.write(wReq);

GPSSUtil类代码:

@Component
public class GPSSUtil {

    @Autowired
    private GPSSManagement gpssManagement;

    public void insertData(String insertTableName, List<Map<Object, Object>> data) {
        String schemaName = "public";

        int size = data.size();
        long start = System.currentTimeMillis();

        // 获取连接及session
        GpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();
        Session session = gpssManagement.getSession();

        // 根据入库表名称获取表结构
        DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                .build();
        List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
        List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());

        // 封装入库信息并开启请求
        // 错误个数,错误百分比
        int errLimit = 20, errPct = 20;
        InsertOption.Builder builderInsert = InsertOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct)
                .setTruncateTable(false);

//        columnNameList.forEach(builderInsert::addInsertColumns);
        columnList.forEach(columnInfo -> {
            builderInsert.addInsertColumns(columnInfo.getName());
//            builderInsert.addExpressions("(jdata->>'" + columnInfo.getName() + "')::" + columnInfo.getDatabaseType());
        });
        InsertOption insertOpt = builderInsert.build();

        // 开启请求
        OpenRequest openReq = OpenRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
//                .setEncoding("utf-8")
                .setTimeout(5)
                .setInsertOption(insertOpt)
                .build();
        gpssStub.open(openReq);

        // 格式化对象
        List<RowData> rows = new ArrayList<>();
        formatRows(rows, columnList, data);

        // create a write request builder
        WriteRequest writeReq = WriteRequest.newBuilder()
                .setSession(session)
                .addAllRows(rows)
                .build();

        //use the blocking stub to call the write service it returns nothing
        gpssStub.write(writeReq);

        // create a close request builder
        TransferStats tStats = null;
        CloseRequest closeReq = CloseRequest.newBuilder()
                .setSession(session)
                .build();

        // use the blocking stub to call the  Close service
        tStats = gpssStub.close(closeReq);

        gpssManagement.closeConnection(session);
        try {
            gpssManagement.closeChannel(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        long use = end - start;
        System.out.println("CloseRequest tStats: " + tStats.toString() + "GP数据库入库:" + "-" + size + "条数据总耗时:" + use + "毫秒。");


    }

    public void updateData(String insertTableName, List<Map<Object, Object>> data) {
        String schemaName = "public";

        int size = data.size();
        long start = System.currentTimeMillis();

        // 获取连接及session
        GpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();
        Session session = gpssManagement.getSession();

        // 根据入库表名称获取表结构
        DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                .build();
        List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
        List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());

        // 封装入库信息并开启请求
        // 错误个数,错误百分比
        int errLimit = 20, errPct = 20;
        UpdateOption.Builder updateOption = UpdateOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct);
        columnNameList.forEach(columnName -> {
            if ("id".equalsIgnoreCase(columnName)) {
                updateOption.addMatchColumns(columnName);
            } else {
                updateOption.addUpdateColumns(columnName);
            }
        });
        UpdateOption updateOpt = updateOption.build();

        // 开启请求
        OpenRequest openReq = OpenRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                .setEncoding("utf-8")
                .setTimeout(5)
                .setUpdateOption(updateOpt)
                .build();
        gpssStub.open(openReq);

        // 格式化对象
        List<RowData> rows = new ArrayList<>();
        formatRows(rows, columnList, data);

        // create a write request builder
        WriteRequest writeReq = WriteRequest.newBuilder()
                .setSession(session)
                .addAllRows(rows)
                .build();

        //use the blocking stub to call the write service it returns nothing
        gpssStub.write(writeReq);

        // create a close request builder
        TransferStats tStats = null;
        CloseRequest closeReq = CloseRequest.newBuilder()
                .setSession(session)
                .build();

        // use the blocking stub to call the  Close service
        tStats = gpssStub.close(closeReq);

        gpssManagement.closeConnection(session);
        try {
            gpssManagement.closeChannel(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        long use = end - start;
        System.out.println("CloseRequest tStats: " + tStats.toString() + "GP数据库入库:" + "-" + size + "条数据总耗时:" + use + "毫秒。");


    }

    public void mergeData(String insertTableName, List<Map<Object, Object>> data) {
        String schemaName = "public";

        int size = data.size();
        long start = System.currentTimeMillis();

        // 获取连接及session
        GpssGrpc.GpssBlockingStub gpssStub = gpssManagement.getBlockingStub();
        Session session = gpssManagement.getSession();

        // 根据入库表名称获取表结构
        DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                .build();
        List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
        List<String> columnNameList = columnList.stream().map(ColumnInfo::getName).collect(Collectors.toList());

        // 封装入库信息并开启请求
        // 错误个数,错误百分比
        int errLimit = 20, errPct = 20;
        MergeOption.Builder mergeOption = MergeOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct).addMatchColumns("id");
        columnNameList.forEach(columnName -> {
            mergeOption.addInsertColumns(columnName);
            mergeOption.addUpdateColumns(columnName);
        });
        MergeOption mergeOpt = mergeOption.build();

        // 开启请求
        OpenRequest openReq = OpenRequest.newBuilder()
                .setSession(session)
                .setSchemaName(schemaName)
                .setTableName(insertTableName)
                .setEncoding("utf-8")
                .setTimeout(5)
                .setMergeOption(mergeOpt)
                .build();
        gpssStub.open(openReq);

        // 格式化对象
        List<RowData> rows = new ArrayList<>();
        formatRows(rows, columnList, data);

        // create a write request builder
        WriteRequest writeReq = WriteRequest.newBuilder()
                .setSession(session)
                .addAllRows(rows)
                .build();

        //use the blocking stub to call the write service it returns nothing
        gpssStub.write(writeReq);

        // create a close request builder
        TransferStats tStats = null;
        CloseRequest closeReq = CloseRequest.newBuilder()
                .setSession(session)
                .build();

        // use the blocking stub to call the  Close service
        tStats = gpssStub.close(closeReq);

        gpssManagement.closeConnection(session);
        try {
            gpssManagement.closeChannel(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        long use = end - start;
        System.out.println("CloseRequest tStats: " + tStats.toString() + "GP数据库入库:" + "-" + size + "条数据总耗时:" + use + "毫秒。");


    }

    /**
     * 格式化数据
     *
     * @param rows       格式化后的数据
     * @param columnList 目标表的字段名称和类型
     * @param data       入库数据
     */
    private void formatRows(List<RowData> rows, List<ColumnInfo> columnList, List<Map<Object, Object>> data) {
        // addColumns
        for (Map<Object, Object> insertRow : data) {
            // builder
            Row.Builder builder = Row.newBuilder();
            // format
            columnList.forEach(columnInfo -> {
                String name = columnInfo.getName();
                Object valueObject = insertRow.get(name);
                String databaseType = columnInfo.getDatabaseType();
                if (valueObject != null) {
                    String value = valueObject.toString();
                    if ("VARCHAR".equals(databaseType)) {
                        builder.addColumns(DBValue.newBuilder().setStringValue(value));
                    } else if ("INT2".equalsIgnoreCase(databaseType)) {
                        builder.addColumns(DBValue.newBuilder().setInt32Value(Integer.parseInt(value)));
                    }
                } else {
                    if ("VARCHAR".equals(databaseType)) {
                        builder.addColumns(DBValue.newBuilder().setStringValue(""));
                    } else if ("INT2".equalsIgnoreCase(databaseType)) {
                        builder.addColumns(DBValue.newBuilder().setInt32Value(-1));
                    }
                }
            });
            // builder the row
            RowData.Builder rowBuilder = RowData.newBuilder().setData(builder.build().toByteString());
            // add the row
            rows.add(rowBuilder.build());
        }
    }
}

5.总结

官方文档还算是比较详细的,但是仅给出核心代码,实际使用时要写的代码还是挺多的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/416005.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【论文笔记】Attention Augmented Convolutional Networks(ICCV 2019 入选文章)

目录 一、摘要 二、介绍 三、相关工作 卷积网络Convolutional networks&#xff1a; 网络中注意力机制Attention mechanisms in networks&#xff1a; 四、方法 1. 图像的自注意力Self-attention over images&#xff1a; 二维位置嵌入Two-dimensional Positional Enco…

redis 第一章

开始学习redis 之旅吧 关于redis 的介绍 redis 是一个开源的软件&#xff0c;可以存储结构化的数据在内存中&#xff0c;像内存数据库&#xff0c;缓存、消息中间件、流处理引擎。 redis 提供的数据结构像strings, hashes, lists, sets, sorted sets 。Redis具有内置复制、Lua…

《花雕学AI》13:早出对策,积极应对ChatGPT带来的一系列风险和挑战

ChatGPT是一款能和人类聊天的机器人&#xff0c;它可以学习和理解人类语言&#xff0c;也可以帮人们做一些工作&#xff0c;比如翻译、写文章、写代码等。ChatGPT很强大&#xff0c;让很多人感兴趣&#xff0c;也让很多人担心。 使用ChatGPT有一些风险&#xff0c;比如数据的质…

Pytorch 张量操作 Python切片操作

目录一维张量定义一维实例操作二维张量操作张量拼接-注意需要拼接的维度一定要相同广播机制更高维的演示总结YOLOv5 Focus样例参考梳理一下Pytorch的张量切片操作一维张量定义 一维向量的操作其实很像numpy一维数组&#xff0c;基本定义如下&#xff1a; 1.默认步长为1 2.起始…

HotSpot经典垃圾收集器

虽然垃圾收集器的技术在不断进步&#xff0c;但直到现在还没最好的收集器出现&#xff0c;更加不存在“万能”的收集器&#xff0c;所以我们选择的只是对具体应用最合适的收集器。 图 HotSpot中的垃圾收集器&#xff0c;连线表示可搭配使用 1 Serial收集器 是最基础、历史最悠…

第08章_面向对象编程(高级)

第08章_面向对象编程(高级) 讲师&#xff1a;尚硅谷-宋红康&#xff08;江湖人称&#xff1a;康师傅&#xff09; 官网&#xff1a;http://www.atguigu.com 本章专题与脉络 1. 关键字&#xff1a;static 回顾类中的实例变量&#xff08;即非static的成员变量&#xff09; c…

linux文件类型和根目录结构

目录 一、Linux文件类型 二、Linux系统的目录结构 1. FHS 2. 路径以及工作目录 &#xff08;1&#xff09;路径 &#xff08;2&#xff09;工作目录 一、Linux文件类型 使用ls -l命令查看到的第一个字符文件类型说明-普通文件类似于Windows的记事本d目录文件类似于Windo…

【GPT4】GPT4 创作郭德纲姜昆相声作品的比较研究

欢迎关注【youcans的 AIGC 学习笔记】原创作品 说明&#xff1a;本文附录内容由 youcans 与 GPT-4 共同创作。 【GPT4】GPT4 创作郭德纲姜昆相声作品的比较研究研究总结0. 背景1. 对 GPT4 创作的第 1 段相声的分析2. 对GPT4 创作的第 2 段相声的分析3. 对GPT4 创作的第 3 段相…

Window常用命令

一、快捷键 1、自带快捷键 序号快捷键作用1windowsGXBOX录屏2cmd >osk屏幕键盘3cmd >calc计算器4cmd >mrt恶意软件删除工具 2、浏览器快捷键 序号快捷键作用1Alt P浏览器图片下载&#xff08;来自油猴脚本&#xff09; 二、其他功能 1、解决端口占用 第一步&…

Linux安装单细胞分析软件copykat

Linux安装单细胞分析软件copykat 测试环境 Linux centos 7R 4.1.2minconda3天意云24C192GB安装步骤 新建环境 conda activate copykatconda install r-base4.1.2 安装基础软件 checkPkg <- function(pkg){return(requireNamespace(pkg, quietly TRUE))}if(!checkPkg("…

类的加载过程-过程二:Linking阶段

链接过程之验证阶段(Verification) 当类加载到系统后&#xff0c;就开始链接操作&#xff0c;验证是链接操作的第一步。 它的目的是保证加载的字节码是合法、合理并符合规范的。 验证的步骤比较复杂&#xff0c;实际要验证的项目也很繁多&#xff0c;大体上Java虚拟机需要做…

基于stable diffusion的艺术操作

下面是作者基于stable diffusion的艺术操作 得益于人工智能的强大技术 以下所有的图 绝对是整体星球上唯一的图 现在人工智能越来越强大&#xff0c;感觉将来最有可能取代的就是摄影师、中低级的程序员、UI设计师、数据分析师等&#xff0c;人们未来更多从事的职业应该是快速…

机器学习 01

目录 一、机器学习 二、机器学习工作流程 2.1 获取数据 2.2 数据集 2.2.1 数据类型构成 2.2.2 数据分割 2.3 数据基本处理 2.4 特征工程 2.4.1什么是特征工程 2.4.2 为什么需要特征工程(Feature Engineering) 2.4.3 特征工程内容 2.5 机器学习 2.6 模型评估 2.7 …

【消息队列】细说Kafka消费者的分区分配和重平衡

消费方式 我们直到在性能设计中异步模式&#xff0c;一般要么是采用pull&#xff0c;要么采用push。而两种方式各有优缺点。 pull &#xff1a;说白了就是通过消费端进行主动拉去数据&#xff0c;会根据自身系统处理能力去获取消息&#xff0c;上有Broker系统无需关注消费端的…

Windows GPU版本的深度学习环境安装

本文记录了cuda、cuDNN的安装配置。 参考文章&#xff1a; cuda-installation-guide-microsoft-windows 12.1 documentation Installation Guide :: NVIDIA cuDNN Documentation 一、cuda安装 注意事项&#xff1a; 1、cuda安装最重要的是查看自己应该安装的版本。 表格…

Java数组打印的几种方式

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了 博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点!人生格言&#xff1a;当你的才华撑不起你的野心的时候,你就应该静下心来学习! 欢迎志同道合的朋友一起加油喔&#x1f9be;&am…

独立看门狗(IWDG)实验

独立看门狗简介 单片机系统在外界的干扰下会出现程序跑飞的现象导致出现死循环&#xff0c; 看门狗电路就是为了避免这种情况的发生 。IWDG&#xff08;Independent watchdog&#xff09;独立看门狗&#xff0c;可以用来检测并解决由于软件错误导致的故障&#xff0c;当计数器…

使用 ArcGIS Pro 进行土地利用分类的机器学习和深度学习

随着技术进步&#xff0c;尤其是地理信息系统 (GIS)工具的进步&#xff0c;可以更有效地对土地利用进行分类。分类的使用可用于识别植被覆盖变化、非法采矿区和植被抑制区域&#xff0c;这些只是土地利用分类的众多示例中的一部分。 分类的一大困难是确定要解决的问题的级别。…

MongoDB 聚合管道中使用数组表达式运算符断言数组($isArray)

数组表达式运算符主要用于文档中数组的操作&#xff0c;接上一篇&#xff1a; MongoDB 聚合管道中使用数组表达式运算符&#xff08;$concatArrays合并数组&#xff09;https://blog.csdn.net/m1729339749/article/details/130162048本篇我们主要介绍数组表达式运算符中用于断…

在windows上安装部署cicd

安装步骤 下载gitlab-runner&#xff0c;官网地址如下&#xff1a; https://docs.gitlab.com/runner/install/windows.html在任意位置创建文件夹&#xff0c;并把安装程序放入文件夹中 安装gitlab-runner 注意需要使用管理员权限&#xff0c;打开powershell才能运行 cd C:\Gi…