|
@@ -38,7 +38,10 @@
|
|
|
#include "../../ut.h"
|
|
|
#include "../../lib/kcore/faked_msg.h"
|
|
|
|
|
|
+#include "evapi_dispatch.h"
|
|
|
+
|
|
|
static int _evapi_notify_sockets[2];
|
|
|
+static int _evapi_netstring_format = 1;
|
|
|
|
|
|
#define EVAPI_IPADDR_SIZE 64
|
|
|
typedef struct _evapi_client {
|
|
@@ -80,7 +83,7 @@ void evapi_env_reset(evapi_env_t *evenv)
|
|
|
/**
|
|
|
*
|
|
|
*/
|
|
|
-void evapi_init_event_routes(void)
|
|
|
+void evapi_init_environment(int dformat)
|
|
|
{
|
|
|
memset(&_evapi_rts, 0, sizeof(evapi_evroutes_t));
|
|
|
|
|
@@ -93,6 +96,7 @@ void evapi_init_event_routes(void)
|
|
|
_evapi_rts.msg_received = route_get(&event_rt, "evapi:message-received");
|
|
|
if (_evapi_rts.msg_received < 0 || event_rt.rlist[_evapi_rts.msg_received] == NULL)
|
|
|
_evapi_rts.msg_received = -1;
|
|
|
+ _evapi_netstring_format = dformat;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -220,11 +224,12 @@ int evapi_dispatch_notify(char *obuf, int olen)
|
|
|
*/
|
|
|
void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
|
|
|
{
|
|
|
-#define CLIENT_BUFFER_SIZE 1024
|
|
|
+#define CLIENT_BUFFER_SIZE 4096
|
|
|
char rbuffer[CLIENT_BUFFER_SIZE];
|
|
|
ssize_t rlen;
|
|
|
- int i;
|
|
|
+ int i, k;
|
|
|
evapi_env_t evenv;
|
|
|
+ str frame;
|
|
|
|
|
|
if(EV_ERROR & revents) {
|
|
|
perror("received invalid event\n");
|
|
@@ -270,10 +275,47 @@ void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
|
|
|
i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port,
|
|
|
(int)rlen, rbuffer);
|
|
|
evenv.conidx = i;
|
|
|
- evenv.msg.s = rbuffer;
|
|
|
- evenv.msg.len = rlen;
|
|
|
evenv.eset = 1;
|
|
|
- evapi_run_cfg_route(&evenv, _evapi_rts.msg_received);
|
|
|
+ if(_evapi_netstring_format) {
|
|
|
+ /* netstring decapsulation */
|
|
|
+ k = 0;
|
|
|
+ while(k<rlen) {
|
|
|
+ frame.len = 0;
|
|
|
+ while(k<rlen) {
|
|
|
+ if(rbuffer[k]==' ' || rbuffer[k]=='\t'
|
|
|
+ || rbuffer[k]=='\r' || rbuffer[k]=='\n')
|
|
|
+ k++;
|
|
|
+ else break;
|
|
|
+ }
|
|
|
+ if(k==rlen) return;
|
|
|
+ while(k<rlen) {
|
|
|
+ if(rbuffer[k]>='0' && rbuffer[k]<='9') {
|
|
|
+ frame.len = frame.len*10 + rbuffer[k] - '0';
|
|
|
+ } else {
|
|
|
+ if(rbuffer[k]==':')
|
|
|
+ break;
|
|
|
+ /* invalid character - discard the rest */
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ k++;
|
|
|
+ }
|
|
|
+ if(k==rlen || frame.len<=0) return;
|
|
|
+ if(frame.len + k>=rlen) return;
|
|
|
+ k++;
|
|
|
+ frame.s = rbuffer + k;
|
|
|
+ if(frame.s[frame.len]!=',') return;
|
|
|
+ frame.s[frame.len] = '\0';
|
|
|
+ k += frame.len ;
|
|
|
+ evenv.msg.s = frame.s;
|
|
|
+ evenv.msg.len = frame.len;
|
|
|
+ evapi_run_cfg_route(&evenv, _evapi_rts.msg_received);
|
|
|
+ k++;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ evenv.msg.s = rbuffer;
|
|
|
+ evenv.msg.len = rlen;
|
|
|
+ evapi_run_cfg_route(&evenv, _evapi_rts.msg_received);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|