RejectedSoftware Forums

Fiber-based condition

Hi,

Let's say I have two threads. One runs vibe.d processing with fibers and such; the other is an ordinary thread doing its own work. I want to send a message (via std.concurrency.send) from a fiber in thread 1 to thread 2 and have that fiber wait until thread 2 finishes the computation and returns the result.

I know how to do all the steps except for waiting efficiently in the fiber in thread 1. I have looked at async and Future, but I am not sure they are what I need. Ideally I would want a condition variable that I could notify when thread 1 receives the result in its message box, so that the awakened fiber can then retrieve it.

Is it possible to do it with vibe? Or am I just trying to abuse fibers for thread-like actions?

Thanks,
Drasha

Re: Fiber-based condition

Ok... two things.

1) I suck at searching, because there is a TaskCondition. It seems a bit heavy-handed for my case, though.
2) I could probably use yield() as the docs say:

Calling this function in short intervals is recommended if long CPU computations are carried out by a task. It can also be used in conjunction with Signals to implement cross-fiber events with no polling.

Are there any examples for this?

Thanks,
Drasha

Re: Fiber-based condition

You can use vibe.core.concurrency.

Re: Fiber-based condition

On Sat, 10 Jan 2015 19:50:06 GMT, Jack Applegame wrote:

You can use vibe.core.concurrency.

This is definitely the best approach for sending from the worker thread to the vibe.d thread. The other way around will be problematic though, unless you start an event loop in the worker thread and run the workload within a task:

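import core.thread : Thread;
import vibe.core.concurrency;
import vibe.core.core;

__gshared Task workertask, maintask;

// shared: runs once per process; a plain "static this" would run again
// in every newly started thread and spawn yet another thread each time
shared static this()
{
	// task in the vibe.d (main) thread
	maintask = runTask({
		auto msg = receiveOnly!int;
		assert(msg == 1);
		send(workertask, 2);
	});

	// separate thread that runs its own event loop
	auto th = new Thread({
		workertask = runTask({
			// send some message
			send(maintask, 1);
			// perform work
			// receive some message
			auto resp = receiveOnly!int();
			assert(resp == 2);
		});
		runEventLoop();
	});
	th.start();
}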

runWorkerTaskH might be a viable alternative using the integrated thread pool:

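import vibe.core.concurrency;
import vibe.core.core;

__gshared Task workertask, maintask;

// shared: runs once per process rather than once per (worker) thread
shared static this()
{
	// task in the vibe.d (main) thread
	maintask = runTask({
		auto msg = receiveOnly!int;
		assert(msg == 1);
		send(workertask, 2);
	});

	// task running in vibe.d's worker thread pool
	workertask = runWorkerTaskH({
		// send some message
		send(maintask, 1);
		// perform work
		// receive some message
		auto resp = receiveOnly!int();
		assert(resp == 2);
	});
}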

The next version of DMD will include a plug-in API for integrating vibe.d with std.concurrency, so that std.concurrency can be used as well. However, it still has some shortcomings (e.g. it only offers spawn, whereas vibe.core.concurrency works with both runTask and runWorkerTask).

Re: Fiber-based condition

Hi,

thank you both for your responses.

In the system I am building right now, the vibe.d thread serves as a network interface to a system that is doing its own stuff. The thread is created as a std.concurrency thread that calls processEvents() in its main loop to process vibe.d events.

When a request comes in from the network, it is translated to a command that the system understands and passed via std.concurrency.send to another thread that enqueues it for execution. The execution may take some time, depending on the number of commands already waiting for execution. That is why I need to "pause" the calling fiber but let the rest of the thread continue: other requests may be coming in from outside, and some of them can be served faster, as different commands take different amounts of time to execute.

Currently I am doing it like this:

The vibe.d thread:

bool finish = false;
while (true)
{
  processEvents();

  std.concurrency.receiveTimeout
  (
    20.msecs,
    (CommandResponse response)
    {
      // Pass the response to the appropriate interface 
      // emit a ManualEvent there
    },
    (EventFinish e)
    {
      // ...
      finish = true;
    }
  );

  if (finish)
  {
    // ...
    break;
  }
}

The fiber that sends a command then just waits for the ManualEvent, checks whether the response was intended for it and, if so, processes it. If not, it just waits again.
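
For illustration, the waiting side could look roughly like the sketch below. It is only a sketch under assumptions: ResponseBox, deliver, waitFor and the requestId/payload fields are names made up here (the real CommandResponse is not shown), and it assumes createManualEvent, emit, emitCount and wait(emitCount) from vibe.core.sync, used from the vibe.d thread only.

```d
import vibe.core.sync : createManualEvent;

// Hypothetical message type; the real CommandResponse is not shown in this thread.
struct CommandResponse
{
	ulong requestId; // assumed field identifying the originating request
	string payload;  // assumed result data
}

// Hypothetical helper owned by the vibe.d thread (not thread-safe).
final class ResponseBox
{
	// responses not yet picked up, keyed by request id
	private CommandResponse[ulong] m_pending;
	// a single event shared by all waiting fibers
	private typeof(createManualEvent()) m_event;

	this() { m_event = createManualEvent(); }

	// Called from the CommandResponse handler of receiveTimeout.
	void deliver(CommandResponse response)
	{
		m_pending[response.requestId] = response;
		m_event.emit(); // wakes every fiber currently waiting in waitFor
	}

	// Called from the fiber that sent the command.
	CommandResponse waitFor(ulong requestId)
	{
		auto count = m_event.emitCount;
		while (true)
		{
			// check whether the response for this request has arrived
			if (auto p = requestId in m_pending)
			{
				auto response = *p;
				m_pending.remove(requestId);
				return response;
			}
			// not ours yet: suspend this fiber until the next emit
			count = m_event.wait(count);
		}
	}
}
```

With this shape, every call to deliver wakes all fibers parked in waitFor, and each of them looks up its own request id again before going back to sleep.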

The problem with this approach is that the event wakes all waiting fibers, whereas I would just like to wake only the correct fiber. I know which one it is, as it is part of the request/response messages. I just don't know how...

Drasha