State Example Arrow Row
This tutorial continues from the previous state example. It shows how to use arrow-rows to store more complex data types than primitive key-values allow. Read more about arrow rows. We will build on the merge example to keep a saved balance.
Prerequisites
This guide uses a local Fluvio cluster. If you need to install it, please follow the instructions here.
Dataflow
Overview
In this example, we will show how to create a state, how to update the state as data enters from the source, and how to interface with the state. The state and its update are defined in the mergeservice, and the interfacing is defined in the interface service.
 
Mergeservice
1. Define the state
For this state, we will track only the balance, stored as a float.
states:
  tracker: 
    type: keyed-state
    properties:
      key: 
        type: string
      value: 
        type: arrow-row
        properties:
          balance:
            type: f32
Here, the key is a string, but the value is stored as an arrow-row, which can contain multiple properties (these act like columns).
2. Assign key
Like our previous example, we will use a trivial key to store the balance.
partition:
  assign-key:
    run: |
      fn map_cash(order: f32) -> Result<String> {
        Ok("cash".to_string())
      }
  update-state:
    (...)
3. Updating State
To update the state in an arrow-row, we need to update the row's columns manually and then call update().
partition:
  assign-key:
    (...)
  update-state:
    run: |
      fn add_count(order: f32) -> Result<()> {
        let mut tracker = tracker();
        tracker.balance += order; 
        tracker.update()?;
        Ok(())
      }
States are terminal, so no other action runs after the state update.
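As a mental model only, the two partition steps can be sketched in plain Rust: every record is mapped to a key, the row for that key is loaded (or created), one column is changed, and the row is written back. `TrackerRow`, `assign_key`, `update_state` and `run_partition` are hypothetical names, the `HashMap` stands in for the keyed state, and the assumption that a new row starts with a zero balance is ours. This is not how SDF implements states internally.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one arrow-row with a single `balance` column.
#[derive(Debug, Default)]
struct TrackerRow {
    balance: f32,
}

/// Mirrors `map_cash`: every record is assigned the same trivial key.
fn assign_key(_order: f32) -> String {
    "cash".to_string()
}

/// Mirrors `add_count`: fetch the row for the record's key,
/// change its column, and keep the result in the state.
fn update_state(state: &mut HashMap<String, TrackerRow>, order: f32) {
    let key = assign_key(order);
    // Assumption: a missing row starts out with a zero balance.
    let row = state.entry(key).or_default();
    row.balance += order;
}

/// Feeds a sequence of signed order values through both steps.
fn run_partition(orders: &[f32]) -> HashMap<String, TrackerRow> {
    let mut state = HashMap::new();
    for &order in orders {
        update_state(&mut state, order);
    }
    state
}
```

Because every record gets the key "cash", the state holds a single row no matter how many orders arrive, and that row accumulates the running balance.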
Interface
The second service serves as a way to read from the state.
interface:
  sources:
    - type: topic
      id: command
  states:
    tracker:
      from: mergeservice.tracker
  sinks:
    - type: topic
      id: message
      transforms:
        - operator: map 
          run: |
            fn new_input(_input: String) -> Result<String> {
              let track = tracker();
              let trackrow = track.sql(&format!("select * from `tracker`"))?;
              let rows = trackrow.rows()?;
              if !rows.next() {
                return Ok("empty".to_string())
              }
              let balancecol = trackrow.col("balance")?;
              let balance = rows.f32(&balancecol)?;
              Ok(format!("{:#?}",balance))
            }
The service first has to refer to the state created by the mergeservice. Inside the sink is the transform that will interface with the state. For simplicity, anything sent to the source topic command causes the service to output the current balance to the topic message. For the transform function:
1. We use a SQL statement to read from track, which is a LazyDf.
let trackrow = track.sql(&format!("select * from `tracker`"))?;
2. Afterwards, we move to the first row and select the column balance.
let rows = trackrow.rows()?;
if !rows.next() {
  return Ok("empty".to_string())
}
let balancecol = trackrow.col("balance")?;
let balance = rows.f32(&balancecol)?;
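The control flow of this transform can be modelled in plain Rust: advance to the first row, return "empty" if there is none, and otherwise read the balance column and format it. In the sketch below a slice iterator stands in for the row cursor. `TrackerRow` and `report_balance` are hypothetical names, and none of this is the SDF API.

```rust
/// Hypothetical stand-in for one row of the `tracker` state.
struct TrackerRow {
    balance: f32,
}

/// Returns "empty" when the query produced no rows,
/// otherwise the balance of the first row as text.
fn report_balance(rows: &[TrackerRow]) -> String {
    // The iterator plays the role of the cursor returned by `rows()`.
    let mut cursor = rows.iter();
    match cursor.next() {
        None => "empty".to_string(),
        // Same format string as the transform uses.
        Some(row) => format!("{:#?}", row.balance),
    }
}
```

The early return matters because the state has no rows until the first buy or sell record has been processed.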
Running the Example
Full Code
Copy and paste the following config and save it as dataflow.yaml.
# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: arrow-example
  version: 0.1.0
  namespace: examples
config:
  converter: json
types:
  order:
    type: object
    properties:
      name:
        type: string
      amount:
        type: u32
      price:
        type: f32  
topics:
  buy:
    schema:
      value:
        type: order
  sell:
    schema:
      value:
        type: order
  command:
    schema:
      value:
        type: string
        converter: raw
  message:
    schema:
      value:
        type: string
  
services:
  interface:
    sources:
      - type: topic
        id: command
    states:
      tracker:
        from: mergeservice.tracker
    sinks:
      - type: topic
        id: message
        transforms:
          - operator: map 
            run: |
              fn new_input(_input: String) -> Result<String> {
                let track = tracker();
                let trackrow = track.sql(&format!("select * from `tracker`"))?;
                let rows = trackrow.rows()?;
                if !rows.next() {
                  return Ok("empty".to_string())
                }
                let balancecol = trackrow.col("balance")?;
                let balance = rows.f32(&balancecol)?;
                Ok(format!("{:#?}",balance))
              }
  mergeservice:
    sources:
      - type: topic
        id: buy
        transforms:
          - operator: map
            run: |
              fn buy_order(order: Order) -> Result<f32> {
                Ok(order.amount as f32 * order.price * -1.0)
              }
      - type: topic
        id: sell 
        transforms:
          - operator: map
            run: |
              fn sell_order(order: Order) -> Result<f32> {
                Ok(order.amount as f32 * order.price)
              }
    states:
      tracker: 
        type: keyed-state
        properties:
          key: 
            type: string
          value: 
            type: arrow-row
            properties:
              balance:
                type: f32
    partition:
      assign-key:
        run: |
          fn map_cash(order: f32) -> Result<String> {
            Ok("cash".to_string())
          }
      update-state:
        run: |
          fn add_count(order: f32) -> Result<()> {
            let mut tracker = tracker();
            tracker.balance += order; 
            tracker.update()?;
            Ok(())
          }
Running SDF
To run the example:
$ sdf run
Produce data
We will produce some data for the first service through the buy and sell topics.
$ echo '{"name":"AMZN","amount":20,"price":173.33}' | fluvio produce buy
$ echo '{"name":"TSLA","amount":20,"price":219.41}' | fluvio produce sell
Make sure the state exists by entering show state in sdf. The output should include the following state:
>> show state
 Namespace                                Keys  Type   
 (...)
 mergeservice/tracker/state               1     u32    
 (...)
Running show state on that state displays its contents:
>> show state mergeservice/tracker/state
 Key   Window  balance  
 cash  *       921.6001
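The balance can be checked by hand. The buy transform yields -(20 × 173.33) = -3466.6 and the sell transform yields 20 × 219.41 = 4388.2, which sum to 921.6. The extra trailing digits in 921.6001 come from f32 rounding. The sketch below repeats the calculation in plain Rust: `Order` mirrors the order type from dataflow.yaml, `buy_order` and `sell_order` reuse the expressions from the transforms, and `balance_after` is a hypothetical helper that assumes the balance starts at zero.

```rust
/// Mirrors the `order` type declared in dataflow.yaml.
#[allow(dead_code)] // `name` is carried along but not used in the arithmetic
struct Order {
    name: String,
    amount: u32,
    price: f32,
}

/// Same expression as the `buy_order` transform: buying lowers the balance.
fn buy_order(order: &Order) -> f32 {
    order.amount as f32 * order.price * -1.0
}

/// Same expression as the `sell_order` transform: selling raises the balance.
fn sell_order(order: &Order) -> f32 {
    order.amount as f32 * order.price
}

/// Hypothetical helper: starts from a zero balance (an assumption)
/// and applies one buy followed by one sell.
fn balance_after(buy: &Order, sell: &Order) -> f32 {
    let mut balance = 0.0_f32;
    balance += buy_order(buy);
    balance += sell_order(sell);
    balance
}
```

Calling `balance_after` with the AMZN buy and the TSLA sell from above gives a value within rounding distance of 921.6. The exact trailing digits depend on how f32 rounds each intermediate result.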
Consume data
Then let's send any string to command and consume the output from message:
$ echo 'Do stuff' | fluvio produce command
$ fluvio consume message -Bd
921.6001
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
We just implemented an example using arrow states. The following link contains another example with arrow-states.