Flow To Go

The previous article peeked into Flow-Based Programming (FBP), a paradigm that puts the flow of the data above the code that makes the data flow. An FBP application can be described as a network of loosely coupled processing nodes that are connected only through data pipelines. The article's code used a fairly sophisticated FBP library that achieved its convenient syntax through reflection (hidden within the library, but still).

The article triggered a couple of comments on Reddit that suggested pure Go approaches, without third-party libraries.

This made me curious.

How well would the code from the previous article translate into “just stdlib” code?

I gave it a try, and here is the result.

Constructing an FBP net in pure stdlib Go

The approach is as simple as it can get:

Data flow: Every node reads from one or more input channels, and writes to one or more output channels.

Network construction: Weaving the net happens in main() through creating channels and connecting them to the input and output ports of the processing nodes.

Starting the net: The net starts by calling a Process() method on each node and feeding data into the network's input channel.

Stopping the net: The net stops when the net's input channel is closed. Every node whose input channels get closed then closes its output channels and shuts down. This way the shutdown propagates through the network until the last node (the "sink" node with no output channel) stops.
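The four points above can be sketched in a few lines. This is a minimal illustration, not the article's actual network: the upper and sink node types, their field names, and the runNet() helper are all hypothetical names chosen for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// upper is a hypothetical node with one input and one output channel.
type upper struct {
	In  <-chan string
	Out chan<- string
}

// Process starts the node's goroutine. When In is closed, the range loop
// ends and the node closes Out, which passes the shutdown downstream.
func (u *upper) Process() {
	go func() {
		defer close(u.Out)
		for s := range u.In {
			u.Out <- strings.ToUpper(s)
		}
	}()
}

// sink is a hypothetical final node. It has no output channel; instead it
// closes Done when its input channel has been closed and drained.
type sink struct {
	In   <-chan string
	Done chan struct{}
	Got  []string
}

func (s *sink) Process() {
	go func() {
		defer close(s.Done)
		for v := range s.In {
			s.Got = append(s.Got, v)
		}
	}()
}

// runNet weaves the net, starts it, feeds it, and waits for the shutdown.
func runNet(inputs []string) []string {
	in := make(chan string)
	mid := make(chan string)

	u := &upper{In: in, Out: mid}
	s := &sink{In: mid, Done: make(chan struct{})}

	u.Process()
	s.Process()

	for _, v := range inputs {
		in <- v
	}
	close(in) // closing the net's input starts the shutdown cascade
	<-s.Done  // blocks until the sink has stopped
	return s.Got
}

func main() {
	fmt.Println(runNet([]string{"flow", "to", "go"}))
}
```

Reading s.Got after receiving from s.Done is safe here, because the sink finishes all its appends before it closes Done.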

Changes to the code

The code below is based on a 1:1 copy of the code from the previous article. Then the following changes were applied.

Input channels

The goflow framework takes care of each node's input channels, and the nodes need special "On...()" functions that receive a single channel item at a time.

I changed the nodes to have their own input channels, and I replaced the On...() methods with Process() methods that take no arguments and start a goroutine to read from the input channel(s) and write to the output channel(s). This is substantially more code compared to the On...() methods, which were mostly one-liners. However, in real life, where each node would contain much more code, the overhead for input and output handling would be negligible.

No more fan-in

The original code used one channel between the two counter nodes and the printer node. Go channels trivially support a fan-in scenario with multiple writers and one reader on the same channel.

I had to change this so that the printer node now has two input channels, and the two counter nodes do not share the same output channel anymore but send their results into separate channels.

Why? The reason is the network’s shutdown mechanism. As described above, each node shuts down when its input channels are closed. Piece of cake, you might think, but things get difficult when a channel has multiple writers, as in the counter/printer part of our network.

As you know, closing a channel closes it completely, and other writers panic when trying to write to this channel. (Personally, I would prefer a fan-in semantic where all writers except the last one would only close their own end of the channel they share rather than the whole channel at once, but this is not how channels work in Go.)
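To see the problem in isolation, here is a small sketch. The trySend() helper is a hypothetical name used only for this demonstration; it recovers from the panic so that the program can report it. Real code should avoid sending on a closed channel rather than recover from it.

```go
package main

import "fmt"

// trySend sends v on ch and reports whether the send panicked.
// The recover() is for demonstration purposes only.
func trySend(ch chan int, v int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	// Buffered, so that the sends do not block without a reader.
	ch := make(chan int, 2)

	fmt.Println(trySend(ch, 1)) // false: the channel is still open

	close(ch) // imagine a second writer closing the shared channel

	fmt.Println(trySend(ch, 2)) // true: send on closed channel panics
}
```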

So we need to split every multi-writer channel into separate channels. Then we can write a merge function that merges all the channels into one, and also takes care of closing the output channel when the last of the input channels closes.

Or, rather than writing one, we can take a ready-made merge() function from the Go blog. With some very minor changes, the merge function is now a method of the printer node. Problem solved!
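For reference, a merge function in the style of the Go blog's pipelines article might look like the sketch below. It is written as a free function over int channels for brevity; the printer method in this article's code may differ in its details.

```go
package main

import (
	"fmt"
	"sync"
)

// merge copies all values from the input channels into one output channel.
// It closes the output channel after the last input channel has been
// closed and drained.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// One goroutine per input channel forwards values until that
	// input channel is closed.
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}

	// A single goroutine owns the close of out, so no writer can
	// ever send on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	a := make(chan int)
	b := make(chan int)

	go func() { a <- 1; a <- 2; close(a) }()
	go func() { b <- 10; close(b) }()

	sum := 0
	for v := range merge(a, b) {
		sum += v
	}
	fmt.Println(sum) // 13; the order of arrival is not deterministic
}
```

The key design point is that each writer closes only its own channel, and the goroutine waiting on the WaitGroup is the only place where the shared output channel gets closed.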

Signaling shutdown completion to the outside

Without the goflow framework, we also need to add a mechanism to tell the outside that the network has shut down. This is the duty of the final node in the network. Similar to how goflow does it, our printer node closes a channel of empty structs when concluding work.

An open channel with nothing to receive blocks its readers. When it is closed, however, every receive returns immediately with the zero value of the channel's element type. Any read operation on this channel then unblocks, and this is how we can make main() wait for the network to shut down.

(Side note: This behavior may seem counterintuitive and difficult to deal with, but remember that the "comma, ok" idiom for the receive operator tells you if the channel has been closed.)
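A short sketch of both behaviors follows. The recv() helper is a hypothetical wrapper around the two-value receive, added only to make the result easy to inspect.

```go
package main

import "fmt"

// recv performs a single "comma, ok" receive. ok is false only when the
// channel is closed and has no more buffered values.
func recv(ch <-chan int) (v int, ok bool) {
	v, ok = <-ch
	return v, ok
}

func main() {
	ch := make(chan int, 1)
	ch <- 42
	close(ch)

	fmt.Println(recv(ch)) // 42 true: a buffered value is still delivered
	fmt.Println(recv(ch)) // 0 false: closed and drained, zero value

	// The same effect lets a closed channel of empty structs act as a
	// "done" signal: the receive below returns at once.
	done := make(chan struct{})
	close(done)
	<-done
	fmt.Println("done")
}
```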

The code

Ok, enough talking about the whats and whys, now let’s dive into the code!

Source: Applied Go
