Oracle 流stream将删除的数据保存

news2025/1/9 6:30:12

Oracle 流stream将删除的数据保存


--实验的目的是捕获hr.employees表的删除行,将删除行插入到emp_del表中。
--设置初始化参数

AQ_TM_PROCESSES=1
COMPATIBLE=9.2.0
LOG_PARALLELISM=1


--查看数据库的名称,我的为ora9,将以下的ora9全部替换为你的数据库名称
--数据库为归档模式
--建立表emp_del,用于存放EMPLOYEES的删除数据

conn hr/hr
CREATE TABLE emp_del( 
  employee_id    NUMBER(6), 
  first_name     VARCHAR2(20), 
  last_name      VARCHAR2(25), 
  email          VARCHAR2(25), 
  phone_number   VARCHAR2(20), 
  hire_date      DATE, 
  job_id         VARCHAR2(10), 
  salary         NUMBER(8,2), 
  commission_pct NUMBER(2,2), 
  manager_id     NUMBER(6), 
  department_id  NUMBER(4),
  timestamp      DATE);

CREATE UNIQUE INDEX emp_del_id_pk ON emp_del (employee_id);

ALTER TABLE emp_del ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id));


--建立管理用户,设定默认表空间,授权

conn / as sysdba
drop user strmadmin cascade;
GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE 
  TO strmadmin IDENTIFIED BY strmadmin;
ALTER USER strmadmin DEFAULT TABLESPACE users;

GRANT ALL ON hr.emp_del  TO strmadmin;

GRANT EXECUTE ON DBMS_APPLY_ADM        TO strmadmin;
GRANT EXECUTE ON DBMS_AQ               TO strmadmin;
GRANT EXECUTE ON DBMS_AQADM            TO strmadmin;
GRANT EXECUTE ON DBMS_CAPTURE_ADM      TO strmadmin;
GRANT EXECUTE ON DBMS_FLASHBACK        TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM      TO strmadmin;

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

--建立流队列,名称叫streams_queue ,用于存储捕获的变化

CONNECT strmadmin/strmadmin
EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();

--配置logmnr使用的表空间,我们就用tools

conn / as sysdba
EXECUTE DBMS_LOGMNR_D.SET_TABLESPACE('TOOLS');

--增强日志的模式

ALTER TABLE hr.employees  ADD SUPPLEMENTAL LOG GROUP log_group_employees_pk
  (employee_id) ALWAYS;


--配置捕获程序

CONNECT strmadmin/strmadmin
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'hr.employees',   
    streams_type   => 'capture',
    streams_name   => 'capture_emp',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    =>  true,
    include_ddl    =>  false);
END;
/


--设置scn

DECLARE
  iscn  NUMBER;         -- Variable to hold instantiation SCN value
BEGIN
  iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
    source_object_name    => 'hr.employees',
    source_database_name  => 'ora9',
    instantiation_scn     => iscn);
END;
/

--配置叫emp_agent的代理程序

BEGIN
  DBMS_AQADM.DROP_AQ_AGENT(
     agent_name => 'emp_agent');
END;
/

BEGIN
  DBMS_AQADM.CREATE_AQ_AGENT(
     agent_name => 'emp_agent');
  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'emp_agent',
    db_username => 'strmadmin');
END;
/

--建立队列订户

DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber :=  SYS.AQ$_AGENT('emp_agent', NULL, NULL);  
  SYS.DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name          =>  'strmadmin.streams_queue',
    subscriber                  =>  subscriber,
    rule                =>  NULL,
    transformation      =>  NULL);
END;
/

--建立存储过程enq_row_lcr 

CREATE OR REPLACE PROCEDURE enq_row_lcr(in_any IN SYS.ANYDATA) IS
  enqopt       DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;
  recipients   DBMS_AQ.AQ$_RECIPIENT_LIST_T;
  enq_eventid  RAW(16);
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT(
    name     => 'emp_agent',
    address  => NULL,
    protocol => NULL);
  recipients(1) := SYS.AQ$_AGENT(
    name     => 'emp_agent',
    address  => NULL,
    protocol => NULL);
  mprop.RECIPIENT_LIST := recipients;
  DBMS_AQ.ENQUEUE(
    queue_name         => 'strmadmin.streams_queue',
    enqueue_options    => enqopt,
    message_properties => mprop,
    payload            => in_any,
    msgid              => enq_eventid);
END;
/

--建立DML处理存储过程

CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN SYS.ANYDATA) IS
  lcr          SYS.LCR$_ROW_RECORD;
  rc           PLS_INTEGER;
  command      VARCHAR2(10);
  old_values   SYS.LCR$_ROW_LIST;
BEGIN
  -- Re-enqueue the row LCR for explicit dequeue by another application
  enq_row_lcr(in_any);
  -- Access the LCR
  rc := in_any.GETOBJECT(lcr);
  -- Get the object command type
  command := lcr.GET_COMMAND_TYPE();
  -- Check for DELETE command on the hr.employees table
  IF command = 'DELETE' THEN
    -- Set the command_type in the row LCR to INSERT
    lcr.SET_COMMAND_TYPE('INSERT');
    -- Set the object_name in the row LCR to EMP_DEL
    lcr.SET_OBJECT_NAME('EMP_DEL');
    -- Get the old values in the row LCR
    old_values := lcr.GET_VALUES('old');
    -- Set the old values in the row LCR to the new values in the row LCR
    lcr.SET_VALUES('new', old_values);
    -- Set the old values in the row LCR to NULL
    lcr.SET_VALUES('old', NULL);
    -- Add a SYSDATE value for the timestamp column
    lcr.ADD_COLUMN('new', 'TIMESTAMP', SYS.AnyData.ConvertDate(SYSDATE));
    --  Apply the row LCR as an INSERT into the EMP_DEL table
    lcr.EXECUTE(true);
  END IF;
END;
/

--配置DML管理者,为hr.employees

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'INSERT',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'UPDATE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'DELETE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL);
END;
/

--建立存储过程为出列和再入列事件

CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) AS
  deqopt       DBMS_AQ.DEQUEUE_OPTIONS_T;
  mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid        RAW(16);
  payload      SYS.AnyData;
  new_messages BOOLEAN := TRUE;
  row_lcr      SYS.LCR$_ROW_RECORD;
  tc           pls_integer;
  next_trans   EXCEPTION;
  no_messages  EXCEPTION; 
  pragma exception_init (next_trans, -25235);
  pragma exception_init (no_messages, -25228);
BEGIN
  deqopt.consumer_name := consumer;
  deqopt.wait := 1;
  WHILE (new_messages) LOOP
    BEGIN
      DBMS_AQ.DEQUEUE(
        queue_name          =>  'strmadmin.streams_queue',
        dequeue_options     =>  deqopt,
        message_properties  =>  mprop,
        payload             =>  payload,
        msgid               =>  msgid);
      COMMIT;
      deqopt.navigation := DBMS_AQ.NEXT;
      IF (payload.GetTypeName = 'SYS.LCR$_ROW_RECORD') THEN
        tc := payload.GetObject(row_lcr);   
        DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE || ' row LCR dequeued');
      END IF;                       
      EXCEPTION
        WHEN next_trans THEN
        deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
        WHEN no_messages THEN
          new_messages  := FALSE;
          DBMS_OUTPUT.PUT_LINE('No more events');
     END;
  END LOOP; 
END;
/

--配置应用程序

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'hr.employees',
    streams_type    => 'apply', 
    streams_name    => 'apply_emp',
    queue_name      => 'strmadmin.streams_queue',
    include_dml     =>  true,
    include_ddl     =>  false,
    source_database => 'ora9');
END;
/

--启动应用程序

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name  => 'apply_emp', 
    parameter   => 'disable_on_error', 
    value       => 'n');
END;
/
 
BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name  => 'apply_emp');
END;
/

--启动捕获程序

BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name  => 'capture_emp');
END;
/

--对hr.employees进行插入,删除和修改
conn hr/hr
INSERT INTO hr.employees values(207, 'JOHN', 'SMITH', 'JSMITH@MYCOMPANY.COM', 
  NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110);
COMMIT;

UPDATE hr.employees SET salary=5999 WHERE employee_id=206;
COMMIT;

DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;


CONNECT strmadmin/strmadmin
SELECT * FROM hr.emp_del;

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;

EXEC emp_dq('emp_agent');

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;

--显示应用程序的错误

COLUMN APPLY_NAME HEADING 'Apply|Process|Name' FORMAT A8
COLUMN SOURCE_DATABASE HEADING 'Source|Database' FORMAT A8
COLUMN LOCAL_TRANSACTION_ID HEADING 'Local|Transaction|ID' FORMAT A11
COLUMN ERROR_MESSAGE HEADING 'Error Message' FORMAT A50

SELECT APPLY_NAME, SOURCE_DATABASE, LOCAL_TRANSACTION_ID, ERROR_MESSAGE
  FROM DBA_APPLY_ERROR;


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1676147.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

部分树上问题及图的联通性(图论学习总结部分内容)

文章目录 前言三、部分树上问题及图的联通性最小生成树知识点例题 e g 1 : eg1: eg1: 走廊泼水节(克鲁斯卡尔思想的灵活运用) e g 2 : eg2: eg2: B-Picnic Planning e g 3 eg3 eg3:L - Classic Problem&…

Mybatis入门之在基于Springboot的框架下拿到MySQL中数据

介绍 Java技术操作数据库 MyBatis是一款优秀的持久层框架 用于简化JDBC的开发 优秀的持久层框架 我们要基于Springboot整合Mybatis 实操 学习 基于Mybatis是如何操作数据库的 通过MyBatis书写SQL语句 SQL语句执行完毕后 会将查询结果返回给Java程序 表中数据会自动封装…

灵卡科技HDMI音视频采集及H.264编码一体化采集卡—LCC260

推荐一款由灵卡科技倾力打造的高品质HDMI音视频采集卡——LCC260。以创新的技术,精湛的工艺和卓越的性能,为您提供全方位的音视频解决方案。 LCC260是一款集HDMI音视频采集与H.264编码于一身的全功能采集卡。它的输入端配备了最先进的HDMI 1.4a标准接口&…

Vue实战技巧 —— 企业开发实战中的常见疑难问题

Vue企业开发实战中的常见疑难问题 1. 解决Vue动态路由参数变化,页面数据不更新2. vue组件里定时器销毁问题3. vue实现按需加载组件的两种方式4. 组件之间,父子组件之间的通信方案5. Vue中获取当前父元素,子元素,兄弟元素6. 开发环…

HIVE大数据平台SQL优化分享

相信很多小伙伴在面试的时候,必然跳不过去的一个问题就是SQL脚本的优化,这是很多面试官爱问的问题,也是可以证明你实力进阶的一个重要的能力。 下面给大家分享一个重量级的大数据行业sql技能---hive大数据平台SQL优化。 此文章是大数据平台…

Python中tkinter编程入门4

在Python中tkinter编程入门3-CSDN博客中创建了Button控件,点击该控件就会产生一个点击事件,在创建Button控件时指定该点击事件的处理程序后,按键控件就会对用户的点击事件产生响应。 1 定义事件处理器 定义事件处理器就是一个自定义的函数。…

【UnityRPG游戏制作】Unity_RPG项目_BOSS系统

👨‍💻个人主页:元宇宙-秩沅 👨‍💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍💻 本文由 秩沅 原创 👨‍💻 收录于专栏:就业…

使用Nginx对网站资源进行加密访问并限制访问IP

你好呀,我是赵兴晨,文科程序员。 大家在工作中有没有遇到过这样的需求,新上的网站部署到生产服务器上,但是还没公开,只允许个别高层领导看。 思来想去,我想到了一个简单的方法,通过Nginx对网站…

【Javaer学习Python】 1、Django安装

安装 Python 和 PyCharm 的方法就略过了,附一个有效激活PyCharm的链接:https://www.quanxiaoha.com/pycharm-pojie/pycharm-pojie-20241.html 1、安装Django # 安装Django pip install Django# 查看当前版本 python -m django --version 5.0.62、创建项…

SpringBoot项目的项目部署全过程

一、前端 安装nginx 1.将提前准备好的nginx的安装包上传到Linux中/opt目录下(我用的是Xftp) 2.解压 2.1:在xshell中解压该文件: tar -zxvf nginx-1.20.1.tar.gz 2.2:进入解压后的目录 cd nginx-1.20.1/ 2.3:安装需要的依赖 yum -y install zlib zlib-devel openssl openssl-de…

如何去掉试卷答案,并打印出来

实际上,针对试卷答案的问题,一个简单而高效的方法是使用图片编辑软件中的“消除笔”功能。只需将试卷拍摄成照片,然后通过这一功能,就可以轻松擦除答案。虽然这种方法可能需要一些时间和耐心,但它确实为我们提供了一个…

Kubernetes 群集部署

一、Kubernetes 概述 1.1、什么是 Kubernetes Kubernetes 是一个可移植、可扩展的开源容器编排系统,主要用于自动化部署、扩展和管理容器应用,提供资源调度、部署管理、服务发现、扩容缩容、监控等功能。对于负载均衡、服务发现、高可用、滚动升级、自…

【智能算法】河马优化算法(HO)原理及实现

目录 1.背景2.算法原理2.1算法思想2.2算法过程 3.结果展示4.参考文献5.代码获取 1.背景 2024年,MH Amiri受到自然界河马社会行为启发,提出了河马优化算法(Hippopotamus Optimization Algorithm, HO)。 2.算法原理 2.1算法思想 …

《无畏契约》游戏画面出现“撕裂感“,你清楚背后的原理吗?

🌸个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 🏵️热门专栏:🍕 Collection与数据结构 (91平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm1001.2014.3001.5482 🧀Java …

LangChain 核心模块学习 模型输入 Prompts

模型输入 Prompts 一个语言模型的提示是用户提供的一组指令或输入,用于引导模型的响应,帮助它理解上下文并生成相关和连贯的基于语言的输出,例如回答问题、完成句子或进行对话。 提示模板(Prompt Templates)&#xf…

一篇文章带你入门CSRF

1.什么是CSRF 用一个形象生动的比喻就是: 你给朋友的房子留下了备用钥匙,但是有人偷偷拿走了这把钥匙,然后用这把钥匙进入了你朋友的房子并做了各种坏事。你朋友以为只有你能使用这把钥匙,所以没对任何操作产生怀疑。 在这个比…

保研机试之【设备驱动程序】

B选项: 综上,我认为这道题选择D~

从头开始学Spring—02基于XML管理bean

目录 1.实验一:入门案例 2.实验二:获取bean 3.实验三:依赖注入之setter注入 4.实验四:依赖注入之构造器注入 5.实验五:特殊值处理 6.实验六:为类类型属性赋值 7.实验七:为数组类型属性赋值…

Charger之三动态电源路径管理(DPPM)

-----本文简介----- 主要内容包括: 领资料:点下方↓名片关注回复:粉丝群 硬件之路学习笔记公众号 Charger的动态电源路径管理(DPPM) 前篇内容:①电池管理IC(Charger)了解一下&…

国产分布式数据库高可用故障检测实现

在分布式数据库架构下,当数据库节点异常时,数据库管理组件能够自动感知到异常并触发节点隔离或者自动切换,是数据库高可用容灾的基本能力。在节点服务器异常、网络异常或进程异常等场景下,各数据库产品本身已经具备了可靠的检测能…