Commit 5f2c6512 authored by Davis King's avatar Davis King

Merged in Miguel Grinberg's patch to add a non-blocking read() function to the

connection object.

--HG--
extra : convert_revision : svn%3Afdd8eb12-d10e-0410-9acb-85c331704f74/trunk%403060
parent ee9b19f0
// Copyright (C) 2003 Davis E. King (davisking@users.sourceforge.net) // Copyright (C) 2003 Davis E. King (davisking@users.sourceforge.net), Miguel Grinberg
// License: Boost Software License See LICENSE.txt for the full license. // License: Boost Software License See LICENSE.txt for the full license.
#ifndef DLIB_SOCKETS_KERNEL_1_CPp_ #ifndef DLIB_SOCKETS_KERNEL_1_CPp_
#define DLIB_SOCKETS_KERNEL_1_CPp_ #define DLIB_SOCKETS_KERNEL_1_CPp_
...@@ -325,6 +325,69 @@ namespace dlib ...@@ -325,6 +325,69 @@ namespace dlib
return status; return status;
} }
// ----------------------------------------------------------------------------------------
long connection::
read (
char* buf,
long num,
unsigned long timeout
)
{
if (readable() == false)
return TIMEOUT;
const long max_recv_length = 1024*1024*100;
// Make sure to cap the max value num can take on so that if it is
// really large (it might be big on 64bit platforms) so that the OS
// can't possibly get upset about it being large.
const long length = std::min(max_recv_length, num);
long status = recv(connection_socket,buf,length,0);
if (status == SOCKET_ERROR)
{
// if this error is the result of a shutdown call then return SHUTDOWN
if (sd_called())
return SHUTDOWN;
else
return OTHER_ERROR;
}
else if (status == 0 && sd_called())
{
return SHUTDOWN;
}
return status;
}
// ----------------------------------------------------------------------------------------
bool connection::
readable (
unsigned long timeout
) const
{
fd_set read_set;
// initialize read_set
FD_ZERO(&read_set);
// add the listening socket to read_set
FD_SET(connection_socket, &read_set);
// setup a timeval structure
timeval time_to_wait;
time_to_wait.tv_sec = static_cast<long>(timeout/1000);
time_to_wait.tv_usec = static_cast<long>((timeout%1000)*1000);
// wait on select
int status = select(0,&read_set,0,0,&time_to_wait);
// if select timed out or there was an error
if (status <= 0)
return false;
// data is ready to be read
return true;
}
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
int connection:: int connection::
......
// Copyright (C) 2003 Davis E. King (davisking@users.sourceforge.net) // Copyright (C) 2003 Davis E. King (davisking@users.sourceforge.net), Miguel Grinberg
// License: Boost Software License See LICENSE.txt for the full license. // License: Boost Software License See LICENSE.txt for the full license.
#ifndef DLIB_SOCKETS_KERNEl_1_ #ifndef DLIB_SOCKETS_KERNEl_1_
#define DLIB_SOCKETS_KERNEl_1_ #define DLIB_SOCKETS_KERNEl_1_
...@@ -111,6 +111,11 @@ namespace dlib ...@@ -111,6 +111,11 @@ namespace dlib
long num long num
); );
long read (
char* buf,
long num,
unsigned long timeout
);
unsigned short get_local_port ( unsigned short get_local_port (
) const { return connection_local_port; } ) const { return connection_local_port; }
...@@ -140,7 +145,19 @@ namespace dlib ...@@ -140,7 +145,19 @@ namespace dlib
) const; ) const;
private: private:
bool readable (
unsigned long timeout
) const;
/*!
requires
- timeout < 2000000
ensures
- returns true if a read call on this connection will not block.
- returns false if a read call on this connection will block or if
there was an error.
!*/
bool sd_called ( bool sd_called (
)const )const
/*! /*!
......
// Copyright (C) 2003 Davis E. King (davisking@users.sourceforge.net) // Copyright (C) 2003 Davis E. King (davisking@users.sourceforge.net), Miguel Grinberg
// License: Boost Software License See LICENSE.txt for the full license. // License: Boost Software License See LICENSE.txt for the full license.
#ifndef DLIB_SOCKETS_KERNEL_2_CPp_ #ifndef DLIB_SOCKETS_KERNEL_2_CPp_
#define DLIB_SOCKETS_KERNEL_2_CPp_ #define DLIB_SOCKETS_KERNEL_2_CPp_
...@@ -386,6 +386,78 @@ namespace dlib ...@@ -386,6 +386,78 @@ namespace dlib
return status; return status;
} // while (true) } // while (true)
} }
// ----------------------------------------------------------------------------------------
long connection::
read (
char* buf,
long num,
unsigned long timeout
)
{
long status;
const long max_recv_length = 1024*1024*100;
if (readable(timeout) == false)
return TIMEOUT;
// Make sure to cap the max value num can take on so that if it is
// really large (it might be big on 64bit platforms) so that the OS
// can't possibly get upset about it being large.
const long length = std::min(max_recv_length, num);
status = recv(connection_socket,buf,length,0);
if (status == -1)
{
// if recv was interupted then call this a timeout
if (errno == EINTR)
{
return TIMEOUT;
}
else
{
if (sd_called())
return SHUTDOWN;
else
return OTHER_ERROR;
}
}
else if (status == 0 && sd_called())
{
return SHUTDOWN;
}
return status;
}
// ----------------------------------------------------------------------------------------
bool connection::
readable (
unsigned long timeout
) const
{
fd_set read_set;
// initialize read_set
FD_ZERO(&read_set);
// add the listening socket to read_set
FD_SET(connection_socket, &read_set);
// setup a timeval structure
timeval time_to_wait;
time_to_wait.tv_sec = static_cast<long>(timeout/1000);
time_to_wait.tv_usec = static_cast<long>((timeout%1000)*1000);
// wait on select
int status = select(connection_socket+1,&read_set,0,0,&time_to_wait);
// if select timed out or there was an error
if (status <= 0)
return false;
// socket is ready to be read
return true;
}
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
......
// Copyright (C) 2003 Davis E. King (davisking@users.sourceforge.net) // Copyright (C) 2003 Davis E. King (davisking@users.sourceforge.net), Miguel Grinberg
// License: Boost Software License See LICENSE.txt for the full license. // License: Boost Software License See LICENSE.txt for the full license.
#ifndef DLIB_SOCKETS_KERNEl_2_ #ifndef DLIB_SOCKETS_KERNEl_2_
#define DLIB_SOCKETS_KERNEl_2_ #define DLIB_SOCKETS_KERNEl_2_
...@@ -133,6 +133,12 @@ namespace dlib ...@@ -133,6 +133,12 @@ namespace dlib
long num long num
); );
long read (
char* buf,
long num,
unsigned long timeout
);
int get_local_port ( int get_local_port (
) const { return connection_local_port; } ) const { return connection_local_port; }
...@@ -184,6 +190,17 @@ namespace dlib ...@@ -184,6 +190,17 @@ namespace dlib
private: private:
bool readable (
unsigned long timeout
) const;
/*!
requires
- timeout < 2000000
ensures
- returns true if a read call on this connection will not block.
- returns false if a read call on this connection will block or if
there was an error.
!*/
bool sd_called ( bool sd_called (
)const )const
......
...@@ -273,6 +273,34 @@ namespace dlib ...@@ -273,6 +273,34 @@ namespace dlib
- returns SHUTDOWN if the connection has been shutdown locally - returns SHUTDOWN if the connection has been shutdown locally
!*/ !*/
long read (
char* buf,
long num,
unsigned long timeout
);
/*!
requires
- num > 0
- buf points to an array of at least num bytes
- timeout < 2000000
ensures
- read() will not read more than num bytes of data into #buf
- if (timeout > 0) then read() blocks until ONE of the following happens:
- there is some data available and it has been written into #buf
- the remote end of the connection is closed
- an error has occurred
- the connection has been shutdown locally
- timeout milliseconds has elapsed
- else
- read() does not block
- returns the number of bytes read into #buf if there was any data.
- returns 0 if the connection has ended/terminated and there is no more data.
- returns TIMEOUT if timeout milliseconds elapsed before we got any data.
- returns OTHER_ERROR if there was an error.
- returns SHUTDOWN if the connection has been shutdown locally
!*/
unsigned short get_local_port ( unsigned short get_local_port (
) const; ) const;
/*! /*!
......
...@@ -65,6 +65,7 @@ set (tests ...@@ -65,6 +65,7 @@ set (tests
sliding_buffer.cpp sliding_buffer.cpp
smart_pointers.cpp smart_pointers.cpp
sockets.cpp sockets.cpp
sockets2.cpp
sockstreambuf.cpp sockstreambuf.cpp
stack.cpp stack.cpp
static_map.cpp static_map.cpp
......
...@@ -75,6 +75,7 @@ SRC += set.cpp ...@@ -75,6 +75,7 @@ SRC += set.cpp
SRC += sliding_buffer.cpp SRC += sliding_buffer.cpp
SRC += smart_pointers.cpp SRC += smart_pointers.cpp
SRC += sockets.cpp SRC += sockets.cpp
SRC += sockets2.cpp
SRC += sockstreambuf.cpp SRC += sockstreambuf.cpp
SRC += stack.cpp SRC += stack.cpp
SRC += static_map.cpp SRC += static_map.cpp
......
// Copyright (C) 2008 Davis E. King (davisking@users.sourceforge.net)
// License: Boost Software License See LICENSE.txt for the full license.
#include "tester.h"
#include <dlib/sockets.h>
#include <dlib/threads.h>
#include <dlib/array.h>
#include <algorithm>
// This is called an unnamed-namespace and it has the effect of making everything
// inside this file "private" so that everything you declare will have static linkage.
// Thus we won't have any multiply defined symbol errors coming out of the linker when
// we try to compile the test suite.
namespace
{
using namespace test;
using namespace dlib;
using namespace std;
// Declare the logger we will use in this test. The name of the logger
// should start with "test."
dlib::logger dlog("test.sockets2");
class sockets2_tester : public tester, private multithreaded_object
{
/*!
WHAT THIS OBJECT REPRESENTS
This object represents a unit test. When it is constructed
it adds itself into the testing framework.
!*/
short port_num;
string data_to_send;
bool test_failed;
void write_thread (
)
{
try
{
scoped_ptr<connection> con(connect("127.0.0.1", port_num));
// Send a copy of the data down the connection so we can test our the read() function
// that uses timeouts in the main thread.
if (con->write(data_to_send.data(), data_to_send.size()) != (int)data_to_send.size())
{
test_failed = true;
dlog << LERROR << "failed to send all the data down the connection";
}
close_gracefully(con,300000);
}
catch (exception& e)
{
test_failed = true;
dlog << LERROR << e.what();
}
}
void no_write_thread (
)
{
try
{
scoped_ptr<connection> con(connect("127.0.0.1", port_num));
// just do nothing until the connection closes
char ch;
con->read(&ch, 1);
dlog << LDEBUG << "silent connection finally closing";
}
catch (exception& e)
{
test_failed = true;
dlog << LERROR << e.what();
}
}
public:
sockets2_tester (
) :
tester (
"test_sockets2", // the command line argument name for this test
"Run sockets2 tests.", // the command line argument description
0 // the number of command line arguments for this test
)
{
register_thread(*this, &sockets2_tester::write_thread);
register_thread(*this, &sockets2_tester::write_thread);
register_thread(*this, &sockets2_tester::write_thread);
register_thread(*this, &sockets2_tester::write_thread);
register_thread(*this, &sockets2_tester::write_thread);
register_thread(*this, &sockets2_tester::no_write_thread);
}
void perform_test (
)
{
run_tests(0);
run_tests(40);
}
void run_tests (
unsigned long timeout_to_use
)
{
// make sure there aren't any threads running
wait();
port_num = 5000;
test_failed = false;
print_spinner();
data_to_send = "oi 2m3ormao2m fo2im3fo23mi o2mi3 foa2m3fao23ifm2o3fmia23oima23iom3giugbiua";
// make the block of data much larger
for (int i = 0; i < 11; ++i)
data_to_send = data_to_send + data_to_send;
dlog << LINFO << "data block size: " << data_to_send.size();
scoped_ptr<listener> list;
DLIB_TEST(create_listener(list, port_num, "127.0.0.1") == 0);
DLIB_TEST(list);
// kick off the sending threads
start();
array<scoped_ptr<connection> >::expand_1a_c cons;
std::vector<long> bytes_received(6,0);
scoped_ptr<connection> con_temp;
// accept the 6 connections we should get
for (int i = 0; i < 6; ++i)
{
DLIB_TEST(list->accept(con_temp) == 0);
cons.push_back(con_temp);
print_spinner();
}
int finished_cons = 0;
// now receive all the bytes from the sending threads
while (finished_cons < 5)
{
for (unsigned long i = 0; i < cons.size(); ++i)
{
if (cons[i])
{
const int buf_size = 3000;
char buf[buf_size];
int status = cons[i]->read(buf, buf_size, timeout_to_use);
if (status > 0)
{
DLIB_TEST(equal(buf, buf+status, data_to_send.begin()+bytes_received[i]));
bytes_received[i] += status;
}
else if (status == 0)
{
// the connection is closed to kill it
cons[i].reset();
++finished_cons;
}
}
}
print_spinner();
}
for (unsigned long i = 0; i < bytes_received.size(); ++i)
{
DLIB_TEST(bytes_received[i] == (long)data_to_send.size() || cons[i]);
}
dlog << LINFO << "All data received correctly";
cons.clear();
print_spinner();
DLIB_TEST(test_failed == false);
// wait for all the sending threads to terminate
wait();
}
};
// Create an instance of this object. Doing this causes this test
// to be automatically inserted into the testing framework whenever this cpp file
// is linked into the project. Note that since we are inside an unnamed-namespace
// we won't get any linker errors about the symbol a being defined multple times.
sockets2_tester a;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment