#!/usr/bin/python

# Magic Module
# Magic - Python module to classify like the 'file' command using a 'magic' file
# See: 'man 4 magic' and 'man file'
#
# Copyright (C) 2002 Thomas Mangin
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

import os
import re
import string

import convert

# Need to have a checksum on the cache and source file to update at object creation
# Could use circle safe_pickle (see speed performance impact)
# This program take some input file, we should check the permission on those files ..
# Some code cleanup and better error catching are needed
# Implement the missing part of the magic file definition


class Failed (Exception):
    """Raised internally when a magic rule does not match the file."""
    pass


class Magic:
    """Classify files like the 'file' command, driven by a 'magic' rule file.

    The rule file is parsed once into a set of parallel dictionaries (one per
    rule attribute, keyed by rule index) which are pickled to a cache file and
    reloaded from it on later runs.

    NOTE: this module targets Python 2: file content is handled as byte
    strings (``ord(data[0])``) and errors are raised as ``StandardError``.
    """

    # Number of bytes to read from the file for each magic data type
    data_size = {'byte': 1, 'short': 2, 'long': 4, 'string': 1, 'pstring': 1, 'date': 4, 'ldate': 4}
    # Width in bytes of each indirect-offset type specifier
    # ('L' used to be 5, which disagreed with the 4 bytes read for it below)
    type_size = {'b': 1, 'B': 1, 's': 2, 'S': 2, 'l': 4, 'L': 4}

    # Indirect offsets: "(x[.type])" and "(x[.type]+y)" / "(x[.type]-y)"
    se_offset_abs = r"^\(([0\\][xX][\dA-Fa-f]+|[0\\][0-7]*|\d+)(\.[bslBSL])*\)"
    se_offset_add = r"^\(([0\\][xX][\dA-Fa-f]+|[0\\][0-7]*|\d+)(\.[bslBSL])*([-+])([0\\][xX][\dA-Fa-f]+|[0\\][0-7]*|\d+)\)"

    # Rule attributes stored in the cache file, in on-disk order.
    # write_cache and read_cache both iterate this list so they cannot drift.
    _cache_fields = (
        '_level', '_direct', '_offset_relatif', '_offset_type', '_offset_delta',
        '_endian', '_kind', '_oper', '_mask', '_test', '_data', '_length', '_mime',
    )

    def __init__(self, filename, cachename):
        """Load the rules, building the cache from `filename` if it is missing.

        filename  -- path of the textual magic file
        cachename -- path of the pickled cache; created when it does not exist
        """
        self.entries = 0

        self._level = {}
        self._direct = {}
        self._offset_relatif = {}
        self._offset_type = {}
        self._offset_delta = {}
        self._endian = {}
        self._kind = {}
        self._oper = {}
        self._mask = {}
        self._test = {}
        self._data = {}
        self._length = {}
        self._mime = {}

        if not os.path.isfile(cachename):
            self.read_magic(filename)
            self.write_cache(cachename)

        self.read_cache(cachename)

    # read_magic subfunctions

    def __split(self, line):
        """Split a magic line on whitespace, re-joining over-split tokens.

        A token flagged by convert.is_final_dash (presumably one ending with
        an escaping backslash -- confirm in the convert module) is joined
        with the token that follows it. The pass is repeated until no join
        happens.
        """
        result = ''
        split = line.split()

        again = 1

        # Make sure the split function did not split too much
        while again:
            again = 0
            pos = 0
            part = []
            top = len(split)
            while pos < top:
                if convert.is_final_dash(split[pos]):
                    result = split[pos] + ' '
                    index = line.find(result)
                    if index != -1:
                        char = line[index + len(result)]
                        if char != ' ' and char != '\t':
                            pos += 1
                            result += split[pos]
                            again = 1
                else:
                    result = split[pos]

                part.append(result)
                pos += 1
            split = part

        return part

    def __level(self, text):
        """Return the nesting level of a rule: the number of '>' in `text`."""
        return text.count('>')

    def __strip_start(self, char, text):
        """Return `text` without its first character when that is `char`."""
        if text[:1] == char:
            return text[1:]
        return text

    def __direct_offset(self, text):
        """Return 0 for a parenthesised (indirect) offset, 1 otherwise."""
        if text.startswith('(') and text.endswith(')'):
            return 0
        return 1

    def __offset(self, text):
        """Parse an offset field.

        Returns (direct, offset_type, offset_delta, offset_relatif):
        for a direct offset only offset_delta is set; for an indirect offset
        "(x.t+y)", offset_relatif is x, offset_type is t (default 'l') and
        offset_delta is the signed y.
        """
        direct = self.__direct_offset(text)
        offset_type = 'l'
        offset_delta = 0
        offset_relatif = 0

        # Get the offset information
        if direct:
            offset_delta = convert.convert(text)
        else:
            match_abs = re.match(self.se_offset_abs, text)
            match_add = re.match(self.se_offset_add, text)

            if match_abs:
                offset_relatif = convert.convert(match_abs.group(1))
                if match_abs.group(2) is not None:
                    offset_type = match_abs.group(2)[1]

            elif match_add:
                offset_relatif = convert.convert(match_add.group(1))
                if match_add.group(2) is not None:
                    offset_type = match_add.group(2)[1]
                # The delta has to be converted before it is negated; the
                # matched group is still text at this point.
                delta = convert.convert(match_add.group(4))
                if match_add.group(3) == '-':
                    offset_delta = 0 - delta
                else:
                    offset_delta = delta

        return (direct, offset_type, offset_delta, offset_relatif)

    def __oper_mask(self, text):
        """Split a type field such as "belong&0xff" into (oper, mask, type).

        oper is '&', '^' or '' when the type carries no mask.
        """
        type_mask_and = text.split('&')
        type_mask_or = text.split('^')

        if len(type_mask_and) > 1:
            return ('&', convert.convert(type_mask_and[1]), type_mask_and[0])
        elif len(type_mask_or) > 1:
            return ('^', convert.convert(type_mask_or[1]), type_mask_or[0])
        return ('', 0, text)

    def __endian(self, full_type):
        """Return 'big', 'little' or 'local' from the 'be'/'le' type prefix."""
        if full_type.startswith('be'):
            return 'big'
        elif full_type.startswith('le'):
            return 'little'
        return 'local'

    def __kind(self, full_type, endian):
        """Return the base type name, without endian prefix or options."""
        if endian == 'local':
            kind = full_type
        else:
            kind = full_type[2:]

        # XXX: string case and white space compaction options ("string/...")
        # are not implemented, the option is dropped
        if kind.startswith("string/"):
            kind = "string"

        # XXX: the "ldate-..." extensions are not understood, they are dropped
        if kind.startswith("ldate-"):
            kind = "ldate"

        return kind

    def __test_result(self, test_result):
        """Split a test field into (operator, operand).

        A field without a leading operator is an equality test; a lone 'x'
        is the "always true" test.
        """
        if test_result[0] in "=><&!^":
            return (test_result[0], test_result[1:])
        elif test_result == 'x':
            return ('x', 'x')
        return ('=', test_result)

    def __string(self, list):
        """Normalise a mixed list of strings and numbers (currently unused).

        The two-character string backslash-zero becomes chr(0), numbers
        below 10 become the character code of their digit, everything else
        is kept as is.
        """
        r = []
        for s in list:
            if type(s) is str:
                if s == "\\0":
                    r.append(chr(0))
                else:
                    r.append(s)
            elif s < 10:
                r.append(ord(str(s)))
            else:
                r.append(s)
        return r

    def __data(self, kind, result):
        """Convert the operand text of a rule into a list of values.

        C escapes are kept as strings (backslash-zero becomes the pair
        backslash, 0), letters and digits of a string operand become their
        character codes, and numbers are converted with the convert module
        helpers.
        """
        pos = 0
        data = []
        while pos < len(result):
            if convert.is_c_escape(result[pos:]):
                # \0 is not a number it is the null string
                if result[pos + 1] == '0':
                    data.append(result[pos])
                    data.append(0)
                # \rnt are special
                else:
                    data.append(result[pos:pos + 2])
                pos += 2
            elif kind == "string" and (result[pos] in string.ascii_letters or result[pos] in string.digits):
                data.append(ord(result[pos]))
                pos += 1
            else:
                base = convert.which_base(result[pos:])
                if base == 0:
                    data.append(ord(result[pos]))
                    pos += 1
                else:
                    size_base = convert.size_base(base)
                    size_number = convert.size_number(result[pos:])
                    start = pos + size_base
                    end = pos + size_number
                    nb = convert.base10(result[start:end], base)
                    pos += size_number
                    data.append(nb)
        return data

    def __length(self, kind, data):
        """Return the number of bytes to read from the file for a rule."""
        if kind != "string":
            return self.data_size[kind]

        replace = ""
        for i in data:
            # Entries that are not character codes (the escape strings) count
            # as a single placeholder character
            try:
                replace += chr(i)
            except (TypeError, ValueError, OverflowError):
                replace += '*'

        # This is for "\0"
        replace = replace.replace('*\0', '*')
        # This is for two "\"
        replace = replace.replace('\\\\', '*')
        # This is for the remaining "\{whatever}"
        replace = replace.replace('\\', '')

        return len(replace)

    def __mime(self, list):
        """Join the description words and expand their C escape sequences."""
        mime = ' '.join(list).rstrip()

        mime = mime.replace("\\a", "\a")
        mime = mime.replace("\\b", "\b")
        mime = mime.replace("\\f", "\f")
        mime = mime.replace("\\n", "\n")
        mime = mime.replace("\\r", "\r")
        mime = mime.replace("\\t", "\t")
        mime = mime.replace("\\v", "\v")
        mime = mime.replace("\\0", "\0")

        return mime

    def read_magic(self, magic_file):
        """Parse the textual magic file into the per-rule dictionaries.

        Raises StandardError when the file cannot be opened. On return
        self.entries is the number of rules read.
        """
        self.magic = []

        try:
            f = open(magic_file, 'rb')
        except Exception:
            raise StandardError("No valid magic file called \"" + str(magic_file) + "\"")

        index = 0
        try:
            for line in f.readlines():
                line = line.strip()
                if not line or line.startswith('#'):
                    continue

                part = self.__split(line)

                # If the line is missing a text string assume it is '\b'
                while len(part) < 4:
                    part.append('\b')

                # Get the level of the test
                level = self.__level(part[0])

                # XXX: the meaning of '&' in ">>&2" is not handled, it is skipped
                offset_string = self.__strip_start('&', part[0][level:])

                # offset such as (x[.[bslBSL]][+-]y) are indirect offset
                (direct, offset_type, offset_delta, offset_relatif) = self.__offset(offset_string)

                # The type can be associated to a mask
                (oper, mask, rest) = self.__oper_mask(part[1])

                # XXX: the 'u' type prefix is not handled, it is skipped
                full_type = self.__strip_start('u', rest)

                endian = self.__endian(full_type)
                kind = self.__kind(full_type, endian)

                # Get the comparison test and result
                (test, result) = self.__test_result(part[2])

                # Get the value to check against
                data = self.__data(kind, result)

                # Get the length of the data
                length = self.__length(kind, data)

                # Special characters
                mime = self.__mime(part[3:])

                # Append the line to the list
                self._level[index] = level
                self._direct[index] = direct
                self._offset_type[index] = offset_type
                self._offset_delta[index] = offset_delta
                self._offset_relatif[index] = offset_relatif
                self._endian[index] = endian
                self._kind[index] = kind
                self._oper[index] = oper
                self._mask[index] = mask
                self._test[index] = test
                self._data[index] = data
                self._length[index] = length
                self._mime[index] = mime

                index += 1
                # Number of rules stored so far (same meaning as in read_cache)
                self.entries = index
        finally:
            f.close()

    def write_cache(self, name):
        """Pickle every rule dictionary, in _cache_fields order, to `name`."""
        import cPickle

        f = open(name, 'wb')
        try:
            for field in self._cache_fields:
                cPickle.dump(getattr(self, field), f, 1)
        finally:
            f.close()

    def read_cache(self, name):
        """Load the rule dictionaries written by write_cache from `name`.

        NOTE(security): unpickling executes whatever the cache file encodes;
        only point this at cache files this program wrote itself.
        """
        import cPickle

        f = open(name, 'rb')
        try:
            for field in self._cache_fields:
                setattr(self, field, cPickle.load(f))
        finally:
            f.close()

        self.entries = len(self._level)

    # classify subfunctions

    def __indirect_offset(self, file, type, offset, delta=0):
        """Resolve an indirect offset "(offset.type+delta)".

        Reads a byte, short or long (per `type`: lower case little endian,
        upper case big endian) at `offset` in `file` and returns that value
        plus `delta`, as described in magic(5).
        Raises IOError when the file is too short.
        """
        file.seek(offset)

        if type == 'l':
            pointer = convert.little4(self.__read(file, 4))
        elif type == 'L':
            pointer = convert.big4(self.__read(file, 4))
        elif type == 's':
            pointer = convert.little2(self.__read(file, 2))
        elif type == 'S':
            pointer = convert.big2(self.__read(file, 2))
        elif type == 'b' or type == 'B':
            pointer = ord(self.__read(file, 1))
        else:
            raise StandardError("Offset type unknown " + str(type))

        return pointer + delta

    def __read(self, file, number):
        """Read up to `number` bytes; raise IOError when nothing is left."""
        data = file.read(number)
        if not data:
            raise IOError("out of file access")
        return data

    def __convert(self, kind, endian, data):
        """Convert the bytes read from the file into a number.

        Only 'byte', 'short' and 'long' are converted; for the string and
        date kinds 0 is returned and the caller works on the raw bytes.
        Raises StandardError on short data, unknown endian or unknown kind.
        """
        value = 0

        if kind == 'byte':
            if len(data) < 1:
                raise StandardError("Should never happen, not enough data")
            value = ord(data[0])

        elif kind == 'short':
            if len(data) < 2:
                raise StandardError("Should never happen, not enough data")
            if endian == 'local':
                value = convert.local2(data)
            elif endian == 'little':
                value = convert.little2(data)
            elif endian == 'big':
                value = convert.big2(data)
            else:
                raise StandardError("Endian type unknown")

        elif kind == 'long':
            if len(data) < 4:
                raise StandardError("Should never happen, not enough data")
            if endian == 'local':
                value = convert.local4(data)
            elif endian == 'little':
                value = convert.little4(data)
            elif endian == 'big':
                value = convert.big4(data)
            else:
                raise StandardError("Endian type unknown")

        elif kind in ('date', 'ldate'):
            # XXX: Not done yet
            pass

        elif kind in ('string', 'pstring'):
            # Nothing to convert; pstring is not implemented
            pass

        else:
            raise StandardError("Type " + str(kind) + " not recognised")

        return value

    def __binary_mask(self, oper, value, mask):
        """Apply the rule mask ('&' or '^') to `value`; '' means no mask."""
        if oper == '&':
            value &= mask
        elif oper == '^':
            value ^= mask
        elif oper == '':
            pass
        else:
            raise StandardError("Binary operator unknown " + str(oper))
        return value

    def __read_string(self, file):
        """Read characters up to a NUL or newline (at most 100).

        Raises Failed when no terminator shows up within 100 characters and
        IOError when the file ends first.
        """
        limit = 0
        result = ""
        while limit < 100:
            char = self.__read(file, 1)
            # chr(0) == '\x00'
            if char == '\x00' or char == '\n':
                break
            result += char
            limit += 1
        if limit == 100:
            raise Failed()
        return result

    def __is_null_string(self, data):
        """Tell whether `data` is the parsed form of the operand backslash-zero."""
        return len(data) == 2 and data[0] == '\\' and data[1] == 0

    def __apply_test(self, f, test, kind, data, extract, value):
        """Evaluate one rule test against what was read from the file.

        f       -- the file being classified (further reads for strings)
        test    -- the operator: one of = > < & ^ ! x
        kind    -- the data type of the rule
        data    -- the parsed operand of the rule
        extract -- the raw bytes read at the rule offset
        value   -- the converted and masked number for numeric kinds

        Returns (success, replace) where replace is the value available for
        substitution in the description, or None.
        Raises Failed for unimplemented combinations and StandardError for
        an unknown operator.
        """
        success = 0
        replace = None

        if kind == 'pstring' and test in ('=', '>', '<', 'x'):
            raise Failed("pstring not implemented")

        if test == '=':
            # If we are comparing string the string is already read
            if kind == 'string':
                # The string \0 is special: it always matches
                if self.__is_null_string(data):
                    success = 1
                # Other strings: byte to byte comparison
                elif len(data) == len(extract):
                    success = 1
                    for index in range(len(data)):
                        # XXX: Does this fail for '\r' test
                        if ord(extract[index]) != data[index]:
                            success = 0
                            break
            else:
                success = (data[0] == value)
                replace = value

        elif test == '>':
            # If we are > a string, we have to read it from the file
            if kind == 'string':
                if not self.__is_null_string(data):
                    raise Failed(">[^0] Not implemented")
                if ord(extract[0]) != 0:
                    replace = extract + self.__read_string(f)
                    success = 1
            else:
                success = (value > data[0])
                replace = value

        elif test == '<':
            if kind == 'string':
                # NOTE(review): this compares the parsed operand (numbers)
                # with raw characters and looks inverted; kept as it was
                # until it can be checked against a known file.
                success = 1
                minimum = min(len(data), len(extract))
                if len(extract) > minimum:
                    success = 0
                else:
                    for index in range(minimum):
                        if data[index] > extract[index]:
                            success = 0
                            break
            else:
                success = (value < data[0])
                replace = value

        elif test == '&':
            success = ((value & data[0]) == data[0])
            replace = value

        elif test == '^':
            # XXX: To be tested with a known file
            success = ((value ^ data[0]) == 0)
            replace = value

        elif test == '!':
            # XXX: To be tested with a known file
            # XXX: a plain (value != data[0]) is wrong, this is presumably a
            # binary inversion test; it never matches for now
            success = 0
            replace = value

        elif test == 'x':
            if kind == 'string':
                # Collect the NUL terminated string found in the file; it
                # has to start as an empty string so it can be extended.
                replace = ""
                limit = 0
                while 1:
                    if ord(extract[0]) == 0 or limit > 100:
                        break
                    replace += extract
                    extract = self.__read(f, 1)
                    limit += 1
                if limit <= 100:
                    success = 1
            else:
                success = 1
                replace = value

        else:
            raise StandardError("test used '" + test + "' is not defined")

        return (success, replace)

    def classify(self, name):
        """Classify the file called `name`; see classify_from_file_object."""
        f = open(name, 'rb')
        try:
            return self.classify_from_file_object(f)
        finally:
            f.close()

    # Hydrus Network Dev added this in July 2013 to facilitate checking mime from a file string in memory with cStringIO
    def classify_from_file_object(self, f):
        """Classify an open, seekable file object.

        Walks the rules in order: the first matching level 0 rule starts a
        rule set, and its sub-rules (higher levels) are tried as long as
        their parent matched. Returns the concatenated descriptions of the
        matching rules, or None when no rule matched. The file object is
        left open for the caller to close.
        Raises StandardError when no rules are loaded.
        """
        if not self.entries:
            raise StandardError("Not initialised properly")

        # Are we still looking for the ruleset to apply or are we in a rule
        found_rule = 0
        # If we failed part of the rule there is no point looking for higher level subrule
        allow_next = 0
        # String provided by the successful rules
        result = ""

        f.seek(0, 2)
        file_length = f.tell()

        for i in range(self.entries):
            level = self._level[i]

            # Optimisation: skip sub-rules until a top level rule matched
            if not found_rule and level > 0:
                continue

            # We are currently working a rule
            if found_rule:
                # Finished: reaching the next top level rule ends the rule set
                if level == 0:
                    break

                # No need to check a level if the previous one failed
                if level > allow_next:
                    continue

            # The full magic rule
            direct = self._direct[i]
            offset_type = self._offset_type[i]
            offset_delta = self._offset_delta[i]
            offset_relatif = self._offset_relatif[i]
            endian = self._endian[i]
            kind = self._kind[i]
            oper = self._oper[i]
            mask = self._mask[i]
            test = self._test[i]
            data = self._data[i]
            leng = self._length[i]
            mime = self._mime[i]

            try:
                # Get the offset of the information to read
                if direct == 1:
                    offset = offset_delta
                else:
                    offset = self.__indirect_offset(f, offset_type, offset_relatif, offset_delta)

                # If it is out of the file then the test fails.
                if file_length < offset:
                    raise Failed()

                # Read the data at the offset position (IOError if there is none)
                f.seek(offset)
                extract = self.__read(f, leng)

                # Convert the little/big endian value from the file
                value = self.__convert(kind, endian, extract)

                # If the value is masked, remove the unwanted bits
                value = self.__binary_mask(oper, value, mask)

                # Perform the test
                (success, replace) = self.__apply_test(f, test, kind, data, extract, value)
                if not success:
                    raise Failed()

                found_rule = 1
                allow_next = level + 1

                if replace is not None:
                    # Descriptions without a format specifier are kept as is
                    try:
                        mime = mime % replace
                    except (TypeError, ValueError, KeyError):
                        pass

                result += mime
                result += ' '

            except (Failed, IOError):
                # The rule did not match: its sub-rules must not be tried
                allow_next = level

            except Exception:
                # The code must not raise any exception when it fails.
                pass

        # Hydrus Network Dev: the file is deliberately not closed here

        if found_rule == 0:
            # XXX: API Change this was previously returning "unknown"
            return None

        # The magic file uses "backspace" to concatenate what is normally separated with a space
        return result.rstrip().lstrip('\x08').replace(' \x08', '')


if __name__ == '__main__':
    import sys

    def _argument(position, default):
        # Command line argument at `position`, or `default` when it is missing
        try:
            return sys.argv[position]
        except IndexError:
            return default

    binname = _argument(1, sys.argv[0])
    filename = _argument(2, "magic.linux")
    cachename = _argument(3, "delete-me")

    magic = Magic(filename, cachename)
    description = magic.classify(binname)
    if description:
        print(binname + ": " + description)
    else:
        print(binname + ": Can not recognise file type")