Types
Types define the schema of the objects used in the dataflow. Once defined, types can be used in states, operators, sources, and sinks. Using the built-in serializers, the dataflow can serialize objects to topics and deserialize them from topics.
Types are defined in the types section of the dataflow. They can also be defined in a package, which can be shared across multiple dataflows.
Primitive Types
Primitive types represent basic values. The following is the list of primitive types:
bool: boolean value
u8, u16, u32, u64: unsigned integers of 8, 16, 32, and 64 bits
i8, i16, i32, i64: signed integers of 8, 16, 32, and 64 bits
f32, f64: floating point numbers of 32 and 64 bits
Primitive types can be aliased or used as part of complex types. The following defines type aliases for u16 and f64, so instead of using u16 and f64 you can use range and latitude respectively.
types:
  range:
    type: u16
  latitude:
    type: f64
  weight:
    type: u8
String
A string is a sequence of characters. It is defined as the string type.
Object
An object type represents a complex type that has multiple properties. It is defined as the object type. The properties are defined as key-value pairs.
For example, the following is a simple object type representing a person.
types:
  person:
    type: object
    properties:
      name:
        type: string
      weight:
        type: u8
A property type can be any primitive type or complex type. So, using the weight alias defined above, you can define person as follows:
types:
  person:
    type: object
    properties:
      name:
        type: string
      weight:
        type: weight
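To make the relationship between an alias and an object concrete, here is a minimal Rust sketch of what the person type might correspond to in an operator. It assumes weight is an alias for u8, as in the first person definition. The names Weight, Person, and new_person are hypothetical illustrations, not the code that the dataflow actually generates.

```rust
// Hypothetical Rust counterparts of the `weight` alias and `person` object.
// These names and derives are assumptions made for illustration only.

/// Alias for the primitive `u8`, mirroring the `weight` type alias.
type Weight = u8;

/// Mirrors the `person` object: one field per declared property.
#[derive(Debug, Clone, PartialEq)]
struct Person {
    name: String,
    weight: Weight,
}

/// Builds a person. Because `Weight` is only an alias, a plain `u8` is accepted.
fn new_person(name: &str, weight: u8) -> Person {
    Person {
        name: name.to_string(),
        weight,
    }
}
```

Since an alias introduces no new type, values of the alias and of the underlying primitive are interchangeable; the alias only gives the schema a more descriptive name.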
Enum
An enum represents a type that can take one of several variants. It can represent a simple enum or a sum type. To define an enum, use the enum type followed by the oneOf property.
For example, the following is a simple enum type representing fruits.
types:
  fruit:
    type: enum
    oneOf:
      apple:
        type: null
      banana:
        type: null
      grape:
        type: null
This can represent an enum value such as apple, banana, or grape, depending on the serialization scheme. By default, an enum variant doesn't need to have a value type. A value type is useful if the variant has an associated value.
For example, the following is an enum type representing a vehicle type with associated values.
types:
  vehicle:
    type: enum
    oneOf:
      car:
        type: car
      airplane:
        type: airplane
  car:
    type: object
    properties:
      model:
        type: string
      range:
        type: u16
  airplane:
    type: object
    properties:
      model:
        type: string
      engines:
        type: u8
      celing:
        type: u32
If this is serialized as JSON, a car and an airplane will look like this, respectively:
{
  "vehicle": {
    "car": {
      "model": "tesla",
      "range": 300
    }
  }
}
{
  "vehicle": {
    "airplane": {
      "model": "737",
      "engines": 2,
      "celing": 35000
    }
  }
}
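The vehicle enum above is a sum type: each value is exactly one variant, and the variant carries its associated object. The following is a minimal Rust sketch of that idea. The type names and the vehicle_to_json helper are hypothetical, and the JSON formatting is written by hand for illustration; it is not the built-in serializer, and it produces only the value that would sit under the "vehicle" key.

```rust
// Hypothetical Rust view of the `vehicle` enum; all names are assumptions.

struct Car {
    model: String,
    range: u16,
}

struct Airplane {
    model: String,
    engines: u8,
    celing: u32, // spelled as in the type definition
}

/// Each variant carries the value type named under `oneOf`.
enum Vehicle {
    Car(Car),
    Airplane(Airplane),
}

/// Illustration-only JSON formatting: the variant name becomes the single key
/// that wraps the variant's value. Strings are not escaped here.
fn vehicle_to_json(v: &Vehicle) -> String {
    match v {
        Vehicle::Car(c) => format!(
            r#"{{"car":{{"model":"{}","range":{}}}}}"#,
            c.model, c.range
        ),
        Vehicle::Airplane(a) => format!(
            r#"{{"airplane":{{"model":"{}","engines":{},"celing":{}}}}}"#,
            a.model, a.engines, a.celing
        ),
    }
}
```

Matching on the enum forces every variant to be handled, which is the main practical benefit of modeling variants with associated values as a sum type.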
List
A list represents an ordered sequence of items. It is defined as the list type. All items must be of the same type. For example, the following is a list type representing a list of fruits.
types:
  fruits:
    type: list
    items:
      type: fruit
If this is serialized as JSON, it will look like this:
{
  "fruits": ["apple", "banana", "grape"]
}
Key-Value
The key-value type is used by partitioned state. A key-value type can be defined in the types section or as part of the state definition. The following is a key-value type representing a word count in the state.
states:
  count-per-word:
    type: keyed-state
    properties:
      key:
        type: string
      value:
        type: u16
Or it can be defined in the types section as follows:
types:
  word-count:
    type: keyed-state
    properties:
      key:
        type: string
      value:
        type: u16
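Conceptually, the word-count key-value type maps a string key to a u16 value. The sketch below uses an in-memory HashMap as a stand-in to show that shape. The count_words function is a hypothetical example, and it does not use the dataflow's actual state API, which is not shown in this section.

```rust
use std::collections::HashMap;

/// Counts occurrences of each word in `text`.
/// The key is the lowercased word (string) and the value is a u16 count,
/// matching the key and value types declared for `word-count`.
fn count_words(text: &str) -> HashMap<String, u16> {
    let mut counts: HashMap<String, u16> = HashMap::new();
    for word in text.split_whitespace() {
        let entry = counts.entry(word.to_lowercase()).or_insert(0);
        // Saturate rather than overflow if a word appears more than 65535 times.
        *entry = entry.saturating_add(1);
    }
    counts
}
```

Choosing u16 for the value caps each count at 65535; a wider type such as u32 may be more appropriate when counts can grow large.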
Types in the Operator
Once types are defined, they can be used in operators. For example, the following is a map operator that takes a Car type and returns a CarLocation type.
transforms:
  - operator: map
    run: |
      fn get_car_location(car: Car) -> Result<CarLocation> {
          Ok(CarLocation {
              car: format!("{} {}", car.maker, car.model),
              color: car.color,
              location: car.location,
          })
      }
Note that when a type is used in an operator, its name is translated according to the language's naming convention. For example, in Rust the type name will be CarLocation because Rust uses CamelCase for type names. In Python it will be car_location because Python uses snake_case.
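The name translation described above can be sketched as a pair of small conversion functions. This is only an illustration that assumes type names are declared with hyphens or underscores as word separators (for example car-location); the exact rules the dataflow applies are not stated here, and the function names are hypothetical.

```rust
/// Splits a declared type name on '-' and '_' into lowercase words.
fn words(name: &str) -> Vec<String> {
    name.split(|c: char| c == '-' || c == '_')
        .filter(|w| !w.is_empty())
        .map(|w| w.to_lowercase())
        .collect()
}

/// CamelCase form, as used for Rust type names: capitalize each word and join.
fn to_camel_case(name: &str) -> String {
    words(name)
        .iter()
        .map(|w| {
            let mut chars = w.chars();
            match chars.next() {
                Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
                None => String::new(),
            }
        })
        .collect()
}

/// snake_case form: lowercase words joined with underscores.
fn to_snake_case(name: &str) -> String {
    words(name).join("_")
}
```

Under these assumptions, a type declared as car-location would be referred to as CarLocation in Rust and car_location in Python.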
Types in the Source and Sink for Serialization and Deserialization
Types are used in sources and sinks for serialization and deserialization. The serialization format is configured globally in the dataflow as the default for all topics. It can also be configured per source or sink.
For example, the following sets JSON as the default serialization for all topics.
config:
  converter: json
  consumer:
    default_starting_offset:
      value: 0
      position: End