RejectedSoftware Forums

Can't flush a socket

Hi,

I want a server and a client that keep the connection open across request/response rounds. However, I can't get my client to flush its output and send the data without closing the connection. This is the code:

client.d

auto conn = connectTcp("127.0.0.1", 1234);
string text;
while(1)
{
  text = stdin.readln();
  // readln() keeps the trailing newline, so strip it before comparing (needs std.string)
  if (text.strip() == "exit") break;

  conn.write(text, true);
}

I have tried write with and without the flush parameter, and also setting conn.tcpNoDelay = true. None of this works: conn must be closed for the data to be sent (confirmed with Wireshark). What should I do?

Also, what should a proper server that listens for such persistent connections look like? So far I have this, but it does not seem right, especially the part with waitForData...

server.d

listenTcp(1234, (conn)
{
  ubyte[] data;
  auto part = appender!(ubyte[])();

  while (conn.connected)
  {
    if (!conn.waitForData(dur!"msecs"(250))) continue;

    while (!conn.empty)
    {
      size_t chunk = cast(size_t)min(conn.leastSize, BUFFER_SIZE);
      conn.read(_buffer[0..chunk]);
      part.put(_buffer[0..chunk]);
    }

    data ~= part.data;

    ...
    // Fiddle with data
    ...
  }
});

Thanks,
Martin

Re: Can't flush a socket

I think the server loop does not do what is intended (although this doesn't explain the Wireshark results).

server.d

   (...)
   if (!conn.waitForData(dur!"msecs"(250))) continue;

   while (!conn.empty)
   {
     size_t chunk = cast(size_t)min(conn.leastSize, BUFFER_SIZE);
     conn.read(_buffer[0..chunk]);
     part.put(_buffer[0..chunk]);
   }
   (...)
});

In that code, conn.empty will block until data becomes available and, as a consequence, will keep the loop running until all data is read or the connection is closed. It has to do so, because it is supposed to provide a logical end-of-stream marker. If you want to read packet-by-packet, you can use the non-blocking conn.dataAvailableForRead property instead:

while(conn.connected)
{
	if (!conn.waitForData(dur!"msecs"(250))) continue;
	while (conn.dataAvailableForRead)
	{
		size_t chunk = cast(size_t)min(conn.leastSize, BUFFER_SIZE);
		conn.read(_buffer[0..chunk]);
		part.put(_buffer[0..chunk]);
	}
	// ...
}

If the timeout is not explicitly needed, then this should be enough:

while (true)
{
	size_t chunk = cast(size_t)min(conn.leastSize, BUFFER_SIZE);
	if (chunk == 0) break; // leastSize == 0 <=> empty <=> !connected
	conn.read(_buffer[0..chunk]);
	// ...
}

leastSize will block, as empty would, until data becomes available or until the connection is closed, in which case 0 is returned. I'll add proper descriptions of the exact behavior of these properties to the documentation, as this is definitely not clear in its current state.
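Since TCP is a byte stream, the chunks read by either loop need not line up with the client's individual writes. Below is a minimal sketch of one way to split the accumulated bytes back into messages, assuming the client sends newline-terminated lines (as the readln-based client does). The helper name takeLines and the pending buffer are hypothetical and not part of vibe.d; only Phobos is used.

```d
import std.algorithm.searching : countUntil;
import std.stdio : writeln;
import std.string : representation;

/// Hypothetical helper: removes every complete, newline-terminated line from
/// `pending` and returns them without the '\n'. An unfinished trailing line
/// stays in `pending` until more data arrives.
string[] takeLines(ref ubyte[] pending)
{
    string[] lines;
    while (true)
    {
        auto idx = pending.countUntil(cast(ubyte) '\n');
        if (idx < 0) break;                       // no complete line left
        lines ~= cast(string) pending[0 .. idx].idup;
        pending = pending[idx + 1 .. $];          // drop the line and its '\n'
    }
    return lines;
}

void main()
{
    ubyte[] pending;

    // Simulate chunks arriving from the socket in arbitrary pieces.
    foreach (chunk; ["hel", "lo\nwor", "ld\n"])
    {
        pending ~= chunk.representation;
        foreach (line; takeLines(pending))
            writeln("got line: ", line);
    }
}
```

In the server loop, each chunk read into _buffer would be appended to such a pending buffer, and the "fiddle with data" step would then operate on whole lines.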

Re: Can't flush a socket

Hi,

thanks, the altered version of a server is indeed working as it should be. This was tested with netcat.

However the problem with client is still there. I have tried to dig inside with the debugger and both these functions are called:

bufferevent_write(m_event, cast(char*)bytes.ptr, bytes.length)
bufferevent_flush(m_event, EV_WRITE, bufferevent_flush_mode.BEV_NORMAL)

It would almost seem that the libevent is the one responsible for not sending the data. I have even tried to write some data into connection in an endless loop and nothing got through. Any suggestions on what to do next?

Thanks,
Martin

Re: Can't flush a socket

On 25.11.2012 11:56, Martin Drasar wrote:

Hi,

thanks, the altered version of a server is indeed working as it should be. This was tested with netcat.
However the problem with client is still there. I have tried to dig inside with the debugger and
both these functions are called:

bufferevent_write(m_event, cast(char*)bytes.ptr, bytes.length)
bufferevent_flush(m_event, EV_WRITE, bufferevent_flush_mode.BEV_NORMAL)

It would almost seem that the libevent is the one responsible for not sending the data. I have even
tried to write some data into connection in an endless loop and nothing got through. Any suggestions
on what to do next?

Thanks,
Martin

I'll try to reproduce this tomorrow. Sounds like something strange must be going on, since the HTTP
client and the database clients are working fine, even with persistent connections.

Sönke

Re: Can't flush a socket

On Sun, 25 Nov 2012 16:36:57 +0100, Sönke Ludwig wrote:

I'll try to reproduce this tomorrow. Sounds like something strange must be going on, since the HTTP
client and the database clients are working fine, even with persistent connections.

Thanks a lot.

Just for the record I am including my system specs:

Linux uriel 2.6.32-5-amd64 #1 SMP Thu Mar 22 17:26:33 UTC 2012 x86_64 GNU/Linux
OS Debian squeeze 6.0.5

Package: vibe
Version: 0.7.9-0
Architecture: all
Maintainer: Jordi Sayol <g.sayol@yahoo.es>
Installed-Size: 2365
Depends: dmd (>= 2.060), libevent-dev (>= 2), libssl-dev, pkg-config

Package: libevent-dev
Priority: optional
Section: libdevel
Installed-Size: 524
Maintainer: Anibal Monsalve Salazar <anibal@debian.org>
Architecture: amd64
Source: libevent
Version: 1.4.13-stable-1
Depends: libevent-1.4-2 (= 1.4.13-stable-1), libevent-core-1.4-2 (= 1.4.13-stable-1), libevent-extra-1.4-2 (= 1.4.13-stable-1)

Package: libevent-dev
Source: libevent
Version: 2.0.19-stable-3
Installed-Size: 1403
Maintainer: Anibal Monsalve Salazar <anibal@debian.org>
Architecture: amd64
Depends: libevent-2.0-5 (= 2.0.19-stable-3), libevent-core-2.0-5 (= 2.0.19-stable-3), libevent-extra-2.0-5 (= 2.0.19-stable-3), libevent-pthreads-2.0-5 (= 2.0.19-stable-3), libevent-openssl-2.0-5 (= 2.0.19-stable-3)

$ dpkg -l | grep libevent
ii  libev4                                                      1:4.11-1                     high-performance event loop library modelled after libevent
ii  libevent-1.4-2                                              1.4.13-stable-1              An asynchronous event notification library
ii  libevent-2.0-5:amd64                                        2.0.19-stable-3              Asynchronous event notification library
ii  libevent-core-2.0-5:amd64                                   2.0.19-stable-3              Asynchronous event notification library (core)
ii  libevent-dev                                                2.0.19-stable-3              Asynchronous event notification library (development files)
ii  libevent-extra-2.0-5:amd64                                  2.0.19-stable-3              Asynchronous event notification library (extra)
ii  libevent-openssl-2.0-5:amd64                                2.0.19-stable-3              Asynchronous event notification library (openssl)
ii  libevent-pthreads-2.0-5:amd64                               2.0.19-stable-3              Asynchronous event notification library (pthreads)

It may have something to do with the fact that I have both versions of libevent installed. But that is just a guess...

Martin

Re: Can't flush a socket

One more thing to add - responses from server (i.e. data from netcat were already written to and read from the stream) with flush set to true go through immediately.
So the problem is only with client initiating the request/response round.

Re: Can't flush a socket

On Sun, 25 Nov 2012 16:24:33 GMT, Martin Drasar wrote:

One more thing to add - responses from server (i.e. data from netcat were already written to and read from the stream) with flush set to true go through immediately.
So the problem is only with client initiating the request/response round.

I'm not sure exactly what libevent does internally, but the cause turns out to be the blocking stdin.readln() call. The following version of the client correctly sends the two strings one by one. However, replacing the sleep() with foreach(i; 0 .. 1_000_000_000){} will cause both strings to be sent at once after a delay. Looks like an asynchronous way to access stdin needs to be added to vibe.

import vibe.vibe;

void main()
{
	auto conn = connectTcp("127.0.0.1", 1234);
	string[] lines = ["hello", "world"];
	while(lines.length > 0){
		conn.write(lines[0], true);
		lines = lines[1 .. $];
		sleep(dur!"seconds"(5));
	}
	conn.close();
}

If you need to read from stdin, you could do something like this for now and read in a separate thread:

import vibe.vibe;
import core.sync.mutex;
import core.thread;
import std.stdio;

__gshared core.sync.mutex.Mutex linesMutex;
__gshared string[] lines;

void stdinReader()
{
	while(true){
		auto str = stdin.readln();
		synchronized(linesMutex) lines ~= str;
	}
}

void main()
{
	linesMutex = new core.sync.mutex.Mutex;

	(new Thread(&stdinReader)).start();

	auto conn = connectTcp("127.0.0.1", 1234);
	while(true){
		string line;
		while(true){
			synchronized(linesMutex){
				if( lines.length ){
					line = lines[0];
					lines = lines[1 .. $];
					break;
				}
			}
			sleep(dur!"msecs"(10));
		}

		conn.write(line);
	}
}

Not pretty because of the spinning wait, but should work.

Re: Can't flush a socket

On Mon, 26 Nov 2012 19:54:47 GMT, Sönke Ludwig wrote:

I'm not sure exactly what libevent does internally, but the cause turns out to be the blocking stdin.readln() call. The following version of the client correctly sends the two strings one by one. However, replacing the sleep() with foreach(i; 0 .. 1_000_000_000){} will cause both strings to be sent at once after a delay. Looks like an asynchronous way to access stdin needs to be added to vibe.

...

If you need to read from stdin, you could do something like this for now and read in a separate thread:

...

Not pretty because of the spinning wait, but should work.

It seems that the problem is not in reading the stdin per se... Look at the following code:

import vibe.vibe;
import std.stdio;
import std.concurrency;
import std.string; // for strip()

void stringReader(Tid parent)
{
  string line;
  // readln() keeps the trailing newline, so compare the stripped text
  while (line.strip() != "exit")
  {
    line = stdin.readln();
    parent.send(line);
  }
}

void main()
{
  auto conn = connectTcp("127.0.0.1", 1234);
  auto reader = spawn(&stringReader, thisTid);
  string message;
  while (1)
  {
    message = receiveOnly!string();
    if (message.strip() == "exit") break;
    conn.write(message, true);
    //sleep(dur!"seconds"(1)); <-- o_O
  }
}

Regardless of readln being in a separate thread, this code does not work unless the sleep is uncommented.
Probably the core.sync.Condition.wait() in receiveOnly(), which uses pthread_cond_timedwait, blocks the sending, while the nanosleep in sleep() does not.

While reading in a separate thread would be a non-issue for me, this seems like a much bigger problem.

Any ideas?

Thanks,
Martin

Re: Can't flush a socket

On Tue, 27 Nov 2012 10:52:41 GMT, Martin Drasar wrote:

(...)

Regardless of readln being in a separate thread, this code does not work unless the sleep is uncommented.
Probably the core.sync.Condition.wait() in receiveOnly(), which uses pthread_cond_timedwait, blocks the sending, while the nanosleep in sleep() does not.

While reading in a separate thread would be a non-issue for me, this seems like a much bigger problem.

Ah no no, sleep is actually vibe.core.core.sleep and that will keep the event loop running. But you are right, any blocking operation that prohibits event processing will cause this behavior.

It could be worked around in a limited way (probably by not using libevent but using the OS facilities directly), but generally blocking operations will always inhibit I/O; that is a property of the asynchronous model (*). There is vibe.core.core.signal.Signal, which can be used to wait for other threads or tasks without blocking the event loop. std.concurrency cannot be used as is; it would have to use the fiber-aware Signal and Mutex classes of vibe.d so that it doesn't interfere with the event loop.

(*) assuming that the I/O is running inside a task/fiber. Would that actually be the case for you, or are you never calling runTask or runEventLoop in the client? If that's the case, it is probably possible to add some code for TcpConnection.flush() that waits until the data is actually delivered. But this would fail again as soon as tasks and the event loop are used.
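The effect can be reproduced without vibe.d or libevent. The following is a toy model, not vibe.d's implementation: a single fiber queues two writes into a hypothetical outbox, and a hand-written loop stands in for the event loop, "delivering" queued data only when the fiber hands control back. The function name deliveredBeforeSecondWrite and the outbox/wire arrays are made up for illustration; only core.thread.Fiber from druntime is used.

```d
import core.thread : Fiber;
import std.stdio : writeln;

/// Toy model: returns how many messages had reached the "wire" at the moment
/// the task was about to issue its second write.
size_t deliveredBeforeSecondWrite(bool yieldBetweenWrites)
{
    string[] outbox; // data queued by a write, not yet sent
    string[] wire;   // data the loop has actually delivered
    size_t seen;

    auto task = new Fiber({
        outbox ~= "hello";
        if (yieldBetweenWrites)
            Fiber.yield(); // cooperative wait: control returns to the loop
        // Without the yield, this is where a blocking call (readln,
        // receiveOnly, a busy loop) would sit while the loop is starved.
        seen = wire.length;
        outbox ~= "world";
    });

    // Stand-in for the event loop: it can only deliver queued data
    // when the task yields or finishes.
    while (task.state != Fiber.State.TERM)
    {
        task.call();
        wire ~= outbox;
        outbox = null;
    }
    return seen;
}

void main()
{
    writeln(deliveredBeforeSecondWrite(true));  // first write already delivered
    writeln(deliveredBeforeSecondWrite(false)); // nothing delivered yet
}
```

With the yield, the first message is delivered before the second write is issued. Without it, both messages leave together once the task finally gives up control, which matches the behavior seen with the busy loop and with receiveOnly().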

Re: Can't flush a socket

On Tue, 27 Nov 2012 11:54:01 GMT, Sönke Ludwig wrote:

Ah no no, sleep is actually vibe.core.core.sleep and that will keep the event loop running. But you are right, any blocking operation that prohibits event processing will cause this behavior.

Oh, I see. Now it makes sense why the sleep is a special case.

It could be worked around in a limited way (probably by not using libevent but using the OS facilities directly), but generally blocking operations will always inhibit I/O; that is a property of the asynchronous model (*). There is vibe.core.core.signal.Signal, which can be used to wait for other threads or tasks without blocking the event loop. std.concurrency cannot be used as is; it would have to use the fiber-aware Signal and Mutex classes of vibe.d so that it doesn't interfere with the event loop.

(*) assuming that the I/O is running inside a task/fiber. Would that actually be the case for you, or are you never calling runTask or runEventLoop in the client? If that's the case, it is probably possible to add some code for TcpConnection.flush() that waits until the data is actually delivered. But this would fail again as soon as tasks and the event loop are used.

I will have to use runEventLoop(), since the server will push information to the client from time to time.
If there is a vibe-specific mutex and signal, then I can try to come up with a solution using those two. I probably won't have time until tomorrow, but I will try to report my progress for anyone interested. If you have any ideas that might help, feel free to throw them at me ;-)

Thanks,
Martin
