import base64 import io import os import random import smtplib import string import uuid import requests from datetime import datetime from email.encoders import encode_base64 from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.base import MIMEBase from functools import wraps import bcrypt import mysql.connector import qrcode from babel import Locale from flask import Flask, flash, jsonify, redirect, render_template, request, session, url_for, send_file from flask_cors import CORS from flask_babel import Babel, gettext as _, ngettext, force_locale from reportlab.lib.pagesizes import A4 from reportlab.lib.units import mm from reportlab.pdfgen import canvas import openpyxl from openpyxl.styles import Font, PatternFill, Alignment from mysql.connector import Error from config import Config app = Flask(__name__, template_folder='templates', static_folder='static') app.config.from_object(Config) CORS(app) # Supported locales SUPPORTED_LOCALES = ['en', 'nl', 'de', 'fr', 'es', 'it', 'pl'] def get_locale(): """Determine the best locale based on URL parameter, cookie, session, or user preference.""" from flask import session, request from config import Config # 1. Check URL parameter lang = request.args.get('lang') if lang and lang in SUPPORTED_LOCALES: session['locale'] = lang return lang # 2. Check session if 'locale' in session and session['locale'] in SUPPORTED_LOCALES: return session['locale'] # 3. Check cookie (for returning visitors) lang = request.cookies.get('locale') if lang and lang in SUPPORTED_LOCALES: session['locale'] = lang return lang # 4. Check user preference from database if 'user_id' in session and 'user_type' in session: user_lang = get_user_preferred_language(session['user_id'], session.get('user_type')) if user_lang: return user_lang # 5. Default return Config.BABEL_DEFAULT_LOCALE def get_locale_string(): """Return locale as string for Babel.""" locale = get_locale() # Convert 'en' to 'en_US' format if needed, but Babel 4.0 should handle short codes return locale # Initialize Babel with locale_selector babel = Babel() babel.init_app( app, default_locale=Config.BABEL_DEFAULT_LOCALE, default_translation_directories=Config.BABEL_TRANSLATION_DIRECTORIES, locale_selector=get_locale_string ) def get_user_preferred_language(user_id, user_type): """Get user's preferred language from database.""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) if user_type == 'organizer': cursor.execute("SELECT preferred_language FROM organizers WHERE id = %s", (user_id,)) elif user_type == 'attendee': cursor.execute("SELECT preferred_language FROM attendees WHERE id = %s", (user_id,)) elif user_type == 'staff': cursor.execute("SELECT preferred_language FROM staff WHERE id = %s", (user_id,)) else: cursor.close() conn.close() return None result = cursor.fetchone() cursor.close() conn.close() if result and result.get('preferred_language') in SUPPORTED_LOCALES: return result['preferred_language'] except Error: pass return None # English translations map (replaces database translations) ENGLISH_TRANSLATIONS = { 'dashboard': 'Dashboard', 'logout': 'Logout', 'login': 'Login', 'organizer_login': 'Organizer Login', 'register': 'Register', 'events': 'Events', 'attendees': 'Attendees', 'profile': 'Profile', 'create_event': 'Create Event', 'edit_event': 'Edit Event', 'delete_event': 'Delete Event', 'event_details': 'Event Details', 'staff': 'Staff', 'add_staff': 'Add Staff', 'remove_staff': 'Remove Staff', 'breakout_sessions': 'Break-out Sessions', 'registered_attendees': 'Registered Attendees', 'checked_in': 'Checked In', 'not_checked_in': 'Not Checked In', 'invite_pending': 'Invite Pending', 'active': 'Active', 'name': 'Name', 'email': 'Email', 'status': 'Status', 'actions': 'Actions', 'start': 'Start', 'end': 'End', 'location': 'Location', 'max_attendees': 'Max Attendees', 'registration_link': 'Registration Link', 'details': 'Details', 'unlimited': 'unlimited', 'registered': 'registered', 'my_events': 'My Events', 'no_events_yet': "You haven't created any events yet.", 'create_first_event': 'Create your first event', 'organizer': 'Organizer', 'first_name': 'First Name', 'last_name': 'Last Name', 'description': 'Description', 'view': 'View', 'remove': 'Remove', 'add_attendee': 'Add Attendee', 'add_breakout_session': 'Add Break-out Session', 'add_staff_member': 'Add Staff Member', 'no_staff_yet': 'No staff members added yet.', 'no_breakout_sessions_yet': 'No break-out sessions created yet.', 'confirm_remove_staff': 'Are you sure you want to remove', 'from_event': 'from this event', 'remove_staff_member': 'Remove Staff Member', 'time': 'Time', 'organisation': 'Organisation', 'role': 'Role', 'role_profession': 'Role / Profession', 'phone': 'Phone', 'linkedin': 'LinkedIn', 'about_me': 'About Me', 'cancel': 'Cancel', 'welcome': 'Welcome to NetEvents', 'connect_with_professionals': 'Connect with professionals at networking events', 'register_as_organizer': 'Register as Organizer', 'already_have_account': 'Already have an account?', 'dont_have_account': "Don't have an account?", 'click_here': 'Click here', 'login_as_organiser': 'login as organiser', 'attendee_profile_link': 'Are you an attendee and want to check your personal profile page?', 'attendee_profile_url': '/attendee/personal/', 'request_profile_link': 'Request Your Personal Page Link', 'request_link_description': 'Enter your email address and we will send you a link to access your personal attendee profile page.', 'send_link': 'Send Link', 'email_sent': 'If that email is registered, we have sent the link.', 'email_not_found': 'No attendee found with that email address.', 'remember_password': 'Remember your password?', 'not_yet_registered': 'Not yet registered?', 'register_for_free': 'Register for FREE!', 'presenter': 'Presenter', 'visitor': 'Visitor', 'organiser': 'Organiser', 'upcoming_events': 'Upcoming Events', 'no_upcoming_events': 'No upcoming events at the moment. Check back soon!', 'platform_features': 'Platform Features', 'for_organizers': 'For Organizers', 'for_attendees': 'For Attendees', 'view_event': 'View Event', 'translation_management': 'Translation Management', 'key': 'Key', 'translation': 'Translation', 'language': 'Language', 'save': 'Save', 'add': 'Add', 'translations': 'Translations', 'presenter_dashboard': 'Presenter Dashboard', 'my_breakout_sessions': 'My Break-out Sessions', 'event': 'Event', 'scan_qr_code': 'Scan QR Code', 'scan_qr_to_connect': 'Scan QR to connect!', 'scan_instructions': 'Point your camera at an attendee QR code to check them in', 'camera_access': 'Camera Access', 'camera_permission_denied': 'Camera permission denied. Please allow camera access to scan QR codes.', 'error_reading_qr': 'Error reading QR code', 'attendee_not_found': 'Attendee not found', 'already_checked_in': 'Already checked in', 'check_in_successful': 'Check-in successful', 'password': 'Password', 'confirm_password': 'Confirm Password', 'complete_registration': 'Complete Registration', 'youre_invited': "You're invited to join", 'set_password_complete': 'Set a password to complete your registration', 'your_profile': 'Your Profile', 'edit_profile': 'Edit Profile', 'save_profile': 'Save Profile', 'profile_updated': 'Profile updated successfully', 'connection_requests': 'Connection Requests', 'my_connections': 'My Connections', 'send_connection_request': 'Send Connection Request', 'connection_sent': 'Connection request sent', 'accept': 'Accept', 'reject': 'Reject', 'pending': 'Pending', 'accepted': 'Accepted', 'rejected': 'Rejected', 'appointments': 'Appointments', 'request_appointment': 'Request Appointment', 'appointment_time': 'Appointment Time', 'appointment_notes': 'Notes', 'appointment_location': 'Location', 'request_sent': 'Appointment request sent', 'appointments_with': 'Appointments with', 'no_appointments': 'No appointments yet', 'breakout_session': 'Break-out Session', 'breakout_session_detail': 'Break-out Session Details', 'speaker': 'Speaker', 'session_full': 'Session Full', 'register_for_session': 'Register for Session', 'registered_for_session': 'Registered for Session', 'unregister_from_session': 'Unregister from Session', 'check_in': 'Check In', 'check_out': 'Check Out', 'event_checked_in': 'Event Checked In', 'event_checked_out': 'Event Checked Out', 'your_events': 'Your Events', 'past_events': 'Past Events', 'no_past_events': 'No past events', 'event_not_found': 'Event not found', 'invalid_invite_link': 'Invalid or expired invite link', 'password_mismatch': 'Passwords do not match', 'password_too_short': 'Password must be at least 8 characters', 'registration_successful': 'Registration successful', 'login_successful': 'Login successful', 'logout_successful': 'Logout successful', 'access_denied': 'Access denied', 'page_not_found': 'Page not found', 'internal_error': 'Internal server error', 'db_error': 'Database error', 'error_adding_staff': 'Error adding staff member', 'error_checking_in': 'Error checking in attendee', 'badge_printed': 'Badge printed', 'no_events': 'No events found', 'create_event': 'Create Event', 'edit_breakout_session': 'Edit Break-out Session', 'delete_breakout_session': 'Delete Break-out Session', 'view_all': 'View All', 'loading': 'Loading...', 'submit': 'Submit', 'close': 'Close', 'back': 'Back', 'next': 'Next', 'previous': 'Previous', 'search': 'Search', 'filter': 'Filter', 'sort_by': 'Sort by', 'ascending': 'Ascending', 'descending': 'Descending', 'rows_per_page': 'Rows per page', 'no_data': 'No data available', 'staff_dashboard': 'Staff Dashboard', 'select_event': 'Select an Event', 'no_events_assigned': 'No events assigned to you yet', 'start_qr_scanner': 'Start QR Scanner', 'back_to_dashboard': 'Back to Dashboard', 'download_badge': 'Download Badge', 'email_badge': 'Email Badge', 'badge_sent': 'Badge sent to your email!', 'attendee_types': 'Attendee Types', 'manage_types': 'Manage Types', 'create_attendee_type': 'Create Attendee Type', 'type_name': 'Type Name', 'e_g_vip_speaker_student': 'e.g., VIP, Speaker, Student', 'price': 'Price', 'optional': 'optional', 'leave_empty_for_free': 'Leave empty for free', 'create_type': 'Create Type', 'existing_types': 'Existing Types', 'registration_link': 'Registration Link', 'copy': 'Copy', 'delete': 'Delete', 'confirm_delete_type': 'Are you sure you want to delete this attendee type?', 'edit_attendee_type': 'Edit Attendee Type', 'back_to_types': 'Back to Types', 'type_details': 'Type Details', 'save_changes': 'Save Changes', 'edit': 'Edit', 'cannot_delete_type_with_attendees': 'Cannot delete: %d attendees assigned to this type', 'no_attendee_types_yet': 'No attendee types defined yet.', 'create_first_type': 'Create the first type', 'back_to_event': 'Back to Event', 'link_copied': 'Link copied!', 'copy_failed': 'Failed to copy link.', 'free': 'Free', 'select_all': 'Select All', 'assign_type': 'Assign Type', 'no_type': 'No Type', 'apply': 'Apply', 'selected': 'selected', 'type': 'Type', 'registration_type': 'Registration Type', 'manage_attendee_types_across_events': 'Manage attendee types across all your events', 'no_attendee_types_for_event': 'No attendee types defined for this event.', 'attendees': 'Attendees', 'manage': 'Manage', 'payment': 'Payment', 'complete_payment': 'Complete Payment', 'back_to_registration': 'Back to Registration', 'order_summary': 'Order Summary', 'event_registration': 'Event Registration', 'attendee_type': 'Attendee Type', 'attendee': 'Attendee', 'total': 'Total', 'name_on_card': 'Name on Card', 'card_number': 'Card Number', 'expiry_date': 'Expiry Date', 'cvv': 'CVV', 'pay_now': 'Pay Now', 'payment_secure': 'Your payment is secure and encrypted', 'john_doe': 'John Doe', 'mm/yy': 'MM/YY', } def get_translation(key, locale=None): """Get translation for a key in English, with underscores replaced by spaces.""" result = ENGLISH_TRANSLATIONS.get(key, key) # Replace underscores with spaces and title case if '_' in str(result): words = str(result).split('_') result = ' '.join(word.capitalize() for word in words) return result def t(key, locale=None): """Shorthand for get_translation.""" return get_translation(key, locale) def strftime_to_babel_pattern(strftime_format): """Convert Python strftime format string to Babel CLDR pattern. Quotes literal text for proper Babel interpretation. """ import re # Mapping of strftime codes to Babel CLDR codes strftime_to_babel = { '%Y': 'yyyy', '%y': 'yy', '%m': 'MM', '%B': 'MMMM', '%b': 'MMM', '%d': 'dd', '%A': 'EEEE', '%a': 'EEE', '%H': 'HH', '%I': 'hh', '%M': 'mm', '%S': 'ss', '%p': 'a', '%j': 'DDD', '%U': 'ww', '%W': 'ww', } # Find all strftime format codes and their positions format_code_pattern = re.compile(r'%[A-Za-z]') result_parts = [] last_end = 0 for match in format_code_pattern.finditer(strftime_format): # Get the literal text before this format code if match.start() > last_end: literal = strftime_format[last_end:match.start()] # Quote literal text for Babel (escape single quotes by doubling) if literal: quoted_literal = literal.replace("'", "''") result_parts.append(f"'{quoted_literal}'") # Convert the format code code = match.group() if code in strftime_to_babel: result_parts.append(strftime_to_babel[code]) last_end = match.end() # Handle any remaining literal text after the last format code if last_end < len(strftime_format): literal = strftime_format[last_end:] if literal: quoted_literal = literal.replace("'", "''") result_parts.append(f"'{quoted_literal}'") return ''.join(result_parts) def localized_date(dt, locale=None, format=None): """Format a datetime in the specified locale's format.""" if dt is None: return '' # Ensure dt is a datetime object from datetime import datetime as dt_class if not isinstance(dt, dt_class): # If it's a string or something else, try to parse it or return empty if isinstance(dt, str): try: dt = dt_class.strptime(dt, '%Y-%m-%d %H:%M:%S') except ValueError: try: dt = dt_class.strptime(dt, '%Y-%m-%dT%H:%M:%S') except ValueError: return str(dt) if dt else '' else: return str(dt) if dt else '' if locale is None: locale = get_locale() try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT date_format FROM languages WHERE code = %s", (locale,)) result = cursor.fetchone() cursor.close() conn.close() if format is None and result: format = strftime_to_babel_pattern(result['date_format']) elif format is None: format = 'MMMM d, yyyy h:mm a' else: # User provided a format from Jinja filter - convert it too format = strftime_to_babel_pattern(format) # Use babel's date formatting for proper locale support from babel.dates import format_datetime return format_datetime(dt, format, locale=locale) except Exception: # Fallback to strftime (will only properly show English) try: return dt.strftime('%B %d, %Y at %H:%M') except Exception: return str(dt) # Jinja2 filters @app.template_filter('localized_date') def localized_date_filter(dt, format=None): return localized_date(dt, get_locale(), format) @app.template_filter('t') def translate_filter(key): # Always use English for translations return get_translation(key, 'en') @app.template_filter('spacify') def spacify_filter(text): """Replace underscores with spaces and capitalize each word.""" if text is None or text == '': return '' return ' '.join(word.capitalize() for word in str(text).split('_')) @app.template_filter('default') def default_filter(value, default_value=''): """Return default_value if value is none or undefined.""" if value is None: return default_value return value @app.template_filter('format_currency') def format_currency_filter(value): """Format a number as currency.""" if value is None or value == '': return '' try: return f'{float(value):.2f}' except (ValueError, TypeError): return str(value) @app.context_processor def utility_processor(): """Add utility functions to template context.""" def get_currency_symbol(): return '€' # Could be made dynamic based on locale return dict(get_currency_symbol=get_currency_symbol) # Update session locale when language changes @app.before_request def before_request(): """Before each request, update locale from URL if provided.""" if 'lang' in request.args: lang = request.args.get('lang') if lang in SUPPORTED_LOCALES: session['locale'] = lang @app.after_request def prevent_caching(response): """Prevent proxy caching to ensure fresh translations.""" response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response # Ensure upload folder exists os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) def get_db_connection(user=Config.DB_APP_USER, password=Config.DB_APP_PASSWORD): """Get a database connection.""" return mysql.connector.connect( host=Config.DB_HOST, port=Config.DB_PORT, user=user, password=password, database=Config.DB_NAME ) def generate_event_code(): """Generate a unique 10-character alphanumeric event code.""" chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=10)) conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT id FROM events WHERE code = %s", (code,)) if not cursor.fetchone(): cursor.close() conn.close() return code cursor.close() conn.close() def generate_session_code(): """Generate a unique 10-character alphanumeric breakout session code.""" chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=10)) conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT id FROM breakout_sessions WHERE code = %s", (code,)) if not cursor.fetchone(): cursor.close() conn.close() return code cursor.close() conn.close() def generate_attendee_code(): """Generate a unique 10-character alphanumeric attendee code.""" chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=10)) conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT id FROM attendees WHERE attendee_code = %s", (code,)) if not cursor.fetchone(): cursor.close() conn.close() return code cursor.close() conn.close() def generate_staff_code(): """Generate a unique 10-character alphanumeric staff code.""" chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=10)) conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT id FROM organizers WHERE staff_code = %s", (code,)) if not cursor.fetchone(): cursor.close() conn.close() return code cursor.close() conn.close() def generate_type_code(event_id): """Generate a unique 10-character alphanumeric attendee type code for an event.""" chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=10)) conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT id FROM attendee_types WHERE event_id = %s AND code = %s", (event_id, code)) if not cursor.fetchone(): cursor.close() conn.close() return code cursor.close() conn.close() def login_required(f): """Decorator to require login.""" @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session or 'user_type' not in session: flash('Please log in first.') return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function def get_current_user(): """Get current logged-in user from session.""" if 'user_id' not in session or 'user_type' not in session: return None, None return session['user_id'], session['user_type'] # Utility functions def hash_password(password): """Hash a password.""" return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') def verify_password(password, hashed): """Verify a password.""" return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) def verify_recaptcha(recaptcha_response): """Verify a reCAPTCHA response with Google.""" secret_key = app.config.get('RECAPTCHA_SECRET_KEY') if not secret_key: return True # Skip verification if no secret key configured try: payload = { 'secret': secret_key, 'response': recaptcha_response } resp = requests.post('https://www.google.com/recaptcha/api/siteverify', data=payload, timeout=10) result = resp.json() return result.get('success', False) except Exception: return False # Rate limiting for failed login attempts failed_login_attempts = {} # {ip_address: [(timestamp, count), ...]} # Rate limiting for failed personal page attempts (30-minute block) failed_personal_page_attempts = {} # {ip_address: [(timestamp, count), ...]} PERSONAL_PAGE_BLOCK_DURATION = 30 * 60 # 30 minutes in seconds PERSONAL_PAGE_MAX_ATTEMPTS = 5 # Max failed attempts before blocking def is_ip_blocked(ip_address): """Check if an IP address is blocked due to too many failed login attempts.""" if ip_address not in failed_login_attempts: return False # Clean up old entries (older than 1 hour) current_time = datetime.now() failed_login_attempts[ip_address] = [ (ts, count) for ts, count in failed_login_attempts[ip_address] if (current_time - ts).total_seconds() < 3600 ] # Check if still blocked if len(failed_login_attempts[ip_address]) >= 5: return True # Remove empty entries if not failed_login_attempts[ip_address]: del failed_login_attempts[ip_address] return False def is_ip_blocked_for_personal_page(ip_address): """Check if an IP address is blocked due to too many failed personal page attempts.""" if ip_address not in failed_personal_page_attempts: return False # Clean up old entries (older than 30 minutes) current_time = datetime.now() failed_personal_page_attempts[ip_address] = [ (ts, count) for ts, count in failed_personal_page_attempts[ip_address] if (current_time - ts).total_seconds() < PERSONAL_PAGE_BLOCK_DURATION ] # Check if still blocked if len(failed_personal_page_attempts[ip_address]) >= PERSONAL_PAGE_MAX_ATTEMPTS: return True # Remove empty entries if not failed_personal_page_attempts[ip_address]: del failed_personal_page_attempts[ip_address] return False def record_failed_personal_page_attempt(ip_address): """Record a failed personal page attempt for an IP address.""" if ip_address not in failed_personal_page_attempts: failed_personal_page_attempts[ip_address] = [] failed_personal_page_attempts[ip_address].append((datetime.now(), 1)) def clear_failed_personal_page_attempts(ip_address): """Clear failed personal page attempts after successful access.""" if ip_address in failed_personal_page_attempts: del failed_personal_page_attempts[ip_address] def record_failed_login(ip_address): """Record a failed login attempt for an IP address.""" if ip_address not in failed_login_attempts: failed_login_attempts[ip_address] = [] failed_login_attempts[ip_address].append((datetime.now(), 1)) def clear_failed_logins(ip_address): """Clear failed login attempts after successful login.""" if ip_address in failed_login_attempts: del failed_login_attempts[ip_address] def ensure_staff_code_column(): """Ensure the staff_code column exists in organizers table.""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'organizers' AND COLUMN_NAME = 'staff_code' """, (Config.DB_NAME,)) if not cursor.fetchone(): cursor.execute("ALTER TABLE organizers ADD COLUMN staff_code VARCHAR(10) UNIQUE DEFAULT NULL") conn.commit() print("Added staff_code column to organizers table") cursor.close() conn.close() except Error as e: print(f"Error ensuring staff_code column: {e}") def ensure_all_organizers_have_staff_code(): """Ensure all existing organizers have a staff_code.""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT id FROM organizers WHERE staff_code IS NULL") organizers_without_code = cursor.fetchall() for org in organizers_without_code: staff_code = generate_staff_code() cursor.execute("UPDATE organizers SET staff_code = %s WHERE id = %s", (staff_code, org['id'])) if organizers_without_code: conn.commit() print(f"Generated staff_codes for {len(organizers_without_code)} organizers") cursor.close() conn.close() except Error as e: print(f"Error generating staff codes: {e}") def ensure_staff_staff_code_column(): """Ensure the staff_code column exists in staff table.""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'staff' AND COLUMN_NAME = 'staff_code' """, (Config.DB_NAME,)) if not cursor.fetchone(): cursor.execute("ALTER TABLE staff ADD COLUMN staff_code VARCHAR(10) UNIQUE DEFAULT NULL") conn.commit() print("Added staff_code column to staff table") cursor.close() conn.close() except Error as e: print(f"Error ensuring staff staff_code column: {e}") def ensure_all_staff_have_staff_code(): """Ensure all existing staff members have a staff_code.""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT id FROM staff WHERE staff_code IS NULL") staff_without_code = cursor.fetchall() for s in staff_without_code: staff_code = generate_staff_code() cursor.execute("UPDATE staff SET staff_code = %s WHERE id = %s", (staff_code, s['id'])) if staff_without_code: conn.commit() print(f"Generated staff_codes for {len(staff_without_code)} staff members") cursor.close() conn.close() except Error as e: print(f"Error generating staff codes: {e}") def allowed_file(filename): """Check if file extension is allowed.""" return '.' in filename and filename.rsplit('.', 1)[1].lower() in Config.ALLOWED_EXTENSIONS def generate_invite_token(): """Generate a unique invite token for staff.""" return uuid.uuid4().hex def send_staff_invite_email(staff_email, staff_name, event_name, invite_token, organizer_email): """Send invite email to staff member.""" invite_url = url_for('staff_invite', token=invite_token, _external=True) subject = f"You're invited to join {event_name} as Staff" body = f"""Hello {staff_name}, You've been invited to join the event "{event_name}" as a staff member. Click the link below to complete your registration and set your password: {invite_url} This link will expire in 7 days. Best regards, The Event Team """ msg = MIMEMultipart() msg['From'] = Config.MAIL_DEFAULT_SENDER msg['To'] = staff_email msg['Subject'] = subject msg.attach(MIMEText(body, 'plain')) try: with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as server: if Config.MAIL_USE_TLS: server.starttls() if Config.MAIL_USERNAME: server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD) server.send_message(msg) return True except Exception as e: print(f"Email error: {e}") return False def send_attendee_confirmation_email(attendee_email, attendee_name, event_name, event_date, event_location, personal_page_url=None): """Send confirmation email to attendee after registration.""" subject = f"Registration confirmed for {event_name}" personal_page_section = f""" Your Personal Page: {personal_page_url} View your registered events and breakout sessions anytime. """ if personal_page_url else "" body = f"""Hello {attendee_name}, Your registration for {event_name} has been confirmed! Event Details: - Date: {event_date} - Location: {event_location}{personal_page_section} You can now log in to: - View event details - RSVP for break-out sessions - Connect with other attendees We look forward to seeing you there! Best regards, The Event Team """ msg = MIMEMultipart() msg['From'] = Config.MAIL_DEFAULT_SENDER msg['To'] = attendee_email msg['Subject'] = subject msg.attach(MIMEText(body, 'plain')) try: with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as server: if Config.MAIL_USE_TLS: server.starttls() if Config.MAIL_USERNAME: server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD) server.send_message(msg) return True except Exception as e: print(f"Email error: {e}") return False def send_breakout_session_update_email(attendee_email, attendee_name, session_name, event_name, changes_text): """Send update email to attendee when a breakout session is modified.""" subject = f"Update on Breakout Session: {session_name}" body = f"""Hello {attendee_name}, The breakout session "{session_name}" for {event_name} has been updated. Changes made: {changes_text} Please log in to view the updated details. Best regards, The Event Team """ msg = MIMEMultipart() msg['From'] = Config.MAIL_DEFAULT_SENDER msg['To'] = attendee_email msg['Subject'] = subject msg.attach(MIMEText(body, 'plain')) try: with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as server: if Config.MAIL_USE_TLS: server.starttls() if Config.MAIL_USERNAME: server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD) server.send_message(msg) return True except Exception as e: print(f"Email error: {e}") return False def send_event_update_email(attendee_email, attendee_name, event_name, changes_text): """Send update email to attendee when an event is modified.""" subject = f"Update on Event: {event_name}" body = f"""Hello {attendee_name}, The event "{event_name}" has been updated. Changes made: {changes_text} Please log in to view the updated details. Best regards, The Event Team """ msg = MIMEMultipart() msg['From'] = Config.MAIL_DEFAULT_SENDER msg['To'] = attendee_email msg['Subject'] = subject msg.attach(MIMEText(body, 'plain')) try: with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as server: if Config.MAIL_USE_TLS: server.starttls() if Config.MAIL_USERNAME: server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD) server.send_message(msg) return True except Exception as e: print(f"Email error: {e}") return False def generate_badge_pdf(attendee, event=None): """Generate an A4 PDF badge for an attendee. The PDF is designed to be folded horizontally and vertically once, with the attendee info in the lower-left quarter. Includes dotted fold lines and QR code. """ buffer = io.BytesIO() c = canvas.Canvas(buffer, pagesize=A4) width, height = A4 # 595.28 x 841.89 points # A4 dimensions in mm a4_width_mm = 210 a4_height_mm = 297 # Margins - 10cm higher than before (10cm = 100mm) left_margin = 15 * mm bottom_margin = 15 * mm + 100 * mm # 10cm higher # Dotted line settings dash_length = 5 gap_length = 5 # Draw vertical dotted line down the middle center_x = width / 2 c.setDash(dash_length, gap_length) c.setLineWidth(0.5) c.line(center_x, 0, center_x, height) # Draw horizontal dotted line over the middle center_y = height / 2 c.line(0, center_y, width, center_y) # Reset dash for other drawing c.setDash([]) # Draw VISITOR bar at top of bottom-left quadrant, left edge to center, below dotted line bar_height = 50 # Height of the bar in points (doubled) c.setFillColorRGB(0, 0, 0) # Black color c.rect(0, center_y - bar_height, center_x, bar_height, fill=True, stroke=False) # Draw VISITOR text in white, centered c.setFillColorRGB(1, 1, 1) # White c.setFont("Helvetica-Bold", 27) # 150% of 18 text = "VISITOR" text_width = c.stringWidth(text, "Helvetica-Bold", 27) text_x = (center_x - text_width) / 2 c.drawString(text_x, center_y - bar_height + 12, text) # Reset fill color to black for subsequent text c.setFillColorRGB(0, 0, 0) # Font setup - 32pt (twice as high as 16pt) for name, 16pt for org/role # Bold for name, regular for org/role c.setFont("Helvetica-Bold", 32) # Position in lower-left quarter # Content y position (from bottom of page) - moved 1.5cm (42.5pt) further up y_name = bottom_margin + 40 - 42.5 # Start with name # Draw attendee first name (bold, 32pt) first_name = attendee.get('first_name', '') last_name = attendee.get('last_name', '') # Check if names fit within the width limit (half of A4 width minus margins) max_name_width = (width / 2) - left_margin - 10 # 10pt padding full_name_width = c.stringWidth(first_name + ' ' + last_name, "Helvetica-Bold", 32) # If full name is too wide, reduce font size font_size = 32 if full_name_width > max_name_width: # Try smaller sizes for size in [28, 24, 20, 18, 16]: if c.stringWidth(first_name + ' ' + last_name, "Helvetica-Bold", size) <= max_name_width: font_size = size break c.setFont("Helvetica-Bold", font_size) # Draw first name c.drawString(left_margin, y_name, first_name) # Draw last name on next line y_last_name = y_name - (font_size * 1.2) c.drawString(left_margin, y_last_name, last_name) # Extra blank line after last name y_org = y_last_name - (font_size * 1.4) # Draw organization (regular, 16pt, 150% = 24pt) c.setFont("Helvetica", 24) y_org = y_name - 50 c.drawString(left_margin, y_org, attendee.get('organisation', '') or '') # Draw role (regular, 16pt, 150% = 24pt) c.setFont("Helvetica", 24) y_role = y_org - 40 c.drawString(left_margin, y_role, attendee.get('role', '') or '') # Generate QR code for attendee_code attendee_code = attendee.get('attendee_code', '') if attendee_code: qr = qrcode.QRCode(version=1, box_size=10, border=1) qr.add_data(attendee_code) qr.make(fit=True) qr_img = qr.make_image(fill_color="black", back_color="white") # Convert to bytes and then to ImageReader for reportlab qr_buffer = io.BytesIO() qr_img.save(qr_buffer, format='PNG') qr_buffer.seek(0) from reportlab.lib.utils import ImageReader qr_reader = ImageReader(qr_buffer) # Draw QR code at bottom-left with 15mm left padding, flush to bottom edge qr_size = 40 * mm # QR code size qr_x = 15 * mm # 15mm from left qr_y = 25 * mm # 10mm from top of bottom_bar (15mm + 10mm) c.drawImage(qr_reader, qr_x, qr_y, width=qr_size, height=qr_size) # Draw bottom black bar with event info (from bottom edge, 150mm high) if event: bar_bottom_height = 15 * mm # 15mm high c.setFillColorRGB(0, 0, 0) # Black color c.rect(0, 0, center_x, bar_bottom_height, fill=True, stroke=False) # Draw event name centered c.setFillColorRGB(1, 1, 1) # White c.setFont("Helvetica-Bold", 14) event_name = event.get('name', '') or '' event_name_width = c.stringWidth(event_name, "Helvetica-Bold", 14) event_name_x = (center_x - event_name_width) / 2 c.drawString(event_name_x, bar_bottom_height - 21, event_name) # Draw start and end date/time on the same line, centered c.setFont("Helvetica", 10) start_time = event.get('start_time') end_time = event.get('end_time') if start_time and end_time: start_str = localized_date(start_time, get_locale(), '%d/%m/%Y %H:%M') end_str = localized_date(end_time, get_locale(), '%d/%m/%Y %H:%M') combined_str = start_str + " - " + end_str combined_width = c.stringWidth(combined_str, "Helvetica", 10) combined_x = (center_x - combined_width) / 2 c.drawString(combined_x, bar_bottom_height - 37, combined_str) c.save() buffer.seek(0) return buffer def generate_rectangular_badges_pdf(attendees, event=None, attendee_types_map=None): """Generate an A4 PDF with rectangular labels. Label dimensions: 80mm x 50mm Layout: 2 columns x 5 rows = 10 badges per A4 page. Each badge contains: attendee type (black bar), name, organization, role, and QR code. """ buffer = io.BytesIO() c = canvas.Canvas(buffer, pagesize=A4) width, height = A4 # 595.28 x 841.89 points # Label dimensions: 80mm x 50mm label_width_mm = 80 label_height_mm = 50 label_width = label_width_mm * mm label_height = label_height_mm * mm badges_per_row = 2 badges_per_col = 5 badges_per_page = badges_per_row * badges_per_col # Page margins margin_left_mm = 15 margin_right_mm = 15 margin_top_mm = 12 margin_bottom_mm = 12 # Calculate spacing available_width = width - (margin_left_mm * mm) - (margin_right_mm * mm) available_height = height - (margin_top_mm * mm) - (margin_bottom_mm * mm) # Gap between labels gap_x = (available_width - badges_per_row * label_width) / (badges_per_row + 1) gap_y = (available_height - badges_per_col * label_height) / (badges_per_col + 1) # Badge positions (starting from top-left) badge_positions = [] for row in range(badges_per_col): for col in range(badges_per_row): x = margin_left_mm * mm + gap_x + col * (label_width + gap_x) y = height - margin_top_mm * mm - gap_y - row * (label_height + gap_y) - label_height badge_positions.append((x, y)) # Draw badges for each attendee for i, attendee in enumerate(attendees): badge_index = i % badges_per_page # New page if needed if badge_index == 0 and i > 0: c.showPage() if badge_index < len(badge_positions): x, y = badge_positions[badge_index] # Draw label border (light gray) c.setStrokeColorRGB(0.7, 0.7, 0.7) c.setLineWidth(0.5) c.rect(x, y, label_width, label_height, fill=False, stroke=True) # Get attendee type name from mapping attendee_type_id = attendee.get('attendee_type_id') attendee_type_name = '' if attendee_type_id and attendee_types_map: type_info = attendee_types_map.get(attendee_type_id, {}) attendee_type_name = type_info.get('name', '') or '' # Draw black bar at top with attendee type in white if attendee_type_name: bar_height = 8 * mm c.setFillColorRGB(0, 0, 0) c.rect(x, y + label_height - bar_height, label_width, bar_height, fill=True, stroke=False) c.setFillColorRGB(1, 1, 1) c.setFont("Helvetica-Bold", 16) c.drawCentredString(x + label_width / 2, y + label_height - bar_height + 1.5 * mm, attendee_type_name[:20]) # Draw attendee name (20pt) first_name = attendee.get('first_name', '') or '' last_name = attendee.get('last_name', '') or '' full_name = f"{first_name} {last_name}".strip() font_size = 20 max_name_width = label_width - 2 * mm name_width = c.stringWidth(full_name, "Helvetica-Bold", font_size) if name_width > max_name_width: # Scale down to fit font_size = int(font_size * max_name_width / name_width) c.setFont("Helvetica-Bold", font_size) c.setFillColorRGB(0, 0, 0) name_y = y + label_height - 22 * mm c.drawCentredString(x + label_width / 2, name_y, full_name) # Draw organization org = attendee.get('organisation', '') or '' c.setFont("Helvetica", 14) org_y = name_y - 8 * mm if org: c.drawCentredString(x + label_width / 2, org_y, org[:22]) # Draw role role = attendee.get('role', '') or '' c.setFont("Helvetica", 14) if role: role_y = org_y - 7 * mm c.drawCentredString(x + label_width / 2, role_y, role[:22]) # Generate QR code at bottom-right attendee_code = attendee.get('attendee_code', '') if attendee_code: qr_size = 12 * mm qr_x = x + label_width - qr_size - 2 * mm qr_y = y + 10 * mm qr = qrcode.QRCode(version=1, box_size=2, border=0) qr.add_data(attendee_code) qr.make(fit=True) qr_img = qr.make_image(fill_color="black", back_color="white") qr_buffer = io.BytesIO() qr_img.save(qr_buffer, format='PNG') qr_buffer.seek(0) from reportlab.lib.utils import ImageReader qr_reader = ImageReader(qr_buffer) c.drawImage(qr_reader, qr_x, qr_y, width=qr_size, height=qr_size) # Draw black bar at bottom with event name and start date bar_height = 8 * mm c.setFillColorRGB(0, 0, 0) c.rect(x, y, label_width, bar_height, fill=True, stroke=False) c.setFillColorRGB(1, 1, 1) c.setFont("Helvetica-Bold", 14) event_name = (event.get('name', '') or '') if event else '' start_time = event.get('start_time', '') if event else '' event_date_str = '' if start_time: if hasattr(start_time, 'strftime'): event_date_str = start_time.strftime('%d-%m-%Y') else: event_date_str = str(start_time)[:10] bar_text = f"{event_name} — {event_date_str}" if event_date_str else event_name c.drawCentredString(x + label_width / 2, y + bar_height / 2 - 1.5 * mm, bar_text) c.save() buffer.seek(0) return buffer def send_badge_email(attendee_email, attendee_name, event_name, pdf_buffer): """Send email with badge PDF attachment.""" subject = f"Your Badge for {event_name}" body = f"""Hello {attendee_name}, Please find attached your badge for {event_name}. The badge is designed to be folded once horizontally and once vertically. Your information appears in the lower-left quarter after folding. We look forward to seeing you there! Best regards, The Event Team """ msg = MIMEMultipart() msg['From'] = Config.MAIL_DEFAULT_SENDER msg['To'] = attendee_email msg['Subject'] = subject msg.attach(MIMEText(body, 'plain')) # Attach PDF pdf_buffer.seek(0) part = MIMEBase('application', 'octet-stream') part.set_payload(pdf_buffer.read()) encode_base64(part) part.add_header('Content-Disposition', 'attachment', filename=f'badge_{attendee_name.replace(" ", "_")}.pdf') msg.attach(part) try: with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as server: if Config.MAIL_USE_TLS: server.starttls() if Config.MAIL_USERNAME: server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD) server.send_message(msg) return True except Exception as e: print(f"Email error: {e}") return False # Routes - Main @app.route('/') def index(): """Home page.""" return render_template('index.html') @app.route('/staff/') def staff_login(staff_code): """Staff/organizer login via staff_code (no password required).""" client_ip = request.remote_addr # Check if IP is blocked (reuse login rate limiting) if is_ip_blocked(client_ip): flash('Too many failed login attempts. Please try again later (blocked for 1 hour).') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # First check if it's an organizer cursor.execute("SELECT * FROM organizers WHERE staff_code = %s", (staff_code,)) organizer = cursor.fetchone() if organizer: cursor.close() conn.close() clear_failed_logins(client_ip) # Log in the organizer session['user_id'] = organizer['id'] session['user_type'] = 'organizer' session['event_id'] = None flash(f'Welcome back, {organizer["name"]}!') return redirect(url_for('organizer_dashboard')) # Check if it's a staff member cursor.execute("SELECT * FROM staff WHERE staff_code = %s", (staff_code,)) staff_member = cursor.fetchone() cursor.close() conn.close() if not staff_member: record_failed_login(client_ip) flash('Invalid staff link.') return redirect(url_for('index')) # Clear failed login attempts on successful staff code login clear_failed_logins(client_ip) # Log in the staff member session['user_id'] = staff_member['id'] session['user_type'] = 'staff' session['event_id'] = staff_member['event_id'] flash(f'Welcome back, {staff_member["first_name"]}!') return redirect(url_for('staff_dashboard')) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) # Routes - Auth @app.route('/login', methods=['GET', 'POST']) def login(): """Login page - supports both organizer and attendee login.""" client_ip = request.remote_addr # Check if IP is blocked if is_ip_blocked(client_ip): flash('Too many failed login attempts. Please try again later (blocked for 1 hour).') return render_template('auth/login.html') if request.method == 'POST': email = request.form.get('email', '').strip() password = request.form.get('password', '') user_type = request.form.get('user_type', 'attendee') if not email or not password: flash('Email and password are required.') return render_template('auth/login.html') try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) if user_type == 'organizer': cursor.execute("SELECT * FROM organizers WHERE email = %s", (email,)) elif user_type == 'breakout_organizer': cursor.execute("SELECT * FROM organizers WHERE email = %s", (email,)) elif user_type == 'staff': cursor.execute("SELECT * FROM staff WHERE email = %s", (email,)) else: cursor.execute("SELECT * FROM attendees WHERE email = %s", (email,)) user = cursor.fetchone() cursor.close() conn.close() if user and verify_password(password, user['password_hash']): clear_failed_logins(client_ip) session['user_id'] = user['id'] session['user_type'] = user_type session['event_id'] = user.get('event_id') if user_type in ['attendee', 'staff'] else None if user_type == 'organizer': flash(f'Welcome back, {user["name"]}!') return redirect(url_for('organizer_dashboard')) elif user_type == 'breakout_organizer': flash(f'Welcome back, {user["name"]}!') return redirect(url_for('presenter_dashboard')) elif user_type == 'staff': flash(f'Welcome back, {user["first_name"]}!') return redirect(url_for('staff_dashboard')) else: flash(f'Welcome back, {user["first_name"]}!') return redirect(url_for('attendee_dashboard')) record_failed_login(client_ip) flash('Invalid email or password.') except Error as e: flash(f'Database error: {e}') # Handle pre-selection of user type from query string default_type = request.args.get('type', 'attendee') return render_template('auth/login.html', default_type=default_type) @app.route('/presenter/dashboard') @login_required def presenter_dashboard(): """Presenter (break-out organiser) dashboard.""" if session.get('user_type') != 'breakout_organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Get sessions where this organizer is assigned as presenter cursor.execute(""" SELECT bs.*, e.name as event_name FROM breakout_sessions bs JOIN breakout_session_organizers bso ON bs.id = bso.breakout_session_id JOIN events e ON bs.event_id = e.id WHERE bso.organizer_id = %s ORDER BY bs.start_time """, (session['user_id'],)) sessions = cursor.fetchall() cursor.execute("SELECT name FROM organizers WHERE id = %s", (session['user_id'],)) organizer = cursor.fetchone() cursor.close() conn.close() return render_template('presenter/dashboard.html', sessions=sessions, presenter_name=organizer['name']) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) @app.route('/staff/dashboard') @login_required def staff_dashboard(): """Staff dashboard - list events for staff member.""" if session.get('user_type') != 'staff': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT first_name, last_name FROM staff WHERE id = %s", (session['user_id'],)) staff_member = cursor.fetchone() cursor.execute("SELECT * FROM events WHERE id IN (SELECT event_id FROM staff WHERE id = %s) ORDER BY start_time", (session['user_id'],)) events = cursor.fetchall() cursor.close() conn.close() return render_template('staff/staff_events.html', events=events, staff_name=f"{staff_member['first_name']} {staff_member['last_name']}") except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) @app.route('/staff//dashboard') @login_required def staff_event_dashboard(event_id): """Staff dashboard for a specific event - only accessible by staff members assigned to this event.""" if session.get('user_type') != 'staff': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Verify staff is assigned to this event cursor.execute("SELECT 1 FROM staff WHERE id = %s AND event_id = %s", (session['user_id'], event_id)) if not cursor.fetchone(): flash('Access denied.') cursor.close() conn.close() return redirect(url_for('staff_dashboard')) cursor.execute("SELECT * FROM events WHERE id = %s", (event_id,)) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('staff_dashboard')) cursor.execute("SELECT first_name, last_name FROM staff WHERE id = %s", (session['user_id'],)) staff_member = cursor.fetchone() cursor.close() conn.close() return render_template('staff/dashboard.html', event=event, staff_name=f"{staff_member['first_name']} {staff_member['last_name']}") except Error as e: flash(f'Database error: {e}') return redirect(url_for('staff_dashboard')) @app.route('/register/organizer', methods=['GET', 'POST']) def register_organizer(): """Register a new organizer.""" if request.method == 'POST': name = request.form.get('name', '').strip() email = request.form.get('email', '').strip() password = request.form.get('password', '') confirm_password = request.form.get('confirm_password', '') if not all([name, email, password]): flash('All fields are required.') return render_template('register.html', user_type='organizer') if password != confirm_password: flash('Passwords do not match.') return render_template('register.html', user_type='organizer') if len(password) < 6: flash('Password must be at least 6 characters.') return render_template('register.html', user_type='organizer') try: conn = get_db_connection() cursor = conn.cursor() # Check if email exists cursor.execute("SELECT id FROM organizers WHERE email = %s", (email,)) if cursor.fetchone(): flash('Email already registered.') cursor.close() conn.close() return render_template('register.html', user_type='organizer') # Create organizer password_hash = hash_password(password) staff_code = generate_staff_code() cursor.execute( "INSERT INTO organizers (email, password_hash, name, staff_code) VALUES (%s, %s, %s, %s)", (email, password_hash, name, staff_code) ) conn.commit() cursor.close() conn.close() flash('Registration successful! Please log in.') return redirect(url_for('login')) except Error as e: flash(f'Database error: {e}') return render_template('register.html', user_type='organizer') @app.route('/register/attendee/', methods=['GET', 'POST']) def register_attendee(code): """Register a new attendee for an event using event code.""" # Verify event exists try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE code = %s", (code,)) event = cursor.fetchone() cursor.close() conn.close() if not event: flash('Event not found.') return redirect(url_for('index')) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) if request.method == 'POST': first_name = request.form.get('first_name', '').strip() last_name = request.form.get('last_name', '').strip() email = request.form.get('email', '').strip() password = request.form.get('password', '') confirm_password = request.form.get('confirm_password', '') organisation = request.form.get('organisation', '').strip() role = request.form.get('role', '').strip() introduction = request.form.get('introduction', '').strip() if not all([first_name, last_name, email, password]): flash('First name, last name, email, and password are required.') return render_template('register.html', user_type='attendee', event=event) if password != confirm_password: flash('Passwords do not match.') return render_template('register.html', user_type='attendee', event=event) try: conn = get_db_connection() cursor = conn.cursor() # Check if email already registered for this event cursor.execute( "SELECT id FROM attendees WHERE email = %s AND event_id = %s", (email, event['id']) ) if cursor.fetchone(): flash('Email already registered for this event.') cursor.close() conn.close() return render_template('register.html', user_type='attendee', event=event) password_hash = hash_password(password) attendee_code = generate_attendee_code() cursor.execute(""" INSERT INTO attendees (event_id, email, password_hash, first_name, last_name, organisation, role, introduction, attendee_code) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """, (event['id'], email, password_hash, first_name, last_name, organisation, role, introduction, attendee_code)) conn.commit() cursor.close() conn.close() flash('Registration successful! Please log in.') return redirect(url_for('login')) except Error as e: flash(f'Database error: {e}') return render_template('register.html', user_type='attendee', event=event) @app.route('/logout') def logout(): """Log out current user.""" session.clear() flash('You have been logged out.') return redirect(url_for('index')) # Routes - Organizer @app.route('/organizer/dashboard') @login_required def organizer_dashboard(): """Organizer dashboard.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE organizer_id = %s ORDER BY start_time DESC", (session['user_id'],)) events = cursor.fetchall() # Get attendee counts, breakout sessions, and staff for each event for event in events: cursor.execute("SELECT COUNT(*) as count FROM attendees WHERE event_id = %s", (event['id'],)) event['attendee_count'] = cursor.fetchone()['count'] cursor.execute(""" SELECT bs.id, bs.name, bs.max_attendees, (SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count FROM breakout_sessions bs WHERE bs.event_id = %s ORDER BY bs.start_time """, (event['id'],)) event['breakout_sessions'] = cursor.fetchall() cursor.execute("SELECT id, first_name, last_name, email FROM staff WHERE event_id = %s ORDER BY last_name, first_name", (event['id'],)) event['staff'] = cursor.fetchall() cursor.execute("SELECT id, first_name, last_name, email, checked_in FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event['id'],)) event['attendees'] = cursor.fetchall() cursor.close() conn.close() return render_template('organizer/dashboard.html', events=events) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) @app.route('/organizer/attendee-types') @login_required def all_attendee_types(): """Show all attendee types across all events.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Get all events for this organizer cursor.execute("SELECT * FROM events WHERE organizer_id = %s ORDER BY start_time DESC", (session['user_id'],)) events = cursor.fetchall() # Get attendee types for each event for event in events: cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (event['id'],)) event['attendee_types'] = cursor.fetchall() # Get attendee count per type for at in event['attendee_types']: cursor.execute("SELECT COUNT(*) as count FROM attendees WHERE event_id = %s AND attendee_type_id = %s", (event['id'], at['id'])) at['attendee_count'] = cursor.fetchone()['count'] cursor.close() conn.close() return render_template('organizer/all_attendee_types.html', events=events) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) @app.route('/organizer/event/create', methods=['GET', 'POST']) @login_required def create_event(): """Create a new event.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) if request.method == 'POST': name = request.form.get('name', '').strip() description = request.form.get('description', '').strip() start_time = request.form.get('start_time', '') end_time = request.form.get('end_time', '') location = request.form.get('location', '').strip() max_attendees = request.form.get('max_attendees', '') if not all([name, start_time, location]): flash('Name, start time, and location are required.') return render_template('organizer/create_event.html') try: event_start = datetime.strptime(start_time, '%Y-%m-%dT%H:%M') except ValueError: flash('Invalid start time format.') return render_template('organizer/create_event.html') event_end = None if end_time: try: event_end = datetime.strptime(end_time, '%Y-%m-%dT%H:%M') except ValueError: flash('Invalid end time format.') return render_template('organizer/create_event.html') try: conn = get_db_connection() cursor = conn.cursor() event_code = generate_event_code() cursor.execute(""" INSERT INTO events (organizer_id, code, name, description, start_time, end_time, location, max_attendees) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """, (session['user_id'], event_code, name, description, event_start, event_end, location, max_attendees if max_attendees else None)) conn.commit() cursor.close() conn.close() flash('Event created successfully!') return redirect(url_for('organizer_dashboard')) except Error as e: flash(f'Database error: {e}') return render_template('organizer/create_event.html') @app.route('/organizer/event//edit', methods=['GET', 'POST']) @login_required def edit_event(event_id): """Edit an existing event.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) if request.method == 'POST': name = request.form.get('name', '').strip() description = request.form.get('description', '').strip() location = request.form.get('location', '').strip() max_attendees = request.form.get('max_attendees', '') if not all([name, location]): flash('Name and location are required.') cursor.close() conn.close() return render_template('organizer/edit_event.html', event=event) cursor.execute(""" UPDATE events SET name = %s, description = %s, location = %s, max_attendees = %s WHERE id = %s """, (name, description, location, max_attendees if max_attendees else None, event_id)) conn.commit() # Capture changes for the confirmation pop-up changes = [] old_event = event if old_event['name'] != name: changes.append(('Name', old_event['name'], name)) if (old_event['description'] or '') != description: changes.append(('Description', old_event['description'] or '', description)) if old_event['location'] != location: changes.append(('Location', old_event['location'], location)) if str(old_event['max_attendees'] or '') != max_attendees: changes.append(('Max Attendees', old_event['max_attendees'] or 'Unlimited', max_attendees or 'Unlimited')) cursor.close() conn.close() if changes: # Get updated event conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s", (event_id,)) updated_event = cursor.fetchone() cursor.close() conn.close() return render_template('organizer/edit_event_confirm.html', event=updated_event, changes=changes) else: flash('Event updated successfully!') return redirect(url_for('event_detail', code=event['code'])) cursor.close() conn.close() return render_template('organizer/edit_event.html', event=event) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//notify', methods=['POST']) @login_required def notify_event_attendees(event_id): """Send email notification to event attendees about changes.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) # Get attendees for this event cursor.execute("SELECT * FROM attendees WHERE event_id = %s", (event_id,)) attendees = cursor.fetchall() cursor.close() conn.close() # Get changes text from form changes_text = request.form.get('changes_text', 'Event details have been updated.') # Send emails sent_count = 0 failed_count = 0 for attendee in attendees: full_name = f"{attendee['first_name']} {attendee['last_name']}" if send_event_update_email(attendee['email'], full_name, event['name'], changes_text): sent_count += 1 else: failed_count += 1 if failed_count == 0: flash(f'Notification emails sent to {sent_count} attendee{"s" if sent_count != 1 else ""}.') else: flash(f'Notifications sent: {sent_count}, failed: {failed_count}.') return redirect(url_for('event_detail', code=event['code'])) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//breakout-sessions') @login_required def list_breakout_sessions(event_id): """List breakout sessions for an event.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute(""" SELECT bs.*, (SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count FROM breakout_sessions bs WHERE bs.event_id = %s ORDER BY bs.start_time """, (event_id,)) sessions = cursor.fetchall() cursor.close() conn.close() return render_template('organizer/breakout_sessions.html', event=event, sessions=sessions) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//breakout-session/create', methods=['GET', 'POST']) @login_required def create_breakout_session(event_id): """Create a new breakout session.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() cursor.close() conn.close() if not event: flash('Event not found.') return redirect(url_for('organizer_dashboard')) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) # Check if event is single-day is_single_day = False prefilled_date = '' if event.get('start_time') and event.get('end_time'): start = event['start_time'] end = event['end_time'] if hasattr(start, 'date') and hasattr(end, 'date'): is_single_day = start.date() == end.date() if is_single_day: prefilled_date = start.strftime('%Y-%m-%d') if request.method == 'POST': name = request.form.get('name', '').strip() description = request.form.get('description', '').strip() start_time = request.form.get('start_time', '') end_time = request.form.get('end_time', '') location = request.form.get('location', '').strip() max_attendees = request.form.get('max_attendees', '') if not all([name, start_time, end_time, location]): flash('Name, start time, end time, and location are required.') return render_template('organizer/create_breakout_session.html', event=event) try: session_start = datetime.strptime(start_time, '%Y-%m-%dT%H:%M') session_end = datetime.strptime(end_time, '%Y-%m-%dT%H:%M') except ValueError: flash('Invalid time format.') return render_template('organizer/create_breakout_session.html', event=event) if session_end <= session_start: flash('End time must be after start time.') return render_template('organizer/create_breakout_session.html', event=event) if max_attendees and event['max_attendees'] and int(max_attendees) > event['max_attendees']: flash(f'Break-out session attendees cannot exceed event capacity ({event["max_attendees"]}).') return render_template('organizer/create_breakout_session.html', event=event) try: conn = get_db_connection() cursor = conn.cursor() session_code = generate_session_code() cursor.execute(""" INSERT INTO breakout_sessions (code, event_id, name, description, start_time, end_time, location, max_attendees) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """, (session_code, event_id, name, description, session_start, session_end, location, max_attendees if max_attendees else None)) breakout_session_id = cursor.lastrowid # Add event organizer as breakout session organizer cursor.execute(""" INSERT INTO breakout_session_organizers (breakout_session_id, organizer_id) VALUES (%s, %s) """, (breakout_session_id, session['user_id'])) conn.commit() cursor.close() conn.close() flash('Breakout session created successfully!') return redirect(url_for('list_breakout_sessions', event_id=event_id)) except Error as e: flash(f'Database error: {e}') return render_template('organizer/create_breakout_session.html', event=event, prefilled_date=prefilled_date) @app.route('/organizer/breakout-session//edit', methods=['GET', 'POST']) @login_required def edit_breakout_session(session_id): """Edit a breakout session.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Get breakout session and verify organizer has access cursor.execute(""" SELECT bs.*, e.name as event_name, e.organizer_id as event_organizer_id, e.max_attendees as event_max_attendees FROM breakout_sessions bs JOIN events e ON bs.event_id = e.id WHERE bs.id = %s """, (session_id,)) breakout_session = cursor.fetchone() if not breakout_session: flash('Breakout session not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) if breakout_session['event_organizer_id'] != session['user_id']: flash('Access denied.') cursor.close() conn.close() return redirect(url_for('index')) event = {'id': breakout_session['event_id'], 'name': breakout_session['event_name'], 'max_attendees': breakout_session['event_max_attendees']} if request.method == 'POST': name = request.form.get('name', '').strip() description = request.form.get('description', '').strip() start_time = request.form.get('start_time', '') end_time = request.form.get('end_time', '') location = request.form.get('location', '').strip() max_attendees = request.form.get('max_attendees', '') if not all([name, start_time, end_time, location]): flash('Name, start time, end time, and location are required.') cursor.close() conn.close() return render_template('organizer/edit_breakout_session.html', session=breakout_session, event=event) try: session_start = datetime.strptime(start_time, '%Y-%m-%dT%H:%M') session_end = datetime.strptime(end_time, '%Y-%m-%dT%H:%M') except ValueError: flash('Invalid time format.') cursor.close() conn.close() return render_template('organizer/edit_breakout_session.html', session=breakout_session, event=event) if session_end <= session_start: flash('End time must be after start time.') cursor.close() conn.close() return render_template('organizer/edit_breakout_session.html', session=breakout_session, event=event) if max_attendees and event['max_attendees'] and int(max_attendees) > event['max_attendees']: flash(f'Break-out session attendees cannot exceed event capacity ({event["max_attendees"]}).') cursor.close() conn.close() return render_template('organizer/edit_breakout_session.html', session=breakout_session, event=event) cursor.execute(""" UPDATE breakout_sessions SET name = %s, description = %s, start_time = %s, end_time = %s, location = %s, max_attendees = %s WHERE id = %s """, (name, description, session_start, session_end, location, max_attendees if max_attendees else None, session_id)) conn.commit() # Capture changes for the confirmation pop-up changes = [] old_session = breakout_session if old_session['name'] != name: changes.append(('Name', old_session['name'], name)) if old_session['description'] != description: changes.append(('Description', old_session['description'] or '', description)) if old_session['start_time'].strftime('%Y-%m-%dT%H:%M') != start_time: changes.append(('Start Time', old_session['start_time'].strftime('%B %d, %Y at %H:%M'), session_start.strftime('%B %d, %Y at %H:%M'))) if old_session['end_time'].strftime('%Y-%m-%dT%H:%M') != end_time: changes.append(('End Time', old_session['end_time'].strftime('%B %d, %Y at %H:%M'), session_end.strftime('%B %d, %Y at %H:%M'))) if old_session['location'] != location: changes.append(('Location', old_session['location'], location)) if str(old_session['max_attendees'] or '') != max_attendees: changes.append(('Max Attendees', old_session['max_attendees'] or 'Unlimited', max_attendees or 'Unlimited')) # Get updated session data cursor.execute(""" SELECT bs.*, e.name as event_name FROM breakout_sessions bs JOIN events e ON bs.event_id = e.id WHERE bs.id = %s """, (session_id,)) updated_session = cursor.fetchone() cursor.close() conn.close() if changes: return render_template('organizer/edit_breakout_session_confirm.html', session=updated_session, event=event, changes=changes) else: flash('Breakout session updated successfully!') return redirect(url_for('view_breakout_session', code=breakout_session['code'])) cursor.close() conn.close() return render_template('organizer/edit_breakout_session.html', session=breakout_session, event=event) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/breakout-session//notify', methods=['POST']) @login_required def notify_breakout_session_attendees(session_id): """Send email notification to breakout session attendees about changes.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Get breakout session and verify organizer has access cursor.execute(""" SELECT bs.*, e.name as event_name, e.organizer_id as event_organizer_id FROM breakout_sessions bs JOIN events e ON bs.event_id = e.id WHERE bs.id = %s """, (session_id,)) breakout_session = cursor.fetchone() if not breakout_session: flash('Breakout session not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) if breakout_session['event_organizer_id'] != session['user_id']: flash('Access denied.') cursor.close() conn.close() return redirect(url_for('index')) # Get attendees who RSVPed to this session cursor.execute(""" SELECT a.email, a.first_name, a.last_name FROM breakout_session_rsvps bsr JOIN attendees a ON bsr.attendee_id = a.id WHERE bsr.breakout_session_id = %s AND bsr.status = 'registered' """, (session_id,)) rsvps = cursor.fetchall() cursor.close() conn.close() # Get changes text from form changes_text = request.form.get('changes_text', 'Session details have been updated.') # Send emails sent_count = 0 failed_count = 0 for rsvp in rsvps: full_name = f"{rsvp['first_name']} {rsvp['last_name']}" if send_breakout_session_update_email(rsvp['email'], full_name, breakout_session['name'], breakout_session['event_name'], changes_text): sent_count += 1 else: failed_count += 1 if failed_count == 0: flash(f'Notification emails sent to {sent_count} attendee{"s" if sent_count != 1 else ""}.') else: flash(f'Notifications sent: {sent_count}, failed: {failed_count}.') return redirect(url_for('view_breakout_session', code=breakout_session['code'])) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/breakout-session/') @login_required def view_breakout_session(code): """Breakout session detail page for organizers.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Get breakout session and verify organizer has access cursor.execute(""" SELECT bs.*, e.name as event_name, e.organizer_id as event_organizer_id FROM breakout_sessions bs JOIN events e ON bs.event_id = e.id WHERE bs.code = %s """, (code,)) session_data = cursor.fetchone() if not session_data: flash('Breakout session not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) # Check if user is event organizer or breakout session organizer cursor.execute(""" SELECT id FROM breakout_session_organizers WHERE breakout_session_id = %s AND organizer_id = %s """, (session_data['id'], session['user_id'])) is_organizer = cursor.fetchone() if session_data['event_organizer_id'] != session['user_id'] and not is_organizer: flash('Access denied.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) # Get RSVPs cursor.execute(""" SELECT bsr.*, a.first_name, a.last_name, a.email, a.organisation, a.role FROM breakout_session_rsvps bsr JOIN attendees a ON bsr.attendee_id = a.id WHERE bsr.breakout_session_id = %s AND bsr.status = 'registered' ORDER BY a.last_name, a.first_name """, (session_data['id'],)) rsvps = cursor.fetchall() cursor.close() conn.close() return render_template('organizer/breakout_session_detail.html', session=session_data, rsvps=rsvps) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/attendee/breakout-sessions') @login_required def attendee_breakout_sessions(): """List breakout sessions for attendee's event.""" if session.get('user_type') != 'attendee': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],)) event = cursor.fetchone() cursor.execute(""" SELECT bs.*, (SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count, (SELECT status FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND attendee_id = %s) as my_rsvp_status FROM breakout_sessions bs WHERE bs.event_id = %s ORDER BY bs.start_time """, (session['user_id'], session['event_id'])) sessions = cursor.fetchall() cursor.close() conn.close() return render_template('attendee/breakout_sessions.html', event=event, sessions=sessions) except Error as e: flash(f'Database error: {e}') return redirect(url_for('attendee_dashboard')) @app.route('/attendee/breakout-session//rsvp', methods=['POST']) @login_required def rsvp_breakout_session(code): """RSVP for a breakout session.""" if session.get('user_type') != 'attendee': return jsonify({'error': 'Access denied'}), 403 try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Get breakout session cursor.execute("SELECT * FROM breakout_sessions WHERE code = %s", (code,)) breakout = cursor.fetchone() cursor.close() if not breakout: conn.close() return jsonify({'error': 'Breakout session not found'}), 404 session_id = breakout['id'] # Check capacity if breakout['max_attendees']: cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT COUNT(*) as count FROM breakout_session_rsvps WHERE breakout_session_id = %s AND status = 'registered' """, (session_id,)) current_count = cursor.fetchone()['count'] cursor.close() if current_count >= breakout['max_attendees']: conn.close() return jsonify({'error': 'Breakout session is full'}), 400 # Check for overlapping sessions cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT bs.id, bs.name FROM breakout_sessions bs JOIN breakout_session_rsvps bsr ON bs.id = bsr.breakout_session_id WHERE bsr.attendee_id = %s AND bsr.status = 'registered' AND bs.id != %s AND ( (bs.start_time <= %s AND bs.end_time > %s) OR (bs.start_time < %s AND bs.end_time >= %s) OR (bs.start_time >= %s AND bs.end_time <= %s) ) """, (session['user_id'], session_id, breakout['start_time'], breakout['start_time'], breakout['end_time'], breakout['end_time'], breakout['start_time'], breakout['end_time'])) overlapping = cursor.fetchall() cursor.close() if overlapping: names = ', '.join([s['name'] for s in overlapping]) conn.close() return jsonify({'error': f'Overlaps with: {names}'}), 400 # Check if already registered cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT id, status FROM breakout_session_rsvps WHERE breakout_session_id = %s AND attendee_id = %s """, (session_id, session['user_id'])) existing = cursor.fetchone() cursor.close() if existing: if existing['status'] == 'registered': conn.close() return jsonify({'error': 'Already registered'}), 400 else: # Reactivate cancelled RSVP cursor = conn.cursor() cursor.execute(""" UPDATE breakout_session_rsvps SET status = 'registered' WHERE id = %s """, (existing['id'],)) conn.commit() cursor.close() conn.close() return jsonify({'success': True, 'message': 'RSVP restored'}) else: # Create new RSVP cursor = conn.cursor() cursor.execute(""" INSERT INTO breakout_session_rsvps (breakout_session_id, attendee_id, status) VALUES (%s, %s, 'registered') """, (session_id, session['user_id'])) conn.commit() cursor.close() conn.close() return jsonify({'success': True}) except Error as e: return jsonify({'error': str(e)}), 500 @app.route('/attendee/breakout-session//cancel-rsvp', methods=['POST']) @login_required def cancel_breakout_session_rsvp(code): """Cancel RSVP for a breakout session.""" if session.get('user_type') != 'attendee': return jsonify({'error': 'Access denied'}), 403 try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT id FROM breakout_sessions WHERE code = %s", (code,)) breakout = cursor.fetchone() cursor.close() if not breakout: conn.close() return jsonify({'error': 'Breakout session not found'}), 404 cursor = conn.cursor() cursor.execute(""" UPDATE breakout_session_rsvps SET status = 'cancelled' WHERE breakout_session_id = %s AND attendee_id = %s AND status = 'registered' """, (breakout['id'], session['user_id'])) affected = cursor.rowcount conn.commit() cursor.close() conn.close() if affected > 0: return jsonify({'success': True}) else: return jsonify({'error': 'RSVP not found'}), 404 except Error as e: return jsonify({'error': str(e)}), 500 @app.route('/attendee/event/attendance', methods=['GET', 'POST']) @login_required def update_event_attendance(): """Update attendance status for main event.""" if session.get('user_type') != 'attendee': return jsonify({'error': 'Access denied'}), 403 if request.method == 'GET': try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT attendance_status FROM attendees WHERE id = %s", (session['user_id'],)) result = cursor.fetchone() cursor.close() conn.close() return jsonify({'status': result['attendance_status'] if result else 'attending'}) except Error as e: return jsonify({'error': str(e)}), 500 status = request.json.get('status') if request.json else request.form.get('status') if status not in ['attending', 'not_attending']: return jsonify({'error': 'Invalid status'}), 400 try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" UPDATE attendees SET attendance_status = %s WHERE id = %s """, (status, session['user_id'])) conn.commit() cursor.close() conn.close() return jsonify({'success': True}) except Error as e: return jsonify({'error': str(e)}), 500 @app.route('/organizer/event/') @login_required def event_detail(code): """Event detail page for organizer.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE code = %s AND organizer_id = %s", (code, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT * FROM attendees WHERE event_id = %s", (event['id'],)) attendees = cursor.fetchall() cursor.execute(""" SELECT bs.*, (SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as registered_count FROM breakout_sessions bs WHERE bs.event_id = %s ORDER BY bs.start_time """, (event['id'],)) breakout_sessions = cursor.fetchall() cursor.execute("SELECT * FROM staff WHERE event_id = %s ORDER BY last_name, first_name", (event['id'],)) staff = cursor.fetchall() # Get attendee types for this event cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (event['id'],)) attendee_types = cursor.fetchall() # Build a mapping of attendee type id to type name for attendees attendee_type_map = {at['id']: at for at in attendee_types} for attendee in attendees: attendee['attendee_type_name'] = attendee_type_map.get(attendee.get('attendee_type_id'), {}).get('name') if attendee.get('attendee_type_id') else None attendee['attendee_type_price'] = attendee_type_map.get(attendee.get('attendee_type_id'), {}).get('price') if attendee.get('attendee_type_id') else None # Get other events by this organizer for batch staff add cursor.execute("SELECT id, name, code FROM events WHERE organizer_id = %s AND id != %s ORDER BY name", (session['user_id'], event['id'])) other_events = cursor.fetchall() cursor.close() conn.close() return render_template('organizer/event_detail.html', event=event, attendees=attendees, breakout_sessions=breakout_sessions, staff=staff, other_events=other_events, attendee_types=attendee_types) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//attendee-types', methods=['GET', 'POST']) @login_required def manage_attendee_types(event_id): """Manage attendee types for an event.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (event_id,)) attendee_types = cursor.fetchall() if request.method == 'POST': name = request.form.get('name', '').strip() price = request.form.get('price', '').strip() if not name: flash('Type name is required.') cursor.close() conn.close() return render_template('organizer/attendee_types.html', event=event, attendee_types=attendee_types) try: price_value = float(price) if price else 0.00 except ValueError: price_value = 0.00 type_code = generate_type_code(event_id) cursor.execute(""" INSERT INTO attendee_types (event_id, code, name, price) VALUES (%s, %s, %s, %s) """, (event_id, type_code, name, price_value)) conn.commit() flash(f'Attendee type "{name}" created successfully!') cursor.close() conn.close() return redirect(url_for('manage_attendee_types', event_id=event_id)) cursor.close() conn.close() return render_template('organizer/attendee_types.html', event=event, attendee_types=attendee_types) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//attendee-type//edit', methods=['GET', 'POST']) @login_required def edit_attendee_type(event_id, type_id): """Edit an attendee type.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT * FROM attendee_types WHERE id = %s AND event_id = %s", (type_id, event_id)) attendee_type = cursor.fetchone() if not attendee_type: flash('Attendee type not found.') cursor.close() conn.close() return redirect(url_for('manage_attendee_types', event_id=event_id)) # Check how many attendees are assigned to this type cursor.execute("SELECT COUNT(*) as count FROM attendees WHERE attendee_type_id = %s", (type_id,)) attendee_count = cursor.fetchone()['count'] if request.method == 'POST': name = request.form.get('name', '').strip() price = request.form.get('price', '').strip() if not name: flash('Type name is required.') cursor.close() conn.close() return render_template('organizer/edit_attendee_type.html', event=event, attendee_type=attendee_type) try: price_value = float(price) if price else 0.00 except ValueError: price_value = 0.00 cursor.execute(""" UPDATE attendee_types SET name = %s, price = %s WHERE id = %s """, (name, price_value, type_id)) conn.commit() flash(f'Attendee type "{name}" updated successfully!') cursor.close() conn.close() return redirect(url_for('manage_attendee_types', event_id=event_id)) cursor.close() conn.close() return render_template('organizer/edit_attendee_type.html', event=event, attendee_type=attendee_type, attendee_count=attendee_count) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//attendee-type//delete', methods=['POST']) @login_required def delete_attendee_type(event_id, type_id): """Delete an attendee type.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT * FROM attendee_types WHERE id = %s AND event_id = %s", (type_id, event_id)) attendee_type = cursor.fetchone() if not attendee_type: flash('Attendee type not found.') cursor.close() conn.close() return redirect(url_for('manage_attendee_types', event_id=event_id)) # Set attendee_type_id to NULL for all attendees with this type cursor.execute("UPDATE attendees SET attendee_type_id = NULL WHERE attendee_type_id = %s", (type_id,)) # Delete the attendee type cursor.execute("DELETE FROM attendee_types WHERE id = %s", (type_id,)) conn.commit() flash(f'Attendee type "{attendee_type["name"]}" deleted successfully!') cursor.close() conn.close() return redirect(url_for('manage_attendee_types', event_id=event_id)) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//attendees/batch-assign-type', methods=['POST']) @login_required def batch_assign_attendee_type(event_id): """Batch assign attendee type to selected attendees.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) attendee_ids = request.form.getlist('attendee_ids') attendee_type_id = request.form.get('attendee_type_id', '').strip() if not attendee_ids: flash('No attendees selected.') cursor.close() conn.close() return redirect(url_for('event_detail', code=event['code'])) if attendee_type_id: # Validate attendee type exists for this event cursor.execute("SELECT id FROM attendee_types WHERE id = %s AND event_id = %s", (attendee_type_id, event_id)) if not cursor.fetchone(): flash('Invalid attendee type.') cursor.close() conn.close() return redirect(url_for('event_detail', code=event['code'])) for attendee_id in attendee_ids: cursor.execute("UPDATE attendees SET attendee_type_id = %s WHERE id = %s AND event_id = %s", (attendee_type_id, attendee_id, event_id)) conn.commit() flash(f'Successfully assigned type to {len(attendee_ids)} attendee(s).') else: # Remove type from selected attendees for attendee_id in attendee_ids: cursor.execute("UPDATE attendees SET attendee_type_id = NULL WHERE id = %s AND event_id = %s", (attendee_id, event_id)) conn.commit() flash(f'Successfully removed type from {len(attendee_ids)} attendee(s).') cursor.close() conn.close() return redirect(url_for('event_detail', code=event['code'])) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//staff', methods=['GET', 'POST']) @login_required def manage_event_staff(event_id): """Manage staff for an event.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT * FROM staff WHERE event_id = %s ORDER BY last_name, first_name", (event_id,)) staff_members = cursor.fetchall() cursor.close() conn.close() if request.method == 'POST': first_name = request.form.get('first_name', '').strip() last_name = request.form.get('last_name', '').strip() email = request.form.get('email', '').strip() if not all([first_name, last_name, email]): flash('First name, last name, and email are required.') return render_template('organizer/event_staff.html', event=event, staff_members=staff_members) conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Check if staff with this email already exists for this event cursor.execute("SELECT id FROM staff WHERE event_id = %s AND email = %s", (event_id, email)) if cursor.fetchone(): flash('A staff member with this email already exists for this event.') cursor.close() conn.close() return render_template('organizer/event_staff.html', event=event, staff_members=staff_members) invite_token = generate_invite_token() staff_code = generate_staff_code() cursor.execute(""" INSERT INTO staff (event_id, email, first_name, last_name, invite_token, staff_code) VALUES (%s, %s, %s, %s, %s, %s) """, (event_id, email, first_name, last_name, invite_token, staff_code)) conn.commit() # Get organizer email for sending cursor.execute("SELECT email FROM organizers WHERE id = %s", (session['user_id'],)) organizer_email = cursor.fetchone()['email'] cursor.close() conn.close() # Send invite email full_name = f"{first_name} {last_name}" if send_staff_invite_email(email, full_name, event['name'], invite_token, organizer_email): flash(f'Staff member added! Invite email sent to {email}.') else: flash(f'Staff member added but failed to send invite email.') return redirect(url_for('event_detail', code=event['code'])) return render_template('organizer/event_staff.html', event=event, staff_members=staff_members) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/staff/invite/', methods=['GET', 'POST']) def staff_invite(token): """Staff invite acceptance page.""" recaptcha_site_key = app.config.get('RECAPTCHA_SITE_KEY', '') try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM staff WHERE invite_token = %s AND invite_used = FALSE", (token,)) staff_member = cursor.fetchone() if not staff_member: flash('Invalid or expired invite link.') cursor.close() conn.close() return redirect(url_for('index')) cursor.execute("SELECT * FROM events WHERE id = %s", (staff_member['event_id'],)) event = cursor.fetchone() cursor.close() conn.close() if request.method == 'POST': password = request.form.get('password', '') confirm_password = request.form.get('confirm_password', '') recaptcha_response = request.form.get('g-recaptcha-response', '') if recaptcha_site_key and not verify_recaptcha(recaptcha_response): flash('Please complete the reCAPTCHA verification.') return render_template('staff_invite.html', staff=staff_member, event=event, recaptcha_site_key=recaptcha_site_key) if not password or len(password) < 6: flash('Password must be at least 6 characters.') return render_template('staff_invite.html', staff=staff_member, event=event, recaptcha_site_key=recaptcha_site_key) if password != confirm_password: flash('Passwords do not match.') return render_template('staff_invite.html', staff=staff_member, event=event, recaptcha_site_key=recaptcha_site_key) conn = get_db_connection() cursor = conn.cursor() password_hash = hash_password(password) cursor.execute(""" UPDATE staff SET password_hash = %s, invite_used = TRUE, invite_token = NULL WHERE id = %s """, (password_hash, staff_member['id'])) conn.commit() cursor.close() conn.close() flash('Registration complete! Please log in with your email and password.') return redirect(url_for('login')) return render_template('staff_invite.html', staff=staff_member, event=event, recaptcha_site_key=recaptcha_site_key) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) @app.route('/organizer/event//staff//edit', methods=['GET', 'POST']) @login_required def edit_staff(event_id, staff_id): """Edit a staff member.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT * FROM staff WHERE id = %s AND event_id = %s", (staff_id, event_id)) staff_member = cursor.fetchone() if not staff_member: flash('Staff member not found.') cursor.close() conn.close() return redirect(url_for('manage_event_staff', event_id=event_id)) if request.method == 'POST': first_name = request.form.get('first_name', '').strip() last_name = request.form.get('last_name', '').strip() email = request.form.get('email', '').strip() if not all([first_name, last_name, email]): flash('First name, last name, and email are required.') cursor.close() conn.close() return render_template('organizer/edit_staff.html', event=event, staff=staff_member) cursor.execute(""" UPDATE staff SET first_name = %s, last_name = %s, email = %s WHERE id = %s """, (first_name, last_name, email, staff_id)) conn.commit() cursor.close() conn.close() flash('Staff member updated successfully!') return redirect(url_for('manage_event_staff', event_id=event_id)) cursor.close() conn.close() return render_template('organizer/edit_staff.html', event=event, staff=staff_member) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//staff//delete', methods=['POST']) @login_required def delete_staff(event_id, staff_id): """Delete a staff member.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT * FROM staff WHERE id = %s AND event_id = %s", (staff_id, event_id)) staff_member = cursor.fetchone() if not staff_member: flash('Staff member not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("DELETE FROM staff WHERE id = %s", (staff_id,)) conn.commit() cursor.close() conn.close() flash('Staff member removed successfully!') return redirect(url_for('organizer_dashboard')) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//batch-add-staff', methods=['POST']) @login_required def batch_add_staff(event_id): """Batch add staff from another event.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) source_event_id = request.form.get('source_event_id') if not source_event_id: flash('Please select an event.') return redirect(url_for('organizer_dashboard')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Verify target event belongs to organizer and get its code cursor.execute("SELECT code FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) target_event = cursor.fetchone() if not target_event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) # Verify source event belongs to same organizer cursor.execute("SELECT id FROM events WHERE id = %s AND organizer_id = %s", (source_event_id, session['user_id'])) source_event = cursor.fetchone() if not source_event: flash('Source event not found.') cursor.close() conn.close() return redirect(url_for('event_detail', code=target_event['code'])) # Get staff from source event cursor.execute("SELECT first_name, last_name, email FROM staff WHERE event_id = %s", (source_event_id,)) source_staff = cursor.fetchall() if not source_staff: flash('No staff members found in the selected event.') cursor.close() conn.close() return redirect(url_for('event_detail', code=target_event['code'])) # Get existing staff emails in target event to avoid duplicates cursor.execute("SELECT email FROM staff WHERE event_id = %s", (event_id,)) existing_emails = set(row['email'].lower() for row in cursor.fetchall()) # Add staff that don't already exist added_count = 0 for member in source_staff: if member['email'].lower() not in existing_emails: staff_code = generate_staff_code() cursor.execute(""" INSERT INTO staff (event_id, first_name, last_name, email, invite_token, invite_used, created_at, staff_code) VALUES (%s, %s, %s, %s, UUID(), FALSE, NOW(), %s) """, (event_id, member['first_name'], member['last_name'], member['email'], staff_code)) existing_emails.add(member['email'].lower()) added_count += 1 conn.commit() cursor.close() conn.close() flash(f'{added_count} staff member(s) added from other event.') return redirect(url_for('event_detail', code=target_event['code'])) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/breakout-session//delete', methods=['POST']) @login_required def delete_breakout_session(session_id): """Delete a breakout session.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT bs.*, e.organizer_id, e.code FROM breakout_sessions bs JOIN events e ON bs.event_id = e.id WHERE bs.id = %s """, (session_id,)) session_obj = cursor.fetchone() if not session_obj or session_obj['organizer_id'] != session['user_id']: flash('Session not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("DELETE FROM breakout_session_rsvps WHERE breakout_session_id = %s", (session_id,)) cursor.execute("DELETE FROM breakout_sessions WHERE id = %s", (session_id,)) conn.commit() cursor.close() conn.close() flash('Breakout session deleted successfully!') return redirect(url_for('organizer_dashboard')) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/attendee//edit', methods=['GET', 'POST']) @login_required def edit_attendee(attendee_id): """Edit an attendee.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT a.*, e.organizer_id, e.code FROM attendees a JOIN events e ON a.event_id = e.id WHERE a.id = %s """, (attendee_id,)) attendee = cursor.fetchone() if not attendee or attendee['organizer_id'] != session['user_id']: flash('Attendee not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) # Get attendee types for this event cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (attendee['event_id'],)) attendee_types = cursor.fetchall() if request.method == 'POST': first_name = request.form.get('first_name', '').strip() last_name = request.form.get('last_name', '').strip() email = request.form.get('email', '').strip() organisation = request.form.get('organisation', '').strip() role = request.form.get('role', '').strip() introduction = request.form.get('introduction', '').strip() attendee_type_id = request.form.get('attendee_type_id', '').strip() cursor.execute(""" UPDATE attendees SET first_name = %s, last_name = %s, email = %s, organisation = %s, role = %s, introduction = %s, attendee_type_id = %s WHERE id = %s """, (first_name, last_name, email, organisation, role, introduction, attendee_type_id if attendee_type_id else None, attendee_id)) conn.commit() cursor.close() conn.close() flash('Attendee updated successfully!') return redirect(url_for('organizer_dashboard')) cursor.close() conn.close() return render_template('organizer/edit_attendee.html', attendee=attendee, attendee_types=attendee_types) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/attendee//delete', methods=['POST']) @login_required def delete_attendee(attendee_id): """Delete an attendee.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT a.*, e.organizer_id FROM attendees a JOIN events e ON a.event_id = e.id WHERE a.id = %s """, (attendee_id,)) attendee = cursor.fetchone() if not attendee or attendee['organizer_id'] != session['user_id']: flash('Attendee not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("DELETE FROM attendees WHERE id = %s", (attendee_id,)) conn.commit() cursor.close() conn.close() flash('Attendee deleted successfully!') return redirect(url_for('organizer_dashboard')) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//badges') @login_required def event_badges(event_id): """Badge printing view for event.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,)) attendees = cursor.fetchall() cursor.close() conn.close() # Generate QR codes for each attendee for attendee in attendees: qr_data = f"NETEVENT:{event_id}:{attendee['id']}" qr_img = qrcode.make(qr_data, box_size=4, border=1) buffer = io.BytesIO() qr_img.save(buffer, format='PNG') buffer.seek(0) attendee['qr_code'] = f"data:image/png;base64,{base64.b64encode(buffer.getvalue()).decode('utf-8')}" return render_template('organizer/badges.html', event=event, attendees=attendees) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//badges/rectangular/download') @login_required def download_rectangular_badges(event_id): """Download rectangular badge PDFs for event.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,)) attendees = cursor.fetchall() # Get attendee types for this event cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s", (event_id,)) attendee_types = cursor.fetchall() attendee_types_map = {at['id']: at for at in attendee_types} cursor.close() conn.close() if not attendees: flash('No attendees found for this event.') return redirect(url_for('event_badges', event_id=event_id)) # Generate rectangular badge PDF pdf_buffer = generate_rectangular_badges_pdf(attendees, event, attendee_types_map) filename = f"badges_rectangular_{event.get('code', event_id)}.pdf" return send_file( pdf_buffer, mimetype='application/pdf', as_attachment=True, download_name=filename ) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @app.route('/organizer/event//attendees/excel/download') @login_required def download_attendees_excel(event_id): """Download all attendees as Excel sheet.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: cursor.close() conn.close() flash('Event not found.') return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,)) attendees = cursor.fetchall() # Get attendee types cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s", (event_id,)) attendee_types = cursor.fetchall() attendee_types_map = {at['id']: at for at in attendee_types} cursor.close() conn.close() if not attendees: flash('No attendees found.') return redirect(url_for('event_badges', event_id=event_id)) # Create Excel workbook wb = openpyxl.Workbook() ws = wb.active ws.title = "Attendees" # Header styling header_font = Font(bold=True, color="FFFFFF") header_fill = PatternFill("solid", fgColor="333333") header_alignment = Alignment(horizontal="center") headers = ['ID', 'Voornaam', 'Achternaam', 'Email', 'Organisatie', 'Rol', 'Type', 'Ingecheckt', 'Aangemeld op'] for col, header in enumerate(headers, 1): cell = ws.cell(row=1, column=col, value=header) cell.font = header_font cell.fill = header_fill cell.alignment = header_alignment # Data rows for row, attendee in enumerate(attendees, 2): ws.cell(row=row, column=1, value=attendee.get('id')) ws.cell(row=row, column=2, value=attendee.get('first_name', '')) ws.cell(row=row, column=3, value=attendee.get('last_name', '')) ws.cell(row=row, column=4, value=attendee.get('email', '')) ws.cell(row=row, column=5, value=attendee.get('organisation', '')) ws.cell(row=row, column=6, value=attendee.get('role', '')) ws.cell(row=row, column=7, value=attendee_types_map.get(attendee.get('attendee_type_id'), {}).get('name', '')) ws.cell(row=row, column=8, value='Ja' if attendee.get('checked_in') else 'Nee') ws.cell(row=row, column=9, value=str(attendee.get('created_at', ''))[:19] if attendee.get('created_at') else '') # Auto-fit column widths for col in ws.columns: max_length = 0 column_letter = col[0].column_letter for cell in col: if cell.value: max_length = max(max_length, len(str(cell.value))) ws.column_dimensions[column_letter].width = min(max_length + 2, 30) filename = f"attendees_{event.get('code', event_id)}.xlsx" buffer = io.BytesIO() wb.save(buffer) buffer.seek(0) return send_file( buffer, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=filename ) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) @login_required def event_stats(event_id): """Attendance statistics for event.""" if session.get('user_type') != 'organizer': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() return redirect(url_for('organizer_dashboard')) cursor.execute("SELECT COUNT(*) as total FROM attendees WHERE event_id = %s", (event_id,)) total = cursor.fetchone()['total'] cursor.execute("SELECT COUNT(*) as checked_in FROM attendees WHERE event_id = %s AND checked_in = TRUE", (event_id,)) checked_in = cursor.fetchone()['checked_in'] cursor.close() conn.close() return jsonify({ 'total': total, 'checked_in': checked_in, 'not_checked_in': total - checked_in }) except Error as e: return jsonify({'error': str(e)}), 500 @app.route('/organizer/event//checkin/', methods=['POST']) @login_required def checkin_attendee(event_id, attendee_id): """Check in an attendee.""" if session.get('user_type') != 'organizer': return jsonify({'error': 'Access denied'}), 403 try: conn = get_db_connection() cursor = conn.cursor() # Check if already checked in cursor.execute( "SELECT checked_in FROM attendees WHERE id = %s AND event_id = %s", (attendee_id, event_id) ) result = cursor.fetchone() if not result: cursor.close() conn.close() return jsonify({'error': 'Attendee not found'}), 404 already_checked_in = result[0] cursor.execute( "UPDATE attendees SET checked_in = TRUE WHERE id = %s AND event_id = %s", (attendee_id, event_id) ) conn.commit() # Get attendee name for response cursor.execute( "SELECT first_name, last_name FROM attendees WHERE id = %s", (attendee_id,) ) attendee = cursor.fetchone() cursor.close() conn.close() return jsonify({ 'success': True, 'already_checked_in': bool(already_checked_in), 'attendee_name': f"{attendee[0]} {attendee[1]}" if attendee else 'Unknown' }) except Error as e: return jsonify({'error': str(e)}), 500 @app.route('/organizer/event//checkin/code', methods=['POST']) @login_required def checkin_by_code(event_id): """Check in an attendee by their unique attendee code.""" if session.get('user_type') not in ['organizer', 'staff']: return jsonify({'error': 'Access denied'}), 403 data = request.get_json() attendee_code = data.get('attendee_code') if data else None if not attendee_code: return jsonify({'error': 'Attendee code required'}), 400 try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Find attendee by code and event cursor.execute( "SELECT id, first_name, last_name, checked_in FROM attendees WHERE attendee_code = %s AND event_id = %s", (attendee_code, event_id) ) attendee = cursor.fetchone() if not attendee: cursor.close() conn.close() return jsonify({'error': 'Attendee not found'}), 404 already_checked_in = attendee['checked_in'] cursor.execute( "UPDATE attendees SET checked_in = TRUE WHERE id = %s AND event_id = %s", (attendee['id'], event_id) ) conn.commit() cursor.close() conn.close() return jsonify({ 'success': True, 'already_checked_in': bool(already_checked_in), 'attendee_name': f"{attendee['first_name']} {attendee['last_name']}" }) except Error as e: return jsonify({'error': str(e)}), 500 @app.route('/organizer/event//scan') @login_required def event_scan(event_id): """QR code scanner page for check-in.""" if session.get('user_type') not in ['organizer', 'staff']: flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Organizers can access any event they own, staff can only access their event if session.get('user_type') == 'organizer': cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id'])) else: cursor.execute("SELECT * FROM events WHERE id = %s", (event_id,)) # Verify staff belongs to this event if session.get('event_id') != event_id: flash('Access denied.') cursor.close() conn.close() return redirect(url_for('index')) event = cursor.fetchone() if not event: flash('Event not found.') cursor.close() conn.close() if session.get('user_type') == 'organizer': return redirect(url_for('organizer_dashboard')) return redirect(url_for('staff_event_dashboard', event_id=session.get('event_id'))) cursor.execute("SELECT id, first_name, last_name, checked_in FROM attendees WHERE event_id = %s", (event_id,)) attendees = cursor.fetchall() cursor.close() conn.close() if session.get('user_type') == 'staff': return render_template('staff/scan.html', event=event, attendees=attendees) return render_template('organizer/scan.html', event=event, attendees=attendees) except Error as e: flash(f'Database error: {e}') return redirect(url_for('organizer_dashboard')) # Routes - Attendee @app.route('/attendee/dashboard') @login_required def attendee_dashboard(): """Attendee dashboard.""" if session.get('user_type') != 'attendee': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],)) event = cursor.fetchone() if event: # Get attendee info cursor.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],)) attendee = cursor.fetchone() # Get connections cursor.execute(""" SELECT c.*, a.first_name, a.last_name, a.organisation, a.role FROM connections c JOIN attendees a ON c.connected_attendee_id = a.id WHERE c.attendee_id = %s AND c.status = 'accepted' """, (session['user_id'],)) connections = cursor.fetchall() # Get pending connections cursor.execute(""" SELECT c.*, a.first_name, a.last_name, a.organisation, a.role FROM connections c JOIN attendees a ON c.attendee_id = a.id WHERE c.connected_attendee_id = %s AND c.status = 'pending' """, (session['user_id'],)) pending_connections = cursor.fetchall() # Get appointments cursor.execute(""" SELECT ap.*, a.first_name as requester_first_name, a.last_name as requester_last_name, at.first_name as target_first_name, at.last_name as target_last_name FROM appointments ap JOIN attendees a ON ap.requester_id = a.id JOIN attendees at ON ap.target_id = at.id WHERE ap.requester_id = %s OR ap.target_id = %s ORDER BY ap.appointment_time """, (session['user_id'], session['user_id'])) appointments = cursor.fetchall() else: event = None attendee = None connections = [] pending_connections = [] appointments = [] cursor.close() conn.close() return render_template('attendee/dashboard.html', event=event, attendee=attendee, connections=connections, pending_connections=pending_connections, appointments=appointments) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) @app.route('/attendee/badge/download') @login_required def download_badge(): """Generate and download attendee badge PDF.""" if session.get('user_type') != 'attendee': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Get attendee info cursor.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],)) attendee = cursor.fetchone() # Get event info cursor.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],)) event = cursor.fetchone() cursor.close() conn.close() if not attendee: flash('Attendee not found.') return redirect(url_for('attendee_dashboard')) # Generate PDF pdf_buffer = generate_badge_pdf(attendee, event) return send_file( pdf_buffer, mimetype='application/pdf', as_attachment=True, download_name=f'badge_{attendee["first_name"]}_{attendee["last_name"]}.pdf' ) except Error as e: flash(f'Database error: {e}') return redirect(url_for('attendee_dashboard')) @app.route('/attendee/badge/email', methods=['POST']) @login_required def email_badge(): """Send badge PDF to attendee via email.""" if session.get('user_type') != 'attendee': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Get attendee info cursor.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],)) attendee = cursor.fetchone() # Get event info cursor.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],)) event = cursor.fetchone() cursor.close() conn.close() if not attendee: flash('Attendee not found.') return redirect(url_for('attendee_dashboard')) # Generate PDF pdf_buffer = generate_badge_pdf(attendee, event) # Send email if send_badge_email(attendee['email'], f"{attendee['first_name']} {attendee['last_name']}", event['name'] if event else 'Event', pdf_buffer): flash('Badge sent to your email!') else: flash('Failed to send badge email. Please try again.') return redirect(url_for('attendee_dashboard')) except Error as e: flash(f'Database error: {e}') return redirect(url_for('attendee_dashboard')) @app.route('/attendee/profile', methods=['GET', 'POST']) @login_required def attendee_profile(): """Attendee profile page.""" if session.get('user_type') != 'attendee': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],)) attendee = cursor.fetchone() cursor.close() conn.close() if request.method == 'POST': first_name = request.form.get('first_name', '').strip() last_name = request.form.get('last_name', '').strip() organisation = request.form.get('organisation', '').strip() role = request.form.get('role', '').strip() introduction = request.form.get('introduction', '').strip() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" UPDATE attendees SET first_name = %s, last_name = %s, organisation = %s, role = %s, introduction = %s, phone = %s, linkedin = %s WHERE id = %s """, (first_name, last_name, organisation, role, introduction, request.form.get('phone', '').strip(), request.form.get('linkedin', '').strip(), session['user_id'])) conn.commit() cursor.close() conn.close() flash('Profile updated successfully!') return redirect(url_for('attendee_profile')) return render_template('attendee/profile.html', attendee=attendee) except Error as e: flash(f'Database error: {e}') return redirect(url_for('attendee_dashboard')) @app.route('/attendee/photo', methods=['POST']) @login_required def upload_photo(): """Upload profile photo.""" if session.get('user_type') != 'attendee': return jsonify({'error': 'Access denied'}), 403 if 'photo' not in request.files: return jsonify({'error': 'No file provided'}), 400 file = request.files['photo'] if file.filename == '': return jsonify({'error': 'No file selected'}), 400 if file: # Generate unique filename filename = f"{uuid.uuid4()}.jpg" filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) # Delete old photo if exists conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT profile_picture FROM attendees WHERE id = %s", (session['user_id'],)) old_photo = cursor.fetchone()['profile_picture'] if old_photo: old_path = os.path.join(app.config['UPLOAD_FOLDER'], old_photo) if os.path.exists(old_path): os.remove(old_path) file.save(filepath) cursor.execute("UPDATE attendees SET profile_picture = %s WHERE id = %s", (filename, session['user_id'])) conn.commit() cursor.close() conn.close() return jsonify({'success': True, 'filename': filename}) return jsonify({'error': 'Invalid file type'}), 400 # Routes - Attendees list and connections @app.route('/attendees') @login_required def list_attendees(): """List other attendees at the same event.""" if session.get('user_type') != 'attendee': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT a.*, (SELECT status FROM connections WHERE attendee_id = %s AND connected_attendee_id = a.id) as my_status FROM attendees a WHERE a.event_id = %s AND a.id != %s """, (session['user_id'], session['event_id'], session['user_id'])) attendees = cursor.fetchall() cursor.close() conn.close() return render_template('attendee/attendees.html', attendees=attendees) except Error as e: flash(f'Database error: {e}') return redirect(url_for('attendee_dashboard')) @app.route('/connections', methods=['POST']) @login_required def create_connection(): """Send a connection request.""" if session.get('user_type') != 'attendee': return jsonify({'error': 'Access denied'}), 403 connected_id = request.form.get('connected_attendee_id') if not connected_id: return jsonify({'error': 'Attendee ID required'}), 400 try: conn = get_db_connection() cursor = conn.cursor() # Check if connection already exists cursor.execute(""" SELECT id FROM connections WHERE attendee_id = %s AND connected_attendee_id = %s """, (session['user_id'], connected_id)) if cursor.fetchone(): cursor.close() conn.close() return jsonify({'error': 'Connection already exists'}), 400 cursor.execute(""" INSERT INTO connections (attendee_id, connected_attendee_id, status) VALUES (%s, %s, 'pending') """, (session['user_id'], connected_id)) conn.commit() cursor.close() conn.close() return jsonify({'success': True}) except Error as e: return jsonify({'error': str(e)}), 500 @app.route('/connections/', methods=['PUT']) @login_required def update_connection(connection_id): """Accept or reject a connection request.""" if session.get('user_type') != 'attendee': return jsonify({'error': 'Access denied'}), 403 status = request.json.get('status') if request.json else request.form.get('status') if status not in ['accepted', 'rejected']: return jsonify({'error': 'Invalid status'}), 400 try: conn = get_db_connection() cursor = conn.cursor() # Verify this connection is targeting the current user cursor.execute(""" SELECT id FROM connections WHERE id = %s AND connected_attendee_id = %s AND status = 'pending' """, (connection_id, session['user_id'])) if not cursor.fetchone(): cursor.close() conn.close() return jsonify({'error': 'Connection not found'}), 404 cursor.execute("UPDATE connections SET status = %s WHERE id = %s", (status, connection_id)) conn.commit() cursor.close() conn.close() return jsonify({'success': True}) except Error as e: return jsonify({'error': str(e)}), 500 @app.route('/attendee/scan') @login_required def attendee_scan(): """QR scanner page for attendees to scan each other.""" if session.get('user_type') != 'attendee': flash('Access denied.') return redirect(url_for('index')) return render_template('attendee/scan.html') @app.route('/attendee/scan-request', methods=['POST']) @login_required def scan_request_connection(): """Send a connection request when scanning someone's QR code.""" if session.get('user_type') != 'attendee': return jsonify({'error': 'Access denied'}), 403 scanned_id = request.json.get('scanned_id') if request.json else request.form.get('scanned_id') if not scanned_id: return jsonify({'error': 'Scanned attendee ID required'}), 400 scanned_id = int(scanned_id) # Can't connect to yourself if scanned_id == session['user_id']: return jsonify({'error': 'Cannot connect to yourself'}), 400 try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Verify scanned attendee exists and is at same event cursor.execute(""" SELECT id FROM attendees WHERE id = %s AND event_id = %s """, (scanned_id, session['event_id'])) if not cursor.fetchone(): cursor.close() conn.close() return jsonify({'error': 'Attendee not found at this event'}), 404 # Check if connection already exists cursor.execute(""" SELECT id, status FROM connections WHERE attendee_id = %s AND connected_attendee_id = %s """, (session['user_id'], scanned_id)) existing = cursor.fetchone() if existing: if existing['status'] == 'accepted': cursor.close() conn.close() return jsonify({'error': 'Already connected'}), 400 elif existing['status'] == 'pending': cursor.close() conn.close() return jsonify({'error': 'Request already pending'}), 400 else: # Rejected - allow new request cursor.execute(""" UPDATE connections SET status = 'pending', attendee_id = %s, connected_attendee_id = %s WHERE id = %s """, (session['user_id'], scanned_id, existing['id'])) conn.commit() cursor.close() conn.close() return jsonify({'success': True, 'message': 'Request sent, awaiting approval'}) else: # Create new connection request cursor.execute(""" INSERT INTO connections (attendee_id, connected_attendee_id, status) VALUES (%s, %s, 'pending') """, (session['user_id'], scanned_id)) conn.commit() cursor.close() conn.close() return jsonify({'success': True, 'message': 'Request sent, awaiting approval'}) except Error as e: return jsonify({'error': str(e)}), 500 @app.route('/attendee/connection-requests') @login_required def connection_requests(): """Show pending connection requests for the current user.""" if session.get('user_type') != 'attendee': flash('Access denied.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Get pending requests where current user is the target cursor.execute(""" SELECT c.*, a.first_name, a.last_name, a.email, a.organisation, a.role, a.introduction, a.profile_picture FROM connections c JOIN attendees a ON c.attendee_id = a.id WHERE c.connected_attendee_id = %s AND c.status = 'pending' ORDER BY c.created_at DESC """, (session['user_id'],)) requests = cursor.fetchall() cursor.close() conn.close() return render_template('attendee/connection_requests.html', requests=requests) except Error as e: flash(f'Database error: {e}') return redirect(url_for('attendee_dashboard')) @app.route('/attendee/connection-request/', methods=['POST']) @login_required def respond_to_connection(connection_id): """Approve or reject a connection request.""" if session.get('user_type') != 'attendee': return jsonify({'error': 'Access denied'}), 403 action = request.json.get('action') if request.json else request.form.get('action') if action not in ['approve', 'reject']: return jsonify({'error': 'Invalid action'}), 400 try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Verify this request is for the current user cursor.execute(""" SELECT c.*, a.first_name, a.last_name, a.profile_picture, a.email, a.organisation, a.role, a.introduction FROM connections c JOIN attendees a ON c.attendee_id = a.id WHERE c.id = %s AND c.connected_attendee_id = %s AND c.status = 'pending' """, (connection_id, session['user_id'])) connection = cursor.fetchone() if not connection: cursor.close() conn.close() return jsonify({'error': 'Connection request not found'}), 404 if action == 'approve': cursor.execute("UPDATE connections SET status = 'accepted' WHERE id = %s", (connection_id,)) conn.commit() cursor.close() conn.close() return jsonify({'success': True, 'status': 'accepted'}) else: cursor.execute("UPDATE connections SET status = 'rejected' WHERE id = %s", (connection_id,)) conn.commit() cursor.close() conn.close() return jsonify({'success': True, 'status': 'rejected'}) except Error as e: return jsonify({'error': str(e)}), 500 # Routes - Appointments @app.route('/appointments', methods=['POST']) @login_required def create_appointment(): """Request an appointment.""" if session.get('user_type') != 'attendee': return jsonify({'error': 'Access denied'}), 403 target_id = request.form.get('target_id') appointment_time = request.form.get('appointment_time') location = request.form.get('location', '').strip() notes = request.form.get('notes', '').strip() if not all([target_id, appointment_time]): return jsonify({'error': 'Target and time are required'}), 400 try: apt_datetime = datetime.strptime(appointment_time, '%Y-%m-%dT%H:%M') except ValueError: return jsonify({'error': 'Invalid datetime format'}), 400 try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO appointments (event_id, requester_id, target_id, appointment_time, location, notes, status) VALUES (%s, %s, %s, %s, %s, %s, 'pending') """, (session['event_id'], session['user_id'], target_id, apt_datetime, location, notes)) conn.commit() cursor.close() conn.close() return jsonify({'success': True}) except Error as e: return jsonify({'error': str(e)}), 500 @app.route('/appointments/', methods=['PUT']) @login_required def update_appointment(appointment_id): """Accept or reject an appointment.""" if session.get('user_type') != 'attendee': return jsonify({'error': 'Access denied'}), 403 status = request.json.get('status') if request.json else request.form.get('status') if status not in ['accepted', 'rejected']: return jsonify({'error': 'Invalid status'}), 400 try: conn = get_db_connection() cursor = conn.cursor() # Verify user is the target of this appointment cursor.execute(""" SELECT id FROM appointments WHERE id = %s AND target_id = %s AND status = 'pending' """, (appointment_id, session['user_id'])) if not cursor.fetchone(): cursor.close() conn.close() return jsonify({'error': 'Appointment not found'}), 404 cursor.execute("UPDATE appointments SET status = %s WHERE id = %s", (status, appointment_id)) conn.commit() cursor.close() conn.close() return jsonify({'success': True}) except Error as e: return jsonify({'error': str(e)}), 500 # Routes - Public event listing @app.route('/events') def list_events(): """List all upcoming public events.""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT e.*, o.name as organizer_name, (SELECT COUNT(*) FROM attendees WHERE event_id = e.id) as attendee_count FROM events e JOIN organizers o ON e.organizer_id = o.id WHERE e.start_time >= NOW() ORDER BY e.start_time ASC """) events = cursor.fetchall() cursor.close() conn.close() return render_template('index.html', events=events, show_events=True) except Error as e: flash(f'Database error: {e}') return render_template('index.html') @app.route('/event/register/', methods=['GET', 'POST']) @app.route('/event/register//', methods=['GET', 'POST']) def register_event(code, type_code=None): """Registration page for an event with breakout session registration.""" # Look up attendee type if type_code is provided preselected_type = None if type_code: try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM attendee_types WHERE code = %s", (type_code,)) preselected_type = cursor.fetchone() cursor.close() conn.close() if not preselected_type: flash('Invalid attendee type.') return redirect(url_for('index')) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE code = %s", (code,)) event = cursor.fetchone() cursor.close() conn.close() if not event: flash('Event not found.') return redirect(url_for('index')) # Verify type_code belongs to this event if preselected_type and preselected_type['event_id'] != event['id']: flash('Invalid attendee type for this event.') return redirect(url_for('index')) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) # Get breakout sessions for the event try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT bs.*, (SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count FROM breakout_sessions bs WHERE bs.event_id = %s ORDER BY bs.start_time """, (event['id'],)) sessions = cursor.fetchall() cursor.close() conn.close() except Error as e: sessions = [] # Check if user is already registered (for showing breakout sessions) registered = False user_id = session.get('user_id') user_type = session.get('user_type') if user_id and user_type == 'attendee' and session.get('event_id') == event['id']: registered = True # Add my_rsvp_status to sessions for session_item in sessions: try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT status FROM breakout_session_rsvps WHERE breakout_session_id = %s AND attendee_id = %s """, (session_item['id'], user_id)) result = cursor.fetchone() session_item['my_rsvp_status'] = result['status'] if result else None cursor.close() conn.close() except Error: session_item['my_rsvp_status'] = None else: for session_item in sessions: session_item['my_rsvp_status'] = None # Handle registration form submission if request.method == 'POST' and not registered: first_name = request.form.get('first_name', '').strip() last_name = request.form.get('last_name', '').strip() email = request.form.get('email', '').strip() password = request.form.get('password', '') confirm_password = request.form.get('confirm_password', '') organisation = request.form.get('organisation', '').strip() role = request.form.get('role', '').strip() phone = request.form.get('phone', '').strip() linkedin = request.form.get('linkedin', '').strip() introduction = request.form.get('introduction', '').strip() selected_sessions = request.form.getlist('breakout_sessions') if not all([first_name, last_name, email, password]): flash('First name, last name, email, and password are required.') return render_template('attendee/event_register.html', event=event, sessions=sessions, registered=False, preselected_type=preselected_type) if password != confirm_password: flash('Passwords do not match.') return render_template('attendee/event_register.html', event=event, sessions=sessions, registered=False, preselected_type=preselected_type) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Check if email already registered for this event cursor.execute( "SELECT id FROM attendees WHERE email = %s AND event_id = %s", (email, event['id']) ) if cursor.fetchone(): flash('Email already registered for this event. Please log in.') cursor.close() conn.close() return redirect(url_for('login')) # Check if this type requires payment if preselected_type and preselected_type.get('price') and preselected_type.get('price') > 0: # Store registration data in session for payment processing session['pending_registration'] = { 'event_id': event['id'], 'email': email, 'password': password, 'first_name': first_name, 'last_name': last_name, 'organisation': organisation, 'role': role, 'phone': phone, 'linkedin': linkedin, 'introduction': introduction, 'selected_sessions': selected_sessions, 'attendee_type_id': preselected_type['id'], 'type_name': preselected_type['name'], 'price': preselected_type['price'] } cursor.close() conn.close() return redirect(url_for('payment_page', event_code=event['code'])) # Complete registration directly for free types confirmation_token = uuid.uuid4().hex password_hash = hash_password(password) attendee_code = generate_attendee_code() cursor.execute(""" INSERT INTO attendees (event_id, email, password_hash, first_name, last_name, organisation, role, phone, linkedin, introduction, confirmation_token, attendee_code, attendee_type_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, (event['id'], email, password_hash, first_name, last_name, organisation, role, phone, linkedin, introduction, confirmation_token, attendee_code, preselected_type['id'] if preselected_type else None)) attendee_id = cursor.lastrowid # Process breakout session selections for session_id in selected_sessions: cursor.execute("SELECT * FROM breakout_sessions WHERE id = %s AND event_id = %s", (session_id, event['id'])) session_data = cursor.fetchone() if session_data: if session_data['max_attendees']: cursor.execute(""" SELECT COUNT(*) as count FROM breakout_session_rsvps WHERE breakout_session_id = %s AND status = 'registered' """, (session_id,)) count = cursor.fetchone()['count'] if count >= session_data['max_attendees']: continue cursor.execute(""" INSERT INTO breakout_session_rsvps (breakout_session_id, attendee_id, status) VALUES (%s, %s, 'registered') """, (session_id, attendee_id)) conn.commit() cursor.close() conn.close() event_date = localized_date(event['start_time']) if event['start_time'] else 'To be announced' personal_page_url = url_for('attendee_personal_page', token=confirmation_token, _external=True) send_attendee_confirmation_email(email, first_name, event['name'], event_date, event['location'], personal_page_url) session['user_id'] = attendee_id session['user_type'] = 'attendee' session['event_id'] = event['id'] flash('Registration successful!') return redirect(url_for('attendee_personal_page', token=confirmation_token)) except Error as e: flash(f'Database error: {e}') return render_template('attendee/event_register.html', event=event, sessions=sessions, registered=registered, preselected_type=preselected_type) @app.route('/event//payment', methods=['GET', 'POST']) def payment_page(code): """Payment page for paid attendee types.""" pending = session.get('pending_registration') if not pending or pending.get('event_id') is None: flash('No pending registration found.') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM events WHERE code = %s", (code,)) event = cursor.fetchone() cursor.close() conn.close() if not event: flash('Event not found.') return redirect(url_for('index')) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) if request.method == 'POST': # Process payment (simulated - in production, integrate with payment gateway like Stripe) # For now, we just complete the registration try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Generate confirmation token confirmation_token = uuid.uuid4().hex password_hash = hash_password(pending['password']) attendee_code = generate_attendee_code() cursor.execute(""" INSERT INTO attendees (event_id, email, password_hash, first_name, last_name, organisation, role, phone, linkedin, introduction, confirmation_token, attendee_code, attendee_type_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, (pending['event_id'], pending['email'], password_hash, pending['first_name'], pending['last_name'], pending['organisation'], pending['role'], pending['phone'], pending['linkedin'], pending['introduction'], confirmation_token, attendee_code, pending['attendee_type_id'])) attendee_id = cursor.lastrowid # Process breakout session selections for session_id in pending.get('selected_sessions', []): cursor.execute("SELECT * FROM breakout_sessions WHERE id = %s AND event_id = %s", (session_id, pending['event_id'])) session_data = cursor.fetchone() if session_data: if session_data['max_attendees']: cursor.execute(""" SELECT COUNT(*) as count FROM breakout_session_rsvps WHERE breakout_session_id = %s AND status = 'registered' """, (session_id,)) count = cursor.fetchone()['count'] if count >= session_data['max_attendees']: continue cursor.execute(""" INSERT INTO breakout_session_rsvps (breakout_session_id, attendee_id, status) VALUES (%s, %s, 'registered') """, (session_id, attendee_id)) conn.commit() cursor.close() conn.close() # Clear pending registration session.pop('pending_registration', None) # Send confirmation email event_date = localized_date(event['start_time']) if event['start_time'] else 'To be announced' personal_page_url = url_for('attendee_personal_page', token=confirmation_token, _external=True) send_attendee_confirmation_email(pending['email'], pending['first_name'], event['name'], event_date, event['location'], personal_page_url) # Auto-login the attendee session['user_id'] = attendee_id session['user_type'] = 'attendee' session['event_id'] = pending['event_id'] flash('Registration and payment successful!') return redirect(url_for('attendee_personal_page', token=confirmation_token)) except Error as e: flash(f'Database error: {e}') return render_template('attendee/payment.html', event=event, pending=pending) @app.route('/attendee/request-link', methods=['GET', 'POST']) def request_attendee_link(): """Page to request a personal page link by email.""" if request.method == 'POST': email = request.form.get('email', '').strip() if not email: flash('Email is required.') return render_template('attendee/request_link.html') try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Check if attendee exists with this email cursor.execute("SELECT * FROM attendees WHERE email = %s", (email,)) attendee = cursor.fetchone() if attendee: # Use confirmation_token if available, otherwise attendee_code token = attendee['confirmation_token'] if attendee['confirmation_token'] else attendee['attendee_code'] personal_page_url = url_for('attendee_personal_page', token=token, _external=True) full_name = f"{attendee['first_name']} {attendee['last_name']}" # Get event name cursor.execute("SELECT name FROM events WHERE id = %s", (attendee['event_id'],)) event = cursor.fetchone() event_name = event['name'] if event else 'Event' # Send email send_attendee_confirmation_email( attendee_email=email, attendee_name=full_name, event_name=event_name, event_date='See your profile', event_location='See your profile', personal_page_url=personal_page_url ) cursor.close() conn.close() # Always show success message for security (don't reveal if email exists) flash(t('email_sent')) return render_template('attendee/request_link.html') except Exception as e: flash(f'Error: {e}') return render_template('attendee/request_link.html') return render_template('attendee/request_link.html') @app.route('/attendee/personal/') def attendee_personal_page(token): """Personal page accessed via confirmation email link.""" client_ip = request.remote_addr # Check if IP is blocked if is_ip_blocked_for_personal_page(client_ip): flash(f'Too many invalid attempts. Please try again later (blocked for 30 minutes).') return redirect(url_for('index')) try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # Find attendee by confirmation token cursor.execute("SELECT * FROM attendees WHERE confirmation_token = %s", (token,)) attendee = cursor.fetchone() # Fallback: also accept attendee_code for old links where token was cleared if not attendee: cursor.execute("SELECT * FROM attendees WHERE attendee_code = %s", (token,)) attendee = cursor.fetchone() if not attendee: record_failed_personal_page_attempt(client_ip) flash('Invalid link.') cursor.close() conn.close() return redirect(url_for('index')) # Clear failed attempts on successful access clear_failed_personal_page_attempts(client_ip) # Auto-login the attendee # Note: confirmation_token is kept so the link remains valid indefinitely session['user_id'] = attendee['id'] session['user_type'] = 'attendee' session['event_id'] = attendee['event_id'] # Get all events for this attendee cursor.execute(""" SELECT e.* FROM events e WHERE e.id = %s """, (attendee['event_id'],)) events = cursor.fetchall() # For each event, get breakout sessions with RSVP status for event in events: cursor.execute(""" SELECT bs.*, (SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count, (SELECT status FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND attendee_id = %s) as my_rsvp_status FROM breakout_sessions bs WHERE bs.event_id = %s ORDER BY bs.start_time """, (attendee['id'], event['id'])) event['breakout_sessions'] = cursor.fetchall() cursor.close() conn.close() return render_template('attendee/personal.html', attendee=attendee, events=events) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) @app.route('/event/') def view_event(event_id): """View event details.""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT e.*, o.name as organizer_name, (SELECT COUNT(*) FROM attendees WHERE event_id = e.id) as attendee_count FROM events e JOIN organizers o ON e.organizer_id = o.id WHERE e.id = %s """, (event_id,)) event = cursor.fetchone() cursor.close() conn.close() if not event: flash('Event not found.') return redirect(url_for('index')) return render_template('attendee/event.html', event=event) except Error as e: flash(f'Database error: {e}') return redirect(url_for('index')) if __name__ == '__main__': # Ensure staff_code column exists in organizers and staff tables ensure_staff_code_column() ensure_all_organizers_have_staff_code() ensure_staff_staff_code_column() ensure_all_staff_have_staff_code() app.run(debug=True, host='0.0.0.0', port=5002)