| Oracle Pipelined Table Functions Version 11.2.0.3 |
|---|
| General Information | |||||||||||||||||||
| Noterow | |||||||||||||||||||
| Data Dictionary Objects |
|
||||||||||||||||||
| Related System Privileges |
|
||||||||||||||||||
| Syntax | CREATE OR REPLACE FUNCTION <schema_name>.<function_name> (<argument> [IN | OUT | IN OUT] [NOCOPY] <data type>) RETURN <data type> [AUTHID <CURRENT USER | DEFINER>] [<AGGREGATE | PIPELINED>] [PARALLEL_ENABLE (PARTITION <argument> BY [<HASH, RANGE> (<column_list>), ANY])] IS |
||||||||||||||||||
| Tables And Data For Demo | |||||||||||||||||||
| Table Definition | CREATE TABLE stocktable ( ticker VARCHAR2(4), open_price NUMBER(10), close_price NUMBER(10)); |
||||||||||||||||||
| Demo Data | INSERT INTO stocktable VALUES ('ORCL', 13, 16); INSERT INTO stocktable VALUES ('MSFT', 35, 29); INSERT INTO stocktable VALUES ('SUNW', 7, 11); COMMIT; |
||||||||||||||||||
| Type Definition | CREATE OR REPLACE TYPE TickerType AS OBJECT ( ticker VARCHAR2(4), pricetype VARCHAR2(1), price NUMBER(10)); / |
||||||||||||||||||
| Create Table Type | CREATE OR REPLACE TYPE TickerTypeSet AS TABLE OF TickerType; / |
||||||||||||||||||
| Create Package | CREATE OR REPLACE PACKAGE refcur_pkg IS TYPE refcur_t IS REF CURSOR RETURN StockTable%ROWTYPE; END refcur_pkg; / |
||||||||||||||||||
| Create Table Function | CREATE OR REPLACE FUNCTION stockpivot( p refcur_pkg.refcur_t) RETURN TickerTypeSet AUTHID DEFINER PIPELINED IS out_rec TickerType := TickerType(NULL,NULL,NULL); in_rec p%ROWTYPE; BEGIN LOOP FETCH p INTO in_rec; EXIT WHEN p%NOTFOUND; out_rec.ticker := in_rec.Ticker; out_rec.pricetype := 'O'; out_rec.price := in_rec.Open_Price; PIPE ROW(out_rec); out_rec.PriceType := 'C'; out_rec.price := in_rec.Close_Price; PIPE ROW(out_rec); END LOOP; CLOSE p; RETURN; END stockpivot; / desc stockpivot set linesize 121 col pipelined format a10 SELECT object_name, pipelined, authid FROM user_procedures; |
||||||||||||||||||
| Sample Query | SELECT * FROM TABLE(stockpivot(CURSOR(SELECT * FROM StockTable))); |
||||||||||||||||||
| Date Generator | |||||||||||||||||||
| Generate Date List | CREATE OR REPLACE TYPE date_array AS TABLE OF DATE; / CREATE OR REPLACE FUNCTION date_table(sdate DATE, edate DATE) RETURN date_array AUTHID CURRENT_USER PIPELINED AS BEGIN FOR i IN 0 .. (edate - sdate) LOOP PIPE ROW(sdate + i); END LOOP; RETURN; END date_table; / desc date_table SELECT object_name, pipelined, authid FROM user_procedures; SELECT * FROM TABLE(CAST(date_table(TRUNC(SYSDATE-30), TRUNC(SYSDATE)) AS date_array)); -- joined with another table CREATE TABLE testdata ( datecol DATE, someval NUMBER); INSERT INTO testdata VALUES (TRUNC(SYSDATE-25), 25); INSERT INTO testdata VALUES (TRUNC(SYSDATE-20), 20); INSERT INTO testdata VALUES (TRUNC(SYSDATE-15), 15); INSERT INTO testdata VALUES (TRUNC(SYSDATE-10), 10); INSERT INTO testdata VALUES (TRUNC(SYSDATE-5), 5); COMMIT; SELECT * FROM testdata; SELECT da.column_value AS DATECOL, td.someval FROM TABLE(CAST(date_table(TRUNC(SYSDATE-30), TRUNC(SYSDATE)) AS date_array)) da, testdata td WHERE da.COLUMN_VALUE = td.datecol(+); Note: A SQL alternative would be: SELECT iv.datecol, td.someval FROM ( WITH dates AS (SELECT SYSDATE-30 dt_start, SYSDATE dt_end FROM dual) SELECT dt_start+rownum-1 AS DATECOL FROM dates CONNECT BY LEVEL <= dt_end-dt_start) iv, testdata td WHERE TRUNC(iv.datecol) = TRUNC(td.datecol (+)) ORDER BY datecol; |
||||||||||||||||||
| Comma Delimited List | |||||||||||||||||||
| Turning a comma delimited list into a row-by-row output | CREATE OR REPLACE TYPE str_array AS TABLE OF VARCHAR2(10); / CREATE OR REPLACE FUNCTION ptf(stringin VARCHAR2) RETURN str_array PIPELINED IS i PLS_INTEGER; str VARCHAR2(100); tab sys.dbms_utility.uncl_array; BEGIN str := '"' || REPLACE(stringin, ',', '","') || '"'; sys.dbms_utility.comma_to_table(str, i, tab); FOR j IN 1 .. 5 LOOP PIPE ROW(TRANSLATE(tab(j),'A"','A')); END LOOP; RETURN; END ptf; / SELECT * FROM TABLE(CAST(ptf('1001,1002,1003,1004,1005') AS str_array)); |
||||||||||||||||||
| Another Pivot Demo | |||||||||||||||||||
| I'm not sure where or when I got this but it looks like something Tom would have written so I will give that attribution until some other origin is established. | CREATE OR REPLACE TYPE virtual_table_type AS TABLE OF number; / CREATE OR REPLACE FUNCTION virtual_table(p_num_rows IN NUMBER) RETURN virtual_table_type AUTHID CURRENT_USER PIPELINED IS BEGIN FOR i IN 1 .. p_num_rows LOOP dbms_output.put_line('going to pipe'); PIPE ROW( i ); dbms_output.put_line('done pipeing'); END LOOP; RETURN; END virtual_table; / SELECT * FROM TABLE(virtual_table(5)); SELECT * FROM TABLE(virtual_table(10)); set serveroutput on BEGIN FOR x IN (SELECT * FROM TABLE(virtual_table(10))) LOOP dbms_output.put_line('Fetching.... ' || x.column_value); END LOOP; END; / CREATE OR REPLACE TYPE myScalarType AS OBJECT ( c1 VARCHAR2(9), c2 VARCHAR2(9), c3 VARCHAR2(9), c4 VARCHAR2(9), c5 VARCHAR2(9), c6 VARCHAR2(9), c7 VARCHAR2(9)); / desc myScalarType CREATE OR REPLACE TYPE myArrayType AS TABLE OF myScalarType; / desc myArrayType CREATE OR REPLACE FUNCTION pivot(p_cur IN sys_refcursor) RETURN myArrayType AUTHID CURRENT_USER PIPELINED IS l_c1 varchar2(4000); l_c2 varchar2(4000); l_last varchar2(4000); l_cnt number ; l_data myScalarType; BEGIN LOOP FETCH p_cur INTO l_c1, l_c2; EXIT WHEN p_cur%NOTFOUND; IF (l_last IS NULL OR l_c1 <> l_last) THEN IF (l_data IS NOT NULL) THEN pipe row(l_data); END IF; l_data := myScalarType(l_c1, l_c2, NULL, NULL, NULL, NULL, NULL); l_cnt := 3; l_last := l_c1; ELSE CASE l_cnt WHEN 3 THEN l_data.c3 := l_c2; WHEN 4 THEN l_data.c4 := l_c2; WHEN 5 THEN l_data.c5 := l_c2; WHEN 6 THEN l_data.c6 := l_c2; WHEN 7 THEN l_data.c7 := l_c2; ELSE raise program_error; END CASE; l_cnt := l_cnt+1; END IF; END LOOP; IF (l_data IS NOT NULL) THEN PIPE ROW(l_data); END IF; CLOSE p_cur; RETURN; END pivot; / SELECT * FROM TABLE(pivot(CURSOR( SELECT deptno, ename FROM scott.emp ORDER BY deptno))); SELECT * FROM TABLE(pivot( CURSOR(SELECT deptno, hiredate FROM scott.emp ORDER BY deptno))); |
||||||||||||||||||
| Service Stats Solution Demo | |||||||||||||||||||
| Another PTF Demo based on code posted by James Colestock | conn / as sysdba ALTER SESSION ENABLE PARALLEL DML; CREATE SEQUENCE metrics_seq; CREATE TABLE metrics ( metric_id NUMBER, class_id NUMBER NOT NULL, class_name VARCHAR2(20) NOT NULL, stat_id NUMBER NOT NULL, stat_name VARCHAR2(64) NOT NULL); ALTER TABLE metrics ADD CONSTRAINT pk_metrics PRIMARY KEY; ALTER TABLE metrics ADD CONSTRAINT uk_metrics UNIQUE (class_id, class_name, stat_id, stat_name); CREATE OR REPLACE TYPE rec_metrics_type AUTHID DEFINER IS OBJECT ( metric_id NUMBER, class_id NUMBER, class_name VARCHAR2(20), stat_id NUMBER, stat_name VARCHAR2(64)); / CREATE OR REPLACE TYPE tbl_metrics_type AS TABLE OF rec_metrics_type; / CREATE DIMENSION metrics_dim LEVEL class_level IS (metrics.class_id) LEVEL stat_level IS (metrics.stat_id) HIERARCHY metric_hier (stat_level CHILD OF class_level) ATTRIBUTE class_level DETERMINES (metrics.class_name) ATTRIBUTE stat_level DETERMINES (metrics.stat_name); CREATE OR REPLACE VIEW metrics_vw AS SELECT /*+ PARALLEL (wss, 2) */ s.class class_id, DECODE (s.class, 1, 'User', 2, 'Redo', 4, 'Enqueue', 8, 'Cache', 16, 'OS', 32, 'Parallelism', 40, 'RAC', 64, 'SQL', 72, 'Internal Debugging', 128, 'Debug') class_name, wss.stat_id, s.name FROM sys.wrh$_service_stat wss, sys.v$sysstat s WHERE wss.stat_id = s.stat_id ORDER BY s.class; CREATE OR REPLACE PACKAGE load_dim_pkg AUTHID DEFINER AS TYPE metrics_cur IS REF CURSOR RETURN metrics_vw%ROWTYPE; FUNCTION get_data(p_cur IN load_dim_pkg.metrics_cur) RETURN tbl_metrics_type PARALLEL_ENABLE(PARTITION p_cur BY ANY) PIPELINED; END load_dim_pkg; / CREATE OR REPLACE PACKAGE BODY load_dim_pkg AS FUNCTION get_data(p_cur IN load_dim_pkg.metrics_cur) RETURN tbl_metrics_type PARALLEL_ENABLE(PARTITION p_cur BY ANY) PIPELINED IS rec_in rec_metrics_type := rec_metrics_type(NULL, NULL, NULL, NULL, NULL); rec_out rec_metrics_type := rec_metrics_type(NULL, NULL, NULL, NULL, NULL); BEGIN LOOP FETCH p_cur INTO rec_in.class_id, rec_in.class_name, rec_in.stat_id,rec_in.stat_name; EXIT WHEN p_cur%NOTFOUND; rec_out.metric_id := NULL; rec_out.class_id := rec_in.class_id; rec_out.class_name := INITCAP (rec_in.class_name); rec_out.stat_id := rec_in.stat_id; rec_out.stat_name := INITCAP (rec_in.stat_name); PIPE ROW (rec_out); END LOOP; RETURN; END; END load_dim_pkg; / set autotrace on set linesize 161 MERGE INTO metrics m USING ( SELECT * FROM TABLE(load_dim_pkg.get_data(CURSOR(SELECT DISTINCT * FROM metrics_vw)))) s1 ON (m.stat_id = s1.stat_id) WHEN MATCHED THEN UPDATE SET m.stat_name = s1.stat_name, m.class_id = s1.class_id, m.class_name = s1.class_name WHEN NOT MATCHED THEN INSERT (metrics_id, class_id, class_name, stat_id, stat_name) VALUES (metrics_seq.NEXTVAL, s1.class_id, s1.class_name, s1.stat_id, s1.stat_name); COMMIT; set autotrace off SELECT * FROM sys.v$pq_sesstat WHERE statistic IN ('Queries Parallelized', 'Server Threads'); |
||||||||||||||||||
| XML Demo | |||||||||||||||||||
| PTF Returning XML | conn uwclass/uwclass CREATE OR REPLACE TYPE clob_array IS TABLE OF CLOB; / CREATE OR REPLACE FUNCTION ret_xml(query_str IN VARCHAR2) RETURN clob_array PIPELINED AUTHID DEFINER IS l_xml XMLTYPE; BEGIN l_xml := XMLTYPE(dbms_xmlgen.getxml(query_str)) ; FOR rec IN (SELECT * FROM TABLE(xmlsequence(l_xml.extract('/ROWSET/ROW')))) LOOP PIPE ROW (rec.column_value.getCLOBVal()) ; END LOOP; END ret_xml; / SELECT * FROM TABLE(ret_xml('SELECT * FROM scott.emp')); |
||||||||||||||||||
| Parallel Enabled PTF | |||||||||||||||||||
| Parallel Enable Demo | CREATE OR REPLACE PACKAGE pkg_test AUTHID CURRENT_USER IS TYPE rt_Src IS RECORD(id NUMBER, col1 VARCHAR2(100)); TYPE rc_Src IS REF CURSOR RETURN rt_Src; TYPE rt_Out IS RECORD(id NUMBER, col1 VARCHAR2(100), sid NUMBER); TYPE tt_Out IS TABLE OF rt_Out; FUNCTION f_Pipe_Any(airc_Src IN rc_Src) RETURN tt_Out PIPELINED PARALLEL_ENABLE(PARTITION airc_Src BY ANY); FUNCTION f_Pipe_Hash(airc_Src IN rc_Src) RETURN tt_Out PIPELINED PARALLEL_ENABLE(PARTITION airc_Src BY HASH(id)); FUNCTION f_Pipe_Range(airc_Src IN rc_Src) RETURN tt_Out PIPELINED PARALLEL_ENABLE(PARTITION airc_Src BY RANGE(id)); END; / CREATE OR REPLACE PACKAGE BODY pkg_test IS FUNCTION f_Pipe_Any(airc_Src IN rc_Src) RETURN tt_Out PIPELINED PARALLEL_ENABLE(PARTITION airc_Src BY ANY) IS lr_Src rt_Src; lr_Out rt_Out; BEGIN LOOP FETCH airc_Src INTO lr_Src; EXIT WHEN airc_Src%NOTFOUND; SELECT sid INTO lr_Out.sid FROM v$mystat WHERE ROWNUM = 1; lr_Out.id := lr_Src.id; lr_Out.col1 := lr_Src.col1; PIPE ROW(lr_Out); END LOOP; CLOSE airc_Src; END; FUNCTION f_Pipe_Hash (airc_Src IN rc_Src) RETURN tt_Out PIPELINED PARALLEL_ENABLE(PARTITION airc_Src BY HASH(id)) IS lr_Src rt_Src; lr_Out rt_Out; BEGIN LOOP FETCH airc_Src INTO lr_Src; EXIT WHEN airc_Src%NOTFOUND; SELECT sid INTO lr_Out.sid FROM v$mystat WHERE ROWNUM = 1; lr_Out.id := lr_Src.id; lr_Out.col1 := lr_Src.col1; PIPE ROW(lr_Out); END LOOP; CLOSE airc_Src; END; FUNCTION f_Pipe_Range (airc_Src IN rc_Src) RETURN tt_Out PIPELINED PARALLEL_ENABLE(PARTITION airc_Src BY RANGE(id)) IS lr_Src rt_Src; lr_Out rt_Out; BEGIN LOOP FETCH airc_Src INTO lr_Src; EXIT WHEN airc_Src%NOTFOUND; SELECT sid INTO lr_Out.sid FROM v$mystat WHERE ROWNUM = 1; lr_Out.id := lr_Src.id; lr_Out.col1 := lr_Src.col1; PIPE ROW(lr_Out); END LOOP; CLOSE airc_Src; END; END; / CREATE TABLE dt_src ( id NUMBER, col1 VARCHAR2(100)); INSERT INTO dt_src SELECT ROWNUM, TO_CHAR(ROWNUM) FROM dual CONNECT BY LEVEL <= 100000; CREATE TABLE dt_out ( id NUMBER, col1 VARCHAR2(100), sid NUMBER, src_proc VARCHAR2(32)); set autotrace trace stat explain INSERT INTO dt_out SELECT id, col1, sid, 'ANY' FROM TABLE (pkg_test.f_Pipe_Any(CURSOR(SELECT --+parallel(dt_src) id, col1 FROM dt_src))); INSERT INTO dt_out SELECT id, col1, sid, 'HASH' FROM TABLE(pkg_test.f_Pipe_Hash(CURSOR(SELECT --+parallel(dt_src) id, col1 FROM dt_src))); INSERT INTO dt_out SELECT id, col1, sid, 'RANGE' FROM TABLE (pkg_test.f_Pipe_Range(CURSOR(SELECT --+parallel(dt_src) id, col1 FROM dt_src))); set autotrace off SELECT COUNT(*), sid, src_proc FROM dt_out GROUP BY sid, src_proc ORDER BY sid, src_proc; |
||||||||||||||||||
| Drop Pipelined Table Functions | |||||||||||||||||||
| Drop PTF | DROP FUNCTION <function_name>; | ||||||||||||||||||
| DROP FUNCTION ptf; DROP FUNCTION pivot; DROP FUNCTION stockpivot; |
|||||||||||||||||||
| Related Topics |
| Anonymous Block |
| DBMS_XMLGEN |
| Exception Handling |
| Functions |
| NO_DATA_NEEDED Exception |
| Pseudocolumns |
| Tables |
| Types |
| This site is maintained by Dan Morgan. Last Updated: | This site is protected by copyright and trademark laws under U.S. and International law. © 1998-2013 Daniel A. Morgan All Rights Reserved | |||||||||
|
|
||||||||||