Oracle DBMS_AQ
Version 11.2.0.3
 
Constants
Parameter Values
delay NO_DELAY
dequeue mode BROWSE, LOCKED, REMOVE, REMOVE_NODATA
expiration NEVER
namespace NAMESPACE_AQ, NAMESPACE_ANONYMOUS
navigation FIRST_MESSAGE, NEXT_MESSAGE, NEXT_TRANSACTION
ntfn_grouping_class NTFN_GROUPING_CLASS_TIME
ntfn_grouping_repeat_count NTFN_GROUPING_FOREVER
ntfn_grouping_type NTFN_GROUPING_TYPE_LAST, NTFN_GROUPING_TYPE_SUMMARY
sequence_deviation BEFORE, TOP
state READY = 0, WAIT = 1, PROCESSED = 2, EXPIRED = 3, DEFERRED = 8, BUFFERED_EXPIRED = 10
UNDELIVERABLE = NULL, any other value = PROCESSED
visibility IMMEDIATE, ON_COMMIT
wait FOREVER, NO_WAIT
Data Types
aq$_sig_prop
-- Object type holding signature properties; it is the declared type of the
-- "signature" field of dequeue_options_t.
-- NOTE(review): attribute names suggest canonicalization / digest / signature
-- algorithm identifiers plus a certificate and digest value -- confirm against
-- the Oracle documentation before relying on this reading.
CREATE OR REPLACE TYPE sys.aq$_sig_prop AS OBJECT (
signature   RAW(2000),
canalgo     VARCHAR2(2000),
digalgo     VARCHAR2(2000),
sigalgo     VARCHAR2(2000),
certificate VARCHAR2(2000),
digval      RAW(2000));
/
dequeue_options_t
-- Record of options for a dequeue request; passed as the dequeue_options
-- argument of dbms_aq.dequeue_array.
-- The defaults REMOVE, NEXT_MESSAGE, ON_COMMIT and FOREVER are values from the
-- Constants table (dequeue mode, navigation, visibility, wait).
-- NOTE(review): PERSISTENT is not listed in the Constants table on this page;
-- it is presumably the dbms_aq.persistent constant used by LISTEN -- confirm.
TYPE dequeue_options_t IS RECORD (
consumer_name  VARCHAR2(30)   DEFAULT NULL,
dequeue_mode   BINARY_INTEGER DEFAULT REMOVE,
navigation     BINARY_INTEGER DEFAULT NEXT_MESSAGE,
visibility     BINARY_INTEGER DEFAULT ON_COMMIT,
wait           BINARY_INTEGER DEFAULT FOREVER,
msgid          RAW(16)        DEFAULT NULL,
correlation    VARCHAR2(128)  DEFAULT NULL,
deq_condition  VARCHAR2(4000) DEFAULT NULL,
signature      aq$_sig_prop   DEFAULT NULL,
transformation VARCHAR2(60)   DEFAULT NULL,
delivery_mode  PLS_INTEGER    DEFAULT PERSISTENT);
/
enqueue_options_t
-- Record of options for an enqueue request; passed as the enqueue_options
-- argument of dbms_aq.enqueue and dbms_aq.enqueue_array.
-- visibility defaults to ON_COMMIT (see the visibility row of the Constants
-- table); sequence_deviation takes BEFORE or TOP per the same table.
TYPE enqueue_options_t IS RECORD (
visibility         BINARY_INTEGER DEFAULT ON_COMMIT,
relative_msgid     RAW(16)        DEFAULT NULL,
sequence_deviation BINARY_INTEGER DEFAULT NULL,
transformation     VARCHAR2(60)   DEFAULT NULL);
/
message_properties_t
-- Record of per-message properties; passed as the message_properties argument
-- of dbms_aq.enqueue, and in array form (message_properties_array_t) to
-- enqueue_array / dequeue_array.
-- delay and expiration default to the NO_DELAY and NEVER constants listed in
-- the Constants table. attempts, recipient_list, enqueue_time and state carry
-- no DEFAULT clause in this listing.
-- NOTE(review): attempts, enqueue_time and state look like values filled in by
-- the queue rather than by the caller -- confirm against Oracle documentation.
TYPE message_properties_t IS RECORD (
priority        BINARY_INTEGER DEFAULT 1,
delay           BINARY_INTEGER DEFAULT NO_DELAY,
expiration      BINARY_INTEGER DEFAULT NEVER,
correlation     VARCHAR2(128)  DEFAULT NULL,
attempts        BINARY_INTEGER,
recipient_list  aq$_recipient_list_t,
exception_queue VARCHAR2(51)   DEFAULT NULL,
enqueue_time    DATE,
state           BINARY_INTEGER,
sender_id       aq$_agent      DEFAULT NULL,
original_msgid  RAW(16)        DEFAULT NULL);
/
aq$_recipient_list_t
-- Associative array (INDEX BY BINARY_INTEGER) of sys.aq$_agent entries; this
-- is the declared type of the recipient_list field of message_properties_t.
TYPE sys.aq$_recipient_list_t IS TABLE OF sys.aq$_agent
INDEX BY BINARY_INTEGER;
/
Initialization Parameters aq_tm_processes=1 (... max is 10)
job_queue_processes=2
 
DEQUEUE_ARRAY
Dequeues an array of messages from the specified queue

Overload 1
dbms_aq.dequeue_array(
queue_name               IN  VARCHAR2,
dequeue_options          IN  dbms_aq.dequeue_options_t,
array_size               IN  BINARY_INTEGER,
message_properties_array OUT dbms_aq.message_properties_array_t,
payload_array            OUT standard.<COLLECTION_1>,
msgid_array              OUT dbms_aq.msgid_array_t)
RETURN PLS_INTEGER;
See ENQUEUE_ARRAY demo
Overload 2 dbms_aq.dequeue_array(
queue_name               IN  VARCHAR2,
dequeue_options          IN  dbms_aq.dequeue_options_t,
array_size               IN  BINARY_INTEGER,
message_properties_array OUT dbms_aq.message_properties_array_t,
payload_array            OUT standard.<COLLECTION_1>,
msgid_array              OUT dbms_aq.msgid_array_t,
error_array              OUT dbms_aq.error_array_t)
RETURN PLS_INTEGER;
TBD
 
ENQUEUE
Adds a message to the specified queue dbms_aq.enqueue(
queue_name         IN  VARCHAR2,
enqueue_options    IN  dbms_aq.enqueue_options_t,
message_properties IN  dbms_aq.message_properties_t,
payload            IN  standard.<OPAQUE_1>,
msgid              OUT RAW);
TBD
Overload 2 dbms_aq.enqueue(
queue_name         IN  VARCHAR2,
enqueue_options    IN  dbms_aq.enqueue_options_t,
message_properties IN  dbms_aq.message_properties_t,
payload            IN  standard.<ADT_1>,
msgid              OUT RAW);
See AQ Demo Page Linked At Page Bottom
Overload 3 dbms_aq.enqueue(
queue_name         IN  VARCHAR2,
enqueue_options    IN  dbms_aq.enqueue_options_t,
message_properties IN  dbms_aq.message_properties_t,
payload            IN  RAW,
msgid              OUT RAW);
See AQ Demo 1
 
ENQUEUE_ARRAY
Adds an array of messages to the specified queue

Overload 1
dbms_aq.enqueue_array(
queue_name               IN  VARCHAR2,
enqueue_options          IN  dbms_aq.enqueue_options_t,
array_size               IN  PLS_INTEGER,
message_properties_array IN  dbms_aq.message_properties_array_t,
payload_array            IN  standard.<COLLECTION_1>,
msgid_array              OUT dbms_aq.msgid_array_t)
RETURN PLS_INTEGER;
set serveroutput on

-- Payload object type for the demo queue: one short text attribute.
CREATE OR REPLACE TYPE message AS OBJECT (
  data VARCHAR2(10)
);
/

-- Nested table of MESSAGE objects; the enqueue demo passes a value of this
-- type as the payload array of dbms_aq.enqueue_array.
CREATE OR REPLACE TYPE message_tbl
  AS TABLE OF message;
/

-- Demo setup: creates a multi-consumer queue table for MESSAGE payloads,
-- creates and starts queue uw_q on it (enqueue and dequeue both enabled),
-- then adds the agent 'sub1' as a subscriber.
BEGIN
  dbms_aqadm.create_queue_table(
    queue_table        => 'uw_q_tab',
    multiple_consumers => TRUE,
    queue_payload_type => 'message',
    compatible         => '9.2.0.0.0');

  dbms_aqadm.create_queue(
    queue_name  => 'uw_q',
    queue_table => 'uw_q_tab');

  dbms_aqadm.start_queue(
    queue_name => 'uw_q',
    enqueue    => TRUE,
    dequeue    => TRUE);

  -- The agent is built inline; only its name is set, the remaining two
  -- constructor arguments are NULL.
  dbms_aqadm.add_subscriber(
    queue_name => 'uw_q',
    subscriber => sys.aq$_agent('sub1', NULL, NULL));
END;
/

-- Enqueues a batch of MESSAGE payloads into UWCLASS.UW_Q with one
-- dbms_aq.enqueue_array call, commits, and prints each enqueued payload.
--
-- The batch size comes from payloadarr.COUNT only: the message-properties
-- array and the array_size argument are both derived from it, so adding or
-- removing a payload cannot leave the three out of step.
DECLARE
 enqopt     dbms_aq.enqueue_options_t;      -- left at its record defaults
 msgproparr dbms_aq.message_properties_array_t :=
              dbms_aq.message_properties_array_t();
 msgprop    dbms_aq.message_properties_t;   -- left at its record defaults
 payloadarr message_tbl;
 msgidarr   dbms_aq.msgid_array_t;
 retval     PLS_INTEGER;
BEGIN
  payloadarr := message_tbl(
  message('00000'),
  message('11111'),
  message('22222'),
  message('33333'),
  message('44444'),
  message('55555'),
  message('66666'),
  message('77777'),
  message('88888'),
  message('99999'));

  -- One (default) properties record per payload element.
  msgproparr.EXTEND(payloadarr.COUNT);
  FOR i IN 1..payloadarr.COUNT
  LOOP
    msgproparr(i) := msgprop;
  END LOOP;

  retval := dbms_aq.enqueue_array(
    queue_name               => 'UWCLASS.UW_Q',
    enqueue_options          => enqopt,
    array_size               => payloadarr.COUNT,
    message_properties_array => msgproparr,
    payload_array            => payloadarr,
    msgid_array              => msgidarr);

  -- enqopt.visibility defaults to ON_COMMIT, so the commit is what makes the
  -- messages visible to consumers.
  commit;

  dbms_output.put_line('Enqueued ' || retval || ' messages');

  -- retval is the function's return value; only that many entries are listed.
  FOR i IN 1..retval
  LOOP
    dbms_output.put_line('Message ' || i || ' payload: ' || payloadarr(i).data);
  END LOOP;
END;
/

-- VARRAY (capacity 2000) of MESSAGE objects; the dequeue demo uses it as the
-- payload array handed to dbms_aq.dequeue_array.
CREATE OR REPLACE TYPE message_arr
  AS VARRAY(2000) OF message;
/

-- Dequeues a batch of messages from UWCLASS.UW_Q as consumer SUB1 using one
-- dbms_aq.dequeue_array call, commits, and prints each dequeued payload.
DECLARE
 l_options  dbms_aq.dequeue_options_t;
 l_props    dbms_aq.message_properties_array_t
              := dbms_aq.message_properties_array_t();
 l_payloads message_arr := message_arr();
 l_msgids   dbms_aq.msgid_array_t;
 l_dequeued PLS_INTEGER;
BEGIN
  -- All other dequeue options stay at their record defaults.
  l_options.consumer_name := 'SUB1';

  -- Pre-size both arrays; the payload array's size is passed as array_size.
  l_payloads.extend(10);
  l_props.extend(10);

  l_dequeued := dbms_aq.dequeue_array(
    queue_name               => 'UWCLASS.UW_Q',
    dequeue_options          => l_options,
    array_size               => l_payloads.count,
    message_properties_array => l_props,
    payload_array            => l_payloads,
    msgid_array              => l_msgids);

  commit;

  dbms_output.put_line('Dequeued ' || l_dequeued || ' messages');

  -- Only the first l_dequeued entries are listed.
  FOR idx IN 1..l_dequeued
  LOOP
    dbms_output.put_line ('Message ' || idx || ' payload: ' || l_payloads(idx).data);
  END LOOP;
END;
/

-- Demo cleanup: removes subscriber 'sub1', stops queue uw_q, drops the queue
-- table (force => TRUE), then drops the three demo types.
--
-- Drop order matters: message_tbl and message_arr are both collections OF
-- message, so they must be dropped before message itself. Without dropping
-- message_arr first, DROP TYPE message fails because the type still has a
-- dependent, and message_arr is left behind.
DECLARE
 app1 sys.aq$_agent;
BEGIN
  app1 := sys.aq$_agent('sub1', NULL, NULL);
  dbms_aqadm.remove_subscriber('uw_q',app1);

  dbms_aqadm.stop_queue( queue_name => 'uw_q');

  -- Second argument is the force flag.
  dbms_aqadm.drop_queue_table('uw_q_tab', TRUE);

  -- DDL is not allowed directly in PL/SQL, hence EXECUTE IMMEDIATE.
  EXECUTE IMMEDIATE 'DROP TYPE message_tbl';
  EXECUTE IMMEDIATE 'DROP TYPE message_arr';
  EXECUTE IMMEDIATE 'DROP TYPE message';
END;
/
Overload 2 dbms_aq.enqueue_array(
queue_name               IN  VARCHAR2,
enqueue_options          IN  dbms_aq.enqueue_options_t,
array_size               IN  PLS_INTEGER,
message_properties_array IN  dbms_aq.message_properties_array_t,
payload_array            IN  standard.<COLLECTION_1>,
msgid_array              OUT dbms_aq.msgid_array_t,
error_array              OUT dbms_aq.error_array_t)
RETURN PLS_INTEGER;
TBD
 
LISTEN
Listens on one or more queues on behalf of a list of agents

Overload 1
dbms_aq.listen(
agent_list IN  dbms_aq.aq$_agent_list_t,
wait       IN  BINARY_INTEGER DEFAULT dbms_aq.forever,
agent      OUT sys.aq$_agent);
See AQ Demo 1
Overload 2 dbms_aq.listen(
agent_list            IN  dbms_aq.aq$_agent_list_t,
wait                  IN  BINARY_INTEGER DEFAULT dbms_aq.forever,
listen_delivery_mode  IN  PLS_INTEGER DEFAULT dbms_aq.persistent,
agent                 OUT sys.aq$_agent,
message_delivery_mode OUT PLS_INTEGER);
See AQ Demo 1
 
POST
Posts to a list of anonymous subscriptions, allowing all clients registered for those subscriptions to receive notifications dbms_aq.post(post_list IN sys.aq$_post_info_list, post_count IN NUMBER);
TBD
 
REGISTER
Registers an email address, user-defined PL/SQL procedure, or HTTP URL for message notification dbms_aq.register(reg_list IN sys.aq$_reg_info_list, reg_count IN NUMBER);
-- Registers six PL/SQL notification callbacks with one dbms_aq.register call
-- and commits.
--
-- Every entry uses the same namespace value (1) and the same one-byte context
-- (HEXTORAW('FF')). Callback URLs ending in ?PR=1 are the "xml pres" variants
-- and the others are the "default pres" variants, in the terms of the original
-- comments.
-- NOTE(review): 1 is presumably dbms_aq.namespace_aq, and "p"/"np" in the
-- labels below presumably mean persistent / non-persistent queue -- confirm.
DECLARE
 c_namespace CONSTANT NUMBER := 1;
 c_context   CONSTANT RAW(1) := HEXTORAW('FF');
 l_reg_list  sys.aq$_reg_info_list;
BEGIN
  l_reg_list := sys.aq$_reg_info_list(
    -- p raw q, default pres
    sys.aq$_reg_info('PUBSUB1.EVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc1', c_context),
    -- p raw q, xml pres
    sys.aq$_reg_info('PUBSUB1.EVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc1?PR=1', c_context),
    -- p ADT q, default pres
    sys.aq$_reg_info('PUBSUB1.ADTEVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc2', c_context),
    -- p ADT q, xml pres
    sys.aq$_reg_info('PUBSUB1.ADTEVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc2?PR=1', c_context),
    -- for np q, raw and ADT can be enqueued into the same queue
    -- np raw and ADT q, default pres
    sys.aq$_reg_info('PUBSUB1.NONPEREVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc1', c_context),
    -- np raw and ADT q, xml pres
    sys.aq$_reg_info('PUBSUB1.NONPEREVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc2?PR=1', c_context));

  -- reg_count is the number of entries in the list (6).
  dbms_aq.register(l_reg_list, l_reg_list.COUNT);

  commit;
END;
/
 
UNBIND_AGENT
Removes the entry for an AQ agent from the LDAP server dbms_aq.unbind_agent(agent IN sys.aq$_agent);
TBD
 
UNREGISTER
Unregisters a subscription, which turns off notifications dbms_aq.unregister(reg_list IN sys.aq$_reg_info_list, reg_count IN NUMBER);
-- Unregisters six PL/SQL notification callbacks with one dbms_aq.unregister
-- call and commits. The list repeats the same subscription name, namespace,
-- callback and context values that the REGISTER demo on this page passes to
-- dbms_aq.register.
--
-- Callback URLs ending in ?PR=1 are the "xml pres" variants and the others are
-- the "default pres" variants, in the terms of the original comments.
-- NOTE(review): 1 is presumably dbms_aq.namespace_aq, and "p"/"np" in the
-- labels below presumably mean persistent / non-persistent queue -- confirm.
DECLARE
 c_namespace CONSTANT NUMBER := 1;
 c_context   CONSTANT RAW(1) := HEXTORAW('FF');
 l_reg_list  sys.aq$_reg_info_list;
BEGIN
  l_reg_list := sys.aq$_reg_info_list(
    -- p raw q, default pres
    sys.aq$_reg_info('PUBSUB1.EVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc1', c_context),
    -- p raw q, xml pres
    sys.aq$_reg_info('PUBSUB1.EVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc1?PR=1', c_context),
    -- p ADT q, default pres
    sys.aq$_reg_info('PUBSUB1.ADTEVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc2', c_context),
    -- p ADT q, xml pres
    sys.aq$_reg_info('PUBSUB1.ADTEVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc2?PR=1', c_context),
    -- for np q, raw and ADT can be enqueued into the same queue
    -- np raw and ADT q, default pres
    sys.aq$_reg_info('PUBSUB1.NONPEREVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc1', c_context),
    -- np raw and ADT q, xml pres
    sys.aq$_reg_info('PUBSUB1.NONPEREVENTS:ADMIN', c_namespace,
                     'plsql://plsqlregproc2?PR=1', c_context));

  -- reg_count is the number of entries in the list (6).
  dbms_aq.unregister(l_reg_list, l_reg_list.COUNT);

  COMMIT;
END;
/
 
 
 
Morgan's Library Page Footer
This site is maintained by Dan Morgan. Last Updated: This site is protected by copyright and trademark laws under U.S. and International law. © 1998-2013 Daniel A. Morgan All Rights Reserved