Расскажу я вам сегодня о том, как пытался я добраться из питона до интерфейса жесткого диска, и что из этого получилось.
Появляется у меня периодически необходимость тестирования большого количества жестких дисков. Обычно для этого используется досовая Victoria загружающаяся по сети. Она тестирует диски по одному, что не очень удобно. К тому же последнее время пошли платы не имеющие режима IDE, что дополнительно усложняет задачу. По началу у меня возникла идея взять готовый софт под линукс с открытыми исходниками и добавить ему возможность параллельного тестирования нескольких дисков. После беглого поиска выяснилось удручающее состояние этой области в линуксе. Из софта, ведущего при тестировании статистику по времени доступа к секторам и типам ошибок нашел только whdd. Попытка разобраться с кодом whdd закончилась полным провалом. Для меня, ни разу не программиста, код показался очень запутанным. К тому же большую его часть занимает совсем не работа с железом.
Поняв, что простого решения не предвидится я решил попробовать написать подобную программу самостоятельно. Понимая, что подобный проект на C я не осилю я начал изучать возможность прямой работы с дисками из python, которым я частенько пользуюсь для решения простых задач и люблю за простоту и понятность. Информации по этому вопросу в сети кот наплакал, но все же я выяснил, что существует модуль fcntl который в том числе позволяет отправлять устройству ioctl запросы. Теперь у меня появилась возможность отправлять команды диску. Но в линуксе все диски считаются scsi дисками, а для тестирования нужно передавать диску непосредственно ata команды. Оказалось существует механизм ATA Command Pass-Through, позволяющий обернуть ata команду в scsi запрос. Основную информацию о том, как это использовать удалось почерпнуть из исходных текстов проекта sg3_utils. Осталось попробовать реализовать это все на питоне.
Для того, чтобы создать в питоне структуры аналогичные структурам языка C, для последующей передачи их в ioctl, существует модуль ctypes. Отдельно стоит упомянуть количество седых волос появившихся в результате отладки странных глюков с этими структурами. Так я открыл для себя знание о выравнивании структур в C. В результате родились две структуры:
Структура для ATA Pass-Through:
class ataCmd(ctypes.Structure):
_pack_ = 1
_fields_ = [
('opcode', ctypes.c_ubyte),
('protocol', ctypes.c_ubyte),
('flags', ctypes.c_ubyte),
('features', ctypes.c_ushort),
('sector_count', ctypes.c_ushort),
('lba_h_low', ctypes.c_ubyte),
('lba_low', ctypes.c_ubyte),
('lba_h_mid', ctypes.c_ubyte),
('lba_mid', ctypes.c_ubyte),
('lba_h_high', ctypes.c_ubyte),
('lba_high', ctypes.c_ubyte),
('device', ctypes.c_ubyte),
('command', ctypes.c_ubyte),
('control', ctypes.c_ubyte)]
И структура для ioctl:
class sgioHdr(ctypes.Structure):
_pack_ = 1
_fields_ = [
('interface_id', ctypes.c_int), # [i] 'S' for SCSI generic (required)
('dxfer_direction', ctypes.c_int), # [i] data transfer direction
('cmd_len', ctypes.c_ubyte), # [i] SCSI command length ( <= 16 bytes)
('mx_sb_len', ctypes.c_ubyte), # [i] max length to write to sbp
('iovec_count', ctypes.c_ushort), # [i] 0 implies no scatter gather
('dxfer_len', ctypes.c_uint), # [i] byte count of data transfer
('dxferp', ctypes.c_void_p), # [i], [*io] points to data transfer memory
('cmdp', ctypes.c_void_p), # [i], [*i] points to command to perform
('sbp', ctypes.c_void_p), # [i], [*o] points to sense_buffer memory
('timeout', ctypes.c_uint), # [i] MAX_UINT->no timeout (unit: millisec)
('flags', ctypes.c_uint), # [i] 0 -> default, see SG_FLAG...
('pack_id', ctypes.c_int), # [i->o] unused internally (normally)
('usr_ptr', ctypes.c_void_p), # [i->o] unused internally
('status', ctypes.c_ubyte), # [o] scsi status
('masked_status', ctypes.c_ubyte), # [o] shifted, masked scsi status
('msg_status', ctypes.c_ubyte), # [o] messaging level data (optional)
('sb_len_wr', ctypes.c_ubyte), # [o] byte count actually written to sbp
('host_status', ctypes.c_ushort), # [o] errors from host adapter
('driver_status', ctypes.c_ushort), # [o] errors from software driver
('resid', ctypes.c_int), # [o] dxfer_len - actual_transferred
('duration', ctypes.c_uint), # [o] time taken by cmd (unit: millisec)
('info', ctypes.c_uint)] # [o] auxiliary information
Поскольку заполнение этих структур требуется перед каждой дисковой операцией и занимает много места, эта операция вынесена в отдельную функцию. В многобайтовых значениях нужно поменять порядок байтов.
def prepareSgio(cmd, feature, count, lba, direction, sense, buf):
if direction == SG_DXFER_FROM_DEV:
buf_len = ctypes.sizeof(buf)
buf_p = ctypes.cast(buf, ctypes.c_void_p)
prot = 4 << 1 # PIO Data-In
elif direction == SG_DXFER_TO_DEV:
buf_len = ctypes.sizeof(buf)
buf_p = ctypes.cast(buf, ctypes.c_void_p)
prot = 5 << 1 # PIO Data-Out
else:
buf_len = 0
buf_p = None
prot = 3 << 1 # Non-data
if cmd != 0xb0: # not SMART COMMAND
prot = prot | 1 # + EXTEND
sector_lba = lba.to_bytes(6, byteorder='little')
ata_cmd = ataCmd(opcode=0x85, # ATA PASS-THROUGH (16)
protocol=prot,
# flags field
# OFF_LINE = 0 (0 seconds offline)
# CK_COND = 1 (copy sense data in response)
# T_DIR = 1 (transfer from the ATA device)
# BYT_BLOK = 1 (length is in blocks, not bytes)
# T_LENGTH = 2 (transfer length in the SECTOR_COUNT field)
flags=0x2e,
features=swap16(feature),
sector_count=swap16(count),
lba_h_low=sector_lba[3], lba_low=sector_lba[0],
lba_h_mid=sector_lba[4], lba_mid=sector_lba[1],
lba_h_high=sector_lba[5], lba_high=sector_lba[2],
device=0,
command=cmd,
control=0)
sgio = sgioHdr(interface_id=ASCII_S, dxfer_direction=direction,
cmd_len=ctypes.sizeof(ata_cmd),
mx_sb_len=ctypes.sizeof(sense), iovec_count=0,
dxfer_len=buf_len,
dxferp=buf_p,
cmdp=ctypes.addressof(ata_cmd),
sbp=ctypes.cast(sense, ctypes.c_void_p), timeout=1000,
flags=0, pack_id=0, usr_ptr=None, status=0, masked_status=0,
msg_status=0, sb_len_wr=0, host_status=0, driver_status=0,
resid=0, duration=0, info=0)
return sgio
Эта функция принимает ata команду, параметры и буферы а возвращает готовую структуру для ioctl запроса. Дальше все просто. Создаем буфер в котором вернутся статус выполнения команды и содержимое ata регистров статуса и ошибки. Создаем буфер для сектора, прочитанного с диска. Заполняем структуры и выполняем нашу первую ata команду.
sense = ctypes.c_buffer(64)
identify = ctypes.c_buffer(512)
sgio = prepareSgio(0xec, 0, 0, 0, SG_DXFER_FROM_DEV, sense, identify) # IDENTIFY
with open(dev, 'r') as fd:
if fcntl.ioctl(fd, SG_IO, ctypes.addressof(sgio)) != 0:
return None # fcntl failed!
В ответ получаем сектор с информацией о диске:
0000000: 5a04 ff3f 37c8 1000 0000 0000 3f00 0000 Z..?7.......?...
0000010: 0000 0000 2020 2020 2020 4b4a 3131 3142 .... KJ111B
0000020: 3942 5647 4142 4659 0300 5fea 3800 4b4a 9BVGABFY.._.8.KJ
0000030: 4f41 3341 4145 6948 6174 6863 2069 5548 OA3AAEiHathc iUH
0000040: 3741 3232 3230 4130 414c 3333 2030 2020 7A2220A0AL33 0
0000050: 2020 2020 2020 2020 2020 2020 2020 1080 ..
0000060: 0040 002f 0040 0002 0002 0700 ff3f 1000 .@./.@.......?..
0000070: 3f00 10fc fb00 0001 ffff ff0f 0000 0700 ?...............
0000080: 0300 7800 7800 7800 7800 0000 0000 0000 ..x.x.x.x.......
0000090: 0000 0000 0000 1f00 0617 0000 5e00 4400 ............^.D.
00000a0: fc01 2900 6b34 697d 7347 6934 41bc 6347 ..).k4i}sGi4A.cG
00000b0: 7f40 0401 0000 0000 feff 0000 0000 0800 .@..............
00000c0: ca00 f900 1027 0000 b088 e0e8 0000 0000 .....'..........
00000d0: ca00 0000 0000 875a 0050 a2cc cb22 44fc .......Z.P..."D.
00000e0: 0000 0000 0000 0000 0000 0000 0000 1440 ...............@
00000f0: 1440 0000 0000 0000 0000 0000 0000 0000 .@..............
0000100: 0100 0b00 0000 0000 8020 f10d 20fa 0100 ......... .. ...
0000110: 0040 0404 0403 0000 0000 0502 0604 0504 .@..............
0000120: 0506 0803 0506 0504 0505 0603 0505 0000 ................
0000130: 3741 3342 0000 0a78 0000 bd5d d3a1 0080 7A3B...x...]....
0000140: 0000 0000 0000 0000 0000 0000 0000 0000 ................
0000150: 0200 0000 0000 0000 0000 0000 0000 0000 ................
0000160: 0000 0000 0000 0000 0000 0000 0000 0000 ................
0000170: 0000 0000 0000 0000 0000 0000 0000 0000 ................
0000180: 0000 0000 0000 0000 0000 0000 0000 0000 ................
0000190: 0000 0000 0000 0000 0000 0000 3d00 0000 ............=...
00001a0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00001b0: 0000 201c 0000 0000 0000 0000 1f10 2100 .. ...........!.
00001c0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00001d0: 0000 0000 0100 e003 0000 0000 0000 0000 ................
00001e0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00001f0: 0000 0000 0000 0000 0000 0000 0000 a503 ................
В нем содержится полная информация о диске, извлечем основную.
serial = swapString(identify[20:40])
firmware = swapString(identify[46:53])
model = swapString(identify[54:93])
sectors = int.from_bytes(identify[200] + identify[201] + identify[202] + identify[203] +
identify[204] + identify[205] + identify[206] + identify[207], byteorder='little')
В результате получаем:
модель: Hitachi HUA722020ALA330; прошивка: JKAOA3; серийный номер: JK11A1YAJE2N5V; число секторов: 3907029168.
Теперь мы умеем отправлять ata команды диску и получать от него ответы. Потихоньку результат моей работы оформился в библиотечку, содержащую реализацию основного набора ata команд, включая чтение SMART. Кому интересно можно взглянуть на нее здесь. Качество кода сильно не ругайте, я не
Теперь осталось с ее помощью написать утилиту тестирования. Чувствую меня ждет еще много открытий.
Комментарии (24)
Sergey_datex
28.12.2015 17:24Открытий будет еще очень много. Основное — вам необходимо будет точно измерять время выполнения команд (вы же хотите качественно тестировать диски?), а при параллельной работе с этим будут большие проблемы. Многозадачность тоже вносит погрешность.
Обратите внимание на вкладку настроек Victoria Win, и режимы работы таймера.kazenniy
28.12.2015 17:45+1В структуре sgioHdr есть параметр duration. Судя по описанию в scsi/sg.h «time taken by cmd (unit: millisec)», он должен заполнятся системой и содержать время занятое на выполнение команды. Пока не пробовал, но надеюсь это работает и тогда измерять время самому не придется.
Про открытия не сомневаюсь, опыта работы с многопоточностью еще не было.
quartz64
28.12.2015 17:261. Смотрим через smartctl информацию о диске и атрибуты. Бывает, что сразу Failed выдаёт по какому-нибудь параметру — в морг.
2. Запускаем через тот же smartctl короткий и длинный self-test'ы. Не проходит — в морг.
3. Гоняем badblocks с записью и/или скрипт с fio (он умеет проверять записанное по хешам). Смотрим, не растут ли ремапы.
4. Для SAS можно выполнить sg_format и повторить тесты.
Чем плох такой алгоритм?kazenniy
28.12.2015 17:55+1С полумертвыми дисками все просто, мне нужно ловить более тонкие случаи. Например диски выпадают из массива но тесты производителя проходят без ошибок.
Sergey_datex
28.12.2015 17:28Не успел отредактировать предыдущий комментарий:
Как минимум если ядер процессора будет меньше чем тестируемых дисков, появляется большое влияние процессов друг на друга и сильное снижение скорости тестирование (так на win)
amarao
28.12.2015 19:07+4Хотел написать типичный русский комментарий:
Иметь библиотеку для работы с SG-подмножеством команд SCSI было бы неплохо, но то, что вы написали, библиотекой не является. Ни в каком смысле. Не описаны интерфейсы для работы с библиотекой, полностью отсутствует обработка ошибок, тестов нет.
Но всё это неправда. Вы делаете правильную и хорошую вещь, потому что на питоне нет нормального интерфейса для работы со смартом (лучшее, что есть пока что — это pysmart, который враппер над smartmontools), а для sg нет вообще ничего.
Я вокруг этого сейчас много пишу, если очередь дойдёт до sg-шных команд, попробую начать использовать/писать.kazenniy
28.12.2015 20:08+1На моем уровне знания питона я еще плохо понимаю, какими атрибутами должна обладать полноценная библиотека. Пока это просто набор функций. Теперь будет мотивация разобраться с этим. Спасибо за отзыв.
amarao
28.12.2015 22:39Ну, первое, что нужно, это документация с примерами вызовов. Второе — стабильный и более-менее предсказуемый интерфейс. Третье — своя иерархия exception'ов, чтобы не ловить неожиданные и непонятные OSError, IOError непонятно где. Четвёртая — тесты (py.test, например). Особенно часть с упаковкой данных в структуры — оно точно должно быть покрыто тестами, ибо без боли разобраться в этом нельзя будет (в силу предметной области).
Вот это:
231: «Temperature_Celsius» — на современных SSD'шках этот же атрибут соответствует SSD Life Left:
См: media.kingston.com/support/downloads/MKP_306_SMART_attribute.pdf (поиск: 231)kazenniy
29.12.2015 07:11+1Со стабильностью интерфейса пока будет не очень. Некоторые вещи там можно сделать по разному и я пока не понимаю, как лучше. Думаю интерфейс придется устаканивать в процессе пробного использования. Свои exception`ы в планах. Понимаю, что возвращать при ошибках None это не дело. С тестами у меня все плохо. По скольку сейчас для меня питон это эдакий продвинутый bash, то и темы автоматического тестирования я не касался вовсе. Придется учится.
Касательно смарта там вообще сплошная боль. Номера атрибутов у разных производителей могут иметь разное значение. Raw значения могут иметь разный размер и разное количество значений и т.д. Смотрел на этот счет исходники smartmontools. Ребята проделали очень большую работу сводя это воедино.
Может подскажете пример небольшой но правильно написанной библиотеки? Посмотреть, в качестве учебного пособия.amarao
29.12.2015 13:28Во-первых — классы. Будет класс, можно будет менять его свойства (например, менять «что значит этот смарт-атрибут» в зависимости от других полей или даже действий пользователя).
Первая попавшаяся не очень сложная библиотека: github.com/softvar/simplegist
Без тестов, правда.
Вот более сложная библиотека, с тестами: github.com/softvar/json2html
Если утрированно, то весь тест выглядит так:
import pytest import my_module def test_foo(): assert my_module.foo(1) == 1 def test_boo(): assert my_module.foo(2) == my_module.boo(3) def test_raise(): with pytest.raises(ValueError): my_module.foo(None) if __name__ == "__main__": import sys pytest.main("-v %s" % sys.argv[0])
Дальше там начинается вопрос «а как тесты писать» — и тут два слова для поиска: fixtures и mocks.kazenniy
29.12.2015 14:01+1Большое спасибо за наставление на путь истинный. Буду изучать, праздники длинные.
bosha
30.12.2015 10:28Обычно ещё возникают сомнения, что мол, тесты больше самой программы, и тянут больше зависимостей — это тоже норма. :)
Ещё на первых порах будет полезен coverage — он покажет какие части желательно тоже покрыть тестами.
И прикрутите setuptools, да добавьте в pypi. Обычно глобально никто не ставит/копирует библиотеки. Всё через virtualenv/pip.
Tutorial — peterdowns.com/posts/first-time-with-pypi.html
Пример — github.com/bosha/pypushalotkazenniy
30.12.2015 10:57+1С тестами для подобного софта тоже не очень понятно. Это же не программа, которая получила данные/перемолола/выдала данные.Тут для тестов нужна железка, диск. При этом операции записи на диск деструктивны.
bosha
30.12.2015 13:53Как выше написал amarao — для этого используют моки и фикстуры (mocks and fixtures). Есть какое-то определенное поведение диска, которое с помощью определенных библиотек можно эмулировать.Т.е. Вы импортируете эту библиотеку, программируете определенное поведение: при определенных запросах, определенные ответы.
Вот, например, мокинг HTTP — github.com/bosha/pypushalot/blob/master/tests/test_transport.py:
import unittest import httpretty import pushalot.exc from pushalot.transport import ( API_URL, HTTPTransport, ) class TestHTTPTransport(unittest.TestCase): @httpretty.activate def test_raises_exception_if_wrong_json_returned(self): with self.assertRaises(pushalot.exc.PushalotException): # httpretty будет возвращать по POST запросу # на API_URL body c "Whatever.." httpretty.register_uri( httpretty.POST, API_URL, body='Whatever..' ) transport = HTTPTransport() result = transport.send( Title='Test', )
Вообще, если Вы не планируете стать python разработчиком, то думаю, не стоит заморачиваться с этим сильно. Чтобы были хорошие тесты, надо писать код с мыслью в голове, что для всего этого их ещё писать, или вообще сначала писать тесты, а потом код (TDD). Однако, на pypi стоит библиотеку залить. Если что, на праздниках могу сделать pull request на github с необходимыми изменениями (их совсем чуть-чуть). :)kazenniy
30.12.2015 14:05+1Я уже активно все переписываю используя классы и исключения, устаканиваю интерфейс. Так что к этой версии исправления уже не актуальны. Классы оказались во многом удобнее функций. Развивать дальше буду уже новую версию. Надеюсь после праздников выложу на гитхаб.
amarao
31.12.2015 02:08Большая поправка: не нужно делать TDD, чтобы получить большую пользу от тестов. Особенно в питоне. Для питона иметь 100% покрытие кода тестами (даже дурацкими) имеет большой смысл хотя бы для того, чтобы не получать NameError из-за дурацких опечаток.
На практике я (не настоящий программист) обычно всё-таки сначала пишу, потом начинаю писать тесты, и в процессе рефакторю код в местах, где тестам тяжело жить.
В идеальном мире программист имеет полное ТЗ на момент написания программы и точно представляет себе интерфейс, который должен получиться.
В реальном мире оно постоянно меняется под давлением «вновь открывшихся обстоятельств» и куда оно вынесет — никто не знает.
vilgeforce
28.12.2015 19:10Рискну предположить что в «В ответ получаем сектор с информацией о диске:» вместо «сектор» должно быть «буффер».
ZyXI
У меня вопрос: а проверялась ли возможность параллельно запустить несколько экземпляров whdd? Зачем добавлять параллельность туда, где она уже есть?
kazenniy
С whdd у меня сразу как-то не заладилось. У меня в archlinux она падает с Segmentation fault при попытке запуска ata теста чтения. И собранная из AUR и готовый пакет от ubuntu. Думал посмотреть где падает, запустив ее в gdb, а там она работает без сбоев. Не хватило знаний разобраться в чем проблема.