LiveKit的agent介绍

news2025/1/12 1:53:48

概念

LiveKit核心概念:

  • Room(房间)
  • Participant(参会人)
  • Track(信息流追踪)

Agent 架构图

订阅信息流

agent交互流程

客户端操作

加入房间

房间创建方式

手动

赋予用户创建房间的权限,在客户的加入并创建房间。

自动

客户的指定ws_url和token,加入指定房间。

room = LiveKit.create(appContext = applicationContext)
room.connect(wsUrl, token)

离开房间

调用 Room.disconnect() 通知 LiveKit 离开事件。如果应用程序在未通知 LiveKit 的情况下关闭,则将继续显示参与者在 Room 中 15 秒。

Swift上,当应用程序退出时,会自动调用 Room.disconnect

发送消息

发送方式

客户端通过LocalParticipant.publishData API 向房间中的任何参与者发布任意数据消息。房间数据通过 WebRTC 数据通道发布到SFU;LiveKit 服务器会将该数据转发给聊天室中的一个或多个参与者。

给指定用户发消息,通过设置destinationIdentities ,它表示用户的身份。

// 发送消息
coroutineScope.launch {
  val data: ByteArray = //...

  // 发送有损消息给全员,LOSSY表示数据发送一次,无顺序保证。这对于优先考虑交付速度的实时更新来说是理想的选择。
  room.localParticipant.publishData(data, DataPublishReliability.LOSSY)

  // 发送可靠的消息给指定成员,RELIABLE表示发送数据时最多重试3次并保证顺序。适合优先保证交付而不是抵延迟的场景,例如室内聊天。
  val identities = listOf(
    Participant.Identity("alice"),
    Participant.Identity("bob"),
  )
  room.localParticipant.publishData(data, DataPublishReliability.RELIABLE, identities)
}

// 处理接收到的消息
coroutineScope.launch {
  room.events.collect { event ->
    if(event is RoomEvent.DataReceived) {
        // Process data
    }
  }
}

消息大小限制

由于 SCTP 协议的限制,对大于 16 KiB 的消息使用数据通道是不切实际的,包括 LiveKit 的协议包装器。我们建议将消息大小保持在 15 KiB 以下。详细了解数据通道大小限制。

消息的topic

消息可以指定topic,在接收方通过topic进行过滤出感兴趣的消息。

发送信息流

livekit默认支持摄像头、麦克风、录屏3个流,也支持用户发布自定义流的配置。

音视频流

// Turns camera track on
room.localParticipant.setCameraEnabled(true)

// Turns microphone track on
room.localParticipant.setMicrophoneEnabled(true)

录屏流

// Create an intent launcher for screen capture
// This *must* be registered prior to onCreate(), ideally as an instance val
val screenCaptureIntentLauncher = registerForActivityResult(
    ActivityResultContracts.StartActivityForResult()
) { result ->
    val resultCode = result.resultCode
    val data = result.data
    if (resultCode != Activity.RESULT_OK || data == null) {
        return@registerForActivityResult
    }
    lifecycleScope.launch {
        room.localParticipant.setScreenShareEnabled(true, data)
    }
}

// When it's time to enable the screen share, perform the following
val mediaProjectionManager =
    getSystemService(MEDIA_PROJECTION_SERVICE) as MediaProjectionManager
screenCaptureIntentLauncher.launch(mediaProjectionManager.createScreenCaptureIntent())

自定义流配置

// Option 1: set room defaults
val options = RoomOptions(
  audioTrackCaptureDefaults = LocalAudioTrackOptions(
    noiseSuppression = true,
    echoCancellation = true,
    autoGainControl = true,
    highPassFilter = true,
    typingNoiseDetection = true,
  ),
  videoTrackCaptureDefaults = LocalVideoTrackOptions(
    deviceId = "",
    position = CameraPosition.FRONT,
    captureParams = VideoPreset169.H1080.capture,
  ),
  audioTrackPublishDefaults = AudioTrackPublishDefaults(
    audioBitrate = 20_000,
    dtx = true,
  ),
  videoTrackPublishDefaults = VideoTrackPublishDefaults(
    videoEncoding = VideoPreset169.H1080.encoding,
  )
)
var room = LiveKit.create(
  ...
  roomOptions = options,
)

// Option 2: create tracks manually
val localParticipant = room.localParticipant
val audioTrack = localParticipant.createAudioTrack("audio")
localParticipant.publishAudioTrack(audioTrack)

val videoTrack = localParticipant.createVideoTrack("video", LocalVideoTrackOptions(
  CameraPosition.FRONT,
  VideoPreset169.H1080.capture
))
localParticipant.publishVideoTrack(videoTrack)

订阅信息流

默认用户进入房间,会监听所有信息流。

coroutineScope.launch {
  room.events.collect { event ->
    when(event) {
      is RoomEvent.TrackSubscribed -> {
        // Audio tracks are automatically played.
        val videoTrack = event.track as? VideoTrack ?: return@collect
        videoTrack.addRenderer(videoRenderer)
      }
      else -> {}
    }
  }
}

监听事件

事件分为:room事件和参与者事件。这是事件列表:

EVENT

DESCRIPTION

ROOM EVENT

PARTICIPANT EVENT

ParticipantConnected 参与者Connected

A RemoteParticipant joins after the local participant.

✔️

RemoteParticipant 在本地参与者之后加入。

ParticipantDisconnected 参与者断开连接

A RemoteParticipant leaves

✔️

RemoteParticipant 离开

Reconnecting 重新连接

The connection to the server has been interrupted and it's attempting to reconnect.

✔️

与服务器的连接已中断,它正在尝试重新连接。

Reconnected 重新

Reconnection has been successful

✔️

重新连接成功

Disconnected 断开

Disconnected from room due to the room closing or unrecoverable failure

✔️

由于会议室关闭或无法恢复的故障而与会议室断开连接

TrackPublished 轨迹已发布

A new track is published to room after the local participant has joined

✔️

✔️

本地参加者加入后,新轨道将发布到聊天室

TrackUnpublished TrackUnpublished (未发布)

A RemoteParticipant has unpublished a track

✔️

✔️

RemoteParticipant 已取消发布轨道

TrackSubscribed

The LocalParticipant has subscribed to a track

✔️

✔️

LocalParticipant 已订阅跟踪

TrackUnsubscribed 跟踪Unsubscribed

A previously subscribed track has been unsubscribed

✔️

✔️

之前订阅的曲目已取消订阅

TrackMuted TrackMuted (轨道静音)

A track was muted, fires for both local tracks and remote tracks

✔️

✔️

轨道已静音,本地轨道和远程轨道均触发

TrackUnmuted TrackUnmuted (轨道未静音)

A track was unmuted, fires for both local tracks and remote tracks

✔️

✔️

轨道已取消静音,本地轨道和远程轨道均触发

LocalTrackPublished LocalTrack已发布

A local track was published successfully

✔️

✔️

已成功发布本地轨道

LocalTrackUnpublished

A local track was unpublished

✔️

✔️

本地曲目未发布

ActiveSpeakersChanged ActiveSpeakers已更改

Current active speakers has changed

✔️

当前当前活跃的发言人已更改

IsSpeakingChanged

The current participant has changed speaking status

✔️

当前参与者已更改发言状态

ConnectionQualityChanged 连接质量已更改

Connection quality was changed for a Participant

✔️

✔️

参与者的连接质量已更改

ParticipantMetadataChanged

A participant's metadata was updated via server API

✔️

✔️

参与者的元数据已通过服务器 API 更新

RoomMetadataChanged RoomMetadataChanged 的

Metadata associated with the room has changed

✔️

与聊天室关联的元数据已更改

DataReceived 已接收数据

Data received from another participant or server

✔️

✔️

从其他参与者或服务器接收的数据

TrackStreamStateChanged TrackStreamStateChanged (已更改)

Indicates if a subscribed track has been paused due to bandwidth

✔️

✔️

指示订阅的曲目是否因带宽而暂停

TrackSubscriptionPermissionChanged

One of subscribed tracks have changed track-level permissions for the current participant

✔️

✔️

其中一个已订阅的轨道已更改当前参与者的轨道级别权限

ParticipantPermissionsChanged

When the current participant's permissions have changed

✔️

✔️

ParticipantPermissions已更改

当前参与者的权限发生更改时

服务端操作

生成用户token

需要LiveKit服务的API_KEY和API-SECRET,通过LiveKit API生成JWT令牌。

通过登录JWT获取到用户的信息,identify=user_id+场景,name=用户昵称(默认值),room名称=场景名(user_id)

# server.py
import os
from livekit import api
from flask import Flask

app = Flask(__name__)

@app.route('/getToken')
def getToken():
 token = api.AccessToken(os.getenv('LIVEKIT_API_KEY'), os.getenv('LIVEKIT_API_SECRET')) \
 .with_identity("identity") \
 .with_name("my name") \
 .with_grants(api.VideoGrants(
 room_join=True,
 room="my-room",
 ))
 return token.to_jwt()

开发环境可以通过CLI快速创建token:

livekit-cli token create   --api-key devkey --api-secret secret   --join --room test_room --identity test_user   --valid-for 24h

token属性

基于JWT的令牌,包含用户身份、放假名称、功能、权限等。按照场景颁发token,也就是对应的房间。

聊天室权限在解码的加入令牌的 video 字段中指定。它可能包含以下一个或多个属性:

FIELD

TYPE

DESCRIPTION

roomCreate room创建

bool

Permission to create or delete rooms

创建或删除聊天室的权限

roomList roomList 会议室

bool

Permission to list available rooms

列出可用会议室的权限

roomJoin room加入

bool

Permission to join a room

加入聊天室的权限

roomAdmin roomAdmin 管理员

bool

Permission to moderate a room

管理聊天室的权限

roomRecord roomRecord (房间记录)

bool

Permissions to use Egress service

使用 Egress 服务的权限

ingressAdmin 入口管理员

bool 布尔

Permissions to use Ingress service

Ingress 服务使用权限

room 房间

string 字符串

Name of the room, required if join or admin is set

聊天室的名称,如果设置了 join 或 admin,则为必填项

canPublish 可以发布

bool 布尔

Allow participant to publish tracks

允许参与者发布轨迹

canPublishData

bool 布尔

Allow participant to publish data to the room

允许参与者将数据发布到聊天室

canPublishSources

string[] 字符串[]

When set, only listed source can be published. (camera, microphone, screen_share, screen_share_audio)

设置后,只能发布列出的源。(摄像头、麦克风、screen_share、screen_share_audio)

canSubscribe canSubscribe 订阅

bool 布尔

Allow participant to subscribe to tracks

允许参加者订阅曲目

canUpdateOwnMetadata

bool 布尔

Allow participant to update its own metadata

允许参与者更新自己的元数据

hidden 隐藏

bool 布尔

Hide participant from others in the room

对聊天室中的其他人隐藏参与者

kind 类

string 字符串

Type of participant (standard, ingress, egress, sip, or agent). this field is typically set by LiveKit internals.

参与者类型(标准、入口、出口、SIP 或代理)。此字段通常由 LiveKit 内部设置。

session断开操作

用户离开房间后,回话会结束,通过add_shutdown_callback回调,可以处理后续操作。例如:发送聊天结束事件。

async def entrypoint(ctx: JobContext):
    async def my_shutdown_hook():
        # save user state
        ...
    ctx.add_shutdown_callback(my_shutdown_hook)

Agent操作

创建Agent服务节点

LiveKit的Agent框架现在只支持python的SDK,文档地址如下:https://docs.livekit.io/agents/quickstart/

这是官方给的demo:

import asyncio

from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero


# This function is the entrypoint for the agent.
async def entrypoint(ctx: JobContext):
    # Create an initial chat context with a system prompt
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoiding usage of unpronouncable punctuation."
        ),
    )

    # Connect to the LiveKit room
    # indicating that the agent will only subscribe to audio tracks
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # VoiceAssistant is a class that creates a full conversational AI agent.
    # See https://github.com/livekit/agents/tree/main/livekit-agents/livekit/agents/voice_assistant
    # for details on how it works.
    assistant = VoiceAssistant(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )

    # Start the voice assistant with the LiveKit room
    assistant.start(ctx.room)

    await asyncio.sleep(1)

    # Greets the user with an initial message
    await assistant.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    # Initialize the worker with the entrypoint
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

Agent的生命周期

  1. 当worker程序启动时,会通过websocket连接到LiveKit服务器,将自己注册成worker。一个worker下会有多个子进程(Agent)来处理请求。
  2. 当用户进入房间时,LiveKit服务器通过负载均衡选择一个worker,为用户提供服务。
  3. 子进程处理来自用户的消息,并给出回复。
  4. 当用户退出房间时,房间滚啊比,并且断开与agent的连接。

Agent内部执行流程

agent在处理请求时,包含几个节点:

  1. request handler:判断能否处理请求,不能请求则LiveKit会讲任务交给其他worker
  2. entrypoint:agent进入房间之前,执行的初始化操作
  3. prewarm function:agent进程启动时调用,可以执行加载模型等耗时的操作

Worker类型

opts = WorkerOptions(
    ...
    # when omitted, the default is JobType.JT_ROOM
    worker_type=JobType.JT_ROOM,
)

JobType 枚举有两个选项:

  • JT_ROOM:将为每个房间创建一个新的代理实例。
  • JT_PUBLISHER:将为房间里的每个参与者创建一个新的代理实例。

Agent处理请求

处理音频流

@ctx.room.on("track_subscribed")
def on_track_subscribed(
    track: rtc.Track,
    publication: rtc.TrackPublication,
    participant: rtc.RemoteParticipant,
):
    # 监听音频流
    if track.kind == rtc.TrackKind.KIND_AUDIO:
        audio_stream = rtc.AudioStream(track)
        async for event in audio_stream:
            do_something(event.frame)

发布音频流

发布音频涉及将流拆分为长度固定的音频帧。内部缓冲区保存 50 毫秒长的音频队列,实时发送。用于发送新帧的 capture_frame 方法是阻塞的,在缓冲区接收整个帧之前阻塞在那里。这样可以更轻松地处理中断。

为了发布音轨,需要事先确定采样率和声道数,以及每帧的长度(样本数)。下面的示例是在 10ms 长帧中以 48kHz 传输恒定的 16 位正弦波:

SAMPLE_RATE = 48000
NUM_CHANNELS = 1 # mono audio
AMPLITUDE = 2 ** 8 - 1
SAMPLES_PER_CHANNEL = 480 # 10ms at 48kHz

async def entrypoint(ctx: JobContext):
    await ctx.connect()

    source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)
    track = rtc.LocalAudioTrack.create_audio_track("example-track", source)
    # since the agent is a participant, our audio I/O is its "microphone"
    options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    # ctx.agent is an alias for ctx.room.local_participant
    publication = await ctx.agent.publish_track(track, options)

    frequency = 440
    async def _sinewave():
        audio_frame = rtc.AudioFrame.create(SAMPLE_RATE, NUM_CHANNELS, SAMPLES_PER_CHANNEL)
        audio_data = np.frombuffer(audio_frame.data, dtype=np.int16)

        time = np.arange(SAMPLES_PER_CHANNEL) / SAMPLE_RATE
        total_samples = 0
        while True:
            time = (total_samples + np.arange(SAMPLES_PER_CHANNEL)) / SAMPLE_RATE
            sinewave = (AMPLITUDE * np.sin(2 * np.pi * frequency * time)).astype(np.int16)
            np.copyto(audio_data, sinewave)

            # send this frame to the track
            await source.capture_frame(frame)
            total_samples += samples_per_channel

处理文本消息

监听data_received事件,处理用户发来的消息;通过publish_data()发送消息给用户。

@room.on("data_received")
def on_data_received(data: rtc.DataPacket):
  logging.info("received data from %s: %s", data.participant.identity, data.data)

# string payload will be encoded to bytes with UTF-8
await room.local_participant.publish_data("my payload",
                reliable=True,
                destination_identities=["identity1", "identity2"],
                topic="topic1")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2117618.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【原创】java+springboot+mysql校园疫情管理系统设计与实现

个人主页:程序猿小小杨 个人简介:从事开发多年,Java、Php、Python、前端开发均有涉猎 博客内容:Java项目实战、项目演示、技术分享 文末有作者名片,希望和大家一起共同进步,你只管努力,剩下的交…

【JAVA开源】基于Vue和SpringBoot的图书个性化推荐系统

本文项目编号 T 015 ,文末自助获取源码 \color{red}{T015,文末自助获取源码} T015,文末自助获取源码 目录 一、系统介绍1.1 业务分析1.2 用例设计1.3 时序设计 二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究…

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-…

[AHK] 调用函数动态生成ListBox窗口

需求背景 动态生成向导对话框,由用户选一个选项,类似做选择题。 运行效果 AHK v1 代码 if(A_ScriptFullPathA_LineFile)MsgBox % ListBox("窗口标题", "这是一个生成listbox的Demo", "a|b|c|d|",3) return ;---------…

【小沐学OpenGL】Ubuntu环境下glew的安装和使用

文章目录 1、简介1.1 OpenGL简介1.2 glew简介 2、安装glew2.1 命令安装glew2.2 直接代码安装glew2.3 cmake代码安装glew 3、测试glew3.1 测试glewfreeglut3.2 测试glewglfw 结语 1、简介 1.1 OpenGL简介 Linux 系统中的 OpenGL 是一个跨语言、跨平台的应用程序编程接口&#…

智能的PHP开发工具PhpStorm v2024.2全新发布——支持日志文件

PhpStorm是一个轻量级且便捷的PHP IDE,其旨在提高用户效率,可深刻理解用户的编码,提供智能代码补全,快速导航以及即时错误检查。可随时帮助用户对其编码进行调整,运行单元测试或者提供可视化debug功能。 立即获取PhpS…

【私活儿分享】手串珠子管理小程序,便捷查询珠子(串手链的珠子)位置

前言 之间帮客户做了个查询手串珠子位置的小程序,便于帮助客户管理众多的珠子,这个珠子就是戴在手上串起来的饰品。好了,话不多说,进入正题! 正文 小程序比较简单,采用云开发。两个页面,一个查…

Git 新手指南

Git 命令大全 Git 是目前最流行的分布式版本控制系统,用于跟踪文件的更改,协调不同开发者的协作。掌握 Git 命令能够极大提高工作效率,尤其在软件开发过程中。本文将详细介绍 Git 的一些常用命令,帮助你更好地理解和使用 Git。 1…

一款免费开源的截图软件,SETUNA截图软件

SETUNA是一款功能强大且便捷的屏幕截图工具,适用于多种场景,包括日常办公、学习和游戏娱乐等。该软件的主要特点如下: 高效截图:用户可以轻松截取屏幕上的任何部分,并且支持自定义选取截图范围。图片编辑功能&#xf…

聊聊go语言channel中的一些小技巧

写在文章开头 go语言提供了各种非常方便的语法糖,使得我们实现用最少的语法做尽可能高效的事情,而本文就简单介绍如何实现非阻塞处理多个channel,希望对你有帮助。 Hi,我是 sharkChili ,是个不断在硬核技术上作死的技…

项目进度一

一.双token验证登陆/注册 1.在前端中,得到响应记录acessToken和 refreshToken ,并记录在 localStorage中,当登录之后的请求都要携带着accessToken,如果accessToken过期,就再验证一下refreshToken,如果也过期就需要重新登录&#…

亚马逊测评自养号卖家如何以低成本提高店铺产品曝光率和销量?

在跨境电商领域,随着市场日趋饱和和竞争加剧,卖家普遍面临流量低、转化率低的共同挑战。为了在这种严重的“内卷化”环境中脱颖而出,不少卖家投入了大量的资金和资源,尝试了各种站内和站外的推广方式,但往往因为缺乏成…

Vue2中使用ant-design的tab组件让他一行充满

使用tabs组件默认样式这样 想改成水平居中铺满如下: 需要改下css样式 /deep/ .ant-tabs-nav {width: 100%;& > div {width: 100%;display: flex;align-items: center;}.ant-tabs-tab {flex: 1;text-align: center;}}

Python中matplotlib-legend图例水平排列

在matplotlib中,图例是用来标识不同数据系列或不同类别的标记和颜色的标签。有时候我们希望将图例水平排列,以节省空间并使得图例更加美观。本文将介绍如何在matplotlib中实现图例的水平排列。 方法一:使用legend的loc参数 我们可以通过leg…

Android通知(Notification)的基本用法

Android通知(Notification)的基本用法 通知(Notification)是Android系统中比较有特色的一个功能,当某个应用程序希望向用户发出一些提示信息,而该应用程序又不在前台运行时,就可以借助通知来实…

云计算实训43——部署k8s基础环境、配置内核模块、基本组件安装

一、K8S基本概念 1、k8s是什么 K8S是Kubernetes的 缩写,由于k 和 s 之间有⼋个字符,所以因此得名。 Kubernetes 是⼀个可移植的、可扩展的开源平台,⽤于管理容器化 的⼯作负载和服务,可促进声 明式配置和⾃动化。 2、k8s的功…

pico 手臂手部手指完整版

别忘了设置好pico的设置和导入需要的样式 一、动画设置 1.1设置Avatar 遮罩; 1.2创建动画、手指动画的话,我创建了四个,分别对应的是(平展、弯曲食指、弯曲其他手指、握拳) 1.3建立分层、我建了三层,默认层&#xff…

第二十章 加密 SOAP 主体

文章目录 第二十章 加密 SOAP 主体加密概述加密 SOAP 主体 第二十章 加密 SOAP 主体 本主题介绍如何加密 IRIS Web 服务和 Web 客户端发送的 SOAP 消息正文。 主题“加密安全标头元素”和“使用派生密钥令牌进行加密和签名”描述了如何加密安全标头元素以及加密 SOAP 主体的其…

这10个提示词技巧太强了,用过的人都说真香!

ChatGPT 已经发布一年多了,这一年,我们经常会听到“提示词工程(Prompt Engineering)”这个术语💻 在探讨这个概念之前,我们需要先了解什么是Prompt(提示词)🤔 简单来说…

面试题复习(0902-0909)

1. 完全背包问题 和01背包唯一的区别是&#xff0c;每件物品都有无限个&#xff08;也就是可以放入背包多次&#xff09; 代码和01唯一的区别在于j的循环是从小到大&#xff0c;不是从大到小。ij谁在外谁在内层区别不大。 #include <bits/stdc.h> using namespace std…