Stream Expressions

Streams are very powerful and concise once the necessary tools (stream builders, combinators, and other utilities) are in place. However, the code behind these tools can be a chore to write, at times resembling state machines more than elegant functional expressions. The Camlp4 preprocessor adds syntax support for stream and stream parser expressions, raising the level of abstraction a bit higher and simplifying many common stream-oriented tasks.

Interactive Use

To enable stream expression support in the toplevel, you can type the following:

# #load "dynlink.cma";;
# #load "camlp4o.cma";;

Or, if you are using findlib:

# #use "topfind";;
# #camlp4o;;

Stream Literals

Stream expressions are enclosed by "[<" and ">]" brackets, and using them is a lot like using lists. The simplest stream possible is the empty stream:

[< >]

Literal values in stream expressions are prefixed by a single quote:

[< '1; '2; '(1 + 2) >]  (* Equivalent to Stream.of_list [1; 2; 3] *)

This is to distinguish them from streams, which are automatically concatenated:

[< '1; '2; more_numbers; '99 >]

In the above example, the stream will produce the integers 1 and 2, followed by all of the values generated by the more_numbers stream, and once more_numbers has been exhausted, it will produce the integer 99.
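
To make the ordering concrete, here is a minimal sketch in plain OCaml (no Camlp4) that yields the same sequence. It assumes the Stream module is available (the standard library before OCaml 5.0, or the camlp-streams package afterwards). The helper concat_streams is hypothetical and written only for this illustration, and the contents of more_numbers are made up.

```ocaml
(* Hypothetical helper: lazily concatenates a list of streams,
   draining each one before moving on to the next. *)
let concat_streams streams =
  let remaining = ref streams in
  let rec next count =
    match !remaining with
    | [] -> None
    | s :: rest ->
        (match Stream.peek s with
         | Some x -> Stream.junk s; Some x
         | None -> remaining := rest; next count)
  in
  Stream.from next

(* Assumed contents, chosen only for this example. *)
let more_numbers = Stream.of_list [10; 20; 30]

(* Plays the role of [< '1; '2; more_numbers; '99 >]. *)
let numbers =
  concat_streams [Stream.of_list [1; 2]; more_numbers; Stream.of_list [99]]

(* Stream.npeek inspects the first elements without consuming them. *)
let result = Stream.npeek 6 numbers
```

The stream expression says the same thing in one line, without the bookkeeping for which sub-stream is currently being drained.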

Recursive Definition

Streams can be defined with recursive functions, providing a straightforward and familiar mechanism to produce infinite sequences. The following defines a never-ending stream of 1s:

let ones =
  let rec aux () =
    [< '1; aux () >] in
  aux ()

Note the auxiliary function, aux, which is called recursively to form a loop. This is necessarily different from infinite lists, which can be defined with "let rec" without a helper function:

# let rec ones = 1 :: ones;;
val ones : int list =
  [1; 1; 1; 1; 1; ...]

The stream expression syntax cannot express recursive values like this. The definition parses, but the compiler rejects it:

# let rec ones = [< '1; ones >];;
Error: This kind of expression is not allowed
as right-hand side of `let rec'

This is only a minor inconvenience, since most streams will be built from one or more parameters. For example, here is the const_stream from the Streams chapter, redefined using stream expression syntax:

let rec const_stream k = [< 'k; const_stream k >]

Similarly, this simple one-liner is all it takes to produce a counter:

let rec count_stream i = [< 'i; count_stream (i + 1) >]
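
For comparison, both streams can be sketched without the syntax extension using Stream.from, which calls its argument with the current stream count (0, 1, 2, ... for a stream built directly this way). The _plain names are hypothetical, chosen to avoid clashing with the definitions above, and this is not necessarily how the Streams chapter writes const_stream. The sketch assumes the Stream module is available (the standard library before OCaml 5.0, or the camlp-streams package afterwards).

```ocaml
(* Plain-OCaml counterparts; the _plain suffix is only for illustration. *)
let const_stream_plain k = Stream.from (fun _ -> Some k)

(* Stream.from passes the zero-based count of the requested element. *)
let count_stream_plain i = Stream.from (fun n -> Some (i + n))

let sevens = Stream.npeek 3 (const_stream_plain 7)
let from_five = Stream.npeek 4 (count_stream_plain 5)
```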

Below is a slightly more complicated example, using two stream expressions to define a Fibonacci sequence:

# let fib =
    let rec aux a b =
      [< '(a + b); aux b (a + b) >] in
    [< '0; '1; aux 0 1 >];;
val fib : int Stream.t = <abstr>
# Stream.npeek 10 fib;;
- : int list = [0; 1; 1; 2; 3; 5; 8; 13; 21; 34]
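
The introduction notes that stream code written without these tools can resemble a state machine. As a rough illustration, the sketch below produces the same sequence using Stream.from and two mutable references. The name fib_plain is hypothetical, and the code assumes the Stream module is available (the standard library before OCaml 5.0, or the camlp-streams package afterwards).

```ocaml
(* Hypothetical plain-OCaml counterpart of fib: the pair (a, b) is
   explicit mutable state that advances by one step per element. *)
let fib_plain () =
  let a = ref 0 and b = ref 1 in
  Stream.from (fun _ ->
    let current = !a in
    a := !b;
    b := current + !b;
    Some current)

let first_ten = Stream.npeek 10 (fib_plain ())
```

The stream expression version keeps the same state in the arguments of aux, so no references or explicit update order are needed.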

Example: Directory walker

As a more practical example, we can define a recursive directory walker that avoids loading the entire directory tree into memory:

let rec walk dir =
  let items =
    try
      Array.map
        (fun fn ->
           let path = Filename.concat dir fn in
           try
             if Sys.is_directory path
             then `Dir path
             else `File path
           with e -> `Error (path, e))
        (Sys.readdir dir)
    with e -> [| `Error (dir, e) |] in
  Array.fold_right
    (fun item rest ->
       match item with
         | `Dir path -> [< 'item; walk path; rest >]
         | _ -> [< 'item; rest >])
    items
    [< >]

This function works by first assembling an array of paths for the specified base directory. Each path is wrapped in a variant type that keeps track of which path is a file and which is a directory. The array is then folded into a stream, expanding each subdirectory by recursively calling walk again. Errors are included in the variant so that exceptions can be handled or ignored as needed.

The expression that expands each subdirectory

[< 'item; walk path; rest >]

illustrates a convenient feature of stream expressions: any number of sub-streams can appear in any order, and they will be lazily evaluated as needed. walk path and rest both evaluate to streams that are concatenated with item at the front. The results are flattened into a single stream, just as if we had used something like stream_concat, defined in the Streams chapter.

With little effort, we can now print the names of all the directories and files underneath "/var/log":

let () =
  Stream.iter
    (function
       | `Dir path ->
           Printf.printf "dir: %s\n%!" path
       | `File path ->
           Printf.printf "file: %s\n%!" path
       | `Error (path, e) ->
           Printf.printf "error: %s (%s)\n%!" path
             (Printexc.to_string e))
    (walk "/var/log")
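
Because directories and errors travel through the stream as ordinary values, a consumer can also discard them before doing any work. The sketch below defines a hypothetical files_only filter over a stream of the same `Dir, `File, and `Error variants. It is demonstrated on a small hand-built stream so that it does not depend on the file system, and it assumes the Stream module is available (the standard library before OCaml 5.0, or the camlp-streams package afterwards).

```ocaml
(* Hypothetical filter: keeps only the paths tagged `File and skips
   directories and errors, pulling from the input only on demand. *)
let files_only items =
  let rec next count =
    match Stream.peek items with
    | None -> None
    | Some (`File path) -> Stream.junk items; Some path
    | Some (`Dir _ | `Error _) -> Stream.junk items; next count
  in
  Stream.from next

(* Hand-built sample standing in for the result of walk. *)
let sample =
  Stream.of_list
    [ `Dir "logs"; `File "logs/a.log";
      `Error ("logs/private", Not_found); `File "logs/b.log" ]

let files = Stream.npeek 10 (files_only sample)
```

Applied to the walker, Stream.iter print_endline (files_only (walk "/var/log")) would print only the file paths, still without building the whole tree in memory.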