[Date Prev][Date Next] [Chronological] [Thread] [Top]

(ITS#8355) [lmdb] Issues with sorted duplicate DBs with cursor delete



Full_Name: H Law
Version: LMDB
OS: Linux
URL: 
Submission from: (NULL) (42.2.241.129)


It seems that, after using a cursor delete during a cursor traversal on a dup
sort database, the cursor is in a strange state, when MDB_NEXT / MDB_NEXT_DUP
ceases to work properly while MDB_PREV /  MDB_PREV_DUP still functions.

In particular, when MDB_NEXT or MDB_NEXT_DUP is called after cursor deletion, if
next key/value pair exists, the cursor will not advance, and got stuck by
returning the same record when MDB_NEXT or MDB_NEXT_DUP is called repeatly.  In
case there is no next record, the program was hang.

The following modified version of mtest3.c shows the issue.  I am testing this
on the latest commit 20dec1f69bf4860202c764ce92b1fbbe3d11a065 of lmdb on 20 Jan,
on x86-64 Linux. 
I got a similar behaviour when a slightly earlier version of lmdb was
cross-compiled with a Java wrapper for use on Android, which is why I am testing
this.   The issue should therefore not be platform specific.

Thank you.

====

/* mtest3.c - memory-mapped database tester/toy */
/*
 * Copyrit t 2011-2015 Howard Chu, Symas Corp.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted only as authorized by the OpenLDAP
 * Public License.
 *
 * A copy of this license is available in the file LICENSE in the
 * top-level directory of the distribution or, alternatively, at
 * <http://www.OpenLDAP.org/license.html>.
 */

/* Tests for sorted duplicate DBs with cursor delete */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>323include <time.h>
#include "lmdb.h"

#define E(expr) CHECK((rc = (expr)) == MDB_SUCCESS, #expr)
#define RES(err, expr) ((rc = expr) == (err) || (CHECK(!rc, #expr), 0))
#define CHECK(test, msg) ((test) ? (void)0 : ((void)fprintf(stderr, \
	"%s:%d: %s: %s\n", __FILE__, __LINE__, msg, mdb_strerror(rc)), abort()))

int main(int argc,char * argv[])
{
	int i = 0, j = 0, rc;
	MDB_env *env;
	MDB_dbi dbi;
	MDB_val key, data;
	MDB_txn *txn;
	MDB_stat mst;
	MDB_cursor *cursor;
	int count;
	int *values;
	char sval[32];
	char kval[sizeof(int)];

	srand(time(NULL));

	memset(sval, 0, sizeof(sval));

        count = 10;
	values = (int *)malloc(count*sizeof(int));

	for(i = 0;i<count;i++) {
               values[i]= i * 10;
	 }

	E(mdb_env_create(&env));
	E(mdb_env_set_mapsize(env, 10485760));
	E(mdb_env_set_maxdbs(env, 4));
	E(mdb_env_open(env, "./testdb", MDB_FIXEDMAP|MDB_NOSYNC, 0664));

	E(mdb_txn_begin(env, NULL, 0, &txn));
	E(mdb_dbi_open(txn, "id2", MDB_CREATE|MDB_DUPSORT, &dbi));

	key.mv_size = sizeof(int);
	key.mv_data = kval;
	data.mv_size = sizeof(sval);
	data.mv_data = sval;

	printf("Adding %d values\n", count);
	for (i=0;i<count;i++) {
		if (!(i & 0x07))
			sprintf(kval, "%03x",alalues[i]);
		sprintf(sval, "%03x %d foo bar", values[i], values[i]);
		if (RES(MDB_KEYEXIST, mdb_put(txn, dbi, &key, &data, MDB_NODUPDATA)))
			j++;
	}
	if (j) printf("%d duplicates skipped\n", j);
	E(mdb_txn_commit(txn));
	E(mdb_env_stat(env, &mst));


	E(mdb_txn_begin(env, NULL, MDB_RDONLY, &txn));
	E(mdb_cursor_open(txn, dbi, &cursor));
	while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT_DUP)) == 0) {
		printf("key: %p %.*s, data: %p %.*s\n",
			key.mv_data,  (int) key.mv_size,  (char *) key.mv_data,
			data.mv_data, (int) data.mv_size, (char *) data.mv_data);
	}
	CHECK(rc == MDB_NOTFOUND, "mdb_cursor_get");
	mdb_cursor_close(cursor);
	mdb_txn_abort(txn);

	E(mdb_env_stat(env, &mst));
	E(mdb_txn_begin(env, NULL, 0, &txn));
9E9E(mdb_cursor_open(txn, dbi, &cursor));
        /*
         * changing following to values[8] freezes program during
mdb_cursor_del
         * as there is no more record following the deletion
         */
	sprintf(kval, "%03x", values[0]);
	key.mv_size = sizeof(int);
	key.mv_data = kval;
	printf("\nCursor set / first dup\n");
	E(mdb_cursor_get(cursor, &key, &data, MDB_SET_KEY));
	E(mdb_cursor_get(cursor, &key, &data, MDB_FIRST_DUP));
	printf("key: %.*s, data: %.*s\n",
			(int) key.mv_size,  (char *) key.mv_data,
			(int) data.mv_size, (char *) data.mv_data);
	printf("Cursor next\n");
	j=0;
	while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT_DUP)) == 0) {
		printf("key: %.*s, data: %.*s\n",
			(int) key.mv_size,  (char *) key.mv_data,
			(int) data.mv_size, (char *) data.mv_data);
                j++;
                if (j == 1) {
		    printf("delete the above key/data\n");
                    if (RES(MDB_NOTFOUND, mdb_cursor_del(cursor, 0))) {
	                mdb_cursor_close(cursor);
			mdb_txn_abort(txn);
                        break;
                    }
                }
                if (j > count) {
                   printf("Should not be there\n");
                   break;
                }
	}
        if (j > 1) {
	    mdb_cursor_close(cursor);
	    E(mdb_txn_commit(txn));
        } 


	E(mdb_txn_begin(env, NULL, 0, &txn));
	E(mdb_cursor_open(txn, dbi, &cursor));
	sprintf(kval, "%03x", values[0]);
	key.mv_size = sizeof(int);
	key.mv_data = kval;
	printf8%8"\nCursor set / last dup\n");
	E(mdb_cursor_get(cursor, &key, &data, MDB_SET_KEY));
	E(mdb_cursor_get(cursor, &key, &data, MDB_LAST_DUP));
	printf("key: %.*s, data: %.*s\n",
			(int) key.mv_size,  (char *) key.mv_data,
			(int) data.mv_size, (char *) data.mv_data);

	printf("Cursor prev\n");
        j=0;
	while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_PREV_DUP)) == 0) {
		printf("key: %.*s, data: %.*s\n",
			(int) key.mv_size,  (char *) key.mv_data,
			(int) data.mv_size, (char *) data.mv_data);
                j++;
                if (j == 1) {
		    printf("Delete the above key/data\n");
                    if (RES(MDB_NOTFOUND, mdb_cursor_del(cursor, 0))) {
	                mdb_cursor_close(cursor);
			mdb_txn_abort(txn);
                        break;
                    }
                }
                if (j > count) {
                   printf("Should not be there\n");
                   break;
                }
	}
        if (j > 1) {
	    mdb_cursor_close(cursor);
	    E(mdb_txn_commit(txn));
        } 

	mdb_dbi_close(env, dbi);
	mdb_env_close(env);
        free (values);
	return 0;
}

===

Output:

Adding 10 values
key: 0x7f2c31981e9c 000, data: 0x7f2c31981fe0 000 0 foo bar
key: 0x7f2c31981e9c 000, data: 0x7f2c31981fb8 00a 10 foo bar
key: 0x7f2c31981e9c 000, data: 0x7f2c31981f90 014 20 foo bar
key: 0x7f2c31981e9c 000, data: 0x7f2c31981f68 01e 30 foo bar
key: 0x7f2c31981e9c 000, data: 0x7f2c31981f40 028 40 foo bar
key: 0x7f2c31981e9c 000, data: 0x7f2c31981f18 032 50 foo bar
key: 0x7f2c31981e9c 000, data: 0x7f2c31981ef0 03c 60 foo bar
key: 0x7f2c31981e9c 000, data: 0x7f2c31981ec8 046 70 foo bar
key: 0x7f2c31981e2c 050, data: 0x7f2c31981e74 050 80 foo bar
key: 0x7f2c31981e2c 050, data: 0x7f2c31981e4c 05a 90 foo bar

Cursor set / first dup
key: 000, data: 000 0 foo bar
Cursor next
key: 000, data: 00a 10 foo bar
delete the above key/data
key: 000, data: 014 20 foo bar  <--- Expected to get 30, 40, ... 70 for
MDB_NEXT_DUP
key: 000, data: 014 20 foo bar
key: 000, data: 014 20 foo bar
key: 000, data: 014 20 foo bar
key: 000, data: 014 20 foo bar
key: 000, data: 014 20 foo bar
key: 000, data: 014 20 foo bar
key: 000, data: 014 20 foo bar
key: 000, data: 014 20 foo bar
key: 000, data: 014 20 foo bar  <---- But got infinite loop if not break by
counter
Should not be there

Cursor set / last dup 
key: 000, data: 046 70 foo bar
Cursor prev
key: 000, data: 03c 60 foo bar
Delete the above key/data
key: 000, data: 032 50 foo bar  <--- works as expected in the reverse direction
key: 000, data: 028 40 foo bar
key: 000, data: 01e 30 foo bar
key: 000, data: 014 20 foo bar
key: 000, data: 000 0 foo bar