00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "session.h"
00021 #include "session_p.h"
00022
00023 #include "imapparser_p.h"
00024 #include "job.h"
00025 #include "job_p.h"
00026 #include "servermanager_p.h"
00027 #include "xdgbasedirs_p.h"
00028
00029 #include <kdebug.h>
00030 #include <klocale.h>
00031
00032 #include <QCoreApplication>
00033 #include <QtCore/QDir>
00034 #include <QtCore/QQueue>
00035 #include <QtCore/QThreadStorage>
00036 #include <QtCore/QTimer>
00037
00038 #include <QtNetwork/QLocalSocket>
00039
00040 #define PIPELINE_LENGTH 2
00041
00042 using namespace Akonadi;
00043
00044
00045
00046
00047 void SessionPrivate::startNext()
00048 {
00049 QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
00050 }
00051
00052 void SessionPrivate::reconnect()
00053 {
00054
00055 if ( socket->state() != QLocalSocket::ConnectedState &&
00056 socket->state() != QLocalSocket::ConnectingState ) {
00057 #ifdef Q_OS_WIN //krazy:exclude=cpp
00058 const QString namedPipe = mConnectionSettings->value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
00059 socket->connectToServer( namedPipe );
00060 #else
00061 const QString defaultSocketDir = XdgBaseDirs::saveDir( "data", QLatin1String( "akonadi" ) );
00062 const QString path = mConnectionSettings->value( QLatin1String( "Data/UnixPath" ), defaultSocketDir + QLatin1String( "/akonadiserver.socket" ) ).toString();
00063 socket->connectToServer( path );
00064 #endif
00065 }
00066 }
00067
00068 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
00069 {
00070 Q_ASSERT( mParent->sender() == socket );
00071 kWarning() << "Socket error occurred:" << socket->errorString();
00072 socketDisconnected();
00073 }
00074
00075 void SessionPrivate::socketDisconnected()
00076 {
00077 if ( currentJob )
00078 currentJob->d_ptr->lostConnection();
00079 connected = false;
00080 QTimer::singleShot( 1000, mParent, SLOT(reconnect()) );
00081 }
00082
00083 void SessionPrivate::dataReceived()
00084 {
00085 while ( socket->bytesAvailable() > 0 ) {
00086 if ( parser->continuationSize() > 1 ) {
00087 const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
00088 parser->parseBlock( data );
00089 } else if ( socket->canReadLine() ) {
00090 if ( !parser->parseNextLine( socket->readLine() ) )
00091 continue;
00092
00093
00094 if ( parser->tag() == QByteArray("0") ) {
00095 if ( parser->data().startsWith( "OK" ) ) {
00096 connected = true;
00097 startNext();
00098 } else {
00099 kWarning() << "Unable to login to Akonadi server:" << parser->data();
00100 socket->close();
00101 QTimer::singleShot( 1000, mParent, SLOT(reconnect()) );
00102 }
00103 }
00104
00105
00106 if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
00107 const int pos = parser->data().indexOf( "[PROTOCOL" );
00108 if ( pos > 0 ) {
00109 qint64 tmp = 0;
00110 ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
00111 protocolVersion = tmp;
00112 Internal::setServerProtocolVersion( tmp );
00113 }
00114 kDebug() << "Server protocol version is:" << protocolVersion;
00115
00116 writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' );
00117
00118
00119 } else {
00120 if ( currentJob )
00121 currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
00122 }
00123
00124
00125 parser->reset();
00126 } else {
00127 break;
00128 }
00129 }
00130 }
00131
00132 bool SessionPrivate::canPipelineNext()
00133 {
00134 if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
00135 return false;
00136 if ( pipeline.isEmpty() && currentJob )
00137 return currentJob->d_ptr->mWriteFinished;
00138 if ( !pipeline.isEmpty() )
00139 return pipeline.last()->d_ptr->mWriteFinished;
00140 return false;
00141 }
00142
00143 void SessionPrivate::doStartNext()
00144 {
00145 if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
00146 return;
00147 if ( canPipelineNext() ) {
00148 Akonadi::Job *nextJob = queue.dequeue();
00149 pipeline.enqueue( nextJob );
00150 startJob( nextJob );
00151 }
00152 if ( jobRunning )
00153 return;
00154 jobRunning = true;
00155 if ( !pipeline.isEmpty() ) {
00156 currentJob = pipeline.dequeue();
00157 } else {
00158 currentJob = queue.dequeue();
00159 startJob( currentJob );
00160 }
00161 }
00162
00163 void SessionPrivate::startJob( Job *job )
00164 {
00165 if ( protocolVersion < minimumProtocolVersion() ) {
00166 job->setError( Job::ProtocolVersionMismatch );
00167 job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
00168 job->emitResult();
00169 } else {
00170 job->d_ptr->startQueued();
00171 }
00172 }
00173
00174 void SessionPrivate::endJob( Job *job )
00175 {
00176 job->emitResult();
00177 }
00178
00179 void SessionPrivate::jobDone(KJob * job)
00180 {
00181 if( job == currentJob ) {
00182 if ( pipeline.isEmpty() ) {
00183 jobRunning = false;
00184 currentJob = 0;
00185 } else {
00186 currentJob = pipeline.dequeue();
00187 }
00188 startNext();
00189 }
00190
00191 else {
00192 kDebug() << job << "Non-current job finished.";
00193 }
00194 }
00195
00196 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
00197 {
00198 Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
00199 Q_UNUSED( job );
00200
00201 startNext();
00202 }
00203
00204 void SessionPrivate::jobDestroyed(QObject * job)
00205 {
00206 queue.removeAll( static_cast<Akonadi::Job*>( job ) );
00207
00208 pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
00209 if ( currentJob == job ) {
00210 currentJob = 0;
00211 jobRunning = false;
00212 }
00213 }
00214
00215 void SessionPrivate::addJob(Job * job)
00216 {
00217 queue.append( job );
00218 QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
00219 QObject::connect( job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*)) );
00220 QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
00221 startNext();
00222 }
00223
00224 int SessionPrivate::nextTag()
00225 {
00226 return theNextTag++;
00227 }
00228
00229 void SessionPrivate::writeData(const QByteArray & data)
00230 {
00231 socket->write( data );
00232 }
00233
00234
00235
00236
00237 SessionPrivate::SessionPrivate( Session *parent )
00238 : mParent( parent ), mConnectionSettings( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
00239 {
00240 }
00241
00242 void SessionPrivate::init( const QByteArray &id )
00243 {
00244 parser = new ImapParser();
00245
00246 if ( !id.isEmpty() ) {
00247 sessionId = id;
00248 } else {
00249 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
00250 + '-' + QByteArray::number( qrand() );
00251 }
00252
00253 connected = false;
00254 theNextTag = 1;
00255 jobRunning = false;
00256
00257 const QString connectionConfigFile = XdgBaseDirs::akonadiConnectionConfigFile();
00258
00259 QFileInfo fileInfo( connectionConfigFile );
00260 if ( !fileInfo.exists() ) {
00261 kWarning() << "Akonadi Client Session: connection config file '"
00262 << "akonadi/akonadiconnectionrc can not be found in '"
00263 << XdgBaseDirs::homePath( "config" ) << "' nor in any of "
00264 << XdgBaseDirs::systemPathList( "config" );
00265 }
00266
00267 mConnectionSettings = new QSettings( connectionConfigFile, QSettings::IniFormat );
00268
00269
00270 socket = new QLocalSocket( mParent );
00271
00272 mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
00273 mParent->connect( socket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
00274 mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
00275 reconnect();
00276 }
00277
00278 Session::Session(const QByteArray & sessionId, QObject * parent) :
00279 QObject( parent ),
00280 d( new SessionPrivate( this ) )
00281 {
00282 d->init( sessionId );
00283 }
00284
00285 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent)
00286 : QObject( parent ),
00287 d( dd )
00288 {
00289 d->init( sessionId );
00290 }
00291
00292 Session::~Session()
00293 {
00294 clear();
00295 delete d;
00296 }
00297
00298 QByteArray Session::sessionId() const
00299 {
00300 return d->sessionId;
00301 }
00302
00303 QThreadStorage<Session*> instances;
00304
00305 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
00306 {
00307 Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
00308 "You tried to create a default session with empty session id!" );
00309 Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
00310 "You tried to create a default session twice!" );
00311
00312 instances.setLocalData( new Session( sessionId ) );
00313 }
00314
00315 Session* Session::defaultSession()
00316 {
00317 if ( !instances.hasLocalData() )
00318 instances.setLocalData( new Session() );
00319 return instances.localData();
00320 }
00321
00322 void Session::clear()
00323 {
00324 foreach ( Job* job, d->queue )
00325 job->kill( KJob::EmitResult );
00326 d->queue.clear();
00327 if ( d->currentJob )
00328 d->currentJob->kill( KJob::EmitResult );
00329 }
00330
00331 #include "session.moc"