Oracle AQ Demo 1
Version 11.2.0.3
 
Setup As SYS
conn / as sysdba

-- validate Oracle parameters
show parameter aq_tm_processes
show parameter job_queue_processes

SELECT owner, queue_name, queue_table, consumer_name
FROM dba_queue_subscribers;

col owner format a10
col grantee format a10
col grantor format a10

SELECT *
FROM queue_privileges;

-- create AQ administrator
CREATE USER aqadmin
IDENTIFIED BY aqadmin
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 20M ON uwdata;

GRANT create session TO aqadmin;
GRANT create procedure TO aqadmin;
GRANT create table TO aqadmin;
GRANT create type TO aqadmin;
GRANT create public synonym TO aqadmin;
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin;

GRANT execute ON dbms_aq TO aqadmin;
GRANT execute ON dbms_aqadm TO aqadmin;
GRANT execute ON dbms_lock TO aqadmin;   -- required for demo but not for AQ
GRANT execute on dbms_crypto TO aqadmin; -- required for demo but not for AQ

SELECT username, account_status, created
FROM dba_users
ORDER BY 1;

SELECT *
FROM dba_sys_privs
WHERE grantee = 'AQADMIN';

col privilege format a15
col owner format a15

SELECT role, owner, table_name, privilege
FROM role_tab_privs
WHERE role = 'AQ_ADMINISTRATOR_ROLE'
ORDER BY 4, 2, 3;

-- create AQ users
CREATE USER gen_pharmacy
IDENTIFIED BY gen_pharm
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 5M ON uwdata;

GRANT create session, create synonym TO gen_pharmacy;

CREATE USER icu_pharmacy
IDENTIFIED BY icu_pharm
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 5M ON uwdata;

GRANT create session, create synonym TO icu_pharmacy;

CREATE USER icu
IDENTIFIED BY icu
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 5M ON uwdata;

-- ICU needs the same privileges as the other AQ users: log on and create
-- private synonyms. The privilege name is "create synonym" (no underscore).
GRANT create session, create synonym TO icu;

CREATE USER nurses_stn
IDENTIFIED BY nurses_stn
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 5M ON uwdata;

GRANT create session, create synonym TO nurses_stn;
 
Setup As AQADMIN
conn aqadmin/aqadmin

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

-- create message user-defined data type
CREATE OR REPLACE TYPE message_t AS OBJECT (
id     NUMBER,
source VARCHAR2(30),
rx     VARCHAR2(30));
/

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

desc message_t

-- examine message for message content demo
SELECT text
FROM user_source
WHERE name = 'MESSAGE_T'
ORDER BY line;

CREATE TABLE test (
reg_col   VARCHAR2(11),
see_msg   message_t);

desc test

set describe depth all linenum on indent on

desc test

-- message_t attributes are (id, source, rx): the source is the ordering
-- user and rx is the drug name, matching how order_rx builds its messages.
INSERT INTO test
(reg_col, see_msg)
VALUES
('Test Values', message_t(1, USER, 'Thorazine'));

SELECT * FROM test;

DROP TABLE test PURGE;
 
Build As AQADMIN
-- table to hold dequeued messages
CREATE TABLE rx_processed_data (
rx_id            NUMBER,
rx_name          VARCHAR2(30),
source           VARCHAR2(30),
mprop_priority   NUMBER(38),
mprop_delay      NUMBER(38),
mprop_attempts   NUMBER(38),
msg_enqueue_time DATE,
mprop_state      NUMBER(5),
processed_by     VARCHAR2(30),
processed_ts     TIMESTAMP);

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

Create a Queue Table
/* -- CREATE QUEUE TABLE syntax
dbms_aqadm.create_queue_table(
queue_table        IN VARCHAR2,      -- table's name
queue_payload_type IN VARCHAR2,      -- user defined data type's name
storage_clause     IN VARCHAR2       DEFAULT NULL,  -- define pctfree
sort_list          IN VARCHAR2       DEFAULT NULL,  -- priority and/or enq_time
multiple_consumers IN BOOLEAN        DEFAULT FALSE, -- FALSE: message can be consumed only once
message_grouping   IN BINARY_INTEGER DEFAULT NONE,  -- TRANSACTIONAL: messages in a transaction are a group
comment            IN VARCHAR2       DEFAULT NULL,  -- definer's comments
auto_commit        IN BOOLEAN        DEFAULT TRUE,  -- deprecated parameter
primary_instance   IN BINARY_INTEGER DEFAULT 0,     -- manage queue in primary
secondary_instance IN BINARY_INTEGER DEFAULT 0,     -- RAC failover if possible
compatible         IN VARCHAR2       DEFAULT NULL,  -- lowest compatible version
non_repudiation    IN BINARY_INTEGER DEFAULT 0,
secure             IN BOOLEAN        DEFAULT FALSE);
*/

-- Create the multi-consumer queue table that backs rx_queue.
-- Written as an anonymous block because the SQL*Plus EXECUTE command must
-- fit on one line unless every line ends with the "-" continuation character.
BEGIN
  dbms_aqadm.create_queue_table(
    queue_table        => 'rx_queue_table',
    queue_payload_type => 'message_t',
    storage_clause     => 'PCTFREE 0 PCTUSED 99',
    sort_list          => 'ENQ_TIME',
    multiple_consumers => TRUE,
    comment            => 'Pharmacy Queue Table',
    compatible         => '11.2',
    secure             => FALSE);
END;
/

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

Single Consumer Queue Table Multiple Consumer Queue Table
SQL> SELECT object_name, object_type
2 FROM user_objects
3 ORDER by 2,1;

OBJECT_NAME               OBJECT_TYPE
------------------------- ----------
AQ$_RX_QUEUE_TABLE_I      INDEX
AQ$_RX_QUEUE_TABLE_T      INDEX

SYS_C0028215              INDEX
SYS_LOB0000093484C00032$$ LOB
AQ$_RX_QUEUE_TABLE_E      QUEUE
RX_PROCESSED_DATA         TABLE
RX_QUEUE_TABLE            TABLE
MESSAGE_T                 TYPE
AQ$RX_QUEUE_TABLE         VIEW
AQ$_RX_QUEUE_TABLE_F      VIEW
SQL> SELECT object_name, object_type
2 FROM user_objects
3 ORDER by 2,1;

OBJECT_NAME               OBJECT_TYPE
------------------------- -------------------
AQ$_RX_QUEUE_TABLE_V      EVALUATION CONTEXT
SYS_C0028218              INDEX
SYS_C0028221              INDEX
SYS_IOT_TOP_93503         INDEX
SYS_IOT_TOP_93505         INDEX
SYS_IOT_TOP_93507         INDEX
SYS_IOT_TOP_93510         INDEX

SYS_LOB0000093494C00032$$ LOB
AQ$_RX_QUEUE_TABLE_E      QUEUE
AQ$_RX_QUEUE_TABLE_N      SEQUENCE
AQ$_RX_QUEUE_TABLE_G      TABLE
AQ$_RX_QUEUE_TABLE_H      TABLE
AQ$_RX_QUEUE_TABLE_I      TABLE
AQ$_RX_QUEUE_TABLE_S      TABLE
AQ$_RX_QUEUE_TABLE_T      TABLE

RX_PROCESSED_DATA         TABLE
RX_QUEUE_TABLE            TABLE
SYS_IOT_OVER_93507        TABLE
MESSAGE_T                 TYPE
AQ$RX_QUEUE_TABLE         VIEW
AQ$RX_QUEUE_TABLE_S       VIEW
AQ$_RX_QUEUE_TABLE_F      VIEW

col evaluation_function format a30
col evaluation_context_comment format a30
col table_name format a30
col user_comment format a30

-- examine evaluation context if a multiple consumer queue
SELECT * FROM user_evaluation_contexts;

-- examine evaluation context table if a multiple consumer_queue
SELECT * FROM user_evaluation_context_tables;

-- examine queues
SELECT name, queue_table, user_comment
FROM user_queues;

-- examine sequence
SELECT sequence_name, min_value, max_value,
increment_by, cycle_flag, order_flag, cache_size
FROM user_sequences;

set describe depth 1

-- examine tables
desc RX_QUEUE_TABLE

set describe depth all

desc RX_QUEUE_TABLE

-- look at those tables and views that exist based on your consumer type
desc AQ$RX_QUEUE_TABLE
desc AQ$_RX_QUEUE_TABLE_G
desc AQ$_RX_QUEUE_TABLE_H
desc AQ$_RX_QUEUE_TABLE_I
desc AQ$_RX_QUEUE_TABLE_S
desc AQ$_RX_QUEUE_TABLE_T

-- if a multi-consumer queue
SELECT table_name, iot_name
FROM user_tables;

-- examine views
desc AQ$RX_QUEUE_TABLE
desc AQ$_RX_QUEUE_TABLE_F  --
desc AQ$RX_QUEUE_TABLE_S   -- transformations
desc AQ$RX_QUEUE_TABLE_R   -- rules

Create the rx_queue Queue Using the rx_queue_Table
dbms_aqadm.create_queue (
queue_name          IN VARCHAR2,                            -- queue's name
queue_table         IN VARCHAR2,                            -- previously defined queue table
queue_type          IN BINARY_INTEGER DEFAULT NORMAL_QUEUE, -- Normal or Exception
max_retries         IN NUMBER         DEFAULT NULL,         -- default is 2**31-1
retry_delay         IN NUMBER         DEFAULT 0,            -- in seconds
retention_time      IN NUMBER         DEFAULT 0,
dependency_tracking IN BOOLEAN        DEFAULT FALSE,        -- must be FALSE: the default
comment             IN VARCHAR2       DEFAULT NULL,         --definer's comment
auto_commit         IN BOOLEAN        DEFAULT TRUE);        -- deprecated parameter

-- Create the normal (non-exception) queue rx_queue on rx_queue_table.
-- Written as an anonymous block because the SQL*Plus EXECUTE command must
-- fit on one line unless every line ends with the "-" continuation character.
BEGIN
  dbms_aqadm.create_queue(
    queue_name  => 'rx_queue',
    queue_table => 'rx_queue_table',
    queue_type  => dbms_aqadm.NORMAL_QUEUE,
    max_retries => 3,
    retry_delay => 1,   -- seconds between retries
    comment     => 'Prescription Queue');
END;
/

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type
ORDER BY 1;
-- note QUEUE and RULE SET creation plus new views if you have a multiple consumer queue

SELECT name, queue_table, user_comment
FROM user_queues;

SELECT rule_set_name, rule_set_eval_context_owner, rule_set_eval_context_name
FROM user_rule_sets;

set long 10000
set pagesize 0

SELECT view_name, text
FROM user_views;

set pagesize 25

SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;
Start Queue
dbms_aqadm.start_queue (
queue_name IN VARCHAR2,
enqueue    IN BOOLEAN DEFAULT TRUE,
dequeue    IN BOOLEAN DEFAULT TRUE);

exec dbms_aqadm.start_queue(queue_name => 'RX_QUEUE');

SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;
Create Queue Subscribers
dbms_aqadm.grant_queue_privilege (
privilege    IN VARCHAR2,
queue_name   IN VARCHAR2,
grantee      IN VARCHAR2,
grant_option IN BOOLEAN DEFAULT FALSE);

desc queue_privileges

col grantee format a12
col grantor format a12
col owner format a20
col name format a20

SELECT grantee, owner, name, grantor, enqueue_privilege, dequeue_privilege
FROM queue_privileges
WHERE name = 'RX_QUEUE';

-- Hand out enqueue / dequeue rights on RX_QUEUE to the demo users.
-- None of the grants carries the grant option.
BEGIN
  -- consumers
  dbms_aqadm.grant_queue_privilege(
    privilege    => 'DEQUEUE',
    queue_name   => 'RX_QUEUE',
    grantee      => 'gen_pharmacy',
    grant_option => FALSE);
  dbms_aqadm.grant_queue_privilege(
    privilege    => 'DEQUEUE',
    queue_name   => 'RX_QUEUE',
    grantee      => 'icu_pharmacy',
    grant_option => FALSE);

  -- producers
  dbms_aqadm.grant_queue_privilege(
    privilege    => 'ENQUEUE',
    queue_name   => 'RX_QUEUE',
    grantee      => 'gen_pharmacy',
    grant_option => FALSE);
  dbms_aqadm.grant_queue_privilege(
    privilege    => 'ENQUEUE',
    queue_name   => 'RX_QUEUE',
    grantee      => 'icu',
    grant_option => FALSE);
  dbms_aqadm.grant_queue_privilege(
    privilege    => 'ENQUEUE',
    queue_name   => 'RX_QUEUE',
    grantee      => 'nurses_stn',
    grant_option => FALSE);
END;
/

SELECT grantee, owner, name, grantor, enqueue_privilege, dequeue_privilege
FROM queue_privileges
WHERE name = 'RX_QUEUE';

-- create queue subscribers
desc user_queue_subscribers

col queue_name format a12
col address format a10

SELECT queue_name, consumer_name, address, protocol, delivery_mode, queue_to_queue
FROM user_queue_subscribers;

TYPE aq$_agent AS OBJECT (
name    VARCHAR2(30),       -- name of message producer or consumer
address VARCHAR2(1024),     -- Protocol-specific address of the recipient in the form [schema.]queue[@dblink]
protocol NUMBER DEFAULT 0); -- must be 0, other values for internal use only

dbms_aqadm.add_subscriber(
queue_name     IN VARCHAR2,                                   -- name of queue
subscriber     IN sys.aq$_agent,                              -- name, address and, protocol
rule           IN VARCHAR2 DEFAULT NULL,                      -- conditional / similar to WHERE clause
transformation IN VARCHAR2 DEFAULT NULL,                      -- message transformation rule
queue_to_queue IN BOOLEAN DEFAULT FALSE,                      -- TRUE indicates queue-to-queue messaging
delivery_mode  IN PLS_INTEGER DEFAULT dbms_aqadm.persistent); -- BUFFERED, PERSISTENT_OR_BUFFERED, or PERSISTENT

-- Register the four demo users as subscribers of rx_queue.
-- The two pharmacies split the traffic by message priority using the same
-- boundary demo_enqueue uses (urgency < 10 is urgent): ICU pharmacy takes
-- priorities below 10, the general pharmacy takes 10 and above, so no
-- priority value is left without a pharmacy subscriber.
DECLARE
 subsc_t    sys.aq$_agent;
 subsc_addr VARCHAR2(1024) := 'AQADMIN.RX_QUEUE';
BEGIN
  subsc_t := sys.aq$_agent('gen_pharmacy', subsc_addr, 0);
  dbms_aqadm.add_subscriber('rx_queue', subsc_t, 'priority >= 10');

  subsc_t := sys.aq$_agent('icu_pharmacy', subsc_addr, 0);
  dbms_aqadm.add_subscriber('rx_queue', subsc_t, 'priority < 10');

  -- no rule: these subscribers see every message
  subsc_t := sys.aq$_agent('icu', subsc_addr, 0);
  dbms_aqadm.add_subscriber('rx_queue', subsc_t);

  -- agent name matches the NURSES_STN database user
  subsc_t := sys.aq$_agent('nurses_stn', subsc_addr, 0);
  dbms_aqadm.add_subscriber('rx_queue', subsc_t);
END;
/

col consumer_name format a13

SELECT queue_name, queue_table, consumer_name, protocol, delivery_mode, queue_to_queue
FROM user_queue_subscribers;

Create Propagation Schedule
dbms_aqadm.schedule_propagation (
queue_name        IN VARCHAR2,                 -- name of queue
destination       IN VARCHAR2 DEFAULT NULL,    -- destination database link
start_time        IN DATE     DEFAULT SYSDATE, -- initial propagation start time
duration          IN NUMBER   DEFAULT NULL,    -- propagation window in seconds
next_time         IN VARCHAR2 DEFAULT NULL,    -- date-time of next window
latency           IN NUMBER   DEFAULT 60,      -- maximum wait in seconds
destination_queue IN VARCHAR2 DEFAULT NULL);   -- target queue name and db link

desc user_queue_schedules

SELECT COUNT(*) FROM user_queue_schedules;

exec dbms_aqadm.schedule_propagation('RX_QUEUE', latency =>0);

col qname format a10
col destination format a11
col start_date format a12
col start_time format a10
col next_time format a10

SELECT qname, destination, start_date, start_time, propagation_window, next_time, latency
FROM user_queue_schedules;

col process_name format a12
col session_id format a10
col instance format 99
col last_run_date format a20
col last_run_time format a13
col current_start_date format a20

SELECT qname, process_name, session_id, instance, last_run_date, last_run_time, current_start_date
FROM user_queue_schedules;

col next_run_date format a20

SELECT qname, current_start_time, next_run_date, next_run_time, total_time, total_number
FROM user_queue_schedules;

SELECT qname, total_bytes, max_number, max_bytes, avg_number, avg_size, avg_time
FROM user_queue_schedules;
 
CREATE TABLE rx_inventory (
pharm_id VARCHAR2(3),
rx_name VARCHAR2(30),
rx_quantity NUMBER(5));

INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Aspirin', 10);

INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Compazine', 10);

INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Prozac', 10);

INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Lipitor', 10);

INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Oxycontin', 10);

INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'aaa', 10);

INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'bbb', 10);

INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'ccc', 10);

INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Tetracycline', 10);

INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Thorazine', 10);

COMMIT;

SELECT *
FROM rx_inventory;

GRANT SELECT, UPDATE ON rx_inventory TO gen_pharmacy;
GRANT SELECT, UPDATE ON rx_inventory TO icu_pharmacy;
 
-- create procedure to dequeue messages
TYPE DEQUEUE_OPTIONS_T IS RECORD (
consumer_name  VARCHAR2(30) DEFAULT NULL,
dequeue_mode   BINARY_INTEGER DEFAULT REMOVE,
navigation     BINARY_INTEGER DEFAULT NEXT_MESSAGE,
visibility     BINARY_INTEGER DEFAULT ON_COMMIT,
wait           BINARY_INTEGER DEFAULT FOREVER,
msgid          RAW(16)        DEFAULT NULL,
correlation    VARCHAR2(128)  DEFAULT NULL,
deq_condition  VARCHAR2(4000) DEFAULT NULL,
signature      aq$_sig_prop   DEFAULT NULL,
transformation VARCHAR2(60)   DEFAULT NULL,
delivery_mode  PLS_INTEGER    DEFAULT PERSISTENT);

TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority        BINARY_INTEGER DEFAULT 1,
delay           BINARY_INTEGER DEFAULT NO_DELAY,
expiration      BINARY_INTEGER DEFAULT NEVER,
correlation     VARCHAR2(128)  DEFAULT NULL,
attempts        BINARY_INTEGER,
recipient_list  aq$_recipient_list_t,
exception_queue VARCHAR2(51)   DEFAULT NULL,
enqueue_time    DATE,
state           BINARY_INTEGER,
sender_id       aq$_agent      DEFAULT NULL,
original_msgid  RAW(16)        DEFAULT NULL);

TYPE SYS.AQ$_RECIPIENT_LIST_T IS TABLE OF sys.aq$_agent
INDEX BY BINARY_INTEGER;

dbms_aq.dequeue(
queue_name         IN  VARCHAR2,
dequeue_options    IN  dequeue_options_t,
message_properties OUT message_properties_t,
payload            OUT <user_defined_data_type_name>,
msgid              OUT RAW);

CREATE OR REPLACE PROCEDURE demo_dequeue(appuser IN VARCHAR2) AUTHID DEFINER IS
 -- Dequeue one message from AQADMIN.RX_QUEUE on behalf of consumer APPUSER,
 -- decrement the matching rx_inventory row when stock is available, and
 -- record the message in rx_processed_data.
 --
 -- appuser : consumer (subscriber) name to dequeue for; also stored as
 --           rx_processed_data.processed_by.
 --
 -- Runs as an autonomous transaction. A dequeue timeout (ORA-25228) is
 -- treated as "nothing to do"; any other error is rolled back and re-raised.
 dq_msgid  RAW(16);
 dq_opt    dbms_aq.dequeue_options_t;
 msg_prop  dbms_aq.message_properties_t;
 payload_t message_t;
 q_on_hand rx_inventory.rx_quantity%TYPE;

 no_messages EXCEPTION;
 pragma exception_init(no_messages, -25228);

 pragma autonomous_transaction;
BEGIN
  dq_opt.consumer_name := appuser;
  dq_opt.dequeue_mode := dbms_aq.remove;
  dq_opt.navigation := dbms_aq.first_message;
  dq_opt.visibility := dbms_aq.immediate;
  dq_opt.wait := 10;  -- seconds to wait for a message before ORA-25228

  dbms_aq.dequeue('AQADMIN.RX_QUEUE', dq_opt, msg_prop, payload_t, dq_msgid);

  -- Raises NO_DATA_FOUND if the prescribed drug is not in the inventory.
  SELECT rx_quantity
  INTO q_on_hand
  FROM rx_inventory
  WHERE rx_name = payload_t.rx;

  IF q_on_hand > 0 THEN
    -- Re-check the quantity in the UPDATE itself so that two sessions that
    -- both read "1 on hand" cannot drive the stock below zero.
    UPDATE rx_inventory
    SET rx_quantity = rx_quantity - 1
    WHERE rx_name = payload_t.rx
    AND rx_quantity > 0;
  ELSE
    NULL; -- you would never do this in the real world ... it is just a placeholder
    -- send back a failure message using the queue as homework for the nurses to dequeue
  END IF;

  INSERT INTO rx_processed_data
  (rx_id, rx_name, source, mprop_priority, mprop_delay,
   mprop_attempts, msg_enqueue_time, mprop_state, processed_by, processed_ts)
  VALUES
  (payload_t.id, payload_t.rx, payload_t.source, msg_prop.priority, msg_prop.delay,
   msg_prop.attempts, msg_prop.enqueue_time, msg_prop.state, APPUSER, SYSTIMESTAMP);
  COMMIT;
EXCEPTION
  WHEN no_messages THEN
    COMMIT;
  WHEN others THEN
    -- An autonomous transaction must be ended before control leaves the
    -- routine; otherwise pending DML turns the real error into ORA-06519.
    -- NOTE(review): the dequeue used IMMEDIATE visibility, so presumably the
    -- message removal itself is not undone by this rollback - confirm.
    ROLLBACK;
    RAISE;
END demo_dequeue;
/

GRANT execute ON demo_dequeue TO gen_pharmacy;
GRANT execute ON demo_dequeue TO icu_pharmacy;

conn gen_pharmacy/gen_pharm

CREATE SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;

conn icu_pharmacy/icu_pharm

CREATE SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;

conn aqadmin/aqadmin

Create Procedure to Enqueue Messages
TYPE ENQUEUE_OPTIONS_T IS RECORD (
visibility         BINARY_INTEGER DEFAULT ON_COMMIT,
relative_msgid     RAW(16) DEFAULT NULL,
sequence_deviation BINARY_INTEGER DEFAULT NULL,
transformation     VARCHAR2(60) DEFAULT NULL);

TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority        BINARY_INTEGER DEFAULT 1,
delay           BINARY_INTEGER DEFAULT NO_DELAY,
expiration      BINARY_INTEGER DEFAULT NEVER,
correlation     VARCHAR2(128) DEFAULT NULL,
attempts        BINARY_INTEGER,
recipient_list  aq$_recipient_list_t,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time    DATE,
state           BINARY_INTEGER,
sender_id       aq$_agent DEFAULT NULL,
original_msgid  RAW(16) DEFAULT NULL);

dbms_aq.enqueue(
queue_name         IN  VARCHAR2,
enqueue_options    IN  enqueue_options_t,
message_properties IN  message_properties_t,
payload            IN  <user_defined_data_type>,
msgid              OUT RAW);

CREATE OR REPLACE PROCEDURE demo_enqueue(usermsg IN MESSAGE_T, urgency IN NUMBER)
AUTHID DEFINER IS
  -- Enqueue USERMSG on aqadmin.rx_queue with priority URGENCY, stamping the
  -- calling user as sender. Runs as an autonomous transaction and commits.
  --
  -- usermsg : message payload to enqueue
  -- urgency : message priority; values below 10 are sent immediately with a
  --           5 minute expiration, all others get a 1 second delay and a
  --           30 minute expiration
  pragma autonomous_transaction;

  enq_options dbms_aq.enqueue_options_t;
  msg_props   dbms_aq.message_properties_t;
  new_msgid   RAW(16);
BEGIN
  -- settings common to both urgency classes
  msg_props.sender_id := sys.aq$_agent(USER, NULL, 0);
  msg_props.priority  := urgency;

  IF urgency < 10 THEN
    enq_options.visibility := dbms_aq.immediate;
    msg_props.delay        := dbms_aq.no_delay;
    msg_props.expiration   := 300;    -- seconds until moved to the exception queue
  ELSE
    enq_options.sequence_deviation := NULL;
    msg_props.delay      := 1;        -- seconds before the message becomes available
    msg_props.expiration := 1800;     -- seconds until moved to the exception queue
  END IF;

  dbms_aq.enqueue('aqadmin.rx_queue', enq_options, msg_props, usermsg, new_msgid);
  COMMIT;
END demo_enqueue;
/

 
Enqueues Message Procedure
conn aqadmin/aqadmin

CREATE OR REPLACE PROCEDURE order_rx AUTHID DEFINER IS
-- Simulate a ward ordering medications: enqueue a series of randomly chosen
-- prescriptions through demo_enqueue, pausing between orders.
--   ICU        : 50 orders, 3 seconds apart
--   NURSES_STN : 30 orders, 1 second apart
-- Any other session user is rejected with ORA-20001.

TYPE ixbb IS TABLE OF VARCHAR2(30)
INDEX BY BINARY_INTEGER;

TYPE ixbi IS TABLE OF NUMBER(2)
INDEX BY BINARY_INTEGER;

 rxarray    ixbb; -- rx name
 rxurgnt    ixbi; -- rx delivery priority
 usermsg    aqadmin.message_t;
 x          PLS_INTEGER;
 endcnt     PLS_INTEGER; -- number of orders to place
 sleep_secs PLS_INTEGER; -- pause between orders
BEGIN
  rxarray(0):='ccc';
  rxarray(1):='Aspirin';
  rxarray(2):='Prozac';
  rxarray(3):='Lipitor';
  rxarray(4):='Tetracycline';
  rxarray(5):='Thorazine';
  rxarray(6):='Oxycontin';
  rxarray(7):='Compazine';
  rxarray(8):='aaa';
  rxarray(9):='bbb';

  rxurgnt(0) := 1;
  rxurgnt(1) := 3;
  rxurgnt(2) := 5;
  rxurgnt(3) := 11;
  rxurgnt(4) := 19;
  rxurgnt(5) := 20;
  rxurgnt(6) := 32;
  rxurgnt(7) := 89;
  rxurgnt(8) := 42;
  rxurgnt(9) := 66;

  IF USER = 'ICU' THEN
    endcnt := 50;
    sleep_secs := 3;
  ELSIF USER = 'NURSES_STN' THEN
    endcnt := 30;
    sleep_secs := 1;
  ELSE
    -- Without this branch the loop bound would be NULL and the FOR loop
    -- would fail with an unhelpful ORA-06502.
    raise_application_error(-20001,
      'order_rx can only be run by ICU or NURSES_STN, not ' || USER);
  END IF;

  FOR i IN 1..endcnt
  LOOP
    -- random index 0..9 into the arrays above; MOD keeps the sign of its
    -- first argument, so ABS is applied afterwards
    x := ABS(MOD(dbms_crypto.randominteger, 10));
    usermsg := aqadmin.message_t(i, USER, rxarray(x));
    demo_enqueue(usermsg, rxurgnt(x));

    dbms_lock.sleep(sleep_secs);
  END LOOP;
END order_rx;
/

CREATE PUBLIC SYNONYM order_rx FOR aqadmin.order_rx;

GRANT execute ON order_rx TO icu;
GRANT execute ON order_rx TO nurses_stn;
 
Dequeues Message Procedure
conn aqadmin/aqadmin

TYPE dbms_aq.aq$_agent_list_t IS TABLE OF aq$_agent
INDEX BY BINARY_INTEGER;

dbms_aq.listen(
agent_list IN  dbms_aq.aq$_agent_list_t,
wait       IN  BINARY_INTEGER DEFAULT dbms_aq.forever,
agent      OUT sys.aq$_agent);

CREATE OR REPLACE PROCEDURE get_rx_order AUTHID DEFINER IS
 -- Keep filling orders for the calling user: listen on AQADMIN.RX_QUEUE and
 -- call demo_dequeue each time a message is reported for this user. The
 -- procedure returns once a listen call times out (ORA-25254) after 15 seconds.
 listen_timeout EXCEPTION;
 pragma exception_init(listen_timeout, -25254);

 watched_agents dbms_aq.aq$_agent_list_t;
 ready_agent    sys.aq$_agent;
BEGIN
  watched_agents(0) := sys.aq$_agent(USER, 'AQADMIN.RX_QUEUE', NULL);
  /* to watch for several consumers at once, add further entries, e.g.
  watched_agents(0) := sys.aq$_agent('GenPharm', 'AQADMIN.RX_QUEUE', NULL);
  watched_agents(1) := sys.aq$_agent('ICUPharm', 'AQADMIN.RX_QUEUE', NULL);
  */

  BEGIN
    LOOP
      dbms_aq.listen(agent_list => watched_agents, wait => 15, agent => ready_agent);
      IF ready_agent.name = USER THEN
        demo_dequeue(USER);
      END IF;
    END LOOP;
  EXCEPTION
    WHEN listen_timeout THEN
      NULL;  -- a timeout ends the polling loop
  END;
END get_rx_order;
/

CREATE PUBLIC SYNONYM get_rx_order FOR aqadmin.get_rx_order;

GRANT execute ON get_rx_order TO gen_pharmacy;
GRANT execute ON get_rx_order TO icu_pharmacy;
 
To Run Demo
The following requires that you simultaneously open several SQL*Plus sessions: one for each user shown in the table below, plus one for AQADMIN. In the first session, log on as the ICU and type the step 2 command but do not execute it. In the second session, do the same thing as the Nurses Station, and in the remaining user sessions do the same as each Pharmacy, again without executing the stored procedure. Then log on as AQADMIN, set up the SQL*Plus environment, execute one of the monitoring queries below, and then enter a slash (to repeat the query) but do not press the <Enter> key.
Step ICU Orders Meds Nurses Station Orders Meds Fill Orders Fill Orders
1 conn icu/icu conn nurses_stn/nurses_stn conn gen_pharmacy/gen_pharm conn icu_pharmacy/icu_pharm
2 exec order_rx exec order_rx exec get_rx_order exec get_rx_order
conn aqadmin/aqadmin

-- current stock levels; the terminating semicolon is required, otherwise
-- SQL*Plus discards the statement at the following blank line
SELECT * FROM rx_inventory;

SELECT COUNT(*) FROM rx_queue_table;

col source format a10
col processed_by format a12
col processed_ts format a30

SELECT rx_id, rx_name, source, mprop_priority, mprop_delay, mprop_attempts
FROM rx_processed_data
ORDER BY rx_id;

SELECT rx_id, rx_name, source, mprop_priority, mprop_delay, mprop_attempts
FROM rx_processed_data
ORDER BY processed_ts;

SELECT rx_id, msg_enqueue_time, mprop_state, processed_by, processed_ts
FROM rx_processed_data
ORDER BY processed_ts;

SELECT rx_id, rx_name, source, processed_by
FROM rx_processed_data
ORDER BY processed_ts;

Then, with everything set up ... go to the first window and hit the <Enter> key, then the second, then the third and finally the same in the AQADMIN session. Continue to monitor the AQADMIN session while the ICU and Nurses order medications and the pharmacies fill them.
 
Cleanup As AQADMIN
conn aqadmin/aqadmin

-- there is a simpler form of this below
CREATE OR REPLACE PROCEDURE purgeQtable(qtable IN VARCHAR2) AUTHID CURRENT_USER AS
 -- Purge every message from queue table QTABLE in the current schema, shrink
 -- the table, restart the queues that live in it, and recompile invalid
 -- schema objects.
 --
 -- qtable : unqualified queue table name. It is validated as a simple SQL
 --          name before being used in dynamic DDL.
 --
 -- On failure the step that failed is written to dbms_output and the
 -- original exception is re-raised to the caller.
 po_t      dbms_aqadm.aq$_purge_options_t;
 qname     VARCHAR2(30);
 safe_name VARCHAR2(130);
 step      VARCHAR2(30) := 'validating table name';

 CURSOR qcur IS
 SELECT name
 FROM user_queues
 WHERE queue_table = UPPER(qtable);
BEGIN
  -- Reject anything that is not a plain identifier so the value cannot
  -- smuggle extra SQL into the ALTER TABLE statements below.
  safe_name := dbms_assert.simple_sql_name(qtable);

  step := 'purging queue table';
  po_t.block := FALSE;
  dbms_aqadm.purge_queue_table(USER || '.' || safe_name, NULL, po_t);

  step := 'shrinking queue table';
  execute immediate 'ALTER TABLE ' || safe_name || ' ENABLE ROW MOVEMENT';
  execute immediate 'ALTER TABLE ' || safe_name || ' SHRINK SPACE CASCADE';
  execute immediate 'ALTER TABLE ' || safe_name || ' DISABLE ROW MOVEMENT';

  step := 'starting queues';
  FOR qrec IN qcur LOOP
    qname := qrec.name;
    IF INSTR(qname, '$') > 0 THEN
      -- queue names containing '$' are started for dequeue only
      dbms_aqadm.start_queue(qname, enqueue=>FALSE);
    ELSE
      dbms_aqadm.start_queue(qname);
    END IF;
  END LOOP;

  step := 'recompiling schema';
  dbms_utility.compile_schema(USER,compile_all=>FALSE);
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('PurgeQTable: Error while ' || step ||
                         ' (queue: ' || NVL(qname, 'n/a') || '): ' || SQLERRM);
    RAISE;
END purgeQtable;
/

set serveroutput on
exec purgeQtable('RX_QUEUE_TABLE');

dbms_aqadm.stop_queue (
queue_name IN VARCHAR2,
enqueue    IN BOOLEAN DEFAULT TRUE,
dequeue    IN BOOLEAN DEFAULT TRUE,
wait       IN BOOLEAN DEFAULT TRUE);

dbms_aqadm.purge_queue_table(
queue_table     IN VARCHAR2,
purge_condition IN VARCHAR2,
purge_options   IN aq$_purge_options_t);

dbms_aqadm.drop_queue (
queue_name  IN VARCHAR2,
auto_commit IN BOOLEAN DEFAULT TRUE);

dbms_aqadm.drop_queue_table (
queue_table IN VARCHAR2,
force       IN BOOLEAN DEFAULT FALSE,
auto_commit IN BOOLEAN DEFAULT TRUE);

SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;

-- stop Queue rx_queue
exec dbms_aqadm.stop_queue(queue_name => 'rx_queue', wait => TRUE);

SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;

SELECT COUNT(*)
FROM rx_queue_table;

-- purge the queue table rx_queue_table
-- Remove every message from RX_QUEUE_TABLE (no purge condition).
DECLARE
 purge_opts dbms_aqadm.aq$_purge_options_t;
BEGIN
  purge_opts.block := FALSE;
  dbms_aqadm.purge_queue_table(
    queue_table     => 'RX_QUEUE_TABLE',
    purge_condition => NULL,
    purge_options   => purge_opts);
END;
/

SELECT COUNT(*)
FROM rx_queue_table;

SELECT name, queue_table
FROM user_queues;

-- drop Queue rx_queue
exec dbms_aqadm.drop_queue('RX_QUEUE');

SELECT name, queue_table
FROM user_queues;

-- drop Queue Table
exec dbms_aqadm.drop_queue_table('rx_queue_table', TRUE);

SELECT name, queue_table
FROM user_queues;

-- verify queue infrastructure is dropped
SELECT object_name, object_type
FROM user_objects
ORDER BY 2,1;

conn / as sysdba

drop user gen_pharmacy cascade;
drop user icu_pharmacy cascade;
drop user icu cascade;
drop user nurses_stn cascade;
drop user aqadmin cascade;
 
Morgan's Library Page Footer
This site is maintained by Dan Morgan. Last Updated: © 2012 Daniel A. Morgan All Rights Reserved