Oracle Streams Demo 1: Schema Level Capture
Version 11.2.0.2
 
Note: This demonstration shows basic techniques for implementing schema-level Streams capture and apply. The database Orabase is my standard Windows implementation of the Oracle RDBMS on the laptop I use when traveling to conferences. The second database, TargetDB, is configured on a second laptop running under Linux 5.2, with the two machines connected through a GigE switch and some Cat 5E cable. Use the library's setup page to create the required tablespaces and to perform other activities as required.
 
Setup As SYS - Prepare Database and Instance (both databases)
linux 192.168.1.91
windows 192.168.1.19

conn / as sysdba

-- validate Oracle parameters
-- (notes are kept on their own lines; a trailing "--" on a SQL*Plus command
--  line is not reliably treated as a comment)

-- document the current archive log mode
archive log list
-- min 3
show parameter aq_tm_processes
-- must be 11.2.0.0.0 or above
show parameter compatible
-- must be TRUE
show parameter global_names
-- min. 3600 (1 hr.)
show parameter undo_retention

-- altering initialization parameters examples
-- COMPATIBLE is a static parameter: it can only be written to the spfile and
-- takes effect at the next startup (the restart below picks it up)
alter system set compatible='11.2.0.0.0' scope=SPFILE;
alter system set global_names=TRUE scope=BOTH;
alter system set undo_retention=3600 scope=BOTH;
-- retest parameter after modification (compatible only after the restart)

-- switch the database to ARCHIVELOG mode; this must be done while mounted
shutdown immediate;

startup mount;

alter database archivelog;

alter database open;
 
Setup As SYS - Create Streams Administrators - On Source DB create user localadmin, on Target DB create user remoteadmin
conn / as sysdba

CREATE USER localadmin
IDENTIFIED BY localadmin
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 0 ON SYSAUX
QUOTA 50M ON uwdata;

GRANT create session TO localadmin;
GRANT create database link TO localadmin;
GRANT create procedure TO localadmin;

GRANT aq_administrator_role TO localadmin IDENTIFIED BY localadmin;

GRANT execute ON dbms_streams_adm TO localadmin;
GRANT execute ON dbms_streams_auth TO localadmin;

col username format a12
col profile format a19

SELECT username, created, expiry_date, profile, initial_rsrc_consumer_group
FROM dba_users
WHERE account_status = 'OPEN'
ORDER BY 1;

SELECT *
FROM dba_sys_privs
WHERE grantee = 'LOCALADMIN';

col privilege format a15
col owner format a15

SELECT role, owner, table_name, privilege
FROM role_tab_privs
WHERE role = 'AQ_ADMINISTRATOR_ROLE'
ORDER BY 4, 2, 3;

exec dbms_streams_auth.grant_admin_privilege('LOCALADMIN', TRUE);

exec dbms_streams_auth.grant_remote_admin_access('LOCALADMIN');

SELECT *
FROM dba_streams_administrator;

WITH q AS (SELECT MAX(snap_id) MAXSNAPID FROM dba_hist_streams_pool_advice)
SELECT size_for_estimate, size_factor, estd_spill_count, estd_unspill_count
FROM dba_hist_streams_pool_advice dhspa, q
WHERE dhspa.snap_id = q.maxsnapid;

SELECT first_time , next_time
FROM
dba_logmnr_log ;
-- the query above returns no rows at this point in time

Note: DBMS_LOGMNR_D.SET_TABLESPACE can be used to change the tablespace of the logminer tables if desired
 
Configure TNSNAMES.ORA (both databases)
SOURCE =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = alpha1.mlib.org)(PORT = 1521))
      (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = orabase)
    )
  )

TARGET =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = alpha2.mlib.org)(PORT = 1521))
      (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = targetdb)
    )
  )

-- on both servers
ping 192.168.1.19
ping 192.168.1.91

-- tnsping takes the aliases defined in TNSNAMES.ORA (SOURCE and TARGET)
tnsping source
tnsping target

-- on source in SQL*Plus
conn remoteadmin/remoteadmin@target;

-- on target in SQL*Plus
conn localadmin/localadmin@source;

-- if any of these do not work, the listeners need to be properly configured and started
 
Create Database Link
on Source DB on Target DB
-- on Source DB
conn localadmin/localadmin

-- with global_names=TRUE the link name must match the remote database's
-- global name; run this query on each side to confirm the names used below
SELECT * FROM global_name;

-- USING takes the TNSNAMES.ORA alias (TARGET), not the service name
CREATE DATABASE LINK targetdb
CONNECT TO remoteadmin
IDENTIFIED BY remoteadmin
USING 'TARGET';

-- verify the link just created
SELECT COUNT(*)
FROM user_objects@targetdb;

-- on Target DB
conn remoteadmin/remoteadmin

SELECT * FROM global_name;

-- USING takes the TNSNAMES.ORA alias (SOURCE), not the service name
CREATE DATABASE LINK orabase
CONNECT TO localadmin
IDENTIFIED BY localadmin
USING 'SOURCE';

-- verify the link just created
SELECT SYSDATE
FROM dual@orabase;
 
Prepare Schema Tables for Streams Replication
on Source DB on Target DB
conn / as sysdba

SELECT do.object_id, do.object_name, c.type#
FROM dba_objects do, cdef$ c
WHERE do.object_id = c.obj#
AND do.owner = 'SCOTT'
AND do.object_type = 'TABLE'
ORDER BY 3;

conn scott/tiger

SELECT table_name
FROM user_tables;

ALTER TABLE bonus ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

ALTER TABLE dept ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

ALTER TABLE emp ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

ALTER TABLE salgrade ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

conn / as sysdba

SELECT do.object_id, do.object_name, c.type#
FROM dba_objects do, cdef$ c
WHERE do.object_id = c.obj#
AND do.owner = 'SCOTT'
AND do.object_type = 'TABLE'
ORDER BY 3;
conn / as sysdba

CREATE USER scottrep
IDENTIFIED BY scottrep
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON system
QUOTA 0 ON sysaux
QUOTA 50M ON uwdata;

GRANT create session TO scottrep;
GRANT create table TO scottrep;
GRANT create type TO scottrep;
 
Create Streams Queues
on Source DB on Target DB

dbms_streams_adm.set_up_queue(
queue_table    IN VARCHAR2 DEFAULT 'streams_queue_table',
storage_clause IN VARCHAR2 DEFAULT NULL,
queue_name     IN VARCHAR2 DEFAULT 'streams_queue',
queue_user     IN VARCHAR2 DEFAULT NULL,
comment        IN VARCHAR2 DEFAULT NULL);

conn localadmin/localadmin

-- baseline: number of queues and queue tables before set_up_queue runs
SELECT COUNT(*)
FROM user_queues;

SELECT COUNT(*)
FROM user_queue_tables;

-- create the source-side queue table and queue for LOCALADMIN
-- (anonymous block instead of EXEC: a SQL*Plus EXECUTE command must fit on one
--  line unless every line ends with the "-" continuation character)
BEGIN
  dbms_streams_adm.set_up_queue(
    queue_table    => 'SOURCE_SCOTT_QTAB',
    storage_clause => 'PCTFREE 0 PCTUSED 99',
    queue_name     => 'SCOTT_CAPTURE_Q',
    queue_user     => 'LOCALADMIN',
    comment        => 'Demo Orabase Streams Queue');
END;
/

desc user_queues

col name format a24
col queue_table format a18
col queue_type format a16
col retention format a10
col user_comment format a30

SELECT name, queue_table, queue_type, enqueue_enabled,
dequeue_enabled, retention, user_comment
FROM user_queues;

desc user_queue_tables

col object_type format a12
col sort_order format a12
col recipients format a11
col message_grouping format a17

SELECT queue_table, type, object_type, sort_order, recipients, message_grouping
FROM user_queue_tables;

SELECT compatible, primary_instance, secondary_instance, owner_instance, user_comment, secure
FROM user_queue_tables;

desc source_scott_qtab
-- on Target DB
conn remoteadmin/remoteadmin

-- baseline: number of queues and queue tables before set_up_queue runs
SELECT COUNT(*)
FROM user_queues;

SELECT COUNT(*)
FROM user_queue_tables;

-- create the target-side queue table and queue for REMOTEADMIN
-- (anonymous block instead of EXEC: a SQL*Plus EXECUTE command must fit on one
--  line unless every line ends with the "-" continuation character)
BEGIN
  dbms_streams_adm.set_up_queue(
    queue_table    => 'TARGET_SCOTT_QTAB',
    storage_clause => 'PCTFREE 0 PCTUSED 99',
    queue_name     => 'SCOTT_APPLY_Q',
    queue_user     => 'REMOTEADMIN',
    comment        => 'Demo TargetDB Streams Queue');
END;
/

desc user_queues

col name format a24
col queue_table format a18
col queue_type format a16
col retention format a10
col user_comment format a30

SELECT name, queue_table, queue_type, enqueue_enabled,
dequeue_enabled, retention, user_comment
FROM user_queues;

desc user_queue_tables

col object_type format a12
col sort_order format a12
col recipients format a11
col message_grouping format a17

SELECT queue_table, type, object_type, sort_order, recipients, message_grouping
FROM user_queue_tables;

SELECT compatible, primary_instance, secondary_instance, owner_instance, user_comment, secure
FROM user_queue_tables;

desc target_scott_qtab
 
Create Capture Process (on Source DB only)

dbms_streams_adm.add_schema_rules(
schema_name        IN  VARCHAR2,
streams_type       IN  VARCHAR2,
streams_name       IN  VARCHAR2 DEFAULT NULL,
queue_name         IN  VARCHAR2 DEFAULT 'streams_queue',
include_dml        IN  BOOLEAN  DEFAULT TRUE,
include_ddl        IN  BOOLEAN  DEFAULT FALSE,
include_tagged_lcr IN  BOOLEAN  DEFAULT FALSE,
source_database    IN  VARCHAR2 DEFAULT NULL,
dml_rule_name      OUT VARCHAR2,
ddl_rule_name      OUT VARCHAR2,
inclusion_rule     IN  BOOLEAN  DEFAULT TRUE,
and_condition      IN  VARCHAR2 DEFAULT NULL);

SELECT COUNT(*)
FROM dba_streams_schema_rules;

set serveroutput on

-- Adds schema-level capture rules for SCOTT (DML and DDL) to the
-- SCOTT_CAPTURE_Q queue under the Streams name SCOTT_CAPTURE, then prints
-- the names of the rules that were generated.
DECLARE
  -- rule names come back quoted and owner-qualified ("OWNER"."RULE"), so the
  -- buffers are sized for two identifiers plus quotes and the dot
  dml_rname VARCHAR2(100);
  ddl_rname VARCHAR2(100);
BEGIN
  dbms_streams_adm.add_schema_rules(
    schema_name        => 'SCOTT',
    streams_type       => 'CAPTURE',
    streams_name       => 'SCOTT_CAPTURE',
    queue_name         => 'SCOTT_CAPTURE_Q',
    include_dml        => TRUE,
    include_ddl        => TRUE,
    include_tagged_lcr => FALSE,
    dml_rule_name      => dml_rname,
    ddl_rule_name      => ddl_rname,
    inclusion_rule     => TRUE,
    and_condition      => NULL);

  dbms_output.put_line('DML Rule Name: ' || dml_rname);
  dbms_output.put_line('DDL Rule Name: ' || ddl_rname);
END;
/

-- typical return values
DML Rule Name: "LOCALADMIN"."SCOTT1"
DDL Rule Name: "LOCALADMIN"."SCOTT2"

desc dba_capture

SELECT * FROM dba_capture;

desc dba_capture_extra_attributes

SELECT * FROM dba_capture_extra_attributes;

desc dba_capture_parameters

col capture_name format a16
col parameter format a28

SELECT * FROM dba_capture_parameters;

desc dba_capture_prepared_schemas

SELECT * FROM dba_capture_prepared_schemas;

desc dba_capture_prepared_tables

col table_owner format a12
col table_name format a12

SELECT * FROM dba_capture_prepared_tables;
 
Export / Import Scott Schema (preferably do this with DataPump)
Note: This step requires that the schema in the Source database be created in the Target database. The example demonstrates exporting and importing the data using DataPump. Transferring the physical file between the servers can be done with anything from SCP or FTP to a flash drive depending upon circumstances.
on Source DB on Target DB
SELECT directory_path
FROM dba_directories
WHERE directory_name = 'DATA_PUMP_DIR';

-- in a terminal window
expdp system/oracle1 DIRECTORY=data_pump_dir DUMPFILE=streamsdemo1.dmp SCHEMAS=scott
SELECT directory_path
FROM dba_directories
WHERE directory_name = 'DATA_PUMP_DIR';

-- in a terminal window
impdp system/oracle1 DIRECTORY=data_pump_dir DUMPFILE=data_pump_dir:streamsdemo1.dmp SCHEMAS=scott
REMAP_SCHEMA=scott:scottrep
 
Create Propagation Process (on the source database only)

dbms_streams_adm.add_schema_propagation_rules(
schema_name            IN  VARCHAR2,
streams_name           IN  VARCHAR2 DEFAULT NULL,
source_queue_name      IN  VARCHAR2,
destination_queue_name IN  VARCHAR2,
include_dml            IN  BOOLEAN  DEFAULT TRUE,
include_ddl            IN  BOOLEAN  DEFAULT FALSE,
include_tagged_lcr     IN  BOOLEAN  DEFAULT FALSE,
source_database        IN  VARCHAR2 DEFAULT NULL,
dml_rule_name          OUT VARCHAR2,
ddl_rule_name          OUT VARCHAR2,
inclusion_rule         IN BOOLEAN   DEFAULT TRUE,
and_condition          IN VARCHAR2  DEFAULT NULL,
queue_to_queue         IN BOOLEAN   DEFAULT NULL);
 
set serveroutput on

-- Adds schema-level propagation rules (DML only) that move SCOTT changes from
-- the local SCOTT_CAPTURE_Q queue to the apply queue on the target database,
-- then prints the names of the rules that were generated.
DECLARE
  -- rule names come back quoted and owner-qualified ("OWNER"."RULE"), so the
  -- buffers are sized for two identifiers plus quotes and the dot
  ddl_rname VARCHAR2(100);
  dml_rname VARCHAR2(100);
BEGIN
  dbms_streams_adm.add_schema_propagation_rules(
    schema_name            => 'SCOTT',
    streams_name           => 'SCOTT_CAPTURE',
    source_queue_name      => 'SCOTT_CAPTURE_Q',
    -- owner and database link are required here: an unqualified name is
    -- resolved to a queue in the current schema of the local database
    destination_queue_name => 'REMOTEADMIN.SCOTT_APPLY_Q@TARGETDB',
    include_dml            => TRUE,
    include_ddl            => FALSE,
    include_tagged_lcr     => FALSE,
    source_database        => 'ORABASE',
    dml_rule_name          => dml_rname,
    ddl_rule_name          => ddl_rname,
    inclusion_rule         => TRUE,
    and_condition          => NULL,
    queue_to_queue         => TRUE);

  dbms_output.put_line('DML Rule Name: ' || dml_rname);
  dbms_output.put_line('DDL Rule Name: ' || ddl_rname);
END;
/

-- typical return values
DML Rule Name: "LOCALADMIN"."SCOTT16"
DDL Rule Name:


desc dba_propagation

col propagation_name format a17
col SQName format a16
col DQOwner format a11
col DQName format a14
col DBLINK format a10
col rule_set_name format a14
col negative_rule_set_name format a23
col queue_to_queue format a15

SELECT propagation_name, status, source_queue_name SQName, destination_queue_owner DQOwner, destination_queue_name DQName,
destination_dblink DBLINK
FROM dba_propagation;

SELECT propagation_name, rule_set_name, negative_rule_set_name, queue_to_queue
FROM dba_propagation;

-- examine rules created by previous actions
desc dba_rules

col rule_owner format a12
col rule_name format a12

SELECT rule_owner, rule_name, rule_condition
FROM dba_rules;

desc dba_rulesets

col ruleset_name format a20
col base_table format a35
col ruleset_comment format a30

SELECT ruleset_name, base_table, ruleset_comment
FROM dba_rulesets
WHERE owner = 'LOCALADMIN';

desc dba_rule_set_rules

col rule_set_name format a20
col rule_set_rule_comment format a32

SELECT rule_set_name, rule_name, rule_set_rule_enabled, rule_set_rule_eval_ctx_name
FROM dba_rule_set_rules
WHERE rule_set_owner = 'LOCALADMIN';

SELECT rule_set_name, rule_name, rule_set_rule_enabled, rule_set_rule_comment
FROM dba_rule_set_rules
WHERE rule_set_owner = 'LOCALADMIN';

desc dba_streams_schema_rules

col streams_name format a14
col streams_type format a13
col rule_type format a10
col inc_tagged_lcr format a15

SELECT streams_name, streams_type, rule_type, include_tagged_lcr INC_TAGGED_LCR
FROM dba_streams_schema_rules;

SELECT streams_name, rule_type, rule_name, rule_condition
FROM dba_streams_schema_rules;

-- revisit the capture and propagation status values
SELECT capture_name, status, captured_scn, applied_scn
FROM dba_capture;

SELECT propagation_name, status
FROM dba_propagation;

-- add a table propagation rule to only replicate information on employees whose department is 20

-- Adds subset rules on SCOTT.EMP restricted to DEPTNO=20 for the
-- SCOTT_CAPTURE client on SCOTT_CAPTURE_Q and prints the generated
-- insert, update and delete rule names.
DECLARE
  -- rule names come back quoted and owner-qualified ("OWNER"."RULE"), so the
  -- buffers are sized for two identifiers plus quotes and the dot
  ins_rule VARCHAR2(100);
  upd_rule VARCHAR2(100);
  del_rule VARCHAR2(100);
BEGIN
  dbms_streams_adm.add_subset_rules(
    table_name         => 'SCOTT.EMP',
    dml_condition      => 'DEPTNO=20',
    streams_type       => 'CAPTURE',
    streams_name       => 'SCOTT_CAPTURE',
    queue_name         => 'SCOTT_CAPTURE_Q',
    include_tagged_lcr => FALSE,
    source_database    => NULL,
    insert_rule_name   => ins_rule,
    update_rule_name   => upd_rule,
    delete_rule_name   => del_rule);

  dbms_output.put_line('IRULE: ' || ins_rule);
  dbms_output.put_line('URULE: ' || upd_rule);
  dbms_output.put_line('DRULE: ' || del_rule);
END;
/

-- typical return values
"LOCALADMIN"."EMP10"
"LOCALADMIN"."EMP10"
"LOCALADMIN"."EMP10"

SELECT rule_set_name, rule_name, rule_set_rule_enabled
FROM dba_rule_set_rules
WHERE rule_set_owner = 'LOCALADMIN'
AND rule_name = 'EMP10';

desc dba_streams_table_rules

col table_name format a11
col dml_condition format a14
col subsetting_operation format a21

SELECT streams_name, streams_type, table_name, rule_type, dml_condition, subsetting_operation
FROM dba_streams_table_rules;

SELECT streams_name, source_database, rule_name, rule_condition
FROM dba_streams_table_rules;
 
Set Schema Instantiation SCN

dbms_apply_adm.set_schema_instantiation_scn(
source_schema_name   IN VARCHAR2,
source_database_name IN VARCHAR2,
instantiation_scn    IN NUMBER,
apply_database_link  IN VARCHAR2 DEFAULT NULL,
recursive            IN BOOLEAN  DEFAULT FALSE);

on Source DB on Target DB
SELECT dbms_flashback.get_system_change_number
FROM dual;
-- Records the SCN from which changes to the SCOTT schema of ORABASE are to be
-- applied. 4648838 is a sample value: substitute the SCN returned by the
-- dbms_flashback.get_system_change_number query run on the source database.
BEGIN
  dbms_apply_adm.set_schema_instantiation_scn(
    source_schema_name   => 'SCOTT',
    source_database_name => 'ORABASE',
    instantiation_scn    => 4648838,
    apply_database_link  => NULL,
    recursive            => FALSE);
END;
/
 
Monitor Streams Replication
-- Propagation schedule status: each propagation joined to its queue schedule
-- on source queue owner, source queue name and destination database link
SELECT
  dp.propagation_name,
  TO_CHAR(dqs.start_date, 'HH24:MI:SS MM/DD/YY') AS start_date,
  dqs.message_delivery_mode AS msg_del_mode,
  dqs.next_time,
  dqs.latency,
  CASE dqs.schedule_disabled
    WHEN 'Y' THEN 'Disabled'
    WHEN 'N' THEN 'Enabled'
  END AS schedule_disabled,
  dqs.process_name,
  (dqs.total_bytes/1024/1024) AS size_mb
FROM dba_propagation dp
INNER JOIN dba_queue_schedules dqs
  ON dqs.schema = dp.source_queue_owner
  AND dqs.qname = dp.source_queue_name
  -- the schedule destination may be "queue@dblink"; compare on the part after
  -- the "@", or on the whole value when there is no "@"
  AND dp.destination_dblink = COALESCE(REGEXP_SUBSTR(dqs.destination, '[^@]+', 1, 2), dqs.destination)
ORDER BY dp.propagation_name;

SQL> select count(*) from DBA_STREAMS_COLUMNS where owner = 'SCOTT';

COUNT(*)
----------
18

SQL> SELECT RULE_NAME, RULE_TYPE, DML_CONDITION
2 FROM DBA_STREAMS_RULES
3 WHERE STREAMS_NAME = 'SCOTT_CAPTURE';

RULE_NAME RULE_TYPE DML_CONDITION
------------ ---------- --------------
ORABASE5 DDL
ORABASE4 DML
SCOTT1 DML
SCOTT2 DDL
EMP7 DML DEPTNO=20
EMP8 DML DEPTNO=20
EMP9 DML DEPTNO=20
EMP10 DML DEPTNO=20
EMP11 DML DEPTNO=20
EMP12 DML DEPTNO=20

SELECT RULE_NAME, RULE_CONDITION
FROM DBA_STREAMS_RULES
WHERE STREAMS_NAME = 'SCOTT_CAPTURE';

SQL> select dbms_flashback.get_system_change_number() from dual;

DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER()
-----------------------------------------
4588418

-- good on target
exec dbms_apply_adm.set_schema_instantiation_scn(source_schema_name => 'SCOTT', source_database_name => 'ORABASE', instantiation_scn => 4588418, apply_database_link => 'TARGETDB', recursive => FALSE);

-- good on target
select apply_name, status from dba_apply;

-- good on source
BEGIN
  dbms_capture_adm.start_capture(capture_name => 'SCOTT_CAPTURE');
END;
/

SQL> select capture_name, start_scn, status
2 from dba_capture;

CAPTURE_NAME START_SCN STATUS
------------------------------ ---------- --------
SCOTT_CAPTURE 4489419 ENABLED


INSERT INTO emp VALUES (1111, 'MORGAN', 'DBA', 7566, TRUNC(SYSDATE), 4000, 0, 20);
COMMIT;
 
Post-Streams Clean-up
on Source DB on TARGETDB
conn / as sysdba

-- stop capture process and queue
-- arguments: queue_name, cascade, drop_unused_queue_table
-- cascade => TRUE also removes the Streams clients that use the queue; the
-- queue belongs to LOCALADMIN, so it is schema-qualified when run as SYS
exec dbms_streams_adm.remove_queue('LOCALADMIN.SCOTT_CAPTURE_Q', TRUE, TRUE);

-- remove the supplemental logging added for the demo
ALTER TABLE scott.bonus
DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

ALTER TABLE scott.dept
DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

ALTER TABLE scott.emp
DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

ALTER TABLE scott.salgrade
DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- CASCADE also drops LOCALADMIN's private database link TARGETDB, so no
-- separate DROP DATABASE LINK is issued (a private link can only be dropped
-- from its owner's own session)
DROP USER localadmin CASCADE;
-- on Target DB
conn / as sysdba

-- stop apply process
-- arguments: queue_name, cascade, drop_unused_queue_table
-- mirrors the source-side clean-up: removes the REMOTEADMIN queue, any
-- Streams clients using it, and the unused queue table
exec dbms_streams_adm.remove_queue('REMOTEADMIN.SCOTT_APPLY_Q', TRUE, TRUE);

DROP USER remoteadmin CASCADE;
 
 
Morgan's Library Page Footer
This site is maintained by Dan Morgan. Last Updated: © 2012 Daniel A. Morgan All Rights Reserved