RejectedSoftware Forums

Sign up

Pages: 1 2

listenTCP with threads

listenTCP and listenTCP_s take a delegate/function and runs it in its own fiber. Is there any way to pass this fiber to another thread?

What I want to do is divide the connections up between threads in a thread pool, each thread handling only its own fibers and corresponding sockets.

Is there a way to do that in vibe.d? Thanks in advance,

Atila

Re: listenTCP with threads

On Mon, 18 Nov 2013 15:17:07 GMT, Atila Neves wrote:

listenTCP and listenTCP_s take a delegate/function and runs it in its own fiber. Is there any way to pass this fiber to another thread?

What I want to do is divide the connections up between threads in a thread pool, each thread handling only its own fibers and corresponding sockets.

Is there a way to do that in vibe.d? Thanks in advance,

Atila

You can achieve that using TCPListenOptions.distribute:

void handleConnection(TCPConnection conn)
{
	// ...
}

void startup()
{
	enableWorkerThreads();
	listenTCP(1234, &handleConnection, TCPListenOptions.distribute);
}

This will listen in each of vibe.d's worker threads (equal to the number of CPU cores) on the same socket and let the OS decide which thread handles each incoming connection. This has the advantage of not requiring any inter-thread communication (such as sending the connection using message passing).

Re: listenTCP with threads

That crashed my app as soon as the first client connected.

Even if it hadn't, I'd actually want to have explicit message passing between threads. At least, I think I do. This if for a publish/subscribe system and I'm trying to optimise it further. New subscription requests aren't that common, but publishing is, and that's where most of the time is spent. I naively tried looping over the subscriptions using taskPool.parallel to see which ones to publish to and the app never went anywhere because I think all the threads were trying to write to the same socket at once. I'm not sure.

So my new idea is to keep the connections in separate threads, and when one of the connections issues a publish request, I'd send a message to all the worker threads. Then they'd be responsible for searching only their own connections to see which ones to publish to. So I'd need a way to get their Tids to do that.

On Mon, 18 Nov 2013 16:55:07 GMT, Sönke Ludwig wrote:

On Mon, 18 Nov 2013 15:17:07 GMT, Atila Neves wrote:

listenTCP and listenTCP_s take a delegate/function and runs it in its own fiber. Is there any way to pass this fiber to another thread?

What I want to do is divide the connections up between threads in a thread pool, each thread handling only its own fibers and corresponding sockets.

Is there a way to do that in vibe.d? Thanks in advance,

Atila

You can achieve that using TCPListenOptions.distribute:

void handleConnection(TCPConnection conn)
{
	// ...
}

void startup()
{
	enableWorkerThreads();
	listenTCP(1234, &handleConnection, TCPListenOptions.distribute);
}

This will listen in each of vibe.d's worker threads (equal to the number of CPU cores) on the same socket and let the OS decide which thread handles each incoming connection. This has the advantage of not requiring any inter-thread communication (such as sending the connection using message passing).

Re: listenTCP with threads

You should have a Channel setup to which different sockets can
subscribe. Have the channel running in it's own task, and use
Tid.send(message) to send a message to them, then have them write to
their own socket.

On 2013-11-18 17:15:04 +0000, Atila Neves said:

That crashed my app as soon as the first client connected.

Even if it hadn't, I'd actually want to have explicit message passing
between threads. At least, I think I do. This if for a
publish/subscribe system and I'm trying to optimise it further. New
subscription requests aren't that common, but publishing is, and that's
where most of the time is spent. I naively tried looping over the
subscriptions using taskPool.parallel to see which ones to publish to
and the app never went anywhere because I think all the threads were
trying to write to the same socket at once. I'm not sure.

So my new idea is to keep the connections in separate threads, and when
one of the connections issues a publish request, I'd send a message to
all the worker threads. Then they'd be responsible for searching only
their own connections to see which ones to publish to. So I'd need a
way to get their Tids to do that.

On Mon, 18 Nov 2013 16:55:07 GMT, Sönke Ludwig wrote:

On Mon, 18 Nov 2013 15:17:07 GMT, Atila Neves wrote:

listenTCP and listenTCP_s take a delegate/function and runs it in
its own fiber. Is there any way to pass this fiber to another thread?

What I want to do is divide the connections up between threads in a
thread pool, each thread handling only its own fibers and corresponding
sockets.

Is there a way to do that in vibe.d? Thanks in advance,

Atila

You can achieve that using TCPListenOptions.distribute:

void handleConnection(TCPConnection conn)

{

	// ...

}



void startup()

{

	enableWorkerThreads();

	listenTCP(1234, &handleConnection, TCPListenOptions.distribute);

}



This will listen in each of vibe.d's worker threads (equal to the
number of CPU cores) on the same socket and let the OS decide which
thread handles each incoming connection. This has the advantage of not
requiring any inter-thread communication (such as sending the
connection using message passing).

Re: listenTCP with threads

Am 18.11.2013 18:15, schrieb Atila Neves:

That crashed my app as soon as the first client connected.

I'll look into it.

Even if it hadn't, I'd actually want to have explicit message passing between threads. At least, I think I do. This if for a publish/subscribe system and I'm trying to optimise it further. New subscription requests aren't that common, but publishing is, and that's where most of the time is spent. I naively tried looping over the subscriptions using taskPool.parallel to see which ones to publish to and the app never went anywhere because I think all the threads were trying to write to the same socket at once. I'm not sure.

So my new idea is to keep the connections in separate threads, and when one of the connections issues a publish request, I'd send a message to all the worker threads. Then they'd be responsible for searching only their own connections to see which ones to publish to. So I'd need a way to get their Tids to do that.

You can still do that. vibe.core.concurrency uses Task as the Tid
type, Task.getThis() will yield the proper Tid for each connection
handler.

To reduce overhead, you could have a single task running in each thread
that accepts messages from other threads and distributes them to the
connection taks of its own thread. runWorkerTaskDist can be used to
start a task in each worker thread at once.

Re: listenTCP with threads

On Tue, 19 Nov 2013 10:44:00 +0100, Sönke Ludwig wrote:

Am 18.11.2013 18:15, schrieb Atila Neves:

That crashed my app as soon as the first client connected.

I'll look into it.

Thanks!

Even if it hadn't, I'd actually want to have explicit message passing between threads. At least, I think I do. This if for a publish/subscribe system and I'm trying to optimise it further. New subscription requests aren't that common, but publishing is, and that's where most of the time is spent. I naively tried looping over the subscriptions using taskPool.parallel to see which ones to publish to and the app never went anywhere because I think all the threads were trying to write to the same socket at once. I'm not sure.

So my new idea is to keep the connections in separate threads, and when one of the connections issues a publish request, I'd send a message to all the worker threads. Then they'd be responsible for searching only their own connections to see which ones to publish to. So I'd need a way to get their Tids to do that.

You can still do that. vibe.core.concurrency uses Task as the Tid
type, Task.getThis() will yield the proper Tid for each connection
handler.

To reduce overhead, you could have a single task running in each thread
that accepts messages from other threads and distributes them to the
connection taks of its own thread. runWorkerTaskDist can be used to
start a task in each worker thread at once.

Nice. I'll try that as soon as I get distribute connections.

Re: listenTCP with threads

Am 21.11.2013 11:24, schrieb Atila Neves:

On Tue, 19 Nov 2013 10:44:00 +0100, Sönke Ludwig wrote:

Am 18.11.2013 18:15, schrieb Atila Neves:

That crashed my app as soon as the first client connected.

I'll look into it.

Thanks!

Okay, the recent tests with the bench-http-server included with vibe.d
are indicating that everything runs stable. Can you check with the
latest 0.7.18-rc.2 or GIT master?

Re: listenTCP with threads

On Fri, 22 Nov 2013 10:10:30 +0100, Sönke Ludwig wrote:

Am 21.11.2013 11:24, schrieb Atila Neves:

On Tue, 19 Nov 2013 10:44:00 +0100, Sönke Ludwig wrote:

Am 18.11.2013 18:15, schrieb Atila Neves:

That crashed my app as soon as the first client connected.

I'll look into it.

Thanks!

Okay, the recent tests with the bench-http-server included with vibe.d
are indicating that everything runs stable. Can you check with the
latest 0.7.18-rc.2 or GIT master?

I did dub upgrade and recompiled. It no longer crashes, but it exits with error code -11 after accepting the connection.

Re: listenTCP with threads

Am 23.11.2013 17:19, schrieb Atila Neves:

On Fri, 22 Nov 2013 10:10:30 +0100, Sönke Ludwig wrote:

Am 21.11.2013 11:24, schrieb Atila Neves:

On Tue, 19 Nov 2013 10:44:00 +0100, Sönke Ludwig wrote:

Am 18.11.2013 18:15, schrieb Atila Neves:

That crashed my app as soon as the first client connected.

I'll look into it.

Thanks!

Okay, the recent tests with the bench-http-server included with vibe.d
are indicating that everything runs stable. Can you check with the
latest 0.7.18-rc.2 or GIT master?

I did dub upgrade and recompiled. It no longer crashes, but it exits with error code -11 after accepting the connection.

Can you prepare a little (or large) test case, so that I can debug
locally? I think exit code -11 indicates a segmentation fault.

Re: listenTCP with threads

Can you prepare a little (or large) test case, so that I can debug
locally? I think exit code -11 indicates a segmentation fault.

https://github.com/atilaneves/mqtt

Add "enableWorkerThreads()" and TCPListenOptions.distribute to mqttd/app.d, "dub" to run it then "telnet localhost 1883" with any input reproduces the problem. Comment out "enableWorkerThreads" and the app complains about an invalid message instead (since telnet isn't sending anything meaningful) and carries on. Put it back in and the complaint is still there but it exits soon thereafter.

Atila

Pages: 1 2