Oracle AQ Demo 2
Version 11.1

 
 
Setup As SYS
conn / as sysdba

-- Create the demo schema owner. It gets no space in SYSTEM and a
-- 10M quota in its default tablespace (uwdata), which is where the
-- queue table created later in this demo will be stored.
CREATE USER aquser
IDENTIFIED BY aquser
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 10M ON uwdata;

-- System privileges needed by the rest of the demo. CREATE SEQUENCE is
-- required for the message_seq sequence that AQUSER creates during setup
-- (the original script granted CREATE SESSION twice and omitted it, so
-- the CREATE SEQUENCE statement failed with insufficient privileges).
GRANT create session TO aquser;
GRANT create procedure TO aquser;
GRANT create sequence TO aquser;
GRANT create table TO aquser;
GRANT create type TO aquser;
GRANT aq_administrator_role TO aquser;

-- Direct EXECUTE grants on the AQ packages used by this demo:
-- dbms_aqadm for queue administration, dbms_aq for enqueue and dequeue.
GRANT EXECUTE ON dbms_aq TO aquser;
GRANT EXECUTE ON dbms_aqadm TO aquser;
 
Setup As AQUSER
CONN aquser/aquser

-- Object inventory before anything is created in the schema.
SELECT object_type, COUNT(*)
  FROM user_objects
 GROUP BY object_type;

-- Source of message ids for the enqueue step of this demo.
CREATE SEQUENCE message_seq START WITH 1000 INCREMENT BY 1;

-- Object inventory after creating the sequence.
SELECT object_type, COUNT(*)
  FROM user_objects
 GROUP BY object_type;

-- Payload type carried by the queue. dollar_value is NUMBER(4,2),
-- so it holds values up to 99.99.
CREATE TYPE message_type AS OBJECT (
  message_id   NUMBER(15),
  subject      VARCHAR2(100),
  text         VARCHAR2(100),
  dollar_value NUMBER(4,2));
/

-- Object inventory after creating the payload type.
SELECT object_type, COUNT(*)
  FROM user_objects
 GROUP BY object_type;

DECLARE
  -- Names shared by the administrative calls below.
  c_queue_table  CONSTANT VARCHAR2(61) := 'aquser.msg_qt';
  c_queue_name   CONSTANT VARCHAR2(30) := 'msg_queue';
  c_payload_type CONSTANT VARCHAR2(61) := 'aquser.message_type';
BEGIN
  /*
  ** CREATE QUEUE TABLE
  ** ------------------
  **   QUEUE_TABLE        : name of the queue table to create.
  **   QUEUE_PAYLOAD_TYPE : name of an object type, or RAW.
  **
  ** Besides the table itself, create_queue_table also creates
  ** (names as given in the Oracle documentation):
  **   AQ$<queue_table>    - read-only view with information on the
  **                         queue table.
  **   AQ$_<queue_table>_E - default exception queue.
  **   AQ$_<queue_table>_T - index for time manager operations.
  **   AQ$_<queue_table>_I - index or index-organized table used for
  **                         dequeueing on multiple-consumer queues.
  **
  ** Additional notes:
  **   - The payload is either RAW or a custom object type.
  **   - Maximum payload size is 32 KB.
  **   - An object type used as payload may have at most 900 attributes.
  **   - A message must be in READY state to be dequeued unless a MSGID
  **     is specified. Messages can be sorted for dequeue based on msgid
  **     or correlation values.
  */
  dbms_aqadm.create_queue_table(
    queue_table        => c_queue_table,
    queue_payload_type => c_payload_type);

  /*
  ** CREATE QUEUE
  ** ------------
  **   QUEUE_NAME          : name of the queue to create.
  **   QUEUE_TABLE         : queue table that will store the queue
  **                         (the table created just above).
  **   QUEUE_TYPE          : NORMAL or EXCEPTION queue.
  **   MAX_RETRIES         : limits how many times a dequeue in REMOVE
  **                         mode can be attempted on a message. The
  **                         count goes up each time the application
  **                         rolls back after dequeuing; once the limit
  **                         is reached the message is moved to the
  **                         exception queue. The 0 passed here allows
  **                         no retries.
  **   RETRY_DELAY         : seconds to wait, after an application
  **                         rollback, before the message is scheduled
  **                         for processing again. 0 means retry as soon
  **                         as possible. Has no effect when MAX_RETRIES
  **                         is 0.
  **   RETENTION_TIME      : per the Oracle documentation, the number of
  **                         seconds a message is kept in the queue table
  **                         after it has been dequeued.
  **                         1209600 seconds = 14 days.
  **   DEPENDENCY_TRACKING : documented by Oracle as reserved for future
  **                         use; FALSE is the only permitted value.
  **   COMMENT             : comment stored for the queue in the data
  **                         dictionary.
  **   AUTO_COMMIT         : TRUE (the default) commits the current
  **                         transaction, if any, before the operation is
  **                         carried out. FALSE makes the operation part
  **                         of the current transaction, so it becomes
  **                         persistent only when the user issues a
  **                         COMMIT.
  **                         NOTE(review): Oracle documents this parameter
  **                         as deprecated - confirm for the target
  **                         release.
  */
  dbms_aqadm.create_queue(
    queue_name          => c_queue_name,
    queue_table         => c_queue_table,
    queue_type          => dbms_aqadm.normal_queue,
    max_retries         => 0,
    retry_delay         => 0,
    retention_time      => 1209600,
    dependency_tracking => FALSE,
    comment             => 'Test Object Type Queue',
    auto_commit         => FALSE);

  -- A newly created queue must be started before it can be used.
  dbms_aqadm.start_queue(queue_name => c_queue_name);
END;
/

-- Object inventory after the queue table and queue have been created.
SELECT object_type, COUNT(*)
  FROM user_objects
 GROUP BY object_type;
 
Dequeue User
CONN aquser/aquser

set serveroutput on

-- Dequeues one message from msg_queue, selected by correlation name, and
-- prints the dequeue options used, the properties of the message that was
-- received, and its payload. With WAIT set to FOREVER the block does not
-- return until a matching message is available.
DECLARE

  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aquser.message_type;

BEGIN
  /*
  ** +----------------------------------------------------------------------------------+
  ** | DEQUEUE OPTIONS                                                                  |
  ** | -------------------------------------------------------------------------------- |
  ** | -> (CONSUMER_NAME) - Name of the application, process or user receiving the      |
  ** |    message. Should be NULL for queues not set up to handle more than one         |
  ** |    consumer.                                                                     |
  ** |                                                                                  |
  ** | -> (DEQUEUE_MODE) - Specifies locks, if any, to be acquired on the message by    |
  ** |    the dequeue() process. Can be:                                                |
  ** |                                                                                  |
  ** |    BROWSE : For "read-only" access similar to that used in "select" statements.  |
  ** |    LOCKED : For the ability to write to the message during the transaction,      |
  ** |             similar to a share lock acquired in a "select for update" statement. |
  ** |    REMOVE : For the ability to read the message, updating it or deleting it.     |
  ** |             The message is retained according to the retention properties set    |
  ** |             when the queue was created.                                          |
  ** |                                                                                  |
  ** | -> (NAVIGATION) - Determines the position of the message to be retrieved. This   |
  ** |    is the first step in retrieving messages; the second step is applying the     |
  ** |    search criteria. The navigation variable can have one of three values:        |
  ** |                                                                                  |
  ** |    NEXT_MESSAGE     : Retrieve the next available message that matches the       |
  ** |                       search criteria.                                           |
  ** |    NEXT_TRANSACTION : Skip the remaining messages in the current transaction     |
  ** |                       group and retrieve the first message of the next           |
  ** |                       transaction group.                                         |
  ** |    FIRST_MESSAGE    : Retrieve the first message that fits the search criteria,  |
  ** |                       resetting the position to the beginning of the queue.      |
  ** |                                                                                  |
  ** | -> (VISIBILITY) - Defines visibility of the message within the transaction of    |
  ** |    the application dequeueing it. Values are:                                    |
  ** |                                                                                  |
  ** |    ON_COMMIT : The dequeue is part of the current transaction.                   |
  ** |    IMMEDIATE : The dequeue is its own transaction.                               |
  ** |                                                                                  |
  ** | -> (WAIT) - Specifies how long to wait if an attempt is made to dequeue() a      |
  ** |    message and there is no message to retrieve. Values are:                      |
  ** |                                                                                  |
  ** |    FOREVER : Wait forever.                                                       |
  ** |    NO_WAIT : Do not wait for any message.                                        |
  ** |    num     : Number of seconds to wait.                                          |
  ** |                                                                                  |
  ** | -> (MSGID) - The message identifier for the message to be dequeued. If           |
  ** |    specified, the message will be dequeued even if expired.                      |
  ** | -> (CORRELATION) - The name of the message to be dequeued.                       |
  ** |                                                                                  |
  ** +----------------------------------------------------------------------------------+
  */

  dequeue_options.CONSUMER_NAME := NULL;
  dequeue_options.DEQUEUE_MODE := DBMS_AQ.REMOVE;
  dequeue_options.NAVIGATION := DBMS_AQ.NEXT_MESSAGE;
  dequeue_options.VISIBILITY := DBMS_AQ.IMMEDIATE;
  dequeue_options.WAIT := DBMS_AQ.FOREVER;
  dequeue_options.MSGID := null;
  -- Must match the correlation name assigned by the enqueue script.
  dequeue_options.CORRELATION := 'Jeff, Melody and Alex Message';

  dbms_output.put_line('+-----------------+');
  dbms_output.put_line('| DEQUEUE OPTIONS |');
  dbms_output.put_line('+-----------------+');
  dbms_output.put_line(' -> CONSUMER_NAME := ' || NVL(dequeue_options.CONSUMER_NAME, -999));
  dbms_output.put_line(' -> DEQUEUE_MODE := ' || NVL(dequeue_options.DEQUEUE_MODE, -999));
  dbms_output.put_line(' -> NAVIGATION := ' || NVL(dequeue_options.NAVIGATION, -999));
  dbms_output.put_line(' -> VISIBILITY := ' || NVL(dequeue_options.VISIBILITY, -999));
  dbms_output.put_line(' -> WAIT := ' || NVL(dequeue_options.WAIT, -999));
  dbms_output.put_line(' -> MSGID := ' || NVL(dequeue_options.MSGID, '<null>'));
  dbms_output.put_line(' -> CORRELATION := ' || NVL(dequeue_options.CORRELATION, '<null>'));

  /*
  ** +------------------------------------+
  ** | DEQUEUE THE MESSAGE                |
  ** +------------------------------------+
  */

  dbms_aq.dequeue(
    queue_name         => 'msg_queue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle);

  /*
  ** +------------------------------------+
  ** | PRINT MESSAGE PROPERTIES           |
  ** +------------------------------------+
  ** Printed after the dequeue on purpose: message_properties is filled in
  ** by dbms_aq.dequeue, so printing it earlier would only show the
  ** uninitialized record rather than the properties of the message that
  ** was actually received.
  */

  dbms_output.put_line('+--------------------+');
  dbms_output.put_line('| MESSAGE PROPERTIES |');
  dbms_output.put_line('+--------------------+');
  dbms_output.put_line(' -> PRIORITY := ' || NVL(message_properties.PRIORITY, -999));
  dbms_output.put_line(' -> DELAY := ' || NVL(message_properties.DELAY, -999));
  dbms_output.put_line(' -> EXPIRATION := ' || NVL(message_properties.EXPIRATION, -999));
  dbms_output.put_line(' -> CORRELATION := ' || NVL(message_properties.CORRELATION, -999));
  dbms_output.put_line(' -> ATTEMPTS := ' || NVL(message_properties.ATTEMPTS, -999));
  dbms_output.put_line(' -> EXCEPTION_QUEUE := ' || NVL(message_properties.EXCEPTION_QUEUE, -999));
  dbms_output.put_line(' -> ENQUEUE_TIME := ' || message_properties.ENQUEUE_TIME);
  dbms_output.put_line(' -> STATE := ' || NVL(message_properties.STATE, -999));

  /*
  ** +------------------------------------+
  ** | PRINT THE DEQUEUED MESSAGE PAYLOAD |
  ** +------------------------------------+
  */

  dbms_output.put_line('+-----------------+');
  dbms_output.put_line('| MESSAGE PAYLOAD |');
  dbms_output.put_line('+-----------------+');
  dbms_output.put_line('- Message ID := ' || message.message_id);
  dbms_output.put_line('- Subject := ' || message.subject);
  dbms_output.put_line('- Message := ' || message.text);
  dbms_output.put_line('- Dollar Value := ' || message.dollar_value);

  COMMIT;

END;
/
 
Enqueue User
CONN aquser/aquser

set serveroutput on

-- Builds one message_type payload with an id drawn from message_seq,
-- prints the enqueue options and message properties being used, then
-- enqueues the message on msg_queue and commits.
DECLARE
  l_enq_opts    dbms_aq.enqueue_options_t;
  l_msg_props   dbms_aq.message_properties_t;
  -- l_recipients dbms_aq.aq$_recipient_list_t;  (multi-consumer queues only)
  l_msg_handle  RAW(16);
  l_payload     aquser.message_type;
  l_message_id  NUMBER;

  -- Shorthand for writing one line to the server output buffer.
  PROCEDURE put(p_line IN VARCHAR2) IS
  BEGIN
    dbms_output.put_line(p_line);
  END put;
BEGIN
  -- Next message id from the sequence created during setup.
  SELECT message_seq.nextval INTO l_message_id FROM dual;

  -- Assemble the payload object.
  l_payload := message_type(l_message_id,
                            'EXAMPLE MESSAGE',
                            'This is a sample message.',
                            10.2);

  /*
  ** ENQUEUE OPTIONS
  ** ---------------
  **   VISIBILITY         : transactional behaviour of the enqueue request.
  **       ON_COMMIT - the enqueued message is part of the current
  **                   transaction; the operation completes when the
  **                   transaction commits. This is the default.
  **       IMMEDIATE - the enqueued message is its own transaction, not
  **                   part of the current transaction.
  **   RELATIVE_MSGID     : only relevant when SEQUENCE_DEVIATION is BEFORE;
  **                        identifies the message that SEQUENCE_DEVIATION
  **                        refers to.
  **   SEQUENCE_DEVIATION : whether this message should be dequeued ahead of
  **                        other messages in the queue.
  **       BEFORE - dequeue this message before the one named by
  **                RELATIVE_MSGID.
  **       TOP    - dequeue this message before any other message.
  **       null   - dequeue in regular order (the default).
  */
  l_enq_opts.visibility         := dbms_aq.on_commit;
  -- l_enq_opts.relative_msgid  := '02AB9AD2F4859C5';
  l_enq_opts.sequence_deviation := NULL;

  put('+-----------------+');
  put('| ENQUEUE OPTIONS |');
  put('+-----------------+');

  put(' -> VISIBILITY := ' || NVL(l_enq_opts.visibility, -999));
  -- put(' -> RELATIVE_MSGID := ' || NVL(l_enq_opts.relative_msgid, '<null>'));
  put(' -> SEQUENCE_DEVIATION:=' || NVL(l_enq_opts.sequence_deviation, -999));

  /*
  ** MESSAGE PROPERTIES
  ** ------------------
  **   PRIORITY        : numeric priority of the message. Negative and
  **                     positive values are allowed; the lower the number,
  **                     the higher the priority.
  **   DELAY           : seconds during which the message may not be
  **                     dequeued, or NO_DELAY. Relies on the setting of
  **                     the time manager.
  **   EXPIRATION      : seconds the message stays available for dequeueing
  **                     before it expires, or NEVER.
  **   CORRELATION     : identifies the message with a name.
  **   ATTEMPTS        : number of times other consumers attempted to
  **                     dequeue() the message. Not set at enqueue() time.
  **   RECIPIENT_LIST  : usable only on queues that allow multiple
  **                     consumers. Default recipients are the queue's
  **                     subscribers. Cannot be returned by a dequeue().
  **                     Entries are of type SYS.AQ$_AGENT, which takes
  **                     name, address and protocol (VARCHAR2, VARCHAR2,
  **                     NUMBER).
  **   EXCEPTION_QUEUE : where messages are moved once "expire" has passed
  **                     or "attempts" exceeds the maximum allowed for the
  **                     queue.
  **   ENQUEUE_TIME    : set internally by the system to the time enqueue()
  **                     deposited the message.
  **   STATE           : current state of the message, one of:
  **       WAITING   - the message is still in delay.
  **       READY     - the message can be obtained via dequeue().
  **       PROCESSED - the message is processed and retained.
  **       EXPIRED   - the message moved to the location defined by the
  **                   exception queue.
  */
  l_msg_props.priority    := -5;
  l_msg_props.delay       := dbms_aq.no_delay;
  l_msg_props.expiration  := dbms_aq.never;
  l_msg_props.correlation := 'Jeff, Melody and Alex Message';
  -- Left unset in this demo:
  --   l_msg_props.attempts        (not set at time of enqueue)
  --   l_recipients(1) := SYS.AQ$_AGENT('JEFF', null, null);
  --   l_recipients(2) := SYS.AQ$_AGENT('MELODY', null, null);
  --   l_recipients(3) := SYS.AQ$_AGENT('ALEX', null, null);
  --   l_msg_props.recipient_list  := l_recipients;
  --   l_msg_props.exception_queue := 'AQ$MSG_QT_E';
  --   l_msg_props.enqueue_time    (not set by user)
  --   l_msg_props.state           (not set by user)

  put('+--------------------+');
  put('| MESSAGE PROPERTIES |');
  put('+--------------------+');
  put(' -> PRIORITY := ' || NVL(l_msg_props.priority, -999));
  put(' -> DELAY := ' || NVL(l_msg_props.delay, -999));
  put(' -> EXPIRATION := ' || NVL(l_msg_props.expiration, -999));
  put(' -> CORRELATION := ' || NVL(l_msg_props.correlation, -999));
  put(' -> ATTEMPTS := (Not set at time of enqueue)');
  put(' -> EXCEPTION_QUEUE := ' || NVL(l_msg_props.exception_queue, -999));
  put(' -> ENQUEUE_TIME := (Not set by user)');
  put(' -> STATE := (Not set by user)');

  -- Place the message on the queue. Visibility is ON_COMMIT, so the
  -- enqueue completes with the COMMIT below.
  dbms_aq.enqueue(
    queue_name         => 'msg_queue',
    enqueue_options    => l_enq_opts,
    message_properties => l_msg_props,
    payload            => l_payload,
    msgid              => l_msg_handle);

  COMMIT;
END;
/
 
Morgan's Library Page Footer
This site is maintained by Dan Morgan. Last Updated: © 2012 Daniel A. Morgan All Rights Reserved