kafka实时返回浏览数据

news2024/11/26 22:51:56

在安装完kafka(Docker安装kafka_docker 部署kafka-CSDN博客),查看容器是否启动:

docker ps | grep -E 'kafka|zookeeper'

再用python开启服务

from fastapi import FastAPI, Request
from kafka import KafkaProducer
import kafka
import json
import logging
from datetime import datetime

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s')

# 初始化 FastAPI 应用
app = FastAPI()

# 示例博客文章数据
blog_posts = [
    {"id": 1, "title": "First Post", "content": "This is the first post."},
    {"id": 2, "title": "Second Post", "content": "This is the second post."}
]

def produce_view_event(ip_address, post_id):
    """
    生成博客文章的查看事件。

    参数:
        ip_address (str): 查看者的 IP 地址。
        post_id (int): 被查看的文章 ID。
    """
    logging.info(f"生成查看事件,文章 ID: {post_id},IP 地址: {ip_address}")

    try:
        # 初始化 Kafka 生产者
        producer = KafkaProducer(
            bootstrap_servers='110.40.130.231:9092',
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

        # 准备发送到 Kafka 的消息
        message = {
            "ip_address": ip_address,
            "post_id": post_id,
            "event_type": "view"
        }

        logging.info(f"发送消息到 Kafka: {message}")
        future = producer.send('blog_views', value=message)

        try:
            # 等待消息成功发送
            record_metadata = future.get(timeout=10)
            logging.info(
                f"消息发送成功。主题: {record_metadata.topic}, 分区: {record_metadata.partition}, 偏移量: {record_metadata.offset}")
        except Exception as e:
            logging.error(f"发送消息失败: {e}")

        # 确保所有消息已发送并关闭生产者
        producer.flush()
        producer.close()

        # 将查看事件打印到控制台
        print_view_event(ip_address, post_id)
    except kafka.errors.NoBrokersAvailable as e:
        logging.error(f"没有可用的 Broker: {e}")

def print_view_event(ip_address, post_id):
    """
    打印博客文章的查看事件。

    参数:
        ip_address (str): 查看者的 IP 地址。
        post_id (int): 被查看的文章 ID。
    """
    event_type = "view"
    created_at = datetime.now().isoformat()
    print(
        f"View Event - IP Address: {ip_address}, Post ID: {post_id}, Event Type: {event_type}, Created At: {created_at}")

@app.get("/posts/{post_id}")
def get_post(post_id: int, request: Request):
    """
    根据 ID 获取博客文章。

    参数:
        post_id (int): 博客文章的 ID。
        request (Request): 进来的请求对象。

    返回:
        dict: 如果找到文章则返回文章,否则返回错误信息。
    """
    logging.info(f"收到请求,文章 ID: {post_id}")
    for post in blog_posts:
        if post["id"] == post_id:
            logging.info(f"找到文章: {post}")
            produce_view_event(request.client.host, post_id)
            return post
    return {"error": "文章未找到"}

if __name__ == "__main__":
    import uvicorn
    import os

    # 获取当前文件名(不带扩展名)供 UVicorn 使用
    app_modeel_name = os.path.basename(__file__).replace(".py", "")
    print(app_modeel_name)

    # 使用 UVicorn 运行 FastAPI 应用
    uvicorn.run(f"{app_modeel_name}:app", host='0.0.0.0', port=1213, reload=True)

访问:http://110.40.130.231:1213/posts/1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2236143.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用QtWebEngine的Mac应用如何发布App Store

前言 因为QtWebEngine时第三方包,苹果并不直接支持进行App Store上签名和发布,所以构建和发布一个基于使用QtWebEngine的应用程序并不容易,这里我们对Qt 5.8稍微做一些修改,以便让我们的基于QtWeb引擎的应用程序并让签名能够得到苹果的许可。 QtWebEngine提供了C++和Qml的…

C++虚继承演示

在继承中如果出现: 这种情况,B和C都继承了A,D继承了B、C 在D中访问A的成员会出现: 这样的警告 是因为在继承时A出现两条分支:ABD、ACD 编译器不知道访问的A中的元素是经过B继承还是C继承 所以B、C在继承A时要用到…

【赵渝强老师】Redis的RDB数据持久化

Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出会造成服务器中的数据库状态也会消失。所以 Redis 提供了数据持久化功能。Redis支持两种方式的持久化,一种是RDB方式;另一种是AOF(ap…

【计算机网络】章节 知识点总结

一、计算机网络概述 1. 计算机网络向用户提供的两个最重要的功能:连通性、共享 2. 因特网发展的三个阶段: 第一阶段:从单个网络 ARPANET 向互联网发展的过程。1983 年 TCP/IP 协议成为 ARPANET 上的标准协议。第二阶段:建成三级…

Linux基础(十)——磁盘分区、格式化、检验和挂载

磁盘分区、格式化、检验和挂载 1.观察磁盘的分区状态2.UUID3.磁盘分区(gdisk/fdisk)3.1 gdisk3.2 fdisk 4.磁盘的格式化4.1 XFS文件系统的格式化4.2 ext4文件系统的格式化 5.文件系统的救援6.文件系统的挂载与卸载6.1 挂载6.2 卸载 7.设置开机挂载8.特殊…

Android的BroadcastReceiver

1.基本概念:BroadCast用于进程间或者线程间通信 本质上是用Binder方法,以AMS为订阅中心,完成注册,发布,监听的操作。 2.简单实现的例子 package com.android.car.myapplication;import android.content.BroadcastRe…

一招解决Mac没有剪切板历史记录的问题

使用Mac的朋友肯定都为Mac的剪切功能苦恼过,旧内容覆盖新内容,导致如果有内容需要重复输入的话,就需要一次一次的重复复制粘贴,非常麻烦 但其实Mac也能够有剪切板历史记录功能,iCopy,让你的Mac也能拥有剪切…

利用泰勒公式近似计算10的平方根

文章目录 1. 泰勒公式是什么2、利用泰勒公式计算 10 \sqrt{10} 10 ​第 1 步:泰勒级数展开第 2 步:计算各阶导数第 3 步:在 x 9 x 9 x9 处计算各阶导数第 4 步:构建各阶泰勒展开式第 5 步:计算 f ( 10 ) f(10) f(1…

python-读写Excel:openpyxl-(4)下拉选项设置

使用openpyxl库的DataValidation对象方法可添加下拉选择列表。 DataValidation参数说明: type: 数据类型("whole", "decimal", "list", "date", "time", "textLength", "custom"…

淘宝商品详情API大揭秘:用Python开启探险之旅

淘宝,一个充满奇迹的丛林 在这个名为淘宝的丛林里,每一件商品都是一座神秘的宝藏。而我们,作为勇敢的探险家,将用Python这把瑞士军刀,去揭开这些宝藏的面纱。准备好了吗?让我们一起踏上这段奇妙的探险之旅…

AJAX 全面教程:从基础到高级

AJAX 全面教程:从基础到高级 目录 什么是 AJAXAJAX 的工作原理AJAX 的主要对象AJAX 的基本用法AJAX 与 JSONAJAX 的高级用法AJAX 的错误处理AJAX 的性能优化AJAX 的安全性AJAX 的应用场景总结与展望 什么是 AJAX AJAX(Asynchronous JavaScript and XML…

电路设计中的防接反电路

在现代电子产品设计中,防接反电路是确保设备正常运作、避免损坏的重要措施之一。尤其是在日用电器或工厂生产的电子产品中,常常会涉及电源接反的情况。如果产品设计中没有考虑到这一点,可能会导致电路损坏,甚至对使用者造成安全隐患。因此,如何设计有效的防接反电路,是电…

从零开始学习python 7(持续更新ing)

一、Python函数 1.1 函数的定义 函数的定义:实现【特定功能】的代码块。 函数的作用: 简化代码提高代码重用性便于维护和修改提高代码的可扩展性 函数的三要素: 功能 len() max() sum()参数 s.clear() s.append(华清远见)返回值 s.sort()…

NUMAP应用成果亮相中国核学会核反应堆热工流体力学分会第四届学术年会

10月28日-30日,中国核学会核反应堆热工流体力学分会第四届(2024年)学术年会在北京隆重召开。该学术年会是我国反应堆热工流体领域中方向设置最全、规模最大、最具影响力的学术交流盛会。大会共设置3个专题研讨会、8个主题论坛,组织…

Odoo:免费开源的医药流通行业信息化解决方案

文 / 开源智造Odoo亚太金牌服务 方案概述 开源智造Odoo免费开源ERP提供面向医药批发采、供、销业财一体化,及直接面向消费者的门店终端、全渠道管理、营销管理以及GSP合规管理解决方案,提升企业运营效率和全业务链条的数字化管控、追溯能力。 行业的最新…

支付业务以及支付业务的质量保障

前一段时间要在组里分享支付业务,网上找了很多文章,发现有好多也是我自己写的。回头看看3年前的文章,当时对支付有一定的测试经验,但是对支付的了解也不是很深。 经过这近两年的支付相关的测试经验,对支付业务有了更深…

实现视频一键压缩的10款工具大盘点:

在这个信息爆炸的时代,我们们已经习惯了那些高清的4K视频,但是却出现了新的问题。那就是文件过大臃肿。不用担心,你可以使用视频压缩工具来解决这个问题;市场上的视频压缩工具可谓五花八门,不仅具备了智能的算法压缩技…

【spark的集群模式搭建】Standalone集群模式的搭建(简单明了的安装教程)

文章目录 1、使用Anaconda部署Python2、上传、解压、重命名3、创建软连接4、配置spark环境变量5、修改 spark-env.sh配置文件6、启动hdfs,创建文件夹7、修改spark-defaults.conf配置文件8、修改workers配置文件9、修改log4j.properties配置文件(可选&…

《现代网络技术》读书笔记:SDN数据平面和OpenFlow

本文部分内容来源于《现代网络技术:SDN,NFV,QoE、物联网和云计算:SDN,NFV,QoE,IoT,andcloud》 SDN数据平面 SDN 数据平面也称为基础设施层,而在ITU-T的Y3300标准中则称为资源层,它是网络转发设备根据 SDN控制平面的决策来执行数据…

无线婴儿监视器方案(附SI24R1选型)

随着现代科技的进步,父母们对宝宝的关注和保护达到了前所未有的高度。为了满足这一需求,市场上涌现出了一系列智能婴儿监视器。这些设备不仅能实时监控宝宝的活动,还能让父母在家中的任何角落都能轻松掌握宝宝的动态。今天,我们向…