#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import struct
import sys
import time
from optparse import OptionParser

try:
    # python2
    import ConfigParser as configparser
except ImportError:
    # python3
    import configparser

try:
    import pyserial as serial
    import pyserial.tools.list_ports as ser_tools
except ImportError:
    import serial
    import serial.tools.list_ports as ser_tools
class Fbtool:
    """Download binaries to a USB-attached target over a serial port.

    The tool waits for a serial port whose USB VID/PID matches one of the
    known values, performs a four-byte handshake and then drives a simple
    command protocol: every command/argument written is echoed back by the
    target, and most commands end with a 16-bit status word.  Status words
    greater than or equal to ``E_ERROR`` are treated as failures.

    All multi-byte protocol values on the wire are big-endian (``'!'``
    struct formats); checksums are the XOR of little-endian 16-bit words.
    """

    # Bits of the target configuration word returned by command 0xd8.
    TGT_CFG_SBC_EN = 0x00000001
    TGT_CFG_SLA_EN = 0x00000002
    TGT_CFG_DAA_EN = 0x00000004
    # Status words at or above this value are reported as errors.
    E_ERROR = 0x1000

    def __init__(self, config_file, meid=False):
        """Read the download settings from an INI file.

        config_file: path of an INI file providing ``[DA1] da1_path,
            da1_addr, da1_jump_64``, ``[DA2] da2_path, da2_addr``,
            ``[Auth] auth_path``, ``[Cert] cert_path`` and
            ``[FALL_THRU_TO_FB] fall_thru_to_fb``.  Addresses are hex.
        meid: when true, start() only reads and logs the ME ID.

        Raises a configparser error when a section/option is missing and
        ValueError when a numeric option cannot be parsed.
        """
        self.__ser = None
        self.__connect_type = 'UNKNOWN'
        self.__meid = meid

        cfg = configparser.ConfigParser()
        # read() silently skips unreadable files; say so before the
        # less obvious "no section" error that will follow.
        if not cfg.read(config_file):
            logging.error('Cannot read config file: %s', config_file)

        self.__da1_path = cfg.get('DA1', 'da1_path')
        self.__da1_addr = int(cfg.get('DA1', 'da1_addr'), 16)
        self.__da1_jump_64 = int(cfg.get('DA1', 'da1_jump_64'), 16)
        self.__da2_path = cfg.get('DA2', 'da2_path')
        self.__da2_addr = int(cfg.get('DA2', 'da2_addr'), 16)
        self.__auth_path = cfg.get('Auth', 'auth_path')
        self.__cert_path = cfg.get('Cert', 'cert_path')
        self.__fall_thru_to_fb = int(cfg.get('FALL_THRU_TO_FB', 'fall_thru_to_fb'))

        logging.debug('da1_path: %s', self.__da1_path)
        logging.debug('da1_addr: 0x%x', self.__da1_addr)
        logging.debug('da1_jump_64: 0x%x', self.__da1_jump_64)
        logging.debug('da2_path: %s', self.__da2_path)
        logging.debug('da2_addr: 0x%x', self.__da2_addr)
        logging.debug('auth_path: %s', self.__auth_path)
        logging.debug('cert_path: %s', self.__cert_path)

    def __del__(self):
        """Close the serial port if it is still open."""
        # getattr: the attribute may be missing on a partially built object.
        ser = getattr(self, '_Fbtool__ser', None)
        # compatible with pySerial 2.6.
        # isOpen() is deprecated since version 3.0, 3.0 uses is_open
        if ser and ser.isOpen():
            ser.close()

    def __match_usb_br(self, vid, pid):
        """Return True (and record connect type 'BROM') for 0e8d:0003."""
        if vid == 0x0e8d and pid == 0x0003:
            self.__connect_type = 'BROM'
            return True
        return False

    def __match_usb_pl(self, vid, pid):
        """Return True (and record 'PRELOADER') for 0e8d:2000 / 0e8d:3000."""
        if vid == 0x0e8d and pid in (0x2000, 0x3000):
            self.__connect_type = 'PRELOADER'
            return True
        return False

    def __match_usb_auto(self, vid, pid):
        """Return True when the IDs match either the BROM or preloader."""
        return self.__match_usb_br(vid, pid) or self.__match_usb_pl(vid, pid)

    def __open_usb_device(self, match_usb_func, comport, vid, pid):
        """Open ``comport`` at 115200 baud if ``match_usb_func`` accepts it.

        Returns True when the port was opened (stored in ``self.__ser``),
        False when the IDs do not match or the open failed; open failures
        are only logged so the caller can retry.
        """
        if match_usb_func(vid, pid):
            time.sleep(0.1)
            try:
                self.__ser = serial.Serial(comport, 115200)
            except serial.SerialException as e:
                logging.debug('%s, retry...', e)
            else:
                logging.info('Got %s', comport)
                return True
        return False

    @staticmethod
    def __parse_legacy_hwid(hwid, platform=None):
        """Extract ``(vid, pid)`` from a pySerial 2.x hardware-id string.

        hwid: the third element of a pySerial 2.x ``comports()`` entry.
        platform: ``sys.platform``-style name; defaults to ``sys.platform``.

        Returns a ``(vid, pid)`` tuple, or None when the string carries no
        parsable IDs.  Raises EnvironmentError on unsupported platforms.
        """
        if platform is None:
            platform = sys.platform
        try:
            if platform.startswith('win'):
                idx = hwid.index('VID_') + 4
                vid = int(hwid[idx:idx + 4], 16)
                idx = hwid.index('PID_') + 4
                pid = int(hwid[idx:idx + 4], 16)
            elif platform.startswith(('linux', 'cygwin')):
                idx = hwid.index('VID:PID=') + 8
                vid = int(hwid[idx:idx + 4], 16)
                # The PID is the four hex digits after the colon; reading
                # further would swallow a trailing " SNR=..." suffix.
                pid = int(hwid[idx + 5:idx + 9], 16)
            elif platform.startswith('darwin'):
                raise EnvironmentError('Unsupport macOS')
            else:
                raise EnvironmentError('Unsupported platform')
        except ValueError:
            # Marker missing or digits malformed: not a usable USB entry.
            return None
        return vid, pid

    def __find_usb_device(self, match_usb_func):
        """Poll the serial ports until one accepted by ``match_usb_func``
        has been opened.

        Blocks (polling continuously) until a device is found.  pySerial
        older than 3 only exposes a hardware-id string, which is parsed
        per platform; newer versions provide ``vid``/``pid`` attributes.
        """
        while True:
            ports = ser_tools.comports()
            if serial.VERSION < '3':
                for port in list(ports):
                    if 'USB' not in port[2]:
                        continue
                    ids = self.__parse_legacy_hwid(port[2])
                    if ids is None:
                        # Unrelated port with an unparsable id: skip it.
                        continue
                    if self.__open_usb_device(match_usb_func, port[0], ids[0], ids[1]):
                        return
            else:
                for port in ports:
                    if self.__open_usb_device(match_usb_func, port.device, port.vid, port.pid):
                        return

    def __read8(self):
        """Read one unsigned byte from the target."""
        return struct.unpack('!B', self.__ser.read())[0]

    def __read16(self):
        """Read one big-endian unsigned 16-bit word from the target."""
        return struct.unpack('!H', self.__ser.read(2))[0]

    def __read32(self):
        """Read one big-endian unsigned 32-bit word from the target."""
        return struct.unpack('!I', self.__ser.read(4))[0]

    def __write8(self, data, echo):
        """Write a byte; with ``echo`` return whether it was echoed back."""
        self.__ser.write(struct.pack('!B', data))
        if echo:
            return self.__read8() == data
        return True

    def __write16(self, data, echo):
        """Write a big-endian 16-bit word; with ``echo`` verify the echo."""
        self.__ser.write(struct.pack('!H', data))
        if echo:
            return self.__read16() == data
        return True

    def __write32(self, data, echo):
        """Write a big-endian 32-bit word; with ``echo`` verify the echo."""
        self.__ser.write(struct.pack('!I', data))
        if echo:
            return self.__read32() == data
        return True

    def __start_cmd(self):
        """Perform the start handshake with the connected target.

        Sends a0 0a 50 05 one byte at a time, expecting 5f f5 af fa in
        reply; any wrong reply restarts the sequence from the first byte.
        """
        cmd = (0xa0, 0x0a, 0x50, 0x05)
        echo_cmd = (0x5f, 0xf5, 0xaf, 0xfa)

        i = 0
        while i < len(cmd):
            self.__write8(cmd[i], False)
            if self.__read8() != echo_cmd[i]:
                i = 0
                self.__ser.flushInput()
            else:
                i += 1

        time.sleep(0.1)
        self.__ser.flushInput()
        self.__ser.flushOutput()

        if self.__connect_type == 'BROM':
            logging.info('Connect brom')
        elif self.__connect_type == 'PRELOADER':
            logging.info('Connect preloader')

    def __load_binary(self, path):
        """Return the full content of ``path`` as bytes."""
        logging.info("Loading file: %s", path)
        with open(path, 'rb') as f:
            return f.read()

    def __checksum(self, data, length):
        """Return the XOR of the little-endian 16-bit words of ``data``.

        Only the first ``length`` bytes are used; ``data`` may be None
        when ``length`` is 0.  An odd trailing byte is zero-extended to a
        word instead of raising struct.error.
        NOTE(review): zero-padding the odd byte is assumed to match the
        target's own computation -- confirm against the device.
        """
        checksum = 0
        for i in range(0, length, 2):
            word = data[i:min(i + 2, length)].ljust(2, b'\0')
            checksum ^= struct.unpack('<H', word)[0]
        return checksum & 0xFFFF

    def __get_target_config(self):
        """Query the target configuration word (command 0xd8).

        Returns ``(0, cfg)`` on success, ``(-1, None)`` when the command
        was not echoed, or ``(status, None)`` for an error status.
        """
        if not self.__write8(0xd8, True):
            return -1, None
        cfg = self.__read32()
        status = self.__read16()
        if status >= self.E_ERROR:
            return status, None
        return 0, cfg

    def __send_auth(self, cfg):
        """Send the auth file (command 0xe2) when ``cfg`` has DAA enabled.

        Returns 0 on success or when DAA is disabled, a negative step code
        (-2..-6) for local/echo/checksum failures, or the target's error
        status.
        """
        if self.TGT_CFG_DAA_EN & cfg == 0:
            return 0
        if self.TGT_CFG_SBC_EN & cfg == 0:
            logging.error('daa=1, sbc=0')
            return -2
        if self.__auth_path == '':
            logging.error('no auth file')
            return -3

        auth = self.__load_binary(self.__auth_path)
        auth_len = len(auth)
        logging.debug("auth file size: 0x%x", auth_len)

        if not self.__write8(0xe2, True):
            return -4
        if not self.__write32(auth_len, True):
            return -5
        status = self.__read16()
        if status >= self.E_ERROR:
            return status

        self.__ser.write(auth)
        # compare checksum
        if self.__read16() != self.__checksum(auth, auth_len):
            return -6
        status = self.__read16()
        if status >= self.E_ERROR:
            return status
        return 0

    def __strip_pl_hdr(self, pl, pl_len):
        """Strip the optional EMMC_BOOT/BRLYT and GFH headers from an image.

        pl: raw image bytes; pl_len: its length.

        Returns ``(True, payload, payload_len)``.  The payload is the
        image itself when no GFH FILE_INFO header is found (or the image
        is too short to contain one).  Returns ``(False, None, None)``
        when a header is present but inconsistent.
        """
        emmc_hdr_size = 20   # EMMC_HEADER_V1 fields read below
        brlyt_size = 40      # BR_Layout_v1
        gfh_size = 56        # GFH_FILE_INFO_V1

        if len(pl) < gfh_size:
            # Too short to carry a header: treat as a raw image.
            return (True, pl, pl_len)

        # Explicit little-endian: same layout as the native formats on
        # little-endian hosts, but independent of the host byte order.
        identifier, ver, dev_rw_unit = struct.unpack('<12s2I', pl[:emmc_hdr_size])
        gfh = pl[:gfh_size]
        gfh_offset = 0

        if identifier.strip(b'\0') == b'EMMC_BOOT' and ver == 1:
            logging.debug('emmc_hdr: identifier:%s, ver:0x%08x, dev_rw_unit:0x%08x',
                          identifier, ver, dev_rw_unit)
            if dev_rw_unit + brlyt_size > pl_len:
                logging.error('EMMC HDR error. dev_rw_unit=0x%x, brlyt_size=0x%x, pl_len=0x%x',
                              dev_rw_unit, brlyt_size, pl_len)
                return False, None, None

            brlyt_identifier, brlyt_ver = struct.unpack('<8sI', pl[dev_rw_unit:dev_rw_unit + 12])
            logging.debug('brlyt_identifier: %s, brlyt_ver=0x%x', brlyt_identifier, brlyt_ver)
            if brlyt_identifier.strip(b'\0') != b'BRLYT' or brlyt_ver != 1:
                logging.error('BRLYT error. ver=0x%x, identifier=%s', brlyt_ver, brlyt_identifier)
                return False, None, None

            # BL_Descriptor
            bl_begin_dev_addr = struct.unpack('<I', pl[dev_rw_unit + 28:dev_rw_unit + 32])[0]
            if bl_begin_dev_addr + gfh_size > pl_len:
                logging.error('BRLYT error. bl_begin_dev_addr=0x%x', bl_begin_dev_addr)
                return False, None, None

            # GFH_FILE_INFO_v1
            gfh = pl[bl_begin_dev_addr:bl_begin_dev_addr + gfh_size]
            gfh_offset = bl_begin_dev_addr

        gfh_struct = struct.unpack('<I2H12sIH2B7I', gfh)
        gfh_magic_ver = gfh_struct[0]
        gfh_type = gfh_struct[2]
        gfh_identifier = gfh_struct[3]
        gfh_file_len = gfh_struct[9]
        gfh_sig_len = gfh_struct[12]
        gfh_jump_offset = gfh_struct[13]

        is_file_info = ((gfh_magic_ver & 0x00FFFFFF) == 0x004D4D4D
                        and gfh_type == 0
                        and gfh_identifier.strip(b'\0') == b'FILE_INFO')
        if not is_file_info:
            return (True, pl, pl_len)

        if gfh_file_len < gfh_jump_offset + gfh_sig_len:
            logging.error('GFH error. pl_len=0x%x, file_len=0x%x, jump_offset=0x%x, sig_len=0x%x',
                          pl_len, gfh_file_len, gfh_jump_offset, gfh_sig_len)
            return False, None, None

        field_formats = (
            'magic_ver: 0x%08x', 'size: 0x%04x', 'type: 0x%04x',
            'identifier: %s', 'file_ver: 0x%08x', 'file_type: 0x%04x',
            'flash_dev: 0x%02x', 'sig_type: 0x%02x', 'load_addr: 0x%08x',
            'file_len: 0x%08x', 'max_size: 0x%08x', 'content_offset: 0x%08x',
            'sig_len: 0x%08x', 'jump_offset: 0x%08x', 'attr: 0x%08x',
        )
        for fmt, value in zip(field_formats, gfh_struct):
            logging.debug('gfh: ' + fmt, value)

        strip_pl_len = gfh_file_len - gfh_jump_offset - gfh_sig_len
        start = gfh_offset + gfh_jump_offset
        if start + strip_pl_len > pl_len:
            # Header describes more data than the file holds; sending a
            # short payload with this length would stall the transfer.
            logging.error('GFH error. pl_len=0x%x, file_len=0x%x, jump_offset=0x%x, sig_len=0x%x',
                          pl_len, gfh_file_len, gfh_jump_offset, gfh_sig_len)
            return False, None, None
        strip_pl = pl[start:start + strip_pl_len]
        return (True, strip_pl, strip_pl_len)

    def __send_da(self, addr, da, da_len, sig, sig_len):
        """Send a binary plus optional signature to ``addr`` (command 0xd7).

        ``sig`` may be None when ``sig_len`` is 0.  Returns 0 on success,
        -1..-4 when an echo fails, -5 on checksum mismatch, or the
        target's error status.
        """
        if not self.__write8(0xd7, True):
            return -1
        if not self.__write32(addr, True):
            return -2
        logging.debug('len: 0x%x', da_len + sig_len)
        if not self.__write32(da_len + sig_len, True):
            return -3
        if not self.__write32(sig_len, True):
            return -4
        status = self.__read16()
        if status >= self.E_ERROR:
            return status

        if da_len > 0:
            self.__ser.write(da)
        if sig_len > 0:
            self.__ser.write(sig)

        checksum = self.__checksum(da, da_len) ^ self.__checksum(sig, sig_len)
        data = self.__read16()
        logging.debug('checksum: 0x%x - 0x%x', checksum, data)
        if data != checksum:
            return -5
        status = self.__read16()
        if status >= self.E_ERROR:
            return status
        return 0

    def __jump_da(self, addr):
        """Jump to ``addr`` (command 0xd5); 0 on success, else error."""
        if not self.__write8(0xd5, True):
            return -1
        if not self.__write32(addr, True):
            return -2
        status = self.__read16()
        if status >= self.E_ERROR:
            return status
        return 0

    def __jump_da_ex(self, addr):
        """Jump to ``addr`` with command 0xde, used when ``da1_jump_64``
        is non-zero; 0 on success, else a step code or error status."""
        if not self.__write8(0xde, True):
            return -1
        if not self.__write32(addr, True):
            return -2
        if not self.__write8(0x1, True):
            return -3
        status = self.__read16()
        if status >= self.E_ERROR:
            return status
        if not self.__write8(0x64, True):
            return -4
        status = self.__read16()
        if status >= self.E_ERROR:
            return status
        return 0

    def __get_meid(self):
        """Read the ME ID (command 0xe1) and log it as a hex string.

        Returns 0 on success, -1 when the command is not echoed, or the
        target's error status.
        """
        if not self.__write8(0xe1, True):
            return -1
        meid_len = self.__read32()
        logging.debug('meid len: 0x%x', meid_len)
        # bytearray yields integers on both Python 2 and 3.
        meid = bytearray(self.__ser.read(meid_len))
        status = self.__read16()
        if status >= self.E_ERROR:
            return status
        logging.info(''.join('%02x' % byte for byte in meid))
        return 0

    def __send_cert(self, data, len):
        """Send a certificate of ``len`` bytes (command 0xe0).

        The parameter name ``len`` is kept for interface compatibility.
        Returns 0 on success, -1/-2 when an echo fails, -3 on checksum
        mismatch, or the target's error status.
        """
        if not self.__write8(0xe0, True):
            return -1
        if not self.__write32(len, True):
            return -2
        status = self.__read16()
        if status >= self.E_ERROR:
            return status

        self.__ser.write(data)
        checksum = self.__checksum(data, len)
        reported = self.__read16()
        if checksum != reported:
            logging.error("checksum: 0x%x - 0x%x", checksum, reported)
            return -3
        status = self.__read16()
        if status >= self.E_ERROR:
            return status
        return 0

    def __reboot_platform(self):
        """Reboot the target through two 0xd4 command sequences.

        Writes 0x22000004 at 0x10007000 and 0x1209 at 0x10007014.  No
        status is read after the last value.  Returns 0 on success, a
        step code (-1..-8) when an echo fails, or the error status.
        """
        if not self.__write8(0xd4, True):
            return -1
        if not self.__write32(0x10007000, True):
            return -2
        if not self.__write32(0x1, True):
            return -3
        status = self.__read16()
        if status >= self.E_ERROR:
            return status
        if not self.__write32(0x22000004, True):
            return -4
        status = self.__read16()
        if status >= self.E_ERROR:
            return status

        if not self.__write8(0xd4, True):
            return -5
        if not self.__write32(0x10007014, True):
            return -6
        if not self.__write32(0x1, True):
            return -7
        status = self.__read16()
        if status >= self.E_ERROR:
            return status
        if not self.__write32(0x1209, True):
            return -8
        return 0

    def __fall_through_to_fastboot(self):
        """Send command 0xd9; 0 on success, -1 or the error status."""
        if not self.__write8(0xd9, True):
            return -1
        status = self.__read16()
        if status >= self.E_ERROR:
            return status
        return 0

    def __download_da1(self):
        """BROM stage: authenticate if required, send DA1 and jump to it.

        Closes the serial port after a successful jump.  Returns 0 on
        success and -1 (after logging the cause) on failure.
        """
        status, cfg = self.__get_target_config()
        if status != 0:
            logging.error('get target config (%s)', status)
            return -1
        logging.debug('cfg=0x%x', cfg)

        status = self.__send_auth(cfg)
        if status != 0:
            logging.error('send auth (%d)', status)
            return -1

        da1 = self.__load_binary(self.__da1_path)
        da1_len = len(da1)
        logging.debug('da1 length: 0x%x', da1_len)
        ok, strip_pl, strip_pl_len = self.__strip_pl_hdr(da1, da1_len)
        if not ok:
            logging.error('strip pl hdr')
            return -1

        # A signature file is only sent when the target has DAA enabled.
        sig_da1 = None
        sig_da1_len = 0
        if self.TGT_CFG_DAA_EN & cfg:
            sig_da1 = self.__load_binary(self.__da1_path + '.sign')
            sig_da1_len = len(sig_da1)

        logging.debug('strip_pl_len: 0x%x', strip_pl_len)
        logging.info('Send %s', self.__da1_path)
        status = self.__send_da(self.__da1_addr, strip_pl, strip_pl_len, sig_da1, sig_da1_len)
        if status != 0:
            logging.error('send da1 (%d)', status)
            return -1

        logging.info('Jump da')
        if self.__da1_jump_64 == 0:
            status = self.__jump_da(self.__da1_addr)
        else:
            status = self.__jump_da_ex(self.__da1_addr)
        if status != 0:
            logging.error('jump da1 (%d)', status)
            return -1

        self.__ser.close()
        return 0

    def __download_da2(self):
        """Preloader stage: send DA2 with its ``.sign`` file and jump to it.

        Closes the serial port after a successful jump.  Returns 0 on
        success and -1 (after logging the cause) on failure.
        """
        # load da2 (lk)
        da2 = self.__load_binary(self.__da2_path)
        da2_len = len(da2)
        sig_da2 = self.__load_binary(self.__da2_path + '.sign')
        sig_da2_len = len(sig_da2)

        logging.info('Send %s', self.__da2_path)
        status = self.__send_da(self.__da2_addr, da2, da2_len, sig_da2, sig_da2_len)
        if status != 0:
            logging.error('send da2 (%d)', status)
            return -1

        logging.info('Jump da2')
        status = self.__jump_da(self.__da2_addr)
        if status != 0:
            logging.error('jump da2 (%d)', status)
            return -1

        self.__ser.close()
        return 0

    def start(self):
        """Run the configured operation against the first matching device.

        Order of precedence: read the ME ID (when requested), send the
        certificate and reboot (when ``cert_path`` is set), otherwise
        download DA1 through the BROM (when connected to it), then either
        fall through to fastboot or download DA2 through the preloader.

        Returns 0 on success and -1 on any failure (the cause is logged).
        """
        self.__find_usb_device(self.__match_usb_auto)
        self.__start_cmd()

        # get meid
        if self.__meid:
            status = self.__get_meid()
            if status != 0:
                logging.error('get meid (%d)', status)
                return -1
            return 0

        # send cert
        if self.__cert_path != '':
            cert = self.__load_binary(self.__cert_path)
            cert_len = len(cert)
            logging.debug('cert_len: 0x%x', cert_len)
            status = self.__send_cert(cert, cert_len)
            if status != 0:
                logging.error('send cert (%d)', status)
                return -1
            logging.info('Reboot...')
            status = self.__reboot_platform()
            if status != 0:
                logging.error('reboot platform (%d)', status)
                return -1
            return 0

        if self.__connect_type == 'BROM':
            if self.__download_da1() != 0:
                return -1
            if self.__da2_path == '':
                return 0
            # handshake to preloader
            self.__find_usb_device(self.__match_usb_pl)
            self.__start_cmd()

        if self.__fall_thru_to_fb == 1:
            logging.info("fall through to fastboot")
            status = self.__fall_through_to_fastboot()
            self.__ser.close()
            if status != 0:
                logging.error("fall through to fastboot failed(%d)", status)
                return -1
            return 0

        return self.__download_da2()
if __name__ == '__main__':
    # Command-line entry point: parse options, configure logging, run the
    # download and report its result through the process exit status.
    parser = OptionParser()
    parser.add_option('-f', '--file', dest='configfile', help='read config file',
                      metavar='FILE', default='dl_addr.ini')
    parser.add_option('-d', '--debug', action='store_true', dest='debuglog',
                      default=False, help='enable debug log')
    parser.add_option('-m', '--meid', action='store_true', dest='meid',
                      default=False, help='get meid')
    options, args = parser.parse_args()

    config_file = options.configfile
    meid = options.meid
    debug = options.debuglog

    logging.basicConfig(level=logging.DEBUG if debug else logging.INFO,
                        format='%(levelname)s: %(message)s')

    logging.info('pySerial version: (%s)', serial.VERSION)
    if serial.VERSION < '2.6':
        # Only reported; the tool still tries to run with the old version.
        logging.error('pySerial version(%s) is lower than 2.6, please upgrade!', serial.VERSION)

    logging.info('Use config file: %s', config_file)
    logging.info('Waiting to connect platform...')
    fbtool = Fbtool(config_file, meid)
    # start() returns 0 on success and -1 on failure; surface that to the
    # caller instead of always exiting with status 0.
    sys.exit(0 if fbtool.start() == 0 else 1)