20221124 kafka实时数据写入Redis

news2024/9/23 3:17:37

一、上线结论

  • 实现了将用户线上实时浏览的沉浸式视频信息,保存在Redis中这样一个功能。
  • 为实现沉浸式视频离线推荐到实时推荐提供了强有力的支持。目前只是应用在沉浸式场景,后续也能扩展到其他所有场景。
  • 用于两个场景:(1)根据用户近期观看物料匹配相似物料(2)过滤用户近期观看物料

二、实现效果展示

用户在线上刷一个视频,redis就会将用户的视频信息保存在用户历史浏览的队列中。

队列大小为100。具体保存的信息如下所示:

一、Redis存储KEY:kafka:user_short_video_streaming:_5c91e0cf0cf2f3d119f92774
二、Redis存储value:[{"duration":4,"resourceId":"28808","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}, {"duration":9,"resourceId":"24262","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}, {"duration":5,"resourceId":"25330","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}]

三、实现策略

  • 采用Java语言实现,
  • 先监听kafka,然后解析kafka消息,进行解码,再解析,从解析后的结果中提取user_id, resource_id和resource_type字段。
  • 连接Redis,构造用户队列,队列长度设置为100(用户刷的视频个数),将数据写入Redis
  • 队列大小为100,超过100顺序pop

代码Git:http://gitlab.dzj.com/applied_algorithm/data_analysis/kafka_streaming_immersive.git

四、项目后续规划

  • 扩展到Feed流,搜索召回等全部的场景
  • jar包后台运行方式改为CICD部署

五、附录

5.1 BUG分享

在实现的过程中,遇到一个序列化问题,就是写入的key和value乱码,导致用Python查询的定义好的KEY的时候查询不到,解决方案如下:

自定义RedisTemplete进行重写,  用jackson进行序列化,将这个类注册到Spring Boot中

 折叠源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

package com.dzj.kafka_streaming.Config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;

import com.fasterxml.jackson.annotation.PropertyAccessor;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

import org.springframework.data.redis.serializer.StringRedisSerializer;

/**

 * @Author : wangyongpeng

 * @Date : 2022/12/16 14:34

 * @Description : 重写RedisTemplate, 进行序列化

 */

@Configuration

public class RedisConfig {

    @Bean

    @SuppressWarnings("all")

    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {

        RedisTemplate<String, Object> template = new RedisTemplate();

        template.setConnectionFactory(redisConnectionFactory);

        // JSON序列化配置

        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();

        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        jackson2JsonRedisSerializer.setObjectMapper(om);

        // String 的序列化

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        // key 采用String的序列化方式

        template.setKeySerializer(stringRedisSerializer);

        // hash的key也采用String的序列化方式

        template.setHashKeySerializer(stringRedisSerializer);

        // valuex序列化方式采用jackson

        template.setValueSerializer(jackson2JsonRedisSerializer);

        // hash的序列化也用jackson

        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();

        return template;

    }

}

5.2 项目核心代码

 折叠源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

package com.dzj.kafka_streaming.listener;

import com.dzj.kafka_streaming.dto.TagNameTypeInfo;

import com.dzj.kafka_streaming.service.ContentTagRelationService;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import javax.annotation.Resource;

import java.util.ArrayList;

import java.util.Base64;

import java.util.List;

/**

 * "immersive_streaming_" + userId; 这是旧的key,需要清除

 */

@Component

public class MessageListener {

    @Autowired

    private ContentTagRelationService relationService;

    @Resource

    private RedisTemplate<String, Object> redisTemplate;

    private final String TOPIC_NAME = "event-trace-log";

    // @KafkaListener(topics = {TOPIC_NAME},groupId = "itmentuGroup")

    @KafkaListener(topics = {TOPIC_NAME})

    public void listener(ConsumerRecord<String,String> record)  {

        //获取消息

        String message = record.value();

        //消息偏移量

        long offset = record.offset();

        String redisKeyPrefix = "kafka:user_short_video_streaming:_";

        JSONObject dataJson = parseJson(message);

        String eventCode = dataJson.getString("eventCode");

        if ("145001".equals(eventCode)){

            // 测试环境------------------------------------------------------------------------------------------

            // 目前只关注沉浸式中得数据

            String resourceId = dataJson.getJSONObject("eventBody").getString("resourceId");

            String resourceType = dataJson.getJSONObject("eventBody").getString("resourceType");

            Integer duration = dataJson.getJSONObject("eventBody").getInteger("duration");

            String actionCode = dataJson.getJSONObject("eventBody").getString("actionCode");

            String userId = dataJson.getJSONObject("eventBody").getString("userId");

            String appType = dataJson.getJSONObject("eventBody").getString("appType");

            // System.out.println("________kafka msg: eventCode = " + eventCode + "eventBody = " + dataJson.getJSONObject("eventBody"));

            /**

             * 写入Redis

             * redis存储结构: key = List(5),是一个定长为5,右进左出的队列

             * 首先查询该key的list长度,如果长度超过5,就先左边出队列一个,再右边进一个,否则右边进一个

             */

            String key = redisKeyPrefix + userId;

    //        String key = "immersive_streaming_wyp0001";

            // 定义Redis队列写入的结构

            JSONObject redisListItem = new JSONObject();

            redisListItem.put("resourceId",resourceId);

            redisListItem.put("resourceType",resourceType);

            redisListItem.put("duration",duration);

            redisListItem.put("actionCode",actionCode);

            redisListItem.put("appType",appType);

            String redisListItemString = redisListItem.toJSONString();

            if (redisTemplate.opsForList().size(key) >= 100){

                Object leftPop = redisTemplate.opsForList().leftPop(key);

                redisTemplate.opsForList().rightPush(key, redisListItemString);

                System.out.println("[pop]redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));

            }else {

                if (!resourceId.isEmpty() && !resourceType.isEmpty()){

                    redisTemplate.opsForList().rightPush(key, redisListItemString);

                    Long size = redisTemplate.opsForList().size(key);

                    System.out.println("redis key : "+ redisKeyPrefix + userId + " pushed one:  "+ size + redisListItemString);

                    System.out.println("redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));

                }

            }

        }

    }

     

    /**

     * 解析json,解码功能

     */

    public JSONObject parseJson(String message) {

        JSONObject messageJson = JSONObject.parseObject(message);

        String dataString = messageJson.getString("data");

        // --------------------base64解码字符串--------------------

        String data_string = "";

        final Base64.Decoder decoder = Base64.getDecoder();

        try{

            data_string = new String(decoder.decode(dataString), "UTF-8");

        }catch (Exception e){

            System.out.println("【kafka parseJson ERROR】com.dzj.kafka_streaming.listener.MessageListener.parseJson" + e);

        }

        // string转换为json,只取eventCode = '145001'沉浸式的

        JSONObject dataJson = JSONObject.parseObject(data_string);

        return dataJson;

    }

    /**

     * 从数据库查询

     * @param resourceId

     * @param resourceType

     * @return

     */

    public List<TagNameTypeInfo>  queryByIdAndType(String resourceId, String resourceType ){

        List<TagNameTypeInfo> tagNameTypeInfos = new ArrayList<>();

        try {

            tagNameTypeInfos = relationService.queryTagNameTypeInfo(Long.valueOf(resourceId), resourceType);

        catch (Exception e){

            System.out.println("【ERROR】" + resourceId + "&" + resourceType + "在数据库中查询不到.......");

        }

        return tagNameTypeInfos;

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1552525.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一分钟开服 《幻兽帕鲁》游戏专属服务器by京东云主机

使用京东云服务器搭建幻兽帕鲁Palworld游戏联机服务器教程&#xff0c;非常简单&#xff0c;京东云推出幻兽帕鲁镜像系统&#xff0c;镜像直接选择幻兽帕鲁镜像即可一键自动部署&#xff0c;不需要手动操作&#xff0c;真正的新手0基础部署幻兽帕鲁&#xff0c;阿腾云atengyun.…

创新指南|如何将人工智能应用于未来的创新管理——并不断付诸实践

ChatGPT 的推出加剧了围绕人工智能的炒作&#xff0c;现在我们看到了前所未有的巨大进展。对于我们这些热衷于创新的人来说&#xff0c;这是一个激动人心的时刻。他们正在共同采取措施&#xff0c;充分利用人工智能进行创新管理。本文将阐述人工智能能为创新管理做什么&#xf…

iptables添加端口映射,k8s主机查询不到端口但能访问。

研究原因&#xff1a;k8s内一台主机使用命令查询没有80端口。但通过浏览器访问又能访问到服务。 查询了资料是使用了hostport方式暴露pod端口。cni调用iptables增加了DNAT规则。访问时流量先经过iptables直接被NAT到具体服务去了。 链接: K8s罪魁祸首之"HostPort劫持了我…

RegSeg 学习笔记(待完善)

论文阅读 解决的问题 引用别的论文的内容 可以用 controlf 寻找想要的内容 PPM 空间金字塔池化改进 SPP / SPPF / SimSPPF / ASPP / RFB / SPPCSPC / SPPFCSPC / SPPELAN &#xfffc; ASPP STDC&#xff1a;short-term dense concatenate module 和 DDRNet SE-ResNeXt …

短视频矩阵系统究竟怎么样,如何实现一站式精准获客?

在数字化时代的浩渺海洋中&#xff0c;短视频如同一股汹涌的浪潮&#xff0c;席卷了众多企业的营销战略。如何在这样的背景下&#xff0c;驾驭这股浪潮&#xff0c;将其转化为获客的强大动力&#xff0c;已成为众多企业家们心中的疑问。而蜘蛛短视频矩阵系统的出现&#xff0c;…

iOS开发进阶(十一):ViewController 控制器详解

文章目录 一、前言二、UIViewController三、UINavigationController四、UITabBarController五、UIPageViewController六、拓展阅读 一、前言 iOS 界面开发最重要的首属ViewController和View&#xff0c;ViewController是View的控制器&#xff0c;也就是一般的页面&#xff0c;…

岭师大数据技术原理与应用-序章-软工版

HeZaoCha-CSDN博客 序章—软工版 一、环境介绍1. VMware Workstation Pro2. CentOS3. Java4. Hadoop5. HBase6. MySQL7. Hive 二、系统安装1. 虚拟网络编辑器2. 操作系统安装 三、结尾 先说说哥们写这系列博客的原因&#xff0c;本来学完咱也没想着再管部署这部分问题的说&…

第二证券今日投资参考:低空经济迎利好 自动驾驶商业化提速

昨日&#xff0c;两市股指盘中弱势震动&#xff0c;午后加快下探&#xff0c;沪指失守3000点大关&#xff0c;深成指、创业板指跌超2%&#xff1b;到收盘&#xff0c;沪指跌1.26%报2993.14点&#xff0c;深成指跌2.4%报9222.47点&#xff0c;创业板指跌2.81%报1789.82点&#x…

目前常见的搜索引擎有哪些?

常见的搜索引擎可以分为两类&#xff1a;全网搜索类和平台内搜索。 全网搜索类是指可以在互联网范围内进行搜索的引擎&#xff0c;它们提供了广泛的搜索结果&#xff0c;包括网页、图片、视频、新闻等各种类型的内容。以下是一些常见的全网搜索引擎&#xff1a; 百度&#xff…

idea类已经存在却报错

一句话导读 在idea中导入新的项目&#xff0c;很多类都飘红报错&#xff0c;mvn compile可以通过&#xff0c;可能是因为idea缓存问题导致。 由于这个项目是由老项目复制过来后&#xff0c;再继续开发新的功能&#xff0c;很多同事导入后&#xff0c;都爆出新的类找不到。而编译…

linux nginx配置ssl, 实现https+ip访问

mkdir sslZhengShu openssl req -newkey rsa:2048 -nodes -keyout ca.key -out ca.csr openssl x509 -req -days 365 -in ca.csr -signkey ca.key -out ca.crt openssl genrsa -out server.key 2048 openssl req -new -key server.key -out server.csr 和之前输入一样即可 …

反序列化动态调用 [NPUCTF2020]ReadlezPHP1

在源代码上看到提示 访问一下看看 代码审计一下 <?php #error_reporting(0); class HelloPhp {public $a;public $b;public function __construct(){$this->a "Y-m-d h:i:s";$this->b "date";}public function __destruct(){$a $this->a;…

【Go】结构体中Tag标识

https://blog.csdn.net/weixin_45193103/article/details/123876319 https://blog.csdn.net/qq_49723651/article/details/122005291 https://juejin.cn/post/7005465902804123679 学一点&#xff0c;整一点&#xff0c;基本都是综合别人的&#xff0c;弄成我能理解的内容 Tag定…

ubuntu编译OpenCV and seetaFace2

opencv opencv-4.5.2 opencv_contrib-4.5.2 SeetaFace2 SeetaFace2-master https://github.com/seetafaceengine 指定安装目录&#xff0c;和OpenCV放一个目录下了 安装前 安装 安装后 Qt安装 Windows下 Linux下 报错1 原因&#xff1a; 报错…

CMS(内容管理系统)

一、系统的编写可以在开源网站上下载一个相关项目&#xff0c;然后做2次开发 企业建站系统:MetInfo(米拓)、蝉知、SiteServer CMs等; B2C商城系统:商派Shopex、ECshop、HiShop、XpShop等; 门户建站系统:DedeCMS(织梦)、帝国CMS、PHPCMS、动易、CmsTop等; 博客系统:WordPres…

【JavaWeb】Day18.Vue组件库Element

什么是Element Element&#xff1a;是饿了么团队研发的&#xff0c;一套为开发者、设计师和产品经理准备的基于 Vue 2.0 的桌面端组件库。组件&#xff1a;组成网页的部件&#xff0c;例如 超链接、按钮、图片、表格、表单、分页条等等。官网&#xff1a;Element - The worlds…

Capture One Pro 22 for Mac/win:重塑RAW图像处理的艺术

在数字摄影的世界里&#xff0c;RAW图像处理软件无疑是摄影师们手中的魔法棒&#xff0c;而Capture One Pro 22无疑是这一领域的璀璨明星。这款专为Mac和Windows系统打造的图像处理软件&#xff0c;以其出色的性能、丰富的功能和极致的用户体验&#xff0c;赢得了全球摄影师的广…

C++优先队列——priority_queue,函数对象,labmda表达式,pair等

头文件&#xff1a;#include<queue> 内部使用堆来实现&#xff0c;在需要或得最大的几个值或最小的几个值而不关心整个数组的顺序时非常好用。 用法&#xff1a; priority_queue<int, vector<int>, greater<int>>q; 第一个参数为堆中存储的元素。 …

Rust控制台输出跑马灯效果,实现刷新不换行,实现loading效果

要在 Rust 中实现控制台刷新而不换行&#xff0c;以实现类似 "loading" 状态的效果&#xff0c;你可以使用 \r&#xff08;回车符&#xff09;来覆盖上一行的内容。 use std::io::{self, Write}; use std::thread; use std::time::Duration;fn main() {let loading_…

使用Jenkins打包时执行失败,但手动执行没有问题如ERR_ELECTRON_BUILDER_CANNOT_EXECUTE

具体错误信息如&#xff1a; Error output: Plugin not found, cannot call UAC::_ Error in macro _UAC_MakeLL_Cmp on macroline 2 Error in macro _UAC_IsInnerInstance on macroline 1 Error in macro _If on macroline 9 Error in macro FUNCTION_INSTALL_MODE_PAGE_FUNC…