Files
DMARCReceiver/dmarcreceiver/model.py
2020-01-22 18:02:35 -05:00

108 lines
4.4 KiB
Python

from zope.sqlalchemy import ZopeTransactionExtension
from sqlalchemy import Column, Integer, String, Unicode, Enum, CheckConstraint, ForeignKey, DateTime, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker, relationship
import sqlalchemy.types as satypes
import sqlalchemy.dialects.postgresql as dpg
from sqlalchemy.ext.declarative import declarative_base
maker = sessionmaker(autoflush=True, autocommit=False,
extension=ZopeTransactionExtension())
DBSession = scoped_session(maker)
DeclarativeBase = declarative_base()
metadata = DeclarativeBase.metadata
def init_model():
from sqlalchemy import create_engine
from dmarcreceiver.config import config
engine = create_engine(config['db_uri'])
DBSession.configure(bind=engine)
metadata.bind = engine
return DBSession
Alignment = Enum('r', 's')
Disposition = Enum('none', 'quarantine', 'reject')
DMARCResult = Enum('pass', 'fail')
PolicyOverride = Enum('forwarded', 'sampled_out', 'trusted_forwarder', 'other')
SPFResultType = Enum('none', 'neutral', 'pass', 'fail', 'softfail', 'temperror', 'permerror')
DKIMResultType = Enum('none', 'pass', 'fail', 'policy', 'neutral', 'temperror', 'permerror')
class INET(satypes.TypeDecorator):
impl = satypes.CHAR
def load_dialect_impl(self, dialect):
if dialect.name == 'postgresql':
return dialect.type_descriptor(dpg.INET())
else:
return dialect.type_descriptor(satypes.CHAR(40))
class Report(DeclarativeBase):
__tablename__ = 'reports'
id = Column(Integer, primary_key=True)
org_name = Column(String, nullable=False)
email = Column(String, nullable=False)
extra_contact_info = Column(String, nullable=True)
report_id = Column(String, nullable=False)
date_begin = Column(DateTime, nullable=False)
date_end = Column(DateTime, nullable=False)
domain = Column(String, nullable=False)
adkim = Column(Alignment, nullable=False)
aspf = Column(Alignment, nullable=False)
p = Column(Disposition, nullable=False)
sp = Column(Disposition, nullable=False)
pct = Column(Integer, CheckConstraint('pct >= 0 AND pct <= 100'), nullable=False)
class ReportError(DeclarativeBase):
__tablename__ = 'report_errors'
id = Column(Integer, primary_key=True)
report_id = Column(Integer, ForeignKey(Report.id, onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
error = Column(String, nullable=False)
class ReportRecord(DeclarativeBase):
__tablename__ = 'report_records'
id = Column(Integer, primary_key=True)
report_id = Column(Integer, ForeignKey(Report.id, onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
source_ip = Column(INET, nullable=False)
count = Column(Integer, nullable=False, default=0)
disposition = Column(Disposition, nullable=True)
dkim = Column(DMARCResult, nullable=True)
spf = Column(DMARCResult, nullable=True)
envelope_to = Column(String, nullable=True)
header_from = Column(String, nullable=False)
report = relationship(Report, backref='records')
class OverrideReason(DeclarativeBase):
__tablename__ = 'report_record_override_reasons'
id = Column(Integer, primary_key=True)
record_id = Column(Integer, ForeignKey(ReportRecord.id, onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
policy_override_type = Column(PolicyOverride, nullable=False)
comment = Column(String, nullable=True)
record = relationship(ReportRecord, backref='override_reasons')
class DKIMResult(DeclarativeBase):
__tablename__ = 'report_record_dkim_results'
id = Column(Integer, primary_key=True)
record_id = Column(Integer, ForeignKey(ReportRecord.id, onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
domain = Column(String, nullable=False)
result = Column(DKIMResultType, nullable=False)
human_result = Column(String, nullable=True)
record = relationship(ReportRecord, backref='dkim_results')
class SPFResult(DeclarativeBase):
__tablename__ = 'report_record_spf_results'
id = Column(Integer, primary_key=True)
record_id = Column(Integer, ForeignKey(ReportRecord.id, onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
domain = Column(String, nullable=False)
result = Column(SPFResultType, nullable=False)
record = relationship(ReportRecord, backref='spf_results')
if __name__ == '__main__':
engine = create_engine('sqlite:///data.db')
metadata.bind = engine
DBSession.configure(bind=engine)