KisFlow-Golang流式实时计算案例(四)-KisFlow在消息队列MQ中的应用

news2025/4/12 8:34:40

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计
Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

案例:
KisFlow-Golang流式计算案例(一)快速开始QuickStart
KisFlow-Golang流式计算案例(二)-Flow并流操作
KisFlow-Golang流式计算案例(二)-KisFlow在多协程中的应用


DownLoad kis-flow source

$go get github.com/aceld/kis-flow

KisFlow with Kafka

案例源代码
https://github.com/aceld/kis-flow-usage/tree/main/12-with_kafka

这里以github.com/segmentio/kafka-go 作为第三方Kafka Client SDK(开发者也可以选择其他kafka的go开发工具)。

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/segmentio/kafka-go"
    "sync"
    "time"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new Kafka reader
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     []string{"localhost:9092"},
        Topic:       "SourceStuScore",
        GroupID:     "group1",
        MinBytes:    10e3,                   // 10KB
        MaxBytes:    10e6,                   // 10MB
        MaxWait:     500 * time.Millisecond, // 最长等待时间
        StartOffset: kafka.FirstOffset,
    })

    defer reader.Close()

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ { // use 5 consumers to consume in parallel
        wg.Add(1)
        go func() {
            // fork a new flow for each consumer
            flowCopy := flowOrg.Fork(ctx)

            defer wg.Done()
            for {
                // Read a message from Kafka
                message, err := reader.ReadMessage(ctx)
                if err != nil {
                    fmt.Printf("error reading message: %v\n", err)
                    break
                }

                // Commit the message to the flow
                _ = flowCopy.CommitRow(string(message.Value))

                // Run the flow
                if err := flowCopy.Run(ctx); err != nil {
                    fmt.Println("err: ", err)
                    return
                }
            }
        }()
    }

    wg.Wait()

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

KisFlow with Nsq

案例源代码
https://github.com/aceld/kis-flow-usage/tree/main/13-with_nsq

本KisFlow消费者以github.com/nsqio/go-nsq作为第三方SDK。

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/nsqio/go-nsq"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new NSQ consumer
    config := nsq.NewConfig()
    config.MaxInFlight = 5

    consumer, err := nsq.NewConsumer("SourceStuScore", "channel1", config)
    if err != nil {
        panic(err)
    }

    consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        // fork a new flow for each message
        flowCopy := flowOrg.Fork(ctx)

        // Commit the message to the flow
        _ = flowCopy.CommitRow(string(message.Body))

        // Run the flow
        if err := flowCopy.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return err
        }

        return nil
    }))

    err = consumer.ConnectToNSQLookupd("localhost:4161")
    if err != nil {
        panic(err)
    }

    defer consumer.Stop()

    select {}
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

KisFlow with RocketMQ

案例源代码
https://github.com/aceld/kis-flow-usage/tree/main/14-with_rocketmq

github.com/apache/rocketmq-client-go 作为RocketMQ消费者SDK。

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    myFloq := kis.Pool().GetFlow("CalStuAvgScore")
    if myFloq == nil {
        panic("myFloq is nil")
    }

    // Create a new RocketMQ consumer
    c, err := rocketmq.NewPushConsumer(
        consumer.WithGroupName("group1"),
        consumer.WithNameServer([]string{"localhost:9876"}),
    )
    if err != nil {
        panic(err)
    }

    err = c.Subscribe("SourceStuScore", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

        for _, msg := range msgs {
            // Commit the message to the flow
            _ = myFloq.CommitRow(string(msg.Body))

        }

        // Run the flow
        if err := myFloq.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return consumer.ConsumeRetryLater, err
        }

        return consumer.ConsumeSuccess, nil
    })
    if err != nil {
        panic(err)
    }

    err = c.Start()
    if err != nil {
        panic(err)
    }

    defer c.Shutdown()

    select {}
}

作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计
Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

案例:
KisFlow-Golang流式计算案例(一)快速开始QuickStart
KisFlow-Golang流式计算案例(二)-Flow并流操作
KisFlow-Golang流式计算案例(三)-KisFlow在多协程中的应用

最后编辑于:2025-03-31 20:49:36


喜欢的朋友记得点赞、收藏、关注哦!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2328176.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode:1582. 二进制矩阵中的特殊位置(python3解法)

难度&#xff1a;简单 给定一个 m x n 的二进制矩阵 mat&#xff0c;返回矩阵 mat 中特殊位置的数量。 如果位置 (i, j) 满足 mat[i][j] 1 并且行 i 与列 j 中的所有其他元素都是 0&#xff08;行和列的下标从 0 开始计数&#xff09;&#xff0c;那么它被称为 特殊 位置。 示…

Cline – OpenRouter 排名第一的CLI 和 编辑器 的 AI 助手

Cline – OpenRouter 排名第一的CLI 和 编辑器 的 AI 助手&#xff0c;Cline 官网&#xff1a;https://github.com/cline/cline Star 37.8k ps&#xff0c;OpenRouter的网址是&#xff1a;OpenRouter &#xff0c;这个排名第一&#xff0c;据我观察&#xff0c;是DeepSeek v3…

Mock.js虚拟接口

Vue3中使用Mock.js虚拟接口数据 一、创建项目 pnpm创建vite的项目,通过 PNPM来简化依赖管理。若还没有安装 PNPM&#xff0c;可以通过 npm来安装&#xff1a; 安装 PNPM npm install -g pnpm//使用国内镜像加速pnpm add -g pnpmlatestpnpm config set registry http://regis…

LoRa模块通信距离优化:如何实现低功耗覆盖30公里无线传输要求

在物联网&#xff08;IoT&#xff09;快速发展的今天&#xff0c;LoRa&#xff08;Long Range&#xff09;技术作为一种基于扩频调制的远距离无线通信技术&#xff0c;因其远距离通信、低功耗和强抗干扰能力等优势&#xff0c;在农业监测、城市智能管理、环境监测等多个领域得到…

OpenCV 从入门到精通(day_05)

1. 模板匹配 1.1 什么是模板匹配 模板匹配就是用模板图&#xff08;通常是一个小图&#xff09;在目标图像&#xff08;通常是一个比模板图大的图片&#xff09;中不断的滑动比较&#xff0c;通过某种比较方法来判断是否匹配成功。 1.2 匹配方法 rescv2.matchTemplate(image, …

OpenRouter开源的AI大模型路由工具,统一API调用

简介 ‌OpenRouter是一个开源的路由工具‌&#xff0c;它可以绕过限制调用GPT、Claude等国外模型。以下是对它的详细介绍&#xff1a; 一、主要功能 OpenRouter专注于将用户请求智能路由到不同的AI模型&#xff0c;并提供统一的访问接口。它就像一个“路由器”&#xff0c;能…

zabbix监控网站(nginx、redis、mysql)

目录 前提准备&#xff1a; zabbix-server主机配置&#xff1a; 1. 安装数据库 nginx主机配置&#xff1a; 1. 安装nginx redis主机配置&#xff1a; 1. 安装redis mysql主机配置&#xff1a; 1. 安装数据库 zabbix-server&#xff1a; 1. 安装zabbix 2. 编辑配置文…

蓝桥杯冲刺

例题1&#xff1a;握手问题 方法1&#xff1a;数学推理(简单粗暴&#xff09; 方法2&#xff1a;用代码实现方法1 #include<iostream> using namespace std; int main() {int result 0;for (int i 1; i < 49; i){for (int j i 1; j < 50; j){//第i个人与第j个…

Spring Security(maven项目) 3.1.0

前言&#xff1a; 通过实践而发现真理&#xff0c;又通过实践而证实真理和发展真理。从感性认识而能动地发展到理性认识&#xff0c;又从理性认识而能动地指导革命实践&#xff0c;改造主观世界和客观世界。实践、认识、再实践、再认识&#xff0c;这种形式&#xff0c;循环往…

C# 从代码创建选型卡+表格

private int tabNum 1; private int sensorNum 5; private void InitializeUI() {// 创建右侧容器面板Panel rightPanel new Panel{Dock DockStyle.Right,Width 300,BackColor SystemColors.ControlDark,Parent this};// 根据防区数量创建内容if (tabNum &g…

OpenCV 从入门到精通(day_02)

1. 边缘填充 为什么要填充边缘呢&#xff1f;我们以下图为例&#xff1a; 可以看到&#xff0c;左图在逆时针旋转45度之后原图的四个顶点在右图中已经看不到了&#xff0c;同时&#xff0c;右图的四个顶点区域其实是什么都没有的&#xff0c;因此我们需要对空出来的区域进行一个…

Ceph异地数据同步之-RBD异地同步复制(上)

#作者&#xff1a;闫乾苓 文章目录 前言基于快照的模式&#xff08;Snapshot-based Mode&#xff09;工作原理单向同步配置步骤单向同步复制测试双向同步配置步骤双向同步复制测试 前言 Ceph的RBD&#xff08;RADOS Block Device&#xff09;支持在两个Ceph集群之间进行异步镜…

【C++】STL库_stack_queue 的模拟实现

栈&#xff08;Stack&#xff09;、队列&#xff08;Queue&#xff09;是C STL中的经典容器适配器 容器适配器特性 不是独立容器&#xff0c;依赖底层容器&#xff08;deque/vector/list&#xff09;通过限制基础容器接口实现特定访问模式不支持迭代器操作&#xff08;无法遍历…

一周学会Pandas2 Python数据处理与分析-编写Pandas2 HelloWord项目

锋哥原创的Pandas2 Python数据处理与分析 视频教程&#xff1a; 2025版 Pandas2 Python数据处理与分析 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 我们首先准备一个excel文件&#xff0c;用来演示pandas操作数据集(数据的集合)。excel文件属于数据集的一种&#xf…

【易订货-注册/登录安全分析报告】

前言 由于网站注册入口容易被机器执行自动化程序攻击&#xff0c;存在如下风险&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露&#xff0c;不符合国家等级保护的要求。短信盗刷带来的拒绝服务风险 &#xff0c;造成用户无法登陆、注册&#xff0c;大量收到垃圾短信的…

如何在Windows上找到Python安装路径?两种方法快速定位

原文&#xff1a;如何在Windows上找到Python安装路径&#xff1f;两种方法快速定位 | w3cschool笔记 在 Windows 系统上找到 Python 的安装路径对于设置环境变量或排查问题非常重要。本文将介绍两种方法&#xff0c;帮助你找到 Python 的安装路径&#xff1a;一种是通过命令提…

lvgl避坑记录

一、log调试 #if LV_USE_LOG && LV_LOG_LEVEL > LV_LOG_LEVEL_INFOswitch(src_type) {case LV_IMG_SRC_FILE:LV_LOG_TRACE("lv_img_set_src: LV_IMG_SRC_FILE type found");break;case LV_IMG_SRC_VARIABLE:LV_LOG_TRACE("lv_img_set_src: LV_IMG_S…

element-plus中,表单校验的使用

目录 一.案例1&#xff1a;给下面的表单添加校验 1.目的要求 2.步骤 ①给需要校验的el-form-item项&#xff0c;添加prop属性 ②定义一个表单校验对象&#xff0c;里面存放了每一个prop的检验规则 ③给el-form组件&#xff0c;添加:rules属性 ④给el-form组件&#xff0…

PyTorch复现线性模型

【前言】 本专题为PyTorch专栏。从本专题开始&#xff0c;我将通过使用PyTorch编写基础神经网络&#xff0c;带领大家学习PyTorch。并顺便带领大家复习以下深度学习的知识。希望大家通过本专栏学习&#xff0c;更进一步了解人更智能这个领域。 材料来源&#xff1a;2.线性模型_…

Kafka+Zookeeper从docker部署到spring boot使用完整教程

文章目录 一、Kafka1.Kafka核心介绍&#xff1a;​核心架构​核心特性​典型应用 2.Kafka对 ZooKeeper 的依赖&#xff1a;3.去 ZooKeeper 的演进之路&#xff1a;注&#xff1a;&#xff08;本文采用ZooKeeper3.8 Kafka2.8.1&#xff09; 二、Zookeeper1.核心架构与特性2.典型…