Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Removed dependecy over third party project
  • Loading branch information
pylessard committed Aug 3, 2017
commit fba09e6dc3eb1b8967dcfe57eb4c7868a3586b58
43 changes: 2 additions & 41 deletions Doc/library/socket.rst
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,7 @@ Constants
.. data:: CAN_ISOTP

CAN_ISOTP, in the CAN protocol family, is the ISO-TP (ISO 15765-2) protocol.
ISO-TP constants, documented in the Linux documentation, are also
defined in the socket module.
ISO-TP constants, documented in the Linux documentation.

Availability: Linux >= 2.6.25

Expand Down Expand Up @@ -1688,7 +1687,7 @@ After binding (:const:`CAN_RAW`) or connecting (:const:`CAN_BCM`) the socket, yo
can use the :meth:`socket.send`, and the :meth:`socket.recv` operations (and
their counterparts) on the socket object as usual.

This example might require special privileges::
This last example might require special privileges::

import socket
import struct
Expand Down Expand Up @@ -1746,44 +1745,6 @@ There is a :mod:`socket` flag to set, in order to prevent this,
the :data:`SO_REUSEADDR` flag tells the kernel to reuse a local socket in
``TIME_WAIT`` state, without waiting for its natural timeout to expire.

The last example shows how to use the socket interface to communicate to a CAN
network using the ISO-TP protocol. Altough Linux defines the required interface
in <linux/can.h>, it does not support a native implementation of the protocol.
The appropriate module must be loaded in the kernel.

modprobe can-isotp

If the system does not support ISO-TP protocol, creating the socket might return::

OSError: [Errno 93] Protocol not supported

This example might require special privileges::

import socket
import struct

# create a iso-tp socket and bind it to the 'vcan0' interface, with rx_addr = 0x123 and tx_addr=0x456
s1 = socket.socket(socket.AF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
s2 = socket.socket(socket.AF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)

# set socket some options before binding to the interface.
stmin = 0x32 # Minimum Separation Time : 50 milliseconds between each CAN frame. See ISO 15765-2 for acceptables values.
bs = 3 # Block Size : sends a FlowControl frame every 3 Consecutive Frames
wftmax = 5 # Maximum Wait Frame Transmission - Maximum number of consecutive Wait Frame.
s2.setsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_RECV_FC, struct.pack("=BBB", bs, stmin, wftmax))

s1.bind(('vcan0',0x123, 0x456))
s2.bind(('vcan0',0x456, 0x123))

data = b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10"
s1.send(data)
s2.recv(len(data))

# The following transmission will occur
# vcan0 456 [8] 10 0A 01 02 03 04 05 06
# vcan0 123 [3] 30 03 32
# vcan0 456 [5] 21 07 08 09 10


.. seealso::

Expand Down
193 changes: 7 additions & 186 deletions Lib/test/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,50 +365,6 @@ def clientTearDown(self):
self.cli = None
ThreadableTest.clientTearDown(self)

class ThreadableISOTPTest(unittest.TestCase, ThreadableTest):
"""To be able to run this test, a `vcan0` CAN interface can be created with
the following commands:
# modprobe vcan
# ip link add dev vcan0 type vcan
# ifconfig vcan0 up
"""
interface = 'vcan0'
addr_offset=1
address_lock = threading.Lock()
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self,*args, **kwargs)
ThreadableTest.__init__(self)
self.test_opts = {}
(self.cli_addr, self.srv_addr) = ThreadableISOTPTest.getAddressPair()

@classmethod
def getAddressPair(cls):
cls.address_lock.acquire_lock()
latched_offset = cls.addr_offset
cls.addr_offset += 2
if cls.addr_offset > 0x7FE: # 11 bit standard CAN identifier (ISO-11898-2)
cls.addr_offset = 1
cls.address_lock.release_lock()
return (latched_offset, latched_offset+1)

def clientSetUp(self):
try:
self.opts = self.test_opts[self._testMethodName] if self._testMethodName in self.test_opts else {}
self.cli = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
if 'cli' in self.opts:
for opt_const in self.opts['cli']:
self.serv.setsockopt(socket.SOL_CAN_ISOTP, opt_const, self.opts['cli'][opt_const])
self.cli.bind((self.interface, self.srv_addr, self.cli_addr))
except OSError:
# skipTest should not be called here, and will be called in the
# server instead
pass

def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)

class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):

def __init__(self, methodName='runTest'):
Expand Down Expand Up @@ -1766,39 +1722,14 @@ def testBCM(self):


@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
class BasicISOTPTest(unittest.TestCase):
class ISOTPTest(unittest.TestCase):

def testCrucialConstants(self):
socket.AF_CAN
socket.PF_CAN
socket.CAN_ISOTP
socket.SOCK_DGRAM

@unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
'socket.CAN_ISOTP required for this test.')
def testISOTPConstants(self):
socket.CAN_ISOTP

socket.SOL_CAN_ISOTP # Socket Option Level for ISOTP protocol (AF_CAN + 6)
socket.CAN_ISOTP_OPTS # sets struct can_isotp_options within the driver.
socket.CAN_ISOTP_RECV_FC # set flow control options for receiver
socket.CAN_ISOTP_TX_STMIN # override sepration time received in flow control frame
socket.CAN_ISOTP_RX_STMIN # force to ignore messages received in an interval smaller than this value
socket.CAN_ISOTP_LL_OPTS # sets struct can_isotp_ll_options within the driver

# Options flags
socket.CAN_ISOTP_LISTEN_MODE # listen only (do not send FC)
socket.CAN_ISOTP_EXTEND_ADDR # enable extended addressing
socket.CAN_ISOTP_TX_PADDING # enable CAN frame padding tx path
socket.CAN_ISOTP_RX_PADDING # enable CAN frame padding rx path
socket.CAN_ISOTP_CHK_PAD_LEN # check received CAN frame padding
socket.CAN_ISOTP_CHK_PAD_DATA # check received CAN frame padding
socket.CAN_ISOTP_HALF_DUPLEX # half duplex error state handling
socket.CAN_ISOTP_FORCE_TXSTMIN # ignore stmin from received FC
socket.CAN_ISOTP_FORCE_RXSTMIN # ignore CFs depending on rx stmin
socket.CAN_ISOTP_RX_EXT_ADDR # different rx extended addressing


def testCreateSocket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
pass
Expand All @@ -1815,125 +1746,15 @@ def testTooLongInterfaceName(self):
self.assertRaisesRegex(OSError, 'interface name too long',
s.bind, ('x' * 1024,1,2))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use

with self.assertRaisesRegex(OSError, 'interface name too long'):
    s.bind(('x' * 1024, 1, 2))

to make it more readable.


@unittest.skipUnless(hasattr(socket, "SOL_CAN_ISOTP"),
'socket.SOL_CAN_ISOTP is required for this test')
@unittest.skipUnless(hasattr(socket, "CAN_ISOTP_OPTS"),
'socket.CAN_ISOTP_OPTS is required for this test')
def testSetOpts(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
flags = 0x01234567
frame_txtime = 0x89ABCDEF
ext_address = 0x11
txpad_content = 0x21
rxpad_content = 0x31
rx_ext_address = ext_address
opts = struct.pack("=LLBBBB",flags, frame_txtime, ext_address, txpad_content, rxpad_content, rx_ext_address)
s.setsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_OPTS, opts)
readopts = s.getsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_OPTS, len(opts))
self.assertEqual(readopts,opts)

@unittest.skipUnless(hasattr(socket, "SOL_CAN_ISOTP"),
'socket.SOL_CAN_ISOTP is required for this test')
@unittest.skipUnless(hasattr(socket, "CAN_ISOTP_RECV_FC"),
'socket.CAN_ISOTP_RECV_FC is required for this test')
def testSetFCOpts(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
bs = 0x12
stmin = 0x22
wftmax = 0x32
opts = struct.pack("=BBB",bs, stmin, wftmax)
s.setsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_RECV_FC, opts)
readopts = s.getsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_RECV_FC, len(opts))
self.assertEqual(readopts,opts)

@unittest.skipUnless(hasattr(socket, "SOL_CAN_ISOTP"),
'socket.SOL_CAN_ISOTP is required for this test')
@unittest.skipUnless(hasattr(socket, "CAN_ISOTP_LL_OPTS"),
'socket.CAN_ISOTP_LL_OPTS is required for this test')
def testSetLLOpts(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
mtu = 16
tx_dl = 8
tx_flags = 0x13
opts = struct.pack("=BBB",mtu, tx_dl, tx_flags)
s.setsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_LL_OPTS, opts)
readopts = s.getsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_LL_OPTS, len(opts))
self.assertEqual(readopts,opts)


@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
@unittest.skipUnless(hasattr(socket, "SOL_CAN_ISOTP"),
'socket.SOL_CAN_ISOTP is required for this test')
@unittest.skipUnless(hasattr(socket, "CAN_ISOTP_RECV_FC"),
'socket.CAN_ISOTP_RECV_FC is required for this test')
class ISOTPTest(ThreadableISOTPTest):

def __init__(self, methodName='runTest', *args, **kwargs):
ThreadableISOTPTest.__init__(self,methodName, *args, **kwargs)

def setUp(self, *args, **kwargs):
self.test_opts['testTiming'] = {
'cli' : {
socket.CAN_ISOTP_RECV_FC : struct.pack("=BBB",10, 0x7F, 10) # bs, stmin, wftmax
},
'serv' : {}
}

self.serv = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
self.opts = self.test_opts[self._testMethodName] if self._testMethodName in self.test_opts else {}
def testBind(self):
try:
if 'serv' in self.opts:
for opt_const in self.opts['serv']:
self.serv.setsockopt(socket.SOL_CAN_ISOTP, opt_const, self.opts['serv'][opt_const])
self.serv.bind((self.interface, self.cli_addr, self.srv_addr))
except OSError:
self.skipTest('network interface `%s` does not exist' %
self.interface)

def tearDown(self):
self.serv.close()
self.serv=None

@classmethod
def makeRandomData(cls, n):
data = [random.randint(0,0xFF) for i in range(0,n)]
return struct.pack("B"*len(data), *data)
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
s.bind(("vcan0",1,2))

def _testFullFrame(self):
self.bindata = self.makeRandomData(4095)
self.cli.send(self.bindata)
except OSError as e:
if e.errno not in [errno.ENODEV, errno.EPROTONOSUPPORT]:
raise

def testFullFrame(self):
time.sleep(0.1)
bindata = self.serv.recv(len(self.bindata))
self.assertEqual(bindata, self.bindata)

def _testTiming(self):
self.bindata2 = self.makeRandomData(512)
self.tic = time.time()
self.cli.send(self.bindata2)

def testTiming(self):
"""
The main goal of this test is to validate that the driver correctly receive options via setsockopt.
We are not trying to validate the proper functioning of the driver itself.
"""
time.sleep(0.1)
bindata2 = self.serv.recv(len(self.bindata2))
self.toc = time.time()
self.assertEqual(bindata2, self.bindata2)
measured_time = math.ceil(self.toc-self.tic)
min_time = math.floor(self.calcFrameCnt(len(self.bindata2))*0.127)
self.assertGreaterEqual(measured_time, min_time)

def calcFrameCnt(self, size):
if size < 1 or size > 4095:
raise ValueError('ISOTP frame size must be between 1 and 4095')

if (size < 8):
return 1
else:
return int(1 + math.ceil((float(size) - 6.0) / 7.0))


@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
Expand Down
6 changes: 3 additions & 3 deletions Misc/ACKS
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,7 @@ John Lenton
Kostyantyn Leschenko
Benno Leslie
Christopher Tur Lesniewski-Laas
Pier-Yves Lessard
Alain Leufroy
Mark Levinson
Mark Levitt
Expand Down Expand Up @@ -1356,7 +1357,7 @@ Cheryl Sabella
Patrick Sabin
Sébastien Sablé
Amit Saha
Suman Saha
Suman Shahaf
Hajime Saitou
George Sakkis
Victor Salgado
Expand Down Expand Up @@ -1765,5 +1766,4 @@ Tarek Ziadé
Jelle Zijlstra
Gennadiy Zlobin
Doug Zongker
Peter Åstrand
Pier-Yves Lessard
Peter Åstrand
21 changes: 2 additions & 19 deletions Modules/socketmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -7042,6 +7042,8 @@ PyInit__socket(void)
PyModule_AddIntMacro(m, CAN_SFF_MASK);
PyModule_AddIntMacro(m, CAN_EFF_MASK);
PyModule_AddIntMacro(m, CAN_ERR_MASK);

PyModule_AddIntMacro(m, CAN_ISOTP);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a #ifdef CAN_ISOTP.

#endif
#ifdef HAVE_LINUX_CAN_RAW_H
PyModule_AddIntMacro(m, CAN_RAW_FILTER);
Expand All @@ -7067,25 +7069,6 @@ PyInit__socket(void)
PyModule_AddIntConstant(m, "CAN_BCM_RX_TIMEOUT", RX_TIMEOUT);
PyModule_AddIntConstant(m, "CAN_BCM_RX_CHANGED", RX_CHANGED);
#endif
#ifdef HAVE_LINUX_CAN_ISOTP_H
PyModule_AddIntMacro(m, SOL_CAN_ISOTP);
PyModule_AddIntMacro(m, CAN_ISOTP);
PyModule_AddIntMacro(m, CAN_ISOTP_OPTS);
PyModule_AddIntMacro(m, CAN_ISOTP_RECV_FC);
PyModule_AddIntMacro(m, CAN_ISOTP_TX_STMIN);
PyModule_AddIntMacro(m, CAN_ISOTP_RX_STMIN);
PyModule_AddIntMacro(m, CAN_ISOTP_LL_OPTS);
PyModule_AddIntMacro(m, CAN_ISOTP_LISTEN_MODE);
PyModule_AddIntMacro(m, CAN_ISOTP_EXTEND_ADDR);
PyModule_AddIntMacro(m, CAN_ISOTP_TX_PADDING);
PyModule_AddIntMacro(m, CAN_ISOTP_RX_PADDING);
PyModule_AddIntMacro(m, CAN_ISOTP_CHK_PAD_LEN);
PyModule_AddIntMacro(m, CAN_ISOTP_CHK_PAD_DATA);
PyModule_AddIntMacro(m, CAN_ISOTP_HALF_DUPLEX);
PyModule_AddIntMacro(m, CAN_ISOTP_FORCE_TXSTMIN);
PyModule_AddIntMacro(m, CAN_ISOTP_FORCE_RXSTMIN);
PyModule_AddIntMacro(m, CAN_ISOTP_RX_EXT_ADDR);
#endif
#ifdef SOL_RDS
PyModule_AddIntMacro(m, SOL_RDS);
#endif
Expand Down
4 changes: 0 additions & 4 deletions Modules/socketmodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ typedef int socklen_t;
#include <linux/can/bcm.h>
#endif

#ifdef HAVE_LINUX_CAN_ISOTP_H
#include <linux/can/isotp.h>
#endif

#ifdef HAVE_SYS_SYS_DOMAIN_H
#include <sys/sys_domain.h>
#endif
Expand Down
4 changes: 2 additions & 2 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -8127,8 +8127,8 @@ fi
done


# On Linux, can.h, can/raw.h, can/isotp.h require sys/socket.h
for ac_header in linux/can.h linux/can/raw.h linux/can/bcm.h linux/can/isotp.h
# On Linux, can.h and can/raw.h require sys/socket.h
for ac_header in linux/can.h linux/can/raw.h linux/can/bcm.h
do :
as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh`
ac_fn_c_check_header_compile "$LINENO" "$ac_header" "$as_ac_Header" "
Expand Down
4 changes: 2 additions & 2 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -2120,8 +2120,8 @@ AC_CHECK_HEADERS(linux/netlink.h,,,[
#endif
])

# On Linux, can.h, can/raw.h, can/isotp.h require sys/socket.h
AC_CHECK_HEADERS(linux/can.h linux/can/raw.h linux/can/bcm.h linux/can/isotp.h,,,[
# On Linux, can.h and can/raw.h require sys/socket.h
AC_CHECK_HEADERS(linux/can.h linux/can/raw.h linux/can/bcm.h,,,[
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
Expand Down
3 changes: 0 additions & 3 deletions pyconfig.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,6 @@
/* Define to 1 if you have the <linux/can.h> header file. */
#undef HAVE_LINUX_CAN_H

/* Define to 1 if you have the <linux/can/isotp.h> header file. */
#undef HAVE_LINUX_CAN_ISOTP_H

/* Define if compiling using Linux 3.6 or later. */
#undef HAVE_LINUX_CAN_RAW_FD_FRAMES

Expand Down