Answering a Library's Call in Your Own Sweet Time


Situation

How many times has this happened to you? A library provides a hook to do something, let’s say, optimize a function’s output. The library wants a function, and if your function has one or more of the following properties:

  • Your function is pure.
  • Your function can compute the needed value within the optimizer’s thread.
  • This output can be computed on demand.

Great! You don’t have a problem. You’re using the library as the author intended. You don’t need what this article is offering.

// Your pure, blessed, little function!
Func<int[], double> f = (int[] genotype) => genotype.Average();
optimizer.SetEvaluator(f);
optimizer.Run(); // Calls `f` deep within its bowels.

Callee Problems

But maybe your function isn’t pure and you can’t stop the world to compute its value. Some times you need to wait for something to happen without halting the main thread. For instance, you might need to let the physics engine run for a few ticks.

  // Your impure, tainted, little non-function!
  f = (int[] genotype) => {
    var robot = MakeRobot(genotype);
    // Wait 60 seconds.
    yield return new WaitForSeconds(60f);
    return robot.position.x;
  }; // Uh oh, this doesn't compile. f can't yield.
  optimizer.SetEvaluator(f);
  optimizer.Run(); // Wants to call `f` but can't.

Sometimes the library calling this function isn’t so deeply embedded that you can’t just break the library’s execution in two: 1) before function evaluation and 2) after function evaluation. It’s not the prettiest solution, and it leaks a lot of internals, but it puts you back in the driver’s seat.

If you’d like to maintain the integrity of the library’s API and work around this limitation, here’s a trick I’ve used a number of times.

Raw Solution

  1. Instantiate a thread-safe queue for input.
  2. Instantiate a thread-safe queue for output.
  3. Run the library in a separate thread.
  4. When the library asks you to compute f, put its arguments into the input queue.
  5. Then f waits, blocking the thread the library’s running in.
  6. In your own sweet time, you determine the output of f and send it to the output queue.
  7. The f function returns the output from the queue, thereby resuming the thread with the library.

I’ve written this in various forms a number of times, but today I finally encapsulated it into its own little class. Cue the QueueProxy.

C# Solution

The purpose of the QueueProxy class is to allow you to stick a proxy in your function’s stead. Run the library in another thread, and its inputs will be available to some worker thread.

// Proxy for a function that accepts a int[] and returns a double.
var proxy = new QueueProxy<int[], double>();
// Run the optimization algorithm in another thread.
Task.Run(() => {
  optimizer.SetEvaluator((int[] genotype) => proxy.EnqueueAndWait(genotype));
  optimizer.Run();
});

Then in some way that’s conducive to not blocking the world, you can do the work you need to do elsewhere unconstrained by the libraries needs. Read the input.

int[] genotype;
if (proxy.input.TryDequeue(out genotype)) {
  // Instantiate thing.
  phenotype = CreatePhenotype(genotype);
}

And write the output when you can.

if (physics.ticks > 100) {
  double result = phenotype.position.x;
  proxy.output.Enqueue(result);
}

Note: This doesn’t totally decouple the serial nature of the optimization algorithm, and it in fact relies on it. The optimization, if single threaded, will block waiting for a result.

Code

This is the code as written when this post was published. Please refer to this gist for the most up to date version.

/* Original code Copyright (c) 2017 Shane Celis[1]
   Licensed under the MIT License[2]

   Original code posted here[3].

   This comment generated by code-cite[4].

   [1]: https://github.com/shanecelis
   [2]: https://opensource.org/licenses/MIT
   [3]: https://gist.github.com/shanecelis/e5d76ead850df257f11a679920a5d851
   [4]: https://github.com/shanecelis/code-cite
*/

using System;
using System.Threading;
using System.Collections.Concurrent;

public class QueueProxy<TInput, TOutput> {
  public ConcurrentQueue<TInput> inputs = new ConcurrentQueue<TInput>();
  public ConcurrentQueue<TOutput> outputs = new ConcurrentQueue<TOutput>();
  // XXX One could also throw exceptions through the proxy.
  // public ConcurrentQueue<Exception> error;

  public TOutput EnqueueAndWait(TInput input) {
    inputs.Enqueue(input);
    TOutput result;
    while (! outputs.TryDequeue(out result))
      Thread.Sleep(100);
    return result;
  }

  public bool TryRespond(Func<TInput, TOutput> f) {
    TInput input;
    if (inputs.TryDequeue(out input)) {
      outputs.Enqueue(f(input));
      return true;
    }
    return false;
  }
}

Posted on November 25, 2017 by Shane Celis, tags:

Don't miss the next article!