2
0
Эх сурвалжийг харах

kafka: send messages with key via kafka_send_key function and kafka.send_key for KEMI.

Vicente Hernando 5 жил өмнө
parent
commit
4a71486624

+ 113 - 2
src/modules/kafka/kafka_mod.c

@@ -58,6 +58,7 @@ static void mod_destroy(void);
 static int child_init(int rank);
 static int child_init(int rank);
 static int fixup_kafka_send(void** param, int param_no);
 static int fixup_kafka_send(void** param, int param_no);
 static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage);
 static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage);
+static int w_kafka_send_key(struct sip_msg *msg, char *ptopic, char *pmessage, char *pkey);
 
 
 /*
 /*
  * Variables and functions to deal with module parameters.
  * Variables and functions to deal with module parameters.
@@ -72,6 +73,8 @@ static int kafka_topic_param(modparam_t type, void *val);
 static cmd_export_t cmds[] = {
 static cmd_export_t cmds[] = {
 	{"kafka_send", (cmd_function)w_kafka_send, 2, fixup_kafka_send,
 	{"kafka_send", (cmd_function)w_kafka_send, 2, fixup_kafka_send,
 	 0, ANY_ROUTE},
 	 0, ANY_ROUTE},
+	{"kafka_send_key", (cmd_function)w_kafka_send_key, 3, fixup_kafka_send,
+	 0, ANY_ROUTE},
     { 0, 0, 0, 0, 0, 0}
     { 0, 0, 0, 0, 0, 0}
 };
 };
 
 
@@ -201,7 +204,7 @@ static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage)
 		return -1;
 		return -1;
 	}
 	}
 
 
-	if (kfk_message_send(&s_topic, &s_message)) {
+	if (kfk_message_send(&s_topic, &s_message, NULL)) {
 		LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
 		LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
 			   s_topic.len, s_topic.s,
 			   s_topic.len, s_topic.s,
 			   s_message.len, s_message.s);
 			   s_message.len, s_message.s);
@@ -214,6 +217,74 @@ static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage)
 	return 1;
 	return 1;
 }
 }
 
 
+/**
+ * \brief Send a message via Kafka plus key parameter.
+ */
+static int w_kafka_send_key(struct sip_msg *msg, char *ptopic, char *pmessage, char *pkey)
+{
+	str s_topic;
+
+	if (ptopic == NULL) {
+		LM_ERR("Invalid topic parameter\n");
+		return -1;
+	}
+
+	if (get_str_fparam(&s_topic, msg, (gparam_t*)ptopic)!=0) {
+		LM_ERR("No topic\n");
+		return -1;
+	}
+	if (s_topic.s == NULL || s_topic.len == 0) {
+		LM_ERR("Invalid topic string\n");
+		return -1;
+	}
+
+	str s_message;
+
+	if (pmessage == NULL) {
+		LM_ERR("Invalid message parameter\n");
+		return -1;
+	}
+
+	if (get_str_fparam(&s_message, msg, (gparam_t*)pmessage)!=0) {
+		LM_ERR("No message\n");
+		return -1;
+	}
+	if (s_message.s == NULL || s_message.len == 0) {
+		LM_ERR("Invalid message string\n");
+		return -1;
+	}
+
+	str s_key;
+
+	if (pkey == NULL) {
+		LM_ERR("Invalid key parameter\n");
+		return -1;
+	}
+
+	if (get_str_fparam(&s_key, msg, (gparam_t*)pkey)!=0) {
+		LM_ERR("No key\n");
+		return -1;
+	}
+	if (s_key.s == NULL || s_key.len == 0) {
+		LM_ERR("Invalid key string\n");
+		return -1;
+	}
+
+	if (kfk_message_send(&s_topic, &s_message, &s_key)) {
+		LM_ERR("Cannot send kafka (topic: %.*s) (key: %.*s) message: %.*s\n",
+			   s_topic.len, s_topic.s,
+			   s_key.len, s_key.s,
+			   s_message.len, s_message.s);
+		return -1;
+	}
+
+	LM_DBG("Message key sent (Topic: %.*s) (key: %.*s) : %.*s\n",
+		   s_topic.len, s_topic.s,
+		   s_key.len, s_key.s,
+		   s_message.len, s_message.s);
+	return 1;
+}
+
 /**
 /**
  * \brief KEMI function to send a Kafka message.
  * \brief KEMI function to send a Kafka message.
  */
  */
@@ -229,7 +300,7 @@ static int ki_kafka_send(struct sip_msg* msg, str *s_topic, str *s_message)
 		return -1;
 		return -1;
 	}
 	}
 
 
-	if (kfk_message_send(s_topic, s_message)) {
+	if (kfk_message_send(s_topic, s_message, NULL)) {
 		LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
 		LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
 			   s_topic->len, s_topic->s,
 			   s_topic->len, s_topic->s,
 			   s_message->len, s_message->s);
 			   s_message->len, s_message->s);
@@ -242,6 +313,41 @@ static int ki_kafka_send(struct sip_msg* msg, str *s_topic, str *s_message)
 	return 1;
 	return 1;
 }
 }
 
 
+/**
+ * \brief KEMI function to send a Kafka message plus key.
+ */
+static int ki_kafka_send_key(struct sip_msg* msg, str *s_topic, str *s_message, str *s_key)
+{
+	if (s_topic == NULL || s_topic->s == NULL || s_topic->len == 0) {
+		LM_ERR("Invalid topic string\n");
+		return -1;
+	}
+
+	if (s_message == NULL || s_message->s == NULL || s_message->len == 0) {
+		LM_ERR("Invalid message string\n");
+		return -1;
+	}
+
+	if (s_key == NULL || s_key->s == NULL || s_key->len == 0) {
+		LM_ERR("Invalid key string\n");
+		return -1;
+	}
+
+	if (kfk_message_send(s_topic, s_message, s_key)) {
+		LM_ERR("Cannot send kafka (topic: %.*s) (key: %.*s) message: %.*s\n",
+			   s_topic->len, s_topic->s,
+			   s_key->len, s_key->s,
+			   s_message->len, s_message->s);
+		return -1;
+	}
+
+	LM_DBG("Message sent (Topic: %.*s) (key: %.*s) : %.*s\n",
+		   s_topic->len, s_topic->s,
+		   s_key->len, s_key->s,
+		   s_message->len, s_message->s);
+	return 1;
+}
+
 /**
 /**
  * \brief Kafka :: Array with KEMI functions
  * \brief Kafka :: Array with KEMI functions
  */
  */
@@ -252,6 +358,11 @@ static sr_kemi_t sr_kemi_kafka_exports[] = {
 	  { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE,
 	  { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE,
 		SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
 		SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
 	},
 	},
+	{ str_init("kafka"), str_init("send_key"),
+	  SR_KEMIP_INT, ki_kafka_send_key,
+	  { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR,
+		SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
+	},
 
 
 	{ {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
 	{ {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
 };
 };

+ 12 - 2
src/modules/kafka/kfk.c

@@ -822,10 +822,11 @@ clean:
  *
  *
  * \param topic_name name of the topic
  * \param topic_name name of the topic
  * \param message message to send.
  * \param message message to send.
+ * \param key to send.
  *
  *
  * \return 0 on success.
  * \return 0 on success.
  */
  */
-int kfk_message_send(str *topic_name, str *message)
+int kfk_message_send(str *topic_name, str *message, str *key)
 {
 {
     /* Get topic from name. */
     /* Get topic from name. */
 	rd_kafka_topic_t *rkt = kfk_topic_get(topic_name);
 	rd_kafka_topic_t *rkt = kfk_topic_get(topic_name);
@@ -835,6 +836,15 @@ int kfk_message_send(str *topic_name, str *message)
 		return -1;
 		return -1;
 	}
 	}
 
 
+	/* Default key values (No key) */
+	void *keyp = NULL;
+	size_t key_len = 0;
+	if (key != NULL && key->len > 0 && key->s != NULL) {
+		keyp = key->s;
+		key_len = key->len;
+		LM_DBG("Key: %.*s\n", (int)key_len, (char*)keyp);
+	}
+	
 	/* Send a message. */
 	/* Send a message. */
 	if (rd_kafka_produce(
 	if (rd_kafka_produce(
 			rkt,
 			rkt,
@@ -844,7 +854,7 @@ int kfk_message_send(str *topic_name, str *message)
 			message->s,
 			message->s,
 			message->len,
 			message->len,
 			/* Optional key and its length */
 			/* Optional key and its length */
-			NULL, 0,
+			keyp, key_len,
 			/* Message opaque, provided in
 			/* Message opaque, provided in
 			 * delivery report callback as
 			 * delivery report callback as
 			 * msg_opaque. */
 			 * msg_opaque. */

+ 5 - 1
src/modules/kafka/kfk.h

@@ -56,9 +56,13 @@ int kfk_topic_parse(char *spec);
 /**
 /**
  * \brief send a message to a topic.
  * \brief send a message to a topic.
  *
  *
+ * \param topic_name name of the topic
+ * \param message message to send.
+ * \param key to send.
+ *
  * \return 0 on success.
  * \return 0 on success.
  */
  */
-int kfk_message_send(str *topic, str *message);
+int kfk_message_send(str *topic_name, str *message, str *key);
 
 
 /**
 /**
  * \brief Initialize statistics.
  * \brief Initialize statistics.