streamsx.avro package

Avro integration for IBM Streams

For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:

Overview

Provides functions for serialization and deserialization of messages in an Apache Avro format.

The following transformations are supported:
  • JSON -> AVRO
  • AVRO -> JSON

Sample

A simple example of a Streams application that serializes and deserializes messages:

from streamsx.topology.topology import *
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit
import streamsx.avro as avro

topo = Topology()

avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
s = topo.source([{'a': 'Hello'}, {'a': 'World'}, {'a': '!'}]).as_json()
# convert json to avro blob
o = s.map(avro.JSONToAvro(avro_schema))
# convert avro blob to json
res = o.map(avro.AvroToJSON(avro_schema))
res.print()

submit('STREAMING_ANALYTICS_SERVICE', topo)
# Use for IBM Streams including IBM Cloud Pak for Data
# submit ('DISTRIBUTED', topo, cfg)
streamsx.avro.download_toolkit(url=None, target_dir=None)

Downloads the latest Avro toolkit from GitHub.

Example for updating the Avro toolkit for your topology with the latest toolkit from GitHub:

import streamsx.avro as avro
import streamsx.spl.toolkit
# download Avro toolkit from GitHub
avro_toolkit_location = avro.download_toolkit()
# add the toolkit to the topology (topology is an existing Topology instance)
streamsx.spl.toolkit.add_toolkit(topology, avro_toolkit_location)

Example for updating the topology with a specific version of the Avro toolkit using a URL:

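import streamsx.avro as avro
import streamsx.spl.toolkit
# download a specific release of the Avro toolkit
url122 = 'https://github.com/IBMStreams/streamsx.avro/releases/download/v1.2.2/streamsx.avro-1.2.2-95c5cd9-20190311-1233.tgz'
avro_toolkit_location = avro.download_toolkit(url=url122)
# add the toolkit to the topology (topology is an existing Topology instance)
streamsx.spl.toolkit.add_toolkit(topology, avro_toolkit_location)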
Parameters:
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
  • target_dir (str) – Directory into which the toolkit is unpacked. If a relative path is given, it is appended to the system temporary directory, for example /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
Returns:

the location of the downloaded Avro toolkit

Return type:

str

Note

This function requires an outgoing Internet connection.

New in version 1.1.
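
The target_dir rules described above can be sketched with the standard library alone. This is only an illustration of the documented behavior, not the function's actual implementation: the helper name resolve_target_dir and the default sub-directory name are assumptions.

```python
import os
import tempfile


def resolve_target_dir(target_dir=None):
    """Hypothetical helper that mirrors the documented target_dir rules."""
    tmp = tempfile.gettempdir()
    if target_dir is None:
        # The documentation only promises "a location relative to the system
        # temporary directory"; this sub-directory name is an assumption.
        return os.path.join(tmp, "avro_toolkit")
    if os.path.isabs(target_dir):
        # Absolute paths are used as given.
        return target_dir
    # Relative paths are appended to the system temporary directory.
    return os.path.join(tmp, target_dir)


print(resolve_target_dir("toolkits/avro"))
```

On a typical Linux system the printed path starts with /tmp, but the exact prefix depends on the platform and environment.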

streamsx.avro.json_to_avro(stream, message_schema, embed_avro_schema=False, time_per_message=None, tuples_per_message=None, bytes_per_message=None, name=None)

Converts JSON strings into binary Avro messages with schema CommonSchema.Binary.

Parameters:
  • stream (Stream) – Stream of tuples containing the JSON records. Supports CommonSchema.Json as input.
  • message_schema (str|file) – Avro schema to serialize the Avro message from JSON input.
  • embed_avro_schema (bool) – Embed the schema in the generated Avro message. When generating Avro messages that must be persisted to a file system, the schema is expected to be included in the file. If this parameter is set to True, incoming JSON tuples are batched and a large binary object that contains the Avro schema and one or more messages is generated. In that case, specify one of the parameters bytes_per_message, tuples_per_message or time_per_message to control when the Avro message block is submitted to the output port; otherwise the operator expects a window punctuation marker. After the Avro message is submitted to the output port, a punctuation is generated so that the receiving operator can potentially create a new file.
  • time_per_message (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the Avro message block is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.
  • tuples_per_message (int) – The minimum number of tuples that the Avro message block should contain before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.
  • bytes_per_message (int) – The minimum size in bytes that the Avro message block should reach before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.
  • name (str) – Operator name in the Streams context, defaults to a generated name.
Returns:

Output Stream with schema CommonSchema.Binary (Avro records in binary format).

Deprecated since version 1.2.0: Use JSONToAvro instead.
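
The constraints on the batching parameters (mutually exclusive, and only valid when the schema is embedded) can be expressed as a small check. The sketch below is a hypothetical helper written for illustration; it is not part of streamsx.avro, and whether the package raises errors in these cases is an assumption.

```python
def check_batching_options(embed_avro_schema=False, time_per_message=None,
                           tuples_per_message=None, bytes_per_message=None):
    """Return the name of the batching option in use, or None if none is set."""
    candidates = {
        "time_per_message": time_per_message,
        "tuples_per_message": tuples_per_message,
        "bytes_per_message": bytes_per_message,
    }
    given = [name for name, value in candidates.items() if value is not None]
    if len(given) > 1:
        # The three parameters are documented as mutually exclusive.
        raise ValueError("mutually exclusive parameters: " + ", ".join(given))
    if given and not embed_avro_schema:
        # Batching parameters are only valid if the Avro schema is embedded.
        raise ValueError(given[0] + " requires embed_avro_schema=True")
    # With embed_avro_schema=True and no option set, the operator is
    # documented to wait for a window punctuation marker instead.
    return given[0] if given else None


print(check_batching_options(embed_avro_schema=True, tuples_per_message=100))
```

Running such a check before building the topology surfaces an invalid combination early, instead of at submission time.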

streamsx.avro.avro_to_json(stream, message_schema=None, name=None)

Converts binary Avro messages to JSON strings.

Parameters:
  • stream (Stream) – Stream of tuples containing the binary Avro records. Supports CommonSchema.Binary as input.
  • message_schema (str|file) – Avro schema to deserialize the binary Avro message to JSON. If not specified, it is expected that the schema is embedded in the message.
  • name (str) – Operator name in the Streams context, defaults to a generated name.
Returns:

Output Stream with schema CommonSchema.Json.

Deprecated since version 1.2.0: Use AvroToJSON instead.

class streamsx.avro.AvroToJSON(message_schema)

Bases: streamsx.topology.composite.Map

Converts binary Avro messages to JSON strings.

Supports CommonSchema.Binary as input and returns output stream with schema CommonSchema.Json.

Example mapping stream b with binary AVRO messages to JSON strings in stream j:

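from streamsx.topology.topology import Topology
import streamsx.avro as avro
topo = Topology()
...
avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
# b is a stream of binary Avro messages (CommonSchema.Binary)
j = b.map(avro.AvroToJSON(message_schema=avro_schema))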

New in version 1.2.

message_schema

Avro schema to deserialize the binary Avro message to JSON. If not specified, it is expected that the schema is embedded in the message.

Type:str|file
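
Because the schema is passed as a JSON string, a quick sanity check before building the topology can catch malformed schemas early. The following is a minimal sketch using only the json module; the helper name schema_field_names is hypothetical, and the check covers only record schemas, not full Avro schema validation.

```python
import json


def schema_field_names(message_schema):
    """Parse an Avro record schema string and return its field names."""
    schema = json.loads(message_schema)  # raises ValueError on malformed JSON
    if not isinstance(schema, dict) or schema.get("type") != "record":
        raise ValueError("expected an Avro schema of type 'record'")
    return [field["name"] for field in schema.get("fields", [])]


avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
print(schema_field_names(avro_schema))  # prints ['a']
```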
populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation.

Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream

vm_arg

Arbitrary JVM arguments can be passed to the Streams operator.

Type:str
class streamsx.avro.JSONToAvro(message_schema, embed_avro_schema=False, time_per_message=None, tuples_per_message=None, bytes_per_message=None)

Bases: streamsx.topology.composite.Map

Converts JSON strings into binary Avro messages with schema CommonSchema.Binary.

Supports CommonSchema.Json as input and returns output stream with schema CommonSchema.Binary.

Example mapping stream j with JSON string to stream b with binary AVRO messages:

from streamsx.topology.topology import Topology
import streamsx.avro as avro
topo = Topology()

avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
# j is a stream of JSON records (CommonSchema.Json)
j = topo.source([{'a': 'Hello'}, {'a': 'World'}, {'a': '!'}]).as_json()
b = j.map(avro.JSONToAvro(message_schema=avro_schema))

New in version 1.2.

message_schema

Avro schema to serialize the Avro message from JSON input.

Type:str|file
embed_avro_schema

Embed the schema in the generated Avro message. When generating Avro messages that must be persisted to a file system, the schema is expected to be included in the file. If this parameter is set to True, incoming JSON tuples are batched and a large binary object that contains the Avro schema and one or more messages is generated. In that case, specify one of the parameters bytes_per_message, tuples_per_message or time_per_message to control when the Avro message block is submitted to the output port; otherwise the operator expects a window punctuation marker. After the Avro message is submitted to the output port, a punctuation is generated so that the receiving operator can potentially create a new file.

Type:bool
time_per_message

Specifies the approximate time, in seconds, after which the Avro message block is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.

Type:int|float|datetime.timedelta
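
Since the time-based batching value may be given as int, float or datetime.timedelta and is interpreted in seconds, the conversion can be sketched as follows. The helper to_seconds is hypothetical; how the package converts the value internally is an assumption.

```python
import datetime


def to_seconds(value):
    """Normalize an int, float or timedelta to a float number of seconds."""
    if isinstance(value, datetime.timedelta):
        return value.total_seconds()
    return float(value)


print(to_seconds(datetime.timedelta(minutes=1)))  # prints 60.0
print(to_seconds(5))  # prints 5.0
```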
tuples_per_message

The minimum number of tuples that the Avro message block should contain before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.

Type:int
bytes_per_message

The minimum size in bytes that the Avro message block should reach before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.

Type:int
populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation.

Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream

vm_arg

Arbitrary JVM arguments can be passed to the Streams operator.

Type:str
