pyflink学习笔记(四):datastream_api

news2025/1/19 23:10:10

现pyflink环境为1.16 ,下面介绍下常用的datastream算子。

现我整理的都是简单的、常用的,后期会继续补充。

官网:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/python/datastream/intro_to_datastream_api/

from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
#构建环境,构建环境时,可添加配置参数,也可默认
# https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/python/python_config/
config = Configuration()
config.set_integer("python.fn-execution.bundle.size", 1000)
env = StreamExecutionEnvironment.get_execution_environment(config)
#默认 无参
env = StreamExecutionEnvironment.get_execution_environment()
#添加配置
env.set_parallelism(1) #添加配置:并行度  其他配置可参考官网 https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution/execution_configuration/

数据输入

1.from_collection

从集合Collection中读取数据,用的比较多的。但是一般都是本地测试或者单机运行的时候用。

主要就是两个参数

1:collection(必填) 数组类型-就是传入的数据,数据类型需要对应。

2:type_info(非必填) TypeInformation类型,定义数据的scheam,通过Types.ROW_NAMED定义列名,后面的数组是定义数据类型,三个对应上就行

ds = env.from_collection(
    [('a', 'id=1', 1), ('a', 'id=2', 2), ('a', 'id=3', 3), ('b', 'home=1', 1), ('b', 'home=2', 2)],
    type_info=Types.ROW_NAMED(["key", "url", "value"], [Types.STRING(), Types.STRING(), Types.INT()]))

2.read_text_file

逐行读取给定的文件并创建一个数据流,该数据流包含一个字符串每一行的内容。

两个参数:

1.file_path :文件路径 类型 'file:///some/local/file' or 'hdfs://host:port/file/path'

2.charset_name:默认utf-8

from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds1 = env.read_text_file("D:\\tmp\\requirements.txt")
ds1.print()
env.execute()

3.from_source

传递source来获取datastream类型的数据。

主要参数:

  1. source:source数据源

  1. watermark_strategy: Watermark生成策略,有单调递增策略(forMonotonousTimestamps)、固定乱序长度策略(forBoundedOutOfOrderness)等等有需要自己百度下。

  1. source_name:名称

  1. type_info:类型

下面就是生成单调递增的1-10的数据

seq_num_source = NumberSequenceSource(1, 10)
ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), #单调递增的策略
    source_name='seq_num_source',
    type_info=Types.LONG())
ds.print()
ds.execute()

还可读取文件,通过FileSource.for_record_stream_format方法读取文件。input_path为文件路径

 ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )

4.add_source

自定义添加数据源,一般都是添加kafka。

add_source(self, source_func: SourceFunction, source_name: str = 'Custom Source',  type_info: TypeInformation = None) -> 'DataStream':

用户可以自定义kakfak的FlinkKafkaConsumer和 FlinkKafkaProducer

# 注意:file:///前缀不能省略
env.add_jars("file:///.../flink-sql-connector-kafka_2.11-1.12.0.jar")
deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=Types.ROW([Types.LONG(), Types.LONG()])).build()
kafka_source1 = FlinkKafkaConsumer(
    topics='test_source_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
source_ds = env.add_source(kafka_source1).name('source_kafka')
map_df = source_ds.map(lambda row: json.loads(row)) 
kafka_producer = FlinkKafkaProducer(
        topic='test_source_topic',
        serialization_schema=SimpleStringSchema(),
        producer_config={'bootstrap.servers': 'localhost:9092'})

 map_df.add_sink(kafka_producer).name('sink_kafka')

但是你要用的是1.16版本,那么会提示你FlinkKafkaConsumer不支持了

class KafkaSource(Source):
    """
    The Source implementation of Kafka. Please use a :class:`KafkaSourceBuilder` to construct a
    :class:`KafkaSource`. The following example shows how to create a KafkaSource emitting records
    of String type.

    ::

        >>> source = KafkaSource \\
        ...     .builder() \\
        ...     .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\
        ...     .set_group_id('MY_GROUP') \\
        ...     .set_topics('TOPIC1', 'TOPIC2') \\
        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
        ...     .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \\
        ...     .build()

    .. versionadded:: 1.16.0
    """

所以1.16之后 官方建议用KafkaSource和KafkaSink

from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, \
    KafkaRecordSerializationSchema

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
kafka_source = KafkaSource \
    .builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_group_id('MY_GROUP') \
    .set_topics('test_source_topic') \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .build()
ds = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "kafka source")

ds = ds.map(lambda x: x[0])


env.execute('datastream_api_demo')

但是把这个报错,跟自定义分区一样,不知道啥情况,等以后在研究研究,1.16也可以使用FlinkKafkaConsumer和 FlinkKafkaProducer,通过add_source来使用。

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)

map(映射)

map是大家非常熟悉的大数据操作算子,主要用于将流中进行转换形成新的数据流,简单来说 就是一一映射,进来什么样出去就什么样。

from pyflink.common import Types
from pyflink.datastream import MapFunction, StreamExecutionEnvironment

##函数类实现,有点类似java的富含数类。
class MyMapFunction(MapFunction):
    # 这个还可以实现open方法,当作全局调用
    def __init__(self, str_p):
        self.str_p = str_p

    def map(self, value):
        dd = value[1].split('|')
        return dd[0] + "_" + self.str_p


env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
data_stream = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
# mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.ROW([Types.STRING(), Types.STRING()]))
mapped_stream = data_stream.map(MyMapFunction("pj"), output_type=Types.STRING())
#亦可以使用lambda匿名函数来实现
# maplambda_fun = data_stream.map(lambda x: x * 3)
# mapped_stream.print()
mapped_stream.print()
env.execute("1")
aaa_pj
bb_pj
aaa_pj

filter(过滤)

filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置转换过滤条件,对于一个流内元素进行判断,若为true则输出正常,为false则过滤元素。

filter只负责筛选数据,数据操作不要在filter实现

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, FilterFunction

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
data_stream = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))


class myFilterFunction(FilterFunction):

    def __init__(self, condition):
        self.condition = condition

    def filter(self, value):
        if value[1] == self.condition:
            return value


result = data_stream.filter(myFilterFunction('aaa|bb'))
result1 = data_stream.filter(lambda x: x[1] == 'aaa|bb')
result.print()
env.execute()
+I[1, aaa|bb]

flatMap(扁平映射)

flatMap操作又称为扁平映射,主要是将数据流中的整体(一般集合类型)拆分成一个一个的个体使用。

map:进去是个体,出来也是个体;进去是集合,出来也是集合。

flatmap:进去是集合,出来是个体,当都是个体时跟map就没差了。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, FlatMapFunction

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
data_stream = env.from_collection([(1, 'aaa@bb'), (2, 'bb@a'), (3, 'aaa@a')])

#继承 FlatMapFunction  ,这种写法主要作用就是为了实现抽象类,比如open实现全局变量长连接等
class myFlatMapFunciotn(FlatMapFunction):

    def flat_map(self, value):
        yield value[1].split("@")[0]
       

result = data_stream.flat_map(myFlatMapFunciotn(), result_type=Types.STRING())
# 如果业务处理中只是为了实现一些数据处理任务,可以直接编写函数实现,效果是一样的。
def split(value):
    yield value[1].split("@")[0]
result1 = data_stream.flat_map(split)
result1.print()
env.execute("1")
aaa
bb
aaa

聚合算子( Aggregation)

直观上看,基本转换算子确实是在"转换",因为它们都是基于当前数据,去做了处理和输出。而在实际应用中,我们往需要对大量的数据进行统计或整合,从而提炼出更有用的信息。

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有的数据聚在一起进行汇总合并,这就是所谓的聚合,也对应这mapreduce中的reduce操作。

1.key_by

key_by是聚合前必须要用到的一个算子。keyby通过指定键,可以将一条流从逻辑上划分成不同的分区(partitions),这里说的分区,其实就是并行处理的子任务,也就对应这人物槽(task slot).

通过分区将数据流到不同的分区中,这样一来所有相同的key的数据都会发往同一个分区,也就是同一个人物槽中进行处理了。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# data_stream = env.from_collection([(1, 'aaa@bb'), (2, 'bb@a'), (3, 'aaa@a')])
ds = env.from_collection(
    [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
    type_info=Types.ROW_NAMED(["key", "value"], [Types.STRING(), Types.INT()]))
result = ds.key_by(lambda x: x[0])
result.print(“1”)
env.execute()
+I[a,1]
+I[a,2]
+I[a,3]
+I[b,1]
+I[b,2]

2.简单聚合

有了keyby分区之后我们就可以使用简单的聚合了,其实无论是sum、max、min等操作,直接DataStream 是不能直接使用的,因为必须分区后才能使用,这也是1.16版本后才可以使用的。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_parallelism(2)
ds = env.from_collection(
    [('a', 'id=1', 1), ('a', 'id=2', 2), ('a', 'id=3', 3), ('b', 'home=1', 1), ('b', 'home=2', 2)],
    type_info=Types.ROW_NAMED(["key", "url", "value"], [Types.STRING(), Types.STRING(), Types.INT()]))
# sum的参数可以是列名,也可以是position
# 该方法通过第一位(也就是key列)分区后,然后根据value列相加分别统计总数
#result = ds.key_by(lambda x: x[0]).sum("value")
result1 = ds.key_by(lambda x: x[0]).max("value")
result2 = ds.key_by(lambda x: x[0]).max_by("value")
result1.print("max:")
result2.print("max_by:")
env.execute()

简单说下max和max_by的区别,它俩都是求指定字段的最小值,但是min只计算指定字段的最小心,其他字段会保留最初第一个数据的值,而min_by则会返回包含字段最小值的整条数据,min一样.

比如下面,观察下结果,max的结果中的url列的值是不变的,因为你只用了value来统计,而max_by是都变化的,用哪个就按照实际业务来把。

max:> +I[a,id=1,1]
max:> +I[a,id=1,2]
max:> +I[a,id=1,3]
max:> +I[b,home=1,1]
max:> +I[b,home=1,2]
max_by:> +I[a,id=1,1]
max_by:> +I[a,id=2,2]
max_by:> +I[a,id=3,3]
max_by:> +I[b,home=1,1]
max_by:> +I[b,home=2,2]

3.归约聚合reduce

如果说简单聚合是对一些特定统计需求的实现,那么reduce算子就是一个一般化的聚合统计操作了。从MapReduce开始,我们就对reduce操作就不陌生了,他可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,在做一个聚合计算。

import random
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, ReduceFunction

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

##先生成一些随机数据
name_list = ["wang", "zhao", "yu"]
url_list = ["baidu.com", "taobao.com", "google.com", "yangshi.com"]

#name+url 统计下
result_list = []
for num in range(100):
    result_tuple = (random.choice(name_list), random.choice(url_list))
    result_list.append(result_tuple)


class MyReduceFunction(ReduceFunction):
    # value1是归约后的结果,value2是新数据
    def reduce(self, value1, value2):
        return value1[0], value1[1] + value2[1]

ds = env.from_collection(result_list, type_info=Types.ROW_NAMED(["name", "url"], [Types.STRING(), Types.STRING()]))
result = ds \
    .map(lambda x: (x.name + "_" + x.url, 1)) \ # 通过map先将数据映射成name_url 1的格式
    .key_by(lambda x: x[0]) \ #通过name分区
    .reduce(MyReduceFunction()) #事先reducefunction方法

result.print("reduce:")
env.execute()

截取最后的输出,可以看到会累计打印出每个分区的访问累计数。

reduce:> ('wang_google.com', 14)
reduce:> ('wang_yangshi.com', 9)
reduce:> ('wang_taobao.com', 7)
reduce:> ('zhao_google.com', 9)
reduce:> ('yu_baidu.com', 8)
reduce:> ('zhao_taobao.com', 6)
reduce:> ('yu_yangshi.com', 12)
reduce:> ('zhao_yangshi.com', 10)
reduce:> ('zhao_google.com', 10)
reduce:> ('wang_baidu.com', 4)
reduce:> ('yu_yangshi.com', 13)

但是就想要累计数最多的那个分区(也就是name_url),我们可以通过使用max,也可以使用reduce

import random
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, ReduceFunction

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

name_list = ["wang", "zhao", "yu"]
url_list = ["baidu.com", "taobao.com", "google.com", "yangshi.com"]

result_list = []
for num in range(100):
    result_tuple = (random.choice(name_list), random.choice(url_list))
    result_list.append(result_tuple)


class MyReduceFunction(ReduceFunction):

    def reduce(self, value1, value2):
        return value1[0], value1[1] + value2[1]


class MyReduceFunction2(ReduceFunction):
    def reduce(self, value1, value2):
        return value1 if value1[1] > value2[1] else value2


ds = env.from_collection(result_list, type_info=Types.ROW_NAMED(["name", "url"], [Types.STRING(), Types.STRING()]))
result = ds \
    .map(lambda x: (x.name + "_" + x.url, 1)) \
    .key_by(lambda x: x[0]) \
    .reduce(MyReduceFunction()) \
    .key_by(lambda x: "key") \ ##我们将所有分区结果都放到一个分区内,当数据量大时不要这样干,数据量小可以
    .reduce(MyReduceFunction2()) ##通过第二个reducefunction 获取结果
result.print("reduce:")
env.execute()

截取最后输出结果,通过打印可以看出来,这次只会打印当前累计数最大的组合名称,当有别的组合超过当前最大组合的累计数量才会替换,否则只会打印最大的。

reduce:> ('zhao_baidu.com', 10) # 当没有其他组合的数量超过·zhao_baidu.com·时,一直打印当前
reduce:> ('zhao_baidu.com', 10)
reduce:> ('wang_baidu.com', 11) #当·wang_baidu.com· 最多时就会打印该组合
reduce:> ('wang_baidu.com', 11)
reduce:> ('zhao_baidu.com', 11)
reduce:> ('zhao_baidu.com', 11)
reduce:> ('zhao_baidu.com', 11)
reduce:> ('wang_baidu.com', 12)
reduce:> ('wang_baidu.com', 12)
reduce:> ('wang_baidu.com', 13)
reduce:> ('wang_baidu.com', 13)
reduce:> ('wang_baidu.com', 13)

物理分区(Physical Partitioning)

分区就是将数据进行重新分布,传递到不同的流分区去进行下一步处理。

前面介绍的key_by它就是按照键的哈希值来进行重新分区的操作,只不过这种分区操作只能保证把数据按key来分开,至于能不能分的均匀、每个key去哪个分区,这些都是无法控制的,所以常说key_by是一种逻辑分区操作,是一种"软分区"。

那下面我们介绍下什么是物理分区,也就是"硬分区"。物理分区是真正的分区策略,精准的调配数据,告诉每个数据该去哪个分区。

常见的物理分区策略有随机配( Random)、轮询分配( Round-Robin)、重缩放( Rescale和广播( Broadcast),下边我们分别来做了解。

1.随机分区

随机分区服从均匀分布,所以可以把流中的数据随机打乱,均匀的传递到下游任务分区,因为是随机的,所以就算数据一样,分区可能也可能不会相同。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 这里我们设施并行度是1 ,这里设置多少取决配置,我是本地执行那么就是cpu核数
ds = env.from_collection(
    [('a', 'id=1', 1), ('a', 'id=2', 2), ('a', 'id=3', 3), ('b', 'home=1', 1), ('b', 'home=2', 2)],
    type_info=Types.ROW_NAMED(["key", "url", "value"], [Types.STRING(), Types.STRING(), Types.INT()]))
ds.shuffle().print("shuffle:").set_parallelism(4)  # 经过shuffle后我们设置并行度为4
env.execute()

打印后我们可以看到,其中4条均匀分配到4个分区,多出一条随机到其中一个分区。

shuffle::4> +I[a,id=3,3]
shuffle::3> +I[a,id=2,2]
shuffle::2> +I[a,id=1,1]
shuffle::1> +I[b,home=1,1]
shuffle::2> +I[b,home=2,2]

2.轮询分区(rebalance)

轮询也是一种常见的重分 区方式。简单来说就“发牌”,按照先后顺序将数据做依次分发。其实跟随机分区没啥太大的区别,只不过轮询使用的是Round-Robin负载均衡算法,比random更平均的分配到下游计算任务中。

import random
from pyflink.datastream import StreamExecutionEnvironment
name_list = ["wang", "zhao", "yu"]
url_list = ["baidu.com", "taobao.com", "google.com", "yangshi.com"]

result_list = []
for num in range(100):
    result_tuple = (random.choice(name_list), random.choice(url_list))
    result_list.append(result_tuple)

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 这里我们设施并行度是1 ,这里设置多少取决配置,我是本地执行那么就是cpu核数
ds = env.from_collection(
    result_list
    )
ds.rebalance().print("rebalance:").set_parallelism(4)  # 经过rebalance后我们设置并行度为4
env.execute()
......
rebalance::3> ('wang', 'google.com')
rebalance::2> ('yu', 'taobao.com')
rebalance::1> ('wang', 'baidu.com')
rebalance::2> ('zhao', 'taobao.com')
rebalance::3> ('wang', 'google.com')
rebalance::3> ('wang', 'google.com')
rebalance::3> ('zhao', 'yangshi.com')
rebalance::4> ('yu', 'baidu.com')
rebalance::3> ('zhao', 'google.com')

3.重缩放分区 (rescale)

重缩放分区和轮询分区的算法都一样,都是使用Round-Robin负载均衡算法,但是跟轮询的差别是重缩放只会将数据发送到下游并行任务的一部分。如果发牌人有多个,那么轮询是每个发牌人都面向所有人发牌,而重缩放是分成小团体,发牌人只给自己的团体内的所有人轮流发牌。

什么时候该用重缩放?

1.当数据接收方的数量是数据发送方数量的整数倍时,rescale的效率就会更高。

2. 轮询分区和重缩放的连接机制是不同的,重缩放主要是针对每一个任务和校友对应的部分任务找时间建立通信,可以节省资源。而轮询因为是面向所有的数据接收方,当taskmanger数量较多时,这种跨节点的网络传输必然影响效率。

import random
from pyflink.datastream import StreamExecutionEnvironment
name_list = ["wang", "zhao", "yu"]
url_list = ["baidu.com", "taobao.com", "google.com", "yangshi.com"]

result_list = []
for num in range(100):
    result_tuple = (random.choice(name_list), random.choice(url_list))
    result_list.append(result_tuple)

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 这里我们设施并行度是1 ,这里设置多少取决配置,我是本地执行那么就是cpu核数
ds = env.from_collection(
    result_list
    )
ds.rescale().print("rescale:").set_parallelism(4)  # 经过rescale后我们设置并行度为4
env.execute()

结果跟轮询和随机都差不多,主要是效率问题。

4.广播(broadcast)

这种其实不应该说是重分区,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理,可以调用broadcast(),将输入数据复制并发送到下游算子所有并行任务中去。

我理解这个应该跟spark的广播变量是一个意思,就是输入数据不用每个分区都发了,将输入数据定义成广播变量,这样就不用每个分区都传递变量。 提高效率。

import random
from pyflink.datastream import StreamExecutionEnvironment
name_list = ["wang", "zhao", "yu"]
url_list = ["baidu.com", "taobao.com", "google.com", "yangshi.com"]

result_list = []
for num in range(100):
    result_tuple = (random.choice(name_list), random.choice(url_list))
    result_list.append(result_tuple)

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 这里我们设施并行度是1 ,这里设置多少取决配置,我是本地执行那么就是cpu核数
ds = env.from_collection(
    result_list
    )
ds.broadcast().print("broadcast:").set_parallelism(4)  # 经过broadcast后我们设置并行度为4
env.execute()

5.全局分区(global)

就是将所有数据传递到一个分区中,换句话说就是强制让分区数=1,这个太极端了,基本不常用。但是数据量少,或者是特殊情况可以用。

6.自定义分区(custom)

上面都满足不了你了,那么咱就自定义把。

通过partition_custom方法来自定义分区。需要传递两个参数;第一个是自定义分区器(partitioner)对象,第二个是应用分区器的字段,它的指定方式与keyby指定key的方式基本一样,可以通过字段名称指定,也可以通过字段位置的索引来指定。

from typing import Any

from pyflink.datastream import StreamExecutionEnvironment, Partitioner, KeySelector

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  
##将奇偶数分区
ds = env.from_collection(
    [1, 2, 3, 4, 5, 6, 7, 8]
)


class myPartitioner(Partitioner):

    def partition(self, key: Any, num_partitions: int) -> int:
        return key % 2


class myKeySelector(KeySelector):

    def get_key(self, value):
        return value


ds.partition_custom(myPartitioner(), myKeySelector()).print("custom:").set_parallelism(2)
env.execute()

但是执行有错误。应该是提交作业失败,重新提交,但是我也没试明白。不知道咋回事,等后续在研究把。

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)

数据输出

当数据流经过一系列的转换后,需要将计算结果进行输出,那么负责输出结果的算子称为Sink。

1.sink_to

上面1.16版本kafka输入数据,下面新版本的输出,同样都是有问题的,还是建议用旧版把。有新的研究会补充上。

from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, \
    KafkaRecordSerializationSchema
sink = KafkaSink.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_record_serializer(
    KafkaRecordSerializationSchema.builder()
        .set_topic('test_sink_topic')
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()
ds.sink_to(sink)

2.add_sink

也可以输出到jdbc中

jdbc_options = JdbcConnectionOptions.JdbcConnectionOptionsBuilder() \
    .with_user_name("xxxxxx") \
    .with_password("xxxxxx") \
    .with_driver_name("com.mysql.cj.jdbc.Driver") \
    .with_url("jdbc:mysql://localhost:3306/test_db") \
    .build()

ds.add_sink(JdbcSink.sink("insert test_table(id, message) VALUES(null, ?)",
                          type_info=Types.ROW([Types.STRING()]),
                          jdbc_connection_options=jdbc_options))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/397963.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

面向新时代,海泰方圆战略升级!“1465”隆重发布!

过去四年,海泰方圆“1344”战略一直在引领公司前行,搭建了非常坚实的战略框架基座,并推动全员在实践和行动中达成深度共识。 “1344”战略 1个定位,代表着当前机构用户的一组共性需求,密码安全数据治理信创工程。 3…

【项目精选】基于JAVA的私人牙科诊所管理系统(视频+论文+源码)

点击下载源码 摘要 随着科技的飞速发展,计算机已经广泛的应用于各个领域之中。在医学领域中,计算机主要应用于两个方面:一是医疗设备智能化,以硬件为主。另一种是病例信息管理系统(HIS)以软件建设为主&…

11.网络编程

1.客户端-服务器客户端和服务器是进程,不是机器或者主机2.网络对于主机,网络只是一种I/O设备,是数据源和数据接收方一个插到I/O总线扩展槽的适配器提供了到网络的物理接口物理上而言,网络是一个按照地理远近组成的层次系统最底层是…

使用Arduino Uno构建一个巡线机器人

使用Arduino Uno构建一个巡线机器人 原文 MX 巡线机器人(**LFR)**是一种简单的自主引导机器人,它遵循在地面上绘制的线来检测白色表面上的暗线或黑暗表面上的白线。在本教程中,使用 Arduino Uno 和一些易于访问的组件构建黑线跟…

动态规划——01背包,完全背包,力扣题型讲解

目录 背包问题 01背包及基础 压缩空间(一维dp滚动数组) 416.分割等和子集 1049.最后一块石头的重量 494.目标和 474.一和零 完全背包 理论基础 518.零钱兑换 Ⅱ 377.组合总和 Ⅳ 70.爬楼梯(n阶,完全背包解法&#xff0…

邻桌为何一天就学完了SQL基础语法,数据分析必学的SQL,满满硬货

因为开学原因,导致好久没有更新博客了,谁家大学生一周五天早八, 今天这篇分享数据库操作和 SQL。 SQL 全称是 Structured Query Language,翻译后就是结构化查询语言,是一种数据库查询和程序设计语言,用于…

SpringSecurity学习(三)自定义数据源、前后端分离案例

文章目录一、自定义数据源1. 认证流程与原理分析AuthenticationManager、ProviderManager、AuthenticationProvider三者关系2. 全局配置AuthenticationManager方式由于WebSecurityConfigurerAdapter过期,我们使用以下写法:3. 编码3.1 创建数据库表与插入…

一张图搞定研发团队管理全景图实例

研发团队往往是公司的生命力“源泉”但是研发团队的管理,往往都会“极端化”:※要不然极端的管理,导致创新力下降;※要不然极端的不管理,导致创新的方向太多,蔓延生长,没有边界,企业…

LearnOpenGL-光照-1.颜色

本人刚学OpenGL不久且自学,文中定有代码、术语等错误,欢迎指正 我写的项目地址:https://github.com/liujianjie/LearnOpenGLProject 文章目录颜色OpenGL代码例子颜色 物体颜色简介 我们在现实生活中看到某一物体的颜色并不是这个物体真正拥有…

C++ Primer Plus 第6版 读书笔记(5)第5章 循环和关系表达式

第5章 循环和关系表达式 本章内容包括&#xff1a;for 循环。表达式和语句。递增运算符和递减运算符&#xff1a;和−−。组合赋值运算符。复合语句&#xff08;语句块&#xff09;。逗号运算符。关系运算符&#xff1a;>、>、 、<、<和!。while 循环。typedef 工…

java怎么写接口,java开发api接口教程

在大家的工作中&#xff0c;经常写界面。 而且&#xff0c;最常用的是http接口。 但是&#xff0c;对于初学者Java工作人员来说&#xff0c;写http界面还很难。 那么&#xff0c;用实例来说明吧。 一、建设项目 首先&#xff0c;生成SpringBoot项目。 省略如何构建此处&#x…

proteus中仿真arduino驱动模拟器件(蜂鸣器继电器电机)

模拟器件如蜂鸣器、继电器、直流电机等在arduino电路中&#xff0c;如果我们接在数字管脚上来驱动往往可能因为驱动电流不够而达不到预期效果&#xff0c;或者没有动作或者没有动静。这篇博文我们专门来讨论一下如何驱动他们。 文章目录一、典型电路1、蜂蜜器(1)蜂鸣器的种类:(…

tun驱动之write

tun的write执行类型下面的代码 int fd open("/dev/net/tun", O_RDWR) write(fd, buf, len); 首先要明确一点&#xff0c;向tun驱动写的数据&#xff0c;最后会进入网络协议栈&#xff0c;相当于外部的数据通过网卡进入网络协议栈。所以写入tun驱动的数据&#xff0…

LSTM网络:一种强大的时序数据建模工具

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️&#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

32位Ubuntu系统安装visual studio code

Step.01 下载vscode安装包 vscode自1.36版本后停止支持32位linux系统&#xff0c;所以要使用<1.36版本。1.33版本下载地址&#xff1a; Visual Studio Code March 2019See what is new in the Visual Studio Code March 2019 Release (1.33)https://code.visualstudio.com…

nvm的使用

nvm工具 nvm是什么nvm下载与安装nvm的基本使用 1、nvm介绍 1.1、基于node的开发 在介绍nvm之前&#xff0c;先介绍下前端开发中关于node的使用。目前前端不管是基于vue或者react框架的开发&#xff0c;都是基于node环境下&#xff0c;进行包的管理与开发的。而不同项目组&a…

work-notes(23):结合typora、git、gitee实现云存储笔记完成的操作过程

时间&#xff1a;2023-03-07 文章目录摘要一、下载 typora二、安装 Git三、创建连接远程仓库四、使用 Git 上传到远程仓库五、到gitee上查看总结摘要 由于很想找一个好用&#xff0c;又有云存储的笔记软件。之前用过 有道笔记&#xff08;还行&#xff0c;量大了难找&#xff…

「MySQL进阶」为什么MySQL用B+树做索引而不用二叉查找树、平衡二叉树、B树

「MySQL进阶」为什么MySQL用B树做索引而不用二叉查找树、平衡二叉树、B树 文章目录「MySQL进阶」为什么MySQL用B树做索引而不用二叉查找树、平衡二叉树、B树一、概述二、二叉查找树三、平衡二叉树四、B树五、B树六、聚集索引和非聚集索引七、利用聚集索引和非聚集索引查找数据利…

剑指 Offer 67 把字符串转换成整数

摘要 面试题67. 把字符串转换成整数 一、字符串解析 根据题意&#xff0c;有以下四种字符需要考虑&#xff1a; 首部空格&#xff1a; 删除之即可&#xff1b;符号位&#xff1a;三种情况&#xff0c;即 , − , 无符号"&#xff1b;新建一个变量保存符号位&#xff0…

螯合剂p-SCN-Bn-TCMC,282097-63-6,双功能配体化合物应用于光学成像应用

p-SCN-Bn-TCMC 反应特点&#xff1a;p-SCN-Bn-TCMC属于双功能配体是螯合剂&#xff0c;也具有共价连接到生物靶向载体&#xff08;如抗体、肽和蛋白质&#xff09;的反应位点。应用于核医学、MRI和光学成像应用。西安凯新生物科技有限公司供应的杂环化合物及其衍生物可制作为具…