Flink Sql Redis Connector


Developers who work with Flink regularly know that connecting Flink to Redis is fairly painful, and parsing the data stored in Redis is even worse. If Redis could be connected through Flink SQL like an ordinary database, with the data presented as a table, things would be much more convenient.

After many days of work I have finally finished a Flink SQL Redis connector, and it has been tested: the data can be parsed with plain SQL. The finished code and execution results are shown below; the complete code is on my GitHub: https://github.com/niuhu3/flink_sql_redis_connector.git

The connector has been submitted to the Flink project, see: [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org)

A fork and a star would be much appreciated. I will keep updating this connector, so feel free to try it out; if you run into problems, report them to me or in the community, and I welcome any ideas as well.

1. Usage Examples

1. Reading data

CREATE TABLE orders (
  `order_id` STRING,
  `price` STRING,
  `order_time` STRING,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hgetall',
  'key' = 'orders'
);


select * from orders

Note: a Redis table must define a primary key, either a single column or a composite key.
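With 'command' = 'hgetall', the source locates every Redis hash whose key starts with the configured key ('orders' here), fills the primary key columns from the key itself, and maps each remaining column to the hash field of the same name (see the source function in section 3).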

The following shows the result of the SQL read: the Redis data is parsed directly into the tabular form we need.

2. Writing data

1. Generate source data
CREATE TABLE order_source (
  `order_number` BIGINT,
  `price` DECIMAL(32,2),
  `order_time` TIMESTAMP(3),
   PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'number-of-rows' = '5',
'fields.order_number.min' = '1',
'fields.order_number.max' = '20',
'fields.price.min' = '1001',
'fields.price.max' = '1100'
);

2. Define the redis sink table

CREATE TABLE redis_sink (
  `order_number` STRING,
  `price` STRING,
  `order_time` STRING,
   PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hmset',
  'key' = 'orders'
);

3. Insert data into the redis sink table (cast each column to STRING)

insert into redis_sink
	select
		cast(order_number as STRING) order_number,
		cast(price as STRING) price,
		cast(order_time as STRING) order_time
	from order_source;
	

Redis does not store data types, so every column must be cast to STRING before it is written. The write result is shown below. The Redis key is the concatenation of key + primary key column name + primary key value, which makes each row unique; this is also why a Redis table has to declare a primary key.
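To make the key scheme concrete, here is a minimal sketch of the concatenation with hypothetical values; it assumes the ':' separator that the sink code below uses (RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT):

public class RowKeySketch {
    public static void main(String[] args) {
        String tableKey = "orders";            //the 'key' option of the table
        String[] pkCols = {"order_number"};    //declared primary key columns
        String[] pkVals = {"10"};              //primary key values of one row

        StringBuilder rowKey = new StringBuilder(tableKey);
        for (int i = 0; i < pkCols.length; i++) {
            rowKey.append(':').append(pkCols[i]).append(':').append(pkVals[i]);
        }
        //prints "orders:order_number:10"; every non-key column of the row is
        //then stored as a hash field under this key
        System.out.println(rowKey);
    }
}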

3. Currently Supported Features

1. The connector currently supports several read and write commands:

        Read:    get    hget     hgetall     hscan   lrange    smembers    zrange

        Write:   set    hset     hmset       lpush   rpush    sadd

2. For hash data, the most common case, fuzzy matching is supported: supplying only the table name queries the whole table, as sketched below.
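The fuzzy match simply appends a wildcard to the configured key, so every row key of the table is found. A minimal sketch with Jedis, assuming a local Redis and the orders table from section 1:

import redis.clients.jedis.Jedis;

import java.util.Map;
import java.util.Set;

public class FuzzyReadSketch {
    public static void main(String[] args) {
        //the configured key "orders" is expanded to the pattern "orders*",
        //which matches every row key of the table
        Jedis jedis = new Jedis("localhost", 6379);
        Set<String> rowKeys = jedis.keys("orders*");
        for (String rowKey : rowKeys) {
            Map<String, String> row = jedis.hgetAll(rowKey); //one table row per hash
            System.out.println(rowKey + " -> " + row);
        }
        jedis.close();
    }
}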

4. Connector Options

| Option                     | Required | Default | Type    | Description                                    |
|----------------------------|----------|---------|---------|------------------------------------------------|
| connector                  | required | (none)  | String  | connector name                                 |
| mode                       | required | (none)  | String  | redis cluster mode (single or cluster)         |
| single.host                | optional | (none)  | String  | redis host in single mode                      |
| single.port                | optional | (none)  | Int     | redis port in single mode                      |
| password                   | optional | (none)  | String  | redis database password                        |
| command                    | required | (none)  | String  | redis command for writing or reading data      |
| key                        | required | (none)  | String  | redis key                                      |
| expire                     | optional | (none)  | Int     | TTL to set on the key                          |
| field                      | optional | (none)  | String  | hash field to read when using the hget command |
| cursor                     | optional | (none)  | Int     | cursor for the hscan command (e.g. 1, 2)       |
| start                      | optional | 0       | Int     | start index for the lrange command             |
| end                        | optional | 10      | Int     | end index for the lrange command               |
| connection.max.wait-mills  | optional | (none)  | Int     | redis connection parameter                     |
| connection.timeout-ms      | optional | (none)  | Int     | redis connection parameter                     |
| connection.max-total       | optional | (none)  | Int     | redis connection parameter                     |
| connection.max-idle        | optional | (none)  | Int     | redis connection parameter                     |
| connection.test-on-borrow  | optional | (none)  | Boolean | redis connection parameter                     |
| connection.test-on-return  | optional | (none)  | Boolean | redis connection parameter                     |
| connection.test-while-idle | optional | (none)  | Boolean | redis connection parameter                     |
| so.timeout-ms              | optional | (none)  | Int     | redis connection parameter                     |
| max.attempts               | optional | (none)  | Int     | redis connection parameter                     |
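Note: when 'mode' = 'cluster', the factory and runtime classes below also read a cluster address list (RedisOptions.CLUSTER_NODES, comma-separated host:port pairs) plus a few lookup-cache options; these are not listed in the table above.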

2. The Dynamic Table Source/Sink Factory

import org.apache.flink.common.RedisOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.sink.RedisDynamicTableSink;
import org.apache.flink.source.RedisDynamicTableSource;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;


public class RedisSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {


    
    private ReadableConfig options;
    public RedisSourceSinkFactory(){}

    public RedisSourceSinkFactory(ReadableConfig options){
        this.options = options;
    }

    
    //Implementation of DynamicTableSourceFactory: required so Flink SQL can read data through this connector
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        options = helper.getOptions();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        return new RedisDynamicTableSource(options,columnNames,primaryKey);

    }
    
    //Implementation of DynamicTableSinkFactory: required so Flink SQL can write data to Redis
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options,columnNames,primaryKey);
    }



    @Override
    public String factoryIdentifier() {
        return "redis";
    }

    //required connector options
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.PASSWORD);
        options.add(RedisOptions.KEY);
        options.add(RedisOptions.MODE);
        return options;
    }
    
    //optional connector options
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.SINGLE_HOST);
        options.add(RedisOptions.SINGLE_PORT);
        options.add(RedisOptions.CLUSTER_NODES);
        options.add(RedisOptions.FIELD);
        options.add(RedisOptions.CURSOR);
        options.add(RedisOptions.EXPIRE);
        options.add(RedisOptions.COMMAND);
        options.add(RedisOptions.START);
        options.add(RedisOptions.END);
        options.add(RedisOptions.CONNECTION_MAX_TOTAL);
        options.add(RedisOptions.CONNECTION_MAX_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_WHILE_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_ON_BORROW);
        options.add(RedisOptions.CONNECTION_TEST_ON_RETURN);
        options.add(RedisOptions.CONNECTION_TIMEOUT_MS);
        options.add(RedisOptions.TTL_SEC);
        options.add(RedisOptions.LOOKUP_ADDITIONAL_KEY);
        options.add(RedisOptions.LOOKUP_CACHE_MAX_ROWS);
        options.add(RedisOptions.LOOKUP_CACHE_TTL_SEC);

        return options;
    }
}
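One detail the listing does not show: Flink discovers the factory by its identifier 'redis' through Java SPI, so the connector jar must contain a resource file META-INF/services/org.apache.flink.table.factories.Factory whose content is the fully qualified factory class name (judging by the imports above, presumably org.apache.flink.RedisSourceSinkFactory).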

3. The Redis Source Class

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.util.Preconditions;

import java.util.List;

public class RedisDynamicTableSource implements ScanTableSource {

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;


    public RedisDynamicTableSource(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);

    }



    @Override
    public DynamicTableSource copy() {

        return new RedisDynamicTableSource(this.options, this.columns, this.primaryKey);
    }




    @Override
    public String asSummaryString() {
        return "redis table source";
    }


    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.all();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

        RedisSourceFunction redisSourceFunction = new RedisSourceFunction(this.options, this.columns, this.primaryKey);
        return SourceFunctionProvider.of(redisSourceFunction,false);
    }
}
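A design note on the two methods above: getChangelogMode() returns ChangelogMode.all() even though the source function below only ever emits insert rows, and SourceFunctionProvider.of(redisSourceFunction, false) declares the source unbounded even though run() reads one snapshot and returns; ChangelogMode.insertOnly() and a bounded provider would arguably describe the actual behavior more precisely.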

The source function supports reading Redis string, set, zset, and hash data and parsing it into RowData for Flink:

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanResult;

import java.util.*;


public class RedisSourceFunction extends RichSourceFunction<RowData>{

    private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String value;
    private String field;
    private String[] fields;
    private String cursor;
    private Integer start;
    private Integer end;
    private String[] keySplit;
    private static int position = 1;
    private GenericRowData rowData;
    public RedisSourceFunction(ReadableConfig options, List<String> columns, List<String> primaryKey){
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);

    }


    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {

        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password,"password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key,"key is null,please set value for key");
        String[] keyArr = key.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command,"command is null,please set value for command");

        // if the configured command is a write command, there is nothing for the source to read
        List<String> sourceCommand = Arrays.asList(RedisCommandOptions.SET, RedisCommandOptions.HSET, RedisCommandOptions.HMSET, RedisCommandOptions.LPUSH,
                RedisCommandOptions.RPUSH, RedisCommandOptions.SADD);
        if (sourceCommand.contains(command.toUpperCase())) { return; }

        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode,"mode is null,please set value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);

        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);


        if(mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())){

            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);



            switch (command.toUpperCase()){
                        case RedisCommandOptions.GET:
                            value = jedis.get(key);
                            rowData = new GenericRowData(2);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            rowData.setField(1,BinaryStringData.fromString(value));
                            break;

                        case RedisCommandOptions.HGET:
                            field = options.get(RedisOptions.FIELD);
                            value = jedis.hget(key, field);
                            rowData = new GenericRowData(3);
                            keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2])); //the i-th pk value follows the table name and pk column name in the key
                            }
                            rowData.setField(primaryKey.size(),BinaryStringData.fromString(value));
                            break;

                        case RedisCommandOptions.HGETALL:
                            if (keyArr.length > 1){
                                for (String str : keyArr) {
                                    rowData = new GenericRowData(columns.size());
                                    keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                                    for (int i = 0; i < primaryKey.size(); i++) {
                                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                                    }

                                    for (int i = primaryKey.size(); i < columns.size(); i++) {
                                        String value = jedis.hget(str, columns.get(i));
                                        rowData.setField(i,BinaryStringData.fromString(value));
                                    }
                                    ctx.collect(rowData);
                                }

                            }else if(key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)){
                                rowData = new GenericRowData(columns.size());
                                keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                                for (int i = 0; i < primaryKey.size(); i++) {
                                    rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                                }

                                for (int i = primaryKey.size(); i < columns.size(); i++) {
                                    String value = jedis.hget(key, columns.get(i));
                                    rowData.setField(i,BinaryStringData.fromString(value));
                                }

                                ctx.collect(rowData);

                            }else{
                                //Fuzzy matching ,gets the data of the entire table
                                String fuzzyKey = new StringBuffer(key).append("*").toString();
                                Set<String> keys = jedis.keys(fuzzyKey);
                                for (String keyStr : keys) {
                                    keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                                    rowData = new GenericRowData(columns.size());
                                    for (int i = 0; i < primaryKey.size(); i++) {
                                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                                    }

                                    for (int i = primaryKey.size(); i < columns.size(); i++) {
                                        String value = jedis.hget(keyStr, columns.get(i));
                                        rowData.setField(i,BinaryStringData.fromString(value));
                                    }

                                    ctx.collect(rowData);

                                }
                            }

                            break;

                        case RedisCommandOptions.HSCAN:
                            cursor = options.get(RedisOptions.CURSOR);
                            ScanResult<Map.Entry<String, String>> entries = jedis.hscan(key, cursor);
                            List<Map.Entry<String, String>> result = entries.getResult();
                            keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                            }

                            position = primaryKey.size();
                            for (int i = 0; i < result.size(); i++) {
                                value = result.get(i).getValue();
                                rowData.setField(position,BinaryStringData.fromString(value));
                                position++;
                            }
                            break;

                        case RedisCommandOptions.LRANGE:
                            start = options.get(RedisOptions.START);
                            end = options.get(RedisOptions.END);
                            List<String> list = jedis.lrange(key, start, end);
                            rowData = new GenericRowData(list.size() +1);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            list.forEach(s -> {
                                rowData.setField(position,BinaryStringData.fromString(s));
                                position++;});

                            break;

                        case RedisCommandOptions.SMEMBERS:
                            Set<String> smembers = jedis.smembers(key);
                            rowData = new GenericRowData(smembers.size() +1);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            smembers.forEach(s -> {
                                rowData.setField(position,BinaryStringData.fromString(s));
                                position++;});
                            break;

                        case RedisCommandOptions.ZRANGE:
                            start = options.get(RedisOptions.START);
                            end = options.get(RedisOptions.END);
                            Set<String> sets = jedis.zrange(key, start, end);
                            rowData = new GenericRowData(sets.size() +1);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            sets.forEach(s -> {
                                rowData.setField(position,BinaryStringData.fromString(s));
                                position++;});
                            break;


                        default:
                            LOG.error("Cannot process such data type: {}", command);
                            break;
                    }

                    if(!command.toUpperCase().equals(RedisCommandOptions.HGETALL)){
                        ctx.collect(rowData);
                    }



        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];

            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);

            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts, testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()){
                case RedisCommandOptions.GET:
                    value = jedisCluster.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    rowData.setField(1,BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedisCluster.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                    }
                    rowData.setField(primaryKey.size(),BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1){
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                            }

                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(str, columns.get(i));
                                rowData.setField(i,BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }

                    }else if(key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)){
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                        }

                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedisCluster.hget(key, columns.get(i));
                            rowData.setField(i,BinaryStringData.fromString(value));
                        }

                        ctx.collect(rowData);

                    }else{
                        //Fuzzy matching ,gets the data of the entire table
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedisCluster.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                            }

                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(keyStr, columns.get(i));
                                rowData.setField(i,BinaryStringData.fromString(value));
                            }

                            ctx.collect(rowData);

                        }
                    }

                    break;

                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedisCluster.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                    }

                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position,BinaryStringData.fromString(value));
                        position++;
                    }
                    break;



                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedisCluster.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() +1);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    list.forEach(s -> {
                        rowData.setField(position,BinaryStringData.fromString(s));
                        position++;});

                    break;

                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedisCluster.smembers(key);
                    rowData = new GenericRowData(smembers.size() +1);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    smembers.forEach(s -> {
                        rowData.setField(position,BinaryStringData.fromString(s));
                        position++;});
                    break;

                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedisCluster.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() +1);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    sets.forEach(s -> {
                        rowData.setField(position,BinaryStringData.fromString(s));
                        position++;});
                    break;


                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

            if(!command.toUpperCase().equals(RedisCommandOptions.HGETALL)){
                ctx.collect(rowData);
            }

        }else{
            LOG.error("Unsupport such {} mode",mode);
        }




    }

    @Override
    public void cancel() {

        if(jedis != null){
            jedis.close();
        }

        if(jedisCluster != null){
            jedisCluster.close();
        }

    }
}
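The RedisUtil helper referenced above is not shown in this post. As a rough idea of what it does, here is a minimal sketch of getSingleJedisPool built on Jedis' JedisPoolConfig; this is an assumption for illustration, and the actual implementation in the repository may differ (null handling, defaults, and the cluster variant getJedisCluster):

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {

    //sketch only: build a pooled single-node connection from the table options
    public static JedisPool getSingleJedisPool(String mode, String host, Integer port,
                                               Integer maxTotal, Integer maxIdle, Integer maxWaitMills,
                                               Boolean testOnBorrow, Boolean testOnReturn,
                                               Boolean testWhileIdle) {
        JedisPoolConfig config = new JedisPoolConfig();
        if (maxTotal != null)      { config.setMaxTotal(maxTotal); }
        if (maxIdle != null)       { config.setMaxIdle(maxIdle); }
        if (maxWaitMills != null)  { config.setMaxWaitMillis(maxWaitMills); }
        if (testOnBorrow != null)  { config.setTestOnBorrow(testOnBorrow); }
        if (testOnReturn != null)  { config.setTestOnReturn(testOnReturn); }
        if (testWhileIdle != null) { config.setTestWhileIdle(testWhileIdle); }
        return new JedisPool(config, host, port);
    }
}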

4. The Redis Sink Class

package org.apache.flink.sink;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;


public class RedisDynamicTableSink implements DynamicTableSink {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RedisDynamicTableSink.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSink(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.DELETE)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();

    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        RedisSinkFunction myRedisSinkFunction = new RedisSinkFunction(this.options,this.columns,this.primaryKey);
        return SinkFunctionProvider.of(myRedisSinkFunction);

    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options,this.columns,this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table sink";
    }
}
The sink delegates the actual writing to RedisSinkFunction:

package org.apache.flink.sink;

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import java.util.List;


public class RedisSinkFunction extends RichSinkFunction<RowData>{

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private String fields;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String[] fieldsArr;
    private StringBuffer redisTableKey;
    private String value;

    public RedisSinkFunction(ReadableConfig options, List<String> columns, List<String> primaryKey){

        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }


    @Override
    public void invoke(RowData rowData, Context context) throws Exception {

        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password,"password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key,"key is null,please set value for key");
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command,"command is null,please set value for command");
        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(command,"mode is null,please set value for mode");

        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);

        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);


        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {

            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()){
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedis.set(String.valueOf(key),String.valueOf(value));
                    break;

                case RedisCommandOptions.HSET:

                    String field = columns.get(1);
                    //construct redis key: table_name:pk_col_name:pk_value(:pk_col_name:pk_value...)
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        redisTableKey.append(rowData.getString(i).toString());
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }

                    value = rowData.getString(1).toString();
                    jedis.hset(String.valueOf(redisTableKey),String.valueOf(field),String.valueOf(value));
                    break;

                case RedisCommandOptions.HMSET:
                    //construct redis key: table_name:pk_col_name:pk_value(:pk_col_name:pk_value...)
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        redisTableKey.append(rowData.getString(i).toString());
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }
                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))){
                            value = rowData.getString(i).toString();
                            jedis.hset(String.valueOf(redisTableKey),String.valueOf(columns.get(i)),String.valueOf(value));
                        }
                    }

                    break;

                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedis.lpush(key,value);

                    break;

                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedis.rpush(key,value);

                    break;

                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedis.sadd(key,value);
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }


        }else if(mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())){
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];

            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);

            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts, testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()){
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedisCluster.set(String.valueOf(key),String.valueOf(value));
                    break;

                case RedisCommandOptions.HSET:

                    String field = columns.get(1);
                    //construct redis key: table_name:pk_col_name:pk_value(:pk_col_name:pk_value...)
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        redisTableKey.append(rowData.getString(i).toString());
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }

                    value = rowData.getString(1).toString();
                    jedisCluster.hset(String.valueOf(redisTableKey),String.valueOf(field),String.valueOf(value));
                    break;

                case RedisCommandOptions.HMSET:
                    //construct redis key: table_name:pk_col_name:pk_value(:pk_col_name:pk_value...)
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        redisTableKey.append(rowData.getString(i).toString());
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }

                    for (int i = 1; i < columns.size(); i++) {
                        value = rowData.getString(i).toString();
                        jedisCluster.hset(String.valueOf(redisTableKey),String.valueOf(columns.get(i)),String.valueOf(value));
                    }

                    break;

                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.lpush(key,value);

                    break;


                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.rpush(key,value);

                    break;

                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedisCluster.sadd(key,value);
                    break;


                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }




        }else{
            LOG.error("Unsupport such {} mode",mode);
        }

    }

    @Override
    public void close() throws Exception {
        if(jedis != null){
            jedis.close();
        }

        if(jedisCluster != null){
            jedisCluster.close();
        }

    }
}
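The constant holders RedisCommandOptions and RedisSplitSymbol from org.apache.flink.common are used throughout but never listed. A hypothetical reconstruction inferred from how they are used above (the real definitions are in the repository):

//hypothetical reconstruction, inferred from usage (each class in its own file
//under org.apache.flink.common)
public class RedisCommandOptions {
    public static final String GET = "GET";
    public static final String SET = "SET";
    public static final String HGET = "HGET";
    public static final String HSET = "HSET";
    public static final String HMSET = "HMSET";
    public static final String HGETALL = "HGETALL";
    public static final String HSCAN = "HSCAN";
    public static final String LPUSH = "LPUSH";
    public static final String RPUSH = "RPUSH";
    public static final String SADD = "SADD";
    public static final String SMEMBERS = "SMEMBERS";
    public static final String LRANGE = "LRANGE";
    public static final String ZRANGE = "ZRANGE";
}

public class RedisSplitSymbol {
    //separates host:port pairs in the cluster nodes option
    public static final String CLUSTER_NODES_SPLIT = ",";
    //separates host from port, and the segments of a row key
    public static final String CLUSTER_HOST_PORT_SPLIT = ":";
}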

If you are not sure why the code is written the way it is, my previous post may help:

Flink SQL - User-defined Sources & Sinks (CSDN blog)

Finally, once more: please support the project on GitHub and in the community, so that this connector can be officially open-sourced.

