Oracle Pipelined Table Functions
Version 12.1.0.2

General Information
Library Note This page has been updated for Oracle release 12cR1 and shown to be compatible in the Container DB.
Data Dictionary Objects
ALL_ARGUMENTS CDB_OBJECT_SIZE ERROR$
ALL_ERRORS CDB_PROCEDURES SOURCE$
ALL_OBJECT_SIZE CDB_SOURCE_AE USER_ARGUMENTS
ALL_PROCEDURES DBA_ARGUMENTS USER_ERRORS
ALL_SOURCE_AE DBA_ERRORS USER_OBJECT_SIZE
ARGUMENT$ DBA_OBJECT_SIZE USER_PROCEDURES
CDB_ARGUMENTS DBA_PROCEDURES USER_SOURCE_AE
CDB_ERRORS DBA_SOURCE_AE  
Object Privileges Privileges to tables and views granted through roles may not be valid within a procedure. See the section on AUTHID.

GRANT execute ON <object_name> TO <user_name>;
System Privileges
ALTER ANY PROCEDURE DEBUG ANY PROCEDURE EXECUTE ANY PROCEDURE
CREATE ANY PROCEDURE DROP ANY PROCEDURE SELECT ANY TABLE
CREATE PROCEDURE    
Syntax CREATE OR REPLACE FUNCTION <schema_name>.<function_name>
(<argument> [IN | OUT | IN OUT] [NOCOPY] <data type>)
RETURN <data type>
[AUTHID <CURRENT_USER | DEFINER>]
[<AGGREGATE | PIPELINED>]
[PARALLEL_ENABLE (PARTITION <argument> BY [<HASH, RANGE> (<column_list>), ANY])] IS
 <constant_and_variable_declarations>
 <exception_declarations>
 <pragma_declarations>
BEGIN
  <function_body>
END <function_name>;
/
 
Tables And Data For Demo
Table Definition CREATE TABLE stocktable (
-- One row per stock symbol with its opening and closing price.
ticker       VARCHAR2(4),
open_price   NUMBER(10),
close_price  NUMBER(10));
Demo Data INSERT INTO stocktable (ticker, open_price, close_price) VALUES ('ORCL', 13, 16);
-- Columns are named explicitly so the inserts do not depend on column order.
INSERT INTO stocktable (ticker, open_price, close_price) VALUES ('MSFT', 35, 29);
INSERT INTO stocktable (ticker, open_price, close_price) VALUES ('SUNW', 7, 11);
COMMIT;
Type Definition CREATE OR REPLACE TYPE TickerType AUTHID DEFINER AS OBJECT(
-- Row shape piped by stockpivot: one ticker with a single price and a
-- one-character marker saying which price it is.
ticker    VARCHAR2(4),
pricetype VARCHAR2(1),  -- stockpivot assigns 'O' for open_price and 'C' for close_price
price     NUMBER(10));
/
Create Table Type CREATE OR REPLACE TYPE TickerTypeSet AS TABLE OF TickerType;
/
Create Package CREATE OR REPLACE PACKAGE refcur_pkg AUTHID DEFINER IS
 -- Strongly typed REF CURSOR whose rows have the shape of StockTable;
 -- it is the parameter type of stockpivot.
 TYPE refcur_t IS REF CURSOR RETURN StockTable%ROWTYPE;
END refcur_pkg;
/
Create Table Function CREATE OR REPLACE FUNCTION stockpivot(p refcur_pkg.refcur_t)
RETURN TickerTypeSet AUTHID DEFINER PIPELINED IS
 -- Unpivots the rows of cursor p: every input row is piped out twice,
 -- first as (ticker, 'O', open_price) and then as (ticker, 'C', close_price).
 -- The cursor is closed once all rows have been consumed.
 src_row  p%ROWTYPE;
BEGIN
  -- Fetch-ahead loop: the body runs only while the last fetch returned a row.
  FETCH p INTO src_row;
  WHILE p%FOUND LOOP
    PIPE ROW(TickerType(src_row.Ticker, 'O', src_row.Open_Price));
    PIPE ROW(TickerType(src_row.Ticker, 'C', src_row.Close_Price));

    FETCH p INTO src_row;
  END LOOP;

  CLOSE p;
  RETURN;
END stockpivot;
/

-- Show the compiled signature of the function.
desc stockpivot

set linesize 121
col pipelined format a10

-- List the schema's stored subprograms with their PIPELINED and AUTHID settings.
SELECT object_name, pipelined, authid
FROM user_procedures;
Sample Query SELECT *
FROM TABLE(stockpivot(CURSOR(SELECT * FROM StockTable)));
A related pivot based on a different table compared with straight SQL CREATE OR REPLACE TYPE AirType AUTHID DEFINER AS OBJECT(
-- Row shape piped by airpivot; airpivot fills each attribute from the
-- same-named field of the row it fetched from its input cursor.
program_id VARCHAR2(3),
line_number NUMBER(10),
customer_id VARCHAR2(4),
order_date DATE,
delivered_date DATE);
/

-- Collection type returned by airpivot.
CREATE OR REPLACE TYPE AirTypeSet AS TABLE OF AirType;
/

-- NOTE: this replaces the refcur_pkg specification created earlier on this
-- page; refcur_t now returns airplanes%ROWTYPE instead of StockTable%ROWTYPE.
-- NOTE(review): stockpivot is declared against refcur_pkg.refcur_t, so it
-- presumably stops working as written after this replacement -- confirm.
-- The airplanes table is not created in the visible part of this page.
CREATE OR REPLACE PACKAGE refcur_pkg AUTHID DEFINER IS
TYPE refcur_t IS REF CURSOR RETURN airplanes%ROWTYPE;
END refcur_pkg;
/

CREATE OR REPLACE FUNCTION airpivot(
p refcur_pkg.refcur_t)
RETURN AirTypeSet AUTHID DEFINER PIPELINED IS
  -- Pass-through table function: every row fetched from cursor p is piped
  -- out unchanged as an AirType object. The cursor is closed once all rows
  -- have been consumed.
  src_row p%ROWTYPE;
BEGIN
  -- Fetch-ahead loop: the body runs only while the last fetch returned a row.
  FETCH p INTO src_row;
  WHILE p%FOUND LOOP
    PIPE ROW(AirType(src_row.program_id,
                     src_row.line_number,
                     src_row.customer_id,
                     src_row.order_date,
                     src_row.delivered_date));

    FETCH p INTO src_row;
  END LOOP;

  CLOSE p;
  RETURN;
END airpivot;
/

-- Time three ways of computing the same aggregate over airplanes.
set timing on

-- 1) Rows routed through the pipelined function airpivot.
SELECT program_id, SUM(line_number)
FROM TABLE(airpivot(CURSOR(SELECT * FROM airplanes)))
GROUP BY program_id;

-- 2) Plain SQL directly against the table.
SELECT program_id, SUM(line_number)
FROM airplanes
GROUP BY program_id;

-- VARRAY of AirType with an upper bound of 5000 elements, used by query 3.
CREATE OR REPLACE TYPE d_vtyp AS VARRAY(5000) OF AirType;
/

-- 3) Rows collected with CAST(MULTISET(...)) into d_vtyp and queried via TABLE().
SELECT program_id, SUM(e.line_number)
FROM TABLE(CAST(MULTISET(SELECT * FROM airplanes) AS d_vtyp)) e
GROUP BY program_id;

-- the lesson is clear ... only use a pipelined table function when SQL will not work
 
Date Generator
Generate Date List CREATE OR REPLACE TYPE date_array AS TABLE OF DATE;
/

CREATE OR REPLACE FUNCTION date_table(sdate DATE, edate DATE)
RETURN date_array AUTHID CURRENT_USER PIPELINED AS
 -- Pipes sdate, sdate+1, sdate+2, ... for every whole day that fits between
 -- sdate and edate (both ends included when they are a whole number of days
 -- apart). Each piped value keeps the time-of-day of sdate.
 --
 -- Parameters:
 --   sdate  first date to pipe
 --   edate  upper limit; no piped value is later than this
 --
 -- PL/SQL rounds FOR-loop bounds to the nearest integer, so the day count is
 -- floored explicitly: a fractional difference (dates carrying a time part)
 -- must not round up and pipe a date after edate, and an edate slightly
 -- before sdate must produce no rows at all.
BEGIN
  FOR i IN 0 .. FLOOR(edate - sdate) LOOP
    PIPE ROW(sdate + i);
  END LOOP;
  RETURN;
END date_table;
/

-- Show the compiled signature of the function.
desc date_table

-- List the schema's stored subprograms with their PIPELINED and AUTHID settings.
SELECT object_name, pipelined, authid
FROM user_procedures;

-- One row per day from 30 days ago (midnight) through today (midnight).
SELECT *
FROM TABLE(CAST(date_table(TRUNC(SYSDATE-30), TRUNC(SYSDATE))
AS date_array));

-- joined with another table

-- Sparse sample data: one value every five days, each dated at midnight.
CREATE TABLE testdata (
datecol DATE,
someval NUMBER);

-- Columns are named explicitly so the inserts do not depend on column order.
INSERT INTO testdata (datecol, someval) VALUES (TRUNC(SYSDATE-25), 25);
INSERT INTO testdata (datecol, someval) VALUES (TRUNC(SYSDATE-20), 20);
INSERT INTO testdata (datecol, someval) VALUES (TRUNC(SYSDATE-15), 15);
INSERT INTO testdata (datecol, someval) VALUES (TRUNC(SYSDATE-10), 10);
INSERT INTO testdata (datecol, someval) VALUES (TRUNC(SYSDATE-5), 5);
COMMIT;

SELECT * FROM testdata;

-- Every generated date is returned; someval is NULL for dates that have no
-- matching row in testdata.
SELECT da.column_value AS DATECOL, td.someval
FROM TABLE(CAST(date_table(TRUNC(SYSDATE-30), TRUNC(SYSDATE)) AS date_array)) da
LEFT JOIN testdata td
  ON td.datecol = da.column_value;

Note: A SQL alternative would be:
-- Pure-SQL equivalent of the date_table join: generates one row per day from
-- 30 days ago through today, both ends included (31 rows), and outer-joins
-- testdata so that days without data still appear.
SELECT iv.datecol, td.someval
FROM (
WITH dates AS (SELECT TRUNC(SYSDATE-30) dt_start, TRUNC(SYSDATE) dt_end FROM dual)
SELECT dt_start + LEVEL - 1 AS DATECOL
FROM dates
-- "+ 1" makes the range inclusive of dt_end, matching date_table's output.
CONNECT BY LEVEL <= dt_end - dt_start + 1) iv, testdata td
WHERE iv.datecol = TRUNC(td.datecol (+))
ORDER BY iv.datecol;
 
Comma Delimited List
Turning a comma delimited list into a row-by-row output CREATE OR REPLACE TYPE str_array AS TABLE OF VARCHAR2(10);
/

CREATE OR REPLACE FUNCTION ptf(stringin VARCHAR2) RETURN str_array PIPELINED IS
 -- Splits a comma-delimited string and pipes each element as its own row.
 --
 -- Parameters:
 --   stringin  comma-delimited list, e.g. '1001,1002,1003'
 --
 -- Each element is wrapped in double quotes before the list is handed to
 -- dbms_utility.comma_to_table, and the quotes are stripped again on output.
 elem_count PLS_INTEGER;                  -- number of elements reported by comma_to_table
 quoted     VARCHAR2(32767);              -- stringin with every element double-quoted
 elems      sys.dbms_utility.uncl_array;
BEGIN
  quoted := '"' || REPLACE(stringin, ',', '","') || '"';
  sys.dbms_utility.comma_to_table(quoted, elem_count, elems);

  -- Iterate over the actual element count rather than a fixed number, so
  -- lists of any length are returned completely and without extra rows.
  FOR j IN 1 .. elem_count LOOP
    -- TRANSLATE maps 'A' to itself and '"' to nothing, i.e. removes the quotes.
    PIPE ROW(TRANSLATE(elems(j),'A"','A'));
  END LOOP;
  RETURN;
END ptf;
/

-- Each comma-separated element of the argument comes back as its own row.
SELECT *
FROM TABLE(CAST(ptf('1001,1002,1003,1004,1005') AS str_array));
 
Real-Time DBMS_OUTPUT Output
A demonstration of using a PTF to force real-time output from inside a loop with the DBMS_OUTPUT package CREATE OR REPLACE TYPE msgType AS TABLE OF VARCHAR2(60);
/

CREATE OR REPLACE FUNCTION msgOutStream RETURN msgType
PIPELINED
AUTHID CURRENT_USER AS
 -- Pipes a start message, then for each of 10 steps a timestamped message,
 -- a one-second pause, and a completion message.
 step_label VARCHAR2(60);  -- common 'output <n>' prefix of both messages of a step
BEGIN
  PIPE ROW('start run ' || sysTimestamp);

  FOR step IN 1 .. 10 LOOP
    step_label := 'output ' || TO_CHAR(step);

    PIPE ROW(step_label || ' ' || sysTimestamp);
    dbms_lock.sleep(1);
    PIPE ROW(step_label || ' completed');
  END LOOP;

  RETURN;
END msgOutStream;
/

-- First run: rows are fetched using whatever ARRAYSIZE the session currently has.
SELECT * FROM TABLE(msgOutStream);

-- Fetch one row at a time so each piped row can be displayed as soon as it
-- is produced instead of waiting for a full fetch batch.
SET ARRAYSIZE 1

SELECT * FROM TABLE(msgOutStream);

-- remember to return your arraysize to its original value
-- NOTE(review): 250 is presumably the author's own setting; substitute the
-- value your session used before the demo.
SET ARRAYSIZE 250
 
Another Pivot Demo
I'm not sure where or when I got this but it looks like something Tom would have written so I will give that attribution until some other origin is established. CREATE OR REPLACE TYPE virtual_table_type AS TABLE OF number;
/

CREATE OR REPLACE FUNCTION virtual_table(p_num_rows IN NUMBER)
RETURN virtual_table_type AUTHID CURRENT_USER PIPELINED IS
 -- Pipes the integers 1 .. p_num_rows, writing a DBMS_OUTPUT trace line
 -- immediately before and immediately after every PIPE ROW so the
 -- interleaving of producer and consumer can be observed.
BEGIN
  FOR row_no IN 1 .. p_num_rows LOOP
    dbms_output.put_line('going to pipe');
    PIPE ROW(row_no);
    dbms_output.put_line('done pipeing');
  END LOOP;

  RETURN;
END virtual_table;
/

-- Generate 5 and then 10 rows straight from SQL.
SELECT * FROM TABLE(virtual_table(5));
SELECT * FROM TABLE(virtual_table(10));

-- Enable DBMS_OUTPUT display so the trace lines become visible.
set serveroutput on

-- Consume the function from PL/SQL, printing a line for every row fetched
-- alongside the function's own trace messages.
BEGIN
  FOR x IN (SELECT * FROM TABLE(virtual_table(10))) LOOP
    dbms_output.put_line('Fetching.... ' || x.column_value);
  END LOOP;
END;
/

-- Output row of the pivot function: c1 holds the grouping key, c2 .. c7 hold
-- up to six values collected for that key. Every attribute is limited to
-- nine characters.
CREATE OR REPLACE TYPE myScalarType AS OBJECT (
c1 VARCHAR2(9),
c2 VARCHAR2(9),
c3 VARCHAR2(9),
c4 VARCHAR2(9),
c5 VARCHAR2(9),
c6 VARCHAR2(9),
c7 VARCHAR2(9));
/

desc myScalarType

-- Collection type returned by the pivot function.
CREATE OR REPLACE TYPE myArrayType AS TABLE OF myScalarType;
/

desc myArrayType

CREATE OR REPLACE FUNCTION pivot(p_cur IN sys_refcursor)
RETURN myArrayType AUTHID CURRENT_USER PIPELINED IS
 -- Turns a two-column cursor (key, value) into one row per run of equal
 -- keys: the key goes into c1 and the run's values into c2 .. c7.
 --
 -- Parameters:
 --   p_cur  open cursor returning exactly two columns; rows with the same
 --          key must be adjacent (the sample queries ORDER BY the key)
 --
 -- Raises program_error when a run supplies more values than c2 .. c7 hold.
 -- The cursor is closed after the last row has been consumed.
 v_key      VARCHAR2(4000);   -- key of the row just fetched
 v_val      VARCHAR2(4000);   -- value of the row just fetched
 v_prev_key VARCHAR2(4000);   -- key of the group currently being built
 v_next_col NUMBER;           -- index (3 .. 7) of the next attribute to fill
 v_row      myScalarType;     -- group currently being built; NULL before the first row
BEGIN
  -- Fetch-ahead loop: the body runs only while the last fetch returned a row.
  FETCH p_cur INTO v_key, v_val;
  WHILE p_cur%FOUND LOOP

    IF (v_prev_key IS NULL OR v_key <> v_prev_key) THEN
      -- A new key starts: emit the finished group, if any, and open a new one.
      IF (v_row IS NOT NULL) THEN
        PIPE ROW(v_row);
      END IF;

      v_row := myScalarType(v_key, v_val, NULL, NULL, NULL, NULL, NULL);
      v_next_col := 3;
      v_prev_key := v_key;
    ELSE
      -- Same key as before: store the value in the next free attribute.
      IF v_next_col = 3 THEN
        v_row.c3 := v_val;
      ELSIF v_next_col = 4 THEN
        v_row.c4 := v_val;
      ELSIF v_next_col = 5 THEN
        v_row.c5 := v_val;
      ELSIF v_next_col = 6 THEN
        v_row.c6 := v_val;
      ELSIF v_next_col = 7 THEN
        v_row.c7 := v_val;
      ELSE
        RAISE program_error;
      END IF;

      v_next_col := v_next_col + 1;
    END IF;

    FETCH p_cur INTO v_key, v_val;
  END LOOP;

  -- Emit the last group, which no following key change has flushed yet.
  IF (v_row IS NOT NULL) THEN
    PIPE ROW(v_row);
  END IF;
  CLOSE p_cur;
  RETURN;
END pivot;
/

-- One output row per department, with employee names spread across the columns.
-- The input is ordered by deptno so equal keys arrive together.
SELECT *
FROM TABLE(pivot(CURSOR(
  SELECT deptno, ename FROM scott.emp ORDER BY deptno)));

-- Same pivot over hire dates.
-- NOTE(review): hiredate is fetched into VARCHAR2 variables, so it is
-- presumably converted with the session date format -- confirm the result
-- fits the VARCHAR2(9) attributes of myScalarType.
SELECT *
FROM TABLE(pivot(
CURSOR(SELECT deptno, hiredate FROM scott.emp ORDER BY deptno)));
 
Service Stats Solution Demo
Another PTF Demo based on code posted by James Colestock conn / as sysdba

-- Allow DML statements in this session to run in parallel.
ALTER SESSION ENABLE PARALLEL DML;

-- Source of metric_id values for rows inserted by the MERGE further down.
CREATE SEQUENCE metrics_seq;

-- Target table of the demo: one row per statistic with its class.
CREATE TABLE metrics (
metric_id  NUMBER,
class_id   NUMBER       NOT NULL,
class_name VARCHAR2(20) NOT NULL,
stat_id    NUMBER       NOT NULL,
stat_name  VARCHAR2(64) NOT NULL);

-- Primary key on metric_id, the column populated from metrics_seq.
-- The column list is mandatory for an out-of-line PRIMARY KEY constraint.
ALTER TABLE metrics
ADD CONSTRAINT pk_metrics
PRIMARY KEY (metric_id);

-- No two rows may share the same combination of class and statistic values.
ALTER TABLE metrics
ADD CONSTRAINT uk_metrics
UNIQUE (class_id, class_name, stat_id, stat_name);

-- Object type with the same five fields as the metrics table; it is the row
-- shape piped by load_dim_pkg.get_data.
CREATE OR REPLACE TYPE rec_metrics_type AUTHID DEFINER IS OBJECT (
metric_id  NUMBER,
class_id   NUMBER,
class_name VARCHAR2(20),
stat_id    NUMBER,
stat_name  VARCHAR2(64));
/

-- Collection type returned by load_dim_pkg.get_data.
CREATE OR REPLACE TYPE tbl_metrics_type AS TABLE OF rec_metrics_type;
/

-- Declares the class -> statistic hierarchy of the metrics table: stat_id
-- rolls up to class_id, and each level determines its own name column.
CREATE DIMENSION metrics_dim
LEVEL     class_level IS (metrics.class_id)
LEVEL     stat_level  IS (metrics.stat_id)
HIERARCHY metric_hier (stat_level CHILD OF class_level)
ATTRIBUTE class_level DETERMINES (metrics.class_name)
ATTRIBUTE stat_level  DETERMINES (metrics.stat_name);

-- Source view for the metrics load: joins the service statistics history to
-- v$sysstat on stat_id and exposes (class_id, class_name, stat_id, name).
-- class_name is derived from the numeric class with DECODE; a class value
-- not listed yields NULL because no default is given.
-- The view can return the same statistic more than once; the MERGE that
-- reads it selects DISTINCT rows.
CREATE OR REPLACE VIEW metrics_vw AS
SELECT /*+ PARALLEL (wss, 2) */ s.class class_id,
       DECODE (s.class,
       1, 'User',
       2, 'Redo',
       4, 'Enqueue',
       8, 'Cache',
      16, 'OS',
      32, 'Parallelism',
      40, 'RAC',
      64, 'SQL',
      72, 'Internal Debugging',
     128, 'Debug') class_name,
       wss.stat_id, s.name
FROM sys.wrh$_service_stat wss, sys.v$sysstat s
WHERE wss.stat_id = s.stat_id
ORDER BY s.class;

CREATE OR REPLACE PACKAGE load_dim_pkg AUTHID DEFINER AS
 -- Strongly typed cursor whose rows have the shape of metrics_vw.
 TYPE metrics_cur IS REF CURSOR RETURN metrics_vw%ROWTYPE;
 -- Pipelined, parallel-enabled transformation of metrics_vw rows into
 -- rec_metrics_type objects; PARTITION ... BY ANY places no requirement on
 -- how the input rows are distributed among parallel executions.
 FUNCTION get_data(p_cur IN load_dim_pkg.metrics_cur) RETURN tbl_metrics_type
   PARALLEL_ENABLE(PARTITION p_cur BY ANY) PIPELINED;
END load_dim_pkg;
/

CREATE OR REPLACE PACKAGE BODY load_dim_pkg AS
 -- Reads rows shaped like metrics_vw from p_cur and pipes one
 -- rec_metrics_type per input row: metric_id is left NULL, the ids are
 -- copied as-is, and both name columns are converted with INITCAP.
 FUNCTION get_data(p_cur IN load_dim_pkg.metrics_cur) RETURN tbl_metrics_type
 PARALLEL_ENABLE(PARTITION p_cur BY ANY) PIPELINED IS
  src_rec rec_metrics_type := rec_metrics_type(NULL, NULL, NULL, NULL, NULL);
  out_rec rec_metrics_type := rec_metrics_type(NULL, NULL, NULL, NULL, NULL);
 BEGIN
   -- Fetch-ahead loop: the body runs only while the last fetch returned a row.
   FETCH p_cur
    INTO src_rec.class_id, src_rec.class_name, src_rec.stat_id, src_rec.stat_name;

   WHILE p_cur%FOUND LOOP
     out_rec.metric_id  := NULL;
     out_rec.class_id   := src_rec.class_id;
     out_rec.class_name := INITCAP(src_rec.class_name);
     out_rec.stat_id    := src_rec.stat_id;
     out_rec.stat_name  := INITCAP(src_rec.stat_name);
     PIPE ROW (out_rec);

     FETCH p_cur
      INTO src_rec.class_id, src_rec.class_name, src_rec.stat_id, src_rec.stat_name;
   END LOOP;

   RETURN;
 END get_data;
END load_dim_pkg;
/

set autotrace on
set linesize 161

-- Upsert the metrics table from the pipelined function's output, matching on
-- stat_id: existing statistics are refreshed, new ones are inserted with a
-- metric_id taken from metrics_seq.
MERGE INTO metrics m USING (
  SELECT * FROM TABLE(load_dim_pkg.get_data(CURSOR(SELECT DISTINCT * FROM metrics_vw)))) s1
  ON (m.stat_id = s1.stat_id)
WHEN MATCHED THEN
  UPDATE SET m.stat_name = s1.stat_name, m.class_id = s1.class_id,
             m.class_name = s1.class_name
WHEN NOT MATCHED THEN
  INSERT
  (metric_id, class_id, class_name, stat_id, stat_name)
  VALUES
  (metrics_seq.NEXTVAL, s1.class_id, s1.class_name, s1.stat_id, s1.stat_name);

COMMIT;

set autotrace off

-- Check the session's parallel-query statistics after the MERGE.
SELECT *
FROM sys.v$pq_sesstat
WHERE statistic IN ('Queries Parallelized', 'Server Threads');
 
XML Demo
PTF Returning XML conn uwclass/uwclass

-- Collection type returned by ret_xml: one CLOB per piped XML fragment.
CREATE OR REPLACE TYPE clob_array IS TABLE OF CLOB;
/

CREATE OR REPLACE FUNCTION ret_xml(query_str IN VARCHAR2) RETURN clob_array PIPELINED AUTHID DEFINER IS
 -- Runs query_str through dbms_xmlgen and pipes one CLOB per /ROWSET/ROW
 -- element of the generated document, i.e. one XML fragment per result row.
 --
 -- Parameters:
 --   query_str  text of the query to execute
 --
 -- NOTE(review): query_str is executed as given by a DEFINER-rights
 -- function; do not pass text that originates from untrusted input.
 l_doc CLOB;
 l_xml XMLTYPE;
BEGIN
  l_doc := dbms_xmlgen.getxml(query_str);

  -- getxml yields NULL when the query returns no rows; there is nothing to
  -- pipe then, and a NULL document must not reach the XMLTYPE constructor.
  IF l_doc IS NULL THEN
    RETURN;
  END IF;

  l_xml := XMLTYPE(l_doc);
  FOR rec IN (SELECT * FROM TABLE(xmlsequence(l_xml.extract('/ROWSET/ROW')))) LOOP
    PIPE ROW (rec.column_value.getCLOBVal()) ;
  END LOOP;

  -- Every execution path of a pipelined function should end in RETURN.
  RETURN;
END ret_xml;
/

SELECT * FROM TABLE(ret_xml('SELECT * FROM scott.emp'));
 
Parallel Enabled PTF
Parallel Enable Demo CREATE OR REPLACE PACKAGE pkg_test AUTHID CURRENT_USER IS
 -- Input row shape and the strongly typed cursor that delivers it.
 TYPE rt_Src IS RECORD(id NUMBER, col1 VARCHAR2(100));
 TYPE rc_Src IS REF CURSOR RETURN rt_Src;

 -- Output row shape: the input columns plus the SID of the session that
 -- processed the row.
 TYPE rt_Out IS RECORD(id NUMBER, col1 VARCHAR2(100), sid NUMBER);
 TYPE tt_Out IS TABLE OF rt_Out;

 -- The three functions share one signature and differ only in how the input
 -- cursor is declared to be partitioned: ANY, HASH(id) or RANGE(id).
 FUNCTION f_Pipe_Any(airc_Src IN rc_Src) RETURN tt_Out PIPELINED
 PARALLEL_ENABLE(PARTITION
airc_Src BY ANY);

 FUNCTION f_Pipe_Hash(airc_Src IN rc_Src) RETURN tt_Out PIPELINED
 PARALLEL_ENABLE(PARTITION airc_Src BY HASH(id));

 FUNCTION f_Pipe_Range(airc_Src IN rc_Src) RETURN tt_Out PIPELINED
 PARALLEL_ENABLE(PARTITION
airc_Src BY RANGE(id));
END;
/

CREATE OR REPLACE PACKAGE BODY pkg_test IS
-- Private helper: returns the SID of the session executing the call, read
-- from v$mystat. Shared by the three pipelined functions below.
FUNCTION f_Current_Sid RETURN NUMBER IS
 l_Sid NUMBER;
BEGIN
  SELECT sid
  INTO l_Sid
  FROM v$mystat
  WHERE ROWNUM = 1;

  RETURN l_Sid;
END f_Current_Sid;

-- Pipes every row of airc_Src tagged with the SID of the session that
-- processed it; the input may be distributed in any way (PARTITION BY ANY).
FUNCTION f_Pipe_Any(airc_Src IN rc_Src) RETURN tt_Out PIPELINED
PARALLEL_ENABLE(PARTITION
airc_Src BY ANY) IS
 lr_Src rt_Src;
 lr_Out rt_Out;
BEGIN
  -- The SID cannot change during one execution, so it is looked up once
  -- instead of once per row.
  lr_Out.sid := f_Current_Sid;

  LOOP
    FETCH airc_Src INTO lr_Src;
    EXIT WHEN airc_Src%NOTFOUND;

    lr_Out.id := lr_Src.id;
    lr_Out.col1 := lr_Src.col1;
    PIPE ROW(lr_Out);
  END LOOP;
  CLOSE airc_Src;
  -- Every execution path of a pipelined function should end in RETURN.
  RETURN;
END f_Pipe_Any;

-- Same as f_Pipe_Any, but the input is hash-partitioned on id.
FUNCTION f_Pipe_Hash (airc_Src IN rc_Src) RETURN tt_Out PIPELINED
PARALLEL_ENABLE(PARTITION
airc_Src BY HASH(id)) IS
 lr_Src rt_Src;
 lr_Out rt_Out;
BEGIN
  lr_Out.sid := f_Current_Sid;

  LOOP
    FETCH airc_Src INTO lr_Src;
    EXIT WHEN airc_Src%NOTFOUND;

    lr_Out.id := lr_Src.id;
    lr_Out.col1 := lr_Src.col1;
    PIPE ROW(lr_Out);
  END LOOP;
  CLOSE airc_Src;
  RETURN;
END f_Pipe_Hash;

-- Same as f_Pipe_Any, but the input is range-partitioned on id.
FUNCTION f_Pipe_Range (airc_Src IN rc_Src) RETURN tt_Out PIPELINED
PARALLEL_ENABLE(PARTITION
airc_Src BY RANGE(id)) IS
 lr_Src rt_Src;
 lr_Out rt_Out;
BEGIN
  lr_Out.sid := f_Current_Sid;

  LOOP
    FETCH airc_Src INTO lr_Src;
    EXIT WHEN airc_Src%NOTFOUND;

    lr_Out.id := lr_Src.id;
    lr_Out.col1 := lr_Src.col1;
    PIPE ROW(lr_Out);
  END LOOP;
  CLOSE airc_Src;
  RETURN;
END f_Pipe_Range;
END;
/

-- Source table for the parallel demo.
CREATE TABLE dt_src (
id   NUMBER,
col1 VARCHAR2(100));

-- Load 100000 rows: id = 1 .. 100000, col1 = the same number as text.
-- Columns are named explicitly so the insert does not depend on column order.
INSERT INTO dt_src (id, col1)
SELECT ROWNUM, TO_CHAR(ROWNUM)
FROM dual
CONNECT BY LEVEL <= 100000;

-- Result table: source columns plus the processing session's SID and a tag
-- naming the function that produced the row.
CREATE TABLE dt_out (
id       NUMBER,
col1     VARCHAR2(100),
sid      NUMBER,
src_proc VARCHAR2(32));

set autotrace trace stat explain

-- Run the same load through each of the three functions, tagging the rows
-- with the partitioning method used. The source query carries a parallel
-- hint on dt_src in every case.
INSERT INTO dt_out
SELECT id, col1, sid, 'ANY'
FROM TABLE (pkg_test.f_Pipe_Any(CURSOR(SELECT --+parallel(dt_src)
id, col1 FROM dt_src)));

INSERT INTO dt_out
SELECT id, col1, sid, 'HASH'
FROM TABLE(pkg_test.f_Pipe_Hash(CURSOR(SELECT --+parallel(dt_src)
id, col1 FROM dt_src)));

INSERT INTO dt_out
SELECT id, col1, sid, 'RANGE'
FROM TABLE (pkg_test.f_Pipe_Range(CURSOR(SELECT --+parallel(dt_src)
id, col1 FROM dt_src)));

set autotrace off

-- Rows handled per session and method; more than one SID for a method shows
-- that its input was processed by several sessions.
SELECT COUNT(*), sid, src_proc
FROM dt_out
GROUP BY sid, src_proc
ORDER BY sid, src_proc;
 
Drop Pipelined Table Functions
Drop PTF DROP FUNCTION <function_name>;
DROP FUNCTION ptf;

DROP FUNCTION pivot;

DROP FUNCTION stockpivot;

Related Topics
Accessible By Clause
Anonymous Block
DBMS_XMLGEN
Exception Handling
Functions
NO_DATA_NEEDED Exception
Pseudocolumns
Tables
Types

Morgan's Library Page Footer
This site is maintained by Dan Morgan. Last Updated: This site is protected by copyright and trademark laws under U.S. and International law. © 1998-2014 Daniel A. Morgan All Rights Reserved