ref: refs/heads/master
commit d95eea5ee95172da09803c2a45fb517b13af9676
Author: Michael Andres
Date: Fri Jul 24 16:11:01 2009 +0200
Implement data transfer class TextExch.
---
helper/installHelper.cc | 27 +++---
helper/installHelper.h | 219 ++++++++++++++++++++++++++++++++++++++++++-----
helper/test.cc | 39 ++++++---
3 files changed, 237 insertions(+), 48 deletions(-)
diff --git a/helper/installHelper.cc b/helper/installHelper.cc
index c490cdb..b53fece 100644
--- a/helper/installHelper.cc
+++ b/helper/installHelper.cc
@@ -31,7 +31,6 @@ try {
MIL << "InstallHelper " << getpid() << endl;
- std::string iFile;
{
ShmAccess<Comm> comm( shm(), "Comm" );
MIL << comm->pid << endl;
@@ -40,32 +39,32 @@ try {
ERR << "Not my pid: " << comm->pid << "(" << getpid() << ")" << endl;
return 1;
}
-
- if ( * comm->dataStr )
- iFile = comm->dataStr;
+ comm->setStatus( JS_RUNNING );
}
- MIL << "read " << iFile << endl;
- std::ifstream infile( iFile.c_str() );
- for( iostr::EachLine in( infile ); in; in.next() )
- {
- USR << *in << endl;
- }
+
+ MIL << "Prepare to receive data..." << endl;
+ ShmAccessUnlocked<TextExch> textExch( shm(), "TextExch" );
+ MIL << "Receive data..." << endl;
+ std::string rec;
+ do {
+ rec = textExch->get();
+ USR << rec << endl;
+ } while( ! rec.empty() );
+ MIL << "Received." << endl;
+
for(int i = 0; i < 10; ++i)
{
ShmAccess<Comm> comm( shm(), "Comm" );
-
comm->percent = i*10;
- comm->status = 4;
sleep(1);
}
{
ShmAccess<Comm> comm( shm(), "Comm" );
-
comm->percent = 100;
- comm->status = 7;
+ comm->setStatus( JS_COMPLETED );
}
MIL << "Done" << endl;
diff --git a/helper/installHelper.h b/helper/installHelper.h
index 9ec78bd..0aad237 100644
--- a/helper/installHelper.h
+++ b/helper/installHelper.h
@@ -2,7 +2,8 @@
#define INSTALLHELPER_H
#include
-#include
+#include
+#include
#include
#include
@@ -11,12 +12,6 @@
#define STR_SIZE 256
#define MAX_ANSWERS 10
-/*
-JobState
-Values {"New", "Starting", "Running", "Suspended", "Shutting Down", "Completed", "Terminated", "Killed", "Exception", "Service", "Query Pending", "DMTF Reserved", "Vendor Reserved"}
-ValueMap {"2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13..32767", "32768..65535"}
-*/
-
namespace cmpizypp
{
/**
@@ -24,14 +19,20 @@ namespace cmpizypp
*/
struct ShmData
{
- ShmData()
- : _lock(1)
- {}
- boost::interprocess::interprocess_semaphore _lock;
+ typedef boost::interprocess::interprocess_mutex mutex_t;
+ mutex_t _mutex;
+ };
+
+ enum
+ {
+ SHMACCESS_UNLOCKED = -2,
+ SHMACCESS_WAIT = -1,
+ SHMACCESS_IMMEDIATE = 0,
+ SHMACCESS_DEFAULTTIMEOUT = 5,
};
/**
- * Locked access to shared memory objetcs.
+ * Locked access (timeout) to shared memory objetcs.
* \code
* struct Comm {...};
* managed_shared_memory managed_shm( create_only, SHM_NAME, 65536 );
@@ -55,34 +56,161 @@ namespace cmpizypp
* \todo Define Exceptions.
*/
ShmAccess( boost::interprocess::managed_shared_memory & managed_shm_r,
- const char * name_r, time_t timeout = 5 )
+ const char * name_r, time_t timeout = SHMACCESS_DEFAULTTIMEOUT )
: _d( managed_shm_r.find<Derived>( name_r ).first )
+ , _locked( false )
{
if ( !_d )
throw std::string( "Can't find " ) + name_r;
- if ( _d && ! _d->_lock.timed_wait( boost::posix_time::from_time_t( ::time(0)+timeout ) ) )
- throw std::string( "Got no lock" );
+ if ( timeout >= 0 )
+ {
+ _locked = timeout ? _d->_mutex.timed_lock( boost::posix_time::from_time_t( ::time(0)+timeout ) )
+ : _d->_mutex.try_lock();
+ if ( !_locked )
+ throw std::string( "Got no lock" );
+ }
+ else if ( timeout == SHMACCESS_WAIT )
+ {
+ _d->_mutex.lock();
+ }
+ // else unlocked
}
- ~ShmAccess()
- { release(); }
+ ~ShmAccess() { release(); }
public:
- /** Give up lock and access. */
- void release()
- { if ( _d ) { _d->_lock.post(); _d = 0; } }
+ /** Give up lock. */
+ void release() { if ( _locked ) { _d->_mutex.unlock(); _locked = false; } }
public:
const Derived * get() const { return _d; }
Derived * get() { return _d; }
const Derived * operator->() const { return _d; }
Derived * operator->() { return _d; }
- const Derived & operator*() const { return _d; }
- Derived & operator*() { return _d; }
+ const Derived & operator*() const { return *_d; }
+ Derived & operator*() { return *_d; }
private:
Derived * _d;
+ bool _locked;
+ };
+
+ /**
+ * Unlocked access to shared memory objetcs.
+ */
+ template <class Derived>
+ struct ShmAccessUnlocked : public ShmAccess<Derived>
+ {
+ ShmAccessUnlocked( boost::interprocess::managed_shared_memory & managed_shm_r, const char * name_r )
+ : ShmAccess<Derived>( managed_shm_r, name_r, SHMACCESS_UNLOCKED )
+ {}
+ };
+
+ /**
+ * Locked access (no timeout) to shared memory objetcs.
+ */
+ template <class Derived>
+ struct ShmAccessWait : public ShmAccess<Derived>
+ {
+ ShmAccessWait( boost::interprocess::managed_shared_memory & managed_shm_r, const char * name_r )
+ : ShmAccess<Derived>( managed_shm_r, name_r, SHMACCESS_WAIT )
+ {}
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ struct TextExch : public ShmData
+ {
+ enum { buffsize = 1 };
+
+ TextExch()
+ : _filled( false )
+ {}
+
+ void send( const char * text_r )
+ { send( std::string( text_r ? text_r : "" ) ); }
+
+ void send( const std::string & text_r )
+ {
+ bool noEOD = true;
+ const char * data = text_r.c_str();
+ std::string::size_type remaining = text_r.size();
+ while ( remaining || noEOD )
+ {
+ boost::interprocess::scoped_lock lock( _mutex );
+ if ( _filled )
+ {
+ buff_full.wait( lock );
+ }
+ if ( remaining )
+ {
+ std::string::size_type toSend( std::min<unsigned>( remaining, buffsize ) );
+ strncpy( _buff, data, toSend );
+ data += toSend;
+ remaining -= toSend;
+ }
+ else
+ {
+ *_buff = 0;
+ noEOD = false;
+ }
+ //Notify to the other process that there is a message
+ _filled = true;
+ buff_empty.notify_one();
+ }
+ }
+
+ std::string get()
+ {
+ bool noEOD = true;
+ std::string ret;
+ do {
+ boost::interprocess::scoped_lock lock( _mutex );
+ if ( ! _filled )
+ {
+ buff_empty.wait( lock );
+ }
+ if ( *_buff )
+ ret += _buff;
+ else
+ noEOD = false;
+ //Notify the other process that the buffer is empty
+ _filled = false;
+ buff_full.notify_one();
+ } while ( noEOD );
+ return ret;
+ }
+
+ private:
+ /** Wait when buffer is empty. */
+ boost::interprocess::interprocess_condition buff_empty;
+ /** Wait when buffer is full. */
+ boost::interprocess::interprocess_condition buff_full;
+ /** The buffer. */
+ char _buff[buffsize+1];
+ /** Whether the buffer has data. */
+ bool _filled;
};
+ ////////////////////////////////////////////////////////////////////////////////
+
+ /** Job States
+ * \li DMTF Reserved 13..32767
+ * \li Vendor Reserved 32768..65535
+ */
+ enum JobState
+ {
+ JS_NEW = 2,
+ JS_STARTING = 3,
+ JS_RUNNING = 4,
+ JS_SUSPENDED = 5,
+ JS_SHUTTING_DOWN = 6,
+ JS_COMPLETED = 7,
+ JS_TERMINATE = 8,
+ JS_KILLED = 9,
+ JS_EXCEPTION = 10,
+ JS_SERVICE = 11,
+ JS_QUERY_PENDING = 12
+ };
enum Answers
{
@@ -94,9 +222,19 @@ namespace cmpizypp
*/
struct Comm : public ShmData
{
+ Comm()
+ : pid ( 0 )
+ , _status ( JS_NEW )
+ , percent ( 0 )
+ {}
+
+ JobState status() const { return JobState(_status); }
+ void setStatus( JobState val_r ) { _status = val_r; }
+
pid_t pid;
- uint16_t status;
+ uint16_t _status;
uint16_t percent;
+
uint16_t error;
char errorStr[STR_SIZE];
uint16_t answers[MAX_ANSWERS];
@@ -104,4 +242,39 @@ namespace cmpizypp
char dataStr[STR_SIZE];
};
}
+#ifdef HELPERDEBUG
+#include <iostream>
+namespace cmpizypp
+{
+ std::ostream & operator<<( std::ostream & str, JobState obj )
+ {
+ switch ( obj )
+ {
+#define OUTS(E) case E: return str << #E; break
+ OUTS( JS_NEW );
+ OUTS( JS_STARTING );
+ OUTS( JS_RUNNING );
+ OUTS( JS_SUSPENDED );
+ OUTS( JS_SHUTTING_DOWN );
+ OUTS( JS_COMPLETED );
+ OUTS( JS_TERMINATE );
+ OUTS( JS_KILLED );
+ OUTS( JS_EXCEPTION );
+ OUTS( JS_SERVICE );
+ OUTS( JS_QUERY_PENDING );
+#undef OUTS
+ }
+ return str << "JS_(" << unsigned(obj) << ")";
+ }
+
+ std::ostream & operator<<( std::ostream & str, const Comm & obj )
+ {
+ return str << "Comm(" << obj.pid << "|" << obj.status() << "|" << obj.percent << "%)";
+ }
+
+ template <class Derived>
+ inline std::ostream & operator<<( std::ostream & str, const ShmAccess<Derived> & obj )
+ { return str << *obj; }
+}
+#endif
#endif
\ No newline at end of file
diff --git a/helper/test.cc b/helper/test.cc
index 12b8ad6..b52da6a 100644
--- a/helper/test.cc
+++ b/helper/test.cc
@@ -11,6 +11,7 @@
#include
#include
+#define HELPERDEBUG
#include "installHelper.h"
using namespace zypp;
@@ -54,26 +55,42 @@ try {
{
throw std::string( "Out of shmem constructing Comm" );
}
-
- // write task list
- std::string path( TmpFile().path().asString() );
- std::ofstream o( path.c_str() );
- o << "hi" << endl;
- o.close();
+ if ( ! shm().find_or_construct<TextExch>("TextExch")() )
+ {
+ throw std::string( "Out of shmem constructing TextExch" );
+ }
ShmAccess<Comm> comm( shm(), "Comm" ); // blocks the helper
-
- strncpy( comm->dataStr, path.c_str(), STR_SIZE );
-
+ USR << "STATUS: " << comm << endl;
ExternalProgram helper( "./installHelper" );
comm->pid = helper.getpid();
comm.release(); // go...
+ USR << "STATUS: " << comm << endl;
+
+
+ MIL << "Prepare to send data..." << endl;
+ const char * args[] = {
+ "Das Reh huepft hoch",
+ "Das Reh huepft weit",
+ "Warum auch nicht",
+ "Es hat ja Zeit",
+ };
+ ShmAccessUnlocked<TextExch> textExch( shm(), "TextExch" );
+ MIL << "Send data..." << endl;
+ for_( it, arrayBegin( args ), arrayEnd( args ) )
+ {
+ textExch->send( *it );
+ }
+ textExch->send( 0 );
+ MIL << "Sent." << endl;
+
while ( helper.running() )
{
- sleep( 5 );
- MIL << "ping..." << endl;
+ USR << "STATUS: " << comm << endl;
+ sleep( 1 );
}
+ USR << "FIN: " << comm << endl;
if ( helper.close() != 0 )
ERR << helper.execError() << endl;
--
To unsubscribe, e-mail: zypp-commit+unsubscribe@opensuse.org
For additional commands, e-mail: zypp-commit+help@opensuse.org