Project

General

Profile

Bug #2524 » main.cpp

Xiaopong Tran, 06/08/2012 03:15 AM

 
/*
* Copyright (C) 2012, xp@renzhi.ca
* All rights reserved.
*/

#include <iostream>
#include <stdio.h>
#include <string.h>
#include <getopt.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include <rados/librados.h>

using namespace std;

void print_usage()
{
cout << endl;
cout << "Usage: " << endl;
cout << " test_rados -n num -p pool -i file" << endl;
cout << endl;
cout << " where" << endl;
cout << " num Number of client threads" << endl;
cout << " pool Pool name to write to" << endl;
cout << " file Input file to write to pool" << endl;
}

void * worker(void * arg);

typedef struct
{
int id;
char * filename;
char * pool;
} param_t;

const char * conf_file = "./etc/ceph.conf";

int main(int argc, char *argv[])
{
if (argc < 4)
{
print_usage();
return -1;
}

int num_threads = 0;
char * pool_name;
char * filename;

int c;
opterr = 0;
while ((c = getopt (argc, argv, "n:p:i:")) != -1)
{
switch(c)
{
case 'n':
num_threads = atoi(optarg);
break;
case 'p':
pool_name = optarg;
break;
case 'i':
filename = optarg;
break;
case '?':
print_usage();
return 1;
default:
if (isprint (optopt))
cout << "Unknown option `-%c'." << optopt << endl;
else
cout << "Unknown option character `\\x%x'." << optopt << endl;
break;
}
}

pthread_t *threads;
pthread_attr_t pthread_custom_attr;
param_t * parms;

threads = (pthread_t *) malloc(num_threads * sizeof(pthread_t));
pthread_attr_init(&pthread_custom_attr);
parms = (param_t *)malloc(sizeof(param_t) * num_threads);

for (int i = 0; i < num_threads; ++i)
{
parms[i].id = i;
parms[i].filename = filename;
parms[i].pool = pool_name;
pthread_create(&threads[i], &pthread_custom_attr, worker, (void *)(parms+i));
}

for (int i = 0; i < num_threads; ++i)
{
pthread_join(threads[i], NULL);
}

free(parms);
return 0;
}


void * worker(void * arg)
{
param_t * param = (param_t *) arg;
rados_t cluster;
cout << "Thread " << param->id << ": creating clusting handle" << endl;
int err = rados_create(&cluster, NULL);
if (err < 0)
{
cout << "Thread " << param->id << ": Cannot create a cluster handle: " << strerror(-err) << endl;
return NULL;
}
cout << "Thread " << param->id << ": reading conf file" << endl;
err = rados_conf_read_file(cluster, conf_file);
if (err < 0)
{
cout << "Thread " << param->id << ": Cannot read config file:" << strerror(-err) << endl;
return NULL;
}
cout << "Thread " << param->id << ": connecting to cluster" << endl;
err = rados_connect(cluster);
if (err < 0)
{
cout << "Thread " << param->id << ": Cannot connect to cluster: " << strerror(-err) << endl;
return NULL;
}
cout << "Thread " << param->id << ": opening rados pool" << endl;
rados_ioctx_t io;
err = rados_ioctx_create(cluster, param->pool, &io);
if (err < 0)
{
cout << "Thread " << param->id << ": Cannot open rados pool: " << strerror(-err) << endl;
rados_shutdown(cluster);
return NULL;
}
char buf[1024];
size_t len_read, len_write;
size_t total = 0;
FILE * fr = fopen(param->filename, "rb");
if (!fr)
{
cout << "Thread " << param->id << ": Error opening for read: " << param->filename << endl;
rados_shutdown(cluster);
return NULL;
}
// Use the thread id as the oid
char oid[64];
memset(oid, 0, 64);
sprintf(oid, "%d", param->id);
while (!feof(fr) && !ferror(fr))
{
memset(buf, 0, 1024);
len_read = fread(buf, 1, 1024, fr);
len_write = rados_write(io, oid, buf, len_read, total);
if (len_read != len_write)
{
cout << "Thread " << param->id << ": Error writing: read=" << len_read << ", write=" << len_write << endl;
break;
}
total += len_write;
cout << "Thread " << param->id << ": Reading and writing: " << total << endl;
}

fclose(fr);

cout << "Thread " << param->id << ": Destroying IO context" << endl;
rados_ioctx_destroy(io);
cout << "Thread " << param->id << ": Disconnect from cluster" << endl;
rados_shutdown(cluster);

return NULL;
}
(3-3/4)