Ir al contenido principal

IoT, Kafka, Spark, Grafana



En esta entrada vamos a hablar un poco de como montar un sistema de procesamiento masivo de datos montando un mecanismo de procesamiento en modo ventana y la visualización de los datos en un dashboard muy simple.
Vamos a trabajar con las tecnlogias Kafka, Python, Spark (scala), InfluxBD y Grafana, pero no se trata de hablar sobre ellas, más bien de presentar un problema y ver como podríamos resolverlo por medio de estas herramientas.

Imaginemos que tenemos una red de sensores, con movilidad, que  manejan un conjunto de parámetros y nos dan unas medidas aleatorias en el tiempo, y nos piden monitorizar el movimiento de la red de sensores y fijar en un periodo de tiempo el valor máximo recibido de los mismos para un tipo de parámetro concreto.

Lo primero que vamos a hacer es montarnos un sistema que nos permite simular la red de sensores, para ello vamos a montar unos scripts en Python, que basados en unos datos aleatorios y otros pre fijamos, nos permitan simular la red de sensores, donde tendremos un sensor, que nos devuelve un parámetro de medida con un valor desde una posición geográfica concreta.

Vamos a montar una arquitectura Lambda donde vamos a usar como mecanismo de interconexión Kafka es una plataforma de streaming; básicamente un servicio de mensajería que permite publicar y suscribir streams de datos.

En el Script principal lo que vamos a hacer es:

  • Conectarnos a Kafka.
  • Generar los datos aleatorios.
  • Enviar los datos, con un delay que pasaremos como parámetro al Script.



La clase que nos va a permitir conectarnos a Kafka va a tener un par de funciones, la conexión, el envió y lo olvidaba una función para gestionar los errores por medio de una callback.


Aquí sólo tendremos que tener en cuenta en tener instalado el paquete de Kafka para poder usar la clase que envuelve al productor. El método para inicializar la conexión se basa en el constructor de KafkaProducer donde tendremos que especificar el nombre del servidor y la versión de api que use el servidor.
Para enviar un dato sólo necesitamos el dato serializado, en nuestro caso vamos a enviar strings y el topic al cual esta asociado el dato a enviar.


A continuación vamos a generar los datos sinteticos que vamos a manejar, tendremos 2 partes, datos generados aleatoriamente a partir de un subconjunto de datos, vamos a manejar la identificación de los sensores, los parámetros a monitorizar y las coordenadas geográficas desde donde los sensores envían los datos. También vamos a manejar datos aleatorios, de tipo numérico, para representar los valores y fechas.

Los subconjuntos de datos aleatorios los vamos a cargar a partir de los datos generados en ficheros en formato csv, donde nos vamos a apoyar en la librería de Pandas para manejar los. Con esos sencillos métodos podemos cargar los ficheros, obtener los mismos y/o un dato concreto manejado por su columna y fila.



La generación de los datos del sensor se va a basar en el componente fecha, dentro de la información que envía el sensor, tendremos una fecha que vamos a generar de forma aleatoria. Todo esto lo tenemos recogido en la clase Sensor.



Los sensores van enviar la información codificada en una trama de texto plano que va a entrar directamente, como hemos visto el script de python a Kafka, con el siguiente formato:

id sensor; parámetro; año; mes; dia; hora; latitud; longitud; valor de la medida

Los valores para el id sensor, parámetro y coordenadas (latitud y longitud) van a ser generadas de forma aleatoria desde unos ficheros csv que contienen los valores necesarios.

Estas tramas con la medida para el parámetro asociado y en la fecha definida, van a llegar a Kafka sobre un topic que tenemos creado, no vamos a comentar que es Kafka, solamente lo emplearemos como mecanismo de comunicación para desacoplar los sensores del programa de procesamiento en spark. Para ello vamos a ejecutar el siguiente comando desde una shell

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  station-iot

En mi caso he montado Kafka dentro una máquina virtual de cloudera, descargada directamente.




Una vez que hemos lanzado el comando ya tendremos el topic creado y podremos comunicarnos con kafka, escribiendo sobre el puerto correcto desde nuestro script de python. Antes podemos ver la lista de topics creados por medio del siguiente comando:

kafka-topics --list --zookeeper localhost:2181

De esta forma podremos ver si el topic se ha creado, en otro caso analizaremos el error que nos devuelve el comando para crearlo antes definido. Para ver que todo funciona podemos ejecutar un consumidor, que nos irá mostrando los mensajes que se van recibiendo, como en nuestro caso el protocolo se basa en texto plano, es decir no estamos enviando nada serializado que debamos deserializar después, lo vamos a poder observar muy bien, para esto ejecutamos el siguiente comando y podremos ver que va llegando.

kafka-console-consumer --zookeeper localhost:2181 --topic station-iot



Una vez desarrollada esta parte, nos queda comenzar con la parte de Spark, para ello vamos a definir que vamos a desarrollar. Se tratará de un programa en scala que se conecte y analice un Streaming de datos, para ello vamos a realizar varios procesamientos, pero los más interesantes van a ser:

  • Media sobre la ventana de procesamiento para cada uno de los sensores y el parámetro que envían.
  • Procesamiento de todas los sensores recibidos, detectando además, para cada sensor el parámetro con mayor valor numérico y para el mismo el valor más alto.
  • Filtrado de un conjunto de un conjunto de sensores específicos.



Lo primero que vamos a hacer es cargar las propiedades desde un fichero de configuración, nombre del topic, IP donde tenemos instalado Kafka,  zookeeper, configuración especifica de Spark, etc.



Vamos a hacer es inicializar el script context, para ello inicializamos Spark, con la configuración recogida en el fichero de recursos, creando un objeto de tipo StreamingContext que nos permitirá terminar de configurar el flujo de datos a manejar y conectarlo con Kafka.



Para conectarlo con Kafka, vamos a asociar nuestro StreamingContext con la configuración de Kafka, donde vamos a especificar los valores de la conexión, el topic que vamos a emplear, como almacernerá Kafka la información (en nuestro caso en memoria), que codec va usar para manejar la información, en nuestro caso ya hemos mencionado que vamos a trabajar con Strings. Al final vamos a crear un objeto que representa la entrada del receptor de datos, de tipo ReceiverInputDStream sobre el cual podremos trabajar, mapearemos los datos a un objeto de tipo Sensor, que contendrá las propiedades del mismo y la lógica necesaria para trabajar con el mismo.


Finalmente vamos a procesar la información recibida, para ellos vamos a crear objetos especializados que nos ayudarán a tratar la información. En la siguiente imagen tenemos el método que nos permite enviar todas las coordenadas recibidas y calcular los máximos de los sensores, por operación para una ventana de procesamiento.


En este caso estamos usando funciones de ventana en Spark que nos permiten hacer las agrupaciones de datos por sensor, ordenando los valores por código de parámetro y valor, finalmente el DS lo registramos como una tabla, sobre la cual calculamos el valor máximo para cada operación.

Terminamos enviando los datos a Grafana por medio de influxdb, una herramienta que nos permite crear series temporales, realmente actúa como una BD para poder analizar los datos. Para esto desde Scala, vamos a enviar los datos consumiendo el servicio REST que la herramienta pone a nuestro servicio, por medio del siguiente objeto.


En Github tenéis lo necesario para echar a andar esta parte, en el repositorio de examples, en la rama develop (aún no he podido hacer el merge).

Aqui tenénis la parte de Python que hemos visto.
https://github.com/franciscojavierestrella/Examples/tree/develop/EjemplosPython

Aqui tenénis la parte de Scala para procesar los datos.
https://github.com/franciscojavierestrella/Examples/tree/develop/SparkStreaming


Nos queda procesar la información y pintar los datos , para ello si echamos una vistazo a la herramienta influxdb , rápidamente nos haremos con ella. Tendremos que enviar los datos reflejando la BD (que previamente hemos creado en Influx) y en la query string la información a enviar, como el nombre de la serie, la clave de la serie y los valores.

Esto sería la url que vamos a emplear en la querystring, donde reflejamos la base de datos sensor.

http://localhost:8086/write?db=sensor

En la querystring vamos a reflejar la medida, indicando primero de que medida se trata y a continuación, separados por un blanco, los valores asociados a la misma. En este caso estamos hablando de una medida (meassure) para el parámetro 7, sensor 5 con latitude y longitud inventadas, terminando con el valor del parámetro.

meassure,op=7 sensor=5,latitude=47.2568899,longitude=-3.5487987,value=0007V

La siguiente imagen nos muestra como crear en la herramienta influxdb la nueva base de datos, las medidas se irán creando de forma dinámica a medida que vayamos enviando datos.



Por último nos queda crear el DashBoard, para ello tendremos que hacer lo siguiente:

  • Conectar Grafana con Influxdb, para ello usaremos uno de los conectores que tiene por defecto y nos va a permitir recoger los datos y procesarlos.
  • Reflejar los sensores en un mapa, junto con la primera coordenada recogida en la ventana de procesamiento para un intervalo de tiempo determinado.
  • Además para cada parámetro vamos a reflejar el valor máximo de la serie recibido en los últimos 5 minutos, fijando unos valores umbrales para los mismos de 1..10 y además identificando el código del sensor.
Esto lo podemos ver reflejado en el siguiente DashBoard.



Para utilizar el mapa, tendremos que definir la query, haciendo referencia a la serie que vamos a emplear y sacando las coordenadas.


Finalmente, para pintar los sensores en el mapa necesitamos configurar las propiedades de los mismos y tratarlos en modo tabla, de esta forma tendremos acceso a los mismos y podremos reflejarlos en el mapa.


Para reflejar el valor máximo de las operaciones vamos a recurrir a un recurso gráfico de tipo Gauge que nos va a permitir relejar el valor asociando un código de colores al valor alcanzado por el mismo. Lo primero que haremos será definir como capturar los datos con respecto la serie, para ello accedemos a la serie concreta, y recogemos las variables a pintar.



Como hemos reflejado en la imagen anterior vamos a hacer un poco de trampa, para recoger el identificador del sensor, en la misma serie en el panel usando los datos, vamos a reflejar en el mismo tipo de recurso gráfico el identificador del sensor.

Con todo esto ya tenéis descrito como montarlo, si alguien necesita el DashBoard, que no dude en pedírmelo.


Comentarios

Entradas populares de este blog

Machine Learning soluciones a los problemas

  El aprendizaje automático, o Machine Learning (ML), es una rama de la inteligencia artificial que se centra en la creación de sistemas que pueden aprender y mejorar a partir de la experiencia sin ser explícitamente programados para cada tarea específica. Este campo ha ganado una gran relevancia en los últimos tiempos debido a los avances en el procesamiento de datos, la disponibilidad de grandes volúmenes de información y el incremento en la capacidad de computación y soluciones Cloud. Vamos a explorar algunos conceptos fundamentales de ML, que problemas surgen en la práctica, como podemos limpiar los datos y que técnicas podemos emplear, que es la validación cruzada y algunos desafíos específicos que plantean los algoritmos. Conceptos Fundamentales de Machine Learning Tipos de Aprendizaje Aprendizaje Supervisado : En este tipo de aprendizaje, el modelo se entrena utilizando un conjunto de datos etiquetados, lo que significa que cada ejemplo de entrenamiento viene con una etiquet...

Despliegue de un IDP en Cognito AWS e Integración con una Web en Sprint Boot

  Introducción En este artículo vamos a describir el proceso de despliegue de un proveedor de identidad (IDP) en Amazon Cognito e integración con una aplicación web desarrollada en Sprint. Se detallan las fases necesarias, las tareas principales dentro de cada fase, junto con recomendaciones para asegurar un proceso exitoso. Fases Necesarias Configuración de Cognito En esta fase se creará el pool de usuarios, el grupo de usuarios y las políticas de autenticación y autorización en Cognito. Tareas Principales: Crear un pool de usuarios en Cognito para gestionar las identidades de los usuarios. Definir los atributos de usuario necesarios para la autenticación y autorización. Configurar las políticas de autenticación para determinar los métodos de acceso (por ejemplo, nombre de usuario y contraseña, autenticación social). Crear un grupo de usuarios para agrupar a los usuarios con permisos específicos. Definir las políticas de autorización para determinar los permisos de acceso a los re...

Jhipster (Angular + Spring Boot) + Docker + AWS

En esta entrada vamos a ver la potencia de esta terna, ya sé que no tiene mucho que ver con el blog, pero me ha parecido interesante compartirlo ya que podemos desarrollar nuestra aplicación web basada en Sprint Boot con un frontal Angular sin mucho esfuerzo (más bien en maquetación), dockerizar esta aplicación, gestionarla desde nuestro dockerhub y publicarla en una máquina EC2 en Amazon completa, teniendo acceso a la misma. El objetivo no es profundizar en JHipster, que nos permite desarrollar proyectos por medio de esta herramienta de una forma rápida, ya que nos asila de conocer y profundizar en exceso en las herramientas (siempre que no nos sea necesario) en las que nos podemos basar. JHipster Es una plataforma de desarrollo que nos permite generar, desarrollar y desplegar modernas aplicaciones web y arquitecturas de microservicios. Os recomiendo que echéis un ojo, me ha parecido muy completo, fácil e intuitivo de manejar.  https://www.jhipster....