Get Started with Data Comms and the Raspberry Pi Pico
The Raspberry Pi Pico would be a great Internet of Things device but for one thing: it has no Internet connectivity. Fortunately, we can fix that with a Super SIM and an add-on cellular module such as Waveshare's Pico SIM7080.
In our tutorial Get Started with SMS Commands and the Raspberry Pi Pico, we combined the Pico, the Waveshare Pico SIM7080 cellular module board, an MCP9808 temperature sensor, and a four-digit, seven-segment LED display into a prototype Internet of Things (IoT) development device.
This device uses Super SIM's SMS Commands API to receive commands over-the-air and, when instructed, to send back information. This works very well for device-to-user communications routed through your cloud, but what if you want the device to be able to reach out to other Internet resources? For that you need a data connection and the ability to make HTTP requests — GET, POST, PUT, etc. — and parse the remote server's response.
This tutorial will take you through the process of adding exactly this functionality to your IoT application.
Head there now and then come back here when you've completed Step 4.
2. Prepare the initial Python code
Throughout this tutorial, you'll be pasting code from this page into a text editor, first the code below and then additional functions as you progress through the guide. At each stage, you'll copy the current code from your editor and paste it across to the Pico. The code included here entirely replaces hat from the previous tutorial in the series.
At this point, you should have a Pico with MicroPython installed. It should be fitted to the Waveshare board and connected to your computer by USB cable. You should have fired up Minicom (Mac/Linux) or PuTTY (Windows) and have the MicroPython REPL prompt, >>>. Hit Ctrl-C to exit the running program, if you don't see the prompt.
As a reminder, hit Ctrl-E to enter MicroPython's 'paste mode', paste in code copied from your text editor, and then hit Ctrl-D to start running it.
Alternatively, if you're a Mac or Linux user, you can use the pyboard.py tool to beam it over for you and relay the output to your terminal — details here.
Here's the base code listing. Copy it — click on the copy icon in the top right corner of the listing; it'll appear as you mouse over the code — and paste it into your text editor.
You can find the a complete listing of the code, including all subsequent additions, at our public GitHub repo.
Don't send it over to the Pico just yet — you'll need to complete Step 3 first.
To save scrolling, click here to jump to the rest of the tutorial.
from machine import UART, Pin, I2Cfrom utime import ticks_ms, sleepimport jsonclass MCP9808:""" A simple driver for the I2C-connected MCP9808 temperature sensor. This release supports MicroPython.""" # *********** PRIVATE PROPERTIES ********** i2c = None address = 0x18 # *********** CONSTRUCTOR ********** def __init__(self, i2c, i2c_address=0x18): assert 0x00 <= i2c_address < 0x80,"ERROR - Invalid I2C address in MCP9808()" self.i2c = i2c self.address = i2c_address # *********** PUBLIC METHODS ********** def read_temp(self): # Read sensor and return its value in degrees celsius. temp_bytes = self.i2c.readfrom_mem(self.address, 0x05, 2) # Scale and convert to signed value. temp_raw = (temp_bytes[0] << 8) | temp_bytes[1] temp_cel = (temp_raw & 0x0FFF) / 16.0 if temp_raw & 0x1000: temp_cel -= 256.0 return temp_celclass HT16K33:""" A simple, generic driver for the I2C-connected Holtek HT16K33 controller chip. This release supports MicroPython and CircuitPython Version: 3.0.2 Bus: I2C Author: Tony Smith (@smittytone) License: MIT Copyright: 2020""" # *********** CONSTANTS ********** HT16K33_GENERIC_DISPLAY_ON = 0x81 HT16K33_GENERIC_DISPLAY_OFF = 0x80 HT16K33_GENERIC_SYSTEM_ON = 0x21 HT16K33_GENERIC_SYSTEM_OFF = 0x20 HT16K33_GENERIC_DISPLAY_ADDRESS = 0x00 HT16K33_GENERIC_CMD_BRIGHTNESS = 0xE0 HT16K33_GENERIC_CMD_BLINK = 0x81 # *********** PRIVATE PROPERTIES ********** i2c = None address = 0 brightness = 15 flash_rate = 0 # *********** CONSTRUCTOR ********** def __init__(self, i2c, i2c_address): assert 0x00 <= i2c_address < 0x80,"ERROR - Invalid I2C address in HT16K33()" self.i2c = i2c self.address = i2c_address self.power_on() # *********** PUBLIC METHODS ********** def set_blink_rate(self, rate=0):""" Set the display's flash rate.""" assert rate in (0, 0.5, 1, 2), "ERROR - Invalid blink rate set in set_blink_rate()" self.blink_rate = rate & 0x03 self._write_cmd(self.HT16K33_GENERIC_CMD_BLINK | rate << 1) def set_brightness(self, brightness=15):""" Set the display's brightness (ie. duty cycle).""" if brightness < 0 or brightness > 15: brightness = 15 self.brightness = brightness self._write_cmd(self.HT16K33_GENERIC_CMD_BRIGHTNESS | brightness) def draw(self):""" Writes the current display buffer to the display itself.""" self._render() def update(self):""" Alternative for draw() for backwards compatibility""" self._render() def clear(self):""" Clear the buffer.""" for i in range(0, len(self.buffer)): self.buffer[i] = 0x00 return self def power_on(self):""" Power on the controller and display.""" self._write_cmd(self.HT16K33_GENERIC_SYSTEM_ON) self._write_cmd(self.HT16K33_GENERIC_DISPLAY_ON) def power_off(self):""" Power on the controller and display.""" self._write_cmd(self.HT16K33_GENERIC_DISPLAY_OFF) self._write_cmd(self.HT16K33_GENERIC_SYSTEM_OFF) # ********** PRIVATE METHODS ********** def _render(self):""" Write the display buffer out to I2C""" buffer = bytearray(len(self.buffer) + 1) buffer[1:] = self.buffer buffer[0] = 0x00 self.i2c.writeto(self.address, bytes(buffer)) def _write_cmd(self, byte):""" Writes a single command to the HT16K33. A private method.""" self.i2c.writeto(self.address, bytes([byte]))class HT16K33Segment(HT16K33):""" Micro/Circuit Python class for the Adafruit 0.56-in 4-digit, 7-segment LED matrix backpack and equivalent Featherwing. Version: 3.0.2 Bus: I2C Author: Tony Smith (@smittytone) License: MIT Copyright: 2020""" # *********** CONSTANTS ********** HT16K33_SEGMENT_COLON_ROW = 0x04 HT16K33_SEGMENT_MINUS_CHAR = 0x10 HT16K33_SEGMENT_DEGREE_CHAR = 0x11 HT16K33_SEGMENT_SPACE_CHAR = 0x00 # The positions of the segments within the buffer POS = (0, 2, 6, 8) # Bytearray of the key alphanumeric characters we can show: # 0-9, A-F, minus, degree CHARSET = b'\x3F\x06\x5B\x4F\x66\x6D\x7D\x07\x7F\x6F\x5F\x7C\x58\x5E\x7B\x71\x40\x63' # *********** CONSTRUCTOR ********** def __init__(self, i2c, i2c_address=0x70): self.buffer = bytearray(16) super(HT16K33Segment, self).__init__(i2c, i2c_address) # *********** PUBLIC METHODS ********** def set_colon(self, is_set=True):""" Set or unset the display's central colon symbol.""" self.buffer[self.HT16K33_SEGMENT_COLON_ROW] = 0x02 if is_set is True else 0x00 return self def set_glyph(self, glyph, digit=0, has_dot=False):""" Present a user-defined character glyph at the specified digit.""" assert 0 <= digit < 4, "ERROR - Invalid digit (0-3) set in set_glyph()" assert 0 <= glyph < 0xFF, "ERROR - Invalid glyph (0x00-0xFF) set in set_glyph()" self.buffer[self.POS[digit]] = glyph if has_dot is True: self.buffer[self.POS[digit]] |= 0x80 return self def set_number(self, number, digit=0, has_dot=False):""" Present single decimal value (0-9) at the specified digit.""" assert 0 <= digit < 4, "ERROR - Invalid digit (0-3) set in set_number()" assert 0 <= number < 10, "ERROR - Invalid value (0-9) set in set_number()" return self.set_character(str(number), digit, has_dot) def set_character(self, char, digit=0, has_dot=False):""" Present single alphanumeric character at the specified digit.""" assert 0 <= digit < 4, "ERROR - Invalid digit set in set_character()" char = char.lower() char_val = 0xFF if char == "deg": char_val = HT16K33_SEGMENT_DEGREE_CHAR elif char == '-': char_val = self.HT16K33_SEGMENT_MINUS_CHAR elif char == ' ': char_val = self.HT16K33_SEGMENT_SPACE_CHAR elif char in 'abcdef': char_val = ord(char) - 87 elif char in '0123456789': char_val = ord(char) - 48 assert char_val != 0xFF,"ERROR - Invalid char string set in set_character()" self.buffer[self.POS[digit]] = self.CHARSET[char_val] if has_dot is True: self.buffer[self.POS[digit]] |= 0x80 return self'''Send an AT command - return True if we got an expectedresponse ('back'), otherwise False'''def send_at(cmd, back="OK", timeout=1000): # Send the command and get the response (until timeout) buffer = send_at_get_resp(cmd, timeout) if len(buffer) > 0: return (back in buffer) return False'''Send an AT command - just return the response'''def send_at_get_resp(cmd, timeout=1000): # Send the AT command modem.write((cmd + "\r\n").encode()) # Read and return the response (until timeout) return read_buffer(timeout)'''Read in the buffer by sampling the UART until timeout'''def read_buffer(timeout): buffer = bytes() now = ticks_ms() while (ticks_ms() - now) < timeout and len(buffer) < 1025: if modem.any(): buffer += modem.read(1) return buffer.decode()'''Module startup detectionSend a command to see if the modem is powered up'''def boot_modem(): state = False count = 0 while count < 20: if send_at("ATE1"): print("The modem is ready") return True if not state: print("Powering the modem") module_power() state = True sleep(4) count += 1 return False'''Power the module on/off'''def module_power(): pwr_key = Pin(14, Pin.OUT) pwr_key.value(1) sleep(1.5) pwr_key.value(0)'''Check we are attached'''def check_network(): is_connected = False response = send_at_get_resp("AT+COPS?") line = split_msg(response, 1) if "+COPS:" in line: is_connected = (line.find(",") != -1) if is_connected: print("Network information:", line) return is_connected'''Configure the modem'''def configure_modem(): # NOTE AT commands can be sent together, not one at a time. # Set the error reporting level, set SMS text mode, delete left-over SMS # select LTE-only mode, select Cat-M only mode, set the APN to 'super' for Super SIM send_at("AT+CMEE=2;+CMGF=1;+CMGD=,4;+CNMP=38;+CMNB=1;+CGDCONT=1,\"IP\",\"super\"") # Set SSL version, SSL no verify, set HTTPS request parameters send_at("AT+CSSLCFG=\"sslversion\",1,3;+SHSSL=1,\"\";+SHCONF=\"BODYLEN\",1024;+SHCONF=\"HEADERLEN\",350") print("Modem configured for Cat-M and Super SIM")'''Open/close a data connection to the server'''def open_data_conn(): # Activate a data connection using PDP 0, # but first check it's not already open response = send_at_get_resp("AT+CNACT?") line = split_msg(response, 1) status = get_field_value(line, 1) if status == "0": # Inactive data connection so start one up success = send_at("AT+CNACT=0,1","ACTIVE", 2000) elif status in ("1","2"): # Active or operating data connection success = True print("Data connection","active" if success else "inactive") return successdef close_data_conn(): # Just close the connection down send_at("AT+CNACT=0,0") print("Data connection inactive")'''Start/end an HTTP session'''def start_session(server): # Deal with an existing session if there is one if send_at("AT+SHSTATE?","1"): print("Closing existing HTTP session") send_at("AT+SHDISC") # Configure a session with the server... send_at("AT+SHCONF=\"URL\",\"" + server + "\"") # ...and open it resp = send_at_get_resp("AT+SHCONN", 2000) # The above command may take a while to return, so # continue to check the UART until we have a response, # or 90s passes (timeout) now = ticks_ms() while ((ticks_ms() - now) < 90000): #if len(resp) > 0: print(resp) if "OK" in resp: return True if "ERROR" in resp: return False resp = read_buffer(1000) return Falsedef end_session(): # Break the link to the server send_at("AT+SHDISC") print("HTTP session closed")'''Set a standard request header'''def set_request_header(): global req_head_set # Check state variable to see if we need to # set the standard request header if not req_head_set: send_at("AT+SHCHEAD") send_at("AT+SHAHEAD=\"Content-Type\",\"application/x-www-form-urlencoded\";+SHAHEAD=\"User-Agent\",\"twilio-pi-pico/1.0.0\"")
send_at("AT+SHAHEAD=\"Cache-control\",\"no-cache\";+SHAHEAD=\"Connection\",\"keep-alive\";+SHAHEAD=\"Accept\",\"*/*\"")
req_head_set = True'''Make a request to the specified server'''def issue_request(server, path, body, verb): result = "" # Check the request verb code = 0 verbs = ["GET","PUT","POST","PATCH","HEAD"] if verb.upper() in verbs: code = verbs.index(verb) + 1 else: print("ERROR -- Unknown request verb specified") return "" # Attempt to open a data session if start_session(server): print("HTTP session open") # Issue the request... set_request_header() print("HTTP request verb code:",code) if body != None: set_request_body(body) response = send_at_get_resp("AT+SHREQ=\"" + path + "\"," + str(code)) start = ticks_ms() while ((ticks_ms() - start) < 90000): if "+SHREQ:" in response: break response = read_buffer(1000) # ...and process the response lines = split_msg(response) for line in lines: if len(line) == 0: continue if "+SHREQ:" in line: status_code = get_field_value(line, 1) if int(status_code) > 299: print("ERROR -- HTTP status code",status_code) break # Get the data from the modem data_length =get_field_value(line,2)if data_length =="0": break response =send_at_get_resp("AT+SHREAD=0,"+ data_length) # The JSON data may be multi-line so store everything in the # response that comes after (and including) the first '{' pos =response.find("{")if pos !=-1: result = response[pos:]end_session()else:print("ERROR -- Could not connect to server")return result'''Flash the Pico LED'''def led_blink(blinks): for i inrange(0, blinks):led_off()sleep(0.25)led_on()sleep(0.25)def led_on():led.value(1)def led_off():led.value(0)'''Split a response from the modem into separate lines,removing empty lines and returning all that's left or,if'want_line' has a non-default value,return that one line'''def split_msg(msg, want_line=99): lines =msg.split("\r\n") results = [] for i inrange(0,len(lines)):if i == want_line:return lines[i]iflen(lines[i]) >0:results.append(lines[i])return results'''Extract the SMS index from a modem response line'''def get_sms_number(line):returnget_field_value(line,1)'''Extract a comma-separated field value from a line'''def get_field_value(line, field_num): parts =line.split(",")iflen(parts) > field_num:return parts[field_num]return""'''Blink the LED n times after extracting n from the command string'''def process_command_led(msg): blinks = msg[4:]print("Blinking LED",blinks,"time(s)")try:led_blink(int(blinks)) except:print("BAD COMMAND:",blinks)'''Display the decimal value n after extracting n from the command string'''def process_command_num(msg): value = msg[4:]print("Setting",value,"on the LED")try: # Extract the decimal value (string) from 'msg' and convert # to a hex integer for easy presentation of decimal digits hex_value =int(value,16)display.set_number((hex_value &0xF000) >>12,0)display.set_number((hex_value &0x0F00) >>8,1)display.set_number((hex_value &0x00F0) >>4,2)display.set_number((hex_value &0x000F),3).update() except:print("BAD COMMAND:",value)'''Get a temperature reading and send it back as an SMS'''def process_command_tmp():print("Sending a temperature reading") celsius_temp ="{:.2f}".format(sensor.read_temp())ifsend_at("AT+CMGS=\"000\"",">"): # '>' is the prompt sent by the modem to signal that # it's waiting to receive the message text. # 'chr(26)' is the code for ctrl-z, which the modem # uses asanend-of-message marker r =send_at_get_resp(celsius_temp +chr(26))'''Make a request to a sample server'''def process_command_get():print("Requesting data...") server ="YOUR_BEECEPTOR_URL" endpoint_path ="/api/v1/status" # Attempt to open a data connectionifopen_data_conn(): result =issue_request(server, endpoint_path, None,"GET")iflen(result) >0: # Decode the received JSONtry: response =json.loads(result) # Extract an integer value and show it on the displayif"status"in response:process_command_num("NUM="+str(response["status"])) except:print("ERROR -- No JSON data received. Raw:\n",result)else:print("ERROR -- No JSON data received") # Close the open connectionclose_data_conn()'''Listen for incoming SMS Commands'''def listen():print("Listening for Commands...")while True: # Did we receive a Unsolicited Response Code (URC)? buffer =read_buffer(5000) if len(buffer) >0: lines =split_msg(buffer) for line in lines:if"+CMTI:"in line: # We received an SMS, so get it... num =get_sms_number(line) msg =send_at_get_resp("AT+CMGR="+ num,2000) # ...and process it for commands cmd =split_msg(msg,2).upper()ifcmd.startswith("LED="):process_command_led(cmd) elif cmd.startswith("NUM="):process_command_num(cmd) elif cmd.startswith("TMP"):process_command_tmp() elif cmd.startswith("GET"):process_command_get()else:print("UNKNOWN COMMAND:",cmd) # Delete all SMS now we're done with themsend_at("AT+CMGD=,4")# Globalsreq_head_set = False# Set up the modem UARTmodem =UART(0,115200)# Set up I2C and the displayi2c =I2C(1, scl=Pin(3), sda=Pin(2))display =HT16K33Segment(i2c)display.set_brightness(2)display.clear().draw()# Set up the MCP9808 sensorsensor =MCP9808(i2c=i2c)# Set the LED and turn it offled =Pin(25,Pin.OUT)led_off()# Start the modemifboot_modem():configure_modem() # Check we're attached state = Truewhile not check_network():if state:led_on()else:led_off() state = not state # Light the LEDled_on() # Begin listening for commandslisten()else: # Error! Blink LED5 timesled_blink(5)led_off()
This is the basis of the code you'll work on through the remainder of the guide. What does it do? Much of it is the code you worked on last time, so let's focus on the additions.
To communicate with Internet resources, the modem needs to establish a data connection through the cellular network, and then an HTTP connection to the target server. Lastly, it creates and sends an HTTP request to that server, and reads back the response.
The function open_data_conn() handles the first part, by sending the AT command CNACT=0,1. The 0 is the 'Packet Data Protocol (PDP) context', essentially one of a number of IP channels the modem provides. The 1 is the instruction to enable the data connection.
When the data connection is up, the code calls start_session() to open an HTTP connection to a specific server, which is passed in as an argument. The server is set using AT+SHCONF="URL","<SERVER_DOMAIN>" and the connection then opened with AT+SHCONN.
Requests are made through the function issue_request(). It calls start_session() and then sets up the request: we build the header on the modem (and keep it for future use) and then send AT+SHREQ= with the path to a resource and the value 1 as parameters — the 1 indicates it is a GET request.
The response returned by the modem contains information about the data returned by the server, which is stored on the modem. The code uses this information to get the HTTP status code — to check the request was successful — and the response's length. If the latter is non-zero, the code sends AT+SHREAD= to retrieve that many bytes from the modem's cache. issue_request() extracts any JSON in the response and returns it.
All this is triggered by the receipt of an SMS command, GET, which causes the function process_command_get() to be called. This function calls open_data_conn() and then issue_request(). It parses the received data as JSON and displays the value of a certain field on the LED display using code you worked on in the previous tutorial.
When the code runs, it turns off the Pico's built-in LED. The LED will flash rapidly five times if there was a problem booting the modem.
The LED is turned on when the device is attached to the network. If the LED is flashing slowly, that means it has not yet attached. Please be patient; it will attach shortly.
3. Set up a data source
Before you can run the code, you need to set up the data that will be retrieved. You're going to use Beeceptor as a proxy for the Internet resource your IoT device will be communicating with. In a real-world application, you would sign up to use a specific service and access that, but Beeceptor makes a very handy stand-in. Let's set it up to receive HTTP GET requests from the device.
Enter an endpoint name in the large text field and click Create Endpoint:
On the screen that appears next, click on the upper of the two clipboard icons to copy the endpoint URL:
Keep the tab open.
Jump back to your text editor and locate the process_command_get() function in the Python code. Paste the endpoint URL you got from step 3, in place of YOUR_BEECEPTOR_URL .
Save the file and then transfer it over to the Pico.
Hop back to Beeceptor and click on Mocking Rules (0) in the page shown above and then click Create New Rule .
In the third field, add api/v1/status right after the / that's already there.
Under Response Body, paste the following JSON, your test API's sample output:
{ "userId":10,"status":1234,"title":"delectus aut autem","completed":false,"datapoints":[1,2,3,4,5,6,7,8,9,0] }
The panel should look like this:
Click Save Rule and then close the Mocking Rules panel by clicking the X in the top right corner.
Again, keep the tab open.
4. Try out the code
Switch over to Minicom (or PuTTY). When you see the Listening for commands... message, open a separate terminal tab or window and enter the following command:
twilio api:supersim:v1:sms-commands:create \
--sim "<YOUR_SIM_NAME_OR_SID>" \
--payload GET
You'll need to replace the sections in angle brackets (< and >) with your own information, just as you did last time. Your SIM's SID — or friendly name if you've set one — and the specified account credentials are all accessible from the Twilio Console.
This command uses the Super SIM API's SMS Commands API to send a machine-to-machine message to the Pico. The --payload parameter tells Twilio what the body of the SMS should be: it's whatever comes after the equals sign. In this case, that's GET, the command to which we want the Pico to respond.
The listen() function in your Python code keeps an ear open for incoming SMS messages, which are signalled by the module transmitting a string that includes the characters +CMTI:. If it appears, the code sends a new AT command to the modem to get the message (AT+CMGR) and then awaits a response. When the response comes, the code processes it and extracts the GET — which tells the device to make an HTTP request of that type.
You'll see all this in your terminal window:
You should also see 1234 displayed on the LED — one part of the data received from the API you connected to!
The code listed above will output state messages, but if you want to see the full flow of AT command requests and their responses, you'll need to add a handful of extra lines. To do so, drop in this function:
'''
Output raw data
'''
def debug_output(msg):
for line in split_msg(msg): print(">>> ",line)
and add this line right before the final return in the function read_buffer():
debug_output(buffer.decode())
The output will now look like this:
5. Post data from the device
Reaching out across the Internet and requesting information is only half of the story: you also want to push data out to the cloud. Our Pico-based IoT demo is well prepared to be a source of information: it includes a temperature sensor which you can read and transmit the result by SMS if you send the command TMP by text message.
Not all data receivers accept input by SMS, however. Most will accept POST requests, though, so let's add the code to the application to support that. There are a number of changes and additions to make.
Add the following code right below the def set_request_header(): function definition:
'''
Set request body
'''
def set_request_body(body):
send_at("AT+SHCPARA;+SHPARA=\"data\",\"" + body + "\"")
'''
Make a GET, POST requests to the specified server
'''
def get_data(server, path):
return issue_request(server, path, None, "GET")
def send_data(server, path, data):
return issue_request(server, path, data, "POST")
Replace the existing issue_request() function with this code:
def issue_request(server, path, body, verb):
result = ""
# Check the request verb
code = 0
verbs = ["GET", "PUT", "POST", "PATCH", "HEAD"]
if verb.upper() in verbs:
code = verbs.index(verb) + 1
else:
print("ERROR -- Unknown request verb specified")
return ""
# Attempt to open a data session
if start_session(server):
print("HTTP session open")
# Issue the request...
set_request_header()
print("HTTP request verb code:",code)
if body != None: set_request_body(body)
response = send_at_get_resp("AT+SHREQ=\"" + path + "\"," + str(code))
start = ticks_ms()
while ((ticks_ms() - start) < 90000):
if "+SHREQ:" in response: break
response = read_buffer(1000)
# ...and process the response
lines = split_msg(response)
for line in lines:
if len(line) == 0: continue
if "+SHREQ:" in line:
status_code = get_field_value(line, 1)
if int(status_code) > 299:
print("ERROR -- HTTP status code",status_code)
break
# Get the data from the modem
data_length = get_field_value(line, 2)
if data_length == "0": break
response = send_at_get_resp("AT+SHREAD=0," + data_length)
# The JSON data may be multi-line so store everything in the
# response that comes after (and including) the first '{'
pos = response.find("{")
if pos != -1: result = response[pos:]
end_session()
else:
print("ERROR -- Could not connect to server")
return result
Replace the process_command_get() function with all of the following code:
'''
Make a request to a sample server
'''
def process_command_get():
print("Requesting data...")
server = "YOUR_BEECEPTOR_URL"
endpoint_path = "/api/v1/status"
process_request(server, endpoint_path)
def process_command_post():
print("Sending data...")
server = "YOUR_BEECEPTOR_URL"
endpoint_path = "/api/v1/logs"
process_request(server, endpoint_path, "{:.2f}".format(sensor.read_temp()))
def process_request(server, path, data=None):
# Attempt to open a data connection
if open_data_conn():
if data is not None:
result = send_data(server, path, data)
else:
result = get_data(server, path)
if len(result) > 0:
# Decode the received JSON
try:
response = json.loads(result)
# Extract an integer value and show it on the display
if "status" in response:
process_command_num("NUM=" + str(response["status"]))
except:
print("ERROR -- No JSON data received. Raw:\n",result)
else:
print("ERROR -- No JSON data received")
# Close the open connection
close_data_conn()
When you've done that, copy and paste your Beeceptor endpoint URL into the places marked YOUR_BEECEPTOR_URL.
Finally, add the following lines to the listen() function, right below the code that looks for a GET command:
Finally, transfer the updated program to the Pico.
6. Set up a data sink
You can't send data without somewhere to post it, so set that up now. Once more, Beeceptor comes to our assistance: you're going to add a mocking rule as a stand-in for an application server that will take in the data the device posts and return a status message.
Click on Mocking Rules (1) and then Create New Rule .
Under Method , select POST .
In the third field, add api/v1/logs right after the / that's already there.
Under Response Body, paste the following JSON:
{ "status":4567 }
The panel should look like this:
Click Save Rule and then close the Mocking Rules panel by clicking the X in the top right corner.
Again, keep the tab open.
7. Try out the code — part deux
Switch over to Minicom (or PuTTY). When you see the Listening for commands... message, open a separate terminal tab or window and enter the following command, filling in your details where necessary:
twilio api:supersim:v1:sms-commands:create \
--sim "<YOUR_SIM_NAME_OR_SID>" \
--payload POST
This time, you'll see all this in your terminal window:
You should also see 4567 displayed on the LED — one part of the data received bacl from the API. Speaking of the API, what did it see? Take a look at Beeceptor. It records the receipt of a POST request to /api/v1/logs, and if you click on the entry in the table, then the JSON icon ({:}) above the Request body panel, you'll see the celsius temperature as received data:
8. Next steps
You now have a Raspberry Pi Pico-based IoT device that can send and receive data across the Internet. In this demo, you've used test APIs for getting and posting data, and triggered both by manually sending an SMS command. Why not adapt the code not only to make use of different APIs — perhaps one you might make use of in your production IoT device — but also to do so automatically, at a time appropriate to the application? For example, you might include a regular weather forecast update, or pull in the output from a Slack channel. You might post device status data to your own cloud.
Wherever you take your Raspberry Pi Pico-based IoT device next, we can't wait to see what you build!
Note
When this was written, of course: the Pico W has been release since then.