|
| 1 | +import time |
| 2 | +import pyb |
| 3 | +import _onewire as _ow |
| 4 | + |
| 5 | +class OneWire: |
| 6 | + CMD_SEARCHROM = const(0xf0) |
| 7 | + CMD_READROM = const(0x33) |
| 8 | + CMD_MATCHROM = const(0x55) |
| 9 | + CMD_SKIPROM = const(0xcc) |
| 10 | + |
| 11 | + def __init__(self, pin): |
| 12 | + self.pin = pin |
| 13 | + self.pin.init(pin.OPEN_DRAIN, pin.PULL_NONE) |
| 14 | + |
| 15 | + def reset(self): |
| 16 | + return _ow.reset(self.pin) |
| 17 | + |
| 18 | + def read_bit(self): |
| 19 | + return _ow.readbit(self.pin) |
| 20 | + |
| 21 | + def read_byte(self): |
| 22 | + return _ow.readbyte(self.pin) |
| 23 | + |
| 24 | + def read_bytes(self, count): |
| 25 | + buf = bytearray(count) |
| 26 | + for i in range(count): |
| 27 | + buf[i] = _ow.readbyte(self.pin) |
| 28 | + return buf |
| 29 | + |
| 30 | + def write_bit(self, value): |
| 31 | + return _ow.writebit(self.pin, value) |
| 32 | + |
| 33 | + def write_byte(self, value): |
| 34 | + return _ow.writebyte(self.pin, value) |
| 35 | + |
| 36 | + def write_bytes(self, buf): |
| 37 | + for b in buf: |
| 38 | + _ow.writebyte(self.pin, b) |
| 39 | + |
| 40 | + def select_rom(self, rom): |
| 41 | + self.reset() |
| 42 | + self.write_byte(CMD_MATCHROM) |
| 43 | + self.write_bytes(rom) |
| 44 | + |
| 45 | + def scan(self): |
| 46 | + devices = [] |
| 47 | + diff = 65 |
| 48 | + rom = False |
| 49 | + for i in range(0xff): |
| 50 | + rom, diff = self._search_rom(rom, diff) |
| 51 | + if rom: |
| 52 | + devices += [rom] |
| 53 | + if diff == 0: |
| 54 | + break |
| 55 | + return devices |
| 56 | + |
| 57 | + def _search_rom(self, l_rom, diff): |
| 58 | + if not self.reset(): |
| 59 | + return None, 0 |
| 60 | + self.write_byte(CMD_SEARCHROM) |
| 61 | + if not l_rom: |
| 62 | + l_rom = bytearray(8) |
| 63 | + rom = bytearray(8) |
| 64 | + next_diff = 0 |
| 65 | + i = 64 |
| 66 | + for byte in range(8): |
| 67 | + r_b = 0 |
| 68 | + for bit in range(8): |
| 69 | + b = self.read_bit() |
| 70 | + if self.read_bit(): |
| 71 | + if b: # there are no devices or there is an error on the bus |
| 72 | + return None, 0 |
| 73 | + else: |
| 74 | + if not b: # collision, two devices with different bit meaning |
| 75 | + if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i): |
| 76 | + b = 1 |
| 77 | + next_diff = i |
| 78 | + self.write_bit(b) |
| 79 | + if b: |
| 80 | + r_b |= 1 << bit |
| 81 | + i -= 1 |
| 82 | + rom[byte] = r_b |
| 83 | + return rom, next_diff |
| 84 | + |
| 85 | + def crc8(self, data): |
| 86 | + return _ow.crc8(data) |
| 87 | + |
| 88 | +class DS18B20: |
| 89 | + THERM_CMD_CONVERTTEMP = const(0x44) |
| 90 | + THERM_CMD_RSCRATCHPAD = const(0xbe) |
| 91 | + |
| 92 | + def __init__(self, onewire): |
| 93 | + self.ow = onewire |
| 94 | + self.roms = [] |
| 95 | + |
| 96 | + def scan(self): |
| 97 | + self.roms = [] |
| 98 | + for rom in self.ow.scan(): |
| 99 | + if rom[0] == 0x28: |
| 100 | + self.roms += [rom] |
| 101 | + return self.roms |
| 102 | + |
| 103 | + def start_measure(self): |
| 104 | + if not self.ow.reset(): |
| 105 | + return False |
| 106 | + self.ow.write_byte(CMD_SKIPROM) |
| 107 | + self.ow.write_byte(THERM_CMD_CONVERTTEMP) |
| 108 | + return True |
| 109 | + |
| 110 | + def get_temp(self, rom): |
| 111 | + if not self.ow.reset(): |
| 112 | + return None |
| 113 | + |
| 114 | + self.ow.select_rom(rom) |
| 115 | + self.ow.write_byte(THERM_CMD_RSCRATCHPAD) |
| 116 | + |
| 117 | + buf = self.ow.read_bytes(9) |
| 118 | + if self.ow.crc8(buf): |
| 119 | + return None |
| 120 | + |
| 121 | + return self._convert_temp(buf) |
| 122 | + |
| 123 | + def _convert_temp(self, data): |
| 124 | + temp_lsb = data[0] |
| 125 | + temp_msb = data[1] |
| 126 | + return (temp_msb << 8 | temp_lsb) / 16 |
| 127 | + |
| 128 | +# connect 1-wire temp sensors to GPIO12 for this test |
| 129 | +def test(): |
| 130 | + dat = pyb.Pin(12) |
| 131 | + ow = OneWire(dat) |
| 132 | + |
| 133 | + ds = DS18B20(ow) |
| 134 | + roms = ow.scan() |
| 135 | + print('found devices:', roms) |
| 136 | + |
| 137 | + for i in range(4): |
| 138 | + print('temperatures:', end=' ') |
| 139 | + ds.start_measure() |
| 140 | + time.sleep_ms(750) |
| 141 | + for rom in roms: |
| 142 | + print(ds.get_temp(rom), end=' ') |
| 143 | + print() |
| 144 | + |
| 145 | +#pyb.freq(80000000) |
| 146 | +#pyb.freq(160000000) |
| 147 | +test() |
0 commit comments