streamsx.avro package

Avro integration for IBM Streams

For details of implementing applications in Python for IBM Streams, including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud, see:

Overview

Provides functions for serializing and deserializing messages in Apache Avro format.

The following transformations are supported:
  • JSON -> AVRO

  • AVRO -> JSON

Sample

A simple example of a Streams application that serializes and deserializes messages:

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit
import streamsx.avro as avro

topo = Topology()

avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
s = topo.source([{'a': 'Hello'}, {'a': 'World'}, {'a': '!'}]).as_json()
# convert JSON to a binary Avro blob
o = s.map(avro.JSONToAvro(avro_schema))
# convert the binary Avro blob back to JSON
res = o.map(avro.AvroToJSON(avro_schema))
res.print()

# Submit to the Streaming Analytics service on IBM Cloud
submit('STREAMING_ANALYTICS_SERVICE', topo)
# For IBM Streams, including IBM Cloud Pak for Data, use instead
# (cfg is the submission configuration dict):
# submit('DISTRIBUTED', topo, cfg)

class streamsx.avro.AvroToJSON(message_schema)

Bases: streamsx.topology.composite.Map

Converts binary Avro messages to JSON strings.

Supports CommonSchema.Binary as input and returns output stream with schema CommonSchema.Json.

Example mapping stream b with binary Avro messages to JSON strings in stream j:

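from streamsx.topology.topology import Topology
import streamsx.avro as avro

topo = Topology()
...
# b: stream with schema CommonSchema.Binary containing binary Avro messages
avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
j = b.map(avro.AvroToJSON(message_schema=avro_schema))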

New in version 1.2.

message_schema

Avro schema to deserialize the binary Avro message to JSON. If not specified, it is expected that the schema is embedded in the message.

Type

str|file
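The `str|file` type suggests that message_schema may be either the schema JSON text itself or a reference to a schema file. Assuming "file" means a filesystem path (for example to an `.avsc` file), a caller could normalize both forms as in the sketch below. `load_avro_schema` is a hypothetical helper, not the package's own logic.

```python
import json
import os


def load_avro_schema(message_schema):
    """Return the parsed Avro schema from inline JSON text or a schema file path.

    Hypothetical helper: an existing file path is read as a schema file,
    anything else is treated as the schema JSON itself.
    """
    if os.path.isfile(message_schema):
        with open(message_schema, encoding="utf-8") as f:
            text = f.read()
    else:
        text = message_schema
    try:
        return json.loads(text)
    except json.JSONDecodeError as err:
        raise ValueError("message_schema is neither a schema file nor valid JSON") from err


avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
print(load_avro_schema(avro_schema)["name"])  # hw_schema
```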

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)

Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • schema – Schema passed into map.

  • name – Name passed into map.

  • **options – Future options passed to map.

Returns

Single stream representing the transformation of stream.

Return type

Stream

property vm_arg

Arbitrary JVM arguments to pass to the Streams operator.

Type

str

class streamsx.avro.JSONToAvro(message_schema, embed_avro_schema=False, time_per_message=None, tuples_per_message=None, bytes_per_message=None)

Bases: streamsx.topology.composite.Map

Converts JSON strings into binary Avro messages with schema CommonSchema.Binary.

Supports CommonSchema.Json as input and returns output stream with schema CommonSchema.Binary.

Example mapping stream j with JSON strings to stream b with binary Avro messages:

from streamsx.topology.topology import Topology
import streamsx.avro as avro

topo = Topology()

avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
j = topo.source([{'a': 'Hello'}, {'a': 'World'}, {'a': '!'}]).as_json()
b = j.map(avro.JSONToAvro(message_schema=avro_schema))

New in version 1.2.

message_schema

Avro schema used to serialize the JSON input into a binary Avro message.

Type

str|file

embed_avro_schema

Embed the schema in the generated Avro message. When generating Avro messages that must be persisted to a file system, the schema is expected to be included in the file. If this parameter is set to True, incoming JSON tuples are batched and a large binary object containing the Avro schema and one or more messages is generated. You must also specify one of the parameters bytes_per_message, tuples_per_message or time_per_message to control when the Avro message block is submitted to the output port; otherwise, the operator waits for a window punctuation marker. After the Avro message block is submitted to the output port, a punctuation is generated so that the receiving operator can, if required, create a new file.

Type

bool
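When the schema is embedded, the generated blob is presumably an Avro object container, the layout the Avro specification defines for files: the magic bytes `Obj\x01`, a metadata map holding `avro.schema` and `avro.codec`, a 16-byte sync marker, then blocks of datums, each block followed by the sync marker. The sketch below builds such a container with one uncompressed block from already-encoded datums. `avro_container` is a hypothetical helper, and whether the toolkit produces exactly this layout is an assumption.

```python
import os


def _write_long(n):
    """Encode a non-negative int as an Avro zigzag varint (enough for counts/sizes)."""
    if n < 0:
        raise ValueError("this helper only encodes non-negative values")
    z = n << 1  # zigzag of a non-negative value
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def _write_bytes(data):
    """Avro 'bytes'/'string' encoding: varint length, then the raw bytes."""
    return _write_long(len(data)) + data


def avro_container(schema_json, datums, sync=None):
    """Build an Avro object container with one uncompressed block of encoded datums.

    Hypothetical helper for illustration only.
    """
    datums = list(datums)
    sync = os.urandom(16) if sync is None else sync
    if len(sync) != 16:
        raise ValueError("sync marker must be 16 bytes")
    meta = {"avro.schema": schema_json.encode("utf-8"), "avro.codec": b"null"}
    out = bytearray(b"Obj\x01")  # magic bytes
    out += _write_long(len(meta))  # one map block holding all metadata entries
    for key, value in meta.items():
        out += _write_bytes(key.encode("utf-8")) + _write_bytes(value)
    out += _write_long(0)  # a zero-count block ends the map
    out += sync
    body = b"".join(datums)
    # data block: object count, byte size, serialized objects, sync marker
    out += _write_long(len(datums)) + _write_long(len(body)) + body + sync
    return bytes(out)


avro_schema = '{"type" : "record", "name" : "hw_schema", "fields" : [{"name" : "a", "type" : "string"}]}'
blob = avro_container(avro_schema, [b"\x0aHello", b"\x0aWorld", b"\x02!"])
print(blob[:4])  # b'Obj\x01'
```

Because the schema travels in the header, a reader of such a file does not need message_schema, which matches the note that the schema is otherwise expected to be embedded in the message.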

time_per_message

Specifies the approximate time, in seconds, after which the Avro message block is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.

Type

int|float|datetime.timedelta

tuples_per_message

The minimum number of tuples that the Avro message block should contain before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.

Type

int

bytes_per_message

The minimum size in bytes that the Avro message block should reach before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.

Type

int
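The three mutually exclusive thresholds can be modeled as a small buffer that decides when an embedded-schema block is complete. `MessageBatcher` is a hypothetical class for illustration only, not the operator's implementation: it checks the time threshold only when a tuple arrives (a real operator would presumably also use a timer), and with no threshold set it waits for an explicit `flush()`, standing in for the window punctuation.

```python
import datetime
import time


class MessageBatcher:
    """Hypothetical model of when an embedded-schema Avro block is emitted."""

    def __init__(self, time_per_message=None, tuples_per_message=None,
                 bytes_per_message=None, clock=time.monotonic):
        given = [p for p in (time_per_message, tuples_per_message, bytes_per_message)
                 if p is not None]
        if len(given) > 1:
            raise ValueError("bytes_per_message, time_per_message and "
                             "tuples_per_message are mutually exclusive")
        if isinstance(time_per_message, datetime.timedelta):
            time_per_message = time_per_message.total_seconds()
        self.time_per_message = time_per_message
        self.tuples_per_message = tuples_per_message
        self.bytes_per_message = bytes_per_message
        self._clock = clock
        self._pending = []
        self._size = 0
        self._started = None

    def add(self, datum):
        """Buffer one encoded datum; return the completed block or None."""
        if not self._pending:
            self._started = self._clock()  # in this model the timer starts with the first tuple
        self._pending.append(datum)
        self._size += len(datum)
        return self.flush() if self._threshold_reached() else None

    def _threshold_reached(self):
        if self.tuples_per_message is not None:
            return len(self._pending) >= self.tuples_per_message
        if self.bytes_per_message is not None:
            return self._size >= self.bytes_per_message
        if self.time_per_message is not None:
            return self._clock() - self._started >= self.time_per_message
        return False  # no threshold: wait for a window punctuation

    def flush(self):
        """Return and clear the pending block (e.g. on a window punctuation)."""
        block, self._pending, self._size = self._pending, [], 0
        return block


batcher = MessageBatcher(tuples_per_message=2)
print(batcher.add(b"\x0aHello"))  # None
print(batcher.add(b"\x0aWorld"))  # [b'\nHello', b'\nWorld']
```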

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)

Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • schema – Schema passed into map.

  • name – Name passed into map.

  • **options – Future options passed to map.

Returns

Single stream representing the transformation of stream.

Return type

Stream

property vm_arg

Arbitrary JVM arguments to pass to the Streams operator.

Type

str

streamsx.avro.download_toolkit(url=None, target_dir=None)

Downloads the latest Avro toolkit from GitHub.

Example for updating the Avro toolkit for your topology with the latest toolkit from GitHub:

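from streamsx.topology.topology import Topology
import streamsx.spl.toolkit
import streamsx.avro as avro

topology = Topology()
# download the Avro toolkit from GitHub
avro_toolkit_location = avro.download_toolkit()
# add the toolkit to the topology
streamsx.spl.toolkit.add_toolkit(topology, avro_toolkit_location)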

Example for updating the topology with a specific version of the Avro toolkit using a URL:

from streamsx.topology.topology import Topology
import streamsx.spl.toolkit
import streamsx.avro as avro

topology = Topology()
url122 = 'https://github.com/IBMStreams/streamsx.avro/releases/download/v1.2.2/streamsx.avro-1.2.2-95c5cd9-20190311-1233.tgz'
avro_toolkit_location = avro.download_toolkit(url=url122)
streamsx.spl.toolkit.add_toolkit(topology, avro_toolkit_location)

Parameters
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.

  • target_dir (str) – The directory where the toolkit is unpacked. If a relative path is given, it is appended to the system temporary directory, for example /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
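The target_dir rules can be expressed as a small path-resolution function. `resolve_target_dir` and the default directory name `streamsx_avro_toolkit` are hypothetical; the name that download_toolkit actually chooses when target_dir is None is not documented here.

```python
import os
import tempfile

# Hypothetical default name; the real name chosen by download_toolkit is undocumented here.
DEFAULT_DIR_NAME = "streamsx_avro_toolkit"


def resolve_target_dir(target_dir=None):
    """Apply the documented target_dir rules to produce a directory path."""
    tmp = tempfile.gettempdir()  # e.g. /tmp on Unix/Linux systems
    if target_dir is None:
        return os.path.join(tmp, DEFAULT_DIR_NAME)
    if os.path.isabs(target_dir):
        return target_dir  # absolute paths are used unchanged
    return os.path.join(tmp, target_dir)  # relative paths go under the temp dir


print(resolve_target_dir("avro_tk"))
```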

Returns

the location of the downloaded Avro toolkit

Return type

str

Note

This function requires an outgoing Internet connection.

New in version 1.1.

streamsx.avro.json_to_avro(stream, message_schema, embed_avro_schema=False, time_per_message=None, tuples_per_message=None, bytes_per_message=None, name=None)

Converts JSON strings into binary Avro messages with schema CommonSchema.Binary.

Parameters
  • stream (streamsx.topology.topology.Stream) – Stream of tuples containing the JSON records. Supports CommonSchema.Json as input.

  • message_schema (str|file) – Avro schema to serialize the Avro message from JSON input.

  • embed_avro_schema (bool) – Embed the schema in the generated Avro message. When generating Avro messages that must be persisted to a file system, the schema is expected to be included in the file. If this parameter is set to True, incoming JSON tuples are batched and a large binary object containing the Avro schema and one or more messages is generated. You must also specify one of the parameters bytes_per_message, tuples_per_message or time_per_message to control when the Avro message block is submitted to the output port; otherwise, the operator waits for a window punctuation marker. After the Avro message block is submitted to the output port, a punctuation is generated so that the receiving operator can, if required, create a new file.

  • time_per_message (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the Avro message block is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.

  • tuples_per_message (int) – The minimum number of tuples that the Avro message block should contain before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.

  • bytes_per_message (int) – The minimum size in bytes that the Avro message block should reach before it is submitted to the output port. Only valid if the Avro schema is embedded. The bytes_per_message, time_per_message and tuples_per_message parameters are mutually exclusive.

  • name (str) – Operator name in the Streams context, defaults to a generated name.

Returns

Output Stream with schema CommonSchema.Binary (Avro records in binary format).

Return type

streamsx.topology.topology.Stream

Deprecated since version 1.2.0: Use JSONToAvro instead.

streamsx.avro.avro_to_json(stream, message_schema=None, name=None)

Converts binary Avro messages to JSON strings.

Parameters
  • stream (streamsx.topology.topology.Stream) – Stream of tuples containing the binary Avro records. Supports CommonSchema.Binary as input.

  • message_schema (str|file) – Avro schema to deserialize the binary Avro message to JSON. If not specified, it is expected that the schema is embedded in the message.

  • name (str) – Operator name in the Streams context, defaults to a generated name.

Returns

Output Stream with schema CommonSchema.Json.

Return type

streamsx.topology.topology.Stream

Deprecated since version 1.2.0: Use AvroToJSON instead.
