Get Started with Data Comms and the Raspberry Pi Pico

The Raspberry Pi Pico would be a great Internet of Things device but for one thing: it has no Internet connectivity. Fortunately, we can fix that with a Super SIM and an add-on cellular module such as Waveshare's Pico SIM7080.

In our tutorial Get Started with SMS Commands and the Raspberry Pi Pico, we combined the Pico, the Waveshare Pico SIM7080 cellular module board, an MCP9808 temperature sensor, and a four-digit, seven-segment LED display into a prototype Internet of Things (IoT) development device.

This device uses Super SIM's SMS Commands API to receive commands over-the-air and, when instructed, to send back information. This works very well for device-to-user communications routed through your cloud, but what if you want the device to be able to reach out to other Internet resources? For that you need a data connection and the ability to make HTTP requests — GET, POST, PUT, etc. — and parse the remote server's response.

This tutorial will take you through the process of adding exactly this functionality to your IoT application.

This guide requires a Twilio account. Sign up here now if you don't have one. It also requires a configured Super SIM. If you haven't set up your Super SIM in the Console, please do so now. Our Super SIM First Steps guide has extra help if you need it.


1. Set up the hardware and software

If you have already completed Get Started with SMS Commands and the Raspberry Pi Pico, you're ready to jump straight to Step 2, below. If not, run through the SMS Commands tutorial's first four steps, which cover the crucial hardware and software setup that you will need to undertake in order to complete this tutorial.

Head there now and then come back here when you've completed Step 4.


2. Prepare the initial Python code

Throughout this tutorial, you'll be pasting code from this page into a text editor, first the code below and then additional functions as you progress through the guide. At each stage, you'll copy the current code from your editor and paste it across to the Pico. The code included here entirely replaces hat from the previous tutorial in the series.

At this point, you should have a Pico with MicroPython installed. It should be fitted to the Waveshare board and connected to your computer by USB cable. You should have fired up Minicom (Mac/Linux) or PuTTY (Windows) and have the MicroPython REPL prompt, >>>. Hit Ctrl-C to exit the running program, if you don't see the prompt.

As a reminder, hit Ctrl-E to enter MicroPython's 'paste mode', paste in code copied from your text editor, and then hit Ctrl-D to start running it.

Alternatively, if you're a Mac or Linux user, you can use the pyboard.py tool to beam it over for you and relay the output to your terminal — details here.

Here's the base code listing. Copy it — click on the copy icon in the top right corner of the listing; it'll appear as you mouse over the code — and paste it into your text editor.

You can find the a complete listing of the code, including all subsequent additions, at our public GitHub repo.

Don't send it over to the Pico just yet — you'll need to complete Step 3 first.

To save scrolling, click here to jump to the rest of the tutorial.

from machine import UART, Pin, I2C
from utime import ticks_ms, sleep
import json

class MCP9808:
    """
    A simple driver for the I2C-connected MCP9808 temperature sensor.
    This release supports MicroPython.
    """

    # *********** PRIVATE PROPERTIES **********

    i2c = None
    address = 0x18

    # *********** CONSTRUCTOR **********

    def __init__(self, i2c, i2c_address=0x18):
        assert 0x00 <= i2c_address < 0x80, "ERROR - Invalid I2C address in MCP9808()"
        self.i2c = i2c
        self.address = i2c_address

    # *********** PUBLIC METHODS **********

    def read_temp(self):
        # Read sensor and return its value in degrees celsius.
        temp_bytes = self.i2c.readfrom_mem(self.address, 0x05, 2)
        # Scale and convert to signed value.
        temp_raw = (temp_bytes[0] << 8) | temp_bytes[1]
        temp_cel = (temp_raw & 0x0FFF) / 16.0
        if temp_raw & 0x1000: temp_cel -= 256.0
        return temp_cel

class HT16K33:
    """
    A simple, generic driver for the I2C-connected Holtek HT16K33 controller chip.
    This release supports MicroPython and CircuitPython

    Version:    3.0.2
    Bus:        I2C
    Author:     Tony Smith (@smittytone)
    License:    MIT
    Copyright:  2020
    """

    # *********** CONSTANTS **********

    HT16K33_GENERIC_DISPLAY_ON = 0x81
    HT16K33_GENERIC_DISPLAY_OFF = 0x80
    HT16K33_GENERIC_SYSTEM_ON = 0x21
    HT16K33_GENERIC_SYSTEM_OFF = 0x20
    HT16K33_GENERIC_DISPLAY_ADDRESS = 0x00
    HT16K33_GENERIC_CMD_BRIGHTNESS = 0xE0
    HT16K33_GENERIC_CMD_BLINK = 0x81

    # *********** PRIVATE PROPERTIES **********

    i2c = None
    address = 0
    brightness = 15
    flash_rate = 0

    # *********** CONSTRUCTOR **********

    def __init__(self, i2c, i2c_address):
        assert 0x00 <= i2c_address < 0x80, "ERROR - Invalid I2C address in HT16K33()"
        self.i2c = i2c
        self.address = i2c_address
        self.power_on()

    # *********** PUBLIC METHODS **********

    def set_blink_rate(self, rate=0):
        """
        Set the display's flash rate.
        """
        assert rate in (0, 0.5, 1, 2), "ERROR - Invalid blink rate set in set_blink_rate()"
        self.blink_rate = rate & 0x03
        self._write_cmd(self.HT16K33_GENERIC_CMD_BLINK | rate << 1)

    def set_brightness(self, brightness=15):
        """
        Set the display's brightness (ie. duty cycle).
        """
        if brightness < 0 or brightness > 15: brightness = 15
        self.brightness = brightness
        self._write_cmd(self.HT16K33_GENERIC_CMD_BRIGHTNESS | brightness)

    def draw(self):
        """
        Writes the current display buffer to the display itself.
        """
        self._render()

    def update(self):
        """
        Alternative for draw() for backwards compatibility
        """
        self._render()

    def clear(self):
        """
        Clear the buffer.
        """
        for i in range(0, len(self.buffer)): self.buffer[i] = 0x00
        return self

    def power_on(self):
        """
        Power on the controller and display.
        """
        self._write_cmd(self.HT16K33_GENERIC_SYSTEM_ON)
        self._write_cmd(self.HT16K33_GENERIC_DISPLAY_ON)

    def power_off(self):
        """
        Power on the controller and display.
        """
        self._write_cmd(self.HT16K33_GENERIC_DISPLAY_OFF)
        self._write_cmd(self.HT16K33_GENERIC_SYSTEM_OFF)

    # ********** PRIVATE METHODS **********

    def _render(self):
        """
        Write the display buffer out to I2C
        """
        buffer = bytearray(len(self.buffer) + 1)
        buffer[1:] = self.buffer
        buffer[0] = 0x00
        self.i2c.writeto(self.address, bytes(buffer))

    def _write_cmd(self, byte):
        """
        Writes a single command to the HT16K33. A private method.
        """
        self.i2c.writeto(self.address, bytes([byte]))

class HT16K33Segment(HT16K33):
    """
    Micro/Circuit Python class for the Adafruit 0.56-in 4-digit,
    7-segment LED matrix backpack and equivalent Featherwing.

    Version:    3.0.2
    Bus:        I2C
    Author:     Tony Smith (@smittytone)
    License:    MIT
    Copyright:  2020
    """

    # *********** CONSTANTS **********

    HT16K33_SEGMENT_COLON_ROW = 0x04
    HT16K33_SEGMENT_MINUS_CHAR = 0x10
    HT16K33_SEGMENT_DEGREE_CHAR = 0x11
    HT16K33_SEGMENT_SPACE_CHAR = 0x00

    # The positions of the segments within the buffer
    POS = (0, 2, 6, 8)

    # Bytearray of the key alphanumeric characters we can show:
    # 0-9, A-F, minus, degree
    CHARSET = b'\x3F\x06\x5B\x4F\x66\x6D\x7D\x07\x7F\x6F\x5F\x7C\x58\x5E\x7B\x71\x40\x63'

    # *********** CONSTRUCTOR **********

    def __init__(self, i2c, i2c_address=0x70):
        self.buffer = bytearray(16)
        super(HT16K33Segment, self).__init__(i2c, i2c_address)

    # *********** PUBLIC METHODS **********

    def set_colon(self, is_set=True):
        """
        Set or unset the display's central colon symbol.
        """
        self.buffer[self.HT16K33_SEGMENT_COLON_ROW] = 0x02 if is_set is True else 0x00
        return self

    def set_glyph(self, glyph, digit=0, has_dot=False):
        """
        Present a user-defined character glyph at the specified digit.
        """
        assert 0 <= digit < 4, "ERROR - Invalid digit (0-3) set in set_glyph()"
        assert 0 <= glyph < 0xFF, "ERROR - Invalid glyph (0x00-0xFF) set in set_glyph()"
        self.buffer[self.POS[digit]] = glyph
        if has_dot is True: self.buffer[self.POS[digit]] |= 0x80
        return self

    def set_number(self, number, digit=0, has_dot=False):
        """
        Present single decimal value (0-9) at the specified digit.
        """
        assert 0 <= digit < 4, "ERROR - Invalid digit (0-3) set in set_number()"
        assert 0 <= number < 10, "ERROR - Invalid value (0-9) set in set_number()"
        return self.set_character(str(number), digit, has_dot)

    def set_character(self, char, digit=0, has_dot=False):
        """
        Present single alphanumeric character at the specified digit.
        """
        assert 0 <= digit < 4, "ERROR - Invalid digit set in set_character()"
        char = char.lower()
        char_val = 0xFF
        if char == "deg":
            char_val = HT16K33_SEGMENT_DEGREE_CHAR
        elif char == '-':
            char_val = self.HT16K33_SEGMENT_MINUS_CHAR
        elif char == ' ':
            char_val = self.HT16K33_SEGMENT_SPACE_CHAR
        elif char in 'abcdef':
            char_val = ord(char) - 87
        elif char in '0123456789':
            char_val = ord(char) - 48
        assert char_val != 0xFF, "ERROR - Invalid char string set in set_character()"
        self.buffer[self.POS[digit]] = self.CHARSET[char_val]
        if has_dot is True: self.buffer[self.POS[digit]] |= 0x80
        return self

'''
Send an AT command - return True if we got an expected
response ('back'), otherwise False
'''
def send_at(cmd, back="OK", timeout=1000):
    # Send the command and get the response (until timeout)
    buffer = send_at_get_resp(cmd, timeout)
    if len(buffer) > 0: return (back in buffer)
    return False

'''
Send an AT command - just return the response
'''
def send_at_get_resp(cmd, timeout=1000):
    # Send the AT command
    modem.write((cmd + "\r\n").encode())

    # Read and return the response (until timeout)
    return read_buffer(timeout)

'''
Read in the buffer by sampling the UART until timeout
'''
def read_buffer(timeout):
    buffer = bytes()
    now = ticks_ms()
    while (ticks_ms() - now) < timeout and len(buffer) < 1025:
        if modem.any():
            buffer += modem.read(1)
    return buffer.decode()

'''
Module startup detection
Send a command to see if the modem is powered up
'''
def boot_modem():
    state = False
    count = 0
    while count < 20:
        if send_at("ATE1"):
            print("The modem is ready")
            return True
        if not state:
            print("Powering the modem")
            module_power()
            state = True
        sleep(4)
        count += 1
    return False

'''
Power the module on/off
'''
def module_power():
    pwr_key = Pin(14, Pin.OUT)
    pwr_key.value(1)
    sleep(1.5)
    pwr_key.value(0)

'''
Check we are attached
'''
def check_network():
    is_connected = False
    response = send_at_get_resp("AT+COPS?")
    line = split_msg(response, 1)
    if "+COPS:" in line:
        is_connected = (line.find(",") != -1)
        if is_connected: print("Network information:", line)
    return is_connected

'''
Configure the modem
'''
def configure_modem():
    # NOTE AT commands can be sent together, not one at a time.
    # Set the error reporting level, set SMS text mode, delete left-over SMS
    # select LTE-only mode, select Cat-M only mode, set the APN to 'super' for Super SIM
    send_at("AT+CMEE=2;+CMGF=1;+CMGD=,4;+CNMP=38;+CMNB=1;+CGDCONT=1,\"IP\",\"super\"")
    # Set SSL version, SSL no verify, set HTTPS request parameters
    send_at("AT+CSSLCFG=\"sslversion\",1,3;+SHSSL=1,\"\";+SHCONF=\"BODYLEN\",1024;+SHCONF=\"HEADERLEN\",350")
    print("Modem configured for Cat-M and Super SIM")

'''
Open/close a data connection to the server
'''
def open_data_conn():
    # Activate a data connection using PDP 0,
    # but first check it's not already open
    response = send_at_get_resp("AT+CNACT?")
    line = split_msg(response, 1)
    status = get_field_value(line, 1)

    if status == "0":
        # Inactive data connection so start one up
        success = send_at("AT+CNACT=0,1", "ACTIVE", 2000)
    elif status in ("1", "2"):
        # Active or operating data connection
        success = True

    print("Data connection", "active" if success else "inactive")
    return success

def close_data_conn():
    # Just close the connection down
    send_at("AT+CNACT=0,0")
    print("Data connection inactive")

'''
Start/end an HTTP session
'''
def start_session(server):
    # Deal with an existing session if there is one
    if send_at("AT+SHSTATE?", "1"):
        print("Closing existing HTTP session")
        send_at("AT+SHDISC")

    # Configure a session with the server...
    send_at("AT+SHCONF=\"URL\",\"" + server + "\"")

    # ...and open it
    resp = send_at_get_resp("AT+SHCONN", 2000)
    # The above command may take a while to return, so
    # continue to check the UART until we have a response,
    # or 90s passes (timeout)
    now = ticks_ms()
    while ((ticks_ms() - now) < 90000):
        #if len(resp) > 0: print(resp)
        if "OK" in resp: return True
        if "ERROR" in resp: return False
        resp = read_buffer(1000)
    return False

def end_session():
    # Break the link to the server
    send_at("AT+SHDISC")
    print("HTTP session closed")

'''
Set a standard request header
'''
def set_request_header():
    global req_head_set

    # Check state variable to see if we need to
    # set the standard request header
    if not req_head_set:
        send_at("AT+SHCHEAD")
        send_at("AT+SHAHEAD=\"Content-Type\",\"application/x-www-form-urlencoded\";+SHAHEAD=\"User-Agent\",\"twilio-pi-pico/1.0.0\"")
        send_at("AT+SHAHEAD=\"Cache-control\",\"no-cache\";+SHAHEAD=\"Connection\",\"keep-alive\";+SHAHEAD=\"Accept\",\"*/*\"")
        req_head_set = True

'''
Make a request to the specified server
'''
def issue_request(server, path, body, verb):
    result = ""

    # Check the request verb
    code = 0
    verbs = ["GET", "PUT", "POST", "PATCH", "HEAD"]
    if verb.upper() in verbs:
        code = verbs.index(verb) + 1
    else:
        print("ERROR -- Unknown request verb specified")
        return ""

    # Attempt to open a data session
    if start_session(server):
        print("HTTP session open")
        # Issue the request...
        set_request_header()
        print("HTTP request verb code:",code)
        if body != None: set_request_body(body)
        response = send_at_get_resp("AT+SHREQ=\"" + path + "\"," + str(code))
        start = ticks_ms()
        while ((ticks_ms() - start) < 90000):
            if "+SHREQ:" in response: break
            response = read_buffer(1000)

        # ...and process the response
        lines = split_msg(response)
        for line in lines:
            if len(line) == 0: continue
            if "+SHREQ:" in line:
                status_code = get_field_value(line, 1)
                if int(status_code) > 299:
                    print("ERROR -- HTTP status code",status_code)
                    break

                # Get the data from the modem
                data_length = get_field_value(line, 2)
                if data_length == "0": break
                response = send_at_get_resp("AT+SHREAD=0," + data_length)

                # The JSON data may be multi-line so store everything in the
                # response that comes after (and including) the first '{'
                pos = response.find("{")
                if pos != -1: result = response[pos:]
        end_session()
    else:
        print("ERROR -- Could not connect to server")
    return result

'''
Flash the Pico LED
'''
def led_blink(blinks):
    for i in range(0, blinks):
        led_off()
        sleep(0.25)
        led_on()
        sleep(0.25)

def led_on():
    led.value(1)

def led_off():
    led.value(0)

'''
Split a response from the modem into separate lines,
removing empty lines and returning all that's left or,
if 'want_line' has a non-default value, return that one line
'''
def split_msg(msg, want_line=99):
    lines = msg.split("\r\n")
    results = []
    for i in range(0, len(lines)):
        if i == want_line:
            return lines[i]
        if len(lines[i]) > 0:
            results.append(lines[i])
    return results

'''
Extract the SMS index from a modem response line
'''
def get_sms_number(line):
    return get_field_value(line, 1)

'''
Extract a comma-separated field value from a line
'''
def get_field_value(line, field_num):
    parts = line.split(",")
    if len(parts) > field_num:
        return parts[field_num]
    return ""

'''
Blink the LED n times after extracting n from the command string
'''
def process_command_led(msg):
    blinks = msg[4:]
    print("Blinking LED",blinks,"time(s)")
    try:
        led_blink(int(blinks))
    except:
        print("BAD COMMAND:",blinks)

'''
Display the decimal value n after extracting n from the command string
'''
def process_command_num(msg):
    value = msg[4:]
    print("Setting",value,"on the LED")
    try:
        # Extract the decimal value (string) from 'msg' and convert
        # to a hex integer for easy presentation of decimal digits
        hex_value = int(value, 16)
        display.set_number((hex_value & 0xF000) >> 12, 0)
        display.set_number((hex_value & 0x0F00) >>  8, 1)
        display.set_number((hex_value & 0x00F0) >>  4, 2)
        display.set_number((hex_value & 0x000F), 3).update()
    except:
        print("BAD COMMAND:",value)

'''
Get a temperature reading and send it back as an SMS
'''
def process_command_tmp():
    print("Sending a temperature reading")
    celsius_temp = "{:.2f}".format(sensor.read_temp())
    if send_at("AT+CMGS=\"000\"", ">"):
        # '>' is the prompt sent by the modem to signal that
        # it's waiting to receive the message text.
        # 'chr(26)' is the code for ctrl-z, which the modem
        # uses as an end-of-message marker
        r = send_at_get_resp(celsius_temp + chr(26))

'''
Make a request to a sample server
'''
def process_command_get():
    print("Requesting data...")
    server = "YOUR_BEECEPTOR_URL"
    endpoint_path = "/api/v1/status"

    # Attempt to open a data connection
    if open_data_conn():
        result = issue_request(server, endpoint_path, None, "GET")
        if len(result) > 0:
            # Decode the received JSON
            try:
                response = json.loads(result)
                # Extract an integer value and show it on the display
                if "status" in response:
                    process_command_num("NUM=" + str(response["status"]))
            except:
                print("ERROR -- No JSON data received. Raw:\n",result)
        else:
            print("ERROR -- No JSON data received")

        # Close the open connection
        close_data_conn()

'''
Listen for incoming SMS Commands
'''
def listen():
    print("Listening for Commands...")
    while True:
        # Did we receive a Unsolicited Response Code (URC)?
        buffer = read_buffer(5000)
        if len(buffer) > 0:
            lines = split_msg(buffer)
            for line in lines:
                if "+CMTI:" in line:
                    # We received an SMS, so get it...
                    num = get_sms_number(line)
                    msg = send_at_get_resp("AT+CMGR=" + num, 2000)

                    # ...and process it for commands
                    cmd = split_msg(msg, 2).upper()
                    if cmd.startswith("LED="):
                        process_command_led(cmd)
                    elif cmd.startswith("NUM="):
                        process_command_num(cmd)
                    elif cmd.startswith("TMP"):
                        process_command_tmp()
                    elif cmd.startswith("GET"):
                        process_command_get()
                    else:
                        print("UNKNOWN COMMAND:",cmd)
                    # Delete all SMS now we're done with them
                    send_at("AT+CMGD=,4")

# Globals
req_head_set = False

# Set up the modem UART
modem = UART(0, 115200)

# Set up I2C and the display
i2c = I2C(1, scl=Pin(3), sda=Pin(2))
display = HT16K33Segment(i2c)
display.set_brightness(2)
display.clear().draw()

# Set up the MCP9808 sensor
sensor = MCP9808(i2c=i2c)

# Set the LED and turn it off
led = Pin(25, Pin.OUT)
led_off()

# Start the modem
if boot_modem():
    configure_modem()

    # Check we're attached
    state = True
    while not check_network():
        if state:
            led_on()
        else:
            led_off()
        state = not state

    # Light the LED
    led_on()

    # Begin listening for commands
    listen()
else:
    # Error! Blink LED 5 times
    led_blink(5)
    led_off()

This is the basis of the code you'll work on through the remainder of the guide. What does it do? Much of it is the code you worked on last time, so let's focus on the additions.

To communicate with Internet resources, the modem needs to establish a data connection through the cellular network, and then an HTTP connection to the target server. Lastly, it creates and sends an HTTP request to that server, and reads back the response.

The function open_data_conn() handles the first part, by sending the AT command CNACT=0,1. The 0 is the 'Packet Data Protocol (PDP) context', essentially one of a number of IP channels the modem provides. The 1 is the instruction to enable the data connection.

When the data connection is up, the code calls start_session() to open an HTTP connection to a specific server, which is passed in as an argument. The server is set using AT+SHCONF="URL","<SERVER_DOMAIN>" and the connection then opened with AT+SHCONN.

Requests are made through the function issue_request(). It calls start_session() and then sets up the request: we build the header on the modem (and keep it for future use) and then send AT+SHREQ= with the path to a resource and the value 1 as parameters — the 1 indicates it is a GET request.

The response returned by the modem contains information about the data returned by the server, which is stored on the modem. The code uses this information to get the HTTP status code — to check the request was successful — and the response's length. If the latter is non-zero, the code sends AT+SHREAD= to retrieve that many bytes from the modem's cache. issue_request() extracts any JSON in the response and returns it.

All this is triggered by the receipt of an SMS command, GET, which causes the function process_command_get() to be called. This function calls open_data_conn() and then issue_request(). It parses the received data as JSON and displays the value of a certain field on the LED display using code you worked on in the previous tutorial.

When the code runs, it turns off the Pico's built-in LED. The LED will flash rapidly five times if there was a problem booting the modem.

The LED is turned on when the device is attached to the network. If the LED is flashing slowly, that means it has not yet attached. Please be patient; it will attach shortly.


3. Set up a data source

Before you can run the code, you need to set up the data that will be retrieved. You're going to use Beeceptor as a proxy for the Internet resource your IoT device will be communicating with. In a real-world application, you would sign up to use a specific service and access that, but Beeceptor makes a very handy stand-in. Let's set it up to receive HTTP GET requests from the device.

  1. In a web browser tab, go to Beeceptor .

  2. Enter an endpoint name in the large text field and click Create Endpoint:

  3. On the screen that appears next, click on the upper of the two clipboard icons to copy the endpoint URL:

  4. Keep the tab open.

  5. Jump back to your text editor and locate the process_command_get() function in the Python code. Paste the endpoint URL you got from step 3, in place of YOUR_BEECEPTOR_URL .

  6. Save the file and then transfer it over to the Pico.

  7. Hop back to Beeceptor and click on Mocking Rules (0) in the page shown above and then click Create New Rule .

  8. In the third field, add api/v1/status right after the / that's already there.

  9. Under Response Body, paste the following JSON, your test API's sample output:

{ "userId":10,"status":1234,"title":"delectus aut autem","completed":false,"datapoints":[1,2,3,4,5,6,7,8,9,0] }
  1. The panel should look like this:

  1. Click Save Rule and then close the Mocking Rules panel by clicking the X in the top right corner.

  2. Again, keep the tab open.


4. Try out the code

Switch over to Minicom (or PuTTY). When you see the Listening for commands... message, open a separate terminal tab or window and enter the following command:

twilio api:supersim:v1:sms-commands:create \
  --sim "<YOUR_SIM_NAME_OR_SID>" \
  --payload GET

You'll need to replace the sections in angle brackets (< and >) with your own information, just as you did last time. Your SIM's SID — or friendly name if you've set one — and the specified account credentials are all accessible from the Twilio Console.

This command uses the Super SIM API's SMS Commands API to send a machine-to-machine message to the Pico. The --payload parameter tells Twilio what the body of the SMS should be: it's whatever comes after the equals sign. In this case, that's GET, the command to which we want the Pico to respond.

The listen() function in your Python code keeps an ear open for incoming SMS messages, which are signalled by the module transmitting a string that includes the characters +CMTI:. If it appears, the code sends a new AT command to the modem to get the message (AT+CMGR) and then awaits a response. When the response comes, the code processes it and extracts the GET — which tells the device to make an HTTP request of that type.

You'll see all this in your terminal window:

You should also see 1234 displayed on the LED — one part of the data received from the API you connected to!

The code listed above will output state messages, but if you want to see the full flow of AT command requests and their responses, you'll need to add a handful of extra lines. To do so, drop in this function:

'''
Output raw data
'''
def debug_output(msg):
    for line in split_msg(msg): print(">>> ",line)

and add this line right before the final return in the function read_buffer():

debug_output(buffer.decode())

The output will now look like this:


5. Post data from the device

Reaching out across the Internet and requesting information is only half of the story: you also want to push data out to the cloud. Our Pico-based IoT demo is well prepared to be a source of information: it includes a temperature sensor which you can read and transmit the result by SMS if you send the command TMP by text message.

Not all data receivers accept input by SMS, however. Most will accept POST requests, though, so let's add the code to the application to support that. There are a number of changes and additions to make.

Add the following code right below the def set_request_header(): function definition:

'''
Set request body
'''
def set_request_body(body):
    send_at("AT+SHCPARA;+SHPARA=\"data\",\"" + body + "\"")

'''
Make a GET, POST requests to the specified server
'''
def get_data(server, path):
    return issue_request(server, path, None, "GET")

def send_data(server, path, data):
    return issue_request(server, path, data, "POST")

Replace the existing issue_request() function with this code:

def issue_request(server, path, body, verb):
    result = ""

    # Check the request verb
    code = 0
    verbs = ["GET", "PUT", "POST", "PATCH", "HEAD"]
    if verb.upper() in verbs:
        code = verbs.index(verb) + 1
    else:
        print("ERROR -- Unknown request verb specified")
        return ""

    # Attempt to open a data session
    if start_session(server):
        print("HTTP session open")
        # Issue the request...
        set_request_header()
        print("HTTP request verb code:",code)
        if body != None: set_request_body(body)
        response = send_at_get_resp("AT+SHREQ=\"" + path + "\"," + str(code))
        start = ticks_ms()
        while ((ticks_ms() - start) < 90000):
            if "+SHREQ:" in response: break
            response = read_buffer(1000)

        # ...and process the response
        lines = split_msg(response)
        for line in lines:
            if len(line) == 0: continue
            if "+SHREQ:" in line:
                status_code = get_field_value(line, 1)
                if int(status_code) > 299:
                    print("ERROR -- HTTP status code",status_code)
                    break

                # Get the data from the modem
                data_length = get_field_value(line, 2)
                if data_length == "0": break
                response = send_at_get_resp("AT+SHREAD=0," + data_length)

                # The JSON data may be multi-line so store everything in the
                # response that comes after (and including) the first '{'
                pos = response.find("{")
                if pos != -1: result = response[pos:]
        end_session()
    else:
        print("ERROR -- Could not connect to server")
    return result

Replace the process_command_get() function with all of the following code:

'''
Make a request to a sample server
'''
def process_command_get():
    print("Requesting data...")
    server = "YOUR_BEECEPTOR_URL"
    endpoint_path = "/api/v1/status"
    process_request(server, endpoint_path)

def process_command_post():
    print("Sending data...")
    server = "YOUR_BEECEPTOR_URL"
    endpoint_path = "/api/v1/logs"
    process_request(server, endpoint_path, "{:.2f}".format(sensor.read_temp()))

def process_request(server, path, data=None):
    # Attempt to open a data connection
    if open_data_conn():
        if data is not None:
            result = send_data(server, path, data)
        else:
            result = get_data(server, path)

        if len(result) > 0:
            # Decode the received JSON
            try:
                response = json.loads(result)
                # Extract an integer value and show it on the display
                if "status" in response:
                    process_command_num("NUM=" + str(response["status"]))
            except:
                print("ERROR -- No JSON data received. Raw:\n",result)
        else:
            print("ERROR -- No JSON data received")

        # Close the open connection
        close_data_conn()

When you've done that, copy and paste your Beeceptor endpoint URL into the places marked YOUR_BEECEPTOR_URL.

Finally, add the following lines to the listen() function, right below the code that looks for a GET command:

elif cmd.startswith("POST"):
    process_command_post()

Finally, transfer the updated program to the Pico.


6. Set up a data sink

You can't send data without somewhere to post it, so set that up now. Once more, Beeceptor comes to our assistance: you're going to add a mocking rule as a stand-in for an application server that will take in the data the device posts and return a status message.

  1. Switch to the web browser tab showing Beeceptor .

  2. Click on Mocking Rules (1) and then Create New Rule .

  3. Under Method , select POST .

  4. In the third field, add api/v1/logs right after the / that's already there.

  5. Under Response Body, paste the following JSON:

{ "status":4567 }
  1. The panel should look like this:

  1. Click Save Rule and then close the Mocking Rules panel by clicking the X in the top right corner.

  2. Again, keep the tab open.


7. Try out the code — part deux

Switch over to Minicom (or PuTTY). When you see the Listening for commands... message, open a separate terminal tab or window and enter the following command, filling in your details where necessary:

twilio api:supersim:v1:sms-commands:create \
  --sim "<YOUR_SIM_NAME_OR_SID>" \
  --payload POST

This time, you'll see all this in your terminal window:

You should also see 4567 displayed on the LED — one part of the data received bacl from the API. Speaking of the API, what did it see? Take a look at Beeceptor. It records the receipt of a POST request to /api/v1/logs, and if you click on the entry in the table, then the JSON icon ({:}) above the Request body panel, you'll see the celsius temperature as received data:


8. Next steps

You now have a Raspberry Pi Pico-based IoT device that can send and receive data across the Internet. In this demo, you've used test APIs for getting and posting data, and triggered both by manually sending an SMS command. Why not adapt the code not only to make use of different APIs — perhaps one you might make use of in your production IoT device — but also to do so automatically, at a time appropriate to the application? For example, you might include a regular weather forecast update, or pull in the output from a Slack channel. You might post device status data to your own cloud.

Wherever you take your Raspberry Pi Pico-based IoT device next, we can't wait to see what you build!


Note

When this was written, of course: the Pico W has been release since then.

Last updated