123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- /**
- * $Id$
- *
- * Copyright (C) 2011 Flowroute LLC (flowroute.com)
- *
- * This file is part of Kamailio, a free SIP server.
- *
- * This file is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version
- *
- *
- * This file is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- *
- */
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/timerfd.h>
#include <event.h>
#include "../../sr_module.h"
#include "../../route.h"
#include "../../route_struct.h"
#include "../../lvalue.h"
#include "../tm/tm_load.h"
#include "jsonrpc_io.h"
#include "jsonrpc.h"
#include "netstring.h"
/*
 * Allocation guards: when `p` is NULL, log an out-of-memory error and leave
 * the calling function (plain `return` for the _VOID variant, `return -1`
 * otherwise).
 *
 * Wrapped in do { } while (0) so that `CHECK_MALLOC(x);` is a single
 * statement and stays correct inside an unbraced if/else; the argument is
 * parenthesized so that expressions can be passed safely.
 */
#define CHECK_MALLOC_VOID(p) \
	do { \
		if (!(p)) { \
			LM_ERR("Out of memory!\n"); \
			return; \
		} \
	} while (0)
#define CHECK_MALLOC(p) \
	do { \
		if (!(p)) { \
			LM_ERR("Out of memory!\n"); \
			return -1; \
		} \
	} while (0)
/*
 * One JSON-RPC backend endpoint.
 *
 * Servers that share a priority are chained through `next` into a circular
 * list (a lone server points at itself).
 */
struct jsonrpc_server {
	char *host;                  /* host name copied from the servers string */
	int port;                    /* TCP port */
	int socket;                  /* connected socket fd; 0 while there is none */
	int status;                  /* one of the JSONRPC_SERVER_* states */
	struct jsonrpc_server *next; /* next server in the same priority ring */
	struct event *ev;            /* socket read event, or the pending reconnect timer event */
	struct itimerspec *timer;    /* timer spec allocated when a reconnect is scheduled */
};
/*
 * All servers that share one priority value.
 *
 * Groups form a singly linked, NULL-terminated list through `next_group`.
 */
struct jsonrpc_server_group {
	struct jsonrpc_server *next_server;      /* entry point into this group's server ring */
	int priority;                            /* priority shared by every server in the group */
	struct jsonrpc_server_group *next_group; /* following group, or NULL at the end */
};
- struct tm_binds tmb;
- struct jsonrpc_server_group *server_group;
- void socket_cb(int fd, short event, void *arg);
- void cmd_pipe_cb(int fd, short event, void *arg);
- int set_non_blocking(int fd);
- int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr);
- int connect_servers(struct jsonrpc_server_group *group);
- int connect_server(struct jsonrpc_server *server);
- int handle_server_failure(struct jsonrpc_server *server);
- int jsonrpc_io_child_process(int cmd_pipe, char* _servers)
- {
- if (parse_servers(_servers, &server_group) != 0)
- {
- LM_ERR("servers parameter could not be parsed\n");
- return -1;
- }
- event_init();
-
- struct event pipe_ev;
- set_non_blocking(cmd_pipe);
- event_set(&pipe_ev, cmd_pipe, EV_READ | EV_PERSIST, cmd_pipe_cb, &pipe_ev);
- event_add(&pipe_ev, NULL);
- if (!connect_servers(server_group))
- {
- LM_ERR("failed to connect to any servers\n");
- return -1;
- }
- event_dispatch();
- return 0;
- }
- void timeout_cb(int fd, short event, void *arg)
- {
- LM_ERR("message timeout\n");
- jsonrpc_request_t *req = (jsonrpc_request_t*)arg;
- json_object *error = json_object_new_string("timeout");
- void_jsonrpc_request(req->id);
- close(req->timerfd);
- event_del(req->timer_ev);
- pkg_free(req->timer_ev);
- req->cbfunc(error, req->cbdata, 1);
- pkg_free(req);
- }
- int result_cb(json_object *result, char *data, int error)
- {
- struct jsonrpc_pipe_cmd *cmd = (struct jsonrpc_pipe_cmd*)data;
- pv_spec_t *dst = cmd->cb_pv;
- pv_value_t val;
- const char* res = json_object_get_string(result);
- val.rs.s = (char*)res;
- val.rs.len = strlen(res);
- val.flags = PV_VAL_STR;
- dst->setf(0, &dst->pvp, (int)EQ_T, &val);
- int n;
- if (error) {
- n = route_get(&main_rt, cmd->err_route);
- } else {
- n = route_get(&main_rt, cmd->cb_route);
- }
- struct action *a = main_rt.rlist[n];
- tmb.t_continue(cmd->t_hash, cmd->t_label, a);
- free_pipe_cmd(cmd);
- return 0;
- }
- int (*res_cb)(json_object*, char*, int) = &result_cb;
- void cmd_pipe_cb(int fd, short event, void *arg)
- {
- struct jsonrpc_pipe_cmd *cmd;
- /* struct event *ev = (struct event*)arg; */
- if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
- LM_ERR("failed to read from command pipe: %s\n", strerror(errno));
- return;
- }
- json_object *params = json_tokener_parse(cmd->params);
- json_object *payload = NULL;
- jsonrpc_request_t *req = NULL;
- if (cmd->notify_only) {
- payload = build_jsonrpc_notification(cmd->method, params);
- } else {
- req = build_jsonrpc_request(cmd->method, params, (char*)cmd, res_cb);
- if (req)
- payload = req->payload;
- }
- if (!payload) {
- LM_ERR("Failed to build jsonrpc_request_t (method: %s, params: %s)\n", cmd->method, cmd->params);
- return;
- }
- char *json = (char*)json_object_get_string(payload);
- char *ns; size_t bytes;
- bytes = netstring_encode_new(&ns, json, (size_t)strlen(json));
- struct jsonrpc_server_group *g;
- int sent = 0;
- for (g = server_group; g != NULL; g = g->next_group)
- {
- struct jsonrpc_server *s, *first = NULL;
- for (s = g->next_server; s != first; s = s->next)
- {
- if (first == NULL) first = s;
- if (s->status == JSONRPC_SERVER_CONNECTED) {
- if (send(s->socket, ns, bytes, 0) == bytes)
- {
- sent = 1;
- break;
- } else {
- handle_server_failure(s);
- }
- }
- g->next_server = s->next;
- }
- if (sent) {
- break;
- } else {
- LM_WARN("Failed to send on priority group %d... proceeding to next priority group.\n", g->priority);
- }
- }
- if (sent && req) {
- int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
- if (timerfd == -1) {
- LM_ERR("Could not create timerfd.");
- return;
- }
- req->timerfd = timerfd;
- struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
- CHECK_MALLOC_VOID(itime);
- itime->it_interval.tv_sec = 0;
- itime->it_interval.tv_nsec = 0;
- itime->it_value.tv_sec = JSONRPC_TIMEOUT/1000;
- itime->it_value.tv_nsec = (JSONRPC_TIMEOUT % 1000) * 1000000;
- if (timerfd_settime(timerfd, 0, itime, NULL) == -1)
- {
- LM_ERR("Could not set timer.");
- return;
- }
- pkg_free(itime);
- struct event *timer_ev = pkg_malloc(sizeof(struct event));
- CHECK_MALLOC_VOID(timer_ev);
- event_set(timer_ev, timerfd, EV_READ, timeout_cb, req);
- if(event_add(timer_ev, NULL) == -1) {
- LM_ERR("event_add failed while setting request timer (%s).", strerror(errno));
- return;
- }
- req->timer_ev = timer_ev;
- } else if (!sent) {
- LM_ERR("Request could not be sent... no more failover groups.\n");
- if (req) {
- json_object *error = json_object_new_string("failure");
- void_jsonrpc_request(req->id);
- req->cbfunc(error, req->cbdata, 1);
- }
- }
- pkg_free(ns);
- json_object_put(payload);
- }
- void socket_cb(int fd, short event, void *arg)
- {
- struct jsonrpc_server *server = (struct jsonrpc_server*)arg;
- if (event != EV_READ) {
- LM_ERR("unexpected socket event (%d)\n", event);
- handle_server_failure(server);
- return;
- }
- char *netstring;
- int retval = netstring_read_fd(fd, &netstring);
- if (retval != 0) {
- LM_ERR("bad netstring (%d)\n", retval);
- handle_server_failure(server);
- return;
- }
- struct json_object *res = json_tokener_parse(netstring);
- if (res) {
- handle_jsonrpc_response(res);
- json_object_put(res);
- } else {
- LM_ERR("netstring could not be parsed: (%s)\n", netstring);
- handle_server_failure(server);
- }
- pkg_free(netstring);
- }
/*
 * Add O_NONBLOCK to the file status flags of `fd`.
 *
 * Returns 0 on success; the negative fcntl() result when the flags cannot be
 * read, or -1 when they cannot be written.
 */
int set_non_blocking(int fd)
{
	int current = fcntl(fd, F_GETFL);

	if (current < 0)
		return current;

	return (fcntl(fd, F_SETFL, current | O_NONBLOCK) < 0) ? -1 : 0;
}
- int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr)
- {
- char cpy[strlen(_servers)+1];
- char *servers = strcpy(cpy, _servers);
- struct jsonrpc_server_group *group = NULL;
- /* parse servers string */
- char *token = strtok(servers, ":");
- while (token != NULL)
- {
- char *host, *port_s, *priority_s, *tail;
- int port, priority;
- host = token;
- /* validate domain */
- if (!(isalpha(host[0]) || isdigit(host[0]))) {
- LM_ERR("invalid domain (1st char is '%c')\n", host[0]);
- return -1;
- }
- int i;
- for (i=1; i<strlen(host)-1; i++)
- {
- if(!(isalpha(host[i]) || isdigit(host[i]) || host[i] == '-' || host[i] == '.'))
- {
- LM_ERR("invalid domain (char %d is %c)\n", i, host[i]);
- return -1;
- }
- }
- if (!(isalpha(host[i]) || isdigit(host[i]))) {
- LM_ERR("invalid domain (char %d (last) is %c)\n", i, host[i]);
- return -1;
- }
- /* convert/validate port */
- port_s = strtok(NULL, ",");
- if (port_s == NULL || !(port = strtol(port_s, &tail, 0)) || strlen(tail))
- {
- LM_ERR("invalid port: %s\n", port_s);
- return -1;
- }
- /* convert/validate priority */
- priority_s = strtok(NULL, " ");
- if (priority_s == NULL || !(priority = strtol(priority_s, &tail, 0)) || strlen(tail))
- {
- LM_ERR("invalid priority: %s\n", priority_s);
- return -1;
- }
-
- struct jsonrpc_server *server = pkg_malloc(sizeof(struct jsonrpc_server));
- CHECK_MALLOC(server);
- char *h = pkg_malloc(strlen(host)+1);
- CHECK_MALLOC(h);
- strcpy(h,host);
- server->host = h;
- server->port = port;
- server->status = JSONRPC_SERVER_DISCONNECTED;
- server->socket = 0;
- int group_cnt = 0;
- /* search for a server group with this server's priority */
- struct jsonrpc_server_group *selected_group = NULL;
- for (selected_group=group; selected_group != NULL; selected_group=selected_group->next_group)
- {
- if (selected_group->priority == priority) break;
- }
-
- if (selected_group == NULL) {
- group_cnt++;
- LM_INFO("Creating group for priority %d\n", priority);
- /* this is the first server for this priority... link it to itself */
- server->next = server;
-
- selected_group = pkg_malloc(sizeof(struct jsonrpc_server_group));
- CHECK_MALLOC(selected_group);
- selected_group->priority = priority;
- selected_group->next_server = server;
-
- /* insert the group properly in the linked list */
- struct jsonrpc_server_group *x, *pg;
- pg = NULL;
- if (group == NULL)
- {
- group = selected_group;
- group->next_group = NULL;
- } else {
- for (x = group; x != NULL; x = x->next_group)
- {
- if (priority > x->priority)
- {
- if (pg == NULL)
- {
- group = selected_group;
- } else {
- pg->next_group = selected_group;
- }
- selected_group->next_group = x;
- break;
- } else if (x->next_group == NULL) {
- x->next_group = selected_group;
- break;
- } else {
- pg = x;
- }
- }
- }
- } else {
- LM_ERR("Using existing group for priority %d\n", priority);
- server->next = selected_group->next_server->next;
- selected_group->next_server->next = server;
- }
- token = strtok(NULL, ":");
- }
- *group_ptr = group;
- return 0;
- }
- int connect_server(struct jsonrpc_server *server)
- {
- struct sockaddr_in server_addr;
- struct hostent *hp;
- server_addr.sin_family = AF_INET;
- server_addr.sin_port = htons(server->port);
- hp = gethostbyname(server->host);
- if (hp == NULL) {
- LM_ERR("gethostbyname(%s) failed with h_errno=%d.\n", server->host, h_errno);
- handle_server_failure(server);
- return -1;
- }
- memcpy(&(server_addr.sin_addr.s_addr), hp->h_addr, hp->h_length);
- int sockfd = socket(AF_INET,SOCK_STREAM,0);
- if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in))) {
- LM_WARN("Failed to connect to %s on port %d... %s\n", server->host, server->port, strerror(errno));
- handle_server_failure(server);
- return -1;
- }
- if (set_non_blocking(sockfd) != 0)
- {
- LM_WARN("Failed to set socket (%s:%d) to non blocking.\n", server->host, server->port);
- handle_server_failure(server);
- return -1;
- }
- server->socket = sockfd;
- server->status = JSONRPC_SERVER_CONNECTED;
- struct event *socket_ev = pkg_malloc(sizeof(struct event));
- CHECK_MALLOC(socket_ev);
- event_set(socket_ev, sockfd, EV_READ | EV_PERSIST, socket_cb, server);
- event_add(socket_ev, NULL);
- server->ev = socket_ev;
- return 0;
- }
- int connect_servers(struct jsonrpc_server_group *group)
- {
- int connected_servers = 0;
- for (;group != NULL; group = group->next_group)
- {
- struct jsonrpc_server *s, *first = NULL;
- LM_INFO("Connecting to servers for priority %d:\n", group->priority);
- for (s=group->next_server;s!=first;s=s->next)
- {
- if (connect_server(s) == 0)
- {
- connected_servers++;
- LM_INFO("Connected to host %s on port %d\n", s->host, s->port);
- }
- if (first == NULL) first = s;
- }
- }
- return connected_servers;
- }
- void reconnect_cb(int fd, short event, void *arg)
- {
- LM_INFO("Attempting to reconnect now.");
- struct jsonrpc_server *server = (struct jsonrpc_server*)arg;
-
- if (server->status == JSONRPC_SERVER_CONNECTED) {
- LM_WARN("Trying to connect an already connected server.");
- return;
- }
- if (server->ev != NULL) {
- event_del(server->ev);
- pkg_free(server->ev);
- server->ev = NULL;
- }
- close(fd);
- pkg_free(server->timer);
- connect_server(server);
- }
- int handle_server_failure(struct jsonrpc_server *server)
- {
- LM_INFO("Setting timer to reconnect to %s on port %d in %d seconds.\n", server->host, server->port, JSONRPC_RECONNECT_INTERVAL);
- if (server->socket)
- close(server->socket);
- server->socket = 0;
- if (server->ev != NULL) {
- event_del(server->ev);
- pkg_free(server->ev);
- server->ev = NULL;
- }
- server->status = JSONRPC_SERVER_FAILURE;
- int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
-
- if (timerfd == -1) {
- LM_ERR("Could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server.");
- return -1;
- }
- struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
- CHECK_MALLOC(itime);
- itime->it_interval.tv_sec = 0;
- itime->it_interval.tv_nsec = 0;
-
- itime->it_value.tv_sec = JSONRPC_RECONNECT_INTERVAL;
- itime->it_value.tv_nsec = 0;
-
- if (timerfd_settime(timerfd, 0, itime, NULL) == -1)
- {
- LM_ERR("Could not set timer to reschedule connection. No further attempts will be made to reconnect this server.");
- return -1;
- }
- LM_INFO("timerfd value is %d\n", timerfd);
- struct event *timer_ev = pkg_malloc(sizeof(struct event));
- CHECK_MALLOC(timer_ev);
- event_set(timer_ev, timerfd, EV_READ, reconnect_cb, server);
- if(event_add(timer_ev, NULL) == -1) {
- LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.", strerror(errno));
- return -1;
- }
- server->ev = timer_ev;
- server->timer = itime;
- return 0;
- }
- void free_pipe_cmd(struct jsonrpc_pipe_cmd *cmd)
- {
- if (cmd->method)
- shm_free(cmd->method);
- if (cmd->params)
- shm_free(cmd->params);
- if (cmd->cb_route)
- shm_free(cmd->cb_route);
- if (cmd->err_route)
- shm_free(cmd->err_route);
- if (cmd->cb_pv)
- shm_free(cmd->cb_pv);
- shm_free(cmd);
- }
|