En esta entrada vamos a hablar un poco de como montar un sistema de procesamiento masivo de datos montando un mecanismo de procesamiento en modo ventana y la visualización de los datos en un dashboard muy simple.
Vamos a trabajar con las tecnlogias Kafka, Python, Spark (scala), InfluxBD y Grafana, pero no se trata de hablar sobre ellas, más bien de presentar un problema y ver como podríamos resolverlo por medio de estas herramientas.
Imaginemos que tenemos una red de sensores, con movilidad, que manejan un conjunto de parámetros y nos dan unas medidas aleatorias en el tiempo, y nos piden monitorizar el movimiento de la red de sensores y fijar en un periodo de tiempo el valor máximo recibido de los mismos para un tipo de parámetro concreto.
Lo primero que vamos a hacer es montarnos un sistema que nos permite simular la red de sensores, para ello vamos a montar unos scripts en Python, que basados en unos datos aleatorios y otros pre fijamos, nos permitan simular la red de sensores, donde tendremos un sensor, que nos devuelve un parámetro de medida con un valor desde una posición geográfica concreta.
Vamos a montar una arquitectura Lambda donde vamos a usar como mecanismo de interconexión Kafka es una plataforma de streaming; básicamente un servicio de mensajería que permite publicar y suscribir streams de datos.
En el Script principal lo que vamos a hacer es:
- Conectarnos a Kafka.
- Generar los datos aleatorios.
- Enviar los datos, con un delay que pasaremos como parámetro al Script.
La clase que nos va a permitir conectarnos a Kafka va a tener un par de funciones, la conexión, el envió y lo olvidaba una función para gestionar los errores por medio de una callback.
Aquí sólo tendremos que tener en cuenta en tener instalado el paquete de Kafka para poder usar la clase que envuelve al productor. El método para inicializar la conexión se basa en el constructor de KafkaProducer donde tendremos que especificar el nombre del servidor y la versión de api que use el servidor.
Para enviar un dato sólo necesitamos el dato serializado, en nuestro caso vamos a enviar strings y el topic al cual esta asociado el dato a enviar.
A continuación vamos a generar los datos sinteticos que vamos a manejar, tendremos 2 partes, datos generados aleatoriamente a partir de un subconjunto de datos, vamos a manejar la identificación de los sensores, los parámetros a monitorizar y las coordenadas geográficas desde donde los sensores envían los datos. También vamos a manejar datos aleatorios, de tipo numérico, para representar los valores y fechas.
Los subconjuntos de datos aleatorios los vamos a cargar a partir de los datos generados en ficheros en formato csv, donde nos vamos a apoyar en la librería de Pandas para manejar los. Con esos sencillos métodos podemos cargar los ficheros, obtener los mismos y/o un dato concreto manejado por su columna y fila.
La generación de los datos del sensor se va a basar en el componente fecha, dentro de la información que envía el sensor, tendremos una fecha que vamos a generar de forma aleatoria. Todo esto lo tenemos recogido en la clase Sensor.
Los sensores van enviar la información codificada en una trama de texto plano que va a entrar directamente, como hemos visto el script de python a Kafka, con el siguiente formato:
id sensor; parámetro; año; mes; dia; hora; latitud; longitud; valor de la medida
Los valores para el id sensor, parámetro y coordenadas (latitud y longitud) van a ser generadas de forma aleatoria desde unos ficheros csv que contienen los valores necesarios.
Estas tramas con la medida para el parámetro asociado y en la fecha definida, van a llegar a Kafka sobre un topic que tenemos creado, no vamos a comentar que es Kafka, solamente lo emplearemos como mecanismo de comunicación para desacoplar los sensores del programa de procesamiento en spark. Para ello vamos a ejecutar el siguiente comando desde una shell
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test station-iot
Una vez que hemos lanzado el comando ya tendremos el topic creado y podremos comunicarnos con kafka, escribiendo sobre el puerto correcto desde nuestro script de python. Antes podemos ver la lista de topics creados por medio del siguiente comando:
kafka-topics --list --zookeeper localhost:2181
De esta forma podremos ver si el topic se ha creado, en otro caso analizaremos el error que nos devuelve el comando para crearlo antes definido. Para ver que todo funciona podemos ejecutar un consumidor, que nos irá mostrando los mensajes que se van recibiendo, como en nuestro caso el protocolo se basa en texto plano, es decir no estamos enviando nada serializado que debamos deserializar después, lo vamos a poder observar muy bien, para esto ejecutamos el siguiente comando y podremos ver que va llegando.
kafka-console-consumer --zookeeper localhost:2181 --topic station-iot
Una vez desarrollada esta parte, nos queda comenzar con la parte de Spark, para ello vamos a definir que vamos a desarrollar. Se tratará de un programa en scala que se conecte y analice un Streaming de datos, para ello vamos a realizar varios procesamientos, pero los más interesantes van a ser:
- Media sobre la ventana de procesamiento para cada uno de los sensores y el parámetro que envían.
- Procesamiento de todas los sensores recibidos, detectando además, para cada sensor el parámetro con mayor valor numérico y para el mismo el valor más alto.
- Filtrado de un conjunto de un conjunto de sensores específicos.
Lo primero que vamos a hacer es cargar las propiedades desde un fichero de configuración, nombre del topic, IP donde tenemos instalado Kafka, zookeeper, configuración especifica de Spark, etc.
Vamos a hacer es inicializar el script context, para ello inicializamos Spark, con la configuración recogida en el fichero de recursos, creando un objeto de tipo StreamingContext que nos permitirá terminar de configurar el flujo de datos a manejar y conectarlo con Kafka.
Para conectarlo con Kafka, vamos a asociar nuestro StreamingContext con la configuración de Kafka, donde vamos a especificar los valores de la conexión, el topic que vamos a emplear, como almacernerá Kafka la información (en nuestro caso en memoria), que codec va usar para manejar la información, en nuestro caso ya hemos mencionado que vamos a trabajar con Strings. Al final vamos a crear un objeto que representa la entrada del receptor de datos, de tipo ReceiverInputDStream sobre el cual podremos trabajar, mapearemos los datos a un objeto de tipo Sensor, que contendrá las propiedades del mismo y la lógica necesaria para trabajar con el mismo.
Finalmente vamos a procesar la información recibida, para ellos vamos a crear objetos especializados que nos ayudarán a tratar la información. En la siguiente imagen tenemos el método que nos permite enviar todas las coordenadas recibidas y calcular los máximos de los sensores, por operación para una ventana de procesamiento.
En este caso estamos usando funciones de ventana en Spark que nos permiten hacer las agrupaciones de datos por sensor, ordenando los valores por código de parámetro y valor, finalmente el DS lo registramos como una tabla, sobre la cual calculamos el valor máximo para cada operación.
Terminamos enviando los datos a Grafana por medio de influxdb, una herramienta que nos permite crear series temporales, realmente actúa como una BD para poder analizar los datos. Para esto desde Scala, vamos a enviar los datos consumiendo el servicio REST que la herramienta pone a nuestro servicio, por medio del siguiente objeto.
En Github tenéis lo necesario para echar a andar esta parte, en el repositorio de examples, en la rama develop (aún no he podido hacer el merge).
Aqui tenénis la parte de Python que hemos visto.
https://github.com/franciscojavierestrella/Examples/tree/develop/EjemplosPython
Aqui tenénis la parte de Scala para procesar los datos.
https://github.com/franciscojavierestrella/Examples/tree/develop/SparkStreaming
Nos queda procesar la información y pintar los datos , para ello si echamos una vistazo a la herramienta influxdb , rápidamente nos haremos con ella. Tendremos que enviar los datos reflejando la BD (que previamente hemos creado en Influx) y en la query string la información a enviar, como el nombre de la serie, la clave de la serie y los valores.
Esto sería la url que vamos a emplear en la querystring, donde reflejamos la base de datos sensor.
http://localhost:8086/write?db=sensor
meassure,op=7 sensor=5,latitude=47.2568899,longitude=-3.5487987,value=0007V
La siguiente imagen nos muestra como crear en la herramienta influxdb la nueva base de datos, las medidas se irán creando de forma dinámica a medida que vayamos enviando datos.
Por último nos queda crear el DashBoard, para ello tendremos que hacer lo siguiente:
- Conectar Grafana con Influxdb, para ello usaremos uno de los conectores que tiene por defecto y nos va a permitir recoger los datos y procesarlos.
- Reflejar los sensores en un mapa, junto con la primera coordenada recogida en la ventana de procesamiento para un intervalo de tiempo determinado.
- Además para cada parámetro vamos a reflejar el valor máximo de la serie recibido en los últimos 5 minutos, fijando unos valores umbrales para los mismos de 1..10 y además identificando el código del sensor.
Esto lo podemos ver reflejado en el siguiente DashBoard.
Para utilizar el mapa, tendremos que definir la query, haciendo referencia a la serie que vamos a emplear y sacando las coordenadas.
Finalmente, para pintar los sensores en el mapa necesitamos configurar las propiedades de los mismos y tratarlos en modo tabla, de esta forma tendremos acceso a los mismos y podremos reflejarlos en el mapa.
Para reflejar el valor máximo de las operaciones vamos a recurrir a un recurso gráfico de tipo Gauge que nos va a permitir relejar el valor asociando un código de colores al valor alcanzado por el mismo. Lo primero que haremos será definir como capturar los datos con respecto la serie, para ello accedemos a la serie concreta, y recogemos las variables a pintar.
Como hemos reflejado en la imagen anterior vamos a hacer un poco de trampa, para recoger el identificador del sensor, en la misma serie en el panel usando los datos, vamos a reflejar en el mismo tipo de recurso gráfico el identificador del sensor.
Con todo esto ya tenéis descrito como montarlo, si alguien necesita el DashBoard, que no dude en pedírmelo.
Comentarios
Publicar un comentario