Multiple stream functions

源代码: https://github.com/yomorun/yomo/edit/master/example/3-multi-sfn

该示例展示了 YoMo 如何支持多个 stream functions。

代码结构

  • source: 噪声传感器的模拟数据。
  • stream-fn-1: 实时计算噪声分贝值。
  • stream-fn-2: 当实时分贝值超过阈值时,打印出警告信息。
  • stream-fn-3: 计算时间滑动窗口内的平均分贝值。
  • zipper: 协调一个工作流,从 source 接收数据,并将数据发给在三个 stream-fn 进行流计算`。

准备

安装 YoMo CLI

Binary (推荐)

$ curl -fsSL "https://get.yomo.run" | sh

  ==> Resolved version latest to v1.1.1
  ==> Downloading asset for darwin arm64
  ==> Permissions required for installation to /usr/local/bin
Password:
  ==> Installation complete

或者从源代码编译

$ go install github.com/yomorun/cli/yomo@latest

选择 1: 自动运行

task run

$ task run

task: [zipper] yomo serve -c ./zipper/workflow.yaml
task: [sfn-3] go build -o ./bin/sfn-3 ./stream-fn-3/app.go
task: [source-build] go build -o ./bin/source source/main.go
task: [sfn-1] go build -o ./bin/sfn-1 ./stream-fn-1/app.go
task: [sfn-2] go build -o ./bin/sfn-2 ./stream-fn-2/app.go
[zipper] Using config file: ./zipper/workflow.yaml
[zipper] ℹ️   Running YoMo-Zipper...
[zipper] 2022-02-20 17:10:03.764    [yomo:zipper] Listening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...
[zipper] 2022-02-20 17:10:03.773    [core:server][Service] Listening on: 127.0.0.1:9000, QUIC: [v1 draft-29], AUTH: [None]
task: [sfn-1] ./bin/sfn-1
task: [sfn-2] ./bin/sfn-2
task: [sfn-3] ./bin/sfn-3
[sfn-3] 2022-02-20 17:10:04.121 [core:client] use credential: [None]
[sfn-1] 2022-02-20 17:10:04.122 [core:client] use credential: [None]
[sfn-2] 2022-02-20 17:10:04.122 [core:client] use credential: [None]
[sfn-3] 2022-02-20 17:10:04.124 [core:client] ❤️  [Noise-3]([::]:60308) is connected to YoMo-Zipper localhost:9000
[sfn-2] 2022-02-20 17:10:04.124 [core:client] ❤️  [Noise-2]([::]:59332) is connected to YoMo-Zipper localhost:9000
[sfn-1] 2022-02-20 17:10:04.125 [core:client] ❤️  [Noise-1]([::]:53199) is connected to YoMo-Zipper localhost:9000
[zipper] 2022-02-20 17:10:04.125    [core:server] ❤️  <Stream Function> [::Noise-2](127.0.0.1:59332) is connected!
[zipper] 2022-02-20 17:10:04.125    [core:server] ❤️  <Stream Function> [::Noise-3](127.0.0.1:60308) is connected!
[zipper] 2022-02-20 17:10:04.125    [core:server] ❤️  <Stream Function> [::Noise-1](127.0.0.1:53199) is connected!
task: [source] go build -o ./bin/source source/main.go
task: [source] ./bin/source
[source] 2022-02-20 17:10:05.032    [core:client] use credential: [None]
[source] 2022-02-20 17:10:05.038    [core:client] ❤️  [yomo-source]([::]:64310) is connected to YoMo-Zipper localhost:9000
[source] 2022-02-20 17:10:05.038    ✅ Emit {121.417854 1645348205038 localhost} to YoMo-Zipper
[zipper] 2022-02-20 17:10:05.038    [core:server] ❤️  <Source> [::yomo-source](127.0.0.1:64310) is connected!
[sfn-1] 2022-02-20 17:10:05.039 ✅ [localhost] 1645348205038 > value: 12.141786 ⚡️=1ms
[sfn-2] ✅ receive noise value: 12.141786
[sfn-3] 2022-02-20 17:10:05.040 ✅ [fn3] observe <- 12.141786
[sfn-3] 2022-02-20 17:10:05.125 🧩 average value in last 10000 ms: 12.141786!
[sfn-3] 2022-02-20 17:10:06.125 🧩 average value in last 10000 ms: 12.141786!
[sfn-3] 2022-02-20 17:10:07.126 🧩 average value in last 10000 ms: 12.141786!
[sfn-3] 2022-02-20 17:10:08.127 🧩 average value in last 10000 ms: 12.141786!
[sfn-3] 2022-02-20 17:10:09.127 🧩 average value in last 10000 ms: 12.141786!
[source] 2022-02-20 17:10:10.039    ✅ Emit {87.23943 1645348210039 localhost} to YoMo-Zipper
[sfn-1] 2022-02-20 17:10:10.041 ✅ [localhost] 1645348210039 > value: 8.723944 ⚡️=2ms
[sfn-2] ✅ receive noise value: 8.723944
[sfn-3] 2022-02-20 17:10:10.045 ✅ [fn3] observe <- 8.723944
[sfn-3] 2022-02-20 17:10:10.128 🧩 average value in last 10000 ms: 12.141786!
[sfn-3] 2022-02-20 17:10:11.128 🧩 average value in last 10000 ms: 12.141786!

选择 2: 手动运行

运行 YoMo-Zipper

yomo serve -c ./zipper/workflow.yaml

ℹ️   Found 3 stream functions in YoMo-Zipper config
ℹ️   Stream Function 1: Noise-1
ℹ️   Stream Function 2: Noise-2
ℹ️   Stream Function 3: Noise-3
ℹ️   Running YoMo Zipper...

运行 stream-fn-1

go run ./stream-fn-1/app.go

2021/07/05 19:14:24 [core:client] use credential: [None]
2021/07/05 19:14:24 [core:client] ❤️  [Noise-1]([::]:64869) is connected to YoMo-Zipper localhost:9000

运行 stream-fn-2

go run ./stream-fn-2/app.go

2021/07/05 19:14:24 [core:client] use credential: [None]
2021/07/05 19:14:24 [core:client] ❤️  [Noise-2]([::]:55565) is connected to YoMo-Zipper localhost:9000

运行 stream-fn-3

go run ./stream-fn-3/app.go

2021/07/05 19:14:24 [core:client] use credential: [None]
2021/07/05 19:14:24 [core:client] ❤️  [Noise-3]([::]:50019) is connected to YoMo-Zipper localhost:9000

运行 yomo-source

go run ./source/main.go

2021/07/05 19:15:00 Connecting to YoMo-Zipper localhost:9000 ...
2021/07/05 19:15:00 ✅ Connected to YoMo-Zipper localhost:9000
2021/07/05 19:15:00 ✅ Emit {157.14272 1621491060839 localhost} to YoMo-Zipper
2021/07/05 19:15:00 ✅ Emit {149.61421 1621491060942 localhost} to YoMo-Zipper
2021/07/05 19:15:00 ✅ Emit {187.12460 1621491061043 localhost} to YoMo-Zipper
2021/07/05 19:15:00 ✅ Emit {164.58117 1621491061146 localhost} to YoMo-Zipper

结果

stream-fn-1 窗口将会打印实时分贝值.

[localhost] 1621491060839 > value: 15.714272 ⚡️=1ms
[localhost] 1621491060942 > value: 14.961421 ⚡️=1ms
[localhost] 1621491061043 > value: 18.712460 ⚡️=1ms
[localhost] 1621491061146 > value: 1.071311 ⚡️=1ms
[localhost] 1621491061246 > value: 16.458117 ⚡️=1ms

stream-fn-2 窗口当实时分贝值超过阈值时将会打印警告信息。

receive noise value: 15.714272
receive noise value: 14.961421
receive noise value: 18.712460
❗ value: 18.712460 reaches the threshold 16! 𝚫=2.712460
[localhost] 1621491061146 > value: 1.071311 ⚡️=1ms
[localhost] 1621491061246 > value: 16.458117 ⚡️=1ms
❗ value: 16.458117 reaches the threshold 16! 𝚫=0.458117

stream-fn-3 窗口将计算时间滑动窗口内的平均分贝值。

[StdOut]:  15.714272
[StdOut]:  14.961421
[StdOut]:  18.712460
[StdOut]:  1.071311
[StdOut]:  16.458117
🧩 average value in last 10000 ms: 10.931099!
在 GitHub 上编辑本页面 更新时间: Mon, Aug 1, 2022