#!/usr/bin/env python # Copyright (c) 2007, Bjoern A. Zeeb. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $Id: icmpv6test.py,v 1.1 2009-02-12 20:05:15 bz Exp $ # # Author: Bjoern A. Zeeb # # Description: This module sends icmpv6 echo request ("ping") packets # optionally with flow label set to the payload length including the # ipv6 header. # # Remarks: this is NOT intended to be a full ping6 program or follow # "all the rules". It is considered to be a prototype. 
#

import sys
import signal

from pcs.packets.ethernet import *
from pcs.packets.ipv6 import *
from pcs.packets.icmpv6 import *
from pcs import *


class TimeoutException(Exception):
    """Raised from the SIGALRM handler to abort a blocking capture read."""
    pass


class ICMPv6Ping:
    """Prototype ICMPv6 echo request ("ping") sender built on PCS.

    Every request carries the fixed identifier/sequence pair below; a
    captured echo reply is only counted as an answer when it carries the
    same pair.
    """

    # Identifier and sequence number placed in every echo request and
    # expected back in the matching echo reply.
    ECHO_ID = 0xdead
    ECHO_SEQUENCE = 0xbeef

    def timeout_handler(self, sig, frame):
        """Signal handler: convert a delivered alarm into TimeoutException.

        Arguments follow the ``signal.signal`` handler convention and are
        not used.
        """
        raise TimeoutException()

    def _is_expected_reply(self, capture, reply):
        """Return True if the captured frame answers our echo request.

        capture -- the connector the frame was read from; used to unpack
                   the link-layer header.
        reply   -- raw bytes of the captured frame.

        The fixed offsets assume an Ethernet header (14 bytes) directly
        followed by a plain 40-byte IPv6 header, i.e. no extension headers.
        """
        ether_hdr = capture.unpack(reply, capture.dlink, capture.dloff)
        if ether_hdr.type != ETHERTYPE_IPV6:
            return False

        v6 = ipv6(reply[14:len(reply)])
        if v6.next_header != IPPROTO_ICMPV6:
            return False

        # First pass: build the ICMPv6 object from the type byte found in
        # the packet so the message type can be inspected.
        i6 = icmpv6(reply[54:55], reply[54:len(reply)])
        if i6.type != ICMP6_ECHO_REPLY:
            return False

        # Second pass: re-parse explicitly as an echo reply to reach the
        # id/sequence fields.
        i6 = icmpv6(ICMP6_ECHO_REPLY, reply[54:len(reply)])
        return i6.id == self.ECHO_ID and i6.sequence == self.ECHO_SEQUENCE

    def main(self, *argv):
        """Parse the command line, then send echo requests and await replies.

        argv -- forwarded unchanged to ``OptionParser.parse_args``; when
                nothing is passed optparse parses ``sys.argv[1:]``.

        Prints "Ping." for each request written in full, "Pong." for a
        matching reply and "Timeout." when none arrives in about a second.
        Without -c/--count the loop runs until interrupted.  Exits through
        ``parser.error`` when a mandatory option is missing and through
        ``sys.exit`` on Ctrl-C while waiting for a reply.
        """
        from optparse import OptionParser
        from time import time

        parser = OptionParser()
        parser.add_option("-F", "--flowislength", dest="flowlength",
                          action="store_true", default=False,
                          help="Set flow label to payload length incl. "
                               "IPv6 header.")
        # The help text here used to be a copy of the -F description.
        parser.add_option("-c", "--count", action="store", type="int",
                          dest="count",
                          help="Number of echo requests to send "
                               "(default: until interrupted).")
        parser.add_option("-d", "--dstaddr6", action="store", type="string",
                          dest="dstaddr6", help="IPv6 address to ping.")
        # XXX Could not find a routing socket library to automagically set
        # XXX source and destination addresses. Let getopt handle it for now.
        parser.add_option("-s", "--srcaddr6", action="store", type="string",
                          dest="srcaddr6",
                          help="IPv6 source address to use.")
        parser.add_option("-D", "--dstmac", action="store", type="string",
                          dest="dstmac",
                          help="Ethernet destination address to use.")
        parser.add_option("-S", "--srcmac", action="store", type="string",
                          dest="srcmac",
                          help="Ethernet source address to use.")
        parser.add_option("-i", "--iface", action="store", type="string",
                          dest="iface",
                          help="Ethernet(like) interface to use.")

        options, _ = parser.parse_args(*argv)

        # All addressing has to be given by hand (see XXX above), so every
        # one of these options is mandatory.  Checked in the original order
        # so the first reported missing argument stays the same.
        for name in ("dstaddr6", "srcaddr6", "dstmac", "srcmac", "iface"):
            if getattr(options, name) is None:
                parser.error("Missing mandatory argument %s." % name)

        # Setup the IPv6 header (we will overwrite the flow field later)
        ip6 = ipv6()
        ip6.version = 6
        ip6.traffic_class = 0
        ip6.flow = 0
        ip6.length = 0
        ip6.next_header = IPPROTO_ICMPV6
        ip6.hop = 0x40
        ip6.src = inet_pton(AF_INET6, options.srcaddr6)
        ip6.dst = inet_pton(AF_INET6, options.dstaddr6)

        # Build ICMPv6 Echo Request payload.
        icmp6 = icmpv6(ICMP6_ECHO_REQUEST)
        icmp6.code = 0
        icmp6.checksum = 0
        icmp6.id = self.ECHO_ID
        icmp6.sequence = self.ECHO_SEQUENCE

        # Update IPv6 Header Payload length.
        ip6.length = len(icmp6.getbytes())
        if options.flowlength:
            # Flow label := IPv6 header length + payload length.
            ip6.flow = len(ip6.getbytes()) + ip6.length

        # Generate Ethernet Header.
        ether = ethernet()
        ether.type = ETHERTYPE_IPV6
        ether.dst = ether_atob(options.dstmac)
        ether.src = ether_atob(options.srcmac)

        # Open descriptors to write to and read from.
        output = PcapConnector(options.iface)
        capture = PcapConnector(options.iface)
        capture.setfilter("icmp6")

        # Update ICMPv6 checksum.
        # XXX PCS bug [ 1642560 ] "computed checksums need masking?"
        icmp6.checksum = icmp6.cksum(ip6) & 0xffff

        # "The Packet"
        packet = Chain([ether, ip6, icmp6])

        # Send, wait for reply or timeout.
        # NOTE: print("...") with a single string behaves the same under
        # Python 2 and Python 3.
        count = 0
        while options.count is None or count < options.count:
            # Send it out.  Only a complete write counts as a sent ping.
            out = output.write(packet.bytes, len(packet.bytes))
            if out == len(packet.bytes):
                count += 1
                print("Ping.")

            t1 = time()
            t2 = time()
            while (t2 - t1) < 1:
                try:
                    # Bound the blocking read with a one second alarm and
                    # always restore the previous SIGALRM disposition.
                    savesig = signal.signal(signal.SIGALRM,
                                            self.timeout_handler)
                    signal.alarm(1)
                    try:
                        reply = capture.read()
                    finally:
                        signal.alarm(0)
                        signal.signal(signal.SIGALRM, savesig)
                except KeyboardInterrupt:
                    sys.exit("bye bye..")
                except TimeoutException:
                    print("Timeout.")
                    break

                if self._is_expected_reply(capture, reply):
                    print("Pong.")
                    break

                # Unrelated ICMPv6 traffic: keep listening until the one
                # second window has elapsed.
                t2 = time()

            # Reached only when the window ran out on unrelated packets;
            # the two break paths above leave t2 - t1 below one second.
            if (t2 - t1) > 1:
                print("Timeout.")


if __name__ == '__main__':
    i6ping = ICMPv6Ping()
    i6ping.main()

# end