RejectedSoftware Forums

Communicating with worker tasks

Hi,

is there a way to communicate with a task that was spawned in a worker thread with runWorkerTask()?
In general, is there a way to send messages to a task running in a different event loop?
Since runWorkerTask does not return a Task object I cannot use send/receive, and I cannot pass an explicit MessageQueue as an argument to the function since it is not isolated.

I also tried doing something like:

ManualEvent e = createManualEvent();
runWorkerTask(function void(ManualEvent e) {
    scope (exit) e.emit();
    // do something synchronous
}, e);
e.wait();

But compilation fails with a rather unhelpful message:

Running dmd...
FAIL ../../vibe.d/.dub/build/libevent-debug-linux.posix-x86_64-dmd-AB0707232CA963B5DA23C2232BBED51B vibe-d staticLibrary
Error executing command build: DMD compile run failed with exit code -9

And even if it compiled, it would only let me wait for the worker task to finish, not retrieve data from it.
Am I missing something?
Something like this would be helpful in ThreadedFileStream, for instance, to avoid blocking the main event loop when doing file I/O.

Re: Communicating with worker tasks

On Tue, 25 Mar 2014 16:42:29 GMT, Luca Niccoli wrote:

Hi,

is there a way to communicate with a task that was spawned in a worker thread with runWorkerTask()?
In general, is there a way to send messages to a task running in a different event loop?
Since runWorkerTask does not return a Task object I cannot use send/receive, and I cannot pass an explicit MessageQueue as an argument to the function since it is not isolated.

I also tried doing something like:

ManualEvent e = createManualEvent();
runWorkerTask(function void(ManualEvent e) {
    scope (exit) e.emit();
    // do something synchronous
}, e);
e.wait();

What should work is passing the Task handle like this:

void workerTaskFunc(Task caller)
{
  caller.send(Task.getThis());
}

runWorkerTask(&workerTaskFunc, Task.getThis());
auto task = receiveOnly!Task();

I'm not 100% sure that nothing currently fails here, but in theory message passing between different threads/event loops should work fine. Maybe it makes sense to add a second form of runWorkerTask that actually waits and returns the handle. Currently this isn't done for performance reasons.
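
The handshake described above can be tried out without vibe.d. The following is only a minimal sketch that uses Phobos' std.concurrency as a stand-in, on the assumption that vibe.d's send/receiveOnly behave analogously: spawn and Tid play the roles of runWorkerTask and Task, and the names workerFunc and roundTrip are made up for illustration. Note that spawn already returns the Tid, so the handshake is only there to mirror what runWorkerTask would require.

```d
import std.concurrency : receiveOnly, send, spawn, thisTid, Tid;

// Worker: first reports its own handle to the caller (the handshake),
// then waits for one job and sends the result back.
void workerFunc(Tid caller)
{
    caller.send(thisTid);
    auto input = receiveOnly!int();
    caller.send(input * 2);
}

// Caller side: start the worker, learn its handle, exchange data with it.
int roundTrip(int input)
{
    spawn(&workerFunc, thisTid);
    auto worker = receiveOnly!Tid();   // handle received from the worker
    worker.send(input);                // caller -> worker
    return receiveOnly!int();          // worker -> caller
}

void main()
{
    assert(roundTrip(21) == 42);
}
```

Once both sides hold each other's handle, data can flow in either direction, which is the part the ManualEvent approach could not provide.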

But compilation fails with a rather unhelpful message:

Running dmd...
FAIL ../../vibe.d/.dub/build/libevent-debug-linux.posix-x86_64-dmd-AB0707232CA963B5DA23C2232BBED51B vibe-d staticLibrary
Error executing command build: DMD compile run failed with exit code -9

Looks like a DMD bug. Can you file a quick bug report at https://d.puremagic.com/issues/enter_bug.cgi ? When I get some time, I'll try to make a reduced test case.

And even if it compiled, it would only let me wait for the worker task to finish, not retrieve data from it.
Am I missing something?
Something like this would be helpful in ThreadedFileStream, for instance, to avoid blocking the main event loop when doing file I/O.

This is indeed something that is missing and has been planned since the beginning (hence the "Threaded" in the name). I wasn't sure about the best approach, though. Probably using a separate thread pool with 1-3 threads just for ThreadedFileStream will be the best compromise, I guess.

Re: Communicating with worker tasks

On Tue, 25 Mar 2014 17:48:34 GMT, Sönke Ludwig wrote:

What should work is passing the Task handle like this:

void workerTaskFunc(Task caller)
{
  caller.send(Task.getThis());
}

runWorkerTask(&workerTaskFunc, Task.getThis());
auto task = receiveOnly!Task();

I'm not 100% sure that nothing currently fails here, but in theory message passing between different threads/event loops should work fine. Maybe it makes sense to add a second form of runWorkerTask that actually waits and returns the handle. Currently this isn't done for performance reasons.

It didn't occur to me to check if Task was indeed an isolated type; it seems to be working (for a fairly simple test).
A general function could perhaps be written as

Task getWorkerTask(R, ARGS...)(R function(ARGS) func, ARGS args)
{
	foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
	Task caller = Task.getThis();
	runWorkerTask_unsafe({
		caller.prioritySend(Task.getThis());
		func(args);
	});
	Task callee;
	receive((Task val) { callee = val; });
	return callee;
}

with a symmetric function for method tasks.

Looks like a DMD bug. Can you file a quick bug report at https://d.puremagic.com/issues/enter_bug.cgi ? When I get some time, I'll try to make a reduced test case.

I'm not sure it is a DMD bug:
the problem seems to be in the template isWeaklyIsolated, more specifically in the last branch

static if(isAggregateType!(T[0])) enum bool isWeaklyIsolated = isWeaklyIsolated!(FieldTypeTuple!(T[0]));

ManualEvent falls into this branch, and so does FieldTypeTuple!ManualEvent, then FieldTypeTuple!(FieldTypeTuple!ManualEvent), and so on.

This is indeed something that is missing and has been planned since the beginning (hence the "Threaded" in the name). I wasn't sure about the best approach, though. Probably using a separate thread pool with 1-3 threads just for ThreadedFileStream will be the best compromise, I guess.

I think a separate thread pool is the right solution as well. As for the size of the pool, one could try measuring performance and adjusting it; it could perhaps be made tunable.
A first step could be formalizing the concept of a thread pool in a class and making a version of runWorkerTask that takes an explicit thread pool. I hope to be able to play a bit with that in the next few days.

Re: Communicating with worker tasks

On Wed, 26 Mar 2014 15:40:57 GMT, Luca Niccoli wrote:

A general function could perhaps be written as

Task getWorkerTask(R, ARGS...)(R function(ARGS) func, ARGS args)
{
	foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
	Task caller = Task.getThis();
	runWorkerTask_unsafe({
		caller.prioritySend(Task.getThis());
		func(args);
	});
	Task callee;
	receive((Task val) { callee = val; });
	return callee;
}

with a symmetric function for method tasks.

I took a look and

void runWorkerTask(alias method, T, ARGS...)(shared(T) object, ARGS args)

doesn't seem to work: if vibe.d is compiled with MultiThreadTest, for instance, compilation fails at libevent2_tcp.d(529), which uses runWorkerTask:

runWorkerTask(&task.execute);

That line seems wrong on its own, though, as I think it is passing a delegate, not a member function and an object.
I tried

class Class
{
	void execute() shared
	{
		writeln("Executed");
	}
}
shared Class instance = new shared Class();
runWorkerTask!(Class.execute)(instance);

but it fails with

../../vibe.d/source/vibe/core/core.d(224): Error: no property 'method' for type 'shared(Class)'
source/app.d(15): Error: template instance vibe.core.core.runWorkerTask!(execute, Class, ) error instantiating

I'm not sure whether this isn't supposed to work or whether it's a bug in DMD...

Re: Communicating with worker tasks

On Wed, 26 Mar 2014 15:40:57 GMT, Luca Niccoli wrote:

On Tue, 25 Mar 2014 17:48:34 GMT, Sönke Ludwig wrote:

What should work is passing the Task handle like this:

void workerTaskFunc(Task caller)
{
  caller.send(Task.getThis());
}

runWorkerTask(&workerTaskFunc, Task.getThis());
auto task = receiveOnly!Task();

I'm not 100% sure that nothing currently fails here, but in theory message passing between different threads/event loops should work fine. Maybe it makes sense to add a second form of runWorkerTask that actually waits and returns the handle. Currently this isn't done for performance reasons.

It didn't occur to me to check if Task was indeed an isolated type; it seems to be working (for a fairly simple test).

I'll mention this in the documentation, it's indeed not obvious at all.

A general function could perhaps be written as

Task getWorkerTask(R, ARGS...)(R function(ARGS) func, ARGS args)
{
	foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
	Task caller = Task.getThis();
	runWorkerTask_unsafe({
		caller.prioritySend(Task.getThis());
		func(args);
	});
	Task callee;
	receive((Task val) { callee = val; });
	return callee;
}

with a symmetric function for method tasks.

Yes, this is what I had in mind, too. The only change I'd make is to wrap the Task in a private struct type, so that it can't interfere with user messages.
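
To illustrate why the wrapper matters, here is a minimal sketch, again using Phobos' std.concurrency as a stand-in for vibe.d's message passing (an assumption, as are the names WorkerHandle, workerFunc and startWorker). Because the handshake message has its own module-private type, a bare handle that user code happens to send is left in the queue instead of being mistaken for the handshake.

```d
import std.concurrency : receive, receiveOnly, send, spawn, thisTid, Tid;

// Hypothetical module-private wrapper: only this module matches on it,
// so a bare Tid sent by user code is never taken for the handshake.
private struct WorkerHandle { Tid tid; }

void workerFunc(Tid caller)
{
    caller.send(WorkerHandle(thisTid));
}

// Starts the worker and returns its handle; unrelated messages stay queued.
Tid startWorker()
{
    spawn(&workerFunc, thisTid);
    Tid callee;
    receive((WorkerHandle h) { callee = h.tid; });
    return callee;
}

void main()
{
    Tid self = thisTid;
    self.send(self);                  // a "user" message of type Tid, queued first
    Tid worker = startWorker();       // has to skip the queued user message
    assert(worker != self);
    assert(receiveOnly!Tid() == self); // the user message is still there
}
```

With a plain Tid (or Task) as the handshake message, startWorker would have returned the user's message instead of the worker's handle.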

Looks like a DMD bug. Can you file a quick bug report at https://d.puremagic.com/issues/enter_bug.cgi ? When I get some time, I'll try to make a reduced test case.

I'm not sure it is a DMD bug:
the problem seems to be in the template isWeaklyIsolated, more specifically in the last branch

static if(isAggregateType!(T[0])) enum bool isWeaklyIsolated = isWeaklyIsolated!(FieldTypeTuple!(T[0]));

ManualEvent falls into this branch, and so does FieldTypeTuple!ManualEvent, then FieldTypeTuple!(FieldTypeTuple!ManualEvent), and so on.

Hmm, FieldTypeTuple!T should not fall in that branch, but rather go as isWeaklyIsolated!(FieldTypeTuple!(T)[0], FieldTypeTuple!(T)[1], ...). If that's not the case, there is definitely a bug. But what I meant was that the compiler crashes instead of outputting an error message (assuming that there actually is an error).

This is indeed something that is missing and has been planned since the beginning (hence the "Threaded" in the name). I wasn't sure about the best approach, though. Probably using a separate thread pool with 1-3 threads just for ThreadedFileStream will be the best compromise, I guess.

I think a separate thread pool is the right solution as well. As for the size of the pool, one could try measuring performance and adjusting it; it could perhaps be made tunable.
A first step could be formalizing the concept of a thread pool in a class and making a version of runWorkerTask that takes an explicit thread pool. I hope to be able to play a bit with that in the next few days.

Yeah, some testing would be good. Basically, what is needed is as many threads as there are I/O devices accessed in parallel. I wouldn't go for something too fancy here, though, because the plan is to eventually use asynchronous I/O here, too, on all platforms. But keeping the number of threads configurable is trivial, so that will be a good idea in any case.
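
As a rough illustration of a small, dedicated pool with a configurable size, here is a sketch built on Phobos' std.parallelism.TaskPool. It is not vibe.d code: ioThreads, readWholeFile and readViaPool are hypothetical names, and yieldForce blocks the calling thread, so a real integration would have to notify the waiting task (for example via message passing) rather than wait like this.

```d
import std.file : read, remove, tempDir, write;
import std.parallelism : task, TaskPool;
import std.path : buildPath;

// Blocking read that is meant to run on one of the pool's threads.
ubyte[] readWholeFile(string path)
{
    return cast(ubyte[]) read(path);
}

// Submits the read to the given pool and waits for the result.
ubyte[] readViaPool(TaskPool pool, string path)
{
    auto job = task!readWholeFile(path);
    pool.put(job);
    return job.yieldForce;   // blocks the caller until the read has finished
}

void main()
{
    enum size_t ioThreads = 2;              // hypothetical tuning knob
    auto ioPool = new TaskPool(ioThreads);  // separate from the global taskPool
    scope (exit) ioPool.finish();           // let the pool threads terminate

    auto path = buildPath(tempDir(), "threaded_io_sketch.txt");
    write(path, "hello");
    scope (exit) remove(path);

    assert(cast(string) readViaPool(ioPool, path) == "hello");
}
```

Keeping the pool separate from the general worker threads means slow disk reads cannot starve other worker tasks, and the pool size can be adjusted independently after measuring.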

On Wed, 26 Mar 2014 20:11:52 GMT, Luca Niccoli wrote:

On Wed, 26 Mar 2014 15:40:57 GMT, Luca Niccoli wrote:

A general function could perhaps be written as

Task getWorkerTask(R, ARGS...)(R function(ARGS) func, ARGS args)
{
	foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
	Task caller = Task.getThis();
	runWorkerTask_unsafe({
		caller.prioritySend(Task.getThis());
		func(args);
	});
	Task callee;
	receive((Task val) { callee = val; });
	return callee;
}

with a symmetric function for method tasks.

I took a look and

void runWorkerTask(alias method, T, ARGS...)(shared(T) object, ARGS args)

doesn't seem to work: if vibe.d is compiled with MultiThreadTest, for instance, compilation fails at libevent2_tcp.d(529), which uses runWorkerTask:

runWorkerTask(&task.execute);

That line seems wrong on its own, though, as I think it is passing a delegate, not a member function and an object.

The line is very old and predates the safe runWorkerTask signature. I'll just remove it, since multi-threaded TCP serving is now implemented differently (using TCPListenOptions.distribute/runWorkerTaskDist).

I tried

class Class
{
	void execute() shared
	{
		writeln("Executed");
	}
}
shared Class instance = new shared Class();
runWorkerTask!(Class.execute)(instance);

but it fails with

../../vibe.d/source/vibe/core/core.d(224): Error: no property 'method' for type 'shared(Class)'
source/app.d(15): Error: template instance vibe.core.core.runWorkerTask!(execute, Class, ) error instantiating

I'm not sure whether this isn't supposed to work or whether it's a bug in DMD...

That was a bug in the code because I foolishly tested with a class method named "method". Fixed by 02b0aa7.
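
The error message ("no property 'method'") suggests that the template looked up a member literally called "method" on the object. The following is a minimal sketch of one way to dispatch on an alias parameter by its identifier instead; it is an illustration only and not necessarily what commit 02b0aa7 does. Counter and callMethod are hypothetical names.

```d
import core.atomic : atomicLoad, atomicOp;

class Counter
{
    private shared int total;

    void add(int amount) shared { atomicOp!"+="(total, amount); }
    int value() shared { return atomicLoad(total); }
}

// Hypothetical helper with the same parameter shape as the quoted overload.
void callMethod(alias method, T, ARGS...)(shared(T) object, ARGS args)
{
    // Writing object.method(args) here would look for a member literally
    // named "method"; instead, look the member up by the alias's identifier.
    __traits(getMember, object, __traits(identifier, method))(args);
}

void main()
{
    auto counter = new shared Counter();
    callMethod!(Counter.add)(counter, 5);
    callMethod!(Counter.add)(counter, 2);
    assert(counter.value() == 7);
}
```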

Re: Communicating with worker tasks

On Thu, 27 Mar 2014 14:30:18 GMT, Sönke Ludwig wrote:

Yes, this is what I had in mind, too. The only change I'd make is to wrap the Task in a private struct type, so that it can't interfere with user messages.

I submitted a pull request using Typedef!Task as an opaque type.

Hmm, FieldTypeTuple!T should not fall in that branch, but rather go as isWeaklyIsolated!(FieldTypeTuple!(T)[0], FieldTypeTuple!(T)[1], ...). If that's not the case, there is definitely a bug. But what I meant was that the compiler crashes instead of outputting an error message (assuming that there actually is an error).

ManualEvent is an interface, not a struct, class or union, so according to the documentation FieldTypeTuple!ManualEvent returns a tuple with a single element, ManualEvent. I think the infinite recursion is legitimate.
I submitted a pull request for isWeaklyIsolated and a bug report to dmd ( https://d.puremagic.com/issues/show_bug.cgi?id=12487 ) for the template infinite recursion.
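
For readers who want to see the shape of such a fix, here is a heavily simplified sketch of an isolation check that stops at interfaces (and classes) before it ever reaches the FieldTypeTuple branch. It is a hypothetical stand-in, not the code from the pull request, and the branches for arrays, pointers and remaining types are deliberately crude.

```d
import std.traits : FieldTypeTuple, isAssociativeArray, isDynamicArray, isPointer;

// Simplified, hypothetical stand-in for vibe.d's isWeaklyIsolated.
template isWeaklyIsolatedSketch(T...)
{
    static if (T.length == 0)
        enum bool isWeaklyIsolatedSketch = true;
    else static if (T.length > 1)
        enum bool isWeaklyIsolatedSketch =
            isWeaklyIsolatedSketch!(T[0]) && isWeaklyIsolatedSketch!(T[1 .. $]);
    else static if (is(T[0] == immutable) || is(T[0] == shared))
        enum bool isWeaklyIsolatedSketch = true;
    else static if (is(T[0] == class) || is(T[0] == interface) || is(T[0] == delegate))
        // Decide here, so an interface never reaches the FieldTypeTuple branch.
        enum bool isWeaklyIsolatedSketch = false;
    else static if (isDynamicArray!(T[0]))
        enum bool isWeaklyIsolatedSketch = is(typeof(T[0].init[0]) == immutable);
    else static if (isPointer!(T[0]) || isAssociativeArray!(T[0]))
        enum bool isWeaklyIsolatedSketch = false; // conservative in this sketch
    else static if (is(T[0] == struct) || is(T[0] == union))
        // A struct or union cannot contain itself by value, so this terminates.
        enum bool isWeaklyIsolatedSketch = isWeaklyIsolatedSketch!(FieldTypeTuple!(T[0]));
    else
        // Everything else is assumed to be a plain value type (a simplification).
        enum bool isWeaklyIsolatedSketch = true;
}

interface Event { void emit(); }
struct Plain { int id; double weight; string label; }
struct HoldsEvent { int id; Event event; }

static assert(isWeaklyIsolatedSketch!Plain);
static assert(!isWeaklyIsolatedSketch!Event);       // terminates instead of recursing
static assert(!isWeaklyIsolatedSketch!HoldsEvent);

void main() {}
```

The key point is only the ordering: the interface case is decided before any branch that expands a type into its fields.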

Yeah, some testing would be good. Basically, what is needed is as many threads as there are I/O devices accessed in parallel. I wouldn't go for something too fancy here, though, because the plan is to eventually use asynchronous I/O here, too, on all platforms. But keeping the number of threads configurable is trivial, so that will be a good idea in any case.

Unfortunately asynchronous disk I/O seems to be in a very sorry state on many platforms (see for example http://blog.libtorrent.org/2012/10/asynchronous-disk-io/ ), so threaded I/O could remain a very useful fallback.

Re: Communicating with worker tasks

On Fri, 28 Mar 2014 14:29:55 GMT, Luca Niccoli wrote:

On Thu, 27 Mar 2014 14:30:18 GMT, Sönke Ludwig wrote:

Yes, this is what I had in mind, too. The only change I'd make is to wrap the Task in a private struct type, so that it can't interfere with user messages.

I submitted a pull request using Typedef!Task as an opaque type.

Hmm, FieldTypeTuple!T should not fall in that branch, but rather go as isWeaklyIsolated!(FieldTypeTuple!(T)[0], FieldTypeTuple!(T)[1], ...). If that's not the case, there is definitely a bug. But what I meant was that the compiler crashes instead of outputting an error message (assuming that there actually is an error).

ManualEvent is an interface, not a struct, class or union, so according to the documentation FieldTypeTuple!ManualEvent returns a tuple with a single element, ManualEvent. I think the infinite recursion is legitimate.
I submitted a pull request for isWeaklyIsolated and a bug report to dmd ( https://d.puremagic.com/issues/show_bug.cgi?id=12487 ) for the template infinite recursion.

Yeah, some testing would be good. Basically, what is needed is as many threads as there are I/O devices accessed in parallel. I wouldn't go for something too fancy here, though, because the plan is to eventually use asynchronous I/O here, too, on all platforms. But keeping the number of threads configurable is trivial, so that will be a good idea in any case.

Unfortunately asynchronous disk I/O seems to be in a very sorry state on many platforms (see for example http://blog.libtorrent.org/2012/10/asynchronous-disk-io/ ), so threaded I/O could remain a very useful fallback.

Thanks a lot for preparing the pull requests. I agree with regard to the threaded disk I/O; that blog post surely looks discouraging regarding Linux AIO. Fortunately, at least overlapped I/O on Windows works and is already implemented.