Files
conference/app.py
T
2026-04-25 07:17:47 +00:00

4629 lines
169 KiB
Python

import base64
import io
import os
import random
import smtplib
import string
import uuid
import requests
from datetime import datetime
from email.encoders import encode_base64
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from functools import wraps
import bcrypt
import mysql.connector
import qrcode
from babel import Locale
from flask import Flask, flash, jsonify, redirect, render_template, request, session, url_for, send_file
from flask_cors import CORS
from flask_babel import Babel, gettext as _, ngettext, force_locale
from reportlab.lib.pagesizes import A4
from reportlab.lib.units import mm
from reportlab.pdfgen import canvas
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
from mysql.connector import Error
from config import Config
# Flask application object; templates and static assets are served from the
# 'templates' and 'static' folders, and settings are loaded from Config.
app = Flask(__name__, template_folder='templates', static_folder='static')
app.config.from_object(Config)
# CORS is enabled for the whole app with the extension's default options.
CORS(app)
# Supported locales
SUPPORTED_LOCALES = ['en', 'nl', 'de', 'fr', 'es', 'it', 'pl']
def get_locale():
    """Pick the locale for the current request.

    Sources are consulted in this order: the ``lang`` query parameter, the
    session, the ``locale`` cookie, the logged-in user's stored preference,
    and finally ``Config.BABEL_DEFAULT_LOCALE``. A match from the query
    string or the cookie is also stored in ``session['locale']``.
    """
    def _is_supported(code):
        return bool(code) and code in SUPPORTED_LOCALES

    # 1. Explicit choice in the URL wins and is remembered.
    from_url = request.args.get('lang')
    if _is_supported(from_url):
        session['locale'] = from_url
        return from_url

    # 2. Previously remembered choice.
    remembered = session.get('locale')
    if _is_supported(remembered):
        return remembered

    # 3. Cookie (for returning visitors); copied into the session.
    from_cookie = request.cookies.get('locale')
    if _is_supported(from_cookie):
        session['locale'] = from_cookie
        return from_cookie

    # 4. Preference stored on the user's database record.
    if 'user_id' in session and 'user_type' in session:
        preferred = get_user_preferred_language(session['user_id'], session.get('user_type'))
        if preferred:
            return preferred

    # 5. Application default.
    return Config.BABEL_DEFAULT_LOCALE
def get_locale_string():
    """Locale selector for Babel: returns get_locale()'s code unchanged."""
    # Short codes such as 'en' are passed through as-is; no 'en_US'-style
    # expansion is applied here.
    return get_locale()
# Initialize Babel with locale_selector
# The extension is created unbound and then attached to the app; the default
# locale and translation directories come from Config, and get_locale_string
# is registered as the per-request locale selector.
babel = Babel()
babel.init_app(
    app,
    default_locale=Config.BABEL_DEFAULT_LOCALE,
    default_translation_directories=Config.BABEL_TRANSLATION_DIRECTORIES,
    locale_selector=get_locale_string
)
def get_user_preferred_language(user_id, user_type):
    """Get user's preferred language from database.

    Args:
        user_id: Primary key of the user row.
        user_type: One of 'organizer', 'attendee' or 'staff'; selects the
            table that is queried.

    Returns:
        The stored ``preferred_language`` if it is one of SUPPORTED_LOCALES,
        otherwise None (unknown user type, no row, unsupported value, or a
        database error).
    """
    # One fixed, parameterized query per account type. Unknown types are
    # rejected before a connection is opened.
    queries = {
        'organizer': "SELECT preferred_language FROM organizers WHERE id = %s",
        'attendee': "SELECT preferred_language FROM attendees WHERE id = %s",
        'staff': "SELECT preferred_language FROM staff WHERE id = %s",
    }
    query = queries.get(user_type)
    if query is None:
        return None
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query, (user_id,))
                result = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            # Always release the connection, including when the query fails.
            conn.close()
    except Error:
        # Best effort: a database problem must not break locale selection.
        return None
    if result and result.get('preferred_language') in SUPPORTED_LOCALES:
        return result['preferred_language']
    return None
# English translations map (replaces database translations)
# Keys are looked up by get_translation(); each key appears exactly once.
# (The previous literal repeated 'create_event', 'registration_link' and
# 'attendees' with identical values; the repeats were redundant.)
ENGLISH_TRANSLATIONS = {
    'dashboard': 'Dashboard',
    'logout': 'Logout',
    'login': 'Login',
    'organizer_login': 'Organizer Login',
    'register': 'Register',
    'events': 'Events',
    'attendees': 'Attendees',
    'profile': 'Profile',
    'create_event': 'Create Event',
    'edit_event': 'Edit Event',
    'delete_event': 'Delete Event',
    'event_details': 'Event Details',
    'staff': 'Staff',
    'add_staff': 'Add Staff',
    'remove_staff': 'Remove Staff',
    'breakout_sessions': 'Break-out Sessions',
    'registered_attendees': 'Registered Attendees',
    'checked_in': 'Checked In',
    'not_checked_in': 'Not Checked In',
    'invite_pending': 'Invite Pending',
    'active': 'Active',
    'name': 'Name',
    'email': 'Email',
    'status': 'Status',
    'actions': 'Actions',
    'start': 'Start',
    'end': 'End',
    'location': 'Location',
    'max_attendees': 'Max Attendees',
    'registration_link': 'Registration Link',
    'details': 'Details',
    'unlimited': 'unlimited',
    'registered': 'registered',
    'my_events': 'My Events',
    'no_events_yet': "You haven't created any events yet.",
    'create_first_event': 'Create your first event',
    'organizer': 'Organizer',
    'first_name': 'First Name',
    'last_name': 'Last Name',
    'description': 'Description',
    'view': 'View',
    'remove': 'Remove',
    'add_attendee': 'Add Attendee',
    'add_breakout_session': 'Add Break-out Session',
    'add_staff_member': 'Add Staff Member',
    'no_staff_yet': 'No staff members added yet.',
    'no_breakout_sessions_yet': 'No break-out sessions created yet.',
    'confirm_remove_staff': 'Are you sure you want to remove',
    'from_event': 'from this event',
    'remove_staff_member': 'Remove Staff Member',
    'time': 'Time',
    'organisation': 'Organisation',
    'role': 'Role',
    'role_profession': 'Role / Profession',
    'phone': 'Phone',
    'linkedin': 'LinkedIn',
    'about_me': 'About Me',
    'cancel': 'Cancel',
    'welcome': 'Welcome to NetEvents',
    'connect_with_professionals': 'Connect with professionals at networking events',
    'register_as_organizer': 'Register as Organizer',
    'already_have_account': 'Already have an account? Login as:',
    'presenter': 'Presenter',
    'visitor': 'Visitor',
    'organiser': 'Organiser',
    'upcoming_events': 'Upcoming Events',
    'no_upcoming_events': 'No upcoming events at the moment. Check back soon!',
    'platform_features': 'Platform Features',
    'for_organizers': 'For Organizers',
    'for_attendees': 'For Attendees',
    'view_event': 'View Event',
    'translation_management': 'Translation Management',
    'key': 'Key',
    'translation': 'Translation',
    'language': 'Language',
    'save': 'Save',
    'add': 'Add',
    'translations': 'Translations',
    'presenter_dashboard': 'Presenter Dashboard',
    'my_breakout_sessions': 'My Break-out Sessions',
    'event': 'Event',
    'scan_qr_code': 'Scan QR Code',
    'scan_qr_to_connect': 'Scan QR to connect!',
    'scan_instructions': 'Point your camera at an attendee QR code to check them in',
    'camera_access': 'Camera Access',
    'camera_permission_denied': 'Camera permission denied. Please allow camera access to scan QR codes.',
    'error_reading_qr': 'Error reading QR code',
    'attendee_not_found': 'Attendee not found',
    'already_checked_in': 'Already checked in',
    'check_in_successful': 'Check-in successful',
    'password': 'Password',
    'confirm_password': 'Confirm Password',
    'complete_registration': 'Complete Registration',
    'youre_invited': "You're invited to join",
    'set_password_complete': 'Set a password to complete your registration',
    'your_profile': 'Your Profile',
    'edit_profile': 'Edit Profile',
    'save_profile': 'Save Profile',
    'profile_updated': 'Profile updated successfully',
    'connection_requests': 'Connection Requests',
    'my_connections': 'My Connections',
    'send_connection_request': 'Send Connection Request',
    'connection_sent': 'Connection request sent',
    'accept': 'Accept',
    'reject': 'Reject',
    'pending': 'Pending',
    'accepted': 'Accepted',
    'rejected': 'Rejected',
    'appointments': 'Appointments',
    'request_appointment': 'Request Appointment',
    'appointment_time': 'Appointment Time',
    'appointment_notes': 'Notes',
    'appointment_location': 'Location',
    'request_sent': 'Appointment request sent',
    'appointments_with': 'Appointments with',
    'no_appointments': 'No appointments yet',
    'breakout_session': 'Break-out Session',
    'breakout_session_detail': 'Break-out Session Details',
    'speaker': 'Speaker',
    'session_full': 'Session Full',
    'register_for_session': 'Register for Session',
    'registered_for_session': 'Registered for Session',
    'unregister_from_session': 'Unregister from Session',
    'check_in': 'Check In',
    'check_out': 'Check Out',
    'event_checked_in': 'Event Checked In',
    'event_checked_out': 'Event Checked Out',
    'your_events': 'Your Events',
    'past_events': 'Past Events',
    'no_past_events': 'No past events',
    'event_not_found': 'Event not found',
    'invalid_invite_link': 'Invalid or expired invite link',
    'password_mismatch': 'Passwords do not match',
    'password_too_short': 'Password must be at least 8 characters',
    'registration_successful': 'Registration successful',
    'login_successful': 'Login successful',
    'logout_successful': 'Logout successful',
    'access_denied': 'Access denied',
    'page_not_found': 'Page not found',
    'internal_error': 'Internal server error',
    'db_error': 'Database error',
    'error_adding_staff': 'Error adding staff member',
    'error_checking_in': 'Error checking in attendee',
    'badge_printed': 'Badge printed',
    'no_events': 'No events found',
    'edit_breakout_session': 'Edit Break-out Session',
    'delete_breakout_session': 'Delete Break-out Session',
    'view_all': 'View All',
    'loading': 'Loading...',
    'submit': 'Submit',
    'close': 'Close',
    'back': 'Back',
    'next': 'Next',
    'previous': 'Previous',
    'search': 'Search',
    'filter': 'Filter',
    'sort_by': 'Sort by',
    'ascending': 'Ascending',
    'descending': 'Descending',
    'rows_per_page': 'Rows per page',
    'no_data': 'No data available',
    'staff_dashboard': 'Staff Dashboard',
    'select_event': 'Select an Event',
    'no_events_assigned': 'No events assigned to you yet',
    'start_qr_scanner': 'Start QR Scanner',
    'back_to_dashboard': 'Back to Dashboard',
    'download_badge': 'Download Badge',
    'email_badge': 'Email Badge',
    'badge_sent': 'Badge sent to your email!',
    'attendee_types': 'Attendee Types',
    'manage_types': 'Manage Types',
    'create_attendee_type': 'Create Attendee Type',
    'type_name': 'Type Name',
    'e_g_vip_speaker_student': 'e.g., VIP, Speaker, Student',
    'price': 'Price',
    'optional': 'optional',
    'leave_empty_for_free': 'Leave empty for free',
    'create_type': 'Create Type',
    'existing_types': 'Existing Types',
    'copy': 'Copy',
    'delete': 'Delete',
    'confirm_delete_type': 'Are you sure you want to delete this attendee type?',
    'edit_attendee_type': 'Edit Attendee Type',
    'back_to_types': 'Back to Types',
    'type_details': 'Type Details',
    'save_changes': 'Save Changes',
    'edit': 'Edit',
    'cannot_delete_type_with_attendees': 'Cannot delete: %d attendees assigned to this type',
    'no_attendee_types_yet': 'No attendee types defined yet.',
    'create_first_type': 'Create the first type',
    'back_to_event': 'Back to Event',
    'link_copied': 'Link copied!',
    'copy_failed': 'Failed to copy link.',
    'free': 'Free',
    'select_all': 'Select All',
    'assign_type': 'Assign Type',
    'no_type': 'No Type',
    'apply': 'Apply',
    'selected': 'selected',
    'type': 'Type',
    'registration_type': 'Registration Type',
    'manage_attendee_types_across_events': 'Manage attendee types across all your events',
    'no_attendee_types_for_event': 'No attendee types defined for this event.',
    'manage': 'Manage',
    'payment': 'Payment',
    'complete_payment': 'Complete Payment',
    'back_to_registration': 'Back to Registration',
    'order_summary': 'Order Summary',
    'event_registration': 'Event Registration',
    'attendee_type': 'Attendee Type',
    'attendee': 'Attendee',
    'total': 'Total',
    'name_on_card': 'Name on Card',
    'card_number': 'Card Number',
    'expiry_date': 'Expiry Date',
    'cvv': 'CVV',
    'pay_now': 'Pay Now',
    'payment_secure': 'Your payment is secure and encrypted',
    'john_doe': 'John Doe',
    'mm/yy': 'MM/YY',
}
def get_translation(key, locale=None):
    """Return the English text for *key*.

    Looks *key* up in ENGLISH_TRANSLATIONS, falling back to the key itself.
    If the resulting text contains underscores, it is split on them and each
    piece is capitalized and joined with spaces. *locale* is accepted but
    not used.
    """
    text = ENGLISH_TRANSLATIONS.get(key, key)
    if '_' not in str(text):
        return text
    # Humanize identifiers such as 'some_missing_key' -> 'Some Missing Key'.
    return ' '.join(piece.capitalize() for piece in str(text).split('_'))
def t(key, locale=None):
    """Short alias that forwards both arguments to get_translation()."""
    return get_translation(key, locale=locale)
def strftime_to_babel_pattern(strftime_format):
    """Convert Python strftime format string to Babel CLDR pattern.

    Literal text is wrapped in single quotes (with embedded single quotes
    doubled) so Babel does not interpret it as pattern letters.

    Args:
        strftime_format: A strftime-style format such as ``'%d/%m/%Y %H:%M'``.

    Returns:
        The equivalent CLDR pattern, e.g. ``"dd'/'MM'/'yyyy' 'HH':'mm"``.
        ``%%`` becomes a literal percent sign. Directives without a mapping
        are dropped, and the literal text on either side of a dropped
        directive is emitted as a single quoted run.
    """
    import re
    # Mapping of strftime codes to Babel CLDR codes
    strftime_to_babel = {
        '%Y': 'yyyy',
        '%y': 'yy',
        '%m': 'MM',
        '%B': 'MMMM',
        '%b': 'MMM',
        '%d': 'dd',
        '%A': 'EEEE',
        '%a': 'EEE',
        '%H': 'HH',
        '%I': 'hh',
        '%M': 'mm',
        '%S': 'ss',
        '%p': 'a',
        '%j': 'DDD',
        '%U': 'ww',
        '%W': 'ww',
    }
    # '%%' is matched as its own token so that '%%d' is read as a literal
    # percent followed by 'd', not as '%' followed by the day directive.
    token_pattern = re.compile(r'%[A-Za-z%]')
    result_parts = []
    pending_literal = []

    def flush_literal():
        # Emit all buffered literal text as ONE quoted run. Two adjacent
        # quoted runs ('a''b') would be read by CLDR as a'b.
        text = ''.join(pending_literal)
        pending_literal.clear()
        if text:
            result_parts.append("'" + text.replace("'", "''") + "'")

    last_end = 0
    for match in token_pattern.finditer(strftime_format):
        pending_literal.append(strftime_format[last_end:match.start()])
        last_end = match.end()
        code = match.group()
        if code == '%%':
            pending_literal.append('%')
        elif code in strftime_to_babel:
            flush_literal()
            result_parts.append(strftime_to_babel[code])
        # Unmapped directives are dropped.
    # Handle any remaining literal text after the last format code
    pending_literal.append(strftime_format[last_end:])
    flush_literal()
    return ''.join(result_parts)
def localized_date(dt, locale=None, format=None):
    """Format a datetime in the specified locale's format.

    Args:
        dt: A ``datetime``, or a string in ``YYYY-MM-DD HH:MM:SS`` /
            ``YYYY-MM-DDTHH:MM:SS`` form. ``None`` yields ``''``; any other
            value (including unparseable strings) is returned via ``str()``,
            or ``''`` when falsy.
        locale: Locale code; defaults to ``get_locale()``.
        format: Optional strftime-style format. When omitted, the locale's
            ``date_format`` from the ``languages`` table is used, falling
            back to ``'MMMM d, yyyy h:mm a'``.

    Returns:
        The formatted string. If formatting fails for any reason, an English
        ``strftime`` rendering (or ``str(dt)``) is returned instead.
    """
    if dt is None:
        return ''
    # Ensure dt is a datetime object
    from datetime import datetime as dt_class
    if not isinstance(dt, dt_class):
        if not isinstance(dt, str):
            return str(dt) if dt else ''
        raw = dt
        for layout in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S'):
            try:
                dt = dt_class.strptime(raw, layout)
                break
            except ValueError:
                continue
        else:
            # Neither layout matched: hand the text back unchanged.
            return raw
    if locale is None:
        locale = get_locale()
    try:
        if format is not None:
            # Caller supplied a strftime format (e.g. from the Jinja filter):
            # convert it directly, no database round trip needed.
            pattern = strftime_to_babel_pattern(format)
        else:
            pattern = 'MMMM d, yyyy h:mm a'
            conn = get_db_connection()
            try:
                cursor = conn.cursor(dictionary=True)
                try:
                    cursor.execute("SELECT date_format FROM languages WHERE code = %s", (locale,))
                    row = cursor.fetchone()
                finally:
                    cursor.close()
            finally:
                # Release the connection even when the lookup fails.
                conn.close()
            # A missing row or an empty/NULL date_format keeps the default.
            if row and row.get('date_format'):
                pattern = strftime_to_babel_pattern(row['date_format'])
        # Use babel's date formatting for proper locale support
        from babel.dates import format_datetime
        return format_datetime(dt, pattern, locale=locale)
    except Exception:
        # Fallback to strftime (will only properly show English)
        try:
            return dt.strftime('%B %d, %Y at %H:%M')
        except Exception:
            return str(dt)
# Jinja2 filters
@app.template_filter('localized_date')
def localized_date_filter(dt, format=None):
    """Jinja filter ``localized_date``: format *dt* in the request's locale."""
    active_locale = get_locale()
    return localized_date(dt, active_locale, format)
@app.template_filter('t')
def translate_filter(key):
    """Jinja filter ``t``: resolve *key* through get_translation()."""
    # Always use English for translations
    return get_translation(key, locale='en')
@app.template_filter('spacify')
def spacify_filter(text):
    """Jinja filter ``spacify``: turn 'some_key' into 'Some Key'.

    ``None`` and the empty string both yield ``''``; any other value is
    converted with ``str()`` first.
    """
    if text is None or text == '':
        return ''
    pieces = str(text).split('_')
    return ' '.join(map(str.capitalize, pieces))
@app.template_filter('default')
def default_filter(value, default_value='', boolean=False):
    """Return default_value if value is None or undefined.

    This filter is registered under the name ``default`` and therefore
    replaces Jinja's built-in filter of the same name, so it accepts the
    same arguments.

    Args:
        value: The template value being filtered.
        default_value: What to return when *value* is missing.
        boolean: When true, any falsy *value* (``''``, ``0``, empty list...)
            is replaced as well, mirroring the built-in filter.

    Returns:
        *default_value* for ``None``, for an undefined template variable, or
        (with ``boolean``) for any falsy value; otherwise *value* unchanged.
    """
    # Undefined template variables arrive as instances of the environment's
    # undefined class, not as None, so they need their own check.
    if value is None or isinstance(value, app.jinja_env.undefined):
        return default_value
    if boolean and not value:
        return default_value
    return value
@app.template_filter('format_currency')
def format_currency_filter(value):
    """Jinja filter ``format_currency``: render a number with two decimals.

    ``None`` and ``''`` yield ``''``. Values that cannot be converted with
    ``float()`` are returned as ``str(value)``.
    """
    if value is None or value == '':
        return ''
    try:
        amount = float(value)
    except (ValueError, TypeError):
        return str(value)
    return f'{amount:.2f}'
@app.context_processor
def utility_processor():
    """Expose helper callables to every template."""
    def get_currency_symbol():
        # Could be made dynamic based on locale
        return ''

    return {'get_currency_symbol': get_currency_symbol}
# Update session locale when language changes
@app.before_request
def before_request():
    """Store a supported ``?lang=`` value in the session before each request."""
    requested = request.args.get('lang')
    # A missing parameter gives None, which is never in SUPPORTED_LOCALES.
    if requested in SUPPORTED_LOCALES:
        session['locale'] = requested
@app.after_request
def prevent_caching(response):
    """Mark every response as non-cacheable so translations stay fresh."""
    no_cache_headers = (
        ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
        ('Pragma', 'no-cache'),
        ('Expires', '0'),
    )
    for header_name, header_value in no_cache_headers:
        response.headers[header_name] = header_value
    return response
# Ensure upload folder exists
# Runs once at import time; exist_ok=True makes it a no-op when the folder
# is already present.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
def get_db_connection(user=Config.DB_APP_USER, password=Config.DB_APP_PASSWORD):
    """Open and return a new MySQL connection.

    Host, port and database name come from Config; *user* and *password*
    default to the application account configured there. The caller is
    responsible for closing the returned connection.
    """
    settings = {
        'host': Config.DB_HOST,
        'port': Config.DB_PORT,
        'user': user,
        'password': password,
        'database': Config.DB_NAME,
    }
    return mysql.connector.connect(**settings)
def generate_event_code():
    """Generate a unique 10-character alphanumeric event code.

    Candidates are drawn from A-Z and 0-9 until one is found that no row in
    ``events.code`` uses yet.

    Returns:
        The unused code. Nothing is reserved in the database, so the caller
        must still be prepared for the insert to collide.
    """
    import secrets  # local import: CSPRNG so codes cannot be predicted

    chars = string.ascii_uppercase + string.digits
    # One connection serves every retry and is always released.
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            while True:
                code = ''.join(secrets.choice(chars) for _ in range(10))
                cursor.execute("SELECT id FROM events WHERE code = %s", (code,))
                # fetchall() drains the result so the cursor can be reused.
                if not cursor.fetchall():
                    return code
        finally:
            cursor.close()
    finally:
        conn.close()
def generate_session_code():
    """Generate a unique 10-character alphanumeric breakout session code.

    Candidates are drawn from A-Z and 0-9 until one is found that no row in
    ``breakout_sessions.code`` uses yet.

    Returns:
        The unused code. Nothing is reserved in the database, so the caller
        must still be prepared for the insert to collide.
    """
    import secrets  # local import: CSPRNG so codes cannot be predicted

    chars = string.ascii_uppercase + string.digits
    # One connection serves every retry and is always released.
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            while True:
                code = ''.join(secrets.choice(chars) for _ in range(10))
                cursor.execute("SELECT id FROM breakout_sessions WHERE code = %s", (code,))
                # fetchall() drains the result so the cursor can be reused.
                if not cursor.fetchall():
                    return code
        finally:
            cursor.close()
    finally:
        conn.close()
def generate_attendee_code():
    """Generate a unique 10-character alphanumeric attendee code.

    Candidates are drawn from A-Z and 0-9 until one is found that no row in
    ``attendees.attendee_code`` uses yet.

    Returns:
        The unused code. Nothing is reserved in the database, so the caller
        must still be prepared for the insert to collide.
    """
    import secrets  # local import: CSPRNG so codes cannot be predicted

    chars = string.ascii_uppercase + string.digits
    # One connection serves every retry and is always released.
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            while True:
                code = ''.join(secrets.choice(chars) for _ in range(10))
                cursor.execute("SELECT id FROM attendees WHERE attendee_code = %s", (code,))
                # fetchall() drains the result so the cursor can be reused.
                if not cursor.fetchall():
                    return code
        finally:
            cursor.close()
    finally:
        conn.close()
def generate_staff_code():
    """Generate a unique 10-character alphanumeric staff code.

    Candidates are drawn from A-Z and 0-9 until one is found that no row in
    ``organizers.staff_code`` uses yet.

    Returns:
        The unused code. Nothing is reserved in the database, so the caller
        must still be prepared for the write to collide.
    """
    import secrets  # local import: CSPRNG so codes cannot be predicted

    chars = string.ascii_uppercase + string.digits
    # One connection serves every retry and is always released.
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            while True:
                code = ''.join(secrets.choice(chars) for _ in range(10))
                # NOTE(review): uniqueness is checked against organizers only,
                # although this function is also used to fill staff.staff_code.
                cursor.execute("SELECT id FROM organizers WHERE staff_code = %s", (code,))
                # fetchall() drains the result so the cursor can be reused.
                if not cursor.fetchall():
                    return code
        finally:
            cursor.close()
    finally:
        conn.close()
def generate_type_code():
    """Generate a unique 10-character alphanumeric attendee type code.

    Candidates are drawn from A-Z and 0-9 until one is found that no row in
    ``attendee_types.code`` uses yet.

    Returns:
        The unused code. Nothing is reserved in the database, so the caller
        must still be prepared for the insert to collide.
    """
    import secrets  # local import: CSPRNG so codes cannot be predicted

    chars = string.ascii_uppercase + string.digits
    # One connection serves every retry and is always released.
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            while True:
                code = ''.join(secrets.choice(chars) for _ in range(10))
                cursor.execute("SELECT id FROM attendee_types WHERE code = %s", (code,))
                # fetchall() drains the result so the cursor can be reused.
                if not cursor.fetchall():
                    return code
        finally:
            cursor.close()
    finally:
        conn.close()
def login_required(f):
    """Decorator that only lets logged-in users reach the wrapped view.

    A request counts as logged in when the session holds both ``user_id``
    and ``user_type``; otherwise a message is flashed and the user is
    redirected to the ``login`` endpoint.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        logged_in = 'user_id' in session and 'user_type' in session
        if logged_in:
            return f(*args, **kwargs)
        flash('Please log in first.')
        return redirect(url_for('login'))

    return guarded_view
def get_current_user():
    """Return ``(user_id, user_type)`` from the session.

    Returns ``(None, None)`` unless both keys are present.
    """
    if 'user_id' in session and 'user_type' in session:
        return session['user_id'], session['user_type']
    return None, None
# Utility functions
def hash_password(password):
    """Hash *password* with bcrypt and a fresh salt; returns the hash as str."""
    encoded = password.encode('utf-8')
    digest = bcrypt.hashpw(encoded, bcrypt.gensalt())
    return digest.decode('utf-8')
def verify_password(password, hashed):
    """Verify a password against a stored bcrypt hash.

    Args:
        password: The plain-text password supplied by the user.
        hashed: The stored hash as text, as produced by hash_password().

    Returns:
        True when the password matches. False when it does not, when either
        argument is missing (for example an account whose hash is still
        NULL), or when bcrypt rejects the input as invalid.
    """
    if password is None or not hashed:
        return False
    try:
        return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
    except ValueError:
        # bcrypt signals a malformed hash with ValueError; treat that as a
        # failed check rather than an unhandled error.
        return False
def verify_recaptcha(recaptcha_response):
    """Check a reCAPTCHA response token against Google's siteverify endpoint.

    Returns True when ``RECAPTCHA_SECRET_KEY`` is not configured (the check
    is skipped), the endpoint's ``success`` value otherwise, and False on
    any error while contacting it or decoding the reply.
    """
    secret_key = app.config.get('RECAPTCHA_SECRET_KEY')
    if not secret_key:
        return True  # Skip verification if no secret key configured
    verify_url = 'https://www.google.com/recaptcha/api/siteverify'
    form = {'secret': secret_key, 'response': recaptcha_response}
    try:
        reply = requests.post(verify_url, data=form, timeout=10)
        return reply.json().get('success', False)
    except Exception:
        return False
# Rate limiting for failed login attempts
# Both trackers below are plain module-level dicts; each entry appended by the
# record_* helpers is a (datetime, 1) tuple.
failed_login_attempts = {} # {ip_address: [(timestamp, count), ...]}
# Rate limiting for failed personal page attempts (30-minute block)
failed_personal_page_attempts = {} # {ip_address: [(timestamp, count), ...]}
PERSONAL_PAGE_BLOCK_DURATION = 30 * 60 # 30 minutes in seconds
PERSONAL_PAGE_MAX_ATTEMPTS = 5 # Max failed attempts before blocking
def is_ip_blocked(ip_address):
    """Tell whether *ip_address* has 5 or more failed logins in the last hour.

    Entries older than one hour are discarded as a side effect, and an
    address left with no entries is removed from the tracker.
    """
    if ip_address not in failed_login_attempts:
        return False
    now = datetime.now()
    recent = [
        (stamp, count) for stamp, count in failed_login_attempts[ip_address]
        if (now - stamp).total_seconds() < 3600
    ]
    failed_login_attempts[ip_address] = recent
    if len(recent) >= 5:
        return True
    if not recent:
        # Nothing left inside the window: forget the address entirely.
        del failed_login_attempts[ip_address]
    return False
def is_ip_blocked_for_personal_page(ip_address):
    """Tell whether *ip_address* is blocked from personal pages.

    An address is blocked once it has PERSONAL_PAGE_MAX_ATTEMPTS or more
    failed attempts within PERSONAL_PAGE_BLOCK_DURATION seconds. Older
    entries are discarded as a side effect, and an address left with no
    entries is removed from the tracker.
    """
    if ip_address not in failed_personal_page_attempts:
        return False
    now = datetime.now()
    recent = [
        (stamp, count) for stamp, count in failed_personal_page_attempts[ip_address]
        if (now - stamp).total_seconds() < PERSONAL_PAGE_BLOCK_DURATION
    ]
    failed_personal_page_attempts[ip_address] = recent
    if len(recent) >= PERSONAL_PAGE_MAX_ATTEMPTS:
        return True
    if not recent:
        # Nothing left inside the window: forget the address entirely.
        del failed_personal_page_attempts[ip_address]
    return False
def record_failed_personal_page_attempt(ip_address):
    """Append a ``(now, 1)`` entry to the personal-page tracker for *ip_address*."""
    entries = failed_personal_page_attempts.setdefault(ip_address, [])
    entries.append((datetime.now(), 1))
def clear_failed_personal_page_attempts(ip_address):
    """Forget all failed personal-page attempts recorded for *ip_address*."""
    # pop() with a default is a no-op for addresses that were never recorded.
    failed_personal_page_attempts.pop(ip_address, None)
def record_failed_login(ip_address):
    """Append a ``(now, 1)`` entry to the failed-login tracker for *ip_address*."""
    entries = failed_login_attempts.setdefault(ip_address, [])
    entries.append((datetime.now(), 1))
def clear_failed_logins(ip_address):
    """Forget all failed login attempts recorded for *ip_address*."""
    # pop() with a default is a no-op for addresses that were never recorded.
    failed_login_attempts.pop(ip_address, None)
def ensure_staff_code_column():
    """Ensure the staff_code column exists in organizers table.

    Looks the column up in INFORMATION_SCHEMA and adds it (VARCHAR(10),
    UNIQUE, default NULL) when missing. Database errors are printed and
    swallowed; the cursor and connection are closed in every case.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("""
SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'organizers' AND COLUMN_NAME = 'staff_code'
""", (Config.DB_NAME,))
                if not cursor.fetchone():
                    cursor.execute("ALTER TABLE organizers ADD COLUMN staff_code VARCHAR(10) UNIQUE DEFAULT NULL")
                    conn.commit()
                    print("Added staff_code column to organizers table")
            finally:
                cursor.close()
        finally:
            conn.close()
    except Error as e:
        print(f"Error ensuring staff_code column: {e}")
def ensure_all_organizers_have_staff_code():
    """Ensure all existing organizers have a staff_code.

    Every organizer row whose ``staff_code`` is NULL gets a freshly
    generated code; all updates are committed together at the end.
    Database errors are printed and swallowed; the cursor and connection
    are closed in every case.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("SELECT id FROM organizers WHERE staff_code IS NULL")
                organizers_without_code = cursor.fetchall()
                for org in organizers_without_code:
                    staff_code = generate_staff_code()
                    cursor.execute("UPDATE organizers SET staff_code = %s WHERE id = %s", (staff_code, org['id']))
                if organizers_without_code:
                    conn.commit()
                    print(f"Generated staff_codes for {len(organizers_without_code)} organizers")
            finally:
                cursor.close()
        finally:
            conn.close()
    except Error as e:
        print(f"Error generating staff codes: {e}")
def ensure_staff_staff_code_column():
    """Ensure the staff_code column exists in staff table.

    Looks the column up in INFORMATION_SCHEMA and adds it (VARCHAR(10),
    UNIQUE, default NULL) when missing. Database errors are printed and
    swallowed; the cursor and connection are closed in every case.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("""
SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'staff' AND COLUMN_NAME = 'staff_code'
""", (Config.DB_NAME,))
                if not cursor.fetchone():
                    cursor.execute("ALTER TABLE staff ADD COLUMN staff_code VARCHAR(10) UNIQUE DEFAULT NULL")
                    conn.commit()
                    print("Added staff_code column to staff table")
            finally:
                cursor.close()
        finally:
            conn.close()
    except Error as e:
        print(f"Error ensuring staff staff_code column: {e}")
def ensure_all_staff_have_staff_code():
    """Ensure all existing staff members have a staff_code.

    Every staff row whose ``staff_code`` is NULL gets a freshly generated
    code; all updates are committed together at the end. Database errors
    are printed and swallowed; the cursor and connection are closed in
    every case.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("SELECT id FROM staff WHERE staff_code IS NULL")
                staff_without_code = cursor.fetchall()
                for s in staff_without_code:
                    staff_code = generate_staff_code()
                    cursor.execute("UPDATE staff SET staff_code = %s WHERE id = %s", (staff_code, s['id']))
                if staff_without_code:
                    conn.commit()
                    print(f"Generated staff_codes for {len(staff_without_code)} staff members")
            finally:
                cursor.close()
        finally:
            conn.close()
    except Error as e:
        print(f"Error generating staff codes: {e}")
def allowed_file(filename):
    """Tell whether *filename* has an extension listed in Config.ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared in lower case.
    Names without a dot are rejected.
    """
    _, dot, extension = filename.rpartition('.')
    return dot == '.' and extension.lower() in Config.ALLOWED_EXTENSIONS
def generate_invite_token():
    """Return a new staff invite token: the hex form of a random UUID4."""
    token = uuid.uuid4()
    return token.hex
def send_staff_invite_email(staff_email, staff_name, event_name, invite_token, organizer_email):
    """Email a staff member the link for completing their registration.

    The link points at the ``staff_invite`` endpoint with *invite_token*.
    *organizer_email* is accepted but not used. Returns True when the
    message was handed to the SMTP server, False on any error (which is
    printed).
    """
    invite_url = url_for('staff_invite', token=invite_token, _external=True)
    body = f"""Hello {staff_name},
You've been invited to join the event "{event_name}" as a staff member.
Click the link below to complete your registration and set your password:
{invite_url}
This link will expire in 7 days.
Best regards,
The Event Team
"""
    message = MIMEMultipart()
    message['From'] = Config.MAIL_DEFAULT_SENDER
    message['To'] = staff_email
    message['Subject'] = f"You're invited to join {event_name} as Staff"
    message.attach(MIMEText(body, 'plain'))
    try:
        with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as smtp:
            if Config.MAIL_USE_TLS:
                smtp.starttls()
            if Config.MAIL_USERNAME:
                smtp.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
            smtp.send_message(message)
    except Exception as e:
        print(f"Email error: {e}")
        return False
    return True
def send_attendee_confirmation_email(attendee_email, attendee_name, event_name, event_date, event_location, personal_page_url=None):
    """Email an attendee the confirmation of their event registration.

    When *personal_page_url* is given, a section linking to the attendee's
    personal page is added after the event details. Returns True when the
    message was handed to the SMTP server, False on any error (which is
    printed).
    """
    if personal_page_url:
        personal_page_section = f"""
Your Personal Page:
{personal_page_url}
View your registered events and breakout sessions anytime.
"""
    else:
        personal_page_section = ""
    body = f"""Hello {attendee_name},
Your registration for {event_name} has been confirmed!
Event Details:
- Date: {event_date}
- Location: {event_location}{personal_page_section}
You can now log in to:
- View event details
- RSVP for break-out sessions
- Connect with other attendees
We look forward to seeing you there!
Best regards,
The Event Team
"""
    message = MIMEMultipart()
    message['From'] = Config.MAIL_DEFAULT_SENDER
    message['To'] = attendee_email
    message['Subject'] = f"Registration confirmed for {event_name}"
    message.attach(MIMEText(body, 'plain'))
    try:
        with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as smtp:
            if Config.MAIL_USE_TLS:
                smtp.starttls()
            if Config.MAIL_USERNAME:
                smtp.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
            smtp.send_message(message)
    except Exception as e:
        print(f"Email error: {e}")
        return False
    return True
def send_breakout_session_update_email(attendee_email, attendee_name, session_name, event_name, changes_text):
    """Email an attendee that a breakout session they follow was changed.

    *changes_text* is inserted verbatim under "Changes made:". Returns True
    when the message was handed to the SMTP server, False on any error
    (which is printed).
    """
    body = f"""Hello {attendee_name},
The breakout session "{session_name}" for {event_name} has been updated.
Changes made:
{changes_text}
Please log in to view the updated details.
Best regards,
The Event Team
"""
    message = MIMEMultipart()
    message['From'] = Config.MAIL_DEFAULT_SENDER
    message['To'] = attendee_email
    message['Subject'] = f"Update on Breakout Session: {session_name}"
    message.attach(MIMEText(body, 'plain'))
    try:
        with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as smtp:
            if Config.MAIL_USE_TLS:
                smtp.starttls()
            if Config.MAIL_USERNAME:
                smtp.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
            smtp.send_message(message)
    except Exception as e:
        print(f"Email error: {e}")
        return False
    return True
def send_event_update_email(attendee_email, attendee_name, event_name, changes_text):
    """Email an attendee that an event they registered for was changed.

    *changes_text* is inserted verbatim under "Changes made:". Returns True
    when the message was handed to the SMTP server, False on any error
    (which is printed).
    """
    body = f"""Hello {attendee_name},
The event "{event_name}" has been updated.
Changes made:
{changes_text}
Please log in to view the updated details.
Best regards,
The Event Team
"""
    message = MIMEMultipart()
    message['From'] = Config.MAIL_DEFAULT_SENDER
    message['To'] = attendee_email
    message['Subject'] = f"Update on Event: {event_name}"
    message.attach(MIMEText(body, 'plain'))
    try:
        with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as smtp:
            if Config.MAIL_USE_TLS:
                smtp.starttls()
            if Config.MAIL_USERNAME:
                smtp.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
            smtp.send_message(message)
    except Exception as e:
        print(f"Email error: {e}")
        return False
    return True
def generate_badge_pdf(attendee, event=None):
    """Generate an A4 PDF badge for an attendee.

    The PDF is designed to be folded horizontally and vertically once,
    with the attendee info in the lower-left quarter.
    Includes dotted fold lines and QR code.

    Args:
        attendee: Mapping read with ``.get()`` for ``first_name``,
            ``last_name``, ``organisation``, ``role`` and ``attendee_code``.
            Missing or ``None`` text fields are drawn as empty text; the QR
            code is skipped when ``attendee_code`` is empty.
        event: Optional mapping with ``name``, ``start_time`` and
            ``end_time``. When given, a black footer bar with the event
            name (and the date range, if both times are set) is drawn.

    Returns:
        An ``io.BytesIO`` positioned at offset 0 that holds the PDF.
    """
    buffer = io.BytesIO()
    c = canvas.Canvas(buffer, pagesize=A4)
    width, height = A4  # 595.28 x 841.89 points
    # Margins: the text block starts 100mm (10cm) above the 15mm bottom margin
    left_margin = 15 * mm
    bottom_margin = 15 * mm + 100 * mm
    # Dotted line settings
    dash_length = 5
    gap_length = 5
    # Draw vertical dotted fold line down the middle
    center_x = width / 2
    c.setDash(dash_length, gap_length)
    c.setLineWidth(0.5)
    c.line(center_x, 0, center_x, height)
    # Draw horizontal dotted fold line across the middle
    center_y = height / 2
    c.line(0, center_y, width, center_y)
    # Reset dash for other drawing
    c.setDash([])
    # Draw VISITOR bar at top of bottom-left quadrant, left edge to center, below dotted line
    bar_height = 50  # Height of the bar in points
    c.setFillColorRGB(0, 0, 0)  # Black color
    c.rect(0, center_y - bar_height, center_x, bar_height, fill=True, stroke=False)
    # Draw VISITOR text in white, centered within the left half
    c.setFillColorRGB(1, 1, 1)  # White
    c.setFont("Helvetica-Bold", 27)
    text = "VISITOR"
    text_width = c.stringWidth(text, "Helvetica-Bold", 27)
    text_x = (center_x - text_width) / 2
    c.drawString(text_x, center_y - bar_height + 12, text)
    # Reset fill color to black for subsequent text
    c.setFillColorRGB(0, 0, 0)
    # Baseline of the first name, measured from the bottom of the page
    y_name = bottom_margin + 40 - 42.5
    # Guard against None the same way organisation and role are guarded, so
    # a record with a NULL name does not crash the width calculation.
    first_name = attendee.get('first_name') or ''
    last_name = attendee.get('last_name') or ''
    full_name = first_name + ' ' + last_name
    # The name must fit in half of the A4 width minus margins
    max_name_width = (width / 2) - left_margin - 10  # 10pt padding
    # Start at 32pt and step down until the full name fits; if no listed size
    # fits, 32pt is kept.
    font_size = 32
    if c.stringWidth(full_name, "Helvetica-Bold", 32) > max_name_width:
        for size in [28, 24, 20, 18, 16]:
            if c.stringWidth(full_name, "Helvetica-Bold", size) <= max_name_width:
                font_size = size
                break
    c.setFont("Helvetica-Bold", font_size)
    # Draw first name
    c.drawString(left_margin, y_name, first_name)
    # Draw last name on next line
    y_last_name = y_name - (font_size * 1.2)
    c.drawString(left_margin, y_last_name, last_name)
    # Draw organization (regular, 24pt) at a fixed offset below the first name.
    # NOTE(review): this offset does not depend on the name font size or on
    # y_last_name; confirm the resulting spacing is the intended layout.
    c.setFont("Helvetica", 24)
    y_org = y_name - 50
    c.drawString(left_margin, y_org, attendee.get('organisation', '') or '')
    # Draw role (regular, 24pt)
    c.setFont("Helvetica", 24)
    y_role = y_org - 40
    c.drawString(left_margin, y_role, attendee.get('role', '') or '')
    # Generate QR code for attendee_code
    attendee_code = attendee.get('attendee_code', '')
    if attendee_code:
        qr = qrcode.QRCode(version=1, box_size=10, border=1)
        qr.add_data(attendee_code)
        qr.make(fit=True)
        qr_img = qr.make_image(fill_color="black", back_color="white")
        # Convert to bytes and then to ImageReader for reportlab
        qr_buffer = io.BytesIO()
        qr_img.save(qr_buffer, format='PNG')
        qr_buffer.seek(0)
        from reportlab.lib.utils import ImageReader
        qr_reader = ImageReader(qr_buffer)
        # Draw QR code 15mm from the left edge, 10mm above the 15mm footer bar
        qr_size = 40 * mm  # QR code size
        qr_x = 15 * mm
        qr_y = 25 * mm
        c.drawImage(qr_reader, qr_x, qr_y, width=qr_size, height=qr_size)
    # Draw bottom black bar with event info (from bottom edge, 15mm high)
    if event:
        bar_bottom_height = 15 * mm
        c.setFillColorRGB(0, 0, 0)  # Black color
        c.rect(0, 0, center_x, bar_bottom_height, fill=True, stroke=False)
        # Draw event name centered
        c.setFillColorRGB(1, 1, 1)  # White
        c.setFont("Helvetica-Bold", 14)
        event_name = event.get('name', '') or ''
        event_name_width = c.stringWidth(event_name, "Helvetica-Bold", 14)
        event_name_x = (center_x - event_name_width) / 2
        c.drawString(event_name_x, bar_bottom_height - 21, event_name)
        # Draw start and end date/time on the same line, centered
        c.setFont("Helvetica", 10)
        start_time = event.get('start_time')
        end_time = event.get('end_time')
        if start_time and end_time:
            start_str = localized_date(start_time, get_locale(), '%d/%m/%Y %H:%M')
            end_str = localized_date(end_time, get_locale(), '%d/%m/%Y %H:%M')
            combined_str = start_str + " - " + end_str
            combined_width = c.stringWidth(combined_str, "Helvetica", 10)
            combined_x = (center_x - combined_width) / 2
            c.drawString(combined_x, bar_bottom_height - 37, combined_str)
    c.save()
    buffer.seek(0)
    return buffer
def generate_rectangular_badges_pdf(attendees, event=None, attendee_types_map=None):
    """Generate an A4 PDF with rectangular badge labels.

    Label dimensions: 80mm x 50mm, laid out 2 columns x 5 rows, i.e. 10
    badges per A4 page. Each badge shows the attendee type (white text on a
    black bar at the top, drawn only when a type name is known), the attendee
    name, organisation, role, a QR code of the attendee code, and a black
    bottom bar carrying the event name and start date.

    Args:
        attendees: Iterable of dict-like attendee rows. The keys read are
            'attendee_type_id', 'first_name', 'last_name', 'organisation',
            'role' and 'attendee_code'.
        event: Optional dict-like event row; 'name' and 'start_time' are
            read for the bottom bar.
        attendee_types_map: Optional mapping from attendee type id to a
            dict with a 'name' key.

    Returns:
        An io.BytesIO, rewound to offset 0, containing the PDF.
    """
    # Imported once up front instead of once per badge inside the loop.
    from reportlab.lib.utils import ImageReader

    buffer = io.BytesIO()
    c = canvas.Canvas(buffer, pagesize=A4)
    width, height = A4  # 595.28 x 841.89 points

    # Label dimensions: 80mm x 50mm
    label_width = 80 * mm
    label_height = 50 * mm
    badges_per_row = 2
    badges_per_col = 5
    badges_per_page = badges_per_row * badges_per_col

    # Page margins
    margin_left = 15 * mm
    margin_right = 15 * mm
    margin_top = 12 * mm
    margin_bottom = 12 * mm

    # Whatever space the labels do not use is spread evenly as gaps around
    # and between them.
    available_width = width - margin_left - margin_right
    available_height = height - margin_top - margin_bottom
    gap_x = (available_width - badges_per_row * label_width) / (badges_per_row + 1)
    gap_y = (available_height - badges_per_col * label_height) / (badges_per_col + 1)

    # Lower-left corner of every label slot, row by row from the top-left.
    badge_positions = [
        (
            margin_left + gap_x + col * (label_width + gap_x),
            height - margin_top - gap_y - row * (label_height + gap_y) - label_height,
        )
        for row in range(badges_per_col)
        for col in range(badges_per_row)
    ]

    # The bottom bar text depends only on the event, so build it once.
    event_name = (event.get('name', '') or '') if event else ''
    start_time = event.get('start_time', '') if event else ''
    event_date_str = ''
    if start_time:
        if hasattr(start_time, 'strftime'):
            event_date_str = start_time.strftime('%d-%m-%Y')
        else:
            event_date_str = str(start_time)[:10]
    # Join with a separator: name and date used to be concatenated directly
    # and ran together on the printed badge.
    bar_text = ' - '.join(part for part in (event_name, event_date_str) if part)

    # Draw badges for each attendee
    for i, attendee in enumerate(attendees):
        # badge_index is always a valid slot because it is taken modulo the
        # number of positions per page.
        badge_index = i % badges_per_page
        # New page if needed
        if badge_index == 0 and i > 0:
            c.showPage()
        x, y = badge_positions[badge_index]
        centre_x = x + label_width / 2

        # Draw label border (light gray)
        c.setStrokeColorRGB(0.7, 0.7, 0.7)
        c.setLineWidth(0.5)
        c.rect(x, y, label_width, label_height, fill=False, stroke=True)

        # Get attendee type name from mapping
        attendee_type_id = attendee.get('attendee_type_id')
        attendee_type_name = ''
        if attendee_type_id and attendee_types_map:
            type_info = attendee_types_map.get(attendee_type_id, {})
            attendee_type_name = type_info.get('name', '') or ''

        # Draw black bar at top with attendee type in white
        if attendee_type_name:
            bar_height = 8 * mm
            c.setFillColorRGB(0, 0, 0)
            c.rect(x, y + label_height - bar_height, label_width, bar_height, fill=True, stroke=False)
            c.setFillColorRGB(1, 1, 1)
            c.setFont("Helvetica-Bold", 16)
            c.drawCentredString(centre_x, y + label_height - bar_height + 1.5 * mm, attendee_type_name[:20])

        # Draw attendee name (20pt, shrunk proportionally if it is too wide)
        first_name = attendee.get('first_name', '') or ''
        last_name = attendee.get('last_name', '') or ''
        full_name = f"{first_name} {last_name}".strip()
        font_size = 20
        max_name_width = label_width - 2 * mm
        name_width = c.stringWidth(full_name, "Helvetica-Bold", font_size)
        if name_width > max_name_width:
            # Scale down to fit
            font_size = int(font_size * max_name_width / name_width)
        c.setFont("Helvetica-Bold", font_size)
        c.setFillColorRGB(0, 0, 0)
        name_y = y + label_height - 22 * mm
        c.drawCentredString(centre_x, name_y, full_name)

        # Draw organization
        org = attendee.get('organisation', '') or ''
        c.setFont("Helvetica", 14)
        org_y = name_y - 8 * mm
        if org:
            c.drawCentredString(centre_x, org_y, org[:22])

        # Draw role (its line is reserved below the organisation line even
        # when the organisation is empty)
        role = attendee.get('role', '') or ''
        c.setFont("Helvetica", 14)
        if role:
            role_y = org_y - 7 * mm
            c.drawCentredString(centre_x, role_y, role[:22])

        # Generate QR code at bottom-right
        attendee_code = attendee.get('attendee_code', '')
        if attendee_code:
            qr_size = 12 * mm
            qr_x = x + label_width - qr_size - 2 * mm
            qr_y = y + 10 * mm
            qr = qrcode.QRCode(version=1, box_size=2, border=0)
            qr.add_data(attendee_code)
            qr.make(fit=True)
            qr_img = qr.make_image(fill_color="black", back_color="white")
            # Round-trip through an in-memory PNG so reportlab can read it.
            qr_buffer = io.BytesIO()
            qr_img.save(qr_buffer, format='PNG')
            qr_buffer.seek(0)
            c.drawImage(ImageReader(qr_buffer), qr_x, qr_y, width=qr_size, height=qr_size)

        # Draw black bar at bottom with event name and start date
        bar_height = 8 * mm
        c.setFillColorRGB(0, 0, 0)
        c.rect(x, y, label_width, bar_height, fill=True, stroke=False)
        c.setFillColorRGB(1, 1, 1)
        c.setFont("Helvetica-Bold", 14)
        c.drawCentredString(centre_x, y + bar_height / 2 - 1.5 * mm, bar_text)

    c.save()
    buffer.seek(0)
    return buffer
def send_badge_email(attendee_email, attendee_name, event_name, pdf_buffer):
    """Email the badge PDF to an attendee as an attachment.

    Builds a multipart message with a plain-text body, attaches the full
    contents of ``pdf_buffer`` (read from offset 0), and sends it through
    the SMTP server named in ``Config``. STARTTLS and login are used only
    when ``Config.MAIL_USE_TLS`` / ``Config.MAIL_USERNAME`` are set.

    Returns:
        True when the message was sent, False when any exception occurred
        while connecting or sending (the error is printed).
    """
    text_body = f"""Hello {attendee_name},
Please find attached your badge for {event_name}.
The badge is designed to be folded once horizontally and once vertically.
Your information appears in the lower-left quarter after folding.
We look forward to seeing you there!
Best regards,
The Event Team
"""

    message = MIMEMultipart()
    message['From'] = Config.MAIL_DEFAULT_SENDER
    message['To'] = attendee_email
    message['Subject'] = f"Your Badge for {event_name}"
    message.attach(MIMEText(text_body, 'plain'))

    # Attach PDF (always read from the start of the buffer)
    pdf_buffer.seek(0)
    attachment = MIMEBase('application', 'octet-stream')
    attachment.set_payload(pdf_buffer.read())
    encode_base64(attachment)
    name_for_file = attendee_name.replace(" ", "_")
    attachment.add_header('Content-Disposition', 'attachment', filename=f'badge_{name_for_file}.pdf')
    message.attach(attachment)

    try:
        with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as smtp:
            if Config.MAIL_USE_TLS:
                smtp.starttls()
            if Config.MAIL_USERNAME:
                smtp.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
            smtp.send_message(message)
    except Exception as e:
        print(f"Email error: {e}")
        return False
    else:
        return True
# Routes - Main
@app.route('/')
def index():
    """Render the public landing page."""
    landing_page = render_template('index.html')
    return landing_page
@app.route('/staff/<staff_code>')
def staff_login(staff_code):
    """Log in an organizer or staff member via a staff code (no password).

    The code is looked up in ``organizers`` first and, if not found there,
    in ``staff``. A match stores the user's id, type and event id in the
    session and redirects to the matching dashboard. A miss is recorded as
    a failed login for the client IP. Blocked IPs are turned away before
    any lookup happens.
    """
    from contextlib import closing

    client_ip = request.remote_addr
    # Check if IP is blocked (reuse login rate limiting)
    if is_ip_blocked(client_ip):
        flash('Too many failed login attempts. Please try again later (blocked for 1 hour).')
        return redirect(url_for('index'))

    try:
        # closing() releases the cursor and connection even when a query
        # raises; previously they leaked on the error path.
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            # First check if it's an organizer
            cursor.execute("SELECT * FROM organizers WHERE staff_code = %s", (staff_code,))
            organizer = cursor.fetchone()
            staff_member = None
            if not organizer:
                # Otherwise check if it's a staff member
                cursor.execute("SELECT * FROM staff WHERE staff_code = %s", (staff_code,))
                staff_member = cursor.fetchone()

        if organizer:
            clear_failed_logins(client_ip)
            # Log in the organizer
            session['user_id'] = organizer['id']
            session['user_type'] = 'organizer'
            session['event_id'] = None
            flash(f'Welcome back, {organizer["name"]}!')
            return redirect(url_for('organizer_dashboard'))

        if not staff_member:
            record_failed_login(client_ip)
            flash('Invalid staff link.')
            return redirect(url_for('index'))

        # Clear failed login attempts on successful staff code login
        clear_failed_logins(client_ip)
        # Log in the staff member
        session['user_id'] = staff_member['id']
        session['user_type'] = 'staff'
        session['event_id'] = staff_member['event_id']
        flash(f'Welcome back, {staff_member["first_name"]}!')
        return redirect(url_for('staff_dashboard'))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
# Routes - Auth
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Login page for organizers, presenters, staff and attendees.

    GET renders the form, pre-selecting the user type from the ``type``
    query parameter. POST looks the email up in the table that matches the
    submitted ``user_type``, verifies the password, stores the user in the
    session and redirects to the matching dashboard. Failed attempts are
    recorded per client IP, and blocked IPs only ever see the form.
    """
    from contextlib import closing

    client_ip = request.remote_addr
    # Check if IP is blocked
    if is_ip_blocked(client_ip):
        flash('Too many failed login attempts. Please try again later (blocked for 1 hour).')
        return render_template('auth/login.html')

    # Per user type: lookup query, column used in the greeting, and the
    # endpoint to land on after a successful login.
    login_targets = {
        'organizer': ("SELECT * FROM organizers WHERE email = %s", 'name', 'organizer_dashboard'),
        'breakout_organizer': ("SELECT * FROM organizers WHERE email = %s", 'name', 'presenter_dashboard'),
        'staff': ("SELECT * FROM staff WHERE email = %s", 'first_name', 'staff_dashboard'),
        'attendee': ("SELECT * FROM attendees WHERE email = %s", 'first_name', 'attendee_dashboard'),
    }

    if request.method == 'POST':
        email = request.form.get('email', '').strip()
        password = request.form.get('password', '')
        user_type = request.form.get('user_type', 'attendee')
        # Unknown types were always looked up as attendees; normalise the
        # value so the session never stores an arbitrary client string.
        if user_type not in login_targets:
            user_type = 'attendee'

        if not email or not password:
            flash('Email and password are required.')
            return render_template('auth/login.html')

        query, name_column, dashboard = login_targets[user_type]
        try:
            # closing() releases the cursor and connection even when the
            # query raises; previously they leaked on the error path.
            with closing(get_db_connection()) as conn, \
                    closing(conn.cursor(dictionary=True)) as cursor:
                cursor.execute(query, (email,))
                # fetchall() rather than fetchone(): attendee emails are only
                # checked for uniqueness per event at registration, so several
                # rows can match. Reading one row could leave an unread result
                # on the cursor and ignored every registration but the first.
                candidates = cursor.fetchall()

            user = next(
                (row for row in candidates if verify_password(password, row['password_hash'])),
                None,
            )
            if user:
                clear_failed_logins(client_ip)
                session['user_id'] = user['id']
                session['user_type'] = user_type
                session['event_id'] = user.get('event_id') if user_type in ['attendee', 'staff'] else None
                flash(f'Welcome back, {user[name_column]}!')
                return redirect(url_for(dashboard))

            record_failed_login(client_ip)
            flash('Invalid email or password.')
        except Error as e:
            flash(f'Database error: {e}')

    # Handle pre-selection of user type from query string
    default_type = request.args.get('type', 'attendee')
    return render_template('auth/login.html', default_type=default_type)
@app.route('/presenter/dashboard')
@login_required
def presenter_dashboard():
    """Presenter (break-out organiser) dashboard.

    Lists the breakout sessions the logged-in organizer is assigned to,
    ordered by start time, together with the presenter's name. Users whose
    session type is not 'breakout_organizer' are redirected to the index.
    """
    from contextlib import closing

    if session.get('user_type') != 'breakout_organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        # closing() releases the cursor and connection even when a query
        # raises; previously they leaked on the error path.
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            # Get sessions where this organizer is assigned as presenter
            cursor.execute("""
                SELECT bs.*, e.name as event_name
                FROM breakout_sessions bs
                JOIN breakout_session_organizers bso ON bs.id = bso.breakout_session_id
                JOIN events e ON bs.event_id = e.id
                WHERE bso.organizer_id = %s
                ORDER BY bs.start_time
            """, (session['user_id'],))
            breakout_sessions = cursor.fetchall()
            cursor.execute("SELECT name FROM organizers WHERE id = %s", (session['user_id'],))
            organizer = cursor.fetchone()

        # The organizer row may be gone while the session is still alive;
        # fall back to an empty name instead of failing on None.
        presenter_name = organizer['name'] if organizer else ''
        return render_template('presenter/dashboard.html',
                               sessions=breakout_sessions,
                               presenter_name=presenter_name)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
@app.route('/staff/dashboard')
@login_required
def staff_dashboard():
    """Staff dashboard - list the events of the logged-in staff member.

    Users whose session type is not 'staff' are redirected to the index.
    """
    from contextlib import closing

    if session.get('user_type') != 'staff':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        # closing() releases the cursor and connection even when a query
        # raises; previously they leaked on the error path.
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT first_name, last_name FROM staff WHERE id = %s", (session['user_id'],))
            staff_member = cursor.fetchone()
            cursor.execute("SELECT * FROM events WHERE id IN (SELECT event_id FROM staff WHERE id = %s) ORDER BY start_time", (session['user_id'],))
            events = cursor.fetchall()

        # The staff row may be gone while the session is still alive; fall
        # back to an empty name instead of failing on None.
        staff_name = f"{staff_member['first_name']} {staff_member['last_name']}" if staff_member else ''
        return render_template('staff/staff_events.html', events=events, staff_name=staff_name)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
@app.route('/staff/<int:event_id>/dashboard')
@login_required
def staff_event_dashboard(event_id):
    """Staff dashboard for a specific event.

    Only accessible by staff members assigned to this event: the staff row
    must carry the requested ``event_id``. Other staff are sent back to
    their event list, and non-staff users are sent to the index.
    """
    from contextlib import closing

    if session.get('user_type') != 'staff':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        # closing() releases the cursor and connection on every exit path,
        # including when a query raises (they used to leak in that case).
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            # Verify staff is assigned to this event
            cursor.execute("SELECT 1 FROM staff WHERE id = %s AND event_id = %s", (session['user_id'], event_id))
            if not cursor.fetchone():
                flash('Access denied.')
                return redirect(url_for('staff_dashboard'))

            cursor.execute("SELECT * FROM events WHERE id = %s", (event_id,))
            event = cursor.fetchone()
            if not event:
                flash('Event not found.')
                return redirect(url_for('staff_dashboard'))

            cursor.execute("SELECT first_name, last_name FROM staff WHERE id = %s", (session['user_id'],))
            staff_member = cursor.fetchone()

        # Guard against the staff row vanishing between the two queries.
        staff_name = f"{staff_member['first_name']} {staff_member['last_name']}" if staff_member else ''
        return render_template('staff/dashboard.html', event=event, staff_name=staff_name)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('staff_dashboard'))
@app.route('/register/organizer', methods=['GET', 'POST'])
def register_organizer():
    """Register a new organizer.

    GET renders the registration form. POST validates the submitted name,
    email and password (both entries must match, minimum 6 characters),
    rejects emails that already exist in ``organizers``, then stores the
    organizer with a hashed password and a generated staff code and
    redirects to the login page.
    """
    from contextlib import closing

    if request.method == 'POST':
        name = request.form.get('name', '').strip()
        email = request.form.get('email', '').strip()
        password = request.form.get('password', '')
        confirm_password = request.form.get('confirm_password', '')

        # Validation rules in the order they are reported to the user.
        problem = None
        if not all([name, email, password]):
            problem = 'All fields are required.'
        elif password != confirm_password:
            problem = 'Passwords do not match.'
        elif len(password) < 6:
            problem = 'Password must be at least 6 characters.'
        if problem:
            flash(problem)
            return render_template('register.html', user_type='organizer')

        try:
            # closing() releases the cursor and connection even when a
            # statement raises; previously they leaked on the error path.
            with closing(get_db_connection()) as conn, \
                    closing(conn.cursor()) as cursor:
                # Check if email exists
                cursor.execute("SELECT id FROM organizers WHERE email = %s", (email,))
                if cursor.fetchone():
                    flash('Email already registered.')
                    return render_template('register.html', user_type='organizer')

                # Create organizer
                password_hash = hash_password(password)
                staff_code = generate_staff_code()
                cursor.execute(
                    "INSERT INTO organizers (email, password_hash, name, staff_code) VALUES (%s, %s, %s, %s)",
                    (email, password_hash, name, staff_code)
                )
                conn.commit()

            flash('Registration successful! Please log in.')
            return redirect(url_for('login'))
        except Error as e:
            flash(f'Database error: {e}')

    return render_template('register.html', user_type='organizer')
@app.route('/register/attendee/<code>', methods=['GET', 'POST'])
def register_attendee(code):
    """Register a new attendee for an event using the event code.

    The event is resolved from ``code`` first; unknown codes redirect to
    the index. GET renders the registration form for that event. POST
    validates the form, rejects an email already registered for the same
    event, stores the attendee with a hashed password and a generated
    attendee code, and redirects to the login page.
    """
    from contextlib import closing

    # Verify event exists
    try:
        # closing() releases the cursor and connection even when the query
        # raises; previously they leaked on the error path.
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT * FROM events WHERE code = %s", (code,))
            event = cursor.fetchone()
        if not event:
            flash('Event not found.')
            return redirect(url_for('index'))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))

    if request.method == 'POST':
        first_name = request.form.get('first_name', '').strip()
        last_name = request.form.get('last_name', '').strip()
        email = request.form.get('email', '').strip()
        password = request.form.get('password', '')
        confirm_password = request.form.get('confirm_password', '')
        organisation = request.form.get('organisation', '').strip()
        role = request.form.get('role', '').strip()
        introduction = request.form.get('introduction', '').strip()

        # Validation rules in the order they are reported to the user.
        problem = None
        if not all([first_name, last_name, email, password]):
            problem = 'First name, last name, email, and password are required.'
        elif password != confirm_password:
            problem = 'Passwords do not match.'
        elif len(password) < 6:
            # Same minimum length that organizer registration enforces.
            problem = 'Password must be at least 6 characters.'
        if problem:
            flash(problem)
            return render_template('register.html', user_type='attendee', event=event)

        try:
            with closing(get_db_connection()) as conn, \
                    closing(conn.cursor()) as cursor:
                # Check if email already registered for this event
                cursor.execute(
                    "SELECT id FROM attendees WHERE email = %s AND event_id = %s",
                    (email, event['id'])
                )
                if cursor.fetchone():
                    flash('Email already registered for this event.')
                    return render_template('register.html', user_type='attendee', event=event)

                password_hash = hash_password(password)
                attendee_code = generate_attendee_code()
                cursor.execute("""
                    INSERT INTO attendees (event_id, email, password_hash, first_name, last_name, organisation, role, introduction, attendee_code)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                """, (event['id'], email, password_hash, first_name, last_name, organisation, role, introduction, attendee_code))
                conn.commit()

            flash('Registration successful! Please log in.')
            return redirect(url_for('login'))
        except Error as e:
            flash(f'Database error: {e}')

    return render_template('register.html', user_type='attendee', event=event)
@app.route('/logout')
def logout():
    """End the current session and send the user back to the home page."""
    session.clear()
    flash('You have been logged out.')
    home_url = url_for('index')
    return redirect(home_url)
# Routes - Organizer
@app.route('/organizer/dashboard')
@login_required
def organizer_dashboard():
    """Organizer dashboard.

    Lists the logged-in organizer's events, newest first. Each event row is
    extended with 'attendee_count', 'breakout_sessions' (including the
    registered RSVP count), 'staff' and 'attendees'. Users whose session
    type is not 'organizer' are redirected to the index.
    """
    from contextlib import closing

    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        # closing() releases the cursor and connection even when a query
        # raises; previously they leaked on the error path.
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT * FROM events WHERE organizer_id = %s ORDER BY start_time DESC", (session['user_id'],))
            events = cursor.fetchall()

            # Get attendee counts, breakout sessions, and staff for each event
            for event in events:
                event_key = (event['id'],)

                cursor.execute("SELECT COUNT(*) as count FROM attendees WHERE event_id = %s", event_key)
                event['attendee_count'] = cursor.fetchone()['count']

                cursor.execute("""
                    SELECT bs.id, bs.name, bs.max_attendees,
                    (SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count
                    FROM breakout_sessions bs
                    WHERE bs.event_id = %s
                    ORDER BY bs.start_time
                """, event_key)
                event['breakout_sessions'] = cursor.fetchall()

                cursor.execute("SELECT id, first_name, last_name, email FROM staff WHERE event_id = %s ORDER BY last_name, first_name", event_key)
                event['staff'] = cursor.fetchall()

                cursor.execute("SELECT id, first_name, last_name, email, checked_in FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", event_key)
                event['attendees'] = cursor.fetchall()

        return render_template('organizer/dashboard.html', events=events)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
@app.route('/organizer/attendee-types')
@login_required
def all_attendee_types():
    """Show all attendee types across all of the organizer's events.

    Each event row is extended with 'attendee_types', and each type with
    an 'attendee_count' of the attendees of that event holding that type.
    Users whose session type is not 'organizer' are redirected to the index.
    """
    from contextlib import closing

    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        # closing() releases the cursor and connection even when a query
        # raises; previously they leaked on the error path.
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            # Get all events for this organizer
            cursor.execute("SELECT * FROM events WHERE organizer_id = %s ORDER BY start_time DESC", (session['user_id'],))
            events = cursor.fetchall()

            # Get attendee types for each event
            for event in events:
                cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (event['id'],))
                event['attendee_types'] = cursor.fetchall()

                # One grouped query per event instead of one COUNT query per
                # attendee type; types without attendees count as 0.
                cursor.execute(
                    "SELECT attendee_type_id, COUNT(*) as count FROM attendees "
                    "WHERE event_id = %s GROUP BY attendee_type_id",
                    (event['id'],))
                counts = {row['attendee_type_id']: row['count'] for row in cursor.fetchall()}
                for at in event['attendee_types']:
                    at['attendee_count'] = counts.get(at['id'], 0)

        return render_template('organizer/all_attendee_types.html', events=events)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
@app.route('/organizer/event/create', methods=['GET', 'POST'])
@login_required
def create_event():
    """Create a new event.

    GET renders the creation form. POST requires name, start time and
    location, parses the times from the ``YYYY-MM-DDTHH:MM`` form format,
    stores the event under a generated event code for the logged-in
    organizer, and redirects to the organizer dashboard. Users whose
    session type is not 'organizer' are redirected to the index.
    """
    from contextlib import closing

    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    if request.method == 'POST':
        name = request.form.get('name', '').strip()
        description = request.form.get('description', '').strip()
        start_time = request.form.get('start_time', '')
        end_time = request.form.get('end_time', '')
        location = request.form.get('location', '').strip()
        max_attendees = request.form.get('max_attendees', '')

        if not all([name, start_time, location]):
            flash('Name, start time, and location are required.')
            return render_template('organizer/create_event.html')

        try:
            event_start = datetime.strptime(start_time, '%Y-%m-%dT%H:%M')
        except ValueError:
            flash('Invalid start time format.')
            return render_template('organizer/create_event.html')

        # The end time is optional; an empty value is stored as NULL.
        event_end = None
        if end_time:
            try:
                event_end = datetime.strptime(end_time, '%Y-%m-%dT%H:%M')
            except ValueError:
                flash('Invalid end time format.')
                return render_template('organizer/create_event.html')

        # Same ordering rule that breakout session creation enforces.
        if event_end is not None and event_end <= event_start:
            flash('End time must be after start time.')
            return render_template('organizer/create_event.html')

        try:
            # closing() releases the cursor and connection even when the
            # insert raises; previously they leaked on the error path.
            with closing(get_db_connection()) as conn, \
                    closing(conn.cursor()) as cursor:
                event_code = generate_event_code()
                cursor.execute("""
                    INSERT INTO events (organizer_id, code, name, description, start_time, end_time, location, max_attendees)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """, (session['user_id'], event_code, name, description, event_start, event_end, location, max_attendees if max_attendees else None))
                conn.commit()

            flash('Event created successfully!')
            return redirect(url_for('organizer_dashboard'))
        except Error as e:
            flash(f'Database error: {e}')

    return render_template('organizer/create_event.html')
@app.route('/organizer/event/<int:event_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_event(event_id):
    """Edit an existing event owned by the logged-in organizer.

    GET renders the edit form. POST updates name, description, location and
    max attendees. When any of those values changed, a confirmation page
    listing ``(label, old, new)`` tuples is rendered; otherwise the user is
    redirected to the event detail page. Events that do not exist or belong
    to another organizer redirect to the organizer dashboard.
    """
    from contextlib import closing

    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        # closing() releases the cursor and connection on every exit path,
        # including when a statement raises (they used to leak in that case).
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
            event = cursor.fetchone()
            if not event:
                flash('Event not found.')
                return redirect(url_for('organizer_dashboard'))

            if request.method != 'POST':
                return render_template('organizer/edit_event.html', event=event)

            name = request.form.get('name', '').strip()
            description = request.form.get('description', '').strip()
            location = request.form.get('location', '').strip()
            max_attendees = request.form.get('max_attendees', '')

            if not all([name, location]):
                flash('Name and location are required.')
                return render_template('organizer/edit_event.html', event=event)

            cursor.execute("""
                UPDATE events
                SET name = %s, description = %s, location = %s, max_attendees = %s
                WHERE id = %s
            """, (name, description, location, max_attendees if max_attendees else None, event_id))
            conn.commit()

            # Capture changes for the confirmation pop-up; `event` still
            # holds the values read before the update.
            changes = []
            if event['name'] != name:
                changes.append(('Name', event['name'], name))
            if (event['description'] or '') != description:
                changes.append(('Description', event['description'] or '', description))
            if event['location'] != location:
                changes.append(('Location', event['location'], location))
            if str(event['max_attendees'] or '') != max_attendees:
                changes.append(('Max Attendees', event['max_attendees'] or 'Unlimited', max_attendees or 'Unlimited'))

            updated_event = None
            if changes:
                # Re-read the row on the same connection instead of opening
                # a second one just for this query.
                cursor.execute("SELECT * FROM events WHERE id = %s", (event_id,))
                updated_event = cursor.fetchone()

        if changes:
            return render_template('organizer/edit_event_confirm.html', event=updated_event, changes=changes)

        flash('Event updated successfully!')
        return redirect(url_for('event_detail', code=event['code']))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/event/<int:event_id>/notify', methods=['POST'])
@login_required
def notify_event_attendees(event_id):
    """Send an email notification about changes to all attendees of an event.

    The event must belong to the logged-in organizer. The text to send is
    taken from the ``changes_text`` form field. One email is attempted per
    attendee and the sent/failed totals are flashed before redirecting to
    the event detail page.
    """
    from contextlib import closing

    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        # closing() releases the cursor and connection even when a query
        # raises; previously they leaked on the error path. The connection
        # is closed before any email is sent.
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
            event = cursor.fetchone()
            if not event:
                flash('Event not found.')
                return redirect(url_for('organizer_dashboard'))

            # Get attendees for this event
            cursor.execute("SELECT * FROM attendees WHERE event_id = %s", (event_id,))
            attendees = cursor.fetchall()

        # Get changes text from form
        changes_text = request.form.get('changes_text', 'Event details have been updated.')

        # Send emails
        sent_count = 0
        failed_count = 0
        for attendee in attendees:
            full_name = f"{attendee['first_name']} {attendee['last_name']}"
            if send_event_update_email(attendee['email'], full_name, event['name'], changes_text):
                sent_count += 1
            else:
                failed_count += 1

        if failed_count == 0:
            flash(f'Notification emails sent to {sent_count} attendee{"s" if sent_count != 1 else ""}.')
        else:
            flash(f'Notifications sent: {sent_count}, failed: {failed_count}.')
        return redirect(url_for('event_detail', code=event['code']))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/event/<int:event_id>/breakout-sessions')
@login_required
def list_breakout_sessions(event_id):
    """List breakout sessions for an event owned by the logged-in organizer.

    Each session row carries an extra 'rsvp_count' with the number of RSVPs
    in status 'registered'. Events that do not exist or belong to another
    organizer redirect to the organizer dashboard.
    """
    from contextlib import closing

    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        # closing() releases the cursor and connection on every exit path,
        # including when a query raises (they used to leak in that case).
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
            event = cursor.fetchone()
            if not event:
                flash('Event not found.')
                return redirect(url_for('organizer_dashboard'))

            cursor.execute("""
                SELECT bs.*,
                (SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count
                FROM breakout_sessions bs
                WHERE bs.event_id = %s
                ORDER BY bs.start_time
            """, (event_id,))
            breakout_sessions = cursor.fetchall()

        return render_template('organizer/breakout_sessions.html', event=event, sessions=breakout_sessions)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/event/<int:event_id>/breakout-session/create', methods=['GET', 'POST'])
@login_required
def create_breakout_session(event_id):
    """Create a new breakout session for an event owned by the organizer.

    GET renders the creation form; for single-day events the event date is
    passed to the template as ``prefilled_date``. POST validates the form
    (required fields, time format, end after start, capacity not above the
    event capacity), stores the session under a generated session code,
    registers the logged-in organizer as its organizer, and redirects to
    the event's breakout session list.
    """
    from contextlib import closing

    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        # closing() releases the cursor and connection even when the query
        # raises; previously they leaked on the error path.
        with closing(get_db_connection()) as conn, \
                closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
            event = cursor.fetchone()
        if not event:
            flash('Event not found.')
            return redirect(url_for('organizer_dashboard'))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))

    # Check if event is single-day; if so the form date can be pre-filled.
    prefilled_date = ''
    if event.get('start_time') and event.get('end_time'):
        start = event['start_time']
        end = event['end_time']
        if hasattr(start, 'date') and hasattr(end, 'date') and start.date() == end.date():
            prefilled_date = start.strftime('%Y-%m-%d')

    def render_form():
        # Every render passes prefilled_date so that a validation error
        # does not lose the pre-filled date.
        return render_template('organizer/create_breakout_session.html', event=event, prefilled_date=prefilled_date)

    if request.method == 'POST':
        name = request.form.get('name', '').strip()
        description = request.form.get('description', '').strip()
        start_time = request.form.get('start_time', '')
        end_time = request.form.get('end_time', '')
        location = request.form.get('location', '').strip()
        max_attendees = request.form.get('max_attendees', '').strip()

        if not all([name, start_time, end_time, location]):
            flash('Name, start time, end time, and location are required.')
            return render_form()

        try:
            session_start = datetime.strptime(start_time, '%Y-%m-%dT%H:%M')
            session_end = datetime.strptime(end_time, '%Y-%m-%dT%H:%M')
        except ValueError:
            flash('Invalid time format.')
            return render_form()

        if session_end <= session_start:
            flash('End time must be after start time.')
            return render_form()

        # Parse the optional capacity up front: a non-numeric value used to
        # raise an unhandled ValueError in the comparison below.
        capacity = None
        if max_attendees:
            try:
                capacity = int(max_attendees)
            except ValueError:
                flash('Maximum attendees must be a whole number.')
                return render_form()

        if capacity and event['max_attendees'] and capacity > event['max_attendees']:
            flash(f'Break-out session attendees cannot exceed event capacity ({event["max_attendees"]}).')
            return render_form()

        try:
            with closing(get_db_connection()) as conn, \
                    closing(conn.cursor()) as cursor:
                session_code = generate_session_code()
                cursor.execute("""
                    INSERT INTO breakout_sessions (code, event_id, name, description, start_time, end_time, location, max_attendees)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """, (session_code, event_id, name, description, session_start, session_end, location, capacity))
                breakout_session_id = cursor.lastrowid

                # Add event organizer as breakout session organizer
                cursor.execute("""
                    INSERT INTO breakout_session_organizers (breakout_session_id, organizer_id)
                    VALUES (%s, %s)
                """, (breakout_session_id, session['user_id']))
                conn.commit()

            flash('Breakout session created successfully!')
            return redirect(url_for('list_breakout_sessions', event_id=event_id))
        except Error as e:
            flash(f'Database error: {e}')

    return render_form()
@app.route('/organizer/breakout-session/<int:session_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_breakout_session(session_id):
    """Edit a breakout session.

    GET renders the edit form. POST validates the submitted fields, updates
    the row and then either renders a confirmation page listing the changed
    fields or, when nothing changed, redirects to the session detail page.

    Only the organizer who owns the parent event may edit the session.
    Validation failures re-render the form with a flashed message; database
    errors flash a message and redirect to the organizer dashboard.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    input_format = '%Y-%m-%dT%H:%M'  # value format of <input type="datetime-local">
    display_format = '%B %d, %Y at %H:%M'
    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        # Get breakout session and verify organizer has access
        cursor.execute("""
            SELECT bs.*, e.name as event_name, e.organizer_id as event_organizer_id, e.max_attendees as event_max_attendees
            FROM breakout_sessions bs
            JOIN events e ON bs.event_id = e.id
            WHERE bs.id = %s
        """, (session_id,))
        breakout_session = cursor.fetchone()

        if not breakout_session:
            flash('Breakout session not found.')
            return redirect(url_for('organizer_dashboard'))

        if breakout_session['event_organizer_id'] != session['user_id']:
            flash('Access denied.')
            return redirect(url_for('index'))

        event = {
            'id': breakout_session['event_id'],
            'name': breakout_session['event_name'],
            'max_attendees': breakout_session['event_max_attendees'],
        }

        def render_form():
            # Single place that re-renders the edit form (GET and every
            # validation failure use the same template arguments).
            return render_template('organizer/edit_breakout_session.html',
                                   session=breakout_session, event=event)

        if request.method != 'POST':
            return render_form()

        name = request.form.get('name', '').strip()
        description = request.form.get('description', '').strip()
        start_time = request.form.get('start_time', '')
        end_time = request.form.get('end_time', '')
        location = request.form.get('location', '').strip()
        max_attendees = request.form.get('max_attendees', '').strip()

        if not all([name, start_time, end_time, location]):
            flash('Name, start time, end time, and location are required.')
            return render_form()

        try:
            session_start = datetime.strptime(start_time, input_format)
            session_end = datetime.strptime(end_time, input_format)
        except ValueError:
            flash('Invalid time format.')
            return render_form()

        if session_end <= session_start:
            flash('End time must be after start time.')
            return render_form()

        # A non-numeric value used to reach int() unguarded and surface as an
        # unhandled ValueError; reject it like any other validation failure.
        try:
            max_attendees_value = int(max_attendees) if max_attendees else None
        except ValueError:
            flash('Max attendees must be a whole number.')
            return render_form()

        if max_attendees_value and event['max_attendees'] and max_attendees_value > event['max_attendees']:
            flash(f'Break-out session attendees cannot exceed event capacity ({event["max_attendees"]}).')
            return render_form()

        cursor.execute("""
            UPDATE breakout_sessions
            SET name = %s, description = %s, start_time = %s, end_time = %s, location = %s, max_attendees = %s
            WHERE id = %s
        """, (name, description, session_start, session_end, location, max_attendees_value, session_id))
        conn.commit()

        # Capture changes for the confirmation pop-up
        changes = []
        old_session = breakout_session
        if old_session['name'] != name:
            changes.append(('Name', old_session['name'], name))
        # A NULL description and an empty submitted one are the same value.
        old_description = old_session['description'] or ''
        if old_description != description:
            changes.append(('Description', old_description, description))
        # Compare at minute precision, the granularity the form can express.
        if old_session['start_time'].strftime(input_format) != session_start.strftime(input_format):
            changes.append(('Start Time',
                            old_session['start_time'].strftime(display_format),
                            session_start.strftime(display_format)))
        if old_session['end_time'].strftime(input_format) != session_end.strftime(input_format):
            changes.append(('End Time',
                            old_session['end_time'].strftime(display_format),
                            session_end.strftime(display_format)))
        if old_session['location'] != location:
            changes.append(('Location', old_session['location'], location))
        # NULL and 0 are both shown as "Unlimited", so treat them as equal.
        old_max = old_session['max_attendees'] or None
        new_max = max_attendees_value or None
        if old_max != new_max:
            changes.append(('Max Attendees', old_max or 'Unlimited', new_max or 'Unlimited'))

        # Get updated session data
        cursor.execute("""
            SELECT bs.*, e.name as event_name
            FROM breakout_sessions bs
            JOIN events e ON bs.event_id = e.id
            WHERE bs.id = %s
        """, (session_id,))
        updated_session = cursor.fetchone()

        if changes:
            return render_template('organizer/edit_breakout_session_confirm.html',
                                   session=updated_session, event=event, changes=changes)

        flash('Breakout session updated successfully!')
        return redirect(url_for('view_breakout_session', code=breakout_session['code']))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
    finally:
        # Best-effort cleanup on every exit path; a failing close() must not
        # replace the response that is already being returned.
        for resource in (cursor, conn):
            if resource is not None:
                try:
                    resource.close()
                except Error:
                    pass
@app.route('/organizer/breakout-session/<int:session_id>/notify', methods=['POST'])
@login_required
def notify_breakout_session_attendees(session_id):
    """Email every registered attendee of a breakout session about changes.

    Restricted to the organizer who owns the parent event. The text of the
    notification comes from the ``changes_text`` form field. Flashes a
    summary of sent/failed emails and redirects to the session detail page.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    session_sql = """
        SELECT bs.*, e.name as event_name, e.organizer_id as event_organizer_id
        FROM breakout_sessions bs
        JOIN events e ON bs.event_id = e.id
        WHERE bs.id = %s
    """
    recipients_sql = """
        SELECT a.email, a.first_name, a.last_name
        FROM breakout_session_rsvps bsr
        JOIN attendees a ON bsr.attendee_id = a.id
        WHERE bsr.breakout_session_id = %s AND bsr.status = 'registered'
    """

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        # Load the session and make sure the caller owns its event.
        cur.execute(session_sql, (session_id,))
        target = cur.fetchone()

        if not target:
            flash('Breakout session not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        if target['event_organizer_id'] != session['user_id']:
            flash('Access denied.')
            cur.close()
            db.close()
            return redirect(url_for('index'))

        # Only attendees with an active registration are notified.
        cur.execute(recipients_sql, (session_id,))
        recipients = cur.fetchall()
        cur.close()
        db.close()

        changes_text = request.form.get('changes_text', 'Session details have been updated.')

        delivered = 0
        failures = 0
        for person in recipients:
            display_name = f"{person['first_name']} {person['last_name']}"
            was_sent = send_breakout_session_update_email(
                person['email'], display_name, target['name'],
                target['event_name'], changes_text)
            if was_sent:
                delivered += 1
            else:
                failures += 1

        if failures:
            flash(f'Notifications sent: {delivered}, failed: {failures}.')
        else:
            plural = '' if delivered == 1 else 's'
            flash(f'Notification emails sent to {delivered} attendee{plural}.')

        return redirect(url_for('view_breakout_session', code=target['code']))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/breakout-session/<code>')
@login_required
def view_breakout_session(code):
    """Render the organizer detail page of one breakout session.

    Access is granted to the owner of the parent event and to any organizer
    listed for the session in ``breakout_session_organizers``. The page
    lists the session's registered RSVPs ordered by attendee name.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        # Look the session up by its public code, together with its event.
        cur.execute("""
            SELECT bs.*, e.name as event_name, e.organizer_id as event_organizer_id
            FROM breakout_sessions bs
            JOIN events e ON bs.event_id = e.id
            WHERE bs.code = %s
        """, (code,))
        session_data = cur.fetchone()

        if not session_data:
            flash('Breakout session not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        # The caller may be the event owner or a per-session organizer.
        current_user = session['user_id']
        cur.execute("""
            SELECT id FROM breakout_session_organizers
            WHERE breakout_session_id = %s AND organizer_id = %s
        """, (session_data['id'], current_user))
        session_organizer_row = cur.fetchone()

        owns_event = session_data['event_organizer_id'] == current_user
        if not owns_event and not session_organizer_row:
            flash('Access denied.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        # Active registrations, with the attendee details the page shows.
        cur.execute("""
            SELECT bsr.*, a.first_name, a.last_name, a.email, a.organisation, a.role
            FROM breakout_session_rsvps bsr
            JOIN attendees a ON bsr.attendee_id = a.id
            WHERE bsr.breakout_session_id = %s AND bsr.status = 'registered'
            ORDER BY a.last_name, a.first_name
        """, (session_data['id'],))
        registered = cur.fetchall()

        cur.close()
        db.close()

        return render_template('organizer/breakout_session_detail.html',
                               session=session_data, rsvps=registered)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/attendee/breakout-sessions')
@login_required
def attendee_breakout_sessions():
    """List the breakout sessions of the logged-in attendee's event.

    Each session row carries ``rsvp_count`` (number of registered RSVPs) and
    ``my_rsvp_status`` (the current attendee's RSVP status, if any).
    """
    if session.get('user_type') != 'attendee':
        flash('Access denied.')
        return redirect(url_for('index'))

    sessions_sql = """
        SELECT bs.*,
            (SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count,
            (SELECT status FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND attendee_id = %s) as my_rsvp_status
        FROM breakout_sessions bs
        WHERE bs.event_id = %s
        ORDER BY bs.start_time
    """

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        cur.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],))
        event = cur.fetchone()

        cur.execute(sessions_sql, (session['user_id'], session['event_id']))
        event_sessions = cur.fetchall()

        cur.close()
        db.close()

        return render_template('attendee/breakout_sessions.html',
                               event=event, sessions=event_sessions)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('attendee_dashboard'))
@app.route('/attendee/breakout-session/<code>/rsvp', methods=['POST'])
@login_required
def rsvp_breakout_session(code):
    """RSVP the logged-in attendee for a breakout session.

    Checks, in order: the session exists and belongs to the attendee's own
    event, it still has capacity, it does not overlap another session the
    attendee is registered for, and the attendee is not already registered.
    A previously cancelled RSVP is reactivated instead of duplicated.

    Returns a JSON body: ``{'success': True}`` (optionally with a
    ``message``) on success, otherwise ``{'error': ...}`` with status
    400, 403, 404 or 500.
    """
    if session.get('user_type') != 'attendee':
        return jsonify({'error': 'Access denied'}), 403

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        # Get breakout session
        cursor.execute("SELECT * FROM breakout_sessions WHERE code = %s", (code,))
        breakout = cursor.fetchone()

        # A session of a different event is reported as missing: attendees
        # may only RSVP within their own event, and the response should not
        # confirm that a foreign code exists. Compared as strings so the
        # check does not depend on how the id was stored in the session.
        if not breakout or str(breakout['event_id']) != str(session.get('event_id')):
            return jsonify({'error': 'Breakout session not found'}), 404

        session_id = breakout['id']

        # Check capacity (a NULL/0 limit means unlimited)
        if breakout['max_attendees']:
            cursor.execute("""
                SELECT COUNT(*) as count FROM breakout_session_rsvps
                WHERE breakout_session_id = %s AND status = 'registered'
            """, (session_id,))
            current_count = cursor.fetchone()['count']
            if current_count >= breakout['max_attendees']:
                return jsonify({'error': 'Breakout session is full'}), 400

        # Check for overlapping sessions
        cursor.execute("""
            SELECT bs.id, bs.name FROM breakout_sessions bs
            JOIN breakout_session_rsvps bsr ON bs.id = bsr.breakout_session_id
            WHERE bsr.attendee_id = %s
            AND bsr.status = 'registered'
            AND bs.id != %s
            AND (
                (bs.start_time <= %s AND bs.end_time > %s)
                OR (bs.start_time < %s AND bs.end_time >= %s)
                OR (bs.start_time >= %s AND bs.end_time <= %s)
            )
        """, (session['user_id'], session_id,
              breakout['start_time'], breakout['start_time'],
              breakout['end_time'], breakout['end_time'],
              breakout['start_time'], breakout['end_time']))
        overlapping = cursor.fetchall()
        if overlapping:
            names = ', '.join(s['name'] for s in overlapping)
            return jsonify({'error': f'Overlaps with: {names}'}), 400

        # Check if already registered
        cursor.execute("""
            SELECT id, status FROM breakout_session_rsvps
            WHERE breakout_session_id = %s AND attendee_id = %s
        """, (session_id, session['user_id']))
        existing = cursor.fetchone()

        if existing and existing['status'] == 'registered':
            return jsonify({'error': 'Already registered'}), 400

        if existing:
            # Reactivate cancelled RSVP
            cursor.execute("""
                UPDATE breakout_session_rsvps SET status = 'registered'
                WHERE id = %s
            """, (existing['id'],))
            conn.commit()
            return jsonify({'success': True, 'message': 'RSVP restored'})

        # Create new RSVP
        cursor.execute("""
            INSERT INTO breakout_session_rsvps (breakout_session_id, attendee_id, status)
            VALUES (%s, %s, 'registered')
        """, (session_id, session['user_id']))
        conn.commit()
        return jsonify({'success': True})
    except Error as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Best-effort cleanup on every exit path; a failing close() must not
        # replace the response that is already being returned.
        for resource in (cursor, conn):
            if resource is not None:
                try:
                    resource.close()
                except Error:
                    pass
@app.route('/attendee/breakout-session/<code>/cancel-rsvp', methods=['POST'])
@login_required
def cancel_breakout_session_rsvp(code):
    """Cancel the logged-in attendee's RSVP for a breakout session.

    Marks the attendee's ``registered`` RSVP row as ``cancelled``. Returns
    JSON ``{'success': True}``, or ``{'error': ...}`` with 403, 404 or 500.
    """
    if session.get('user_type') != 'attendee':
        return jsonify({'error': 'Access denied'}), 403

    try:
        db = get_db_connection()

        # Resolve the public code to the session's primary key.
        lookup = db.cursor(dictionary=True)
        lookup.execute("SELECT id FROM breakout_sessions WHERE code = %s", (code,))
        breakout = lookup.fetchone()
        lookup.close()

        if not breakout:
            db.close()
            return jsonify({'error': 'Breakout session not found'}), 404

        writer = db.cursor()
        writer.execute("""
            UPDATE breakout_session_rsvps SET status = 'cancelled'
            WHERE breakout_session_id = %s AND attendee_id = %s AND status = 'registered'
        """, (breakout['id'], session['user_id']))
        cancelled_rows = writer.rowcount
        db.commit()
        writer.close()
        db.close()

        # No row changed means there was no active RSVP to cancel.
        if cancelled_rows <= 0:
            return jsonify({'error': 'RSVP not found'}), 404
        return jsonify({'success': True})
    except Error as e:
        return jsonify({'error': str(e)}), 500
@app.route('/attendee/event/attendance', methods=['GET', 'POST'])
@login_required
def update_event_attendance():
    """Read or update the logged-in attendee's main-event attendance status.

    GET returns ``{'status': <attendance_status>}`` (``'attending'`` when
    the attendee row is missing). POST accepts ``status`` either as a JSON
    object field or as a form field; it must be ``'attending'`` or
    ``'not_attending'``. Errors are returned as ``{'error': ...}`` with
    status 400, 403 or 500.
    """
    if session.get('user_type') != 'attendee':
        return jsonify({'error': 'Access denied'}), 403

    is_read = request.method == 'GET'
    status = None
    if not is_read:
        # get_json(silent=True) yields None for non-JSON or malformed bodies
        # instead of aborting the request, so the form fallback stays
        # reachable for ordinary form posts.
        payload = request.get_json(silent=True)
        if isinstance(payload, dict):
            status = payload.get('status')
        if status is None:
            status = request.form.get('status')
        if status not in ['attending', 'not_attending']:
            return jsonify({'error': 'Invalid status'}), 400

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        if is_read:
            cursor.execute("SELECT attendance_status FROM attendees WHERE id = %s", (session['user_id'],))
            result = cursor.fetchone()
            return jsonify({'status': result['attendance_status'] if result else 'attending'})

        cursor.execute("""
            UPDATE attendees SET attendance_status = %s WHERE id = %s
        """, (status, session['user_id']))
        conn.commit()
        return jsonify({'success': True})
    except Error as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Best-effort cleanup on every exit path; a failing close() must not
        # replace the response that is already being returned.
        for resource in (cursor, conn):
            if resource is not None:
                try:
                    resource.close()
                except Error:
                    pass
@app.route('/organizer/event/<code>')
@login_required
def event_detail(code):
    """Render the organizer's detail page for one of their events.

    Loads the event (by code, owned by the current organizer), its
    attendees, breakout sessions with registration counts, staff, attendee
    types, and the organizer's other events. Each attendee row is annotated
    with ``attendee_type_name`` and ``attendee_type_price``.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        cur.execute("SELECT * FROM events WHERE code = %s AND organizer_id = %s", (code, session['user_id']))
        event = cur.fetchone()

        if not event:
            flash('Event not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        this_event = (event['id'],)

        cur.execute("SELECT * FROM attendees WHERE event_id = %s", this_event)
        attendees = cur.fetchall()

        cur.execute("""
            SELECT bs.*,
                (SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as registered_count
            FROM breakout_sessions bs
            WHERE bs.event_id = %s
            ORDER BY bs.start_time
        """, this_event)
        breakout_sessions = cur.fetchall()

        cur.execute("SELECT * FROM staff WHERE event_id = %s ORDER BY last_name, first_name", this_event)
        staff = cur.fetchall()

        cur.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", this_event)
        attendee_types = cur.fetchall()

        # Annotate each attendee with the name/price of its type; attendees
        # without a type (or with an unknown type id) get None for both.
        types_by_id = {type_row['id']: type_row for type_row in attendee_types}
        for person in attendees:
            type_id = person.get('attendee_type_id')
            matched = types_by_id.get(type_id, {}) if type_id else {}
            person['attendee_type_name'] = matched.get('name')
            person['attendee_type_price'] = matched.get('price')

        # The organizer's other events feed the "batch add staff" picker.
        cur.execute("SELECT id, name, code FROM events WHERE organizer_id = %s AND id != %s ORDER BY name", (session['user_id'], event['id']))
        other_events = cur.fetchall()

        cur.close()
        db.close()

        return render_template(
            'organizer/event_detail.html',
            event=event,
            attendees=attendees,
            breakout_sessions=breakout_sessions,
            staff=staff,
            other_events=other_events,
            attendee_types=attendee_types,
        )
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/event/<int:event_id>/attendee-types', methods=['GET', 'POST'])
@login_required
def manage_attendee_types(event_id):
    """List an event's attendee types and create new ones.

    GET renders the list. POST creates a type from the ``name`` and
    ``price`` form fields (an empty or unparsable price is stored as 0.00)
    and redirects back to this page.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        cur.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        event = cur.fetchone()

        if not event:
            flash('Event not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        cur.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (event_id,))
        attendee_types = cur.fetchall()

        if request.method != 'POST':
            cur.close()
            db.close()
            return render_template('organizer/attendee_types.html', event=event, attendee_types=attendee_types)

        name = request.form.get('name', '').strip()
        price = request.form.get('price', '').strip()

        if not name:
            flash('Type name is required.')
            cur.close()
            db.close()
            return render_template('organizer/attendee_types.html', event=event, attendee_types=attendee_types)

        # Missing or non-numeric prices fall back to zero.
        price_value = 0.00
        if price:
            try:
                price_value = float(price)
            except ValueError:
                price_value = 0.00

        type_code = generate_type_code()
        cur.execute("""
            INSERT INTO attendee_types (event_id, code, name, price)
            VALUES (%s, %s, %s, %s)
        """, (event_id, type_code, name, price_value))
        db.commit()

        flash(f'Attendee type "{name}" created successfully!')
        cur.close()
        db.close()
        return redirect(url_for('manage_attendee_types', event_id=event_id))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/event/<int:event_id>/attendee-type/<int:type_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_attendee_type(event_id, type_id):
    """Edit an attendee type.

    GET renders the edit form together with the number of attendees
    currently assigned to the type. POST updates ``name`` and ``price``
    (an empty or unparsable price is stored as 0.00) and redirects to the
    attendee-type list. The event must belong to the current organizer and
    the type must belong to the event.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        event = cursor.fetchone()
        if not event:
            flash('Event not found.')
            return redirect(url_for('organizer_dashboard'))

        cursor.execute("SELECT * FROM attendee_types WHERE id = %s AND event_id = %s", (type_id, event_id))
        attendee_type = cursor.fetchone()
        if not attendee_type:
            flash('Attendee type not found.')
            return redirect(url_for('manage_attendee_types', event_id=event_id))

        # Check how many attendees are assigned to this type
        cursor.execute("SELECT COUNT(*) as count FROM attendees WHERE attendee_type_id = %s", (type_id,))
        attendee_count = cursor.fetchone()['count']

        def render_form():
            # Every render of the form passes the same context; the
            # validation-error path previously dropped attendee_count.
            return render_template('organizer/edit_attendee_type.html', event=event,
                                   attendee_type=attendee_type, attendee_count=attendee_count)

        if request.method != 'POST':
            return render_form()

        name = request.form.get('name', '').strip()
        price = request.form.get('price', '').strip()

        if not name:
            flash('Type name is required.')
            return render_form()

        try:
            price_value = float(price) if price else 0.00
        except ValueError:
            price_value = 0.00

        cursor.execute("""
            UPDATE attendee_types SET name = %s, price = %s WHERE id = %s
        """, (name, price_value, type_id))
        conn.commit()

        flash(f'Attendee type "{name}" updated successfully!')
        return redirect(url_for('manage_attendee_types', event_id=event_id))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
    finally:
        # Best-effort cleanup on every exit path; a failing close() must not
        # replace the response that is already being returned.
        for resource in (cursor, conn):
            if resource is not None:
                try:
                    resource.close()
                except Error:
                    pass
@app.route('/organizer/event/<int:event_id>/attendee-type/<int:type_id>/delete', methods=['POST'])
@login_required
def delete_attendee_type(event_id, type_id):
    """Delete an attendee type of one of the organizer's events.

    Attendees that had the type are detached first (their
    ``attendee_type_id`` becomes NULL); both statements are committed
    together. Redirects to the attendee-type list afterwards.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        cur.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        event = cur.fetchone()

        if not event:
            flash('Event not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        cur.execute("SELECT * FROM attendee_types WHERE id = %s AND event_id = %s", (type_id, event_id))
        attendee_type = cur.fetchone()

        if not attendee_type:
            flash('Attendee type not found.')
            cur.close()
            db.close()
            return redirect(url_for('manage_attendee_types', event_id=event_id))

        # Detach attendees before removing the type itself.
        type_param = (type_id,)
        cur.execute("UPDATE attendees SET attendee_type_id = NULL WHERE attendee_type_id = %s", type_param)
        cur.execute("DELETE FROM attendee_types WHERE id = %s", type_param)
        db.commit()

        removed_name = attendee_type["name"]
        flash(f'Attendee type "{removed_name}" deleted successfully!')
        cur.close()
        db.close()
        return redirect(url_for('manage_attendee_types', event_id=event_id))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/event/<int:event_id>/attendees/batch-assign-type', methods=['POST'])
@login_required
def batch_assign_attendee_type(event_id):
    """Assign (or clear) an attendee type for a selection of attendees.

    Reads ``attendee_ids`` (multi-value) and ``attendee_type_id`` from the
    form. A non-empty type id must belong to the event and is assigned to
    every selected attendee of that event; an empty type id clears the type
    instead. Redirects to the event detail page.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        cur.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        event = cur.fetchone()

        if not event:
            flash('Event not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        attendee_ids = request.form.getlist('attendee_ids')
        attendee_type_id = request.form.get('attendee_type_id', '').strip()

        if not attendee_ids:
            flash('No attendees selected.')
            cur.close()
            db.close()
            return redirect(url_for('event_detail', code=event['code']))

        selected_total = len(attendee_ids)

        if attendee_type_id:
            # Validate attendee type exists for this event
            cur.execute("SELECT id FROM attendee_types WHERE id = %s AND event_id = %s", (attendee_type_id, event_id))
            if not cur.fetchone():
                flash('Invalid attendee type.')
                cur.close()
                db.close()
                return redirect(url_for('event_detail', code=event['code']))

            for selected_id in attendee_ids:
                cur.execute("UPDATE attendees SET attendee_type_id = %s WHERE id = %s AND event_id = %s",
                            (attendee_type_id, selected_id, event_id))
            outcome = f'Successfully assigned type to {selected_total} attendee(s).'
        else:
            # Remove type from selected attendees
            for selected_id in attendee_ids:
                cur.execute("UPDATE attendees SET attendee_type_id = NULL WHERE id = %s AND event_id = %s",
                            (selected_id, event_id))
            outcome = f'Successfully removed type from {selected_total} attendee(s).'

        db.commit()
        flash(outcome)

        cur.close()
        db.close()
        return redirect(url_for('event_detail', code=event['code']))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/event/<int:event_id>/staff', methods=['GET', 'POST'])
@login_required
def manage_event_staff(event_id):
    """List an event's staff and invite new staff members.

    GET renders the staff list. POST creates a staff row from the
    ``first_name``, ``last_name`` and ``email`` form fields (rejecting an
    email already used by staff of this event), sends an invite email and
    redirects to the event detail page.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        cur.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        event = cur.fetchone()

        if not event:
            flash('Event not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        cur.execute("SELECT * FROM staff WHERE event_id = %s ORDER BY last_name, first_name", (event_id,))
        staff_members = cur.fetchall()
        cur.close()
        db.close()

        if request.method != 'POST':
            return render_template('organizer/event_staff.html', event=event, staff_members=staff_members)

        first_name = request.form.get('first_name', '').strip()
        last_name = request.form.get('last_name', '').strip()
        email = request.form.get('email', '').strip()

        if not (first_name and last_name and email):
            flash('First name, last name, and email are required.')
            return render_template('organizer/event_staff.html', event=event, staff_members=staff_members)

        # The first connection was closed after loading the list; open a
        # fresh one for the write path.
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        # Check if staff with this email already exists for this event
        cur.execute("SELECT id FROM staff WHERE event_id = %s AND email = %s", (event_id, email))
        duplicate = cur.fetchone()
        if duplicate:
            flash('A staff member with this email already exists for this event.')
            cur.close()
            db.close()
            return render_template('organizer/event_staff.html', event=event, staff_members=staff_members)

        invite_token = generate_invite_token()
        staff_code = generate_staff_code()

        cur.execute("""
            INSERT INTO staff (event_id, email, first_name, last_name, invite_token, staff_code)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (event_id, email, first_name, last_name, invite_token, staff_code))
        db.commit()

        # Get organizer email for sending
        cur.execute("SELECT email FROM organizers WHERE id = %s", (session['user_id'],))
        organizer_row = cur.fetchone()
        organizer_email = organizer_row['email']

        cur.close()
        db.close()

        # Send invite email
        invitee_name = f"{first_name} {last_name}"
        invite_sent = send_staff_invite_email(email, invitee_name, event['name'], invite_token, organizer_email)
        if invite_sent:
            flash(f'Staff member added! Invite email sent to {email}.')
        else:
            flash('Staff member added but failed to send invite email.')

        return redirect(url_for('event_detail', code=event['code']))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/staff/invite/<token>', methods=['GET', 'POST'])
def staff_invite(token):
    """Let an invited staff member set a password via their invite link.

    The token must match an unused invite. GET renders the form. POST
    verifies reCAPTCHA (only when a site key is configured), validates the
    password, stores its hash, marks the invite as used and redirects to
    the login page.
    """
    recaptcha_site_key = app.config.get('RECAPTCHA_SITE_KEY', '')

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        cur.execute("SELECT * FROM staff WHERE invite_token = %s AND invite_used = FALSE", (token,))
        staff_member = cur.fetchone()

        if not staff_member:
            flash('Invalid or expired invite link.')
            cur.close()
            db.close()
            return redirect(url_for('index'))

        cur.execute("SELECT * FROM events WHERE id = %s", (staff_member['event_id'],))
        event = cur.fetchone()
        cur.close()
        db.close()

        def invite_page():
            # The form is rendered with the same context on every path.
            return render_template('staff_invite.html', staff=staff_member, event=event,
                                   recaptcha_site_key=recaptcha_site_key)

        if request.method != 'POST':
            return invite_page()

        password = request.form.get('password', '')
        confirm_password = request.form.get('confirm_password', '')
        recaptcha_response = request.form.get('g-recaptcha-response', '')

        # First failing check wins; later checks are not evaluated.
        problem = None
        if recaptcha_site_key and not verify_recaptcha(recaptcha_response):
            problem = 'Please complete the reCAPTCHA verification.'
        elif not password or len(password) < 6:
            problem = 'Password must be at least 6 characters.'
        elif password != confirm_password:
            problem = 'Passwords do not match.'

        if problem is not None:
            flash(problem)
            return invite_page()

        db = get_db_connection()
        writer = db.cursor()

        password_hash = hash_password(password)
        # Clearing the token makes the invite link single-use.
        writer.execute("""
            UPDATE staff SET password_hash = %s, invite_used = TRUE, invite_token = NULL
            WHERE id = %s
        """, (password_hash, staff_member['id']))
        db.commit()
        writer.close()
        db.close()

        flash('Registration complete! Please log in with your email and password.')
        return redirect(url_for('login'))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
@app.route('/organizer/event/<int:event_id>/staff/<int:staff_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_staff(event_id, staff_id):
    """Edit a staff member.

    GET renders the edit form. POST updates first name, last name and email
    and redirects to the event's staff list. The event must belong to the
    current organizer and the staff member must belong to the event. An
    email already used by another staff member of the same event is
    rejected, matching the rule applied when staff are created.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        event = cursor.fetchone()
        if not event:
            flash('Event not found.')
            return redirect(url_for('organizer_dashboard'))

        cursor.execute("SELECT * FROM staff WHERE id = %s AND event_id = %s", (staff_id, event_id))
        staff_member = cursor.fetchone()
        if not staff_member:
            flash('Staff member not found.')
            return redirect(url_for('manage_event_staff', event_id=event_id))

        if request.method != 'POST':
            return render_template('organizer/edit_staff.html', event=event, staff=staff_member)

        first_name = request.form.get('first_name', '').strip()
        last_name = request.form.get('last_name', '').strip()
        email = request.form.get('email', '').strip()

        if not all([first_name, last_name, email]):
            flash('First name, last name, and email are required.')
            return render_template('organizer/edit_staff.html', event=event, staff=staff_member)

        # Keep emails unique per event: editing must not create the
        # duplicate that the add-staff form refuses.
        cursor.execute("SELECT id FROM staff WHERE event_id = %s AND email = %s AND id != %s",
                       (event_id, email, staff_id))
        if cursor.fetchone():
            flash('A staff member with this email already exists for this event.')
            return render_template('organizer/edit_staff.html', event=event, staff=staff_member)

        cursor.execute("""
            UPDATE staff SET first_name = %s, last_name = %s, email = %s
            WHERE id = %s
        """, (first_name, last_name, email, staff_id))
        conn.commit()

        flash('Staff member updated successfully!')
        return redirect(url_for('manage_event_staff', event_id=event_id))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
    finally:
        # Best-effort cleanup on every exit path; a failing close() must not
        # replace the response that is already being returned.
        for resource in (cursor, conn):
            if resource is not None:
                try:
                    resource.close()
                except Error:
                    pass
@app.route('/organizer/event/<int:event_id>/staff/<int:staff_id>/delete', methods=['POST'])
@login_required
def delete_staff(event_id, staff_id):
    """Remove a staff member from one of the organizer's events.

    The event must belong to the current organizer and the staff member
    must belong to that event. Always redirects to the organizer dashboard
    (or the index page when the caller is not an organizer).
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    dashboard = 'organizer_dashboard'

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        # Ownership check on the event first, then on the staff row.
        cur.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        event = cur.fetchone()

        if not event:
            flash('Event not found.')
            cur.close()
            db.close()
            return redirect(url_for(dashboard))

        cur.execute("SELECT * FROM staff WHERE id = %s AND event_id = %s", (staff_id, event_id))
        staff_member = cur.fetchone()

        if not staff_member:
            flash('Staff member not found.')
            cur.close()
            db.close()
            return redirect(url_for(dashboard))

        cur.execute("DELETE FROM staff WHERE id = %s", (staff_id,))
        db.commit()
        cur.close()
        db.close()

        flash('Staff member removed successfully!')
        return redirect(url_for(dashboard))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for(dashboard))
@app.route('/organizer/event/<int:event_id>/batch-add-staff', methods=['POST'])
@login_required
def batch_add_staff(event_id):
    """Copy the staff list of another event into this event.

    Both the target event (``event_id``) and the source event (form field
    ``source_event_id``) must belong to the logged-in organizer. Staff whose
    e-mail address (compared case-insensitively) is already present in the
    target event are skipped. Each copied member gets a fresh staff code and
    invite token.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    source_event_id = request.form.get('source_event_id')
    if not source_event_id:
        flash('Please select an event.')
        return redirect(url_for('organizer_dashboard'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        def leave(message, endpoint, **values):
            # Flash, release the DB handles, then redirect to the given endpoint.
            flash(message)
            cur.close()
            db.close()
            return redirect(url_for(endpoint, **values))

        # Verify target event belongs to organizer and get its code
        cur.execute("SELECT code FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        target_event = cur.fetchone()
        if not target_event:
            return leave('Event not found.', 'organizer_dashboard')

        # Verify source event belongs to same organizer
        cur.execute("SELECT id FROM events WHERE id = %s AND organizer_id = %s", (source_event_id, session['user_id']))
        if not cur.fetchone():
            return leave('Source event not found.', 'event_detail', code=target_event['code'])

        # Get staff from source event
        cur.execute("SELECT first_name, last_name, email FROM staff WHERE event_id = %s", (source_event_id,))
        source_staff = cur.fetchall()
        if not source_staff:
            return leave('No staff members found in the selected event.', 'event_detail', code=target_event['code'])

        # Lower-cased e-mails already present in the target event.
        cur.execute("SELECT email FROM staff WHERE event_id = %s", (event_id,))
        known_emails = {row['email'].lower() for row in cur.fetchall()}

        added_count = 0
        for member in source_staff:
            email_key = member['email'].lower()
            if email_key in known_emails:
                continue
            staff_code = generate_staff_code()
            cur.execute("""
                INSERT INTO staff (event_id, first_name, last_name, email, invite_token, invite_used, created_at, staff_code)
                VALUES (%s, %s, %s, %s, UUID(), FALSE, NOW(), %s)
            """, (event_id, member['first_name'], member['last_name'], member['email'], staff_code))
            # Remember the address so duplicates inside the source list are skipped too.
            known_emails.add(email_key)
            added_count += 1

        db.commit()
        cur.close()
        db.close()
        flash(f'{added_count} staff member(s) added from other event.')
        return redirect(url_for('event_detail', code=target_event['code']))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/breakout-session/<int:session_id>/delete', methods=['POST'])
@login_required
def delete_breakout_session(session_id):
    """Delete a breakout session together with its RSVPs.

    The session must belong to an event owned by the logged-in organizer;
    otherwise it is reported as not found. All outcomes redirect to the
    organizer dashboard.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)
        cur.execute("""
            SELECT bs.*, e.organizer_id, e.code
            FROM breakout_sessions bs
            JOIN events e ON bs.event_id = e.id
            WHERE bs.id = %s
        """, (session_id,))
        row = cur.fetchone()

        owned_by_user = bool(row) and row['organizer_id'] == session['user_id']
        if not owned_by_user:
            flash('Session not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        # RSVPs are removed first, then the session row itself; one commit covers both.
        for statement in (
            "DELETE FROM breakout_session_rsvps WHERE breakout_session_id = %s",
            "DELETE FROM breakout_sessions WHERE id = %s",
        ):
            cur.execute(statement, (session_id,))
        db.commit()
        cur.close()
        db.close()
        flash('Breakout session deleted successfully!')
        return redirect(url_for('organizer_dashboard'))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/attendee/<int:attendee_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_attendee(attendee_id):
    """Show (GET) or process (POST) the organizer's edit form for an attendee.

    The attendee must belong to an event owned by the logged-in organizer,
    otherwise it is reported as not found. On POST the submitted
    ``attendee_type_id`` (optional) must be one of the attendee types defined
    for the attendee's own event; anything else re-renders the form with an
    error instead of being written to the database.

    Returns a redirect to the organizer dashboard after a successful update or
    on errors, and the rendered edit form otherwise.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT a.*, e.organizer_id, e.code
            FROM attendees a
            JOIN events e ON a.event_id = e.id
            WHERE a.id = %s
        """, (attendee_id,))
        attendee = cursor.fetchone()
        if not attendee or attendee['organizer_id'] != session['user_id']:
            flash('Attendee not found.')
            return redirect(url_for('organizer_dashboard'))

        # Get attendee types for this event
        cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (attendee['event_id'],))
        attendee_types = cursor.fetchall()

        if request.method == 'POST':
            first_name = request.form.get('first_name', '').strip()
            last_name = request.form.get('last_name', '').strip()
            email = request.form.get('email', '').strip()
            organisation = request.form.get('organisation', '').strip()
            role = request.form.get('role', '').strip()
            introduction = request.form.get('introduction', '').strip()
            raw_type_id = request.form.get('attendee_type_id', '').strip()

            # An empty selection clears the type; a non-empty one must match a
            # type of this attendee's event so foreign or malformed IDs are
            # never stored.
            type_id = None
            if raw_type_id:
                allowed_type_ids = {str(t['id']) for t in attendee_types}
                if raw_type_id not in allowed_type_ids:
                    flash('Invalid attendee type.')
                    return render_template('organizer/edit_attendee.html',
                                           attendee=attendee, attendee_types=attendee_types)
                type_id = int(raw_type_id)

            cursor.execute("""
                UPDATE attendees
                SET first_name = %s, last_name = %s, email = %s, organisation = %s, role = %s, introduction = %s, attendee_type_id = %s
                WHERE id = %s
            """, (first_name, last_name, email, organisation, role, introduction, type_id, attendee_id))
            conn.commit()
            flash('Attendee updated successfully!')
            return redirect(url_for('organizer_dashboard'))

        return render_template('organizer/edit_attendee.html', attendee=attendee, attendee_types=attendee_types)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
@app.route('/organizer/attendee/<int:attendee_id>/delete', methods=['POST'])
@login_required
def delete_attendee(attendee_id):
    """Delete an attendee of an event owned by the logged-in organizer.

    An attendee that does not exist, or whose event belongs to a different
    organizer, is reported as not found. All outcomes redirect to the
    organizer dashboard.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)
        cur.execute("""
            SELECT a.*, e.organizer_id
            FROM attendees a
            JOIN events e ON a.event_id = e.id
            WHERE a.id = %s
        """, (attendee_id,))
        row = cur.fetchone()

        may_delete = bool(row) and row['organizer_id'] == session['user_id']
        if may_delete:
            cur.execute("DELETE FROM attendees WHERE id = %s", (attendee_id,))
            db.commit()
            cur.close()
            db.close()
            flash('Attendee deleted successfully!')
        else:
            flash('Attendee not found.')
            cur.close()
            db.close()
        return redirect(url_for('organizer_dashboard'))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/event/<int:event_id>/badges')
@login_required
def event_badges(event_id):
    """Render the badge printing page for one of the organizer's events.

    Each attendee dict is extended with a ``qr_code`` entry holding a PNG data
    URI that encodes ``NETEVENT:<event_id>:<attendee_id>``.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    def qr_data_uri(payload):
        # Render the payload as a small QR PNG and inline it as a base64 data URI.
        png = io.BytesIO()
        qrcode.make(payload, box_size=4, border=1).save(png, format='PNG')
        encoded = base64.b64encode(png.getvalue()).decode('utf-8')
        return f"data:image/png;base64,{encoded}"

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        event = cur.fetchone()
        if not event:
            flash('Event not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        cur.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,))
        attendees = cur.fetchall()
        cur.close()
        db.close()

        # Generate QR codes for each attendee
        for person in attendees:
            person['qr_code'] = qr_data_uri(f"NETEVENT:{event_id}:{person['id']}")

        return render_template('organizer/badges.html', event=event, attendees=attendees)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/event/<int:event_id>/badges/rectangular/download')
@login_required
def download_rectangular_badges(event_id):
    """Send a PDF with rectangular badges for all attendees of an event.

    The event must belong to the logged-in organizer. When the event has no
    attendees the user is redirected back to the badge page instead.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        event = cur.fetchone()
        if not event:
            flash('Event not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        cur.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,))
        attendees = cur.fetchall()

        # Attendee types of this event, indexed by primary key for the PDF generator.
        cur.execute("SELECT * FROM attendee_types WHERE event_id = %s", (event_id,))
        attendee_types_map = {}
        for type_row in cur.fetchall():
            attendee_types_map[type_row['id']] = type_row
        cur.close()
        db.close()

        if not attendees:
            flash('No attendees found for this event.')
            return redirect(url_for('event_badges', event_id=event_id))

        # Generate rectangular badge PDF
        pdf_buffer = generate_rectangular_badges_pdf(attendees, event, attendee_types_map)
        event_label = event.get('code', event_id)
        return send_file(
            pdf_buffer,
            mimetype='application/pdf',
            as_attachment=True,
            download_name=f"badges_rectangular_{event_label}.pdf"
        )
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
@app.route('/organizer/event/<int:event_id>/attendees/excel/download')
@login_required
def download_attendees_excel(event_id):
    """Send an .xlsx workbook listing every attendee of an event.

    The event must belong to the logged-in organizer. When the event has no
    attendees the user is redirected to the badge page. Text values that start
    with ``=`` are stored as plain strings, so attendee-supplied data cannot
    end up as a live spreadsheet formula in the organizer's download.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        event = cursor.fetchone()
        if not event:
            flash('Event not found.')
            return redirect(url_for('organizer_dashboard'))

        cursor.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,))
        attendees = cursor.fetchall()

        # Get attendee types
        cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s", (event_id,))
        type_names = {t['id']: t.get('name', '') for t in cursor.fetchall()}
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    if not attendees:
        flash('No attendees found.')
        return redirect(url_for('event_badges', event_id=event_id))

    # Create Excel workbook
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Attendees"

    def put(row_idx, col_idx, value):
        # openpyxl treats a string starting with "=" as a formula; force such
        # cells back to the string type so the text is shown, not evaluated.
        cell = ws.cell(row=row_idx, column=col_idx, value=value)
        if isinstance(value, str) and value.startswith('='):
            cell.data_type = 's'
        return cell

    # Header styling
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill("solid", fgColor="333333")
    header_alignment = Alignment(horizontal="center")
    headers = ['ID', 'Voornaam', 'Achternaam', 'Email', 'Organisatie', 'Rol', 'Type', 'Ingecheckt', 'Aangemeld op']
    for col_idx, header in enumerate(headers, 1):
        cell = ws.cell(row=1, column=col_idx, value=header)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_alignment

    # Data rows (row 1 is the header)
    for row_idx, attendee in enumerate(attendees, 2):
        created_at = attendee.get('created_at')
        values = [
            attendee.get('id'),
            attendee.get('first_name', ''),
            attendee.get('last_name', ''),
            attendee.get('email', ''),
            attendee.get('organisation', ''),
            attendee.get('role', ''),
            type_names.get(attendee.get('attendee_type_id'), ''),
            'Ja' if attendee.get('checked_in') else 'Nee',
            # Keep only the "YYYY-MM-DD HH:MM:SS" part of the timestamp text.
            str(created_at)[:19] if created_at else '',
        ]
        for col_idx, value in enumerate(values, 1):
            put(row_idx, col_idx, value)

    # Auto-fit column widths, capped at 30 characters
    for column_cells in ws.columns:
        longest = max((len(str(c.value)) for c in column_cells if c.value), default=0)
        ws.column_dimensions[column_cells[0].column_letter].width = min(longest + 2, 30)

    filename = f"attendees_{event.get('code', event_id)}.xlsx"
    buffer = io.BytesIO()
    wb.save(buffer)
    buffer.seek(0)
    return send_file(
        buffer,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=filename
    )
# NOTE(review): no @app.route decorator is visible above this view in the
# reviewed chunk — confirm that it is registered as a URL rule somewhere.
@login_required
def event_stats(event_id):
    """Return check-in counts for one of the organizer's events as JSON.

    The JSON body has the keys ``total``, ``checked_in`` and
    ``not_checked_in``. Access and not-found problems flash a message and
    redirect; database errors produce a JSON error with status 500.
    """
    if session.get('user_type') != 'organizer':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        if not cur.fetchone():
            flash('Event not found.')
            cur.close()
            db.close()
            return redirect(url_for('organizer_dashboard'))

        cur.execute("SELECT COUNT(*) as total FROM attendees WHERE event_id = %s", (event_id,))
        registered = cur.fetchone()['total']
        cur.execute("SELECT COUNT(*) as checked_in FROM attendees WHERE event_id = %s AND checked_in = TRUE", (event_id,))
        present = cur.fetchone()['checked_in']
        cur.close()
        db.close()

        stats = {
            'total': registered,
            'checked_in': present,
            'not_checked_in': registered - present
        }
        return jsonify(stats)
    except Error as e:
        return jsonify({'error': str(e)}), 500
@app.route('/organizer/event/<int:event_id>/checkin/<int:attendee_id>', methods=['POST'])
@login_required
def checkin_attendee(event_id, attendee_id):
    """Mark an attendee of one of the organizer's own events as checked in.

    Returns JSON ``{'success', 'already_checked_in', 'attendee_name'}`` on
    success, 403 for non-organizers, 404 when the attendee does not exist in
    an event owned by the logged-in organizer, and 500 on database errors.
    """
    if session.get('user_type') != 'organizer':
        return jsonify({'error': 'Access denied'}), 403

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        # One lookup fetches the name and current state and, via the join,
        # enforces that the event belongs to the logged-in organizer.
        cursor.execute(
            """
            SELECT a.first_name, a.last_name, a.checked_in
            FROM attendees a
            JOIN events e ON a.event_id = e.id
            WHERE a.id = %s AND a.event_id = %s AND e.organizer_id = %s
            """,
            (attendee_id, event_id, session['user_id'])
        )
        attendee = cursor.fetchone()
        if not attendee:
            return jsonify({'error': 'Attendee not found'}), 404

        cursor.execute(
            "UPDATE attendees SET checked_in = TRUE WHERE id = %s AND event_id = %s",
            (attendee_id, event_id)
        )
        conn.commit()

        return jsonify({
            'success': True,
            # State before this request, so the scanner can warn about repeats.
            'already_checked_in': bool(attendee['checked_in']),
            'attendee_name': f"{attendee['first_name']} {attendee['last_name']}"
        })
    except Error as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
@app.route('/organizer/event/<int:event_id>/checkin/code', methods=['POST'])
@login_required
def checkin_by_code(event_id):
    """Check in an attendee by their unique attendee code.

    Expects a JSON body ``{"attendee_code": "..."}``. Organizers may only check
    in attendees of events they own; staff only for the event stored in their
    session (the same rule the scanner page applies).

    Returns JSON ``{'success', 'already_checked_in', 'attendee_name'}`` on
    success, 400 when the code is missing, 403 when the caller may not act on
    this event, 404 when no matching attendee exists, 500 on database errors.
    """
    user_type = session.get('user_type')
    if user_type not in ['organizer', 'staff']:
        return jsonify({'error': 'Access denied'}), 403
    # Staff are bound to a single event; reject other events before any query.
    if user_type == 'staff' and session.get('event_id') != event_id:
        return jsonify({'error': 'Access denied'}), 403

    # silent=True: a missing or malformed JSON body becomes "code required"
    # instead of an unhandled error; non-object JSON is treated the same way.
    data = request.get_json(silent=True)
    attendee_code = data.get('attendee_code') if isinstance(data, dict) else None
    if not attendee_code:
        return jsonify({'error': 'Attendee code required'}), 400

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        # Find attendee by code and event
        if user_type == 'organizer':
            # The join restricts the lookup to events owned by this organizer.
            cursor.execute(
                """
                SELECT a.id, a.first_name, a.last_name, a.checked_in
                FROM attendees a
                JOIN events e ON a.event_id = e.id
                WHERE a.attendee_code = %s AND a.event_id = %s AND e.organizer_id = %s
                """,
                (attendee_code, event_id, session['user_id'])
            )
        else:
            cursor.execute(
                "SELECT id, first_name, last_name, checked_in FROM attendees WHERE attendee_code = %s AND event_id = %s",
                (attendee_code, event_id)
            )
        attendee = cursor.fetchone()
        if not attendee:
            return jsonify({'error': 'Attendee not found'}), 404

        cursor.execute(
            "UPDATE attendees SET checked_in = TRUE WHERE id = %s AND event_id = %s",
            (attendee['id'], event_id)
        )
        conn.commit()

        return jsonify({
            'success': True,
            'already_checked_in': bool(attendee['checked_in']),
            'attendee_name': f"{attendee['first_name']} {attendee['last_name']}"
        })
    except Error as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
@app.route('/organizer/event/<int:event_id>/scan')
@login_required
def event_scan(event_id):
    """Render the QR code scanner page used for check-in.

    Organizers can open the scanner for any event they own; staff only for the
    event stored in their session. Staff get ``staff/scan.html``, organizers
    ``organizer/scan.html``; both receive the event and its attendee list.
    """
    user_type = session.get('user_type')
    if user_type not in ['organizer', 'staff']:
        flash('Access denied.')
        return redirect(url_for('index'))
    is_organizer = user_type == 'organizer'

    # Decide staff access before running any query: bailing out after an
    # execute() whose rows were never fetched leaves an unread result on the
    # cursor, and closing it then fails with a database error instead of the
    # intended "Access denied" response.
    if not is_organizer and session.get('event_id') != event_id:
        flash('Access denied.')
        return redirect(url_for('index'))

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        if is_organizer:
            cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
        else:
            cursor.execute("SELECT * FROM events WHERE id = %s", (event_id,))
        event = cursor.fetchone()
        if not event:
            flash('Event not found.')
            if is_organizer:
                return redirect(url_for('organizer_dashboard'))
            return redirect(url_for('staff_event_dashboard', event_id=session.get('event_id')))

        cursor.execute("SELECT id, first_name, last_name, checked_in FROM attendees WHERE event_id = %s", (event_id,))
        attendees = cursor.fetchall()

        template = 'organizer/scan.html' if is_organizer else 'staff/scan.html'
        return render_template(template, event=event, attendees=attendees)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('organizer_dashboard'))
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
# Routes - Attendee
@app.route('/attendee/dashboard')
@login_required
def attendee_dashboard():
    """Render the dashboard of the logged-in attendee.

    When the attendee's event exists, the page receives the attendee record,
    the accepted connections they initiated, pending requests addressed to
    them, and all appointments they are part of (ordered by time). When the
    event cannot be found, everything is passed as empty.
    """
    if session.get('user_type') != 'attendee':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)
        cur.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],))
        event = cur.fetchone()

        # Defaults used when the event cannot be found.
        attendee = None
        connections = []
        pending_connections = []
        appointments = []

        if not event:
            event = None
        else:
            me = session['user_id']

            # Get attendee info
            cur.execute("SELECT * FROM attendees WHERE id = %s", (me,))
            attendee = cur.fetchone()

            # Accepted connections initiated by this attendee
            cur.execute("""
                SELECT c.*, a.first_name, a.last_name, a.organisation, a.role
                FROM connections c
                JOIN attendees a ON c.connected_attendee_id = a.id
                WHERE c.attendee_id = %s AND c.status = 'accepted'
            """, (me,))
            connections = cur.fetchall()

            # Pending requests addressed to this attendee
            cur.execute("""
                SELECT c.*, a.first_name, a.last_name, a.organisation, a.role
                FROM connections c
                JOIN attendees a ON c.attendee_id = a.id
                WHERE c.connected_attendee_id = %s AND c.status = 'pending'
            """, (me,))
            pending_connections = cur.fetchall()

            # Appointments where this attendee is requester or target
            cur.execute("""
                SELECT ap.*, a.first_name as requester_first_name, a.last_name as requester_last_name,
                at.first_name as target_first_name, at.last_name as target_last_name
                FROM appointments ap
                JOIN attendees a ON ap.requester_id = a.id
                JOIN attendees at ON ap.target_id = at.id
                WHERE ap.requester_id = %s OR ap.target_id = %s
                ORDER BY ap.appointment_time
            """, (me, me))
            appointments = cur.fetchall()

        cur.close()
        db.close()
        return render_template(
            'attendee/dashboard.html',
            event=event,
            attendee=attendee,
            connections=connections,
            pending_connections=pending_connections,
            appointments=appointments,
        )
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
@app.route('/attendee/badge/download')
@login_required
def download_badge():
    """Generate the logged-in attendee's badge and send it as a PDF download.

    Redirects to the attendee dashboard when the attendee record is missing or
    a database error occurs.
    """
    if session.get('user_type') != 'attendee':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        # Load the attendee first, then the event the session points to.
        cur.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],))
        attendee = cur.fetchone()
        cur.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],))
        event = cur.fetchone()
        cur.close()
        db.close()

        if attendee:
            # Generate PDF
            badge_pdf = generate_badge_pdf(attendee, event)
            badge_name = f'badge_{attendee["first_name"]}_{attendee["last_name"]}.pdf'
            return send_file(
                badge_pdf,
                mimetype='application/pdf',
                as_attachment=True,
                download_name=badge_name
            )

        flash('Attendee not found.')
        return redirect(url_for('attendee_dashboard'))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('attendee_dashboard'))
@app.route('/attendee/badge/email', methods=['POST'])
@login_required
def email_badge():
    """Generate the logged-in attendee's badge PDF and e-mail it to them.

    Flashes whether sending succeeded and always ends on the attendee
    dashboard (non-attendees are redirected to the index page).
    """
    if session.get('user_type') != 'attendee':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        # Load the attendee first, then the event the session points to.
        cur.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],))
        attendee = cur.fetchone()
        cur.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],))
        event = cur.fetchone()
        cur.close()
        db.close()

        if not attendee:
            flash('Attendee not found.')
            return redirect(url_for('attendee_dashboard'))

        # Generate PDF
        badge_pdf = generate_badge_pdf(attendee, event)

        # Send email; fall back to a generic event name when the event is missing.
        recipient_name = f"{attendee['first_name']} {attendee['last_name']}"
        event_name = event['name'] if event else 'Event'
        delivered = send_badge_email(attendee['email'], recipient_name, event_name, badge_pdf)
        if delivered:
            flash('Badge sent to your email!')
        else:
            flash('Failed to send badge email. Please try again.')
        return redirect(url_for('attendee_dashboard'))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('attendee_dashboard'))
@app.route('/attendee/profile', methods=['GET', 'POST'])
@login_required
def attendee_profile():
    """Show (GET) or update (POST) the logged-in attendee's profile.

    A POST stores the trimmed ``first_name``, ``last_name``, ``organisation``,
    ``role`` and ``introduction`` form fields and redirects back to this page.
    """
    if session.get('user_type') != 'attendee':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        # The current record is loaded for every request method.
        read_db = get_db_connection()
        read_cur = read_db.cursor(dictionary=True)
        read_cur.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],))
        attendee = read_cur.fetchone()
        read_cur.close()
        read_db.close()

        if request.method != 'POST':
            return render_template('attendee/profile.html', attendee=attendee)

        editable_fields = ('first_name', 'last_name', 'organisation', 'role', 'introduction')
        submitted = tuple(request.form.get(field, '').strip() for field in editable_fields)

        # The update runs on its own, second connection.
        write_db = get_db_connection()
        write_cur = write_db.cursor()
        write_cur.execute("""
            UPDATE attendees SET first_name = %s, last_name = %s, organisation = %s, role = %s, introduction = %s
            WHERE id = %s
        """, (*submitted, session['user_id']))
        write_db.commit()
        write_cur.close()
        write_db.close()
        flash('Profile updated successfully!')
        return redirect(url_for('attendee_profile'))
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('attendee_dashboard'))
@app.route('/attendee/photo', methods=['POST'])
@login_required
def upload_photo():
    """Store a new profile photo for the logged-in attendee.

    Expects a multipart upload in the ``photo`` field whose declared content
    type is an image. The file is saved under ``UPLOAD_FOLDER`` with a random
    ``<uuid>.jpg`` name, the attendee row is pointed at it, and only after the
    database commit is the previous photo removed, so a failed upload never
    leaves the profile without a picture.

    Returns JSON ``{'success': True, 'filename': ...}`` on success; 400 for a
    missing, unnamed or non-image upload; 403 for non-attendees; 404 when the
    attendee row is missing; 500 on database errors.
    """
    if session.get('user_type') != 'attendee':
        return jsonify({'error': 'Access denied'}), 403

    file = request.files.get('photo')
    if file is None:
        return jsonify({'error': 'No file provided'}), 400
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400
    # NOTE(review): this trusts the client-declared content type; it rejects
    # obvious non-images but does not inspect the file contents.
    if not (file.mimetype or '').startswith('image/'):
        return jsonify({'error': 'Invalid file type'}), 400

    # Generate unique filename
    filename = f"{uuid.uuid4()}.jpg"
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT profile_picture FROM attendees WHERE id = %s", (session['user_id'],))
        row = cursor.fetchone()
        if row is None:
            return jsonify({'error': 'Attendee not found'}), 404
        old_photo = row['profile_picture']

        file.save(filepath)
        cursor.execute("UPDATE attendees SET profile_picture = %s WHERE id = %s", (filename, session['user_id']))
        conn.commit()
    except Error as e:
        # Do not keep a file on disk that no database row refers to.
        if os.path.exists(filepath):
            os.remove(filepath)
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    # Delete old photo if exists — best effort, the new photo is already live.
    if old_photo:
        old_path = os.path.join(app.config['UPLOAD_FOLDER'], old_photo)
        try:
            os.remove(old_path)
        except OSError:
            pass

    return jsonify({'success': True, 'filename': filename})
# Routes - Attendees list and connections
@app.route('/attendees')
@login_required
def list_attendees():
    """Render the list of the other attendees at the logged-in attendee's event.

    Every row carries ``my_status``: the status of the connection the current
    attendee initiated towards that person, or NULL when there is none.
    """
    if session.get('user_type') != 'attendee':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)
        me = session['user_id']
        query_args = (me, session['event_id'], me)
        cur.execute("""
            SELECT a.*,
            (SELECT status FROM connections WHERE attendee_id = %s AND connected_attendee_id = a.id) as my_status
            FROM attendees a
            WHERE a.event_id = %s AND a.id != %s
        """, query_args)
        others = cur.fetchall()
        cur.close()
        db.close()
        return render_template('attendee/attendees.html', attendees=others)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('attendee_dashboard'))
@app.route('/connections', methods=['POST'])
@login_required
def create_connection():
    """Send a connection request to another attendee of the same event.

    Reads ``connected_attendee_id`` from the form. The target must be a valid
    integer ID, must not be the current attendee, and must be registered for
    the event stored in the session — the same rules the QR scan flow applies.

    Returns JSON ``{'success': True}`` on success; 400 for a missing/invalid
    ID, a self-connection or an already existing request; 403 for
    non-attendees; 404 when the target is not at this event; 500 on database
    errors.
    """
    if session.get('user_type') != 'attendee':
        return jsonify({'error': 'Access denied'}), 403

    raw_id = request.form.get('connected_attendee_id')
    if not raw_id:
        return jsonify({'error': 'Attendee ID required'}), 400
    try:
        connected_id = int(raw_id)
    except (TypeError, ValueError):
        return jsonify({'error': 'Invalid attendee ID'}), 400

    # Can't connect to yourself
    if connected_id == session['user_id']:
        return jsonify({'error': 'Cannot connect to yourself'}), 400

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Only attendees of the caller's own event may be targeted.
        cursor.execute(
            "SELECT id FROM attendees WHERE id = %s AND event_id = %s",
            (connected_id, session['event_id'])
        )
        if not cursor.fetchone():
            return jsonify({'error': 'Attendee not found at this event'}), 404

        # Check if connection already exists
        cursor.execute("""
            SELECT id FROM connections
            WHERE attendee_id = %s AND connected_attendee_id = %s
        """, (session['user_id'], connected_id))
        if cursor.fetchone():
            return jsonify({'error': 'Connection already exists'}), 400

        cursor.execute("""
            INSERT INTO connections (attendee_id, connected_attendee_id, status)
            VALUES (%s, %s, 'pending')
        """, (session['user_id'], connected_id))
        conn.commit()
        return jsonify({'success': True})
    except Error as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
@app.route('/connections/<int:connection_id>', methods=['PUT'])
@login_required
def update_connection(connection_id):
    """Accept or reject a pending connection request addressed to the caller.

    The new ``status`` (``'accepted'`` or ``'rejected'``) is read from a JSON
    body when one is present, otherwise from the form data.

    Returns JSON ``{'success': True}`` on success; 400 for an invalid status;
    403 for non-attendees; 404 when no pending request with this ID targets
    the current attendee; 500 on database errors.
    """
    if session.get('user_type') != 'attendee':
        return jsonify({'error': 'Access denied'}), 403

    # get_json(silent=True) returns None for non-JSON bodies; ``request.json``
    # rejects them on current Flask, which made the form fallback unreachable.
    payload = request.get_json(silent=True)
    if isinstance(payload, dict) and payload:
        status = payload.get('status')
    else:
        status = request.form.get('status')
    if status not in ['accepted', 'rejected']:
        return jsonify({'error': 'Invalid status'}), 400

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Verify this connection is targeting the current user
        cursor.execute("""
            SELECT id FROM connections
            WHERE id = %s AND connected_attendee_id = %s AND status = 'pending'
        """, (connection_id, session['user_id']))
        if not cursor.fetchone():
            return jsonify({'error': 'Connection not found'}), 404

        cursor.execute("UPDATE connections SET status = %s WHERE id = %s", (status, connection_id))
        conn.commit()
        return jsonify({'success': True})
    except Error as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
@app.route('/attendee/scan')
@login_required
def attendee_scan():
    """Render the QR scanner page attendees use to scan each other's badges."""
    is_attendee = session.get('user_type') == 'attendee'
    if is_attendee:
        return render_template('attendee/scan.html')

    flash('Access denied.')
    return redirect(url_for('index'))
@app.route('/attendee/scan-request', methods=['POST'])
@login_required
def scan_request_connection():
    """Send a connection request after scanning another attendee's QR code.

    ``scanned_id`` is read from a JSON body when one is present, otherwise
    from the form data. The scanned attendee must be registered for the
    caller's event. A previously rejected request from the caller is reset to
    pending; accepted or pending ones are reported as errors.

    Returns JSON ``{'success': True, 'message': ...}`` on success; 400 for a
    missing/invalid ID, a self-scan, or an already connected/pending pair;
    403 for non-attendees; 404 when the scanned attendee is not at this
    event; 500 on database errors.
    """
    if session.get('user_type') != 'attendee':
        return jsonify({'error': 'Access denied'}), 403

    # get_json(silent=True) returns None for non-JSON bodies; ``request.json``
    # rejects them on current Flask, which made the form fallback unreachable.
    payload = request.get_json(silent=True)
    if isinstance(payload, dict) and payload:
        scanned_id = payload.get('scanned_id')
    else:
        scanned_id = request.form.get('scanned_id')
    if not scanned_id:
        return jsonify({'error': 'Scanned attendee ID required'}), 400
    try:
        scanned_id = int(scanned_id)
    except (TypeError, ValueError):
        # A garbled QR payload is a client error, not a server crash.
        return jsonify({'error': 'Invalid attendee ID'}), 400

    # Can't connect to yourself
    if scanned_id == session['user_id']:
        return jsonify({'error': 'Cannot connect to yourself'}), 400

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        # Verify scanned attendee exists and is at same event
        cursor.execute("""
            SELECT id FROM attendees WHERE id = %s AND event_id = %s
        """, (scanned_id, session['event_id']))
        if not cursor.fetchone():
            return jsonify({'error': 'Attendee not found at this event'}), 404

        # Check if connection already exists
        cursor.execute("""
            SELECT id, status FROM connections
            WHERE attendee_id = %s AND connected_attendee_id = %s
        """, (session['user_id'], scanned_id))
        existing = cursor.fetchone()

        if existing and existing['status'] == 'accepted':
            return jsonify({'error': 'Already connected'}), 400
        if existing and existing['status'] == 'pending':
            return jsonify({'error': 'Request already pending'}), 400

        if existing:
            # Rejected - allow new request
            cursor.execute("""
                UPDATE connections SET status = 'pending', attendee_id = %s, connected_attendee_id = %s
                WHERE id = %s
            """, (session['user_id'], scanned_id, existing['id']))
        else:
            # Create new connection request
            cursor.execute("""
                INSERT INTO connections (attendee_id, connected_attendee_id, status)
                VALUES (%s, %s, 'pending')
            """, (session['user_id'], scanned_id))
        conn.commit()
        return jsonify({'success': True, 'message': 'Request sent, awaiting approval'})
    except Error as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
@app.route('/attendee/connection-requests')
@login_required
def connection_requests():
    """Render the pending connection requests addressed to the current attendee.

    Requests are listed newest first and carry the requester's profile fields.
    """
    if session.get('user_type') != 'attendee':
        flash('Access denied.')
        return redirect(url_for('index'))

    try:
        db = get_db_connection()
        cur = db.cursor(dictionary=True)

        # Get pending requests where current user is the target
        cur.execute("""
            SELECT c.*, a.first_name, a.last_name, a.email, a.organisation, a.role,
            a.introduction, a.profile_picture
            FROM connections c
            JOIN attendees a ON c.attendee_id = a.id
            WHERE c.connected_attendee_id = %s AND c.status = 'pending'
            ORDER BY c.created_at DESC
        """, (session['user_id'],))
        # Local name chosen so the imported ``requests`` module is not shadowed;
        # the template still receives the rows under the name ``requests``.
        pending_rows = cur.fetchall()
        cur.close()
        db.close()
        return render_template('attendee/connection_requests.html', requests=pending_rows)
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('attendee_dashboard'))
@app.route('/attendee/connection-request/<int:connection_id>', methods=['POST'])
@login_required
def respond_to_connection(connection_id):
    """Approve or reject a pending connection request addressed to the caller.

    ``action`` (``'approve'`` or ``'reject'``) is read from a JSON body when
    one is present, otherwise from the form data.

    Returns JSON ``{'success': True, 'status': 'accepted' | 'rejected'}`` on
    success; 400 for an invalid action; 403 for non-attendees; 404 when no
    pending request with this ID targets the current attendee; 500 on
    database errors.
    """
    if session.get('user_type') != 'attendee':
        return jsonify({'error': 'Access denied'}), 403

    # get_json(silent=True) returns None for non-JSON bodies; ``request.json``
    # rejects them on current Flask, which made the form fallback unreachable.
    payload = request.get_json(silent=True)
    if isinstance(payload, dict) and payload:
        action = payload.get('action')
    else:
        action = request.form.get('action')
    if action not in ['approve', 'reject']:
        return jsonify({'error': 'Invalid action'}), 400

    new_status = 'accepted' if action == 'approve' else 'rejected'

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        # Verify this request is for the current user (the join also requires
        # the requesting attendee to still exist).
        cursor.execute("""
            SELECT c.id
            FROM connections c
            JOIN attendees a ON c.attendee_id = a.id
            WHERE c.id = %s AND c.connected_attendee_id = %s AND c.status = 'pending'
        """, (connection_id, session['user_id']))
        if not cursor.fetchone():
            return jsonify({'error': 'Connection request not found'}), 404

        cursor.execute("UPDATE connections SET status = %s WHERE id = %s", (new_status, connection_id))
        conn.commit()
        return jsonify({'success': True, 'status': new_status})
    except Error as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
# Routes - Appointments
@app.route('/appointments', methods=['POST'])
@login_required
def create_appointment():
    """Request an appointment with another attendee of the same event.

    Form fields: ``target_id`` and ``appointment_time`` (format
    ``YYYY-MM-DDTHH:MM``) are required; ``location`` and ``notes`` are
    optional. The target must be a different attendee registered for the
    caller's event.

    Returns JSON ``{'success': True}`` on success; 400 for missing or invalid
    input or a self-appointment; 403 for non-attendees; 404 when the target
    is not at this event; 500 on database errors.
    """
    if session.get('user_type') != 'attendee':
        return jsonify({'error': 'Access denied'}), 403

    target_id = request.form.get('target_id')
    appointment_time = request.form.get('appointment_time')
    location = request.form.get('location', '').strip()
    notes = request.form.get('notes', '').strip()

    if not all([target_id, appointment_time]):
        return jsonify({'error': 'Target and time are required'}), 400

    try:
        target_id = int(target_id)
    except (TypeError, ValueError):
        return jsonify({'error': 'Invalid target'}), 400
    if target_id == session['user_id']:
        return jsonify({'error': 'Cannot book an appointment with yourself'}), 400

    try:
        apt_datetime = datetime.strptime(appointment_time, '%Y-%m-%dT%H:%M')
    except ValueError:
        return jsonify({'error': 'Invalid datetime format'}), 400

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Appointments may only be requested with attendees of the same event.
        cursor.execute(
            "SELECT id FROM attendees WHERE id = %s AND event_id = %s",
            (target_id, session['event_id'])
        )
        if not cursor.fetchone():
            return jsonify({'error': 'Attendee not found at this event'}), 404

        cursor.execute("""
            INSERT INTO appointments (event_id, requester_id, target_id, appointment_time, location, notes, status)
            VALUES (%s, %s, %s, %s, %s, %s, 'pending')
        """, (session['event_id'], session['user_id'], target_id, apt_datetime, location, notes))
        conn.commit()
        return jsonify({'success': True})
    except Error as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the DB handles on every path, including the error path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
@app.route('/appointments/<int:appointment_id>', methods=['PUT'])
@login_required
def update_appointment(appointment_id):
    """Accept or reject a pending appointment addressed to the current attendee.

    The new status is read from the JSON body (``{"status": ...}``) when one
    is supplied, otherwise from the ``status`` form field. Only ``accepted``
    and ``rejected`` are allowed, and only the appointment's target may
    change it while it is still ``pending``.

    Args:
        appointment_id: Primary key of the appointment, taken from the URL.

    Returns:
        A JSON response: ``{'success': True}`` on success; otherwise an
        ``error`` payload with status 403, 400, 404 or 500.
    """
    if session.get('user_type') != 'attendee':
        return jsonify({'error': 'Access denied'}), 403

    # get_json(silent=True) returns None instead of raising when the request
    # is not JSON, so form-encoded requests reach the form fallback below.
    payload = request.get_json(silent=True)
    if isinstance(payload, dict) and payload:
        status = payload.get('status')
    else:
        status = request.form.get('status')

    if status not in ['accepted', 'rejected']:
        return jsonify({'error': 'Invalid status'}), 400

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        # Verify user is the target of this appointment
        cursor.execute("""
            SELECT id FROM appointments
            WHERE id = %s AND target_id = %s AND status = 'pending'
        """, (appointment_id, session['user_id']))
        if not cursor.fetchone():
            return jsonify({'error': 'Appointment not found'}), 404

        cursor.execute("UPDATE appointments SET status = %s WHERE id = %s", (status, appointment_id))
        conn.commit()
    except Error as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Runs on success, on the 404 early return, and on database errors.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    return jsonify({'success': True})
# Routes - Public event listing
@app.route('/events')
def list_events():
    """List all upcoming events.

    Selects every event whose ``start_time`` is not in the past, together
    with the organizer name and an attendee count, ordered by start time, and
    renders ``index.html`` with ``show_events=True``. On a database error the
    error is flashed and ``index.html`` is rendered without events.
    """
    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT e.*, o.name as organizer_name,
                   (SELECT COUNT(*) FROM attendees WHERE event_id = e.id) as attendee_count
            FROM events e
            JOIN organizers o ON e.organizer_id = o.id
            WHERE e.start_time >= NOW()
            ORDER BY e.start_time ASC
        """)
        events = cursor.fetchall()
    except Error as e:
        flash(f'Database error: {e}')
        return render_template('index.html')
    finally:
        # Always hand the cursor and connection back, even when the query fails.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    return render_template('index.html', events=events, show_events=True)
@app.route('/event/register/<code>', methods=['GET', 'POST'])
@app.route('/event/register/<code>/<type_code>', methods=['GET', 'POST'])
def register_event(code, type_code=None):
    """Show and process the registration form for an event.

    GET renders the form together with the event's breakout sessions. POST
    (when the visitor is not already logged in as an attendee of this event)
    validates the form and then either

    * stores the submitted data in the session and redirects to the payment
      page, when the preselected attendee type has a positive price, or
    * creates the attendee, records the selected breakout sessions, sends the
      confirmation email, logs the attendee in and redirects to the personal
      page.

    Args:
        code: Event code from the URL.
        type_code: Optional attendee-type code from the URL. It must exist and
            belong to the event, otherwise the visitor is redirected to the
            index page.

    Returns:
        A rendered template or a redirect response.
    """
    # --- Look up the preselected attendee type (if any) and the event -------
    preselected_type = None
    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)
        if type_code:
            cursor.execute("SELECT * FROM attendee_types WHERE code = %s", (type_code,))
            preselected_type = cursor.fetchone()
            if not preselected_type:
                flash('Invalid attendee type.')
                return redirect(url_for('index'))
        cursor.execute("SELECT * FROM events WHERE code = %s", (code,))
        event = cursor.fetchone()
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    if not event:
        flash('Event not found.')
        return redirect(url_for('index'))
    # Verify type_code belongs to this event
    if preselected_type and preselected_type['event_id'] != event['id']:
        flash('Invalid attendee type for this event.')
        return redirect(url_for('index'))

    # --- Is the visitor already logged in as an attendee of this event? -----
    user_id = session.get('user_id')
    registered = (
        bool(user_id)
        and session.get('user_type') == 'attendee'
        and session.get('event_id') == event['id']
    )

    # --- Breakout sessions, each with the visitor's own RSVP status ---------
    # One query fetches the sessions, the registered count and my_rsvp_status.
    # For visitors who are not registered the attendee id is NULL, which
    # matches no RSVP row, so my_rsvp_status comes back as None.
    sessions = []
    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT bs.*,
                   (SELECT COUNT(*) FROM breakout_session_rsvps
                    WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count,
                   (SELECT status FROM breakout_session_rsvps
                    WHERE breakout_session_id = bs.id AND attendee_id = %s) as my_rsvp_status
            FROM breakout_sessions bs
            WHERE bs.event_id = %s
            ORDER BY bs.start_time
        """, (user_id if registered else None, event['id']))
        sessions = cursor.fetchall()
    except Error as e:
        # Best effort: the form is still usable without breakout sessions,
        # but record the failure instead of swallowing it silently.
        app.logger.warning('Could not load breakout sessions for event %s: %s', event['id'], e)
        sessions = []
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    def render_form(is_registered):
        """Render the registration template with the data gathered above."""
        return render_template(
            'attendee/event_register.html',
            event=event,
            sessions=sessions,
            registered=is_registered,
            preselected_type=preselected_type,
        )

    # Only a POST from a visitor who is not yet registered is processed.
    if request.method != 'POST' or registered:
        return render_form(registered)

    # --- Handle registration form submission --------------------------------
    first_name = request.form.get('first_name', '').strip()
    last_name = request.form.get('last_name', '').strip()
    email = request.form.get('email', '').strip()
    password = request.form.get('password', '')
    confirm_password = request.form.get('confirm_password', '')
    organisation = request.form.get('organisation', '').strip()
    role = request.form.get('role', '').strip()
    phone = request.form.get('phone', '').strip()
    linkedin = request.form.get('linkedin', '').strip()
    introduction = request.form.get('introduction', '').strip()
    # De-duplicate while keeping order so a repeated form value cannot
    # create two RSVP rows for the same breakout session.
    selected_sessions = list(dict.fromkeys(request.form.getlist('breakout_sessions')))

    if not all([first_name, last_name, email, password]):
        flash('First name, last name, email, and password are required.')
        return render_form(False)
    if password != confirm_password:
        flash('Passwords do not match.')
        return render_form(False)

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        # Check if email already registered for this event
        cursor.execute(
            "SELECT id FROM attendees WHERE email = %s AND event_id = %s",
            (email, event['id'])
        )
        if cursor.fetchone():
            flash('Email already registered for this event. Please log in.')
            return redirect(url_for('login'))

        # Check if this type requires payment
        if preselected_type and preselected_type.get('price') and preselected_type.get('price') > 0:
            # Store registration data in session for payment processing.
            # NOTE(review): this keeps the plaintext password in the session.
            # If the app uses Flask's default cookie-based sessions, the value
            # is signed but not encrypted -- consider storing a hash instead
            # (payment_page reads the 'password' key, so change both together).
            session['pending_registration'] = {
                'event_id': event['id'],
                'email': email,
                'password': password,
                'first_name': first_name,
                'last_name': last_name,
                'organisation': organisation,
                'role': role,
                'phone': phone,
                'linkedin': linkedin,
                'introduction': introduction,
                'selected_sessions': selected_sessions,
                'attendee_type_id': preselected_type['id'],
                'type_name': preselected_type['name'],
                'price': preselected_type['price']
            }
            # The payment route's URL variable is named ``code``; passing any
            # other keyword makes url_for fail to build the URL.
            return redirect(url_for('payment_page', code=event['code']))

        # Complete registration directly for free types
        confirmation_token = uuid.uuid4().hex
        password_hash = hash_password(password)
        attendee_code = generate_attendee_code()
        cursor.execute("""
            INSERT INTO attendees (event_id, email, password_hash, first_name, last_name, organisation, role, phone, linkedin, introduction, confirmation_token, attendee_code, attendee_type_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (event['id'], email, password_hash, first_name, last_name, organisation, role, phone, linkedin, introduction, confirmation_token, attendee_code, preselected_type['id'] if preselected_type else None))
        attendee_id = cursor.lastrowid

        # Process breakout session selections
        for session_id in selected_sessions:
            cursor.execute("SELECT * FROM breakout_sessions WHERE id = %s AND event_id = %s",
                           (session_id, event['id']))
            session_data = cursor.fetchone()
            if not session_data:
                # Unknown id, or a session that belongs to another event.
                continue
            if session_data['max_attendees']:
                cursor.execute("""
                    SELECT COUNT(*) as count FROM breakout_session_rsvps
                    WHERE breakout_session_id = %s AND status = 'registered'
                """, (session_id,))
                if cursor.fetchone()['count'] >= session_data['max_attendees']:
                    # Session is full: skip it without failing the registration.
                    continue
            cursor.execute("""
                INSERT INTO breakout_session_rsvps (breakout_session_id, attendee_id, status)
                VALUES (%s, %s, 'registered')
            """, (session_id, attendee_id))

        conn.commit()
    except Error as e:
        flash(f'Database error: {e}')
        return render_form(registered)
    finally:
        # Runs on success, on the early redirects above, and on errors.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    # --- Registration stored: notify and log the attendee in ----------------
    event_date = localized_date(event['start_time']) if event['start_time'] else 'To be announced'
    personal_page_url = url_for('attendee_personal_page', token=confirmation_token, _external=True)
    send_attendee_confirmation_email(email, first_name, event['name'], event_date, event['location'], personal_page_url)

    session['user_id'] = attendee_id
    session['user_type'] = 'attendee'
    session['event_id'] = event['id']
    flash('Registration successful!')
    return redirect(url_for('attendee_personal_page', token=confirmation_token))
@app.route('/event/<code>/payment', methods=['GET', 'POST'])
def payment_page(code):
    """Payment page for paid attendee types.

    Requires a ``pending_registration`` entry in the session. GET renders the
    payment template. POST completes the registration: payment itself is
    simulated here, so the attendee is created, the selected breakout
    sessions are recorded, the confirmation email is sent and the attendee is
    logged in.

    Args:
        code: Event code from the URL. It must identify the same event as
            the pending registration.

    Returns:
        A rendered template or a redirect response.
    """
    pending = session.get('pending_registration')
    if not pending or pending.get('event_id') is None:
        flash('No pending registration found.')
        return redirect(url_for('index'))

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT * FROM events WHERE code = %s", (code,))
        event = cursor.fetchone()
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    if not event:
        flash('Event not found.')
        return redirect(url_for('index'))

    # The attendee row is created for pending['event_id'] while the email and
    # page use the event loaded from the URL; refuse a URL that names a
    # different event so the two can never disagree.
    if event['id'] != pending['event_id']:
        flash('No pending registration found.')
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('attendee/payment.html', event=event, pending=pending)

    # Process payment (simulated - in production, integrate with payment gateway like Stripe)
    # For now, we just complete the registration
    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        # The email may have been registered since the form was submitted
        # (for example from another tab); do not create a second attendee.
        cursor.execute(
            "SELECT id FROM attendees WHERE email = %s AND event_id = %s",
            (pending['email'], pending['event_id'])
        )
        if cursor.fetchone():
            session.pop('pending_registration', None)
            flash('Email already registered for this event. Please log in.')
            return redirect(url_for('login'))

        # Generate confirmation token
        confirmation_token = uuid.uuid4().hex
        password_hash = hash_password(pending['password'])
        attendee_code = generate_attendee_code()
        cursor.execute("""
            INSERT INTO attendees (event_id, email, password_hash, first_name, last_name, organisation, role, phone, linkedin, introduction, confirmation_token, attendee_code, attendee_type_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (pending['event_id'], pending['email'], password_hash, pending['first_name'], pending['last_name'],
              pending['organisation'], pending['role'], pending['phone'], pending['linkedin'],
              pending['introduction'], confirmation_token, attendee_code, pending['attendee_type_id']))
        attendee_id = cursor.lastrowid

        # Process breakout session selections
        for session_id in pending.get('selected_sessions', []):
            cursor.execute("SELECT * FROM breakout_sessions WHERE id = %s AND event_id = %s",
                           (session_id, pending['event_id']))
            session_data = cursor.fetchone()
            if not session_data:
                # Unknown id, or a session that belongs to another event.
                continue
            if session_data['max_attendees']:
                cursor.execute("""
                    SELECT COUNT(*) as count FROM breakout_session_rsvps
                    WHERE breakout_session_id = %s AND status = 'registered'
                """, (session_id,))
                if cursor.fetchone()['count'] >= session_data['max_attendees']:
                    # Session is full: skip it without failing the registration.
                    continue
            cursor.execute("""
                INSERT INTO breakout_session_rsvps (breakout_session_id, attendee_id, status)
                VALUES (%s, %s, 'registered')
            """, (session_id, attendee_id))

        conn.commit()
    except Error as e:
        flash(f'Database error: {e}')
        return render_template('attendee/payment.html', event=event, pending=pending)
    finally:
        # Runs on success, on the duplicate-email redirect, and on errors.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    # Clear pending registration
    session.pop('pending_registration', None)

    # Send confirmation email
    event_date = localized_date(event['start_time']) if event['start_time'] else 'To be announced'
    personal_page_url = url_for('attendee_personal_page', token=confirmation_token, _external=True)
    send_attendee_confirmation_email(pending['email'], pending['first_name'], event['name'], event_date, event['location'], personal_page_url)

    # Auto-login the attendee
    session['user_id'] = attendee_id
    session['user_type'] = 'attendee'
    session['event_id'] = pending['event_id']
    flash('Registration and payment successful!')
    return redirect(url_for('attendee_personal_page', token=confirmation_token))
@app.route('/attendee/personal/<token>')
def attendee_personal_page(token):
    """Personal page accessed via confirmation email link.

    Looks the attendee up by ``confirmation_token`` and, failing that, by
    ``attendee_code``. A failed lookup is recorded against the client IP and
    a blocked IP is turned away before any lookup. On success the attendee is
    logged in and the personal page is rendered with the attendee's event and
    its breakout sessions, each carrying the attendee's own RSVP status.

    Args:
        token: Confirmation token (or attendee code) from the URL.

    Returns:
        The rendered personal page, or a redirect to the index page when the
        IP is blocked, the link is invalid, or a database error occurs.
    """
    client_ip = request.remote_addr

    # Check if IP is blocked
    if is_ip_blocked_for_personal_page(client_ip):
        flash('Too many invalid attempts. Please try again later (blocked for 30 minutes).')
        return redirect(url_for('index'))

    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        # Find attendee by confirmation token
        cursor.execute("SELECT * FROM attendees WHERE confirmation_token = %s", (token,))
        attendee = cursor.fetchone()

        # Fallback: also accept attendee_code for old links where token was cleared
        if not attendee:
            cursor.execute("SELECT * FROM attendees WHERE attendee_code = %s", (token,))
            attendee = cursor.fetchone()

        if not attendee:
            record_failed_personal_page_attempt(client_ip)
            flash('Invalid link.')
            return redirect(url_for('index'))

        # Clear failed attempts on successful access
        clear_failed_personal_page_attempts(client_ip)

        # Auto-login the attendee
        # Note: confirmation_token is kept so the link remains valid indefinitely
        session['user_id'] = attendee['id']
        session['user_type'] = 'attendee'
        session['event_id'] = attendee['event_id']

        # Get the attendee's event (kept as a list because the template
        # receives it as ``events``).
        cursor.execute("""
            SELECT e.* FROM events e
            WHERE e.id = %s
        """, (attendee['event_id'],))
        events = cursor.fetchall()

        # For each event, get breakout sessions with RSVP status
        for event in events:
            cursor.execute("""
                SELECT bs.*,
                       (SELECT COUNT(*) FROM breakout_session_rsvps
                        WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count,
                       (SELECT status FROM breakout_session_rsvps
                        WHERE breakout_session_id = bs.id AND attendee_id = %s) as my_rsvp_status
                FROM breakout_sessions bs
                WHERE bs.event_id = %s
                ORDER BY bs.start_time
            """, (attendee['id'], event['id']))
            event['breakout_sessions'] = cursor.fetchall()
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
    finally:
        # Runs on success, on the invalid-link redirect, and on errors.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    return render_template('attendee/personal.html', attendee=attendee, events=events)
@app.route('/event/<int:event_id>')
def view_event(event_id):
    """View event details.

    Loads the event with its organizer name and attendee count and renders
    ``attendee/event.html``.

    Args:
        event_id: Primary key of the event, taken from the URL.

    Returns:
        The rendered event page, or a redirect to the index page when the
        event does not exist or a database error occurs.
    """
    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT e.*, o.name as organizer_name,
                   (SELECT COUNT(*) FROM attendees WHERE event_id = e.id) as attendee_count
            FROM events e
            JOIN organizers o ON e.organizer_id = o.id
            WHERE e.id = %s
        """, (event_id,))
        event = cursor.fetchone()
    except Error as e:
        flash(f'Database error: {e}')
        return redirect(url_for('index'))
    finally:
        # Always hand the cursor and connection back, even when the query fails.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    if not event:
        flash('Event not found.')
        return redirect(url_for('index'))
    return render_template('attendee/event.html', event=event)
def debug_enabled_from_env(var_name='FLASK_DEBUG'):
    """Return True when the environment variable *var_name* is a truthy flag.

    Accepted truthy values (case-insensitive, surrounding whitespace ignored)
    are ``1``, ``true``, ``yes`` and ``on``. An unset variable or any other
    value yields False.
    """
    return os.environ.get(var_name, '').strip().lower() in ('1', 'true', 'yes', 'on')


if __name__ == '__main__':
    # Ensure staff_code column exists in organizers and staff tables
    ensure_staff_code_column()
    ensure_all_organizers_have_staff_code()
    ensure_staff_staff_code_column()
    ensure_all_staff_have_staff_code()
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default; opt in locally with FLASK_DEBUG=1.
    app.run(debug=debug_enabled_from_env(), host='0.0.0.0', port=5002)