Commit dfefd981 authored by Davis King's avatar Davis King

merged

parents 01b127ea 79bdfebb
......@@ -7,19 +7,152 @@
#include <utility>
#include <iostream>
#include <cstdio>
#include <fcntl.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/select.h>
#include "call_matlab.h"
using namespace std;
// ----------------------------------------------------------------------------------------
namespace dlib
{
// ----------------------------------------------------------------------------------------
void make_fd_non_blocking(int fd)
{
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
// ----------------------------------------------------------------------------------------
// Block until fd is ready to read, while also echoing whatever is in fd_printf to
// cout.
int read_echoing_select(int fd, int fd_printf)
{
// run until fd has data ready
while(fd_printf >= 0)
{
fd_set rfds;
int retval;
while(true)
{
FD_ZERO(&rfds);
FD_SET(fd, &rfds);
FD_SET(fd_printf, &rfds);
// select times out every second just so we can check for matlab ctrl+c.
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
try{check_for_matlab_ctrl_c();} catch(...) { return 1; }
retval = select(std::max(fd,fd_printf)+1, &rfds, NULL, NULL, &tv);
try{check_for_matlab_ctrl_c();} catch(...) { return 1; }
if (retval == 0) // keep going if it was just a timeout.
continue;
else if (retval == -1 && errno == EINTR)
continue;
break;
}
if (retval == -1)
{
return 1;
}
else
{
if (FD_ISSET(fd,&rfds))
{
return 0;
}
else
{
char buf[1024];
int num = read(fd_printf,buf, sizeof(buf)-1);
if (num == -1)
return 1;
if (num > 0)
{
buf[num] = 0;
cout << buf << flush;
}
}
}
}
return 0;
}
int write_echoing_select(int fd, int fd_printf)
{
// run until fd has data ready
while(fd_printf >= 0)
{
fd_set rfds, wfds;
int retval;
while(true)
{
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_SET(fd, &wfds);
FD_SET(fd_printf, &rfds);
// select times out every second just so we can check for matlab ctrl+c.
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
try{check_for_matlab_ctrl_c();} catch(...) { return 1; }
retval = select(std::max(fd,fd_printf)+1, &rfds, &wfds, NULL, &tv);
try{check_for_matlab_ctrl_c();} catch(...) { return 1; }
if (retval == 0) // keep going if it was just a timeout.
continue;
else if (retval == -1 && errno == EINTR)
continue;
break;
}
if (retval == -1)
{
return 1;
}
else
{
if (FD_ISSET(fd,&wfds))
{
return 0;
}
else
{
char buf[1024];
int num = read(fd_printf,buf, sizeof(buf)-1);
if (num == -1)
return 1;
if (num > 0)
{
buf[num] = 0;
cout << buf << flush;
}
}
}
}
return 0;
}
// ----------------------------------------------------------------------------------------
class filestreambuf : public std::streambuf
{
/*!
INITIAL VALUE
- fd_in == the file descriptor we read from.
- fd_out == the file descriptor we write to.
- fd == the file descriptor we read from.
- in_buffer == an array of in_buffer_size bytes
- out_buffer == an array of out_buffer_size bytes
......@@ -32,11 +165,11 @@ namespace dlib
public:
filestreambuf (
int fd_in_,
int fd_out_
int fd_,
int fd_printf_
) :
fd_in(fd_in_),
fd_out(fd_out_),
fd(fd_),
fd_printf(fd_printf_),
out_buffer(0),
in_buffer(0)
{
......@@ -86,13 +219,23 @@ namespace dlib
)
{
int num = static_cast<int>(pptr()-pbase());
if (write(fd_out,out_buffer,num) != num)
const int num_written = num;
char* buf = out_buffer;
while(num != 0)
{
// the write was not successful so return EOF
return EOF;
}
pbump(-num);
return num;
if(write_echoing_select(fd, fd_printf))
return EOF;
int status = write(fd,buf,num);
if (status < 0)
{
// the write was not successful so return EOF
return EOF;
}
num -= status;
buf += status;
}
pbump(-num_written);
return num_written;
}
// output functions
......@@ -155,11 +298,19 @@ namespace dlib
}
else
{
if (write(fd_out,s,num_left) != num_left)
while(num_left != 0)
{
// the write was not successful so return that 0 bytes were written
return 0;
}
if(write_echoing_select(fd, fd_printf))
return EOF;
int status = write(fd,s,num_left);
if (status < 0)
{
// the write was not successful so return that 0 bytes were written
return 0;
}
num_left -= status;
s += status;
}
return num;
}
}
......@@ -184,7 +335,9 @@ namespace dlib
std::memmove(in_buffer+(max_putback-num_put_back), gptr()-num_put_back, num_put_back);
int num = read(fd_in,in_buffer+max_putback, in_buffer_size-max_putback);
if (read_echoing_select(fd, fd_printf))
return EOF;
int num = read(fd,in_buffer+max_putback, in_buffer_size-max_putback);
if (num <= 0)
{
// an error occurred or the connection is over which is EOF
......@@ -236,8 +389,8 @@ namespace dlib
private:
// member data
int fd_in;
int fd_out;
int fd;
int fd_printf;
static const std::streamsize max_putback = 4;
static const std::streamsize out_buffer_size = 10000;
static const std::streamsize in_buffer_size = 10000;
......@@ -246,10 +399,20 @@ namespace dlib
};
namespace impl
{
std::ostream& get_data_ostream()
{
static filestreambuf dbuff(STDIN_FILENO, -1);
static ostream out(&dbuff);
return out;
}
}
// ----------------------------------------------------------------------------------------
subprocess_stream::
subprocess_stream(const char* program_name) : stderr(NULL), std::iostream(NULL)
subprocess_stream(const char* program_name) : stderr(NULL), iosub(NULL)
{
if (access(program_name, F_OK))
throw dlib::error("Error: '" + std::string(program_name) + "' file does not exist.");
......@@ -263,12 +426,12 @@ namespace dlib
if (child_pid == 0)
{
// In child process
dup2(write_pipe.read_fd(), STDIN_FILENO);
dup2(read_pipe.write_fd(), STDOUT_FILENO);
dup2(err_pipe.write_fd(), STDERR_FILENO);
write_pipe.close();
read_pipe.close();
err_pipe.close();
dup2(data_pipe.child_fd(), STDIN_FILENO);
dup2(stdout_pipe.child_fd(), STDOUT_FILENO);
dup2(stderr_pipe.child_fd(), STDERR_FILENO);
data_pipe.close();
stdout_pipe.close();
stderr_pipe.close();
char* argv[] = {(char*)program_name, nullptr};
char* envp[] = {nullptr};
......@@ -281,15 +444,18 @@ namespace dlib
else
{
// In parent process
close(write_pipe.read_fd());
close(read_pipe.write_fd());
close(err_pipe.write_fd());
inout_buf = std::unique_ptr<filestreambuf>(new filestreambuf(read_pipe.read_fd(), write_pipe.write_fd()));
err_buf = std::unique_ptr<filestreambuf>(new filestreambuf(err_pipe.read_fd(), 0));
this->rdbuf(inout_buf.get());
close(data_pipe.child_fd());
close(stdout_pipe.child_fd());
close(stderr_pipe.child_fd());
make_fd_non_blocking(data_pipe.parent_fd());
make_fd_non_blocking(stdout_pipe.parent_fd());
make_fd_non_blocking(stderr_pipe.parent_fd());
inout_buf = std::unique_ptr<filestreambuf>(new filestreambuf(data_pipe.parent_fd(), stdout_pipe.parent_fd()));
err_buf = std::unique_ptr<filestreambuf>(new filestreambuf(stderr_pipe.parent_fd(), stdout_pipe.parent_fd()));
iosub.rdbuf(inout_buf.get());
stderr.rdbuf(err_buf.get());
this->tie(this);
stderr.tie(this);
iosub.tie(&iosub);
stderr.tie(&iosub);
}
}
......@@ -321,6 +487,11 @@ namespace dlib
std::ostringstream sout;
sout << stderr.rdbuf();
try{check_for_matlab_ctrl_c();} catch(...)
{
kill(child_pid, SIGTERM);
}
int status;
waitpid(child_pid, &status, 0);
if (status)
......@@ -334,7 +505,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
void subprocess_stream::
send_eof() { inout_buf->sync(); ::close(write_pipe.write_fd()); }
send_eof() { inout_buf->sync(); ::close(data_pipe.parent_fd()); }
// ----------------------------------------------------------------------------------------
......
......@@ -8,6 +8,9 @@
#include <iostream>
#include <memory>
#include <dlib/matrix.h>
#include <sys/types.h>
#include <sys/socket.h>
namespace dlib
{
......@@ -15,7 +18,7 @@ namespace dlib
// --------------------------------------------------------------------------------------
// Call dlib's serialize and deserialize by default. The point of this version of
// serailize is to do something fast that normally we wouldn't do, like directly copy
// serialize is to do something fast that normally we wouldn't do, like directly copy
// memory. This is safe since this is an interprocess communication happening the same
// machine.
template <typename T> void interprocess_serialize ( const T& item, std::ostream& out) { serialize(item, out); }
......@@ -49,19 +52,20 @@ namespace dlib
// ----------------------------------------------------------------------------------------
inline void send_to_parent_process() {std::cout.flush();}
namespace impl{ std::ostream& get_data_ostream(); }
inline void send_to_parent_process() {impl::get_data_ostream().flush();}
template <typename U, typename ...T>
void send_to_parent_process(U&& arg1, T&& ...args)
/*!
ensures
- sends all the arguments to send_to_parent_process() to standard output (and
hence to the parent process) by serializing them with
interprocess_serialize().
- sends all the arguments to send_to_parent_process() to the parent process by
serializing them with interprocess_serialize().
!*/
{
interprocess_serialize(arg1, std::cout);
interprocess_serialize(arg1, impl::get_data_ostream());
send_to_parent_process(std::forward<T>(args)...);
if (!std::cout)
if (!impl::get_data_ostream())
throw dlib::error("Error sending object to parent process.");
}
......@@ -86,22 +90,32 @@ namespace dlib
class filestreambuf;
class subprocess_stream : public std::iostream
class subprocess_stream
{
/*!
WHAT THIS OBJECT REPRESENTS
This is a tool for spawning a subprocess and communicating with it through
that processes standard input, output, and error. Here is an example:
its standard input, output, and error. Here is an example:
subprocess_stream s("/usr/bin/some_program");
s.send(obj1, obj2, obj3);
s.receive(obj4, obj5);
s.wait(); // wait for sub process to terminate
Then in the sub process you would have:
receive_from_parent_process(obj1, obj2, obj3);
// do stuff
cout << "echo this text to parent cout" << endl;
send_to_parent_process(obj4, obj5);
subprocess_stream s("/usr/bin/echo");
s << "echo me this!";
string line;
getline(s, line);
cout << line << endl;
s.wait();
That example runs echo, sends it some text, gets it back, and prints it to
the screen. Then it waits for the subprocess to finish.
Additionally, if the sub process writes to its standard out then that will
be echoed to std::cout in the parent process. Also, the communication of
send()/receive() calls between the parent and child happens all on the
standard input file descriptor. So you can't really use std::cin for
anything inside the child process as that would interfere with
receive_from_parent_process() and send_to_parent_process().
!*/
public:
......@@ -119,7 +133,7 @@ namespace dlib
/*!
ensures
- calls wait(). Note that the destructor never throws even though wait() can.
If an exception is thrown by wait() it is just logged to std::cerr.
If an exception is thrown by wait() it is just logged to std::cerr.
!*/
void wait(
......@@ -147,16 +161,16 @@ namespace dlib
with interprocess_serialize().
!*/
{
interprocess_serialize(arg1, *this);
interprocess_serialize(arg1, iosub);
send(std::forward<T>(args)...);
if (!this->good())
if (!iosub)
{
std::ostringstream sout;
sout << stderr.rdbuf();
throw dlib::error("Error sending object to child process.\n" + sout.str());
}
}
void send() {this->flush();}
void send() {iosub.flush();}
template <typename U, typename ...T>
void receive(U&& arg1, T&& ...args)
......@@ -166,9 +180,9 @@ namespace dlib
them with interprocess_deserialize().
!*/
{
interprocess_deserialize(arg1, *this);
interprocess_deserialize(arg1, iosub);
receive(std::forward<T>(args)...);
if (!this->good())
if (!iosub)
{
std::ostringstream sout;
sout << stderr.rdbuf();
......@@ -187,21 +201,22 @@ namespace dlib
private:
int fd[2];
public:
cpipe() { if (pipe(fd)) throw dlib::error("Failed to create pipe"); }
cpipe() { if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fd)) throw dlib::error("Failed to create pipe"); }
~cpipe() { close(); }
int read_fd() const { return fd[0]; }
int write_fd() const { return fd[1]; }
int parent_fd() const { return fd[0]; }
int child_fd() const { return fd[1]; }
void close() { ::close(fd[0]); ::close(fd[1]); }
};
cpipe write_pipe;
cpipe read_pipe;
cpipe err_pipe;
cpipe data_pipe;
cpipe stdout_pipe;
cpipe stderr_pipe;
bool wait_called = false;
std::unique_ptr<filestreambuf> inout_buf;
std::unique_ptr<filestreambuf> err_buf;
int child_pid = -1;
std::istream stderr;
std::iostream iosub;
};
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment