streamsx.avro package
Avro integration for IBM Streams
For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:
Overview
Provides functions for serializing and deserializing messages in the Apache Avro format.
The following transformations are supported:
- JSON -> AVRO
- AVRO -> JSON
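To make the two transformations concrete, the sketch below hand-encodes a JSON object into Avro binary form and back, using only the standard library. It is an illustration of the Avro binary encoding for a record whose fields are all strings (each string is a zigzag varint length followed by UTF-8 bytes), not the toolkit's implementation; the helper names `encode_string_record` and `decode_string_record` are hypothetical.

```python
import json


def zigzag_varint(n):
    """Encode an integer as Avro does for int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps small negatives to small positives
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string_record(json_text, field_names):
    """JSON -> AVRO for a record of string fields, written in schema order."""
    record = json.loads(json_text)
    blob = bytearray()
    for name in field_names:
        data = record[name].encode("utf-8")
        blob += zigzag_varint(len(data)) + data
    return bytes(blob)


def decode_string_record(blob, field_names):
    """AVRO -> JSON for the same kind of record; the schema supplies the field names."""
    pos, record = 0, {}
    for name in field_names:
        shift, z = 0, 0
        while True:  # read one varint
            b = blob[pos]
            pos += 1
            z |= (b & 0x7F) << shift
            if not b & 0x80:
                break
            shift += 7
        length = (z >> 1) ^ -(z & 1)  # undo zigzag
        record[name] = blob[pos:pos + length].decode("utf-8")
        pos += length
    return json.dumps(record)


blob = encode_string_record('{"a": "Hello"}', ["a"])
print(blob)                               # b'\nHello'
print(decode_string_record(blob, ["a"]))  # {"a": "Hello"}
```

The binary form carries no field names, which is why the reader needs the schema, either passed separately or embedded in the message.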
Sample
A simple example of a Streams application that serializes and deserializes messages:
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit
import streamsx.avro as avro
topo = Topology()
avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
s = topo.source([{'a': 'Hello'}, {'a': 'World'}, {'a': '!'}]).as_json()
# convert json to avro blob
o = s.map(avro.JSONToAvro(avro_schema))
# convert avro blob to json
res = o.map(avro.AvroToJSON(avro_schema))
res.print()
submit('STREAMING_ANALYTICS_SERVICE', topo)
# For IBM Streams, including IBM Cloud Pak for Data, use the DISTRIBUTED context instead;
# cfg is a configuration dict that you define beforehand:
# submit('DISTRIBUTED', topo, cfg)
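The sample passes the Avro schema as a hand-written JSON string. One way to avoid quoting mistakes, sketched here with the standard library only, is to build the schema as a Python dict and serialize it with `json.dumps`. The variable name `schema_dict` is illustrative; the resulting string is equivalent JSON to the one used in the sample.

```python
import json

# The same record schema as in the sample, expressed as a Python dict.
schema_dict = {
    "type": "record",
    "name": "hw_schema",
    "fields": [{"name": "a", "type": "string"}],
}

# Serialize to the JSON string form expected for message_schema.
avro_schema = json.dumps(schema_dict)

# Parsing the string again yields the original structure.
assert json.loads(avro_schema) == schema_dict
print(avro_schema)
```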
class streamsx.avro.AvroToJSON(message_schema)
Bases: streamsx.topology.composite.Map
Converts binary Avro messages to JSON strings.
Supports CommonSchema.Binary as input and returns an output stream with schema CommonSchema.Json.
Example mapping stream b with binary AVRO messages to JSON strings in stream j:
import streamsx.avro as avro
topo = Topology()
...
avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
j = b.map(avro.AvroToJSON(message_schema=avro_schema))
New in version 1.2.
message_schema
Avro schema to deserialize the binary Avro message to JSON. If not specified, it is expected that the schema is embedded in the message.
- Type
str|file
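When no schema is passed, the message itself has to carry it. The sketch below checks whether a binary blob starts with the four-byte magic of the Avro object container file format (`Obj` followed by the byte 1), which is the standard Avro layout that stores a schema alongside the data. That embedded-schema messages handled by this package use this layout is an assumption here, and `looks_like_avro_container` is a hypothetical helper, not part of `streamsx.avro`.

```python
# Magic bytes that open an Avro object container file: 'O', 'b', 'j', 1.
AVRO_CONTAINER_MAGIC = b"Obj\x01"


def looks_like_avro_container(blob):
    """Return True if the blob begins with the Avro container magic (assumed layout)."""
    return bytes(blob[:4]) == AVRO_CONTAINER_MAGIC


print(looks_like_avro_container(b"Obj\x01..."))  # True
print(looks_like_avro_container(b"\nHello"))     # False: a bare record, schema needed
```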
populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
transformed_stream = input_stream.map(myMapComposite)
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
schema – Schema passed into map.
name – Name passed into map.
**options – Future options passed to map.
- Returns
Single stream representing the transformation of stream.
- Return type
Stream
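The relationship between `map` and `populate` can be illustrated with a toy model. The classes below are stand-ins written for this sketch, not streamsx code: `ToyStream` only mimics the idea that `map` hands the stream to a composite's `populate` instead of calling the composite once per tuple, and `Upper` is a hypothetical composite.

```python
class ToyStream:
    """Stand-in for a stream: simply holds a list of tuples."""

    def __init__(self, items):
        self.items = list(items)

    def map(self, func, name=None, schema=None):
        # A composite exposes populate(); delegate to it with the whole stream.
        if hasattr(func, "populate"):
            return func.populate(None, self, schema, name)
        # A plain callable is applied to each tuple.
        return ToyStream(func(x) for x in self.items)


class Upper:
    """Toy composite whose transformation is a single upper-casing map step."""

    def populate(self, topology, stream, schema, name, **options):
        return stream.map(str.upper, name=name)


result = ToyStream(["hello", "world"]).map(Upper())
print(result.items)  # ['HELLO', 'WORLD']
```

In this model the composite decides which operations are added for one logical transformation, and the caller still receives a single resulting stream.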
property vm_arg
Arbitrary JVM arguments can be passed to the Streams operator.
- Type
str
class streamsx.avro.JSONToAvro(message_schema, embed_avro_schema=False, time_per_message=None, tuples_per_message=None, bytes_per_message=None)
Bases: streamsx.topology.composite.Map
Converts JSON strings into binary Avro messages with schema CommonSchema.Binary.
Supports CommonSchema.Json as input and returns an output stream with schema CommonSchema.Binary.
Example mapping stream j with JSON strings to stream b with binary AVRO messages:
from streamsx.topology.topology import Topology
import streamsx.avro as avro
topo = Topology()
avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
j = topo.source([{'a': 'Hello'}, {'a': 'World'}, {'a': '!'}]).as_json()
b = j.map(avro.JSONToAvro(message_schema=avro_schema))
New in version 1.2.
message_schema
Avro schema to serialize the Avro message from JSON input.
- Type
str|file
embed_avro_schema
Embed the schema in the generated Avro message. When generating Avro messages that must be persisted to a file system, the schema is expected to be included in the file. If this parameter is set to True, incoming JSON tuples are batched and a large binary object that contains the Avro schema and one or more messages is generated. You must also specify one of the parameters bytes_per_message, tuples_per_message or time_per_message to control when the Avro message block is submitted to the output port; otherwise the operator expects a window punctuation marker. After the Avro message is submitted to the output port, a punctuation is generated so that the receiving operator can potentially create a new file.
- Type
bool
time_per_message
Specifies the approximate time, in seconds, after which the Avro message block is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.
- Type
int|float|datetime.timedelta
tuples_per_message
The minimum number of tuples that the Avro message block should contain before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.
- Type
int
bytes_per_message
The minimum size in bytes that the Avro message block should be before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.
- Type
int
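The constraints on the three batching parameters can be summarized in a small validation sketch. `check_batch_options` is a hypothetical helper written for illustration, not a function of this package, and how the package itself reacts to invalid combinations is not stated here. The sketch encodes only the documented rules: the options are mutually exclusive, they are only valid with an embedded schema, and a time value may be given as a number of seconds or as a `datetime.timedelta`.

```python
import datetime


def check_batch_options(embed_avro_schema=False, time_per_message=None,
                        tuples_per_message=None, bytes_per_message=None):
    """Return the single batching option as (name, value), or None if none is set."""
    candidates = {
        "time_per_message": time_per_message,
        "tuples_per_message": tuples_per_message,
        "bytes_per_message": bytes_per_message,
    }
    given = {k: v for k, v in candidates.items() if v is not None}
    if len(given) > 1:
        raise ValueError("mutually exclusive: " + ", ".join(sorted(given)))
    if given and not embed_avro_schema:
        raise ValueError("batching options are only valid if the Avro schema is embedded")
    if not given:
        # With an embedded schema and no option, a window punctuation is expected instead.
        return None
    name, value = next(iter(given.items()))
    if isinstance(value, datetime.timedelta):
        value = value.total_seconds()  # normalize a timedelta to seconds
    return name, value


print(check_batch_options(True, time_per_message=datetime.timedelta(seconds=30)))
# ('time_per_message', 30.0)
```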
populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
transformed_stream = input_stream.map(myMapComposite)
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
schema – Schema passed into map.
name – Name passed into map.
**options – Future options passed to map.
- Returns
Single stream representing the transformation of stream.
- Return type
Stream
property vm_arg
Arbitrary JVM arguments can be passed to the Streams operator.
- Type
str
streamsx.avro.download_toolkit(url=None, target_dir=None)
Downloads the latest Avro toolkit from GitHub.
Example for updating the Avro toolkit for your topology with the latest toolkit from GitHub:
import streamsx.spl.toolkit
import streamsx.avro as avro
# download Avro toolkit from GitHub
avro_toolkit_location = avro.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, avro_toolkit_location)
Example for updating the topology with a specific version of the Avro toolkit using a URL:
import streamsx.spl.toolkit
import streamsx.avro as avro
url122 = 'https://github.com/IBMStreams/streamsx.avro/releases/download/v1.2.2/streamsx.avro-1.2.2-95c5cd9-20190311-1233.tgz'
avro_toolkit_location = avro.download_toolkit(url=url122)
streamsx.spl.toolkit.add_toolkit(topology, avro_toolkit_location)
- Parameters
url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
target_dir (str) – The directory into which the toolkit is unpacked. If a relative path is given, the path is appended to the system temporary directory, for example /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
- Returns
The location of the downloaded Avro toolkit.
- Return type
str
Note
This function requires an outgoing Internet connection.
New in version 1.1.
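The `target_dir` rule described above can be sketched with the standard library. `resolve_target_dir` is a hypothetical helper that mirrors the documented behaviour for relative paths. Two details are assumptions of this sketch and not documented: an absolute path is used unchanged, and the default folder is named `avro_toolkit`.

```python
import os
import tempfile


def resolve_target_dir(target_dir=None):
    """Return the directory a toolkit would be unpacked to under the documented rule."""
    tmp = tempfile.gettempdir()  # for example /tmp on Unix/Linux systems
    if target_dir is None:
        # Assumption: the default folder name is made up for this sketch.
        return os.path.join(tmp, "avro_toolkit")
    if os.path.isabs(target_dir):
        # Assumption: absolute paths are taken as given.
        return target_dir
    # Documented: a relative path is appended to the system temporary directory.
    return os.path.join(tmp, target_dir)


print(resolve_target_dir("toolkits/avro"))
```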
streamsx.avro.json_to_avro(stream, message_schema, embed_avro_schema=False, time_per_message=None, tuples_per_message=None, bytes_per_message=None, name=None)
Converts JSON strings into binary Avro messages with schema CommonSchema.Binary.
- Parameters
stream (streamsx.topology.topology.Stream) – Stream of tuples containing the JSON records. Supports CommonSchema.Json as input.
message_schema (str|file) – Avro schema to serialize the Avro message from JSON input.
embed_avro_schema (bool) – Embed the schema in the generated Avro message. When generating Avro messages that must be persisted to a file system, the schema is expected to be included in the file. If this parameter is set to True, incoming JSON tuples are batched and a large binary object that contains the Avro schema and one or more messages is generated. You must also specify one of the parameters bytes_per_message, tuples_per_message or time_per_message to control when the Avro message block is submitted to the output port; otherwise the operator expects a window punctuation marker. After the Avro message is submitted to the output port, a punctuation is generated so that the receiving operator can potentially create a new file.
time_per_message (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the Avro message block is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.
tuples_per_message (int) – The minimum number of tuples that the Avro message block should contain before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.
bytes_per_message (int) – The minimum size in bytes that the Avro message block should be before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.
name (str) – Operator name in the Streams context, defaults to a generated name.
- Returns
Output Stream with schema CommonSchema.Binary (Avro records in binary format).
- Return type
streamsx.topology.topology.Stream
Deprecated since version 1.2.0: Use JSONToAvro instead.
streamsx.avro.avro_to_json(stream, message_schema=None, name=None)
Converts binary Avro messages to JSON strings.
- Parameters
stream (streamsx.topology.topology.Stream) – Stream of tuples containing the binary Avro records. Supports CommonSchema.Binary as input.
message_schema (str|file) – Avro schema to deserialize the binary Avro message to JSON. If not specified, it is expected that the schema is embedded in the message.
name (str) – Operator name in the Streams context, defaults to a generated name.
- Returns
Output Stream with schema CommonSchema.Json.
- Return type
streamsx.topology.topology.Stream
Deprecated since version 1.2.0: Use AvroToJSON instead.
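Moving from the deprecated functions to the composite classes amounts to wrapping the same arguments in a class instance and passing it to `map`. The sketch below shows that shape. The wrapper names `to_avro` and `to_json` are hypothetical, and the `avro` module is passed in as a parameter only so that the sketch does not depend on the package being installed; in an application you would use `import streamsx.avro as avro` directly.

```python
def to_avro(stream, avro, schema, name=None):
    """Class-based equivalent of the deprecated avro.json_to_avro(stream, schema, name=name)."""
    return stream.map(avro.JSONToAvro(message_schema=schema), name=name)


def to_json(stream, avro, schema=None, name=None):
    """Class-based equivalent of the deprecated avro.avro_to_json(stream, schema, name=name)."""
    return stream.map(avro.AvroToJSON(message_schema=schema), name=name)
```

With the real package, `b = to_avro(j, avro, avro_schema)` corresponds to `b = j.map(avro.JSONToAvro(message_schema=avro_schema))` from the class example above.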