import sys
import os
import zipfile
from datetime import datetime
from email import message_from_file
from tempfile import TemporaryDirectory

from lxml.etree import parse as parse_xml

from dmarcreceiver.model import (
    DBSession,
    Report,
    ReportError,
    ReportRecord,
    OverrideReason,
    DKIMResult,
    SPFResult,
    metadata,
    init_model,
)
import transaction

from .util import sendmail
from .config import config


def parse_metadata(tree):
    """Build a ``Report`` from a ``report_metadata`` element.

    :param tree: element whose children are looked up with relative XPath
        expressions (``org_name``, ``email``, ``extra_contact_info``,
        ``report_id``, ``date_range/begin``, ``date_range/end``).
    :returns: a new ``Report`` populated from those children.
    :raises ValueError: if a required child is missing, occurs more than
        once, or a date is not an integer (single-element unpacking and
        ``int()`` both raise it).
    """
    org_name, = tree.xpath('org_name/text()')
    email, = tree.xpath('email/text()')

    # extra_contact_info is the only optional field: first match or None.
    extra_matches = tree.xpath('extra_contact_info/text()')
    extra_contact_info = extra_matches[0] if extra_matches else None

    report_id, = tree.xpath('report_id/text()')

    # NOTE(review): fromtimestamp() without a tz argument produces a naive
    # datetime in the machine's local timezone - confirm that is what the
    # database columns are expected to hold.
    date_begin, = tree.xpath('date_range/begin/text()')
    date_begin = datetime.fromtimestamp(int(date_begin))
    date_end, = tree.xpath('date_range/end/text()')
    date_end = datetime.fromtimestamp(int(date_end))

    return Report(
        org_name=org_name,
        email=email,
        extra_contact_info=extra_contact_info,
        report_id=report_id,
        date_begin=date_begin,
        date_end=date_end
    )


def scoop_elements(obj, tree, *elements):
    """Copy the text of named child elements of ``tree`` onto ``obj``.

    For every name in ``elements`` the attribute of the same name is set on
    ``obj``: to ``None`` when the child is absent or ambiguous (zero or
    several text matches), to an ``int`` when the text consists solely of
    decimal digits, and to the text itself otherwise.

    :param obj: object receiving the attributes via ``setattr``.
    :param tree: element queried with ``./<name>/text()``.
    :param elements: child element names to copy.
    """
    for elt in elements:
        try:
            value, = tree.xpath('./' + elt + '/text()')
        except ValueError:
            # Unpacking failed: not exactly one text node for this child.
            value = None
        else:
            # isdecimal() is False for '' and for digit-like characters that
            # int() rejects (e.g. superscripts), so int() cannot raise here.
            if value.isdecimal():
                value = int(value)
        setattr(obj, elt, value)


def parse_report(f):
    """Parse one aggregate report XML document and commit it to the database.

    :param f: source accepted by ``lxml.etree.parse`` (the caller in this
        file passes an open binary file object).
    :raises ValueError: if ``/feedback/report_metadata`` is not present
        exactly once, or required metadata is malformed.
    :raises IndexError: if ``policy_published``, a record's ``row``,
        ``policy_evaluated`` or ``identifiers`` element is missing.
    """
    tree = parse_xml(f)
    metadata_tree, = tree.xpath('/feedback/report_metadata')
    report = parse_metadata(metadata_tree)
    scoop_elements(
        report, tree.xpath('/feedback/policy_published')[0],
        'domain', 'adkim', 'aspf', 'p', 'sp', 'pct')

    for record_node in tree.xpath('/feedback/record'):
        record = ReportRecord()
        scoop_elements(
            record, record_node.xpath('./row')[0],
            'source_ip', 'count')
        scoop_elements(
            record, record_node.xpath('./row/policy_evaluated')[0],
            'disposition', 'dkim', 'spf')

        for reason_node in record_node.xpath('./row/policy_evaluated/reason'):
            reason = OverrideReason()
            # Exactly one <type> is required; unpacking raises otherwise.
            reason.policy_override_type, = reason_node.xpath('./type/text()')
            comment = reason_node.xpath('./comment/text()')
            if comment:
                reason.comment = comment[0]
            record.override_reasons.append(reason)

        scoop_elements(
            record, record_node.xpath('./identifiers')[0],
            'envelope_to', 'header_from')

        for dkim_node in record_node.xpath('./auth_results/dkim'):
            dkim = DKIMResult()
            scoop_elements(dkim, dkim_node, 'domain', 'result', 'human_result')
            selector_text = dkim_node.xpath('./selector/text()')
            if selector_text:
                # NOTE(review): the <selector> text overwrites human_result
                # that was just scooped above. This looks like it was meant
                # to be stored in a selector attribute - confirm against the
                # DKIMResult model before changing; behaviour kept as-is.
                dkim.human_result = selector_text[0]
            record.dkim_results.append(dkim)

        for spf_node in record_node.xpath('./auth_results/spf'):
            spf = SPFResult()
            scoop_elements(spf, spf_node, 'domain', 'result')
            record.spf_results.append(spf)

        report.records.append(record)

    DBSession.add(report)
    transaction.commit()


def read_config_if_present(args):
    """Load ``args.config_file`` into ``config`` if it is set and readable.

    Does nothing when no config file was given or when the file is not
    readable by the current process.

    :param args: object with a ``config_file`` attribute.
    """
    if args.config_file and os.access(args.config_file, os.R_OK):
        # Context manager so the handle is closed once the file is read.
        with open(args.config_file, 'rt') as config_fp:
            config.read_file(config_fp)


def receive_report(args):
    """Read a mail message from stdin and store the zipped report it carries.

    Messages whose content type is not ``application/zip`` are not parsed;
    they are re-addressed to the bounce address (``args.bounce_address``
    takes precedence over the ``bounce_address`` config value) and handed to
    ``sendmail``, or dropped when no bounce address is configured.

    For zip messages the payload is written into a temporary directory and
    the first archive member is passed to ``parse_report``.

    :param args: object with ``config_file`` and ``bounce_address``
        attributes.
    """
    read_config_if_present(args)
    init_model()

    # read email message from stdin
    msg = message_from_file(sys.stdin)

    # check for zip file
    # get_content_type() strips parameters, lower-cases the value and falls
    # back to a default when the header is missing, so a message without a
    # Content-Type header is bounced instead of crashing.
    content_type = msg.get_content_type()
    if content_type != 'application/zip':
        # not a zip file - bounce to postmaster
        bounce_address = config.get('bounce_address')
        if args.bounce_address:
            bounce_address = args.bounce_address
        if bounce_address:
            # Assigning a header appends rather than replaces, so drop any
            # existing To header first to leave exactly one recipient.
            del msg['To']
            msg['To'] = bounce_address
            sendmail(msg)
        return

    with TemporaryDirectory(
        prefix=os.path.splitext(os.path.basename(sys.argv[0]))[0] + '-'
    ) as tempdir:
        # The attachment name comes from the message itself; keep only its
        # final path component so it cannot point outside tempdir.
        filename = os.path.basename(msg.get_filename() or '')
        if filename in ('', '.', '..'):
            filename = 'report.zip'
        fn = os.path.join(tempdir, filename)
        with open(fn, 'wb') as f:
            f.write(msg.get_payload(decode=True))

        with zipfile.ZipFile(fn, 'r') as z:
            namelist = z.namelist()
            # Read the member straight from the archive: this works for
            # members stored under a subdirectory and never writes
            # archive-controlled paths to disk.
            with z.open(namelist[0]) as f:
                parse_report(f)


def init(args):
    """Load the configuration, initialise the model and create all tables.

    :param args: object with a ``config_file`` attribute.
    """
    read_config_if_present(args)
    init_model()
    metadata.create_all()