Azure IoT Edge Image Processing – 3 – Add the Image Processor Module
March 23, 2020
In the previous posts, we created a new IoT Edge project and added a module to run the Custom Vision classifier container. In this post, we will add another module to the project. This module runs the main logic of the solution: it integrates with the UniFi camera, captures an image, passes that image to the image classifier, and then, based on the outcome, publishes MQTT messages on a topic.
Create a new module as below:
1. Select the existing deployment template file.
2. Select Python Module as the module template.
3. Name the module AlfrescoImageProcessor.
4. Provide the Docker registry information for the module image.
5. Press Enter.
Once the module is created, you will notice that there are two folders under the modules/ folder.
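If you named the classifier module alfrescoClassifier in the previous post (the name assumed throughout this post, matching the endpoint in the deployment template below), the layout will look like this:

modules/
├── alfrescoClassifier/
└── AlfrescoImageProcessor/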
Open main.py under the modules/AlfrescoImageProcessor folder.
Replace the code with the following.
import time
import sys
import os
import requests
import json
import shutil
from azure.iot.device import IoTHubModuleClient, Message
import paho.mqtt.client as mqtt

# global counters
SENT_IMAGES = 0

# global client
CLIENT = None


# Send a message to IoT Hub
# Route output1 to $upstream in deployment.template.json
def send_to_hub(strMessage):
    global SENT_IMAGES
    message = Message(bytearray(strMessage, 'utf-8'))
    CLIENT.send_message_to_output(message, "output1")
    SENT_IMAGES += 1
    print("Total images sent: {}".format(SENT_IMAGES))


# Find the probability for the given tag name in the classifier's
# JSON response. Returns 0 if the tag is not among the predictions.
def findprobablity(attributeName, jsonObject):
    for entry in jsonObject["predictions"]:
        if attributeName == entry['tagName']:
            return entry['probability']
    return 0


# Send an image to the image classifying server.
# Return the JSON response from the server with the prediction result,
# or None if the request failed.
def sendFrameForProcessing(imagePath, imageProcessingEndpoint):
    headers = {'Content-Type': 'application/octet-stream'}
    with open(imagePath, mode="rb") as test_image:
        try:
            response = requests.post(imageProcessingEndpoint, headers=headers, data=test_image)
            print("Response from classification service: (" + str(response.status_code) + ") " + json.dumps(response.json()) + "\n")
        except Exception as e:
            print(e)
            print("No response from classification service")
            return None
    return json.dumps(response.json())


def main(imagePath, imageProcessingEndpoint):
    try:
        print("Image processor module for Azure IoT Edge. Press Ctrl-C to exit.")
        try:
            global CLIENT
            CLIENT = IoTHubModuleClient.create_from_edge_environment()
        except Exception as iothub_error:
            print("Unexpected error {} from IoTHub".format(iothub_error))
            return
        print("The module is now sending images for processing and will continue indefinitely.")
        while True:
            # Capture an image from the camera via Home Assistant
            response = requests.get(CAMERA_CAPTURE_URL, stream=True)
            with open(imagePath, 'wb') as out_file:
                shutil.copyfileobj(response.raw, out_file)
            del response

            # Process the image
            classification = sendFrameForProcessing(imagePath, imageProcessingEndpoint)
            if classification is None:
                time.sleep(15)
                continue

            # Find the probability of the "active" tag
            probability = findprobablity("active", json.loads(classification))

            # Update the MQTT sensor
            client = mqtt.Client()
            client.username_pw_set(MQTTUSER, MQTTPASSWORD)
            client.connect(MQTTBROKER, 1883, 60)
            if float(probability) > float(PROBABILITY_THRESHOLD):
                client.publish("home/alfresco/image_processing_sensor/state", "on")
            else:
                client.publish("home/alfresco/image_processing_sensor/state", "off")
            client.disconnect()

            # Send the classification result to IoT Hub
            send_to_hub(classification)
            time.sleep(15)
    except KeyboardInterrupt:
        print("IoT Edge module sample stopped")


if __name__ == '__main__':
    try:
        # Retrieve the image location and image classifying server
        # endpoint from the container environment
        IMAGE_PATH = os.getenv('IMAGE_PATH', "")
        IMAGE_PROCESSING_ENDPOINT = os.getenv('IMAGE_PROCESSING_ENDPOINT', "")
        PROBABILITY_THRESHOLD = os.getenv('PROBABILITY_THRESHOLD', "")
        CAMERA_CAPTURE_URL = os.getenv('CAMERA_CAPTURE_URL', "")
        MQTTBROKER = os.getenv('MQTTBROKER', "")
        MQTTUSER = os.getenv('MQTTUSER', "")
        MQTTPASSWORD = os.getenv('MQTTPASSWORD', "")
    except ValueError as error:
        print(error)
        sys.exit(1)
    if IMAGE_PATH and IMAGE_PROCESSING_ENDPOINT:
        main(IMAGE_PATH, IMAGE_PROCESSING_ENDPOINT)
    else:
        print("Error: Image path or image-processing endpoint missing")
Now open the .env file and add the additional settings required by the new module.
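For example (the variable names must match the $placeholders used in the deployment template below; the values are placeholders for your own environment):

camera_capture_url=<your camera capture URL>
mqttbroker=<your MQTT broker hostname or IP>
mqttuser=<your MQTT username>
mqttpassword=<your MQTT password>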
Save the document.
Open the deployment.template.json
Navigate to modulesContent > $edgeAgent > properties.desired > modules.
Remove the SimulatedTemperatureSensor module.
Then update the createOptions of the AlfrescoImageProcessor module as below:
"createOptions": "{\"Env\":[\"IMAGE_PATH=alfresco_image1.jpg\",\"IMAGE_PROCESSING_ENDPOINT=http://alfrescoClassifier/image\",\"PROBABILITY_THRESHOLD=0.6\",\"CAMERA_CAPTURE_URL=$camera_capture_url\",\"MQTTBROKER=$mqttbroker\",\"MQTTUSER=$mqttuser\",\"MQTTPASSWORD=$mqttpassword\"]}"
That’s it. Now it’s time to build and deploy the modules.
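If you prefer the command line over the VS Code "Build and Push IoT Edge Solution" command, a minimal sketch using the Azure CLI with the azure-iot extension (the hub name, device ID, and generated manifest path are placeholders for your environment):

# Apply the generated deployment manifest to the edge device
az iot edge set-modules \
  --hub-name <your-iot-hub> \
  --device-id <your-edge-device> \
  --content ./config/deployment.amd64.json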
The full source of this solution is available on GitHub at the following location:
https://github.com/sameeraman/AlfrescoVisionwithAzureIoTEdge