Saturday, September 04, 2010

More Erlang: Building A File Poller

Suppose you want to monitor a directory and be notified whenever its contents change. This problem crops up from time to time in various contexts; frequently the directory in question serves as a staging area that holds files prior to processing. When a file is added to this staging area, the program doing the processing needs to learn about it somehow so that it can be added to the work queue. There are a bunch of ways you might go about implementing such a system, but at heart they generally use one of two mechanisms:

  • Asynchronous event notification: The modification of the directory causes an appropriate message to be propagated to interested parties.
  • Synchronous polling: Some process polls the directory at a pre-determined interval and reports any changes to interested parties.

The former method is more efficient but generally requires assistance from the operating system (e.g. inotify on Linux). As far as I've been able to determine[1], Erlang lacks support for such a mechanism, so we'll have to use the second approach. Because this is Erlang, our goal is to take a basic polling loop and package it up into a separate process that can emit events to some parent.

So let's start with a basic polling loop, omitting function arguments for the moment:

loop() ->
    %% if guards can't call user-defined functions, so use case instead
    case sumpin_changed() of
        true  -> send_a_message();
        false -> ok
    end,
    loop().

This would be a horrible CPU hog since it polls continuously, so let's improve the loop by having it sleep between iterations:

loop() ->
    timer:sleep(<some number of milliseconds>),
    case sumpin_changed() of
        true  -> send_a_message();
        false -> ok
    end,
    loop().

We'll also want some way to terminate the polling loop cleanly. In Erlang this is typically accomplished by sending the process a message telling it to exit. In order for the process to catch this message we'll have to introduce a "receive" block in the main loop, which leads to a complication. How do we sleep while simultaneously waiting for an exit message? The Erlang gods have foreseen this particular problem and provided us with a variant of the basic receive block which includes a built-in timeout:

loop() ->
    receive
        exit -> ok
    after
        <some number of milliseconds> ->
            %% no message arrived within the timeout: poll once, then recurse
            case sumpin_changed() of
                true  -> send_a_message();
                false -> ok
            end,
            loop()
    end.
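To see the receive ... after pattern on its own, here is a minimal runnable sketch. The module name tick_demo and the {tick, N} and {stopped, N} messages are hypothetical stand-ins for "something changed" notifications; the point is only that the process does one unit of work per timeout and stops cleanly when it receives the atom exit.

```erlang
-module(tick_demo).
-export([start/2]).

%% Spawn a process that reports to Parent every Interval milliseconds.
start(Parent, Interval) ->
    spawn(fun() -> loop(Parent, Interval, 1) end).

loop(Parent, Interval, N) ->
    receive
        exit ->
            %% Report how many ticks were sent before stopping.
            Parent ! {stopped, N - 1},
            ok
    after Interval ->
        %% Nothing arrived within Interval ms, so do one unit of work.
        Parent ! {tick, N},
        loop(Parent, Interval, N + 1)
    end.
```

Because the timeout is re-armed every time the loop re-enters receive, the process spends its idle time blocked rather than spinning, yet it still reacts to exit without waiting for a separate sleep to finish.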

That's good enough for now; we'll go back and fill in the details later. Let's move on to detecting new files. This function will need to know what directory to monitor and, for added flexibility, we'll provide it with a regex[2] telling it what file name pattern to look for:

get_new_files(Directory, Regex) ->
    Files = get_list_of_files(Directory),
    FilteredFiles = filter_list_of_files(Files, Regex),
    detect_changes(FilteredFiles).

So how do we implement this? file:list_dir/1 returns {ok, Filenames}, where Filenames holds the name of every entry in the given directory. re:run/3 can serve as the basis of a predicate for lists:filter/2, keeping only those names that match the supplied regular expression.

get_new_files(Directory, Regex) ->
    {ok, Files} = file:list_dir(Directory),
    %% keep only the names for which re:run/3 reports a match
    FilteredFiles = lists:filter(
        fun(X) ->
            re:run(X, Regex, [{capture, none}]) =:= match
        end,
        Files
    ),
    detect_changes(FilteredFiles).
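The filtering step can be tried in isolation. This is a small sketch with a hypothetical module name, regex_filter; compiling the pattern once with re:compile/1 is my own addition, so the regex isn't recompiled for every file name.

```erlang
-module(regex_filter).
-export([matching/2]).

%% Keep only the file names that match Regex, using re:run/3 as the
%% predicate for lists:filter/2. The pattern is compiled once up front.
matching(FileNames, Regex) ->
    {ok, MP} = re:compile(Regex),
    lists:filter(
      fun(Name) -> re:run(Name, MP, [{capture, none}]) =:= match end,
      FileNames).
```

For example, regex_filter:matching(["a.test", "b.txt", "c.test"], "\\.test$") keeps "a.test" and "c.test". With {capture, none}, re:run/3 returns just the atom match or nomatch, which is what makes it convenient as a predicate.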

Now let's stop and think about the best way to detect changes. We could keep a list of files that we've seen before, but such a list can grow without bound as time goes on. A less resource-intensive method, which should work well in practice, is to compare each file's mtime (obtained via file:read_file_info) to the time of the previous poll.
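Reading the mtime looks roughly like this. This is a sketch with a hypothetical module and function name (mtime_check:modified_since/2); the #file_info{} record comes from the kernel application's file.hrl header, and Since is assumed to be a time in Gregorian seconds, as discussed further on.

```erlang
-module(mtime_check).
-export([modified_since/2]).

%% Defines the #file_info{} record returned by file:read_file_info/1.
-include_lib("kernel/include/file.hrl").

%% True if File's mtime (local time by default) is later than Since,
%% where Since is measured in Gregorian seconds.
modified_since(File, Since) ->
    {ok, Info} = file:read_file_info(File),
    calendar:datetime_to_gregorian_seconds(Info#file_info.mtime) > Since.
```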

This means that we now have a piece of state, the last poll time, to keep track of. Questions to answer:

  • How is this state initialized?
  • How is it updated?
  • How is it passed from iteration to iteration?

To initialize the state we'll create an init() function that passes an appropriate initial value to loop(). But what value should we pass? In Erlang, dates and times are usually passed around as {{Year,Month,Day},{Hour,Minute,Second}} tuples, which makes comparisons a little bit of a pain. To simplify things we'll convert dates and times to Gregorian seconds[3] (seconds elapsed since year 0 of the Gregorian calendar), in which case the last poll time should be initialized to 0. Every real file's mtime is later than that, so the first poll reports all matching files already in the directory.
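Here is a quick sketch of the conversion using calendar:datetime_to_gregorian_seconds/1. The module name greg_demo and its helper functions are hypothetical. As an aside, Erlang's term ordering compares tuples element by element, so two datetime tuples can also be compared directly with >; converting to integers additionally lets you do arithmetic, such as subtracting the Unix epoch offset below to get a Unix-style timestamp.

```erlang
-module(greg_demo).
-export([to_seconds/1, unix_epoch/0, newer/2]).

%% Convert a {{Year,Month,Day},{Hour,Minute,Second}} tuple to an integer
%% count of seconds since year 0 of the Gregorian calendar.
to_seconds(DateTime) ->
    calendar:datetime_to_gregorian_seconds(DateTime).

%% The Unix epoch in Gregorian seconds; subtracting this from a
%% Gregorian-seconds value yields a Unix-style timestamp.
unix_epoch() ->
    to_seconds({{1970, 1, 1}, {0, 0, 0}}).

%% With integers on both sides, "newer than" is a plain comparison,
%% and a LastPollTime of 0 is older than any real date.
newer(DateTime, LastPollTime) ->
    to_seconds(DateTime) > LastPollTime.
```

greg_demo:unix_epoch() evaluates to 62167219200, i.e. 719528 days times 86400 seconds.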

We want to update the state in a way that minimizes the chance of a file slipping through the cracks. There's no way to completely avoid a race condition between reading the contents of the directory and taking a new poll time, since updates to the directory happen outside the Erlang process. What we can do is compute the new poll time immediately after the call to list_dir, which keeps that window as small as possible. Putting this all together with state passing leads to the following:

init(PID, Directory, Regex, PollInterval) ->
    loop(PID, Directory, Regex, PollInterval, 0).

loop(PID, Directory, Regex, PollInterval, LastPollTime) ->
    receive
        exit -> ok
    after 
        PollInterval ->
            {NewFiles, NewPollTime} = 
                get_new_files(Directory, Regex, LastPollTime),
            if 
                length(NewFiles) > 0 ->
                    PID ! { new_files, NewFiles };
                true -> ok
            end,
            loop(PID, Directory, Regex, PollInterval, NewPollTime)
    end.

%% #file_info{} requires -include_lib("kernel/include/file.hrl").
get_new_files(Directory, Regex, LastPollTime) ->
    {ok, Files} = file:list_dir(Directory),
    %% A single calendar:local_time/0 call keeps date and time consistent;
    %% separate date() and time() calls could straddle midnight.
    NewPollTime = calendar:datetime_to_gregorian_seconds(
        calendar:local_time()
    ),
    FilteredFiles = lists:map(
        fun(X) -> filename:join([Directory, X]) end,
        lists:filter(
            fun(Y) ->
                re:run(Y, Regex, [{capture, none}]) =:= match
            end,
            Files
        )
    ),
    NewFiles = lists:filter(
        fun(Filename) ->
            {ok, FileInfo} = file:read_file_info(Filename),
            calendar:datetime_to_gregorian_seconds(
                FileInfo#file_info.mtime
            ) > LastPollTime
        end,
        FilteredFiles
    ),
    {NewFiles, NewPollTime}.
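The fragments above don't compile as a module on their own. Since the full source isn't included in this post, here is a rough, self-contained sketch of one way to assemble them; it is not the actual file_poller module. The module name file_poller_sketch, the init/3 that spawns the loop with the caller as parent (mirroring the init/3 call in the shell session below), and the modified_after/2 helper are all my assumptions.

```erlang
-module(file_poller_sketch).
-export([init/3]).

%% Defines the #file_info{} record.
-include_lib("kernel/include/file.hrl").

%% Spawn a poller that reports to the calling process; returns its pid.
init(Directory, Regex, PollInterval) ->
    Parent = self(),
    spawn(fun() -> loop(Parent, Directory, Regex, PollInterval, 0) end).

loop(Parent, Directory, Regex, PollInterval, LastPollTime) ->
    receive
        exit -> ok
    after PollInterval ->
        {NewFiles, NewPollTime} =
            get_new_files(Directory, Regex, LastPollTime),
        case NewFiles of
            [] -> ok;
            _  -> Parent ! {new_files, NewFiles}
        end,
        loop(Parent, Directory, Regex, PollInterval, NewPollTime)
    end.

get_new_files(Directory, Regex, LastPollTime) ->
    {ok, Names} = file:list_dir(Directory),
    %% Take the new poll time right after listing the directory.
    NewPollTime = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
    Paths = [filename:join(Directory, Name)
             || Name <- Names,
                re:run(Name, Regex, [{capture, none}]) =:= match],
    NewFiles = [Path || Path <- Paths, modified_after(Path, LastPollTime)],
    {NewFiles, NewPollTime}.

%% A file can vanish between list_dir/1 and read_file_info/1, so treat
%% an error as "not new" rather than crashing the poller.
modified_after(Path, LastPollTime) ->
    case file:read_file_info(Path) of
        {ok, #file_info{mtime = MTime}} ->
            calendar:datetime_to_gregorian_seconds(MTime) > LastPollTime;
        {error, _} ->
            false
    end.
```

Tolerating a missing file in modified_after/2 is a design choice of this sketch: in a staging directory, another consumer may well remove files between the listing and the stat.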

There are a few more changes needed to bring the code up to snuff (see the full source for details), but that's the gist of it. Now let's test it out, shall we?

$ erl
Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0]
[hipe] [kernel-poll:false]
Eshell V5.7.5  (abort with ^G)
1> c(file_poller).
{ok,file_poller}
2> PollerPid = file_poller:init(".",".+\.test",5000).
<0.42.0>
3> lists:keysearch(messages, 1, process_info(self())).  % see note [4]
{value,{messages,[]}}
4> os:cmd("touch 1.test").
[]
5> lists:keysearch(messages, 1, process_info(self())).
{value,{messages,[{new_files,["./1.test"]}]}}
6> os:cmd("touch 2.test 3.test").
[]
7> lists:keysearch(messages, 1, process_info(self())).
{value,{messages,[{new_files,["./1.test"]},
                  {new_files,["./3.test","./2.test"]}]}}
8> PollerPid ! exit.
exit
9> os:cmd("touch 4.test").
[]
10> lists:keysearch(messages, 1, process_info(self())).
{value,{messages,[{new_files,["./1.test"]},
                  {new_files,["./3.test","./2.test"]}]}}

Here I hook the Erlang shell up to a file_poller process and then create some empty files with touch. After each invocation of touch, the file poller kindly sends me a message (within one poll interval) letting me know that there are new files. Once I send it the atom exit the process terminates, so no message is generated when 4.test is created.
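One small detail in the session above: in an Erlang string literal, "\." is simply an escaped ".", so the pattern ".+\.test" reaches the regex engine as .+.test, where the dot matches any character. It makes no difference here because only *.test files were created, but a name like 1xtest would also match. Writing "\\." passes a real backslash through. The module name escape_demo is hypothetical; the sketch just demonstrates the difference.

```erlang
-module(escape_demo).
-export([check/0]).

%% "\." in a string literal is just "."; "\\." is a backslash plus a dot,
%% which the regex engine reads as a literal dot.
check() ->
    Loose  = ".+\.test",   % same string as ".+.test"
    Strict = ".+\\.test",  % the regex .+\.test
    [Loose =:= ".+.test",
     re:run("1xtest", Loose,  [{capture, none}]),
     re:run("1xtest", Strict, [{capture, none}]),
     re:run("1.test", Strict, [{capture, none}])].
```

escape_demo:check() returns [true, match, nomatch, match].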


[1] One of the challenges of working with Erlang is that, while releases come with decent module-by-module documentation, that documentation assumes you already have a pretty good idea of what you're looking for. It's less helpful when you need to figure out whether some esoteric bit of functionality is available in the standard library. When I want to know how to do something in Java or Ruby I can just ask Google, but that approach works less well for Erlang because the community of practitioners is much smaller.
[2] The filelib module provides a wildcard function that does something similar, but it only understands shell-style wildcard patterns rather than full regular expressions.
[3] Erlang, being Scandinavian, is an egalitarian language designed to run on a wide variety of platforms. Consequently, there's no built-in support (again, AFAIK) for Unix timestamps.
[4] process_info is another handy function that could be better publicized. I was looking for a way to peek at a process's message queue when I noticed that pman did just that; process_info is what turned up after digging through the pman source.


Blog Information Profile for gg00