Expand source code
def send_email(
*,
smtp_server_var: str,
smtp_port_var: str,
smtp_user_var: str,
smtp_password_var: str,
from_email_var: str,
to_emails_var: str,
subject_var: str,
email_body_var: str,
email_body_path_var: str,
) -> None:
# email configuration
smtp_server = os.environ[smtp_server_var or "SMTP_SERVER"]
smtp_port = int(os.getenv((smtp_port_var or "SMTP_PORT"), 587))
smtp_user = os.environ[smtp_user_var or "SMTP_USER"]
smtp_password = os.environ[smtp_password_var or "SMTP_PASSWORD"]
from_email = os.getenv((from_email_var or "FROM_EMAIL"), smtp_user)
to_emails = os.environ[to_emails_var or "TO_EMAILS"]
subject = os.environ[subject_var or "SUBJECT"]
email_body = os.getenv((email_body_var or "EMAIL_BODY"))
email_body_path = os.getenv((email_body_path_var or "EMAIL_BODY_PATH"))
if email_body and email_body_path:
print(
f"""
Warning: Both {(email_body_var or 'EMAIL_BODY')} and {(email_body_path_var or 'EMAIL_BODY_PATH')} are set.
Using {(email_body_var or 'EMAIL_BODY')}.
"""
)
if email_body:
body = email_body
elif email_body_path:
try:
with open(email_body_path, "r") as file:
body = file.read()
except Exception as e:
print(f"Failed to read email body from file: {e}")
sys.exit(1)
else:
raise ValueError(
f"Error: Either {(email_body_var or 'EMAIL_BODY')} or {(email_body_path_var or 'EMAIL_BODY_PATH')} must be set."
)
# Create the email message
msg = MIMEMultipart()
msg["From"] = from_email
msg["To"] = to_emails
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
# Send the emails
try:
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(smtp_user, smtp_password)
text = msg.as_string()
for to_email in to_emails.split(","):
server.sendmail(from_email, to_email.strip(), text)
print(f"Email to {to_email.strip()} sent successfully!")
except Exception as e:
print(f"Failed to send email: {e}")
finally:
server.quit()