经验 299 分贝 0 家园分 1578 在线时间: 1232 小时 最后登录: 2024-11-29 帖子: 173 精华: 0 注册时间: 2011-3-30 UID: 652083
注册:2011-3-30 10
发表于 2012-7-18 21:52:33
| 显示全部楼层
#coding=utf-8
import struct
import collections
# global definition
# Digit characters indexed by their numeric value: '0'-'9' followed by 'A'-'F'.
base = list('0123456789ABCDEF')
def dec2bin(num):
    """Return ``num`` as a binary string, zero-padded to at least 8 digits.

    The most significant bit comes first, so for a byte value ``result[0]``
    is bit 8 and ``result[7]`` is bit 1.  Values above 255 yield a string
    longer than 8 characters instead of failing.

    Raises:
        ValueError: if ``num`` is negative.
    """
    if num < 0:
        raise ValueError('dec2bin() expects a non-negative integer, got %r' % (num,))
    return format(num, '08b')
def bin2dec(string_num):
    """Convert a string of binary digits to its decimal value, as a string.

    The result is deliberately a ``str`` (not an ``int``): callers use it
    directly as a key into string-keyed lookup tables.

    Raises:
        ValueError: if ``string_num`` is empty or is not valid base-2 text.
    """
    return str(int(string_num, 2))
def isodd(string_num):
    """Return True when the decimal number in ``string_num`` is odd.

    ``string_num`` may be anything ``int()`` accepts (a decimal string or
    an integer).
    """
    return int(string_num) % 2 != 0
def Find(type, File, cur):
    """Dispatch to the decoder registered for record identifier ``type``.

    Args:
        type: record identifier value; only 132 has a decoder.
        File: seekable binary file object, passed through to the decoder.
        cur: file offset passed through to the decoder.

    Returns:
        Whatever the selected decoder returns.

    Raises:
        KeyError: if no decoder is registered for ``type``.
    """
    # The lambda defers the call, so only the selected decoder ever runs.
    RecordIdentifier = {132: lambda: FixPart(type, File, cur)}
    try:
        decoder = RecordIdentifier[type]
    except KeyError:
        raise KeyError('unsupported record identifier: %r' % (type,))
    return decoder()
# Lookup tables for the fixed part.  Keys are the decimal value of the decoded
# bit field, written as a string; FixPart reports any value not listed here
# as 'reserved'.
# NOTE(review): RecordSequence has no entry for '4' and maps '5' to 'final' --
# confirm against the format specification.
RecordSequence = {
    '0': 'not used',
    '1': 'single',
    '2': 'first intermediate',
    '3': 'intermediate',
    '5': 'final',
}
ChargeStatus = {
    '0': 'undefined',
    '1': 'charge',
    '2': 'no charge',
    '3': 'charge transfer to B-side',
}
RecordOwnerType = {
    '0': 'undefined',
    '142': 'Calling Party Number',
    '168': 'Called Party Number',
    '170': 'Redirecting Number',
    '172': 'System Provided Number',
    '179': 'Service Subscriber Number',
}

# Flag tables, one per flag byte.  Each entry is
# (bit mask, result key, text when the bit is set, text when it is clear);
# entries are listed in the order they appear in the decoded result.
_FLAGS_BYTE1 = (
    (0x80, 'flag18', 'Record Owner Type in Fixed Part', 'Record Owner DN in Fixed Part'),
    (0x40, 'flag17', 'AMA FAU', 'No AMA FAU'),
    (0x20, 'flag16', 'Facility Input by Subscriber', 'No Facility Input by Subscriber'),
    (0x10, 'flag15', 'Facility Usage', 'No Facility Usage'),
    (0x08, 'flag14', 'Connection', 'No Connection'),
    (0x04, 'flag13', 'AMA Connection / FAIS', 'No AMA Connection / FAIS'),
    (0x02, 'flag12', 'IACAMA', 'No IACAMA'),
    (0x01, 'flag11', 'Detailed Billing', 'No Detailed Billing'),
)
_FLAGS_BYTE2 = (
    (0x10, 'flag25', 'ISDN Subscriber', 'No ISDN Subscriber'),
    (0x08, 'flag24', 'Analog Subscriber', 'No Analog Subscriber'),
    (0x04, 'flag23', 'Ticket for LGC/TPC/CT Controller', 'No Ticket for LGC/TPC/CT Controller'),
    (0x02, 'flag22', 'TFS Subscriber', 'No TFS Subscriber'),
    (0x01, 'flag21', 'Centrex Intercom', 'No Centrex Intercom'),
)
# NOTE(review): in bytes 1 and 2 the key 'flagXN' is bit N of byte X (mask
# 1 << (N - 1)).  The byte-3 keys below do not follow that pattern (e.g.
# 'flag33' is tested with mask 0x80); the bit/label pairing is kept exactly
# as the original code had it -- confirm against the format specification.
_FLAGS_BYTE3 = (
    (0x80, 'flag33', 'No Answer', 'Answer'),
    (0x40, 'flag34', 'AMA Immediate Output', 'No AMA Immediate Output'),
    (0x10, 'flag35', 'AMA Immediate Output Successful', 'AMA Immediate Output Not Successful'),
    (0x08, 'flag37', 'Sequence Number in package 150 (Audit Trail)',
     'No Sequence Number in package 150 (Audit Trail)'),
    (0x04, 'flag38', 'Checksum in package 150 (Audit Trail)',
     'No Checksum in package 150 (Audit Trail)'),
)


def FixPart(type, file, cur):
    """Decode the 7-byte fixed part that starts at offset ``cur`` of ``file``.

    Layout read from the file: a 2-byte record length, three flag bytes, one
    byte holding record sequence (high nibble) and charge status (low
    nibble), and one byte that is either the record owner type or the packed
    LAC / owner-ID lengths, depending on flag18.

    Args:
        type: record identifier, stored unchanged in the result.
        file: seekable binary file object.
        cur: absolute offset of the first byte to decode.

    Returns:
        An OrderedDict with keys 'RecordIdentifier', 'RecordLength',
        'RecordFlag' (itself an OrderedDict of flag descriptions),
        'RecordSequence', 'ChargeStatus', and then either 'RecordOwnerType'
        or the pair 'LACLength' / 'OwnerIDLength' (decimal strings).

    Raises:
        struct.error: if fewer than 7 bytes can be read at ``cur``.
    """
    file.seek(cur)
    # '<' pins the byte order so the result does not depend on the host CPU.
    # NOTE(review): assumes the record length is stored little-endian (what
    # the original native-order 'H5B' read on x86) -- confirm against the
    # format specification.
    length, flags1, flags2, flags3, seq_charge, owner = struct.unpack(
        '<H5B', file.read(7))

    RecordFlag = collections.OrderedDict()
    # Each flag byte is decoded from its own byte; previously bytes 2 and 3
    # were mistakenly decoded from the data of bytes 1 and 2.
    for byte, table in ((flags1, _FLAGS_BYTE1),
                        (flags2, _FLAGS_BYTE2),
                        (flags3, _FLAGS_BYTE3)):
        for mask, key, when_set, when_clear in table:
            RecordFlag[key] = when_set if byte & mask else when_clear

    FixPartDict = collections.OrderedDict()
    FixPartDict['RecordIdentifier'] = type
    FixPartDict['RecordLength'] = length
    FixPartDict['RecordFlag'] = RecordFlag
    # The lookup tables are keyed by decimal strings, hence the str() calls.
    FixPartDict['RecordSequence'] = RecordSequence.get(str(seq_charge >> 4), 'reserved')
    FixPartDict['ChargeStatus'] = ChargeStatus.get(str(seq_charge & 0x0F), 'reserved')
    if flags1 & 0x80:  # flag18: the last byte is the record owner type
        FixPartDict['RecordOwnerType'] = RecordOwnerType.get(str(owner), 'reserved')
    else:
        # Top two bits: LAC length; low six bits: owner-ID length.
        FixPartDict['LACLength'] = str(owner >> 6)
        FixPartDict['OwnerIDLength'] = str(owner & 0x3F)
        # TODO: the LAC / owner-ID digits that follow the fixed part are not
        # read yet; only their lengths are reported.
    return FixPartDict
# Script body: decode the fixed part of the first record in the file and
# print it one "key value" pair per line.
# The with-block closes the file even when decoding raises.
with open('120701A0.AMA', 'rb') as f:
    first_byte = f.read(1)
    if not first_byte:
        raise SystemExit('120701A0.AMA is empty')
    # The first byte is the record identifier that selects the decoder.
    record_id, = struct.unpack('B', first_byte)
    cur = f.tell()
    # Single-argument print() and %-formatting give the same output on
    # Python 2 and Python 3.
    print(cur)
    for k, v in Find(record_id, f, cur).items():
        print('%s %s' % (k, v))
结果:
1
RecordIdentifier 132
RecordLength 116
RecordFlag OrderedDict([('flag18', 'Record Owner DN in Fixed Part'), ('flag17', 'No AMA FAU'), ('flag16', 'No Facility Input by Subscriber'), ('flag15', 'No Facility Usage'), ('flag14', 'Connection'), ('flag13', 'No AMA Connection / FAIS'), ('flag12', 'IACAMA'), ('flag11', 'No Detailed Billing'), ('flag25', 'No ISDN Subscriber'), ('flag24', 'Analog Subscriber'), ('flag23', 'No Ticket for LGC/TPC/CT Controller'), ('flag22', 'TFS Subscriber'), ('flag21', 'No Centrex Intercom'), ('flag33', 'Answer'), ('flag34', 'No AMA Immediate Output'), ('flag35', 'AMA Immediate Output Not Successful'), ('flag37', 'Sequence Number in package 150 (Audit Trail)'), ('flag38', 'No Checksum in package 150 (Audit Trail)')])
RecordSequence single
ChargeStatus charge
LACLength 0
OwnerIDLength 7