Automatic dequeuing with multi-level XML (Advanced Queuing)
Using Advanced Queuing is a good idea when you have an integration or other work that you want to execute asynchronously. With Advanced Queuing it is also possible to automate the dequeue process using a subscriber. This means that as soon as you put anything on the queue, it will be dequeued as soon as possible.
The example below illustrates that. The code also shows how to read a multi-level XML document using PL/SQL.
Create the queue table (this also stops the queue and drops the queue and the queue table if they already exist).
The example below illustrates that. The code also shows how to read a multi-level XML document using PL/SQL.
Create the queue table (this also stops the queue and drops the queue and the queue table if they already exist).
-- Rebuilds the demo queue table from scratch so the script can be re-run:
-- stops and drops the queue XXDEMO_INV_QUE, drops the queue table
-- XXDEMO_INV_AQTAB, then recreates the queue table with an XMLType payload
-- and multiple-consumer support (required for subscribers).
--
-- Only the "object does not exist" errors are ignored during teardown; any
-- other failure (missing privileges, locks, ...) is raised instead of being
-- silently swallowed.
DECLARE
    e_queue_missing       EXCEPTION;
    PRAGMA EXCEPTION_INIT(e_queue_missing, -24010);        -- ORA-24010: QUEUE does not exist
    e_queue_table_missing EXCEPTION;
    PRAGMA EXCEPTION_INIT(e_queue_table_missing, -24002);  -- ORA-24002: QUEUE_TABLE does not exist
BEGIN
    -- A queue must be stopped before it can be dropped.
    BEGIN
        DBMS_AQADM.STOP_QUEUE(queue_name => 'XXDEMO_INV_QUE');
    EXCEPTION
        WHEN e_queue_missing THEN
            NULL;  -- first run: nothing to stop
    END;

    BEGIN
        DBMS_AQADM.DROP_QUEUE(queue_name => 'XXDEMO_INV_QUE');
    EXCEPTION
        WHEN e_queue_missing THEN
            NULL;  -- first run: nothing to drop
    END;

    BEGIN
        DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'XXDEMO_INV_AQTAB');
    EXCEPTION
        WHEN e_queue_table_missing THEN
            NULL;  -- first run: nothing to drop
    END;

    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'XXDEMO_INV_AQTAB',
        queue_payload_type => 'SYS.XMLtype',
        multiple_consumers => TRUE
    );
END;
Create and start the queue.
-- Creates the queue XXDEMO_INV_QUE inside the queue table XXDEMO_INV_AQTAB
-- and then starts it.
DECLARE
    c_queue_name  CONSTANT VARCHAR2(30) := 'XXDEMO_INV_QUE';
    c_queue_table CONSTANT VARCHAR2(30) := 'XXDEMO_INV_AQTAB';
BEGIN
    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => c_queue_name,
        queue_table => c_queue_table
    );

    DBMS_AQADM.START_QUEUE(queue_name => c_queue_name);
END;
Create tables to save the message and the detailed information from the XML.
-- Demo target tables, dropped and recreated on every run.
-- On a first run the tables do not exist yet, so the DROP statements report
-- an error that can be ignored.
DROP TABLE xxdemo_AQ_data_table_det;
DROP TABLE xxdemo_AQ_data_table_head;
DROP TABLE xxdemo_AQ_mess_table;

-- One row per dequeued message: the XML payload as received.
CREATE TABLE xxdemo_AQ_mess_table (
    message       XMLTYPE,
    creation_date DATE
);

-- Header-level values extracted from the XML.
CREATE TABLE xxdemo_AQ_data_table_head (
    id            NUMBER,
    first         VARCHAR2(600),
    last          VARCHAR2(600),
    creation_date DATE
);

-- Detail-level values (addresses) extracted from the XML, keyed by header id.
CREATE TABLE xxdemo_AQ_data_table_det (
    id            NUMBER,
    address       VARCHAR2(600),
    creation_date DATE
);
To get the automatic dequeuing functionality to work we need to add a subscriber and register a procedure with the logic we want to execute.
-- Adds the subscriber xxdemo_queue_subscriber to the queue XXDEMO_INV_QUE and
-- registers the PL/SQL procedure xxdemo_AQ_callback_proc as the notification
-- callback for that queue/subscriber pair.
DECLARE
    l_subscriber    SYS.AQ$_AGENT;
    l_registration  SYS.AQ$_REG_INFO;
    l_registrations SYS.AQ$_REG_INFO_LIST;
BEGIN
    -- Agent identified by name only (second and third attributes left NULL).
    l_subscriber := SYS.AQ$_AGENT('xxdemo_queue_subscriber', NULL, NULL);

    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'XXDEMO_INV_QUE',
        subscriber => l_subscriber
    );

    -- Registration name is "<queue>:<subscriber>"; the callback is addressed
    -- with the plsql:// scheme. HEXTORAW('FF') is the RAW context value stored
    -- with the registration.
    l_registration := SYS.AQ$_REG_INFO(
        'XXDEMO_INV_QUE:xxdemo_queue_subscriber',
        DBMS_AQ.NAMESPACE_AQ,
        'plsql://xxdemo_AQ_callback_proc',
        HEXTORAW('FF')
    );
    l_registrations := SYS.AQ$_REG_INFO_LIST(l_registration);

    -- Second argument: the list holds exactly one registration.
    DBMS_AQ.REGISTER(l_registrations, 1);
END;
The procedure that will execute.
-- Notification callback registered for queue XXDEMO_INV_QUE.
--
-- Dequeues the message identified by the notification descriptor, stores the
-- raw XML in xxdemo_AQ_mess_table, then shreds the XML into
-- xxdemo_AQ_data_table_head (one row per INVOICE id) and
-- xxdemo_AQ_data_table_det (one row per DISTRIBUTION address), and commits.
--
-- Parameters (fixed signature for a PL/SQL notification callback):
--   context  - RAW context value supplied at registration (unused here)
--   reginfo  - registration information (unused here)
--   descr    - descriptor of the notification; supplies msg_id,
--              consumer_name and queue_name for the dequeue call
--   payload  - RAW payload (unused here; the message is dequeued explicitly)
--   payloadl - payload length (unused here)
CREATE OR REPLACE PROCEDURE xxdemo_AQ_callback_proc(
    context  RAW,
    reginfo  SYS.AQ$_REG_INFO,
    descr    SYS.AQ$_DESCRIPTOR,
    payload  RAW,
    payloadl NUMBER
) AS
    r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
    r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    v_message_handle     RAW(16);
    v_xmldata            XMLTYPE;
BEGIN
    -- Dequeue exactly the message this notification was raised for, on behalf
    -- of the subscriber that was notified.
    r_dequeue_options.msgid         := descr.msg_id;
    r_dequeue_options.consumer_name := descr.consumer_name;

    -- Dequeue the message.
    DBMS_AQ.DEQUEUE(
        queue_name         => descr.queue_name,
        dequeue_options    => r_dequeue_options,
        message_properties => r_message_properties,
        payload            => v_xmldata,
        msgid              => v_message_handle
    );

    -- Insert the raw message into a table for reference.
    INSERT INTO xxdemo_AQ_mess_table (message, creation_date)
    VALUES (v_xmldata, SYSDATE);

    /**********************************************************
     * Inserts into master and details table with 1 statement *
     **********************************************************/
    -- Explicit column lists plus creation_date: both target tables have a
    -- creation_date column, so the VALUES clauses must supply it (without it
    -- the statement fails with ORA-00947: not enough values).
    -- rn = 1 picks a single row per invoice id for the header insert.
    -- NOTE(review): the join to DISTRIBUTION is an inner join, so an INVOICE
    -- without any DISTRIBUTION element presumably yields no header row - confirm
    -- whether that is intended.
    INSERT ALL
        WHEN rn = 1 THEN
            INTO xxdemo_AQ_data_table_head (id, first, last, creation_date)
            VALUES (id, first, last, creation_date)
        WHEN address IS NOT NULL THEN
            INTO xxdemo_AQ_data_table_det (id, address, creation_date)
            VALUES (id, address, creation_date)
    SELECT
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY NULL) AS rn,
        id,
        first,
        last,
        address,
        SYSDATE AS creation_date
    FROM (
        SELECT
            EXTRACTVALUE(VALUE(headers), '//ID')      AS id,
            EXTRACTVALUE(VALUE(headers), '//VALUE1')  AS first,
            EXTRACTVALUE(VALUE(headers), '//VALUE2')  AS last,
            EXTRACTVALUE(VALUE(details), '*/ADDRESS') AS address
        FROM
            TABLE(XMLSEQUENCE(EXTRACT(v_xmldata, '*/INVOICE'))) headers,
            TABLE(XMLSEQUENCE(EXTRACT(VALUE(headers), '*/DISTRIBUTION'))) details
    );

    /**********************************
     * The same as above but with loop *
     **********************************/
    --FOR rec IN (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY NULL) rn,
    --                   id,
    --                   first,
    --                   last,
    --                   address
    --              FROM (SELECT EXTRACTVALUE(VALUE(headers), '//ID') id,
    --                           EXTRACTVALUE(VALUE(headers), '//VALUE1') first,
    --                           EXTRACTVALUE(VALUE(headers), '//VALUE2') last,
    --                           EXTRACTVALUE(VALUE(details), '*/ADDRESS') address
    --                      FROM TABLE(XMLSEQUENCE(EXTRACT(v_xmldata, '*/INVOICE'))) headers,
    --                           TABLE(XMLSEQUENCE(EXTRACT(VALUE(headers), '*/DISTRIBUTION'))) details)) LOOP
    --
    --    IF rec.rn = 1 THEN
    --        INSERT INTO xxdemo_AQ_data_table_head VALUES (rec.id, rec.first, rec.last, SYSDATE);
    --    END IF;
    --
    --    IF rec.address IS NOT NULL THEN
    --        INSERT INTO xxdemo_AQ_data_table_det VALUES (rec.id, rec.address, SYSDATE);
    --    END IF;
    --
    --END LOOP;

    COMMIT;
END xxdemo_AQ_callback_proc;
Test by putting a message on the queue.
-- Test driver: empties the header/detail tables, builds a sample XML document
-- with four invoices (two of them with two distributions each) and enqueues
-- it on XXDEMO_INV_QUE. The deletes and the enqueue are committed together.
DECLARE
    l_enq_opts  DBMS_AQ.ENQUEUE_OPTIONS_T;
    l_msg_props DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_msg_id    RAW(16);
    l_xml_text  CLOB;
    l_payload   XMLTYPE;
BEGIN
    -- Intentionally unfiltered: clear the results of any previous test run.
    DELETE FROM xxdemo_AQ_data_table_det;
    DELETE FROM xxdemo_AQ_data_table_head;

    l_xml_text :=
           '<?xml version="1.0"?><DATA>'
        || '<INVOICE><ID>1</ID><VALUE1>Daniel</VALUE1><VALUE2>Borgstrom</VALUE2><DISTRIBUTION><ADDRESS>1 Broadway</ADDRESS></DISTRIBUTION><DISTRIBUTION><ADDRESS>26 Wall Street</ADDRESS></DISTRIBUTION></INVOICE>'
        || '<INVOICE><ID>2</ID><VALUE1>Sven</VALUE1><VALUE2>Svensson</VALUE2><DISTRIBUTION><ADDRESS>23 Houston Street</ADDRESS></DISTRIBUTION></INVOICE>'
        || '<INVOICE><ID>3</ID><VALUE1>Tom</VALUE1><VALUE2>Olsson</VALUE2><DISTRIBUTION><ADDRESS>44 Lexington Ave</ADDRESS></DISTRIBUTION></INVOICE>'
        || '<INVOICE><ID>4</ID><VALUE1>Arne</VALUE1><VALUE2>Jonsson</VALUE2><DISTRIBUTION><ADDRESS>81 Rue de Paris</ADDRESS></DISTRIBUTION><DISTRIBUTION><ADDRESS>71 Champs Elysee</ADDRESS></DISTRIBUTION></INVOICE>'
        || '</DATA>';

    l_payload := XMLTYPE(l_xml_text);

    DBMS_AQ.ENQUEUE(
        queue_name         => 'XXDEMO_INV_QUE',
        enqueue_options    => l_enq_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msg_id
    );

    COMMIT;
END;
When the queue has processed the message, data should appear in xxdemo_AQ_mess_table, xxdemo_AQ_data_table_head and xxdemo_AQ_data_table_det.
Comments
Post a Comment