parent
fec539f22b
commit
49637560c6
@ -0,0 +1,72 @@
|
||||
diff -up rsyslog-8.2102.0/parse.h.orig rsyslog-8.2102.0/parse.h
|
||||
--- rsyslog-8.2102.0/parse.h.orig 2023-05-09 09:10:09.236597063 +0200
|
||||
+++ rsyslog-8.2102.0/parse.h 2023-05-09 09:10:26.913608034 +0200
|
||||
@@ -56,7 +56,7 @@ struct rsParsObject
|
||||
rsObjID OID; /**< object ID */
|
||||
#endif
|
||||
cstr_t *pCStr; /**< pointer to the string object we are parsing */
|
||||
- int iCurrPos; /**< current parsing position (char offset) */
|
||||
+ size_t iCurrPos; /**< current parsing position (char offset) */
|
||||
};
|
||||
typedef struct rsParsObject rsParsObj;
|
||||
|
||||
diff -up rsyslog-8.2102.0/runtime/stream.c.orig rsyslog-8.2102.0/runtime/stream.c
|
||||
--- rsyslog-8.2102.0/runtime/stream.c.orig 2023-05-09 09:10:34.122612508 +0200
|
||||
+++ rsyslog-8.2102.0/runtime/stream.c 2023-05-09 09:12:47.934640583 +0200
|
||||
@@ -1071,7 +1071,7 @@ strmReadMultiLine(strm_t *pThis, cstr_t
|
||||
cstr_t *thisLine = NULL;
|
||||
rsRetVal readCharRet;
|
||||
const time_t tCurr = pThis->readTimeout ? getTime(NULL) : 0;
|
||||
- int maxMsgSize = glblGetMaxLine();
|
||||
+ size_t maxMsgSize = glblGetMaxLine();
|
||||
DEFiRet;
|
||||
|
||||
do {
|
||||
@@ -1132,9 +1132,9 @@ strmReadMultiLine(strm_t *pThis, cstr_t
|
||||
}
|
||||
|
||||
|
||||
- int currLineLen = cstrLen(thisLine);
|
||||
+ size_t currLineLen = cstrLen(thisLine);
|
||||
if(currLineLen > 0) {
|
||||
- int len;
|
||||
+ size_t len;
|
||||
if((len = cstrLen(pThis->prevMsgSegment) + currLineLen) <
|
||||
maxMsgSize) {
|
||||
CHKiRet(cstrAppendCStr(pThis->prevMsgSegment, thisLine));
|
||||
@@ -1144,7 +1144,7 @@ strmReadMultiLine(strm_t *pThis, cstr_t
|
||||
len = 0;
|
||||
} else {
|
||||
len = currLineLen-(len-maxMsgSize);
|
||||
- for(int z=0; z<len; z++) {
|
||||
+ for(size_t z=0; z<len; z++) {
|
||||
cstrAppendChar(pThis->prevMsgSegment,
|
||||
thisLine->pBuf[z]);
|
||||
}
|
||||
diff -up rsyslog-8.2102.0/runtime/stringbuf.c.orig rsyslog-8.2102.0/runtime/stringbuf.c
|
||||
--- rsyslog-8.2102.0/runtime/stringbuf.c.orig 2023-05-09 09:09:37.627577446 +0200
|
||||
+++ rsyslog-8.2102.0/runtime/stringbuf.c 2023-05-09 09:09:59.061590749 +0200
|
||||
@@ -474,7 +474,7 @@ finalize_it:
|
||||
* This is due to performance reasons.
|
||||
*/
|
||||
#ifndef NDEBUG
|
||||
-int cstrLen(cstr_t *pThis)
|
||||
+size_t cstrLen(cstr_t *pThis)
|
||||
{
|
||||
rsCHECKVALIDOBJECT(pThis, OIDrsCStr);
|
||||
return(pThis->iStrLen);
|
||||
diff -up rsyslog-8.2102.0/runtime/stringbuf.h.orig rsyslog-8.2102.0/runtime/stringbuf.h
|
||||
--- rsyslog-8.2102.0/runtime/stringbuf.h.orig 2023-05-09 09:08:05.199520082 +0200
|
||||
+++ rsyslog-8.2102.0/runtime/stringbuf.h 2023-05-09 09:09:26.924570803 +0200
|
||||
@@ -144,9 +144,9 @@ rsRetVal cstrAppendCStr(cstr_t *pThis, c
|
||||
|
||||
/* now come inline-like functions */
|
||||
#ifdef NDEBUG
|
||||
-# define cstrLen(x) ((int)((x)->iStrLen))
|
||||
+# define cstrLen(x) ((size_t)((x)->iStrLen))
|
||||
#else
|
||||
- int cstrLen(cstr_t *pThis);
|
||||
+ size_t cstrLen(cstr_t *pThis);
|
||||
#endif
|
||||
#define rsCStrLen(s) cstrLen((s))
|
||||
|
@ -0,0 +1,37 @@
|
||||
diff -up rsyslog-8.2102.0/plugins/omelasticsearch/omelasticsearch.c.orig rsyslog-8.2102.0/plugins/omelasticsearch/omelasticsearch.c
|
||||
--- rsyslog-8.2102.0/plugins/omelasticsearch/omelasticsearch.c.orig 2023-05-11 14:14:39.778187570 +0200
|
||||
+++ rsyslog-8.2102.0/plugins/omelasticsearch/omelasticsearch.c 2023-05-11 14:15:36.254234445 +0200
|
||||
@@ -232,7 +232,11 @@ static rsRetVal curlSetup(wrkrInstanceDa
|
||||
BEGINcreateInstance
|
||||
CODESTARTcreateInstance
|
||||
pData->fdErrFile = -1;
|
||||
- pthread_mutex_init(&pData->mutErrFile, NULL);
|
||||
+ if(pthread_mutex_init(&pData->mutErrFile, NULL) != 0) {
|
||||
+ LogError(errno, RS_RET_ERR, "omelasticsearch: cannot create "
|
||||
+ "error file mutex, failing this action");
|
||||
+ ABORT_FINALIZE(RS_RET_ERR);
|
||||
+ }
|
||||
pData->caCertFile = NULL;
|
||||
pData->myCertFile = NULL;
|
||||
pData->myPrivKeyFile = NULL;
|
||||
@@ -240,6 +244,7 @@ CODESTARTcreateInstance
|
||||
pData->retryRulesetName = NULL;
|
||||
pData->retryRuleset = NULL;
|
||||
pData->rebindInterval = DEFAULT_REBIND_INTERVAL;
|
||||
+finalize_it:
|
||||
ENDcreateInstance
|
||||
|
||||
BEGINcreateWrkrInstance
|
||||
@@ -2165,10 +2170,12 @@ ENDfreeCnf
|
||||
|
||||
BEGINdoHUP
|
||||
CODESTARTdoHUP
|
||||
+ pthread_mutex_lock(&pData->mutErrFile);
|
||||
if(pData->fdErrFile != -1) {
|
||||
close(pData->fdErrFile);
|
||||
pData->fdErrFile = -1;
|
||||
}
|
||||
+ pthread_mutex_unlock(&pData->mutErrFile);
|
||||
ENDdoHUP
|
||||
|
||||
|
@ -0,0 +1,54 @@
|
||||
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
|
||||
index 0808c6054e..d7d6c68e60 100644
|
||||
--- a/plugins/omelasticsearch/omelasticsearch.c
|
||||
+++ b/plugins/omelasticsearch/omelasticsearch.c
|
||||
@@ -116,6 +116,7 @@ typedef struct instanceConf_s {
|
||||
uchar **serverBaseUrls;
|
||||
int numServers;
|
||||
long healthCheckTimeout;
|
||||
+ long indexTimeout;
|
||||
uchar *uid;
|
||||
uchar *pwd;
|
||||
uchar *authBuf;
|
||||
@@ -187,6 +188,7 @@ static struct cnfparamdescr actpdescr[] = {
|
||||
{ "server", eCmdHdlrArray, 0 },
|
||||
{ "serverport", eCmdHdlrInt, 0 },
|
||||
{ "healthchecktimeout", eCmdHdlrInt, 0 },
|
||||
+ { "indextimeout", eCmdHdlrInt, 0 },
|
||||
{ "uid", eCmdHdlrGetWord, 0 },
|
||||
{ "pwd", eCmdHdlrGetWord, 0 },
|
||||
{ "searchindex", eCmdHdlrGetWord, 0 },
|
||||
@@ -355,6 +357,7 @@ CODESTARTdbgPrintInstInfo
|
||||
dbgprintf("\ttemplate='%s'\n", pData->tplName);
|
||||
dbgprintf("\tnumServers=%d\n", pData->numServers);
|
||||
dbgprintf("\thealthCheckTimeout=%lu\n", pData->healthCheckTimeout);
|
||||
+ dbgprintf("\tindexTimeout=%lu\n", pData->indexTimeout);
|
||||
dbgprintf("\tserverBaseUrls=");
|
||||
for(i = 0 ; i < pData->numServers ; ++i)
|
||||
dbgprintf("%c'%s'", i == 0 ? '[' : ' ', pData->serverBaseUrls[i]);
|
||||
@@ -1768,6 +1771,8 @@ curlPostSetup(wrkrInstanceData_t *const pWrkrData)
|
||||
PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
|
||||
curlSetupCommon(pWrkrData, pWrkrData->curlPostHandle);
|
||||
curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_POST, 1);
|
||||
+ curl_easy_setopt(pWrkrData->curlPostHandle,
|
||||
+ CURLOPT_TIMEOUT_MS, pWrkrData->pData->indexTimeout);
|
||||
}
|
||||
|
||||
#define CONTENT_JSON "Content-Type: application/json; charset=utf-8"
|
||||
@@ -1797,6 +1802,7 @@ setInstParamDefaults(instanceData *const pData)
|
||||
pData->serverBaseUrls = NULL;
|
||||
pData->defaultPort = 9200;
|
||||
pData->healthCheckTimeout = 3500;
|
||||
+ pData->indexTimeout = 0;
|
||||
pData->uid = NULL;
|
||||
pData->pwd = NULL;
|
||||
pData->authBuf = NULL;
|
||||
@@ -1865,6 +1871,8 @@ CODESTARTnewActInst
|
||||
pData->defaultPort = (int) pvals[i].val.d.n;
|
||||
} else if(!strcmp(actpblk.descr[i].name, "healthchecktimeout")) {
|
||||
pData->healthCheckTimeout = (long) pvals[i].val.d.n;
|
||||
+ } else if(!strcmp(actpblk.descr[i].name, "indextimeout")) {
|
||||
+ pData->indexTimeout = (long) pvals[i].val.d.n;
|
||||
} else if(!strcmp(actpblk.descr[i].name, "uid")) {
|
||||
pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
|
||||
} else if(!strcmp(actpblk.descr[i].name, "pwd")) {
|
@ -0,0 +1,43 @@
|
||||
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
|
||||
index 0808c6054e..ed9359732c 100644
|
||||
--- a/plugins/omelasticsearch/omelasticsearch.c
|
||||
+++ b/plugins/omelasticsearch/omelasticsearch.c
|
||||
@@ -877,14 +877,6 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
|
||||
int i;
|
||||
int numitems;
|
||||
fjson_object *items=NULL, *jo_errors = NULL;
|
||||
- int errors = 0;
|
||||
-
|
||||
- if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
|
||||
- errors = fjson_object_get_boolean(jo_errors);
|
||||
- if (!errors && pWrkrData->pData->retryFailures) {
|
||||
- return RS_RET_OK;
|
||||
- }
|
||||
- }
|
||||
|
||||
/*iterate over items*/
|
||||
if(!fjson_object_object_get_ex(replyRoot, "items", &items)) {
|
||||
@@ -897,6 +889,15 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
|
||||
|
||||
numitems = fjson_object_array_length(items);
|
||||
|
||||
+ int errors = 0;
|
||||
+ if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
|
||||
+ errors = fjson_object_get_boolean(jo_errors);
|
||||
+ if (!errors && pWrkrData->pData->retryFailures) {
|
||||
+ STATSCOUNTER_ADD(indexSuccess, mutIndexSuccess, numitems);
|
||||
+ return RS_RET_OK;
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
if (reqmsg) {
|
||||
DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg);
|
||||
} else {
|
||||
@@ -1267,6 +1268,7 @@ getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response,
|
||||
response);
|
||||
}
|
||||
}
|
||||
+
|
||||
need_free_omes = 0;
|
||||
CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0));
|
||||
MsgSetRuleset(msg, ctx->retryRuleset);
|
@ -0,0 +1,148 @@
|
||||
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
|
||||
index ed9359732c..8200403eaf 100644
|
||||
--- a/plugins/omelasticsearch/omelasticsearch.c
|
||||
+++ b/plugins/omelasticsearch/omelasticsearch.c
|
||||
@@ -86,12 +86,14 @@ STATSCOUNTER_DEF(rebinds, mutRebinds)
|
||||
static prop_t *pInputName = NULL;
|
||||
|
||||
# define META_STRT "{\"index\":{\"_index\": \""
|
||||
-# define META_STRT_CREATE "{\"create\":{\"_index\": \""
|
||||
+# define META_STRT_CREATE "{\"create\":{" /* \"_index\": \" */
|
||||
+# define META_IX "\"_index\": \""
|
||||
# define META_TYPE "\",\"_type\":\""
|
||||
# define META_PIPELINE "\",\"pipeline\":\""
|
||||
# define META_PARENT "\",\"_parent\":\""
|
||||
# define META_ID "\", \"_id\":\""
|
||||
# define META_END "\"}}\n"
|
||||
+# define META_END_NOQUOTE " }}\n"
|
||||
|
||||
typedef enum {
|
||||
ES_WRITE_INDEX,
|
||||
@@ -362,8 +364,8 @@ CODESTARTdbgPrintInstInfo
|
||||
dbgprintf("\tdefaultPort=%d\n", pData->defaultPort);
|
||||
dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid);
|
||||
dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : "");
|
||||
- dbgprintf("\tsearch index='%s'\n", pData->searchIndex);
|
||||
- dbgprintf("\tsearch type='%s'\n", pData->searchType);
|
||||
+ dbgprintf("\tsearch index='%s'\n", pData->searchIndex == NULL ? (uchar*)"(not configured)" : pData->searchIndex);
|
||||
+ dbgprintf("\tsearch type='%s'\n", pData->searchType == NULL ? (uchar*)"(not configured)" : pData->searchType);
|
||||
dbgprintf("\tpipeline name='%s'\n", pData->pipelineName);
|
||||
dbgprintf("\tdynamic pipeline name=%d\n", pData->dynPipelineName);
|
||||
dbgprintf("\tskipPipelineIfEmpty=%d\n", pData->skipPipelineIfEmpty);
|
||||
@@ -596,8 +598,8 @@ getIndexTypeAndParent(const instanceData *const pData, uchar **const tpls,
|
||||
}
|
||||
|
||||
done:
|
||||
- assert(srchIndex != NULL);
|
||||
- assert(srchType != NULL);
|
||||
+ //assert(srchIndex != NULL);
|
||||
+ //assert(srchType != NULL);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -633,9 +635,14 @@ setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls)
|
||||
parent = NULL;
|
||||
} else {
|
||||
getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
|
||||
- r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
|
||||
- if(r == 0) r = es_addChar(&url, '/');
|
||||
- if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
|
||||
+ if(searchIndex != NULL) {
|
||||
+ r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
|
||||
+ if(r == 0) r = es_addChar(&url, '/');
|
||||
+ if(searchType != NULL) {
|
||||
+ if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
|
||||
+ }
|
||||
+ } else
|
||||
+ r = 0;
|
||||
if(pipelineName != NULL && (!pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
|
||||
if(r == 0) r = es_addChar(&url, separator);
|
||||
if(r == 0) r = es_addBuf(&url, "pipeline=", sizeof("pipeline=")-1);
|
||||
@@ -692,7 +699,11 @@ computeMessageSize(const wrkrInstanceData_t *const pWrkrData,
|
||||
uchar *pipelineName;
|
||||
|
||||
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
|
||||
- r += ustrlen((char *)message) + ustrlen(searchIndex) + ustrlen(searchType);
|
||||
+ r += ustrlen((char *)message);
|
||||
+ if(searchIndex != NULL)
|
||||
+ r += ustrlen(searchIndex);
|
||||
+ if(searchType != NULL)
|
||||
+ r += ustrlen(searchType);
|
||||
|
||||
if(parent != NULL) {
|
||||
r += sizeof(META_PARENT)-1 + ustrlen(parent);
|
||||
@@ -717,6 +728,7 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
|
||||
{
|
||||
int length = strlen((char *)message);
|
||||
int r;
|
||||
+ int endQuote = 1;
|
||||
uchar *searchIndex = NULL;
|
||||
uchar *searchType;
|
||||
uchar *parent = NULL;
|
||||
@@ -725,28 +737,43 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
|
||||
DEFiRet;
|
||||
|
||||
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
|
||||
- if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
|
||||
+ if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE) {
|
||||
r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
|
||||
- else
|
||||
+ endQuote = 0;
|
||||
+ } else
|
||||
r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
|
||||
- if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
|
||||
+ if(searchIndex != NULL) {
|
||||
+ endQuote = 1;
|
||||
+ if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
|
||||
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_IX, sizeof(META_IX)-1);
|
||||
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
|
||||
ustrlen(searchIndex));
|
||||
- if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
|
||||
- if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType,
|
||||
+ if(searchType != NULL) {
|
||||
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
|
||||
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType,
|
||||
ustrlen(searchType));
|
||||
+ }
|
||||
+ }
|
||||
if(parent != NULL) {
|
||||
+ endQuote = 1;
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent));
|
||||
}
|
||||
if(pipelineName != NULL && (!pWrkrData->pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
|
||||
+ endQuote = 1;
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PIPELINE, sizeof(META_PIPELINE)-1);
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)pipelineName, ustrlen(pipelineName));
|
||||
}
|
||||
if(bulkId != NULL) {
|
||||
+ endQuote = 1;
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1);
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId));
|
||||
}
|
||||
- if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1);
|
||||
+ if(endQuote == 0) {
|
||||
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END_NOQUOTE, sizeof(META_END_NOQUOTE)-1);
|
||||
+ } else {
|
||||
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1);
|
||||
+ }
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length);
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1);
|
||||
if(r != 0) {
|
||||
@@ -2094,6 +2121,8 @@ CODESTARTnewActInst
|
||||
CHKiRet(computeBaseUrl("localhost", pData->defaultPort, pData->useHttps, pData->serverBaseUrls));
|
||||
}
|
||||
|
||||
+ //Only needed befor ES-Version 7.x
|
||||
+ /*
|
||||
if(pData->searchIndex == NULL)
|
||||
pData->searchIndex = (uchar*) strdup("system");
|
||||
if(pData->searchType == NULL)
|
||||
@@ -2104,6 +2133,7 @@ CODESTARTnewActInst
|
||||
"omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
|
||||
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
}
|
||||
+ */
|
||||
|
||||
if (pData->retryFailures) {
|
||||
CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
|
@ -0,0 +1,118 @@
|
||||
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
|
||||
index 8200403eaf..8b74d610df 100644
|
||||
--- a/plugins/omelasticsearch/omelasticsearch.c
|
||||
+++ b/plugins/omelasticsearch/omelasticsearch.c
|
||||
@@ -130,6 +130,7 @@ typedef struct instanceConf_s {
|
||||
uchar *timeout;
|
||||
uchar *bulkId;
|
||||
uchar *errorFile;
|
||||
+ int esVersion;
|
||||
sbool errorOnly;
|
||||
sbool interleaved;
|
||||
sbool dynSrchIdx;
|
||||
@@ -221,7 +222,8 @@ static struct cnfparamdescr actpdescr[] = {
|
||||
{ "ratelimit.interval", eCmdHdlrInt, 0 },
|
||||
{ "ratelimit.burst", eCmdHdlrInt, 0 },
|
||||
{ "retryruleset", eCmdHdlrString, 0 },
|
||||
- { "rebindinterval", eCmdHdlrInt, 0 }
|
||||
+ { "rebindinterval", eCmdHdlrInt, 0 },
|
||||
+ { "esversion.major", eCmdHdlrPositiveInt, 0 }
|
||||
};
|
||||
static struct cnfparamblk actpblk =
|
||||
{ CNFPARAMBLK_VERSION,
|
||||
@@ -246,6 +248,7 @@ CODESTARTcreateInstance
|
||||
pData->retryRulesetName = NULL;
|
||||
pData->retryRuleset = NULL;
|
||||
pData->rebindInterval = DEFAULT_REBIND_INTERVAL;
|
||||
+ pData->esVersion = 0;
|
||||
finalize_it:
|
||||
ENDcreateInstance
|
||||
|
||||
@@ -364,8 +367,10 @@ CODESTARTdbgPrintInstInfo
|
||||
dbgprintf("\tdefaultPort=%d\n", pData->defaultPort);
|
||||
dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid);
|
||||
dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : "");
|
||||
- dbgprintf("\tsearch index='%s'\n", pData->searchIndex == NULL ? (uchar*)"(not configured)" : pData->searchIndex);
|
||||
- dbgprintf("\tsearch type='%s'\n", pData->searchType == NULL ? (uchar*)"(not configured)" : pData->searchType);
|
||||
+ dbgprintf("\tsearch index='%s'\n", pData->searchIndex == NULL
|
||||
+ ? (uchar*)"(not configured)" : pData->searchIndex);
|
||||
+ dbgprintf("\tsearch type='%s'\n", pData->searchType == NULL
|
||||
+ ? (uchar*)"(not configured)" : pData->searchType);
|
||||
dbgprintf("\tpipeline name='%s'\n", pData->pipelineName);
|
||||
dbgprintf("\tdynamic pipeline name=%d\n", pData->dynPipelineName);
|
||||
dbgprintf("\tskipPipelineIfEmpty=%d\n", pData->skipPipelineIfEmpty);
|
||||
@@ -598,8 +603,6 @@ getIndexTypeAndParent(const instanceData *const pData, uchar **const tpls,
|
||||
}
|
||||
|
||||
done:
|
||||
- //assert(srchIndex != NULL);
|
||||
- //assert(srchType != NULL);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -700,11 +703,12 @@ computeMessageSize(const wrkrInstanceData_t *const pWrkrData,
|
||||
|
||||
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
|
||||
r += ustrlen((char *)message);
|
||||
- if(searchIndex != NULL)
|
||||
- r += ustrlen(searchIndex);
|
||||
- if(searchType != NULL)
|
||||
- r += ustrlen(searchType);
|
||||
-
|
||||
+ if(searchIndex != NULL) {
|
||||
+ r += ustrlen(searchIndex);
|
||||
+ }
|
||||
+ if(searchType != NULL) {
|
||||
+ r += ustrlen(searchType);
|
||||
+ }
|
||||
if(parent != NULL) {
|
||||
r += sizeof(META_PARENT)-1 + ustrlen(parent);
|
||||
}
|
||||
@@ -728,7 +732,7 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
|
||||
{
|
||||
int length = strlen((char *)message);
|
||||
int r;
|
||||
- int endQuote = 1;
|
||||
+ int endQuote = 1;
|
||||
uchar *searchIndex = NULL;
|
||||
uchar *searchType;
|
||||
uchar *parent = NULL;
|
||||
@@ -1990,6 +1994,8 @@ CODESTARTnewActInst
|
||||
pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
|
||||
} else if(!strcmp(actpblk.descr[i].name, "rebindinterval")) {
|
||||
pData->rebindInterval = (int) pvals[i].val.d.n;
|
||||
+ } else if(!strcmp(actpblk.descr[i].name, "esversion.major")) {
|
||||
+ pData->esVersion = pvals[i].val.d.n;
|
||||
} else {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omelasticsearch: program error, "
|
||||
"non-handled param '%s'", actpblk.descr[i].name);
|
||||
@@ -2121,19 +2127,18 @@ CODESTARTnewActInst
|
||||
CHKiRet(computeBaseUrl("localhost", pData->defaultPort, pData->useHttps, pData->serverBaseUrls));
|
||||
}
|
||||
|
||||
- //Only needed befor ES-Version 7.x
|
||||
- /*
|
||||
- if(pData->searchIndex == NULL)
|
||||
- pData->searchIndex = (uchar*) strdup("system");
|
||||
- if(pData->searchType == NULL)
|
||||
- pData->searchType = (uchar*) strdup("events");
|
||||
+ if(pData->esVersion < 8) {
|
||||
+ if(pData->searchIndex == NULL)
|
||||
+ pData->searchIndex = (uchar*) strdup("system");
|
||||
+ if(pData->searchType == NULL)
|
||||
+ pData->searchType = (uchar*) strdup("events");
|
||||
|
||||
- if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) {
|
||||
- LogError(0, RS_RET_CONFIG_ERROR,
|
||||
- "omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
|
||||
- ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
+ if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) {
|
||||
+ LogError(0, RS_RET_CONFIG_ERROR,
|
||||
+ "omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
|
||||
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
+ }
|
||||
}
|
||||
- */
|
||||
|
||||
if (pData->retryFailures) {
|
||||
CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
|
@ -0,0 +1,40 @@
|
||||
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
|
||||
index 76d5081d3b..f481ec3f7e 100644
|
||||
--- a/plugins/omelasticsearch/omelasticsearch.c
|
||||
+++ b/plugins/omelasticsearch/omelasticsearch.c
|
||||
@@ -620,6 +620,8 @@ setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls)
|
||||
uchar *parent;
|
||||
uchar *bulkId;
|
||||
char* baseUrl;
|
||||
+ /* since 7.0, the API always requires /idx/_doc, so use that if searchType is not explicitly set */
|
||||
+ uchar* actualSearchType = (uchar*)"_doc";
|
||||
es_str_t *url;
|
||||
int r;
|
||||
DEFiRet;
|
||||
@@ -645,11 +647,12 @@ setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls)
|
||||
if(searchIndex != NULL) {
|
||||
r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
|
||||
if(r == 0) r = es_addChar(&url, '/');
|
||||
- if(searchType != NULL) {
|
||||
- if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
|
||||
- }
|
||||
- } else
|
||||
- r = 0;
|
||||
+
|
||||
+ if(searchType != NULL) {
|
||||
+ actualSearchType = searchType;
|
||||
+ }
|
||||
+ if(r == 0) r = es_addChar(&url, '/');
|
||||
+ if(r == 0) r = es_addBuf(&url, (char*)actualSearchType, ustrlen(actualSearchType));
|
||||
if(pipelineName != NULL && (!pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
|
||||
if(r == 0) r = es_addChar(&url, separator);
|
||||
if(r == 0) r = es_addBuf(&url, "pipeline=", sizeof("pipeline=")-1);
|
||||
@@ -693,7 +696,7 @@ computeMessageSize(const wrkrInstanceData_t *const pWrkrData,
|
||||
const uchar *const message,
|
||||
uchar **const tpls)
|
||||
{
|
||||
- size_t r = sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
|
||||
+ size_t r = sizeof(META_END)-1 + sizeof("\n")-1;
|
||||
if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
|
||||
r += sizeof(META_STRT_CREATE)-1;
|
||||
else
|
@ -0,0 +1,53 @@
|
||||
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
|
||||
index f481ec3f7e..b297a9274f 100644
|
||||
--- a/plugins/omelasticsearch/omelasticsearch.c
|
||||
+++ b/plugins/omelasticsearch/omelasticsearch.c
|
||||
@@ -623,7 +623,7 @@ setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls)
|
||||
/* since 7.0, the API always requires /idx/_doc, so use that if searchType is not explicitly set */
|
||||
uchar* actualSearchType = (uchar*)"_doc";
|
||||
es_str_t *url;
|
||||
- int r;
|
||||
+ int r = 0;
|
||||
DEFiRet;
|
||||
instanceData *const pData = pWrkrData->pData;
|
||||
char separator;
|
||||
@@ -646,13 +646,12 @@ setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls)
|
||||
getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
|
||||
if(searchIndex != NULL) {
|
||||
r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
|
||||
+ if(searchType != NULL && searchType[0] != '\0') {
|
||||
+ actualSearchType = searchType;
|
||||
+ }
|
||||
if(r == 0) r = es_addChar(&url, '/');
|
||||
-
|
||||
- if(searchType != NULL) {
|
||||
- actualSearchType = searchType;
|
||||
+ if(r == 0) r = es_addBuf(&url, (char*)actualSearchType, ustrlen(actualSearchType));
|
||||
}
|
||||
- if(r == 0) r = es_addChar(&url, '/');
|
||||
- if(r == 0) r = es_addBuf(&url, (char*)actualSearchType, ustrlen(actualSearchType));
|
||||
if(pipelineName != NULL && (!pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
|
||||
if(r == 0) r = es_addChar(&url, separator);
|
||||
if(r == 0) r = es_addBuf(&url, "pipeline=", sizeof("pipeline=")-1);
|
||||
@@ -714,7 +713,11 @@ computeMessageSize(const wrkrInstanceData_t *const pWrkrData,
|
||||
r += ustrlen(searchIndex);
|
||||
}
|
||||
if(searchType != NULL) {
|
||||
- r += ustrlen(searchType);
|
||||
+ if(searchType[0] == '\0') {
|
||||
+ r += 4; // "_doc"
|
||||
+ } else {
|
||||
+ r += ustrlen(searchType);
|
||||
+ }
|
||||
}
|
||||
if(parent != NULL) {
|
||||
r += sizeof(META_PARENT)-1 + ustrlen(parent);
|
||||
@@ -759,7 +762,7 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_IX, sizeof(META_IX)-1);
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
|
||||
ustrlen(searchIndex));
|
||||
- if(searchType != NULL) {
|
||||
+ if(searchType != NULL && searchType[0] != '\0') {
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
|
||||
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType,
|
||||
ustrlen(searchType));
|
Loading…
Reference in new issue