
LMDB write/read simultaneously over different threads

Hi all,

I am new to LMDB, so please forgive any ignorance or mistakes on my part.
I have spent quite some time trying to resolve my issues, but without success.
That is why I am writing to ask for your help in shedding some light on them.

To the point: 
1) I have a database with only one writer and a few readers (e.g. 4 readers).
2) The database writer lives on a different thread than the database reader(s).
3) The writer thread keeps running and writing to the database all the time.
4) At random intervals, readers on different threads request data from the database.
The problem is the following: if the writer is active (in the middle of writing) and I try to fetch any data
from the db using a reader, the fetch fails with error MDB_NOTFOUND. Note that I make sure the
reader thread is launched AFTER the writer thread, to ensure that the data exists in the db.

To simulate this write/read behaviour across different threads, I have written a small test application
that reproduces my use case.

In main() I simply launch two threads/functions: the writer thread and, after some time, the reader thread.
The common environment is constructed in main() and passed as a parameter to the auxiliary functions.
If the reader starts reading before the writing has finished, the application crashes or returns MDB_NOTFOUND.
This seems to contradict the capabilities of the library, namely parallel write/read from different threads.
If the writer has finished, the reader works properly.
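
One possible explanation for the MDB_NOTFOUND, offered as a sketch rather than a diagnosis: an LMDB read-only transaction sees the data as of the last commit made before the reader began, and the writer in the listing below commits all its puts only once, after the loop. The toy model below illustrates that visibility rule only. ToyStore, begin_read(), commit() and visible() are hypothetical names invented for this illustration; they are NOT part of the LMDB API, and the model ignores everything else LMDB does.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Toy stand-in for a snapshot-isolated store. Hypothetical; NOT the LMDB API.
class ToyStore {
public:
    using Map = std::map<std::size_t, std::string>;
    using Snapshot = std::shared_ptr<const Map>;

    // Like beginning a read-only transaction: pin what is committed right now.
    Snapshot begin_read() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return committed_;
    }

    // Like committing a write transaction: publish the whole batch at once.
    void commit(const Map& pending) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto next = std::make_shared<Map>(*committed_);  // copy the old state
        for (const auto& kv : pending) {
            (*next)[kv.first] = kv.second;
        }
        committed_ = next;  // snapshots handed out earlier stay unchanged
    }

private:
    mutable std::mutex mutex_;
    Snapshot committed_ = std::make_shared<Map>();
};

// True if the key exists in the snapshot the reader pinned.
bool visible(const ToyStore::Snapshot& snap, std::size_t key) {
    return snap->count(key) != 0;
}
```

In this model, a snapshot taken before commit() never sees the batch, even after the commit happens; only a snapshot taken afterwards does. If the same holds in the test application, committing in smaller batches and beginning the read transaction after a commit would be the things to try.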

Some general questions.
i) How many times do I need to open the database? Currently I open the (unique) database twice: once in the writer and once in the reader.
The documentation says that it has to be opened only once. Even when I do that, my app does not work.

ii) What is the practical usage of mdb_txn_reset() and mdb_txn_renew()? Does it need to be used in my case?

iii) Since I only ADD data to the db and READ it back, what kind of optimizations can be applied?

iv) Regarding for-loops, what is the proper way to use transactions and to commit/abort/reset/etc. them?

Any helpful information is highly appreciated, and if someone could add some examples with different threads, that would be awesome.

#include <iostream>
#include <thread>
#include <assert.h>
#include <string>
#include "lmdb.h"
#include <unistd.h>
#include <cstdlib>   // system()

using namespace std;

void writeLMDB( MDB_env * env, size_t nrElems )
{
    cout << "BEGIN WRITING." << endl;

    MDB_txn *txn;
    MDB_dbi dbi;
    MDB_stat status;

    // create transaction
    assert( MDB_SUCCESS == mdb_txn_begin( env, NULL, 0, &txn ) );
    // open db
    assert( MDB_SUCCESS == mdb_open( txn, NULL, MDB_DUPSORT  , &dbi ) );
    // this commit-transaction is needed since we are creating the db for the first time
    assert( MDB_SUCCESS == mdb_txn_commit( txn ) );

    MDB_val key, data;

    // begin (again a new transaction)
    assert( MDB_SUCCESS == mdb_txn_begin( env, NULL, 0, &txn ) );

    // keep on writing elements in the db
    for( size_t i = 0; i < nrElems ; ++i )
    {
        key.mv_size = sizeof( size_t );
        key.mv_data = &i;
        string value = "test";
        data.mv_size = value.size();
        data.mv_data = (void*)value.data();
        cout << "WRITER-KEY:" << *(size_t*)key.mv_data << " VALUE:" ;
        cout.write( (char*)data.mv_data, data.mv_size );
        cout << endl;
        assert( MDB_SUCCESS == mdb_put( txn, dbi, &key, &data, 0 ) );
    }
    assert( MDB_SUCCESS == mdb_txn_commit( txn ) );
    assert( MDB_SUCCESS == mdb_env_stat( env, &status ) );
    mdb_close( env, dbi );
    cout << "FINISHED WRITING." << endl;
}

void readLMDB( MDB_env * _env, size_t start, size_t end )
{
    cout << "BEGIN READING.." << endl;
    MDB_dbi dbi;
    MDB_txn *txn = NULL;

    // lmdb operations
    assert( MDB_SUCCESS == mdb_txn_begin( _env, NULL, MDB_RDONLY, &txn ) );
    assert( MDB_SUCCESS == mdb_open( txn, NULL, 0, &dbi ) );

    // key+data
    MDB_val key, data;
    for ( size_t i = start; i <= end ; i++ )
    {
        key.mv_size = sizeof( size_t );
        key.mv_data = &i;
        int res = mdb_get( txn, dbi , &key, &data );
        //cout << "READER-KEY:" << *(size_t*)key.mv_data << " VALUE:" ; cout.write( (char*)data.mv_data, data.mv_size ); cout << endl;
        if( res != 0 ) cout << "error fetching data" << endl;
    }
    // close db
    mdb_close(  _env, dbi );
    cout << "FINISHED READING.." << endl;
}

int main(int argc, char *argv[] )
{
    // db location (folder must exist)
    string dbLocation = "/work/vasilis/databases/lmdbtest/";
    size_t maxdbSize = 10485760;
    system( "rm -rf /work/vasilis/databases/lmdbtest/*" );

    // create environment: ONE environment only for one process
    MDB_env * _env;
    assert( MDB_SUCCESS == ( mdb_env_create( &_env ) ) );
    // set the size of the database
    assert( MDB_SUCCESS == mdb_env_set_mapsize( _env, maxdbSize ) );
    // open the environment: ready to be used by everyone else
    assert( MDB_SUCCESS == mdb_env_open( _env, dbLocation.c_str(), 0 , 0664 ) );

    // writer-thread: write 1000 elements in the database
    thread writethread( writeLMDB, _env, 1000 );
    usleep( 1000 * 100 );
    // read from - to
    thread readthread( readLMDB, _env, 0, 20 );

    // wait for threads to finish
    writethread.join();
    readthread.join();
    cout << "THREADS COMPLETED " << endl;

    // close the environment
    mdb_env_close( _env );

    return 0;
}
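
A side note on the assert() calls in the listing above: every LMDB call is wrapped in assert(), so in a build compiled with -DNDEBUG the calls themselves would be compiled out. A minimal sketch of an alternative follows. check_rc is a hypothetical helper name, not something LMDB provides; it relies only on the fact that LMDB functions return 0 (MDB_SUCCESS) on success.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of LMDB): throws when a return code is
// non-zero, so the wrapped call still runs in builds that define NDEBUG.
inline void check_rc( int rc, const char *what )
{
    if( rc != 0 )
    {
        throw std::runtime_error( std::string( what ) + " failed with code "
                                  + std::to_string( rc ) );
    }
}
```

A call would then read, for example, check_rc( mdb_txn_begin( env, NULL, 0, &txn ), "mdb_txn_begin" ); the message could also include mdb_strerror( rc ) for a readable description of the error.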