Tuesday, September 20, 2011

Automatic deqeuing with multi level XML (Advanced Queing)

Using Advanced Queing is a good idea when you have some integration or other stuff you want to execute asynchronous. With Advanced Queing it's also possible to automte the deque process using a subscriber. This means, as soon as you put anything on the que it will be dequed as soon as possible.

The below example will illustrate that. The code will also show how to read a muli level XML using PL/SQL.

Create que table (also stops the que and drops the que table if it's existing).
BEGIN
  BEGIN
    DBMS_AQADM.STOP_QUEUE(queue_name => 'XXDEMO_INV_QUE');
  EXCEPTION
    WHEN others THEN
      NULL;
  END;

  BEGIN
    DBMS_AQADM.DROP_QUEUE(queue_name => 'XXDEMO_INV_QUE');
  EXCEPTION
    WHEN others THEN
      NULL;
  END;

  BEGIN
    DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'XXDEMO_INV_AQTAB');
  EXCEPTION
    WHEN others THEN
      NULL;
  END;

  DBMS_AQADM.CREATE_QUEUE_TABLE (
    queue_table        => 'XXDEMO_INV_AQTAB',
    queue_payload_type => 'SYS.XMLtype',
    multiple_consumers => TRUE);
END;
Create and start the que.
BEGIN
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'XXDEMO_INV_QUE',
    queue_table => 'XXDEMO_INV_AQTAB');

  DBMS_AQADM.START_QUEUE (
    queue_name => 'XXDEMO_INV_QUE');
END;
Create tables to save the message and the detailed information from the XML.
DROP TABLE xxdemo_AQ_mess_table;
DROP TABLE xxdemo_AQ_data_table_head;
DROP TABLE xxdemo_AQ_data_table_det;

CREATE TABLE xxdemo_AQ_mess_table (message       XMLTYPE,
                                   creation_date DATE);

CREATE TABLE xxdemo_AQ_data_table_head (id            NUMBER,
                                        first         VARCHAR2(600),
                                        last          VARCHAR2(600),
                                        creation_date DATE);

CREATE TABLE xxdemo_AQ_data_table_det (id            NUMBER,
                                       address       VARCHAR2(600),
                                       creation_date DATE);

To get the automatic dequing functionality to work we need to add a subscriber and register a procedure with the logic we want to execute.
BEGIN
   DBMS_AQADM.ADD_SUBSCRIBER (
      queue_name => 'XXDEMO_INV_QUE',
      subscriber => SYS.AQ$_AGENT('xxdemo_queue_subscriber',
                                   NULL,
                                   NULL));

    DBMS_AQ.REGISTER (SYS.AQ$_REG_INFO_LIST(SYS.AQ$_REG_INFO('XXDEMO_INV_QUE:xxdemo_queue_subscriber',
                                                              DBMS_AQ.NAMESPACE_AQ,
                                                             'plsql://xxdemo_AQ_callback_proc',
                                                              HEXTORAW('FF'))
                                           ),1);
END;
The procedure that will execute.
create or replace
PROCEDURE xxdemo_AQ_callback_proc(context  RAW,
                                  reginfo  SYS.AQ$_REG_INFO,
                                  descr    SYS.AQ$_DESCRIPTOR,
                                  payload  RAW,
                                  payloadl NUMBER) AS

   r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   v_xmldata            xmltype;

BEGIN

   r_dequeue_options.msgid := descr.msg_id;
   r_dequeue_options.consumer_name := descr.consumer_name;

   -- deque message
   DBMS_AQ.DEQUEUE(
      queue_name         => descr.queue_name,
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => v_xmldata,
      msgid              => v_message_handle);

   -- insert raw message into table for reference
   INSERT INTO xxdemo_AQ_mess_table (message, creation_date)
   VALUES (v_xmldata, SYSDATE);

  /*********************************************************
  * Inserts into master and details table with 1 statemant *
  **********************************************************/
  INSERT ALL
    WHEN rn = 1 THEN
      INTO xxdemo_AQ_data_table_head VALUES (id, first, last)
    WHEN address IS NOT NULL THEN
      INTO xxdemo_AQ_data_table_det VALUES (id, address)
  SELECT row_number() over (PARTITION BY id ORDER BY null) rn,
         id,
         first,
         last,
         address
  FROM (SELECT EXTRACTVALUE(VALUE(headers), '//ID') id,
               EXTRACTVALUE(VALUE(headers), '//VALUE1') first,
               EXTRACTVALUE(VALUE(headers), '//VALUE2') last,
               EXTRACTVALUE (VALUE(details), '*/ADDRESS') address
          FROM TABLE(XMLSEQUENCE(EXTRACT(v_xmldata, '*/INVOICE'))) headers,
               TABLE(XMLSEQUENCE(EXTRACT(VALUE(headers), '*/DISTRIBUTION'))) details);

  /**********************************
  * The same as above but with loop *
  ***********************************/
  --FOR rec in (SELECT row_number() over (PARTITION BY id ORDER BY NULL) rn,
  --                   id,
  --                   first,
  --                   last,
  --                   address
  --            FROM (SELECT EXTRACTVALUE(VALUE(headers), '//ID') id,
  --                         EXTRACTVALUE(VALUE(headers), '//VALUE1') first,
  --                         EXTRACTVALUE(VALUE(headers), '//VALUE2') last,
  --                         EXTRACTVALUE (VALUE(details), '*/ADDRESS') address
  --            FROM TABLE(XMLSEQUENCE(EXTRACT(v_xmldata, '*/INVOICE'))) headers,
  --                 TABLE (XMLSEQUENCE(EXTRACT(VALUE(headers), '*/DISTRIBUTION'))) details)) LOOP
  --
  --  IF rec.rn = 1 THEN
  --    INSERT INTO xxdemo_AQ_data_table_head VALUES (rec.id, rec.first, rec.last, SYSDATE);
  --  END IF;
  --
  --  IF rec.address IS NOT NULL THEN
  --    INSERT INTO xxdemo_AQ_data_table_det VALUES (rec.id, rec.address, SYSDATE);
  --  END IF;
  --
  --END LOOP;
  COMMIT;
END xxdemo_AQ_callback_proc;
Test by putting a message on the que.
DECLARE
  r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
  v_data               CLOB;
  v_xmldata            XMLTYPE;
BEGIN

  DELETE FROM xxdemo_AQ_data_table_head;
  DELETE FROM xxdemo_AQ_data_table_det;

  v_data := '<?xml version="1.0"?><DATA>';
  v_data := v_data||'<INVOICE><ID>1</ID><VALUE1>Daniel</VALUE1><VALUE2>Borgstrom</VALUE2><DISTRIBUTION><ADDRESS>1 Broadway</ADDRESS></DISTRIBUTION><DISTRIBUTION><ADDRESS>26 Wall Street</ADDRESS></DISTRIBUTION></INVOICE>';
  v_data := v_data||'<INVOICE><ID>2</ID><VALUE1>Sven</VALUE1><VALUE2>Svensson</VALUE2><DISTRIBUTION><ADDRESS>23 Houston Street</ADDRESS></DISTRIBUTION></INVOICE>';
  v_data := v_data||'<INVOICE><ID>3</ID><VALUE1>Tom</VALUE1><VALUE2>Olsson</VALUE2><DISTRIBUTION><ADDRESS>44 Lexington Ave</ADDRESS></DISTRIBUTION></INVOICE>';
  v_data := v_data||'<INVOICE><ID>4</ID><VALUE1>Arne</VALUE1><VALUE2>Jonsson</VALUE2><DISTRIBUTION><ADDRESS>81 Rue de Paris</ADDRESS></DISTRIBUTION><DISTRIBUTION><ADDRESS>71 Champs Elysee</ADDRESS></DISTRIBUTION></INVOICE>';
  v_data := v_data||'</DATA>';

  v_xmldata := xmltype(v_data);

   DBMS_AQ.ENQUEUE(
      queue_name         => 'XXDEMO_INV_QUE',
      enqueue_options    => r_enqueue_options,
      message_properties => r_message_properties,
      payload            => v_xmldata,
      msgid              => v_message_handle);

  COMMIT;

END;
When the que has processed the message data should appear in xxdemo_AQ_mess_table, xxdemo_AQ_data_table_head and xxdemo_AQ_data_table_det.

No comments:

Post a Comment

javascript:void(0)