Crate plumbum [−] [src]
Plumbum (latin for lead) is a port of Michael Snoyman's excellent
conduit
library.
It allows for production, transformation, and consumption of streams of data in constant memory. It can be used for processing files, dealing with network interfaces, or parsing structured data in an event-driven manner.
Features
Large and possibly infinite streams can be processed in constant memory.
Chunks of data are dealt with lazily, one piece at a time, instead of needing to read in the entire body at once.
The resulting components are pure computations, and allow us to retain composability while dealing with the imperative world of I/O.
Basics
There are three main concepts:
- A
Source
will produce a stream of data values and send them downstream. - A
Sink
will consume a stream of data values from upstream and produce a return value. - A
Conduit
will consume a stream of values from upstream and produces a new stream to send downstream.
In order to combine these different components, we have connecting and fusing.
The connect
method will combine a Source
and Sink
,
feeding the values produced by the former into the latter, and producing a final result.
Fusion, on the other hand, will take two components and generate a new component.
For example, fusing a Conduit
and Sink
together into a new Sink
,
will consume the same values as the original Conduit
and produce the same result as the original Sink
.
Primitives
There are four core primitives:
consume
takes a single value from upstream, if available.produce
sends a single value downstream.leftover
puts a single value back in the upstream queue, ready to be read by the next call toconsume
.defer
introduces a point of lazyiness, artifically deferring all further actions.
Example
use plumbum::*; fn source<'a>() -> Source<'a, i32> { defer() .and(produce(1)) .and(produce(2)) .and(produce(3)) .and(produce(4)) } fn conduit<'a>() -> Conduit<'a, i32, String> { // Get adjacent pairs from upstream consume().zip(consume()).and_then(|res| { match res { (Some(i1), Some(i2)) => { produce(format!("({},{})", i1, i2)) .and(leftover(i2)) .and(conduit()) }, _ => ().into() } }) } fn sink<'a>() -> Sink<'a, String, String> { consume().and_then(|res| { match res { None => "...".to_string().into(), Some(str) => sink().and_then(move |next| { format!("{}:{}", str, next).into() }) } }) } fn main() { let res = source().fuse(conduit()).connect(sink()); assert_eq!(res, "(1,2):(2,3):(3,4):...") }
Modules
io |
Interfacing with |
Macros
fuse! |
Provides a convient syntax for fusing conduits. |
pipe! |
Provides a convient syntax for conduit operations. |
Structs
Kleisli |
The Kleisli arrow from |
Enums
Chunk | |
ConduitM |
Represents a conduit, i.e. a sequence of await/yield actions. |
Void |
Functions
consume |
Wait for an input chunk from upstream. |
consume_chunk |
Wait for a single input value from upstream. |
defer |
Defers a conduit action. Can be used to introduce artifical laziness. |
leftover |
Provide a single piece of leftover input to be consumed by the next component in the current binding. |
leftover_chunk |
Provide a single piece of leftover input to be consumed by the next component in the current binding. |
produce |
Send a value downstream to the next component to consume. |
produce_chunk |
Send a chunk of values downstream to the next component to consume. |
Type Definitions
Conduit |
Consumes a stream of input values and produces a stream of output values, without producing a final result. |
Sink |
Consumes a stream of input values and produces a final result, without producing any output. |
Source |
Provides a stream of output values, without consuming any input or producing a final result. |