Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions src/binary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,23 +173,17 @@ static void set_resp_error(ch_binary_response_t * resp, const char * str)
*/
static QuerySettings ch_binary_settings(const ch_query *query)
{
ListCell *lc;
auto res = QuerySettings{};
foreach (lc, (List *) query->settings)
{
/*
* foreach reads a non-const, so we have to cast. Would be nice to use
* foreach_ptr:
*
* foreach_ptr(DefElem, setting, query->settings)
*
* But it's only available in Postgres 17 and later.
*/
DefElem *setting = (DefElem *) lfirst(lc);
res.insert_or_assign(setting->defname, QuerySettingsField{strVal(setting->arg), 1});
}
int i;
kv_pair *pair;
auto res = QuerySettings{};

return res;
for (i = 0; i < query->settings->length; i++)
{
pair = query->settings->items[i];
res.insert_or_assign(pair->name, QuerySettingsField{pair->value, 1});
}

return res;
}

static void set_state_error(ch_binary_read_state_t * state, const char * str)
Expand Down
11 changes: 6 additions & 5 deletions src/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ ch_http_simple_query(ch_http_connection_t * conn, const ch_query * query)
static char errbuffer[CURL_ERROR_SIZE];
struct curl_slist *headers = NULL;
CURLU *cu = curl_url();
ListCell *lc;
DefElem *setting;
int i;
kv_pair *pair;
char *buf = NULL;

ch_http_response_t *resp = calloc(sizeof(ch_http_response_t), 1);
Expand All @@ -178,10 +178,11 @@ ch_http_simple_query(ch_http_connection_t * conn, const ch_query * query)
pfree(buf);

/* Append each of the settings as a query param. */
foreach(lc, (List *) query->settings)

for (i = 0; i < query->settings->length; i++)
{
setting = (DefElem *) lfirst(lc);
buf = psprintf("%s=%s", setting->defname, strVal(setting->arg));
pair = query->settings->items[i];
buf = psprintf("%s=%s", pair->name, pair->value);
curl_url_set(cu, CURLUPART_QUERY, buf, CURLU_APPENDQUERY | CURLU_URLENCODE);
pfree(buf);
}
Expand Down
10 changes: 5 additions & 5 deletions src/include/engine.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef CLICKHOUSE_ENGINE_H
#define CLICKHOUSE_ENGINE_H

#include "nodes/pathnodes.h"
#include "kv_list.h"

/*
* ch_connection_details defines the details for connecting to ClickHouse.
Expand All @@ -20,10 +20,10 @@ typedef struct
*/
typedef struct
{
const char *sql;
const List *settings;
} ch_query;
const char *sql;
const kv_list *settings;
} ch_query;

#define new_query(sql) {sql, chfdw_parse_options(ch_session_settings, true, false)}
#define new_query(sql) {sql, chfdw_get_session_settings()}

#endif /* CLICKHOUSE_ENGINE_H */
3 changes: 2 additions & 1 deletion src/include/fdw.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ extern void chfdw_report_error(int elevel, ch_connection conn,
bool clear, const char *sql);

/* in option.c */
extern char *ch_session_settings;
extern kv_list * chfdw_get_session_settings(void);

extern void
chfdw_extract_options(List * defelems, char **driver, char **host, int *port,
char **dbname, char **username, char **password);
Expand Down
53 changes: 53 additions & 0 deletions src/include/kv_list.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#ifndef PG_CLICKHOUSE_KV_LIST_H
#define PG_CLICKHOUSE_KV_LIST_H

#include <stdbool.h>
#include "postgres.h"
#include "nodes/pathnodes.h"

/* A key/value pair. */
typedef struct kv_pair
{
char *name;
char *value;
} kv_pair;

/*
* A simple data structure with a list of key/value string pairs. Use
* new_kv_list_from_list() to create.
*
* kv_list * pairs = new_kv_list_from_pg_list(list);
* if (!kv_list)
* {
* printf("out of memory\n");
* exit(1);
* }
*
* for (int i = 0; i < kv_list->length; i++) {
* kv_pair * pair = kv_list->items[i];
* printf("%s => %s\n", pair->name, pair->value);
* }
*
* kv_list_free(pairs);
*/
typedef struct kv_list
{
int length;
kv_pair **items;
} kv_list;

/*
* Create an empty kv_list, with length set to zero and items uninitialized.
*/
kv_list * new_empty_kv_list(void);

/*
* Create a new kv_list from a PostgreSQL List of DefElem. Logs an error
* message and returns NULL on memory errors.
*/
kv_list * new_kv_list_from_pg_list(List * list);

/* Frees the memory owned by the kv_list. */
void kv_list_free(kv_list * pairs);

#endif /* PG_CLICKHOUSE_KV_LIST_H */
88 changes: 88 additions & 0 deletions src/kv_list.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include <stddef.h>
#include "kv_list.h"
#include "utils/elog.h"
#include "nodes/pathnodes.h"
#include "utils/guc.h"

extern kv_list * new_empty_kv_list(void)
{
kv_list *pairs = (kv_list *) malloc(sizeof(kv_list));

if (pairs == NULL)
{
ereport(LOG, errcode(ERRCODE_FDW_OUT_OF_MEMORY), errmsg("out of memory"));
return NULL;
}
pairs->length = 0;
return pairs;
}

extern kv_list * new_kv_list_from_pg_list(List * list)
{
ListCell *lc;
DefElem *elem;
kv_list *pairs;
int i = 0;

/* Allocate the memory for the pairs object. */
pairs = (kv_list *) malloc(sizeof(kv_list));
if (pairs == NULL)
{
ereport(LOG, errcode(ERRCODE_FDW_OUT_OF_MEMORY), errmsg("out of memory"));
return NULL;
}

/* Allocate the memory for kv_pair pointers to all the items in list. */
pairs->items = (kv_pair * *) malloc(list_length(list) * sizeof(kv_pair *));
if (pairs->items == NULL)
{
free(pairs);
ereport(LOG, errcode(ERRCODE_FDW_OUT_OF_MEMORY), errmsg("out of memory"));
return NULL;
}
pairs->length = list_length(list);

/* Copy the values from list into pairs. */
foreach(lc, list)
{
kv_pair *pair = malloc(sizeof(kv_pair));

if (!pair)
{
kv_list_free(pairs);
ereport(LOG, errcode(ERRCODE_FDW_OUT_OF_MEMORY), errmsg("out of memory"));
return NULL;
}
elem = (DefElem *) lfirst(lc);
pair->name = strdup(elem->defname);
pair->value = strdup(strVal(elem->arg));
if (!pair->name || !pair->value)
{
kv_list_free(pairs);
ereport(LOG, errcode(ERRCODE_FDW_OUT_OF_MEMORY), errmsg("out of memory"));
return NULL;
}
pairs->items[i] = pair;
i++;
}

return pairs;
}

extern void
kv_list_free(kv_list * pairs)
{
if (pairs == NULL)
return;
for (int i = 0; i < pairs->length; i++)
if (pairs->items[i] != NULL)
{
if (pairs->items[i]->name)
free(pairs->items[i]->name);
if (pairs->items[i]->value)
free(pairs->items[i]->value);
free(pairs->items[i]);
}
free(pairs->items);
free(pairs);
}
71 changes: 59 additions & 12 deletions src/option.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "postgres.h"

#include "fdw.h"
#include "kv_list.h"

#include "access/reloptions.h"
#include "catalog/pg_foreign_server.h"
Expand All @@ -25,6 +26,7 @@
#include "nodes/makefuncs.h"
#include "utils/guc.h"
#include "utils/varlena.h"
#include "utils/builtins.h"

static char *DEFAULT_DBNAME = "default";

Expand Down Expand Up @@ -66,7 +68,8 @@ static const ChFdwOption ch_options[] =
/*
* GUC parameters
*/
char *ch_session_settings = NULL;
static char *ch_session_settings = NULL;
static kv_list * ch_session_settings_list = NULL;

/*
* Helper functions
Expand Down Expand Up @@ -469,35 +472,79 @@ chfdw_parse_options(const char *options_string, bool with_comma, bool with_equal
/*
* Now that we have the name and the value, store the record.
*/
options = lappend(options, makeDefElem(strdup(pname), (Node *) makeString(strdup(pval)), -1));
options = lappend(options, makeDefElem(pstrdup(pname), (Node *) makeString(pstrdup(pval)), -1));
}

return options;
}

/*
* check_settings_guc
*
* Return the current list of current session settings parsed from the
* session_settings GUC.
*/
kv_list *
chfdw_get_session_settings()
{
return ch_session_settings_list;
}

/*
* Validates the provided settings key/value pairs.
*/
static bool
check_settings_guc(char **newval, void **extra, GucSource source)
chfdw_check_settings_guc(char **newval, void **extra, GucSource source)
{
/*
* The value may be an empty string, so we have to accept that value.
* Leave extra unset; chfdw_settings_assign_hook() will assign NULL to
* ch_session_settings_list.
*/
if (*newval == NULL || *newval[0] == '\0')
return true;

/*
* Make sure we can parse the settings.
*/
/* Make sure we can parse the settings. */
chfdw_parse_options(*newval, true, false);

/* All good if no errors. */
return true;
}

/*
* Assigns the kv_list stored in extra to the ch_session_settings_list global.
*/
static void
chfdw_settings_assign_hook(const char *newval, void *extra)
{
/*
* All good if no error.
* From PostgreSQL's POV: (a) failure here is not acceptable, and (b) it
* is not necessarily called inside a transaction, so e.g. catalog lookups
* are not okay. IOW, keep it as simple as possible, and leave error
* returning behavior to chfdw_check_settings_guc(). Therefore wrap the
* parsing in PG_TRY() to prevent errors.
*/
return true;
kv_list_free(ch_session_settings_list);
PG_TRY();
{
if (newval == NULL || newval[0] == '\0')
ch_session_settings_list = new_empty_kv_list();
else
{
List *list = chfdw_parse_options(newval, true, false);

ch_session_settings_list = new_kv_list_from_pg_list(list);
}
}
PG_CATCH();
{
/*
* This should not happen, since chfdw_check_settings_guc successfully
* parsed the options, but we could get here due to memory errors.
*/
ereport(LOG,
(errcode(ERRCODE_FDW_ERROR),
errmsg("unexpected error parsing \"%s\"", newval)));
}
PG_END_TRY();
}

/*
Expand All @@ -522,8 +569,8 @@ _PG_init(void)
"join_use_nulls 1, group_by_use_nulls 1, final 1",
PGC_USERSET,
0,
check_settings_guc,
NULL,
chfdw_check_settings_guc,
chfdw_settings_assign_hook,
NULL);

#if PG_VERSION_NUM >= 150000
Expand Down
Loading