RejectedSoftware Forums

vibe.d as a networking component of an application

Hi,

I am trying to build the networking part of my application around vibe.d, given that the support for concurrency has been added, but I am having some issues. Perhaps some of you can help me with it...

What I want to achieve is to have a single server class that takes care of all the networking needs, i.e.
1) on request, it starts to listen on given addresses and ports
2) on request, it ceases to listen
3) on request, terminates some or all active connections
4) each accepted connection can do the request processing in the context of a fiber or serialize the request and send it to another thread for processing and wait until it is finished

For 1), let's say I have this code:

class Server {
  private void _listeningFn()
  {
    ...
  }
  
  public void run(string address, uint port)
  {
    _listeningThread = runTask(&_listeningFn);
  }
}

Given that runTask takes void delegate() as a parameter, what is the most appropriate way to pass the address and the port, so that the _listeningFn can use it?

  • set it as a Server variable and let the function use it?
  • pass it via send/receive mechanism of Task?
  • use task-local storage?

For 2), if I weren't using vibe.d, I would create a connected socket that would be used to interrupt the select call. What should I do here? Create a connection that would trigger processing in _listeningFn and special-case it so that it breaks out of the listening loop?

For 3), is there a way to signal to given/all connections that I want them to end, thus breaking them out of their reads?

For 4), would this be a proper way to execute the work in some other thread?

{
  ...
  auto workerThread = spawn(&workerFn, getThis);
  workerThread.send(serializedData);
  receive(//wait for workerThread to finish its job)
}

The docs say that Tid is an alias for Task. Can I supply a spawned function with a Task and be able to receive properly from another thread?

Many thanks for reading this far.

Regards,
Martin

Re: vibe.d as a networking component of an application

On Sat, 28 Sep 2013 18:44:28 GMT, Martin Drasar wrote:

Hi,

I am trying to build the networking part of my application around vibe.d, given that the support for concurrency has been added, but I am having some issues. Perhaps some of you can help me with it...

What I want to achieve is to have a single server class that takes care of all the networking needs, i.e.
1) on request, it starts to listen on given addresses and ports
2) on request, it ceases to listen
3) on request, terminates some or all active connections
4) each accepted connection can do the request processing in the context of a fiber or serialize the request and send it to another thread for processing and wait until it is finished

For 1), let's say I have this code:

class Server {
  private void _listeningFn()
  {
    ...
  }
  
  public void run(string address, uint port)
  {
    _listeningThread = runTask(&_listeningFn);
  }
}

Given that runTask takes void delegate() as a parameter, what is the most appropriate way to pass the address and the port, so that the _listeningFn can use it?

  • set it as a Server variable and let the function use it?
  • pass it via send/receive mechanism of Task?
  • use task-local storage?

In the case of runTask (which will execute in the same thread, but on a different fiber) you can use a delegate/closure:

class Server {
  private void _listeningFn(string address, ushort port)
  {
    ...
  }
  
  public void run(string address, ushort port)
  {
    _listeningThread = runTask({ _listeningFn(address, port); });
  }
}

runWorkerTask on the other hand requires a static function and would take the arguments explicitly, but this is not needed here.

However, listenTCP already listens in the background, so creating a task is actually not necessary in this particular case:

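class Server {
  private TCPListener _listeningHandle;
  public void run(string address, ushort port)
  {
    // listenTCP takes the port first, then the connection callback,
    // then the address to bind to
    _listeningHandle = listenTCP(port, &_handleConnection, address);
  }
  // placeholder name for the per-connection callback
  private void _handleConnection(TCPConnection conn) { ... }
}

_listeningHandle can later be used to stop listening by calling _listeningHandle.stopListening().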

For 2), if I weren't using vibe.d, I would create a connected socket that would be used to interrupt the select call. What should I do here? Create a connection that would trigger processing in _listeningFn and special-case it so that it breaks out of the listening loop?

For 3), is there a way to signal to given/all connections that I want them to end, thus breaking them out of their reads?

I would recommend using interrupt() on each task. This will throw an InterruptException inside the task, which you can use to shut it down. It could look like this:

import std.algorithm : countUntil, remove;

Task[] s_connectionTasks; // TODO: use something more efficient than an array

void connectionHandler(TCPConnection conn)
{
  s_connectionTasks ~= Task.getThis();
  scope(exit) {
    // unregister this task; countUntil finds its index in the array
    s_connectionTasks = s_connectionTasks.remove(s_connectionTasks.countUntil(Task.getThis()));
    conn.close();
  }
  conn.readAll(); // or whatever, will throw the InterruptException
}

void closeAll()
{
  // iterate over a copy, since each handler removes itself on exit
  auto tasks = s_connectionTasks.dup;

  // interrupt them all
  foreach (t; tasks)
    t.interrupt();

  // wait for them to actually finish, optional
  foreach (t; tasks)
    t.join();
}
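
As a rough illustration of the "something more efficient than an array" TODO, the bookkeeping could be kept in an associative array used as a set. This is only a sketch: ConnId and ConnectionSet are made-up names, and ConnId stands in for vibe.d's Task so that the snippet compiles without the library.

```d
// Hypothetical stand-in for vibe.d's Task, so this compiles with plain D.
alias ConnId = size_t;

// A set of active connections backed by an associative array.
struct ConnectionSet
{
  private bool[ConnId] _members;

  // Register a connection; adding the same id twice is harmless.
  void add(ConnId id) { _members[id] = true; }

  // Unregister a connection; removing an unknown id is a no-op.
  void remove(ConnId id) { _members.remove(id); }

  size_t length() const { return _members.length; }

  // Returns a copy of the current members, so handlers can remove
  // themselves while the caller is still iterating.
  ConnId[] snapshot() { return _members.keys; }
}
```

Insertion and removal no longer need a linear search, and closeAll() would loop over snapshot() instead of the live container.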

For 4), would this be a proper way to execute the work in some other thread?

{
  ...
  auto workerThread = spawn(&workerFn, getThis);
  workerThread.send(serializedData);
  receive(//wait for workerThread to finish its job)
}

Sending to threads instead of tasks requires std.concurrency, which doesn't mix well with event-loop-based processing. Typically, if it really is a worker kind of function (i.e. CPU heavy), you could use runWorkerTask instead, which also saves the thread startup overhead of spawn (it uses one of N pre-started threads, where N is the number of CPU cores):

void workerFn(Task task, string serialized_data)
{
  auto result = ...;
  task.send(result);
}
void connectionHandler(TCPConnection conn)
{
  ...
  // serializedData must be either shared, immutable or Isolated to
  // be able to safely be passed between threads, just use a cast if
  // needed
  runWorkerTask(&workerFn, Task.getThis(), serializedData);
  receive(...); // receive result data or just wait for finish
}

The docs say that Tid is an alias for Task. Can I supply a spawned function with a Task and be able to receive properly from another thread?

That should work fine AFAIR. If I'm not mistaken, the only thing that doesn't work cross-thread right now is calling yield() on a task in a foreign thread (but there may actually be more after all).

Many thanks for reading this far.

It wasn't that much, after all ;)

Regards,
Sönke

Re: vibe.d as a networking component of an application

On Sun, 29 Sep 2013 06:42:42 GMT, Sönke Ludwig wrote:

...

Thanks a lot Sönke, I have managed to make it work and coexist peacefully with std.concurrency.
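
For reference, the plain std.concurrency round trip sketched in question 4 might look roughly like the following outside of vibe.d. The thread does not say how it was actually combined with the event loop, so this is only an assumption-laden sketch: processInWorker is a made-up name, and the "processing" is a placeholder that returns the length of the request.

```d
import std.concurrency : spawn, send, receiveOnly, thisTid, Tid;

// Hypothetical worker: waits for one serialized request and sends
// a result back to the thread that spawned it.
void workerFn(Tid owner)
{
  auto request = receiveOnly!string();
  owner.send(request.length); // placeholder for the real processing
}

// Hands the request to a freshly spawned thread and waits for the reply.
size_t processInWorker(string serializedData)
{
  auto worker = spawn(&workerFn, thisTid);
  worker.send(serializedData);
  // Blocks the calling thread until the worker has answered.
  return receiveOnly!size_t();
}
```

Note that receiveOnly here blocks the whole calling thread, so inside a vibe.d connection handler it would stall the event loop unless vibe.d's own task-aware receive is used instead.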

Many thanks for reading this far.

It wasn't that much, after all ;)

Yeah... I wrote this before I edited the text and shrank it to half its size by removing unnecessary questions :-)

Martin