RejectedSoftware Forums

Sign up

How to receive 4096+ bytes request without length?

Hi,

I write msgpack-rpc-d which depends on vibe.d.
One user reports client is blocked when RPC server returns 4096+ response.

OK: client sends 4096+ request to server.

https://github.com/msgpack-rpc/msgpack-rpc-d/blob/27294674771d16bb2302d3c9fd0e0241db8050a0/src/msgpackrpc/transport/tcp.d#L62

In this point, this leastSize returns entire request body size. So server can process the request correctly.

NG: server sends 4096+ response to client.

https://github.com/msgpack-rpc/msgpack-rpc-d/blob/27294674771d16bb2302d3c9fd0e0241db8050a0/src/msgpackrpc/transport/tcp.d#L129

On the other hand, this leastSize returns 4096 if response body size is 4096+.
I changed the code to like below but leastSize is blocked when there is no remaining data.

    ubyte[] result;
    do {
        auto size = input.leastSize;
        if (size == 0) {
            proccessRequest(result);
            break;
        }
        ubyte[] data = new ubyte[](size);
        input.read(data);
        result ~= data;
    } while (_connection.connected);

I tried "dataAvailableForRead" method but this method always returns false.
msgpack-rpc is a binary based protocol and doesn't have a request size header.
Establishing new connection in each request can avoid this problem but it is high cost.

How to handle above case in vibe.d?

I tested this code with dmd 2.066.0, vibe.d 0.7.21-rc.3, libevent 2.0.0+2.0.16.

Thanks!

Re: How to receive 4096+ bytes request without length?

How about changing the loop to while(_connection.dataAvailableForRead) ? I doubt you'd ever block if you used that. Not sure about leastSize maxing out at 4096, that seems very unlikely. How did you measure it?

Re: How to receive 4096+ bytes request without length?

On Sun, 19 Oct 2014 03:04:28 GMT, Etienne Cimon wrote:

How about changing the loop to while(_connection.dataAvailableForRead) ? I doubt you'd ever block if you used that.

Thanks for the help. I tried dataAvailableForRead again but always returns false. It doesn't resolve the problem.

Not sure about leastSize maxing out at 4096, that seems very unlikely. How did you measure it?

I inserted writeln to after leastSize.

    ubyte[] result;
    do {
        writeln("before leastSize");
        auto size = input.leastSize;
        writeln("size: ", size);
        if (size == 0) {
            proccessRequest(result);
            break;
        }
        ubyte[] data = new ubyte[](size);
        input.read(data);
        result ~= data;
    } while (_connection.connected);

My test code:

  • test_server.d
import msgpackrpc;
import std.stdio;
import std.array, std.range, std.conv;

class HelloServer
{
    size_t len;

    this(size_t l)
    {
        len = l;
    }

    string hello(string msg)
    {
        return cast(string)array(repeat('f', len));
    }
}

void main(string[] args)
{ 
    auto len = to!size_t(args[1]);
    auto server = new TCPServer!(HelloServer)(new HelloServer(len));
    server.listen(Endpoint(18800, "127.0.0.1"));
    server.start();
}
  • test_client.d
import msgpackrpc;
import std.stdio;
import std.array, std.range;

void main()
{
    auto client = new TCPClient(Endpoint(18800, "127.0.0.1"));

    // sync request
    writeln(client.call!string("hello", array(repeat('f', 4100))).length);
}

The result is

before leastSize
size: 4096
before leastSize
size: 11
before leastSize
# blocking until server terminated

Re: How to receive 4096+ bytes request without length?

What I'm still wondering is how the client-server protocol works in this
case. Since TCP is a stream based protocol, it's necessary to have some
kind of marker for where a single message ends, either using a message
length field, using an end marker, or using the connection state (close
after each message).

Basically there is no intrinsic way to know where the peer has stopped
writing to the connection and one has to be arbitrarily chosen.
leastSize will always block until either new data has arrived, or
until the connection is closed. dataAvailableForRead may miss data if
it doesn't get written/transferred fast enough. Using waitForData with
a finite timeout is slow and may still miss if it doesn't get written
fast enough.

Re: How to receive 4096+ bytes request without length?

Sorry for late reply.

On Mon, 20 Oct 2014 09:26:48 +0200, Sönke Ludwig wrote:

What I'm still wondering is how the client-server protocol works in this
case. Since TCP is a stream based protocol, it's necessary to have some
kind of marker for where a single message ends, either using a message
length field, using an end marker, or using the connection state (close
after each message).

Almost MessagePack libraries have streaming deserializer which processes separated / multiple MessagePack object. msgpack-d also has this deserializer.

https://github.com/msgpack/msgpack-d/blob/878fcb1852160d1c3d206df933f6becba18aa222/src/msgpack.d#L3928

We can consume the MessagePack message across TCP segments in one connection using this deserializer without length / marker. Implicit message end is connection close.

Other msgpack-rpc implementation uses same approach.

Basically there is no intrinsic way to know where the peer has stopped
writing to the connection and one has to be arbitrarily chosen.
leastSize will always block until either new data has arrived, or
until the connection is closed. dataAvailableForRead may miss data if
it doesn't get written/transferred fast enough. Using waitForData with
a finite timeout is slow and may still miss if it doesn't get written
fast enough.

So I want leastSize without blocking or returning entire message size, not 4096, method.

Re: How to receive 4096+ bytes request without length?

On Sun, 26 Oct 2014 16:22:19 GMT, Masahiro Nakagawa wrote:

Sorry for late reply.

On Mon, 20 Oct 2014 09:26:48 +0200, Sönke Ludwig wrote:

What I'm still wondering is how the client-server protocol works in this
case. Since TCP is a stream based protocol, it's necessary to have some
kind of marker for where a single message ends, either using a message
length field, using an end marker, or using the connection state (close
after each message).

Almost MessagePack libraries have streaming deserializer which processes separated / multiple MessagePack object. msgpack-d also has this deserializer.

https://github.com/msgpack/msgpack-d/blob/878fcb1852160d1c3d206df933f6becba18aa222/src/msgpack.d#L3928

We can consume the MessagePack message across TCP segments in one connection using this deserializer without length / marker. Implicit message end is connection close.

Other msgpack-rpc implementation uses same approach.

Basically there is no intrinsic way to know where the peer has stopped
writing to the connection and one has to be arbitrarily chosen.
leastSize will always block until either new data has arrived, or
until the connection is closed. dataAvailableForRead may miss data if
it doesn't get written/transferred fast enough. Using waitForData with
a finite timeout is slow and may still miss if it doesn't get written
fast enough.

So I want leastSize without blocking or returning entire message size, not 4096, method.

ok I think I found a fix, see:
https://github.com/msgpack-rpc/msgpack-rpc-d/issues/16#issuecomment-312934308

Re: How to receive 4096+ bytes request without length?

On Tue, 04 Jul 2017 19:57:58 GMT, Timothee Cour wrote:

On Sun, 26 Oct 2014 16:22:19 GMT, Masahiro Nakagawa wrote:

(...)

So I want leastSize without blocking or returning entire message size, not 4096, method.

ok I think I found a fix, see:
https://github.com/msgpack-rpc/msgpack-rpc-d/issues/16#issuecomment-312934308

There is one thing that I don't yet understand about that read loop. Just to make sure that there is no misunderstanding somewhere, input.connected can return a different value from !input.empty. .empty (same as .leastSize == 0) specifically signals that the connection was closed by the remote and that there is no more data in the read buffer. .connected returns the physical connection state as seen from the local peer. As a rule of thumb, .connected should be used to test write-readiness and .empty to test for read-readyness.

In this case, .connected is used together with .leastSize/read, meaning that the loop will possibly exit before all data has been read. But that doesn't fit with the initial issue description that it blocks in leastSize. I think I'll have to see the expected sequence of the data exchange between client and server to make a guess of how this should really look like.

Re: How to receive 4096+ bytes request without length?

On Wed, 05 Jul 2017 17:03:40 GMT, Sönke Ludwig wrote:

On Tue, 04 Jul 2017 19:57:58 GMT, Timothee Cour wrote:

On Sun, 26 Oct 2014 16:22:19 GMT, Masahiro Nakagawa wrote:

(...)

So I want leastSize without blocking or returning entire message size, not 4096, method.

ok I think I found a fix, see:
https://github.com/msgpack-rpc/msgpack-rpc-d/issues/16#issuecomment-312934308

There is one thing that I don't yet understand about that read loop. Just to make sure that there is no misunderstanding somewhere, input.connected can return a different value from !input.empty. .empty (same as .leastSize == 0) specifically signals that the connection was closed by the remote and that there is no more data in the read buffer. .connected returns the physical connection state as seen from the local peer. As a rule of thumb, .connected should be used to test write-readiness and .empty to test for read-readyness.

In this case, .connected is used together with .leastSize/read, meaning that the loop will possibly exit before all data has been read. But that doesn't fit with the initial issue description that it blocks in leastSize. I think I'll have to see the expected sequence of the data exchange between client and server to make a guess of how this should really look like.

But if the meaning of _unpacker.size is as I imagine (couldn't find the definition anywhere), then the loop should probably look like this:

do {
    size_t len = _input.leastSize;
    // NOTE: leastSize == 0 signals that the remote has closed the connection
    enforce(len == 0, "Connection closed before a full message was written.");

    auto data = new ubyte[](len);
    input.read(data);
    proccessRequest(data);
} while (_unpacker.size > 0);