Pebble/python_libs/pblprog/pebble/programmer/ftdi_swd.py
2025-01-27 11:38:16 -08:00

137 lines
4.9 KiB
Python

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from array import array
from pyftdi.pyftdi.ftdi import Ftdi
import usb.util
class FTDISerialWireDebug(object):
    """Batch FTDI MPSSE commands and send them to an FTDI device for Serial Wire Debug.

    Commands are appended to an in-memory byte buffer and written to the device either when
    the buffer would outgrow the device FIFO or when send_cmds() is called.

    Between start_sequence() and end_sequence() commands are collected in a separate sequence
    buffer instead; every send_cmds() call made while a sequence is active writes that whole
    buffer again without clearing it.
    """

    def __init__(self, vid, pid, interface, direction, output_mask, reset_mask, frequency):
        """Open the FTDI device in MPSSE mode and set up empty command buffers.

        Args:
            vid: USB vendor id, passed to Ftdi.open_mpsse().
            pid: USB product id, passed to Ftdi.open_mpsse().
            interface: FTDI interface number, passed to Ftdi.open_mpsse().
            direction: base pin-direction byte; passed to Ftdi.open_mpsse() and used as the
                starting point for every SET_BITS_LOW command this class queues.
            output_mask: direction bits that are cleared while output is disabled.
            reset_mask: direction bit(s) that reset_lo() sets and reset_hi() clears.
            frequency: clock frequency, passed to Ftdi.open_mpsse().

        Raises:
            Whatever Ftdi.open_mpsse() raises; self._ftdi is set to None first so that a later
            close() is a no-op.
        """
        self._direction = direction
        self._output_mask = output_mask
        self._reset_mask = reset_mask
        self._ftdi = Ftdi()
        try:
            self._ftdi.open_mpsse(vid, pid, interface, direction=direction, frequency=frequency,
                                  latency=1)
        except BaseException:
            # Nothing was opened: forget the handle so close() returns early, then re-raise.
            self._ftdi = None
            raise

        # get the FTDI FIFO size and increase the chunk size to match
        self._ftdi_fifo_size = min(self._ftdi.fifo_sizes)
        self._ftdi.write_data_set_chunksize(self._ftdi_fifo_size)

        self._cmd_buffer = array('B')
        self._output_enabled = False
        # NOTE(review): not read anywhere in this class; kept in case other code relies on it.
        self._pending_acks = 0
        # None while no sequence is being recorded, otherwise the array('B') being recorded.
        self._sequence_cmd_buffer = None

    def close(self):
        """Flush queued commands, close the FTDI device and release the USB device.

        Does nothing if the device was never opened or has already been closed. The device is
        closed and its USB resources are released even if the final flush fails; the failure is
        still propagated to the caller.
        """
        if not self._ftdi:
            return
        try:
            self.send_cmds()
        finally:
            ftdi = self._ftdi
            self._ftdi = None
            try:
                ftdi.close()
            finally:
                # PyFTDI doesn't do a good job of cleaning up - make sure we release the usb
                # device
                usb.util.dispose_resources(ftdi.usb_dev)

    def _fatal(self, message):
        """Raise an Exception whose text is 'FATAL ERROR: ' followed by message."""
        raise Exception('FATAL ERROR: {}'.format(message))

    def _current_direction(self):
        """Return the direction byte matching the current output-enabled state.

        While output is disabled the bits in the output mask are cleared from the base
        direction; while it is enabled the base direction is used unchanged.
        """
        if self._output_enabled:
            return self._direction
        return self._direction & ~self._output_mask

    def _queue_cmd(self, write_data):
        """Append one command (a sequence of byte values) to the active buffer.

        While a sequence is being recorded the command goes to the sequence buffer. Otherwise
        it goes to the regular command buffer, which is sent first if adding the command would
        make it larger than the FTDI FIFO.

        Raises:
            Exception: if the command on its own is larger than the FTDI FIFO.
        """
        if len(write_data) > self._ftdi_fifo_size:
            raise Exception('Data too big!')
        if self._sequence_cmd_buffer is not None:
            self._sequence_cmd_buffer.extend(write_data)
            return
        if len(self._cmd_buffer) + len(write_data) > self._ftdi_fifo_size:
            self.send_cmds()
        self._cmd_buffer.extend(write_data)

    def _set_output_enabled(self, enabled):
        """Queue a SET_BITS_LOW command when the output-enabled state actually changes."""
        if enabled == self._output_enabled:
            return
        self._output_enabled = enabled
        self._queue_cmd([Ftdi.SET_BITS_LOW, 0, self._current_direction()])

    def reset(self):
        """Toggle the reset line: reset_lo() followed by reset_hi()."""
        self.reset_lo()
        self.reset_hi()

    def reset_lo(self):
        """Send SET_BITS_LOW with the reset bit(s) added to the current direction byte."""
        self._queue_cmd([Ftdi.SET_BITS_LOW, 0, self._current_direction() | self._reset_mask])
        self.send_cmds()

    def reset_hi(self):
        """Send SET_BITS_LOW with the reset bit(s) removed from the current direction byte."""
        self._queue_cmd([Ftdi.SET_BITS_LOW, 0, self._current_direction() & ~self._reset_mask])
        self.send_cmds()

    def send_cmds(self):
        """Write the queued commands to the FTDI device.

        While a sequence is active the sequence buffer is written and left intact, so calling
        this again sends the same sequence again. Otherwise the regular command buffer is
        written if it is not empty. The regular command buffer is empty afterwards.
        """
        if self._sequence_cmd_buffer is not None:
            self._ftdi.write_data(self._sequence_cmd_buffer)
        elif len(self._cmd_buffer) > 0:
            self._ftdi.write_data(self._cmd_buffer)
        self._cmd_buffer = array('B')

    def write_bits_cmd(self, data, num_bits):
        """Queue a write of the low num_bits bits of data (WRITE_BITS_NVE_LSB).

        Args:
            data: value to write; must fit in num_bits bits.
            num_bits: number of bits to write, 1 to 8.

        Raises:
            Exception: if num_bits is out of range or data does not fit in num_bits bits.
        """
        # The command carries num_bits - 1 as an unsigned byte, so zero bits cannot be encoded.
        if num_bits < 1 or num_bits > 8:
            self._fatal('Invalid num_bits')
        elif (data & ((1 << num_bits) - 1)) != data:
            self._fatal('Invalid data!')
        self._set_output_enabled(True)
        self._queue_cmd([Ftdi.WRITE_BITS_NVE_LSB, num_bits - 1, data])

    def write_bytes_cmd(self, data):
        """Queue a write of whole bytes (WRITE_BYTES_NVE_LSB).

        Args:
            data: iterable of byte values (list, tuple, bytes, bytearray, array, ...) holding
                between 1 and 0x10000 items.

        Raises:
            Exception: if data is empty or too long, or if the resulting command is larger
                than the FTDI FIFO.
        """
        payload = list(data)
        # The command carries the byte count minus one as a 16-bit little-endian value.
        length = len(payload) - 1
        if length < 0 or length > 0xffff:
            self._fatal('Invalid length')
        self._set_output_enabled(True)
        self._queue_cmd([Ftdi.WRITE_BYTES_NVE_LSB, length & 0xff, length >> 8] + payload)

    def read_bits_cmd(self, num_bits):
        """Queue a read of num_bits bits (READ_BITS_PVE_LSB).

        Args:
            num_bits: number of bits to read, 1 to 8.

        Raises:
            Exception: if num_bits is out of range.
        """
        # The command carries num_bits - 1 as an unsigned byte, so zero bits cannot be encoded.
        if num_bits < 1 or num_bits > 8:
            self._fatal('Invalid num_bits')
        self._set_output_enabled(False)
        self._queue_cmd([Ftdi.READ_BITS_PVE_LSB, num_bits - 1])

    def read_bytes_cmd(self, length):
        """Queue a read of length whole bytes (READ_BYTES_PVE_LSB).

        Args:
            length: number of bytes to read, 1 to 0x10000.

        Raises:
            Exception: if length is out of range.
        """
        # The command carries the byte count minus one as a 16-bit little-endian value.
        length -= 1
        if length < 0 or length > 0xffff:
            self._fatal('Invalid length')
        self._set_output_enabled(False)
        self._queue_cmd([Ftdi.READ_BYTES_PVE_LSB, length & 0xff, length >> 8])

    def get_read_bytes(self, length):
        """Return the result of reading length bytes from the FTDI device."""
        return self._ftdi.read_data_bytes(length)

    def get_read_fifo_size(self):
        """Return the FIFO size determined when the device was opened."""
        return self._ftdi_fifo_size

    def start_sequence(self):
        """Send any queued commands, then start recording commands into a new sequence.

        Raises:
            Exception: if a sequence is already being recorded.
        """
        if self._sequence_cmd_buffer is not None:
            self._fatal('Attempted to start a sequence while one is in progress')
        self.send_cmds()
        self._sequence_cmd_buffer = array('B')

    def end_sequence(self):
        """Stop recording and discard the current sequence.

        Raises:
            Exception: if no sequence is being recorded.
        """
        if self._sequence_cmd_buffer is None:
            self._fatal('No sequence started')
        self._sequence_cmd_buffer = None