enum client_state state;
struct show_raw *show_raw;
ZOOM_resultset resultset;
- YAZ_MUTEX mutex;
int ref_count;
char *id;
facet_limits_t facet_limits;
void client_check_preferred_watch(struct client *cl)
{
struct session *se = cl->session;
+
yaz_log(YLOG_DEBUG, "client_check_preferred_watch: %s ", client_get_id(cl));
if (se)
{
- client_unlock(cl);
- /* TODO possible threading issue. Session can have been destroyed */
- if (session_is_preferred_clients_ready(se)) {
+ assert(cl->session);
+ if (session_is_preferred_clients_ready(se))
session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
- }
else
yaz_log(YLOG_DEBUG, "client_check_preferred_watch: Still locked on preferred targets.");
-
- client_lock(cl);
+ assert(cl->session);
}
else
yaz_log(YLOG_WARN, "client_check_preferred_watch: %s. No session!", client_get_id(cl));
void client_got_records(struct client *cl)
{
struct session *se = cl->session;
- if (se)
+
+ if (reclist_get_num_records(se->reclist) > 0)
{
- if (reclist_get_num_records(se->reclist) > 0)
- {
- client_unlock(cl);
- session_alert_watch(se, SESSION_WATCH_SHOW);
- session_alert_watch(se, SESSION_WATCH_BYTARGET);
- session_alert_watch(se, SESSION_WATCH_TERMLIST);
- session_alert_watch(se, SESSION_WATCH_RECORD);
- client_lock(cl);
- }
+ session_alert_watch(se, SESSION_WATCH_SHOW);
+ session_alert_watch(se, SESSION_WATCH_BYTARGET);
+ session_alert_watch(se, SESSION_WATCH_TERMLIST);
+ session_alert_watch(se, SESSION_WATCH_RECORD);
}
}
cl->show_raw = 0;
cl->resultset = 0;
cl->suggestions = 0;
- cl->mutex = 0;
- pazpar2_mutex_create(&cl->mutex, "client");
cl->preferred = 0;
cl->ref_count = 1;
cl->facet_limits = 0;
return cl;
}
-void client_lock(struct client *c)
-{
- yaz_mutex_enter(c->mutex);
-}
-
-void client_unlock(struct client *c)
-{
- yaz_mutex_leave(c->mutex);
-}
-
void client_incref(struct client *c)
{
- pazpar2_incref(&c->ref_count, c->mutex);
+ c->ref_count++;
yaz_log(YLOG_DEBUG, "client_incref c=%p %s cnt=%d",
c, client_get_id(c), c->ref_count);
}
{
yaz_log(YLOG_DEBUG, "client_destroy c=%p %s cnt=%d",
c, client_get_id(c), c->ref_count);
- if (!pazpar2_decref(&c->ref_count, c->mutex))
+ if (--c->ref_count == 0)
{
xfree(c->pquery);
c->pquery = 0;
{
ZOOM_resultset_destroy(c->resultset);
}
- yaz_mutex_destroy(&c->mutex);
xfree(c);
client_use(-1);
return 1;
return cl->maxrecs;
}
-void client_set_preferred(struct client *cl, int v)
-{
- cl->preferred = v;
-}
-
-
struct suggestions* client_suggestions_create(const char* suggestions_string)
{
int i;
yaz_mutex_set_name(*p, ppmutex_level, name);
}
+void pazpar2_lock_rdwr_init(Pazpar2_lock_rdwr *p)
+{
+ p->readers_reading = 0;
+ p->writers_writing = 0;
+#if YAZ_POSIX_THREADS
+ pthread_mutex_init(&p->mutex, 0);
+ pthread_cond_init(&p->lock_free, 0);
+#endif
+}
+
+void pazpar2_lock_rdwr_destroy(Pazpar2_lock_rdwr *p)
+{
+ assert (p->readers_reading == 0);
+ assert (p->writers_writing == 0);
+#if YAZ_POSIX_THREADS
+ pthread_mutex_destroy(&p->mutex);
+ pthread_cond_destroy(&p->lock_free);
+#endif
+}
+
+void pazpar2_lock_rdwr_rlock(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ pthread_mutex_lock(& p->mutex);
+ while (p->writers_writing)
+ pthread_cond_wait(&p->lock_free, &p->mutex);
+ p->readers_reading++;
+ pthread_mutex_unlock(&p->mutex);
+#endif
+}
+
+void pazpar2_lock_rdwr_wlock(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ pthread_mutex_lock(&p->mutex);
+ while (p->writers_writing || p->readers_reading)
+ pthread_cond_wait(&p->lock_free, &p->mutex);
+ p->writers_writing++;
+ pthread_mutex_unlock(&p->mutex);
+#endif
+}
+
+void pazpar2_lock_rdwr_upgrade(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ assert(p->readers_reading > 0);
+ pthread_mutex_lock(&p->mutex);
+ --p->readers_reading;
+ while (p->writers_writing || p->readers_reading)
+ pthread_cond_wait(&p->lock_free, &p->mutex);
+ p->writers_writing++;
+ pthread_mutex_unlock(&p->mutex);
+#endif
+}
+
+void pazpar2_lock_rdwr_downgrade(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ assert(p->writers_writing == 1);
+ pthread_mutex_lock(&p->mutex);
+ p->writers_writing--;
+ p->readers_reading++;
+ pthread_cond_broadcast(&p->lock_free);
+ pthread_mutex_unlock(&p->mutex);
+#endif
+}
+
+void pazpar2_lock_rdwr_runlock(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ assert(p->readers_reading > 0);
+ pthread_mutex_lock(&p->mutex);
+ p->readers_reading--;
+ if (p->readers_reading == 0)
+ pthread_cond_signal(&p->lock_free);
+ pthread_mutex_unlock(&p->mutex);
+#endif
+}
+
+void pazpar2_lock_rdwr_wunlock(Pazpar2_lock_rdwr *p)
+{
+#if YAZ_POSIX_THREADS
+ assert(p->writers_writing == 1);
+ pthread_mutex_lock(&p->mutex);
+ p->writers_writing--;
+ pthread_cond_broadcast(&p->lock_free);
+ pthread_mutex_unlock(&p->mutex);
+#endif
+}
+
/*
* Local variables:
* c-basic-offset: 4
xmlFree(result);
}
-static void session_enter(struct session *s, const char *caller)
+void session_enter_ro(struct session *s, const char *caller)
{
+ assert(s);
+ if (caller)
+ session_log(s, YLOG_LOG, "Session read lock by %s", caller);
+ pazpar2_lock_rdwr_rlock(&s->lock);
+}
+
+void session_enter_rw(struct session *s, const char *caller)
+{
+ assert(s);
+ if (caller)
+ session_log(s, YLOG_LOG, "Session write lock by %s", caller);
+ pazpar2_lock_rdwr_wlock(&s->lock);
+}
+
+void session_upgrade(struct session *s, const char *caller)
+{
+ assert(s);
+ if (caller)
+ session_log(s, YLOG_LOG, "Session upgrade lock by %s", caller);
+ pazpar2_lock_rdwr_upgrade(&s->lock);
+}
+
+void session_leave_ro(struct session *s, const char *caller)
+{
+ assert(s);
+ if (caller)
+ session_log(s, YLOG_LOG, "Session read unlock by %s", caller);
+ pazpar2_lock_rdwr_runlock(&s->lock);
+}
+
+void session_leave_rw(struct session *s, const char *caller)
+{
+ assert(s);
if (caller)
- session_log(s, YLOG_DEBUG, "Session lock by %s", caller);
- yaz_mutex_enter(s->session_mutex);
+ session_log(s, YLOG_LOG, "Session write unlock by %s", caller);
+ pazpar2_lock_rdwr_wunlock(&s->lock);
}
-static void session_leave(struct session *s, const char *caller)
+void session_downgrade(struct session *s, const char *caller)
{
- yaz_mutex_leave(s->session_mutex);
+ assert(s);
if (caller)
- session_log(s, YLOG_DEBUG, "Session unlock by %s", caller);
+ session_log(s, YLOG_LOG, "Session write unlock by %s", caller);
+ pazpar2_lock_rdwr_downgrade(&s->lock);
}
static void session_normalize_facet(struct session *s,
struct http_channel *chan)
{
int ret;
- session_enter(s, "session_set_watch");
if (s->watchlist[what].fun)
ret = -1;
else
session_watch_cancel);
ret = 0;
}
- session_leave(s, "session_set_watch");
return ret;
}
void session_alert_watch(struct session *s, int what)
{
assert(s);
- session_enter(s, "session_alert_watch");
if (s->watchlist[what].fun)
{
/* our watch is no longer associated with http_channel */
s->watchlist[what].data = 0;
s->watchlist[what].obs = 0;
- session_leave(s, "session_alert_watch");
session_log(s, YLOG_DEBUG,
"Alert Watch: %d calling function: %p", what, fun);
fun(data);
}
- else
- session_leave(s,"session_alert_watch");
}
//callback for grep_databases
{
struct client_list *l;
- session_enter(se, "session_reset_active_clients");
l = se->clients_active;
se->clients_active = new_list;
- session_leave(se, "session_reset_active_clients");
while (l)
{
struct client_list *l_next = l->next;
- client_lock(l->client);
client_set_session(l->client, 0); /* mark client inactive */
- client_unlock(l->client);
xfree(l);
l = l_next;
session_reset_active_clients(se, 0);
- session_enter(se, "session_remove_cached_clients");
l = se->clients_cached;
se->clients_cached = 0;
- session_leave(se, "session_remove_cached_clients");
while (l)
{
struct client_list *l_next = l->next;
- client_lock(l->client);
client_set_session(l->client, 0);
client_set_database(l->client, 0);
- client_unlock(l->client);
client_destroy(l->client);
xfree(l);
l = l_next;
void session_sort(struct session *se, struct reclist_sortparms *sp,
const char *mergekey, const char *rank)
{
- //session_enter(se, "session_sort");
+ session_enter_rw(se, "session_sort");
session_sort_unlocked(se, sp, mergekey, rank);
- //session_leave(se, "session_sort");
+ session_leave_rw(se, "session_sort");
}
-
-enum pazpar2_error_code session_search(struct session *se,
- const char *query,
- const char *startrecs,
- const char *maxrecs,
- const char *filter,
- const char *limit,
- const char **addinfo,
- struct reclist_sortparms *sp,
- const char *mergekey,
- const char *rank)
+static
+enum pazpar2_error_code session_search_unlocked(struct session *se,
+ const char *query,
+ const char *startrecs,
+ const char *maxrecs,
+ const char *filter,
+ const char *limit,
+ const char **addinfo,
+ struct reclist_sortparms *sp,
+ const char *mergekey,
+ const char *rank)
{
int live_channels = 0;
int no_working = 0;
*addinfo = 0;
- if (se->settings_modified) {
+ if (se->settings_modified)
session_remove_cached_clients(se);
- }
else
session_reset_active_clients(se, 0);
- session_enter(se, "session_search");
se->settings_modified = 0;
if (mergekey)
live_channels = select_targets(se, filter);
if (!live_channels)
- {
- session_leave(se, "session_search");
return PAZPAR2_NO_TARGETS;
- }
facet_limits_destroy(se->facet_limits);
se->facet_limits = facet_limits_create(limit);
if (!se->facet_limits)
{
*addinfo = "limit";
- session_leave(se, "session_search");
return PAZPAR2_MALFORMED_PARAMETER_VALUE;
}
l0 = se->clients_active;
se->clients_active = 0;
- session_leave(se, "session_search");
for (l = l0; l; l = l->next)
{
else
return PAZPAR2_NO_TARGETS;
}
- session_log(se, YLOG_LOG, "session_start_search done");
return PAZPAR2_NO_ERROR;
}
+enum pazpar2_error_code session_search(struct session *se,
+ const char *query,
+ const char *startrecs,
+ const char *maxrecs,
+ const char *filter,
+ const char *limit,
+ const char **addinfo,
+ struct reclist_sortparms *sp,
+ const char *mergekey,
+ const char *rank)
+{
+ enum pazpar2_error_code c;
+ session_enter_rw(se, "session_search");
+ c = session_search_unlocked(se, query, startrecs, maxrecs, filter,
+ limit, addinfo, sp, mergekey, rank);
+ session_leave_rw(se, "session_search");
+ return c;
+}
+
// Creates a new session_database object for a database
static void session_init_databases_fun(void *context, struct database *db)
{
void session_destroy(struct session *se)
{
struct session_database *sdb;
+
+ session_enter_rw(se, "session_destroy");
session_log(se, YLOG_DEBUG, "Destroying");
session_use(-1);
session_remove_cached_clients(se);
facet_limits_destroy(se->facet_limits);
nmem_destroy(se->nmem);
service_destroy(se->service);
- yaz_mutex_destroy(&se->session_mutex);
+
+ session_leave_rw(se, "session_destroy");
+ pazpar2_lock_rdwr_destroy(&se->lock);
}
size_t session_get_memory_status(struct session *session) {
size_t session_nmem;
if (session == 0)
return 0;
- session_enter(session, "session_get_memory_status");
+ session_enter_ro(session, "session_get_memory_status");
session_nmem = nmem_total(session->nmem);
- session_leave(session, "session_get_memory_status");
+ session_leave_ro(session, "session_get_memory_status");
return session_nmem;
}
-struct session *new_session(NMEM nmem, struct conf_service *service,
- unsigned session_id)
+struct session *session_create(NMEM nmem, struct conf_service *service,
+ unsigned session_id)
{
int i;
struct session *session = nmem_malloc(nmem, sizeof(*session));
sprintf(tmp_str, "session#%u", session_id);
session->session_id = session_id;
- session_log(session, YLOG_DEBUG, "New");
+ session_log(session, YLOG_LOG, "new");
session->service = service;
session->relevance = 0;
session->total_records = 0;
session->watchlist[i].fun = 0;
}
session->normalize_cache = normalize_cache_create();
- session->session_mutex = 0;
- pazpar2_mutex_create(&session->session_mutex, tmp_str);
+
+ pazpar2_lock_rdwr_init(&session->lock);
+
session_use(1);
return session;
}
struct hitsbytarget *get_hitsbytarget(struct session *se, int *count, NMEM nmem)
{
struct hitsbytarget *p;
- session_enter(se, "get_hitsbytarget");
p = hitsbytarget_nb(se, count, nmem);
- session_leave(se, "get_hitsbytarget");
return p;
}
nmem_strsplit(nmem_tmp, ",", name, &names, &num_names);
- session_enter(se, "perform_termlist");
+ session_enter_ro(se, "perform_termlist");
for (j = 0; j < num_names; j++)
{
wrbuf_puts(c->wrbuf, "\"/>\n");
}
}
- session_leave(se, "perform_termlist");
+ session_leave_ro(se, "perform_termlist");
nmem_destroy(nmem_tmp);
}
{
struct record_cluster *r = 0;
- session_enter(se, "show_single_start");
+ session_enter_ro(se, "show_single_start");
*prev_r = 0;
*next_r = 0;
reclist_limit(se->reclist, se);
}
reclist_leave(se->reclist);
if (!r)
- session_leave(se, "show_single_start");
+ session_leave_ro(se, "show_single_start");
return r;
}
void show_single_stop(struct session *se, struct record_cluster *rec)
{
- session_leave(se, "show_single_stop");
+ session_leave_ro(se, "show_single_stop");
}
#if USE_TIMING
yaz_timing_t t = yaz_timing_create();
#endif
- session_enter(se, "show_range_start");
*sumhits = 0;
*approx_hits = 0;
*total = 0;
session_log(se, YLOG_LOG, "can not fetch more");
else
{
- show_range_stop(se, recs);
session_log(se, YLOG_LOG, "fetching more in progress");
if (session_set_watch(se, SESSION_WATCH_SHOW,
show_records_ready, chan, chan))
{
session_log(se, YLOG_WARN, "Ignoring show block");
- session_enter(se, "show_range_start");
}
else
{
+ show_range_stop(se, recs);
session_log(se, YLOG_LOG, "session watch OK");
return 0;
}
void show_range_stop(struct session *se, struct record_cluster **recs)
{
- session_leave(se, "show_range_stop");
}
void statistics(struct session *se, struct statistics *stat)
xmlFreeDoc(xdoc);
return -1;
}
- session_enter(se, "ingest_record");
- if (client_get_session(cl) == se && se->relevance)
+ assert(client_get_session(cl) == se);
+
+ if (se->relevance)
ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm);
- session_leave(se, "ingest_record");
xmlFreeDoc(xdoc);
return ret;