Oracle DBMS_KAFKA
Version 23c

General Information
Purpose Oracle SQL Access to Kafka (OSAK) is a feature of Oracle Database that allows data in Kafka topics to be queried from Oracle SQL. OSAK allows Kafka to be queried in one of three ways, each abstracted as a type of application created using one of three PL/SQL calls: CREATE_LOAD_APP, CREATE_STREAMING_APP, or CREATE_SEEKABLE_APP.
AUTHID CURRENT_USER
Constants
Name Data Type Value
 Application Types
APPLICATION_TYPE_LOAD VARCHAR2(30) 'LOAD'
APPLICATION_TYPE_SEEKABLE VARCHAR2(30) 'SEEKABLE'
APPLICATION_TYPE_STREAMING VARCHAR2(30) 'STREAMING'
 AVRO Options
OPT_AVRODECIMALTYPE VARCHAR2(30) 'avrodecimaltype'
 DSV Options
OPT_BLANKASNULL VARCHAR2(30) 'blankasnull'
 Debugging/Logging Options
OPT_DEBUG VARCHAR2(30) 'debug'
OPT_LOGEXEC VARCHAR2(30) 'logexec'
OPT_LOGOPT VARCHAR2(30) 'logopt'
OPT_LOGQC VARCHAR2(30) 'logqc'
 General Options
OPT_BUFSIZE VARCHAR2(30) 'bufsize'
OPT_SEPARATOR VARCHAR2(30) 'separator'
OPT_STRICT VARCHAR2(30) 'strict'
OPT_TERMINATOR VARCHAR2(30) 'terminator'
 Kafka Value Options
OPT_AVROSCHEMA VARCHAR2(30) 'avroschema'
OPT_FMT VARCHAR2(30) 'fmt'
OPT_JSONDT VARCHAR2(30) 'jsondt'
OPT_REFTABLE VARCHAR2(30) 'reftable'
 Kafka Key Options
OPT_KEYFMT VARCHAR2(30) 'keyfmt'
 Not Implemented Option Character Sets
OPT_CONVERSIONERRS VARCHAR2(30) 'conversionerrs'
OPT_DATEFMT VARCHAR2(30) 'datefmt'
OPT_ESCAPEDBY VARCHAR2(30) 'escapedby'
OPT_NULLDEFINEDAS VARCHAR2(30) 'nulldefinedas'
 Not Yet Implemented Option Columns
OPT_QUOTE VARCHAR2(30) 'quote'
OPT_REJECTLMT VARCHAR2(30) 'rejectlmt'
OPT_REMOVEQUOTES VARCHAR2(30) 'removequotes'
OPT_TSFMT VARCHAR2(30) 'tsfmt'
OPT_TSLTZFMT VARCHAR2(30) 'tsltzfmt'
OPT_TSTZFMT VARCHAR2(30) 'tstzfmt'
OPT_TRIMSPACES VARCHAR2(30) 'trimspaces'
OPT_TRUNCATECOL VARCHAR2(30) 'truncatecol'
 Not Yet Implemented Option Key Columns
OPT_KEYAVROSCHEMA VARCHAR2(30) 'keyavroschema'
OPT_KEYREFTABLE VARCHAR2(30) 'keyreftable'
 Topic Formats
TOPIC_FORMAT_AVRO VARCHAR2(30) 'AVRO'
TOPIC_FORMAT_DSV VARCHAR2(30) 'DSV'
TOPIC_FORMAT_JSON VARCHAR2(30) 'JSON'
 Watermark
WATER_MARK_HIGH VARCHAR2(30) 'HIGH'
WATER_MARK_LOW VARCHAR2(30) 'LOW'
Dependencies
DBMS_ASSERT DBMS_STANDARD USER_KAFKA_OPS
DBMS_KAFKA_MD DBMS_SYS_ERROR USER_KAFKA_OPS_RESULTS
DBMS_KAFKA_UTL DUAL USER_KAFKA_PARTITIONS
DBMS_LOB JSON_OBJECT_T USER_VIEWS
DBMS_OUTPUT USER_KAFKA_APPLICATIONS  
Documented No
First Available 23.1 Beta
Pragmas PRAGMA SUPPLEMENTAL_LOG_DATA(default, NONE);
Security Model Owned by SYS with EXECUTE granted to PUBLIC
Source {ORACLE_HOME}/rdbms/admin/dbmskafka.sql
{ORACLE_HOME}/rdbms/admin/prvtkafka.plb
Subprograms
 
ADD_PARTITIONS (new 23c)
Adds additional Kafka partitions to an existing set of OSAK views
dbms_kafka.add_partitions(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2);
TBD
 
CONVERT_MS_TO_TIMESTAMP (new 23c)
Convert the input number of milliseconds since the epoch time to a TIMESTAMP WITH TIME ZONE and return this value
dbms_kafka.convert_ms_to_timestamp(
milliseconds IN INTEGER,
timezone     IN VARCHAR2 DEFAULT NULL)
RETURN TIMESTAMP WITH TIME ZONE;
PRAGMA SUPPLEMENTAL_LOG_DATA(CONVERT_MS_TO_TIMESTAMP, READ_ONLY);
TBD
 
CONVERT_MS_TO_TIMESTAMP_TZ (new 23c)
Convert the input number of milliseconds since the epoch time to a TIMESTAMP WITH TIME ZONE and return this value
dbms_kafka.convert_ms_to_timestamp_tz(
milliseconds IN INTEGER,
timezone     IN VARCHAR2 DEFAULT NULL)
RETURN TIMESTAMP WITH TIME ZONE;
PRAGMA SUPPLEMENTAL_LOG_DATA(CONVERT_MS_TO_TIMESTAMP_TZ, READ_ONLY);
TBD
 
CONVERT_TIMESTAMP_TO_MS (new 23c)
Calculate and return the number of milliseconds since epoch time from the input TIMESTAMP WITH TIME ZONE

Overload 1
dbms_kafka.convert_timestamp_to_ms(datetime_tz IN TIMESTAMP WITH TIME ZONE) RETURN INTEGER;
PRAGMA SUPPLEMENTAL_LOG_DATA(CONVERT_TIMESTAMP_TO_MS, READ_ONLY);
TBD
Overload 2
dbms_kafka.convert_timestamp_to_ms(
datetime IN TIMESTAMP,
timezone IN VARCHAR2 DEFAULT NULL)
RETURN INTEGER;
PRAGMA SUPPLEMENTAL_LOG_DATA(CONVERT_TIMESTAMP_TO_MS, READ_ONLY);
TBD
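The two conversion directions can be exercised as a round trip. What follows is a minimal, untested sketch rather than a verified demo. It assumes the signatures listed on this page, uses named notation (datetime_tz =>) so the call resolves to Overload 1, and leaves timezone at its NULL default because the accepted time zone formats are not listed here.

```sql
SET SERVEROUTPUT ON

DECLARE
  l_ts   TIMESTAMP WITH TIME ZONE := SYSTIMESTAMP;
  l_ms   INTEGER;
  l_back TIMESTAMP WITH TIME ZONE;
BEGIN
  -- TIMESTAMP WITH TIME ZONE -> milliseconds since the epoch (Overload 1)
  l_ms := dbms_kafka.convert_timestamp_to_ms(datetime_tz => l_ts);

  -- milliseconds since the epoch -> TIMESTAMP WITH TIME ZONE
  l_back := dbms_kafka.convert_ms_to_timestamp_tz(milliseconds => l_ms);

  dbms_output.put_line('Original:     ' || TO_CHAR(l_ts,   'YYYY-MM-DD HH24:MI:SS.FF3 TZR'));
  dbms_output.put_line('Milliseconds: ' || l_ms);
  dbms_output.put_line('Round trip:   ' || TO_CHAR(l_back, 'YYYY-MM-DD HH24:MI:SS.FF3 TZR'));
END;
/
```

Sub-millisecond precision is expected to be lost on the way through, so the two timestamps should agree only to the millisecond.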
 
CREATE_LOAD_APP (new 23c)
Creates an Oracle SQL Access to Kafka LOAD application that will retrieve data from all partitions in a Kafka topic for the purpose of loading Kafka data into an Oracle Database table
dbms_kafka.create_load_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2,
topic_name       IN VARCHAR2,
options          IN CLOB);
TBD
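A LOAD application might be created along the following lines. This is a hedged sketch, not a tested demo. It assumes that options is a JSON document keyed by the OPT_* constant values (suggested by the JSON_OBJECT_T dependency, but not confirmed on this page), that 'reftable' names a table describing the record layout, and that the cluster has already been registered. EXAMPLECLUSTER, LOADAPP, example_topic, and EXAMPLE_SHAPE are hypothetical names.

```sql
DECLARE
  l_opts JSON_OBJECT_T := JSON_OBJECT_T();
BEGIN
  -- Assumption: options are passed as JSON keyed by the OPT_* constant values
  l_opts.put(dbms_kafka.opt_fmt,      dbms_kafka.topic_format_dsv);
  l_opts.put(dbms_kafka.opt_reftable, 'EXAMPLE_SHAPE');  -- hypothetical reference table

  dbms_kafka.create_load_app(
    cluster_name     => 'EXAMPLECLUSTER',  -- hypothetical; assumed already registered
    application_name => 'LOADAPP',         -- hypothetical
    topic_name       => 'example_topic',   -- hypothetical
    options          => l_opts.to_clob());
END;
/
```

Building the document from the package constants instead of a string literal keeps the option keys in step with the constants table above.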
 
CREATE_SEEKABLE_APP (new 23c)
Creates one OSAK view and an associated global temporary table to seek and load Kafka records within a user-specified window of time
dbms_kafka.create_seekable_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2,
topic_name       IN VARCHAR2,
options          IN CLOB);
TBD
 
CREATE_STREAMING_APP (new 23c)
Creates an OSAK streaming application which includes a set of dedicated OSAK global temporary tables and OSAK views used for retrieving new, unread records from partitions in a Kafka topic.
dbms_kafka.create_streaming_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2,
topic_name       IN VARCHAR2,
options          IN CLOB,
view_count       IN INTEGER DEFAULT 1);
TBD
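A streaming application with more than one view could be requested as sketched below. The example is untested and rests on assumptions: options is taken to be a JSON document using the 'fmt' key and 'JSON' value from the constants table, view_count is presumed to set how many OSAK view and temporary table pairs are created, and EXAMPLECLUSTER, STREAMAPP, and example_topic are hypothetical names.

```sql
BEGIN
  dbms_kafka.create_streaming_app(
    cluster_name     => 'EXAMPLECLUSTER',   -- hypothetical; assumed already registered
    application_name => 'STREAMAPP',        -- hypothetical
    topic_name       => 'example_topic',    -- hypothetical
    options          => '{"fmt":"JSON"}',   -- assumed JSON options format
    view_count       => 2);
END;
/

-- Assumption: generated view names contain the application name
SELECT view_name
FROM user_views
WHERE view_name LIKE '%STREAMAPP%'
ORDER BY view_name;
```

The follow-up query is only a convenience for discovering the generated object names; the naming convention itself is not stated on this page.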
 
DROP_ALL_APPS (new 23c)
Drop all applications for the Kafka cluster
dbms_kafka.drop_all_apps(cluster_name IN VARCHAR2);
TBD
 
DROP_LOAD_APP (new 23c)
Drop a LOAD application and remove the related metadata
dbms_kafka.drop_load_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2);
TBD
 
DROP_SEEKABLE_APP (new 23c)
Drop a seekable application and remove the related metadata
dbms_kafka.drop_seekable_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2);
TBD
 
DROP_STREAMING_APP (new 23c)
Drop a streaming application and remove the related metadata
dbms_kafka.drop_streaming_app(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2);
TBD
 
ENABLE_VIEW_QUERY (new 23c)
Sets session context to allow an OSAK application to query an OSAK view directly
dbms_kafka.enable_view_query(view_name IN VARCHAR2);
TBD
 
EXECUTE_LOAD_APP (new 23c)
Loads a user-defined target table with content from a Kafka topic. On subsequent calls, only new, unread Kafka records are inserted into the target table.
dbms_kafka.execute_load_app(
cluster_name     IN  VARCHAR2,
application_name IN  VARCHAR2,
target_table     IN  VARCHAR2,
records_loaded   OUT INTEGER,
parallel_hint    IN  INTEGER DEFAULT 0);
TBD
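An incremental load could look like the sketch below, which is untested. It assumes a LOAD application named LOADAPP already exists for the cluster EXAMPLECLUSTER and that EXAMPLE_TARGET is an existing table whose shape matches the topic; all three names are hypothetical. parallel_hint is left at its default of 0.

```sql
SET SERVEROUTPUT ON

DECLARE
  l_loaded INTEGER;
BEGIN
  dbms_kafka.execute_load_app(
    cluster_name     => 'EXAMPLECLUSTER',  -- hypothetical
    application_name => 'LOADAPP',         -- hypothetical
    target_table     => 'EXAMPLE_TARGET',  -- hypothetical target table
    records_loaded   => l_loaded);         -- OUT: rows inserted by this call

  dbms_output.put_line('Records loaded: ' || l_loaded);
END;
/
```

Running the block a second time should, per the description above, report only the records that arrived since the previous call. Transaction handling is not described on this page, so no COMMIT is shown.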
 
GET_GRANULE_COUNT (new 23c)
Returns the number of granules associated with an OSAK view
dbms_kafka.get_granule_count(view_name IN VARCHAR2) RETURN INTEGER;
PRAGMA SUPPLEMENTAL_LOG_DATA(GET_GRANULE_COUNT, READ_ONLY);
TBD
 
INIT_OFFSET (new 23c)
Sets the starting offset as a delta number of records from either the high or low watermark of every partition
dbms_kafka.init_offset(
view_name    IN VARCHAR2,
record_count IN INTEGER,
water_mark   IN VARCHAR2 DEFAULT WATER_MARK_HIGH);
TBD
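As an illustration, the call below would position a view relative to the high watermark. It is an untested sketch: the view name is hypothetical, and the reading that record_count => 100 with WATER_MARK_HIGH means "start 100 records before the newest record in each partition" is an interpretation of the description above, not a confirmed behavior.

```sql
BEGIN
  dbms_kafka.init_offset(
    view_name    => 'ORA$DKV_EXAMPLECLUSTER_STREAMAPP_0',  -- hypothetical view name
    record_count => 100,                                   -- delta from the watermark
    water_mark   => dbms_kafka.water_mark_high);           -- package constant 'HIGH'
END;
/
```

Passing dbms_kafka.water_mark_low instead would measure the delta from the oldest available record in each partition.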
 
INIT_OFFSET_TS (new 23c)
Positions the processing of Kafka records to a point that is relatively current, potentially skipping unprocessed older records in the Kafka partitions

Overload 1
dbms_kafka.init_offset_ts(
view_name          IN VARCHAR2,
start_timestamp_ms IN INTEGER);
TBD
Overload 2
dbms_kafka.init_offset_ts(
view_name       IN VARCHAR2,
start_timestamp IN TIMESTAMP WITH TIME ZONE);
TBD
Overload 3
dbms_kafka.init_offset_ts(
view_name       IN VARCHAR2,
start_timestamp IN TIMESTAMP,
timezone        IN VARCHAR2 DEFAULT NULL);
TBD
 
LOAD_TEMP_TABLE (new 23c)
Loads a global temporary table with the result of SELECT * FROM <view_name>, where <view_name> is an OSAK view
dbms_kafka.load_temp_table(
temp_table_name IN VARCHAR2,
parallel_hint   IN INTEGER DEFAULT 0);
PRAGMA SUPPLEMENTAL_LOG_DATA(LOAD_TEMP_TABLE, READ_ONLY);
TBD
 
SEEK_OFFSET_TS (new 23c)
Positions an OSAK view to start reading Kafka records between two user-specified epoch timestamps

Overload 1
dbms_kafka.seek_offset_ts(
view_name          IN VARCHAR2,
start_timestamp_ms IN INTEGER,
end_timestamp_ms   IN INTEGER);
TBD
Overload 2
dbms_kafka.seek_offset_ts(
view_name       IN VARCHAR2,
start_timestamp IN TIMESTAMP WITH TIME ZONE,
end_timestamp   IN TIMESTAMP WITH TIME ZONE);
TBD
Overload 3
dbms_kafka.seek_offset_ts(
view_name       IN VARCHAR2,
start_timestamp IN TIMESTAMP,
end_timestamp   IN TIMESTAMP,
timezone        IN VARCHAR2 DEFAULT NULL);
TBD
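A seekable application is typically used as seek, load, then query. The block below is an untested sketch of that sequence using Overload 1. The view and temporary table names are hypothetical placeholders (the generated names are not given on this page), and the window bounds are arbitrary example values in milliseconds since the epoch.

```sql
SET SERVEROUTPUT ON

DECLARE
  -- Hypothetical object names; substitute the names generated for the application
  l_view  VARCHAR2(128) := 'ORA$DKV_EXAMPLECLUSTER_SEEKAPP_0';
  l_gtt   VARCHAR2(128) := 'ORA$DKVGTT_EXAMPLECLUSTER_SEEKAPP_0';
  l_count INTEGER;
BEGIN
  -- Overload 1: window bounds as milliseconds since the epoch
  -- (2020-07-04 02:30:00 UTC to 2020-07-04 03:30:00 UTC)
  dbms_kafka.seek_offset_ts(
    view_name          => l_view,
    start_timestamp_ms => 1593829800000,
    end_timestamp_ms   => 1593833400000);

  -- Copy the records in the window into the global temporary table
  dbms_kafka.load_temp_table(temp_table_name => l_gtt);

  -- Dynamic SQL because the table name is a placeholder;
  -- SQL_OBJECT_NAME raises an error if the object does not exist
  EXECUTE IMMEDIATE
    'SELECT COUNT(*) FROM ' || dbms_assert.sql_object_name(l_gtt)
    INTO l_count;

  dbms_output.put_line('Records in window: ' || l_count);
END;
/
```

Integer bounds are used here because they match only Overload 1, which avoids any ambiguity between the TIMESTAMP and TIMESTAMP WITH TIME ZONE overloads.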
 
SET_TRACING (new 23c)
Enables or disables debug level tracing for the external table driver code
dbms_kafka.set_tracing(
cluster_name     IN VARCHAR2,
application_name IN VARCHAR2,
enable           IN BOOLEAN);
TBD
 
UPDATE_OFFSET (new 23c)
Updates the last Kafka partition offsets read after a STREAMING application instance has finished querying a set of Kafka records captured in an OSAK temporary table
dbms_kafka.update_offset(view_name IN VARCHAR2);
TBD
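One pass of a streaming application can be sketched as load, process, then advance the offsets. The block below is untested and makes several assumptions: the view and temporary table names are hypothetical, the row count stands in for real application processing, and the COMMIT is assumed to be what makes the offset update durable together with the application's own changes.

```sql
SET SERVEROUTPUT ON

DECLARE
  -- Hypothetical object names; substitute the names generated for the application
  l_view  VARCHAR2(128) := 'ORA$DKV_EXAMPLECLUSTER_STREAMAPP_0';
  l_gtt   VARCHAR2(128) := 'ORA$DKVGTT_EXAMPLECLUSTER_STREAMAPP_0';
  l_count INTEGER;
BEGIN
  -- 1. Pull new, unread Kafka records into the global temporary table
  dbms_kafka.load_temp_table(temp_table_name => l_gtt);

  -- 2. Application processing goes here; counting rows is only a placeholder
  EXECUTE IMMEDIATE
    'SELECT COUNT(*) FROM ' || dbms_assert.sql_object_name(l_gtt)
    INTO l_count;
  dbms_output.put_line('Records processed: ' || l_count);

  -- 3. Record the offsets that were read, then commit (assumed to be required)
  dbms_kafka.update_offset(view_name => l_view);
  COMMIT;
END;
/
```

Calling update_offset only after processing succeeds means a failed pass can be retried against the same records.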

Related Topics
Built-in Functions
Built-in Packages
Database Security
DBMS_KAFKA_ADM
DBMS_KAFKA_INT
DBMS_KAFKA_MD
DBMS_KAFKA_UTL
What's New In 21c
What's New In 23c

This site is maintained by Dan Morgan. This site is protected by copyright and trademark laws under U.S. and International law. © 1998-2023 Daniel A. Morgan All Rights Reserved