mirror of
https://github.com/google/pebble.git
synced 2025-06-28 17:23:24 -04:00
137 lines
4.9 KiB
Python
137 lines
4.9 KiB
Python
# Copyright 2024 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from array import array
|
|
|
|
from pyftdi.pyftdi.ftdi import Ftdi
|
|
import usb.util
|
|
|
|
class FTDISerialWireDebug(object):
|
|
def __init__(self, vid, pid, interface, direction, output_mask, reset_mask, frequency):
|
|
self._direction = direction
|
|
self._output_mask = output_mask
|
|
self._reset_mask = reset_mask
|
|
self._ftdi = Ftdi()
|
|
try:
|
|
self._ftdi.open_mpsse(vid, pid, interface, direction=direction, frequency=frequency,
|
|
latency=1)
|
|
except:
|
|
self._ftdi = None
|
|
raise
|
|
|
|
# get the FTDI FIFO size and increase the chuncksize to match
|
|
self._ftdi_fifo_size = min(self._ftdi.fifo_sizes)
|
|
self._ftdi.write_data_set_chunksize(self._ftdi_fifo_size)
|
|
|
|
self._cmd_buffer = array('B')
|
|
self._output_enabled = False
|
|
self._pending_acks = 0
|
|
self._sequence_cmd_buffer = None
|
|
|
|
def close(self):
|
|
if not self._ftdi:
|
|
return
|
|
self.send_cmds()
|
|
self._ftdi.close()
|
|
# PyFTDI doesn't do a good job of cleaning up - make sure we release the usb device
|
|
usb.util.dispose_resources(self._ftdi.usb_dev)
|
|
self._ftdi = None
|
|
|
|
def _fatal(self, message):
|
|
raise Exception('FATAL ERROR: {}'.format(message))
|
|
|
|
def _queue_cmd(self, write_data):
|
|
if len(write_data) > self._ftdi_fifo_size:
|
|
raise Exception('Data too big!')
|
|
if self._sequence_cmd_buffer is not None:
|
|
self._sequence_cmd_buffer.extend(write_data)
|
|
else:
|
|
if len(self._cmd_buffer) + len(write_data) > self._ftdi_fifo_size:
|
|
self.send_cmds()
|
|
self._cmd_buffer.extend(write_data)
|
|
|
|
def _set_output_enabled(self, enabled):
|
|
if enabled == self._output_enabled:
|
|
return
|
|
self._output_enabled = enabled
|
|
direction = self._direction & ~(0x00 if enabled else self._output_mask)
|
|
self._queue_cmd([Ftdi.SET_BITS_LOW, 0, direction])
|
|
|
|
def reset(self):
|
|
# toggle the reset line
|
|
self.reset_lo()
|
|
self.reset_hi()
|
|
|
|
def reset_lo(self):
|
|
direction = self._direction & ~(0x00 if self._output_enabled else self._output_mask)
|
|
self._queue_cmd([Ftdi.SET_BITS_LOW, 0, direction | self._reset_mask])
|
|
self.send_cmds()
|
|
|
|
def reset_hi(self):
|
|
direction = self._direction & ~(0x00 if self._output_enabled else self._output_mask)
|
|
self._queue_cmd([Ftdi.SET_BITS_LOW, 0, direction & ~self._reset_mask])
|
|
self.send_cmds()
|
|
|
|
def send_cmds(self):
|
|
if self._sequence_cmd_buffer is not None:
|
|
self._ftdi.write_data(self._sequence_cmd_buffer)
|
|
elif len(self._cmd_buffer) > 0:
|
|
self._ftdi.write_data(self._cmd_buffer)
|
|
self._cmd_buffer = array('B')
|
|
|
|
def write_bits_cmd(self, data, num_bits):
|
|
if num_bits < 0 or num_bits > 8:
|
|
self._fatal('Invalid num_bits')
|
|
elif (data & ((1 << num_bits) - 1)) != data:
|
|
self._fatal('Invalid data!')
|
|
self._set_output_enabled(True)
|
|
self._queue_cmd([Ftdi.WRITE_BITS_NVE_LSB, num_bits - 1, data])
|
|
|
|
def write_bytes_cmd(self, data):
|
|
length = len(data) - 1
|
|
if length < 0 or length > 0xffff:
|
|
self._fatal('Invalid length')
|
|
self._set_output_enabled(True)
|
|
self._queue_cmd([Ftdi.WRITE_BYTES_NVE_LSB, length & 0xff, length >> 8] + data)
|
|
|
|
def read_bits_cmd(self, num_bits):
|
|
if num_bits < 0 or num_bits > 8:
|
|
self._fatal('Invalid num_bits')
|
|
self._set_output_enabled(False)
|
|
self._queue_cmd([Ftdi.READ_BITS_PVE_LSB, num_bits - 1])
|
|
|
|
def read_bytes_cmd(self, length):
|
|
length -= 1
|
|
if length < 0 or length > 0xffff:
|
|
self._fatal('Invalid length')
|
|
self._set_output_enabled(False)
|
|
self._queue_cmd([Ftdi.READ_BYTES_PVE_LSB, length & 0xff, length >> 8])
|
|
|
|
def get_read_bytes(self, length):
|
|
return self._ftdi.read_data_bytes(length)
|
|
|
|
def get_read_fifo_size(self):
|
|
return self._ftdi_fifo_size
|
|
|
|
def start_sequence(self):
|
|
if self._sequence_cmd_buffer is not None:
|
|
self._fatal('Attempted to start a sequence while one is in progress')
|
|
self.send_cmds()
|
|
self._sequence_cmd_buffer = array('B')
|
|
|
|
def end_sequence(self):
|
|
if self._sequence_cmd_buffer is None:
|
|
self._fatal('No sequence started')
|
|
self._sequence_cmd_buffer = None
|
|
|