3b14155594
Also update generate_type_code() to accept event_id parameter for proper per-event uniqueness checking. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
4696 lines
172 KiB
Python
4696 lines
172 KiB
Python
import base64
|
|
import io
|
|
import os
|
|
import random
|
|
import smtplib
|
|
import string
|
|
import uuid
|
|
import requests
|
|
from datetime import datetime
|
|
from email.encoders import encode_base64
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.mime.base import MIMEBase
|
|
from functools import wraps
|
|
|
|
import bcrypt
|
|
import mysql.connector
|
|
import qrcode
|
|
from babel import Locale
|
|
from flask import Flask, flash, jsonify, redirect, render_template, request, session, url_for, send_file
|
|
from flask_cors import CORS
|
|
from flask_babel import Babel, gettext as _, ngettext, force_locale
|
|
from reportlab.lib.pagesizes import A4
|
|
from reportlab.lib.units import mm
|
|
from reportlab.pdfgen import canvas
|
|
import openpyxl
|
|
from openpyxl.styles import Font, PatternFill, Alignment
|
|
from mysql.connector import Error
|
|
|
|
from config import Config
|
|
|
|
app = Flask(__name__, template_folder='templates', static_folder='static')
|
|
app.config.from_object(Config)
|
|
CORS(app)
|
|
|
|
# Supported locales
|
|
SUPPORTED_LOCALES = ['en', 'nl', 'de', 'fr', 'es', 'it', 'pl']
|
|
|
|
|
|
def get_locale():
|
|
"""Determine the best locale based on URL parameter, cookie, session, or user preference."""
|
|
from flask import session, request
|
|
from config import Config
|
|
|
|
# 1. Check URL parameter
|
|
lang = request.args.get('lang')
|
|
if lang and lang in SUPPORTED_LOCALES:
|
|
session['locale'] = lang
|
|
return lang
|
|
|
|
# 2. Check session
|
|
if 'locale' in session and session['locale'] in SUPPORTED_LOCALES:
|
|
return session['locale']
|
|
|
|
# 3. Check cookie (for returning visitors)
|
|
lang = request.cookies.get('locale')
|
|
if lang and lang in SUPPORTED_LOCALES:
|
|
session['locale'] = lang
|
|
return lang
|
|
|
|
# 4. Check user preference from database
|
|
if 'user_id' in session and 'user_type' in session:
|
|
user_lang = get_user_preferred_language(session['user_id'], session.get('user_type'))
|
|
if user_lang:
|
|
return user_lang
|
|
|
|
# 5. Default
|
|
return Config.BABEL_DEFAULT_LOCALE
|
|
|
|
|
|
def get_locale_string():
|
|
"""Return locale as string for Babel."""
|
|
locale = get_locale()
|
|
# Convert 'en' to 'en_US' format if needed, but Babel 4.0 should handle short codes
|
|
return locale
|
|
|
|
|
|
# Initialize Babel with locale_selector
|
|
babel = Babel()
|
|
babel.init_app(
|
|
app,
|
|
default_locale=Config.BABEL_DEFAULT_LOCALE,
|
|
default_translation_directories=Config.BABEL_TRANSLATION_DIRECTORIES,
|
|
locale_selector=get_locale_string
|
|
)
|
|
|
|
|
|
def get_user_preferred_language(user_id, user_type):
|
|
"""Get user's preferred language from database."""
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
if user_type == 'organizer':
|
|
cursor.execute("SELECT preferred_language FROM organizers WHERE id = %s", (user_id,))
|
|
elif user_type == 'attendee':
|
|
cursor.execute("SELECT preferred_language FROM attendees WHERE id = %s", (user_id,))
|
|
elif user_type == 'staff':
|
|
cursor.execute("SELECT preferred_language FROM staff WHERE id = %s", (user_id,))
|
|
else:
|
|
cursor.close()
|
|
conn.close()
|
|
return None
|
|
|
|
result = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if result and result.get('preferred_language') in SUPPORTED_LOCALES:
|
|
return result['preferred_language']
|
|
except Error:
|
|
pass
|
|
return None
|
|
|
|
|
|
# English translations map (replaces database translations)
|
|
ENGLISH_TRANSLATIONS = {
|
|
'dashboard': 'Dashboard',
|
|
'logout': 'Logout',
|
|
'login': 'Login',
|
|
'organizer_login': 'Organizer Login',
|
|
'register': 'Register',
|
|
'events': 'Events',
|
|
'attendees': 'Attendees',
|
|
'profile': 'Profile',
|
|
'create_event': 'Create Event',
|
|
'edit_event': 'Edit Event',
|
|
'delete_event': 'Delete Event',
|
|
'event_details': 'Event Details',
|
|
'staff': 'Staff',
|
|
'add_staff': 'Add Staff',
|
|
'remove_staff': 'Remove Staff',
|
|
'breakout_sessions': 'Break-out Sessions',
|
|
'registered_attendees': 'Registered Attendees',
|
|
'checked_in': 'Checked In',
|
|
'not_checked_in': 'Not Checked In',
|
|
'invite_pending': 'Invite Pending',
|
|
'active': 'Active',
|
|
'name': 'Name',
|
|
'email': 'Email',
|
|
'status': 'Status',
|
|
'actions': 'Actions',
|
|
'start': 'Start',
|
|
'end': 'End',
|
|
'location': 'Location',
|
|
'max_attendees': 'Max Attendees',
|
|
'registration_link': 'Registration Link',
|
|
'details': 'Details',
|
|
'unlimited': 'unlimited',
|
|
'registered': 'registered',
|
|
'my_events': 'My Events',
|
|
'no_events_yet': "You haven't created any events yet.",
|
|
'create_first_event': 'Create your first event',
|
|
'organizer': 'Organizer',
|
|
'first_name': 'First Name',
|
|
'last_name': 'Last Name',
|
|
'description': 'Description',
|
|
'view': 'View',
|
|
'remove': 'Remove',
|
|
'add_attendee': 'Add Attendee',
|
|
'add_breakout_session': 'Add Break-out Session',
|
|
'add_staff_member': 'Add Staff Member',
|
|
'no_staff_yet': 'No staff members added yet.',
|
|
'no_breakout_sessions_yet': 'No break-out sessions created yet.',
|
|
'confirm_remove_staff': 'Are you sure you want to remove',
|
|
'from_event': 'from this event',
|
|
'remove_staff_member': 'Remove Staff Member',
|
|
'time': 'Time',
|
|
'organisation': 'Organisation',
|
|
'role': 'Role',
|
|
'role_profession': 'Role / Profession',
|
|
'phone': 'Phone',
|
|
'linkedin': 'LinkedIn',
|
|
'about_me': 'About Me',
|
|
'cancel': 'Cancel',
|
|
'welcome': 'Welcome to NetEvents',
|
|
'connect_with_professionals': 'Connect with professionals at networking events',
|
|
'register_as_organizer': 'Register as Organizer',
|
|
'already_have_account': 'Already have an account?',
|
|
'dont_have_account': "Don't have an account?",
|
|
'click_here': 'Click here',
|
|
'login_as_organiser': 'login as organiser',
|
|
'attendee_profile_link': 'Are you an attendee and want to check your personal profile page?',
|
|
'attendee_profile_url': '/attendee/personal/',
|
|
'request_profile_link': 'Request Your Personal Page Link',
|
|
'request_link_description': 'Enter your email address and we will send you a link to access your personal attendee profile page.',
|
|
'send_link': 'Send Link',
|
|
'email_sent': 'If that email is registered, we have sent the link.',
|
|
'email_not_found': 'No attendee found with that email address.',
|
|
'remember_password': 'Remember your password?',
|
|
'not_yet_registered': 'Not yet registered?',
|
|
'register_for_free': 'Register for FREE!',
|
|
'presenter': 'Presenter',
|
|
'visitor': 'Visitor',
|
|
'organiser': 'Organiser',
|
|
'upcoming_events': 'Upcoming Events',
|
|
'no_upcoming_events': 'No upcoming events at the moment. Check back soon!',
|
|
'platform_features': 'Platform Features',
|
|
'for_organizers': 'For Organizers',
|
|
'for_attendees': 'For Attendees',
|
|
'view_event': 'View Event',
|
|
'translation_management': 'Translation Management',
|
|
'key': 'Key',
|
|
'translation': 'Translation',
|
|
'language': 'Language',
|
|
'save': 'Save',
|
|
'add': 'Add',
|
|
'translations': 'Translations',
|
|
'presenter_dashboard': 'Presenter Dashboard',
|
|
'my_breakout_sessions': 'My Break-out Sessions',
|
|
'event': 'Event',
|
|
'scan_qr_code': 'Scan QR Code',
|
|
'scan_qr_to_connect': 'Scan QR to connect!',
|
|
'scan_instructions': 'Point your camera at an attendee QR code to check them in',
|
|
'camera_access': 'Camera Access',
|
|
'camera_permission_denied': 'Camera permission denied. Please allow camera access to scan QR codes.',
|
|
'error_reading_qr': 'Error reading QR code',
|
|
'attendee_not_found': 'Attendee not found',
|
|
'already_checked_in': 'Already checked in',
|
|
'check_in_successful': 'Check-in successful',
|
|
'password': 'Password',
|
|
'confirm_password': 'Confirm Password',
|
|
'complete_registration': 'Complete Registration',
|
|
'youre_invited': "You're invited to join",
|
|
'set_password_complete': 'Set a password to complete your registration',
|
|
'your_profile': 'Your Profile',
|
|
'edit_profile': 'Edit Profile',
|
|
'save_profile': 'Save Profile',
|
|
'profile_updated': 'Profile updated successfully',
|
|
'connection_requests': 'Connection Requests',
|
|
'my_connections': 'My Connections',
|
|
'send_connection_request': 'Send Connection Request',
|
|
'connection_sent': 'Connection request sent',
|
|
'accept': 'Accept',
|
|
'reject': 'Reject',
|
|
'pending': 'Pending',
|
|
'accepted': 'Accepted',
|
|
'rejected': 'Rejected',
|
|
'appointments': 'Appointments',
|
|
'request_appointment': 'Request Appointment',
|
|
'appointment_time': 'Appointment Time',
|
|
'appointment_notes': 'Notes',
|
|
'appointment_location': 'Location',
|
|
'request_sent': 'Appointment request sent',
|
|
'appointments_with': 'Appointments with',
|
|
'no_appointments': 'No appointments yet',
|
|
'breakout_session': 'Break-out Session',
|
|
'breakout_session_detail': 'Break-out Session Details',
|
|
'speaker': 'Speaker',
|
|
'session_full': 'Session Full',
|
|
'register_for_session': 'Register for Session',
|
|
'registered_for_session': 'Registered for Session',
|
|
'unregister_from_session': 'Unregister from Session',
|
|
'check_in': 'Check In',
|
|
'check_out': 'Check Out',
|
|
'event_checked_in': 'Event Checked In',
|
|
'event_checked_out': 'Event Checked Out',
|
|
'your_events': 'Your Events',
|
|
'past_events': 'Past Events',
|
|
'no_past_events': 'No past events',
|
|
'event_not_found': 'Event not found',
|
|
'invalid_invite_link': 'Invalid or expired invite link',
|
|
'password_mismatch': 'Passwords do not match',
|
|
'password_too_short': 'Password must be at least 8 characters',
|
|
'registration_successful': 'Registration successful',
|
|
'login_successful': 'Login successful',
|
|
'logout_successful': 'Logout successful',
|
|
'access_denied': 'Access denied',
|
|
'page_not_found': 'Page not found',
|
|
'internal_error': 'Internal server error',
|
|
'db_error': 'Database error',
|
|
'error_adding_staff': 'Error adding staff member',
|
|
'error_checking_in': 'Error checking in attendee',
|
|
'badge_printed': 'Badge printed',
|
|
'no_events': 'No events found',
|
|
'create_event': 'Create Event',
|
|
'edit_breakout_session': 'Edit Break-out Session',
|
|
'delete_breakout_session': 'Delete Break-out Session',
|
|
'view_all': 'View All',
|
|
'loading': 'Loading...',
|
|
'submit': 'Submit',
|
|
'close': 'Close',
|
|
'back': 'Back',
|
|
'next': 'Next',
|
|
'previous': 'Previous',
|
|
'search': 'Search',
|
|
'filter': 'Filter',
|
|
'sort_by': 'Sort by',
|
|
'ascending': 'Ascending',
|
|
'descending': 'Descending',
|
|
'rows_per_page': 'Rows per page',
|
|
'no_data': 'No data available',
|
|
'staff_dashboard': 'Staff Dashboard',
|
|
'select_event': 'Select an Event',
|
|
'no_events_assigned': 'No events assigned to you yet',
|
|
'start_qr_scanner': 'Start QR Scanner',
|
|
'back_to_dashboard': 'Back to Dashboard',
|
|
'download_badge': 'Download Badge',
|
|
'email_badge': 'Email Badge',
|
|
'badge_sent': 'Badge sent to your email!',
|
|
'attendee_types': 'Attendee Types',
|
|
'manage_types': 'Manage Types',
|
|
'create_attendee_type': 'Create Attendee Type',
|
|
'type_name': 'Type Name',
|
|
'e_g_vip_speaker_student': 'e.g., VIP, Speaker, Student',
|
|
'price': 'Price',
|
|
'optional': 'optional',
|
|
'leave_empty_for_free': 'Leave empty for free',
|
|
'create_type': 'Create Type',
|
|
'existing_types': 'Existing Types',
|
|
'registration_link': 'Registration Link',
|
|
'copy': 'Copy',
|
|
'delete': 'Delete',
|
|
'confirm_delete_type': 'Are you sure you want to delete this attendee type?',
|
|
'edit_attendee_type': 'Edit Attendee Type',
|
|
'back_to_types': 'Back to Types',
|
|
'type_details': 'Type Details',
|
|
'save_changes': 'Save Changes',
|
|
'edit': 'Edit',
|
|
'cannot_delete_type_with_attendees': 'Cannot delete: %d attendees assigned to this type',
|
|
'no_attendee_types_yet': 'No attendee types defined yet.',
|
|
'create_first_type': 'Create the first type',
|
|
'back_to_event': 'Back to Event',
|
|
'link_copied': 'Link copied!',
|
|
'copy_failed': 'Failed to copy link.',
|
|
'free': 'Free',
|
|
'select_all': 'Select All',
|
|
'assign_type': 'Assign Type',
|
|
'no_type': 'No Type',
|
|
'apply': 'Apply',
|
|
'selected': 'selected',
|
|
'type': 'Type',
|
|
'registration_type': 'Registration Type',
|
|
'manage_attendee_types_across_events': 'Manage attendee types across all your events',
|
|
'no_attendee_types_for_event': 'No attendee types defined for this event.',
|
|
'attendees': 'Attendees',
|
|
'manage': 'Manage',
|
|
'payment': 'Payment',
|
|
'complete_payment': 'Complete Payment',
|
|
'back_to_registration': 'Back to Registration',
|
|
'order_summary': 'Order Summary',
|
|
'event_registration': 'Event Registration',
|
|
'attendee_type': 'Attendee Type',
|
|
'attendee': 'Attendee',
|
|
'total': 'Total',
|
|
'name_on_card': 'Name on Card',
|
|
'card_number': 'Card Number',
|
|
'expiry_date': 'Expiry Date',
|
|
'cvv': 'CVV',
|
|
'pay_now': 'Pay Now',
|
|
'payment_secure': 'Your payment is secure and encrypted',
|
|
'john_doe': 'John Doe',
|
|
'mm/yy': 'MM/YY',
|
|
}
|
|
|
|
|
|
def get_translation(key, locale=None):
|
|
"""Get translation for a key in English, with underscores replaced by spaces."""
|
|
result = ENGLISH_TRANSLATIONS.get(key, key)
|
|
# Replace underscores with spaces and title case
|
|
if '_' in str(result):
|
|
words = str(result).split('_')
|
|
result = ' '.join(word.capitalize() for word in words)
|
|
return result
|
|
|
|
|
|
def t(key, locale=None):
|
|
"""Shorthand for get_translation."""
|
|
return get_translation(key, locale)
|
|
|
|
|
|
def strftime_to_babel_pattern(strftime_format):
|
|
"""Convert Python strftime format string to Babel CLDR pattern.
|
|
Quotes literal text for proper Babel interpretation.
|
|
"""
|
|
import re
|
|
|
|
# Mapping of strftime codes to Babel CLDR codes
|
|
strftime_to_babel = {
|
|
'%Y': 'yyyy',
|
|
'%y': 'yy',
|
|
'%m': 'MM',
|
|
'%B': 'MMMM',
|
|
'%b': 'MMM',
|
|
'%d': 'dd',
|
|
'%A': 'EEEE',
|
|
'%a': 'EEE',
|
|
'%H': 'HH',
|
|
'%I': 'hh',
|
|
'%M': 'mm',
|
|
'%S': 'ss',
|
|
'%p': 'a',
|
|
'%j': 'DDD',
|
|
'%U': 'ww',
|
|
'%W': 'ww',
|
|
}
|
|
|
|
# Find all strftime format codes and their positions
|
|
format_code_pattern = re.compile(r'%[A-Za-z]')
|
|
|
|
result_parts = []
|
|
last_end = 0
|
|
|
|
for match in format_code_pattern.finditer(strftime_format):
|
|
# Get the literal text before this format code
|
|
if match.start() > last_end:
|
|
literal = strftime_format[last_end:match.start()]
|
|
# Quote literal text for Babel (escape single quotes by doubling)
|
|
if literal:
|
|
quoted_literal = literal.replace("'", "''")
|
|
result_parts.append(f"'{quoted_literal}'")
|
|
|
|
# Convert the format code
|
|
code = match.group()
|
|
if code in strftime_to_babel:
|
|
result_parts.append(strftime_to_babel[code])
|
|
|
|
last_end = match.end()
|
|
|
|
# Handle any remaining literal text after the last format code
|
|
if last_end < len(strftime_format):
|
|
literal = strftime_format[last_end:]
|
|
if literal:
|
|
quoted_literal = literal.replace("'", "''")
|
|
result_parts.append(f"'{quoted_literal}'")
|
|
|
|
return ''.join(result_parts)
|
|
|
|
|
|
def localized_date(dt, locale=None, format=None):
|
|
"""Format a datetime in the specified locale's format."""
|
|
if dt is None:
|
|
return ''
|
|
|
|
# Ensure dt is a datetime object
|
|
from datetime import datetime as dt_class
|
|
if not isinstance(dt, dt_class):
|
|
# If it's a string or something else, try to parse it or return empty
|
|
if isinstance(dt, str):
|
|
try:
|
|
dt = dt_class.strptime(dt, '%Y-%m-%d %H:%M:%S')
|
|
except ValueError:
|
|
try:
|
|
dt = dt_class.strptime(dt, '%Y-%m-%dT%H:%M:%S')
|
|
except ValueError:
|
|
return str(dt) if dt else ''
|
|
else:
|
|
return str(dt) if dt else ''
|
|
|
|
if locale is None:
|
|
locale = get_locale()
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT date_format FROM languages WHERE code = %s", (locale,))
|
|
result = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if format is None and result:
|
|
format = strftime_to_babel_pattern(result['date_format'])
|
|
elif format is None:
|
|
format = 'MMMM d, yyyy h:mm a'
|
|
else:
|
|
# User provided a format from Jinja filter - convert it too
|
|
format = strftime_to_babel_pattern(format)
|
|
|
|
# Use babel's date formatting for proper locale support
|
|
from babel.dates import format_datetime
|
|
return format_datetime(dt, format, locale=locale)
|
|
except Exception:
|
|
# Fallback to strftime (will only properly show English)
|
|
try:
|
|
return dt.strftime('%B %d, %Y at %H:%M')
|
|
except Exception:
|
|
return str(dt)
|
|
|
|
|
|
# Jinja2 filters
|
|
@app.template_filter('localized_date')
|
|
def localized_date_filter(dt, format=None):
|
|
return localized_date(dt, get_locale(), format)
|
|
|
|
|
|
@app.template_filter('t')
|
|
def translate_filter(key):
|
|
# Always use English for translations
|
|
return get_translation(key, 'en')
|
|
|
|
|
|
@app.template_filter('spacify')
|
|
def spacify_filter(text):
|
|
"""Replace underscores with spaces and capitalize each word."""
|
|
if text is None or text == '':
|
|
return ''
|
|
return ' '.join(word.capitalize() for word in str(text).split('_'))
|
|
|
|
|
|
@app.template_filter('default')
|
|
def default_filter(value, default_value=''):
|
|
"""Return default_value if value is none or undefined."""
|
|
if value is None:
|
|
return default_value
|
|
return value
|
|
|
|
|
|
@app.template_filter('format_currency')
|
|
def format_currency_filter(value):
|
|
"""Format a number as currency."""
|
|
if value is None or value == '':
|
|
return ''
|
|
try:
|
|
return f'{float(value):.2f}'
|
|
except (ValueError, TypeError):
|
|
return str(value)
|
|
|
|
|
|
@app.context_processor
|
|
def utility_processor():
|
|
"""Add utility functions to template context."""
|
|
def get_currency_symbol():
|
|
return '€' # Could be made dynamic based on locale
|
|
return dict(get_currency_symbol=get_currency_symbol)
|
|
|
|
|
|
# Update session locale when language changes
|
|
@app.before_request
|
|
def before_request():
|
|
"""Before each request, update locale from URL if provided."""
|
|
if 'lang' in request.args:
|
|
lang = request.args.get('lang')
|
|
if lang in SUPPORTED_LOCALES:
|
|
session['locale'] = lang
|
|
|
|
|
|
@app.after_request
|
|
def prevent_caching(response):
|
|
"""Prevent proxy caching to ensure fresh translations."""
|
|
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
|
|
response.headers['Pragma'] = 'no-cache'
|
|
response.headers['Expires'] = '0'
|
|
return response
|
|
|
|
# Ensure upload folder exists
|
|
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
|
|
|
|
|
|
def get_db_connection(user=Config.DB_APP_USER, password=Config.DB_APP_PASSWORD):
|
|
"""Get a database connection."""
|
|
return mysql.connector.connect(
|
|
host=Config.DB_HOST,
|
|
port=Config.DB_PORT,
|
|
user=user,
|
|
password=password,
|
|
database=Config.DB_NAME
|
|
)
|
|
|
|
|
|
def generate_event_code():
|
|
"""Generate a unique 10-character alphanumeric event code."""
|
|
chars = string.ascii_uppercase + string.digits
|
|
while True:
|
|
code = ''.join(random.choices(chars, k=10))
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT id FROM events WHERE code = %s", (code,))
|
|
if not cursor.fetchone():
|
|
cursor.close()
|
|
conn.close()
|
|
return code
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
|
|
def generate_session_code():
|
|
"""Generate a unique 10-character alphanumeric breakout session code."""
|
|
chars = string.ascii_uppercase + string.digits
|
|
while True:
|
|
code = ''.join(random.choices(chars, k=10))
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT id FROM breakout_sessions WHERE code = %s", (code,))
|
|
if not cursor.fetchone():
|
|
cursor.close()
|
|
conn.close()
|
|
return code
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
|
|
def generate_attendee_code():
|
|
"""Generate a unique 10-character alphanumeric attendee code."""
|
|
chars = string.ascii_uppercase + string.digits
|
|
while True:
|
|
code = ''.join(random.choices(chars, k=10))
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT id FROM attendees WHERE attendee_code = %s", (code,))
|
|
if not cursor.fetchone():
|
|
cursor.close()
|
|
conn.close()
|
|
return code
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
|
|
def generate_staff_code():
|
|
"""Generate a unique 10-character alphanumeric staff code."""
|
|
chars = string.ascii_uppercase + string.digits
|
|
while True:
|
|
code = ''.join(random.choices(chars, k=10))
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT id FROM organizers WHERE staff_code = %s", (code,))
|
|
if not cursor.fetchone():
|
|
cursor.close()
|
|
conn.close()
|
|
return code
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
|
|
def generate_type_code(event_id):
|
|
"""Generate a unique 10-character alphanumeric attendee type code for an event."""
|
|
chars = string.ascii_uppercase + string.digits
|
|
while True:
|
|
code = ''.join(random.choices(chars, k=10))
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT id FROM attendee_types WHERE event_id = %s AND code = %s", (event_id, code))
|
|
if not cursor.fetchone():
|
|
cursor.close()
|
|
conn.close()
|
|
return code
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
|
|
def login_required(f):
|
|
"""Decorator to require login."""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'user_id' not in session or 'user_type' not in session:
|
|
flash('Please log in first.')
|
|
return redirect(url_for('login'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
|
|
def get_current_user():
|
|
"""Get current logged-in user from session."""
|
|
if 'user_id' not in session or 'user_type' not in session:
|
|
return None, None
|
|
return session['user_id'], session['user_type']
|
|
|
|
|
|
# Utility functions
|
|
def hash_password(password):
|
|
"""Hash a password."""
|
|
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
|
|
|
|
|
|
def verify_password(password, hashed):
|
|
"""Verify a password."""
|
|
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
|
|
|
|
|
|
def verify_recaptcha(recaptcha_response):
|
|
"""Verify a reCAPTCHA response with Google."""
|
|
secret_key = app.config.get('RECAPTCHA_SECRET_KEY')
|
|
if not secret_key:
|
|
return True # Skip verification if no secret key configured
|
|
try:
|
|
payload = {
|
|
'secret': secret_key,
|
|
'response': recaptcha_response
|
|
}
|
|
resp = requests.post('https://www.google.com/recaptcha/api/siteverify', data=payload, timeout=10)
|
|
result = resp.json()
|
|
return result.get('success', False)
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
# Rate limiting for failed login attempts
|
|
failed_login_attempts = {} # {ip_address: [(timestamp, count), ...]}
|
|
|
|
# Rate limiting for failed personal page attempts (30-minute block)
|
|
failed_personal_page_attempts = {} # {ip_address: [(timestamp, count), ...]}
|
|
PERSONAL_PAGE_BLOCK_DURATION = 30 * 60 # 30 minutes in seconds
|
|
PERSONAL_PAGE_MAX_ATTEMPTS = 5 # Max failed attempts before blocking
|
|
|
|
def is_ip_blocked(ip_address):
|
|
"""Check if an IP address is blocked due to too many failed login attempts."""
|
|
if ip_address not in failed_login_attempts:
|
|
return False
|
|
|
|
# Clean up old entries (older than 1 hour)
|
|
current_time = datetime.now()
|
|
failed_login_attempts[ip_address] = [
|
|
(ts, count) for ts, count in failed_login_attempts[ip_address]
|
|
if (current_time - ts).total_seconds() < 3600
|
|
]
|
|
|
|
# Check if still blocked
|
|
if len(failed_login_attempts[ip_address]) >= 5:
|
|
return True
|
|
|
|
# Remove empty entries
|
|
if not failed_login_attempts[ip_address]:
|
|
del failed_login_attempts[ip_address]
|
|
|
|
return False
|
|
|
|
|
|
def is_ip_blocked_for_personal_page(ip_address):
|
|
"""Check if an IP address is blocked due to too many failed personal page attempts."""
|
|
if ip_address not in failed_personal_page_attempts:
|
|
return False
|
|
|
|
# Clean up old entries (older than 30 minutes)
|
|
current_time = datetime.now()
|
|
failed_personal_page_attempts[ip_address] = [
|
|
(ts, count) for ts, count in failed_personal_page_attempts[ip_address]
|
|
if (current_time - ts).total_seconds() < PERSONAL_PAGE_BLOCK_DURATION
|
|
]
|
|
|
|
# Check if still blocked
|
|
if len(failed_personal_page_attempts[ip_address]) >= PERSONAL_PAGE_MAX_ATTEMPTS:
|
|
return True
|
|
|
|
# Remove empty entries
|
|
if not failed_personal_page_attempts[ip_address]:
|
|
del failed_personal_page_attempts[ip_address]
|
|
|
|
return False
|
|
|
|
|
|
def record_failed_personal_page_attempt(ip_address):
|
|
"""Record a failed personal page attempt for an IP address."""
|
|
if ip_address not in failed_personal_page_attempts:
|
|
failed_personal_page_attempts[ip_address] = []
|
|
|
|
failed_personal_page_attempts[ip_address].append((datetime.now(), 1))
|
|
|
|
|
|
def clear_failed_personal_page_attempts(ip_address):
|
|
"""Clear failed personal page attempts after successful access."""
|
|
if ip_address in failed_personal_page_attempts:
|
|
del failed_personal_page_attempts[ip_address]
|
|
|
|
|
|
def record_failed_login(ip_address):
|
|
"""Record a failed login attempt for an IP address."""
|
|
if ip_address not in failed_login_attempts:
|
|
failed_login_attempts[ip_address] = []
|
|
|
|
failed_login_attempts[ip_address].append((datetime.now(), 1))
|
|
|
|
|
|
def clear_failed_logins(ip_address):
|
|
"""Clear failed login attempts after successful login."""
|
|
if ip_address in failed_login_attempts:
|
|
del failed_login_attempts[ip_address]
|
|
|
|
|
|
def ensure_staff_code_column():
|
|
"""Ensure the staff_code column exists in organizers table."""
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
|
|
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'organizers' AND COLUMN_NAME = 'staff_code'
|
|
""", (Config.DB_NAME,))
|
|
if not cursor.fetchone():
|
|
cursor.execute("ALTER TABLE organizers ADD COLUMN staff_code VARCHAR(10) UNIQUE DEFAULT NULL")
|
|
conn.commit()
|
|
print("Added staff_code column to organizers table")
|
|
cursor.close()
|
|
conn.close()
|
|
except Error as e:
|
|
print(f"Error ensuring staff_code column: {e}")
|
|
|
|
|
|
def ensure_all_organizers_have_staff_code():
|
|
"""Ensure all existing organizers have a staff_code."""
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT id FROM organizers WHERE staff_code IS NULL")
|
|
organizers_without_code = cursor.fetchall()
|
|
for org in organizers_without_code:
|
|
staff_code = generate_staff_code()
|
|
cursor.execute("UPDATE organizers SET staff_code = %s WHERE id = %s", (staff_code, org['id']))
|
|
if organizers_without_code:
|
|
conn.commit()
|
|
print(f"Generated staff_codes for {len(organizers_without_code)} organizers")
|
|
cursor.close()
|
|
conn.close()
|
|
except Error as e:
|
|
print(f"Error generating staff codes: {e}")
|
|
|
|
|
|
def ensure_staff_staff_code_column():
|
|
"""Ensure the staff_code column exists in staff table."""
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
|
|
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'staff' AND COLUMN_NAME = 'staff_code'
|
|
""", (Config.DB_NAME,))
|
|
if not cursor.fetchone():
|
|
cursor.execute("ALTER TABLE staff ADD COLUMN staff_code VARCHAR(10) UNIQUE DEFAULT NULL")
|
|
conn.commit()
|
|
print("Added staff_code column to staff table")
|
|
cursor.close()
|
|
conn.close()
|
|
except Error as e:
|
|
print(f"Error ensuring staff staff_code column: {e}")
|
|
|
|
|
|
def ensure_all_staff_have_staff_code():
|
|
"""Ensure all existing staff members have a staff_code."""
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT id FROM staff WHERE staff_code IS NULL")
|
|
staff_without_code = cursor.fetchall()
|
|
for s in staff_without_code:
|
|
staff_code = generate_staff_code()
|
|
cursor.execute("UPDATE staff SET staff_code = %s WHERE id = %s", (staff_code, s['id']))
|
|
if staff_without_code:
|
|
conn.commit()
|
|
print(f"Generated staff_codes for {len(staff_without_code)} staff members")
|
|
cursor.close()
|
|
conn.close()
|
|
except Error as e:
|
|
print(f"Error generating staff codes: {e}")
|
|
|
|
|
|
def allowed_file(filename):
|
|
"""Check if file extension is allowed."""
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in Config.ALLOWED_EXTENSIONS
|
|
|
|
|
|
def generate_invite_token():
|
|
"""Generate a unique invite token for staff."""
|
|
return uuid.uuid4().hex
|
|
|
|
|
|
def send_staff_invite_email(staff_email, staff_name, event_name, invite_token, organizer_email):
|
|
"""Send invite email to staff member."""
|
|
invite_url = url_for('staff_invite', token=invite_token, _external=True)
|
|
|
|
subject = f"You're invited to join {event_name} as Staff"
|
|
body = f"""Hello {staff_name},
|
|
|
|
You've been invited to join the event "{event_name}" as a staff member.
|
|
|
|
Click the link below to complete your registration and set your password:
|
|
{invite_url}
|
|
|
|
This link will expire in 7 days.
|
|
|
|
Best regards,
|
|
The Event Team
|
|
"""
|
|
|
|
msg = MIMEMultipart()
|
|
msg['From'] = Config.MAIL_DEFAULT_SENDER
|
|
msg['To'] = staff_email
|
|
msg['Subject'] = subject
|
|
msg.attach(MIMEText(body, 'plain'))
|
|
|
|
try:
|
|
with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as server:
|
|
if Config.MAIL_USE_TLS:
|
|
server.starttls()
|
|
if Config.MAIL_USERNAME:
|
|
server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
|
|
server.send_message(msg)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Email error: {e}")
|
|
return False
|
|
|
|
|
|
def send_attendee_confirmation_email(attendee_email, attendee_name, event_name, event_date, event_location, personal_page_url=None):
|
|
"""Send confirmation email to attendee after registration."""
|
|
subject = f"Registration confirmed for {event_name}"
|
|
|
|
personal_page_section = f"""
|
|
Your Personal Page:
|
|
{personal_page_url}
|
|
|
|
View your registered events and breakout sessions anytime.
|
|
""" if personal_page_url else ""
|
|
|
|
body = f"""Hello {attendee_name},
|
|
|
|
Your registration for {event_name} has been confirmed!
|
|
|
|
Event Details:
|
|
- Date: {event_date}
|
|
- Location: {event_location}{personal_page_section}
|
|
You can now log in to:
|
|
- View event details
|
|
- RSVP for break-out sessions
|
|
- Connect with other attendees
|
|
|
|
We look forward to seeing you there!
|
|
|
|
Best regards,
|
|
The Event Team
|
|
"""
|
|
|
|
msg = MIMEMultipart()
|
|
msg['From'] = Config.MAIL_DEFAULT_SENDER
|
|
msg['To'] = attendee_email
|
|
msg['Subject'] = subject
|
|
msg.attach(MIMEText(body, 'plain'))
|
|
|
|
try:
|
|
with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as server:
|
|
if Config.MAIL_USE_TLS:
|
|
server.starttls()
|
|
if Config.MAIL_USERNAME:
|
|
server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
|
|
server.send_message(msg)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Email error: {e}")
|
|
return False
|
|
|
|
|
|
def send_breakout_session_update_email(attendee_email, attendee_name, session_name, event_name, changes_text):
|
|
"""Send update email to attendee when a breakout session is modified."""
|
|
subject = f"Update on Breakout Session: {session_name}"
|
|
|
|
body = f"""Hello {attendee_name},
|
|
|
|
The breakout session "{session_name}" for {event_name} has been updated.
|
|
|
|
Changes made:
|
|
{changes_text}
|
|
|
|
Please log in to view the updated details.
|
|
|
|
Best regards,
|
|
The Event Team
|
|
"""
|
|
|
|
msg = MIMEMultipart()
|
|
msg['From'] = Config.MAIL_DEFAULT_SENDER
|
|
msg['To'] = attendee_email
|
|
msg['Subject'] = subject
|
|
msg.attach(MIMEText(body, 'plain'))
|
|
|
|
try:
|
|
with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as server:
|
|
if Config.MAIL_USE_TLS:
|
|
server.starttls()
|
|
if Config.MAIL_USERNAME:
|
|
server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
|
|
server.send_message(msg)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Email error: {e}")
|
|
return False
|
|
|
|
|
|
def send_event_update_email(attendee_email, attendee_name, event_name, changes_text):
|
|
"""Send update email to attendee when an event is modified."""
|
|
subject = f"Update on Event: {event_name}"
|
|
|
|
body = f"""Hello {attendee_name},
|
|
|
|
The event "{event_name}" has been updated.
|
|
|
|
Changes made:
|
|
{changes_text}
|
|
|
|
Please log in to view the updated details.
|
|
|
|
Best regards,
|
|
The Event Team
|
|
"""
|
|
|
|
msg = MIMEMultipart()
|
|
msg['From'] = Config.MAIL_DEFAULT_SENDER
|
|
msg['To'] = attendee_email
|
|
msg['Subject'] = subject
|
|
msg.attach(MIMEText(body, 'plain'))
|
|
|
|
try:
|
|
with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as server:
|
|
if Config.MAIL_USE_TLS:
|
|
server.starttls()
|
|
if Config.MAIL_USERNAME:
|
|
server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
|
|
server.send_message(msg)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Email error: {e}")
|
|
return False
|
|
|
|
|
|
def generate_badge_pdf(attendee, event=None):
|
|
"""Generate an A4 PDF badge for an attendee.
|
|
|
|
The PDF is designed to be folded horizontally and vertically once,
|
|
with the attendee info in the lower-left quarter.
|
|
Includes dotted fold lines and QR code.
|
|
"""
|
|
buffer = io.BytesIO()
|
|
c = canvas.Canvas(buffer, pagesize=A4)
|
|
width, height = A4 # 595.28 x 841.89 points
|
|
|
|
# A4 dimensions in mm
|
|
a4_width_mm = 210
|
|
a4_height_mm = 297
|
|
|
|
# Margins - 10cm higher than before (10cm = 100mm)
|
|
left_margin = 15 * mm
|
|
bottom_margin = 15 * mm + 100 * mm # 10cm higher
|
|
|
|
# Dotted line settings
|
|
dash_length = 5
|
|
gap_length = 5
|
|
|
|
# Draw vertical dotted line down the middle
|
|
center_x = width / 2
|
|
c.setDash(dash_length, gap_length)
|
|
c.setLineWidth(0.5)
|
|
c.line(center_x, 0, center_x, height)
|
|
|
|
# Draw horizontal dotted line over the middle
|
|
center_y = height / 2
|
|
c.line(0, center_y, width, center_y)
|
|
|
|
# Reset dash for other drawing
|
|
c.setDash([])
|
|
|
|
# Draw VISITOR bar at top of bottom-left quadrant, left edge to center, below dotted line
|
|
bar_height = 50 # Height of the bar in points (doubled)
|
|
c.setFillColorRGB(0, 0, 0) # Black color
|
|
c.rect(0, center_y - bar_height, center_x, bar_height, fill=True, stroke=False)
|
|
# Draw VISITOR text in white, centered
|
|
c.setFillColorRGB(1, 1, 1) # White
|
|
c.setFont("Helvetica-Bold", 27) # 150% of 18
|
|
text = "VISITOR"
|
|
text_width = c.stringWidth(text, "Helvetica-Bold", 27)
|
|
text_x = (center_x - text_width) / 2
|
|
c.drawString(text_x, center_y - bar_height + 12, text)
|
|
|
|
# Reset fill color to black for subsequent text
|
|
c.setFillColorRGB(0, 0, 0)
|
|
|
|
# Font setup - 32pt (twice as high as 16pt) for name, 16pt for org/role
|
|
# Bold for name, regular for org/role
|
|
c.setFont("Helvetica-Bold", 32)
|
|
|
|
# Position in lower-left quarter
|
|
# Content y position (from bottom of page) - moved 1.5cm (42.5pt) further up
|
|
y_name = bottom_margin + 40 - 42.5 # Start with name
|
|
|
|
# Draw attendee first name (bold, 32pt)
|
|
first_name = attendee.get('first_name', '')
|
|
last_name = attendee.get('last_name', '')
|
|
|
|
# Check if names fit within the width limit (half of A4 width minus margins)
|
|
max_name_width = (width / 2) - left_margin - 10 # 10pt padding
|
|
full_name_width = c.stringWidth(first_name + ' ' + last_name, "Helvetica-Bold", 32)
|
|
|
|
# If full name is too wide, reduce font size
|
|
font_size = 32
|
|
if full_name_width > max_name_width:
|
|
# Try smaller sizes
|
|
for size in [28, 24, 20, 18, 16]:
|
|
if c.stringWidth(first_name + ' ' + last_name, "Helvetica-Bold", size) <= max_name_width:
|
|
font_size = size
|
|
break
|
|
c.setFont("Helvetica-Bold", font_size)
|
|
|
|
# Draw first name
|
|
c.drawString(left_margin, y_name, first_name)
|
|
# Draw last name on next line
|
|
y_last_name = y_name - (font_size * 1.2)
|
|
c.drawString(left_margin, y_last_name, last_name)
|
|
# Extra blank line after last name
|
|
y_org = y_last_name - (font_size * 1.4)
|
|
|
|
# Draw organization (regular, 16pt, 150% = 24pt)
|
|
c.setFont("Helvetica", 24)
|
|
y_org = y_name - 50
|
|
c.drawString(left_margin, y_org, attendee.get('organisation', '') or '')
|
|
|
|
# Draw role (regular, 16pt, 150% = 24pt)
|
|
c.setFont("Helvetica", 24)
|
|
y_role = y_org - 40
|
|
c.drawString(left_margin, y_role, attendee.get('role', '') or '')
|
|
|
|
# Generate QR code for attendee_code
|
|
attendee_code = attendee.get('attendee_code', '')
|
|
if attendee_code:
|
|
qr = qrcode.QRCode(version=1, box_size=10, border=1)
|
|
qr.add_data(attendee_code)
|
|
qr.make(fit=True)
|
|
qr_img = qr.make_image(fill_color="black", back_color="white")
|
|
|
|
# Convert to bytes and then to ImageReader for reportlab
|
|
qr_buffer = io.BytesIO()
|
|
qr_img.save(qr_buffer, format='PNG')
|
|
qr_buffer.seek(0)
|
|
|
|
from reportlab.lib.utils import ImageReader
|
|
qr_reader = ImageReader(qr_buffer)
|
|
|
|
# Draw QR code at bottom-left with 15mm left padding, flush to bottom edge
|
|
qr_size = 40 * mm # QR code size
|
|
qr_x = 15 * mm # 15mm from left
|
|
qr_y = 25 * mm # 10mm from top of bottom_bar (15mm + 10mm)
|
|
|
|
c.drawImage(qr_reader, qr_x, qr_y, width=qr_size, height=qr_size)
|
|
|
|
# Draw bottom black bar with event info (from bottom edge, 150mm high)
|
|
if event:
|
|
bar_bottom_height = 15 * mm # 15mm high
|
|
c.setFillColorRGB(0, 0, 0) # Black color
|
|
c.rect(0, 0, center_x, bar_bottom_height, fill=True, stroke=False)
|
|
|
|
# Draw event name centered
|
|
c.setFillColorRGB(1, 1, 1) # White
|
|
c.setFont("Helvetica-Bold", 14)
|
|
event_name = event.get('name', '') or ''
|
|
event_name_width = c.stringWidth(event_name, "Helvetica-Bold", 14)
|
|
event_name_x = (center_x - event_name_width) / 2
|
|
c.drawString(event_name_x, bar_bottom_height - 21, event_name)
|
|
|
|
# Draw start and end date/time on the same line, centered
|
|
c.setFont("Helvetica", 10)
|
|
start_time = event.get('start_time')
|
|
end_time = event.get('end_time')
|
|
if start_time and end_time:
|
|
start_str = localized_date(start_time, get_locale(), '%d/%m/%Y %H:%M')
|
|
end_str = localized_date(end_time, get_locale(), '%d/%m/%Y %H:%M')
|
|
combined_str = start_str + " - " + end_str
|
|
combined_width = c.stringWidth(combined_str, "Helvetica", 10)
|
|
combined_x = (center_x - combined_width) / 2
|
|
c.drawString(combined_x, bar_bottom_height - 37, combined_str)
|
|
|
|
c.save()
|
|
buffer.seek(0)
|
|
return buffer
|
|
|
|
|
|
def generate_rectangular_badges_pdf(attendees, event=None, attendee_types_map=None):
|
|
"""Generate an A4 PDF with rectangular labels.
|
|
|
|
Label dimensions: 80mm x 50mm
|
|
Layout: 2 columns x 5 rows = 10 badges per A4 page.
|
|
Each badge contains: attendee type (black bar), name, organization, role, and QR code.
|
|
"""
|
|
buffer = io.BytesIO()
|
|
c = canvas.Canvas(buffer, pagesize=A4)
|
|
width, height = A4 # 595.28 x 841.89 points
|
|
|
|
# Label dimensions: 80mm x 50mm
|
|
label_width_mm = 80
|
|
label_height_mm = 50
|
|
label_width = label_width_mm * mm
|
|
label_height = label_height_mm * mm
|
|
|
|
badges_per_row = 2
|
|
badges_per_col = 5
|
|
badges_per_page = badges_per_row * badges_per_col
|
|
|
|
# Page margins
|
|
margin_left_mm = 15
|
|
margin_right_mm = 15
|
|
margin_top_mm = 12
|
|
margin_bottom_mm = 12
|
|
|
|
# Calculate spacing
|
|
available_width = width - (margin_left_mm * mm) - (margin_right_mm * mm)
|
|
available_height = height - (margin_top_mm * mm) - (margin_bottom_mm * mm)
|
|
|
|
# Gap between labels
|
|
gap_x = (available_width - badges_per_row * label_width) / (badges_per_row + 1)
|
|
gap_y = (available_height - badges_per_col * label_height) / (badges_per_col + 1)
|
|
|
|
# Badge positions (starting from top-left)
|
|
badge_positions = []
|
|
for row in range(badges_per_col):
|
|
for col in range(badges_per_row):
|
|
x = margin_left_mm * mm + gap_x + col * (label_width + gap_x)
|
|
y = height - margin_top_mm * mm - gap_y - row * (label_height + gap_y) - label_height
|
|
badge_positions.append((x, y))
|
|
|
|
# Draw badges for each attendee
|
|
for i, attendee in enumerate(attendees):
|
|
badge_index = i % badges_per_page
|
|
|
|
# New page if needed
|
|
if badge_index == 0 and i > 0:
|
|
c.showPage()
|
|
|
|
if badge_index < len(badge_positions):
|
|
x, y = badge_positions[badge_index]
|
|
|
|
# Draw label border (light gray)
|
|
c.setStrokeColorRGB(0.7, 0.7, 0.7)
|
|
c.setLineWidth(0.5)
|
|
c.rect(x, y, label_width, label_height, fill=False, stroke=True)
|
|
|
|
# Get attendee type name from mapping
|
|
attendee_type_id = attendee.get('attendee_type_id')
|
|
attendee_type_name = ''
|
|
if attendee_type_id and attendee_types_map:
|
|
type_info = attendee_types_map.get(attendee_type_id, {})
|
|
attendee_type_name = type_info.get('name', '') or ''
|
|
|
|
# Draw black bar at top with attendee type in white
|
|
if attendee_type_name:
|
|
bar_height = 8 * mm
|
|
c.setFillColorRGB(0, 0, 0)
|
|
c.rect(x, y + label_height - bar_height, label_width, bar_height, fill=True, stroke=False)
|
|
c.setFillColorRGB(1, 1, 1)
|
|
c.setFont("Helvetica-Bold", 16)
|
|
c.drawCentredString(x + label_width / 2, y + label_height - bar_height + 1.5 * mm, attendee_type_name[:20])
|
|
|
|
# Draw attendee name (20pt)
|
|
first_name = attendee.get('first_name', '') or ''
|
|
last_name = attendee.get('last_name', '') or ''
|
|
full_name = f"{first_name} {last_name}".strip()
|
|
|
|
font_size = 20
|
|
max_name_width = label_width - 2 * mm
|
|
name_width = c.stringWidth(full_name, "Helvetica-Bold", font_size)
|
|
if name_width > max_name_width:
|
|
# Scale down to fit
|
|
font_size = int(font_size * max_name_width / name_width)
|
|
|
|
c.setFont("Helvetica-Bold", font_size)
|
|
c.setFillColorRGB(0, 0, 0)
|
|
|
|
name_y = y + label_height - 22 * mm
|
|
c.drawCentredString(x + label_width / 2, name_y, full_name)
|
|
|
|
# Draw organization
|
|
org = attendee.get('organisation', '') or ''
|
|
c.setFont("Helvetica", 14)
|
|
org_y = name_y - 8 * mm
|
|
if org:
|
|
c.drawCentredString(x + label_width / 2, org_y, org[:22])
|
|
|
|
# Draw role
|
|
role = attendee.get('role', '') or ''
|
|
c.setFont("Helvetica", 14)
|
|
if role:
|
|
role_y = org_y - 7 * mm
|
|
c.drawCentredString(x + label_width / 2, role_y, role[:22])
|
|
|
|
# Generate QR code at bottom-right
|
|
attendee_code = attendee.get('attendee_code', '')
|
|
if attendee_code:
|
|
qr_size = 12 * mm
|
|
qr_x = x + label_width - qr_size - 2 * mm
|
|
qr_y = y + 10 * mm
|
|
|
|
qr = qrcode.QRCode(version=1, box_size=2, border=0)
|
|
qr.add_data(attendee_code)
|
|
qr.make(fit=True)
|
|
qr_img = qr.make_image(fill_color="black", back_color="white")
|
|
|
|
qr_buffer = io.BytesIO()
|
|
qr_img.save(qr_buffer, format='PNG')
|
|
qr_buffer.seek(0)
|
|
|
|
from reportlab.lib.utils import ImageReader
|
|
qr_reader = ImageReader(qr_buffer)
|
|
c.drawImage(qr_reader, qr_x, qr_y, width=qr_size, height=qr_size)
|
|
|
|
# Draw black bar at bottom with event name and start date
|
|
bar_height = 8 * mm
|
|
c.setFillColorRGB(0, 0, 0)
|
|
c.rect(x, y, label_width, bar_height, fill=True, stroke=False)
|
|
c.setFillColorRGB(1, 1, 1)
|
|
c.setFont("Helvetica-Bold", 14)
|
|
|
|
event_name = (event.get('name', '') or '') if event else ''
|
|
start_time = event.get('start_time', '') if event else ''
|
|
event_date_str = ''
|
|
if start_time:
|
|
if hasattr(start_time, 'strftime'):
|
|
event_date_str = start_time.strftime('%d-%m-%Y')
|
|
else:
|
|
event_date_str = str(start_time)[:10]
|
|
|
|
bar_text = f"{event_name} — {event_date_str}" if event_date_str else event_name
|
|
c.drawCentredString(x + label_width / 2, y + bar_height / 2 - 1.5 * mm, bar_text)
|
|
|
|
c.save()
|
|
buffer.seek(0)
|
|
return buffer
|
|
|
|
|
|
def send_badge_email(attendee_email, attendee_name, event_name, pdf_buffer):
|
|
"""Send email with badge PDF attachment."""
|
|
subject = f"Your Badge for {event_name}"
|
|
|
|
body = f"""Hello {attendee_name},
|
|
|
|
Please find attached your badge for {event_name}.
|
|
|
|
The badge is designed to be folded once horizontally and once vertically.
|
|
Your information appears in the lower-left quarter after folding.
|
|
|
|
We look forward to seeing you there!
|
|
|
|
Best regards,
|
|
The Event Team
|
|
"""
|
|
|
|
msg = MIMEMultipart()
|
|
msg['From'] = Config.MAIL_DEFAULT_SENDER
|
|
msg['To'] = attendee_email
|
|
msg['Subject'] = subject
|
|
msg.attach(MIMEText(body, 'plain'))
|
|
|
|
# Attach PDF
|
|
pdf_buffer.seek(0)
|
|
part = MIMEBase('application', 'octet-stream')
|
|
part.set_payload(pdf_buffer.read())
|
|
encode_base64(part)
|
|
part.add_header('Content-Disposition', 'attachment', filename=f'badge_{attendee_name.replace(" ", "_")}.pdf')
|
|
msg.attach(part)
|
|
|
|
try:
|
|
with smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) as server:
|
|
if Config.MAIL_USE_TLS:
|
|
server.starttls()
|
|
if Config.MAIL_USERNAME:
|
|
server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
|
|
server.send_message(msg)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Email error: {e}")
|
|
return False
|
|
|
|
|
|
# Routes - Main
|
|
@app.route('/')
|
|
def index():
|
|
"""Home page."""
|
|
return render_template('index.html')
|
|
|
|
|
|
@app.route('/staff/<staff_code>')
|
|
def staff_login(staff_code):
|
|
"""Staff/organizer login via staff_code (no password required)."""
|
|
client_ip = request.remote_addr
|
|
|
|
# Check if IP is blocked (reuse login rate limiting)
|
|
if is_ip_blocked(client_ip):
|
|
flash('Too many failed login attempts. Please try again later (blocked for 1 hour).')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# First check if it's an organizer
|
|
cursor.execute("SELECT * FROM organizers WHERE staff_code = %s", (staff_code,))
|
|
organizer = cursor.fetchone()
|
|
|
|
if organizer:
|
|
cursor.close()
|
|
conn.close()
|
|
clear_failed_logins(client_ip)
|
|
|
|
# Log in the organizer
|
|
session['user_id'] = organizer['id']
|
|
session['user_type'] = 'organizer'
|
|
session['event_id'] = None
|
|
|
|
flash(f'Welcome back, {organizer["name"]}!')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
# Check if it's a staff member
|
|
cursor.execute("SELECT * FROM staff WHERE staff_code = %s", (staff_code,))
|
|
staff_member = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not staff_member:
|
|
record_failed_login(client_ip)
|
|
flash('Invalid staff link.')
|
|
return redirect(url_for('index'))
|
|
|
|
# Clear failed login attempts on successful staff code login
|
|
clear_failed_logins(client_ip)
|
|
|
|
# Log in the staff member
|
|
session['user_id'] = staff_member['id']
|
|
session['user_type'] = 'staff'
|
|
session['event_id'] = staff_member['event_id']
|
|
|
|
flash(f'Welcome back, {staff_member["first_name"]}!')
|
|
return redirect(url_for('staff_dashboard'))
|
|
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
# Routes - Auth
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
"""Login page - supports both organizer and attendee login."""
|
|
client_ip = request.remote_addr
|
|
|
|
# Check if IP is blocked
|
|
if is_ip_blocked(client_ip):
|
|
flash('Too many failed login attempts. Please try again later (blocked for 1 hour).')
|
|
return render_template('auth/login.html')
|
|
|
|
if request.method == 'POST':
|
|
email = request.form.get('email', '').strip()
|
|
password = request.form.get('password', '')
|
|
user_type = request.form.get('user_type', 'attendee')
|
|
|
|
if not email or not password:
|
|
flash('Email and password are required.')
|
|
return render_template('auth/login.html')
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
if user_type == 'organizer':
|
|
cursor.execute("SELECT * FROM organizers WHERE email = %s", (email,))
|
|
elif user_type == 'breakout_organizer':
|
|
cursor.execute("SELECT * FROM organizers WHERE email = %s", (email,))
|
|
elif user_type == 'staff':
|
|
cursor.execute("SELECT * FROM staff WHERE email = %s", (email,))
|
|
else:
|
|
cursor.execute("SELECT * FROM attendees WHERE email = %s", (email,))
|
|
|
|
user = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if user and verify_password(password, user['password_hash']):
|
|
clear_failed_logins(client_ip)
|
|
session['user_id'] = user['id']
|
|
session['user_type'] = user_type
|
|
session['event_id'] = user.get('event_id') if user_type in ['attendee', 'staff'] else None
|
|
if user_type == 'organizer':
|
|
flash(f'Welcome back, {user["name"]}!')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
elif user_type == 'breakout_organizer':
|
|
flash(f'Welcome back, {user["name"]}!')
|
|
return redirect(url_for('presenter_dashboard'))
|
|
elif user_type == 'staff':
|
|
flash(f'Welcome back, {user["first_name"]}!')
|
|
return redirect(url_for('staff_dashboard'))
|
|
else:
|
|
flash(f'Welcome back, {user["first_name"]}!')
|
|
return redirect(url_for('attendee_dashboard'))
|
|
|
|
record_failed_login(client_ip)
|
|
flash('Invalid email or password.')
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
|
|
# Handle pre-selection of user type from query string
|
|
default_type = request.args.get('type', 'attendee')
|
|
return render_template('auth/login.html', default_type=default_type)
|
|
|
|
|
|
@app.route('/presenter/dashboard')
|
|
@login_required
|
|
def presenter_dashboard():
|
|
"""Presenter (break-out organiser) dashboard."""
|
|
if session.get('user_type') != 'breakout_organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Get sessions where this organizer is assigned as presenter
|
|
cursor.execute("""
|
|
SELECT bs.*, e.name as event_name
|
|
FROM breakout_sessions bs
|
|
JOIN breakout_session_organizers bso ON bs.id = bso.breakout_session_id
|
|
JOIN events e ON bs.event_id = e.id
|
|
WHERE bso.organizer_id = %s
|
|
ORDER BY bs.start_time
|
|
""", (session['user_id'],))
|
|
sessions = cursor.fetchall()
|
|
|
|
cursor.execute("SELECT name FROM organizers WHERE id = %s", (session['user_id'],))
|
|
organizer = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('presenter/dashboard.html',
|
|
sessions=sessions,
|
|
presenter_name=organizer['name'])
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/staff/dashboard')
|
|
@login_required
|
|
def staff_dashboard():
|
|
"""Staff dashboard - list events for staff member."""
|
|
if session.get('user_type') != 'staff':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT first_name, last_name FROM staff WHERE id = %s", (session['user_id'],))
|
|
staff_member = cursor.fetchone()
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id IN (SELECT event_id FROM staff WHERE id = %s) ORDER BY start_time", (session['user_id'],))
|
|
events = cursor.fetchall()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('staff/staff_events.html', events=events, staff_name=f"{staff_member['first_name']} {staff_member['last_name']}")
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/staff/<int:event_id>/dashboard')
|
|
@login_required
|
|
def staff_event_dashboard(event_id):
|
|
"""Staff dashboard for a specific event - only accessible by staff members assigned to this event."""
|
|
if session.get('user_type') != 'staff':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Verify staff is assigned to this event
|
|
cursor.execute("SELECT 1 FROM staff WHERE id = %s AND event_id = %s", (session['user_id'], event_id))
|
|
if not cursor.fetchone():
|
|
flash('Access denied.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('staff_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s", (event_id,))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('staff_dashboard'))
|
|
|
|
cursor.execute("SELECT first_name, last_name FROM staff WHERE id = %s", (session['user_id'],))
|
|
staff_member = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('staff/dashboard.html', event=event, staff_name=f"{staff_member['first_name']} {staff_member['last_name']}")
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('staff_dashboard'))
|
|
|
|
|
|
@app.route('/register/organizer', methods=['GET', 'POST'])
|
|
def register_organizer():
|
|
"""Register a new organizer."""
|
|
if request.method == 'POST':
|
|
name = request.form.get('name', '').strip()
|
|
email = request.form.get('email', '').strip()
|
|
password = request.form.get('password', '')
|
|
confirm_password = request.form.get('confirm_password', '')
|
|
|
|
if not all([name, email, password]):
|
|
flash('All fields are required.')
|
|
return render_template('register.html', user_type='organizer')
|
|
|
|
if password != confirm_password:
|
|
flash('Passwords do not match.')
|
|
return render_template('register.html', user_type='organizer')
|
|
|
|
if len(password) < 6:
|
|
flash('Password must be at least 6 characters.')
|
|
return render_template('register.html', user_type='organizer')
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Check if email exists
|
|
cursor.execute("SELECT id FROM organizers WHERE email = %s", (email,))
|
|
if cursor.fetchone():
|
|
flash('Email already registered.')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('register.html', user_type='organizer')
|
|
|
|
# Create organizer
|
|
password_hash = hash_password(password)
|
|
staff_code = generate_staff_code()
|
|
cursor.execute(
|
|
"INSERT INTO organizers (email, password_hash, name, staff_code) VALUES (%s, %s, %s, %s)",
|
|
(email, password_hash, name, staff_code)
|
|
)
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Registration successful! Please log in.')
|
|
return redirect(url_for('login'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
|
|
return render_template('register.html', user_type='organizer')
|
|
|
|
|
|
@app.route('/register/attendee/<code>', methods=['GET', 'POST'])
|
|
def register_attendee(code):
|
|
"""Register a new attendee for an event using event code."""
|
|
# Verify event exists
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM events WHERE code = %s", (code,))
|
|
event = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
return redirect(url_for('index'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
if request.method == 'POST':
|
|
first_name = request.form.get('first_name', '').strip()
|
|
last_name = request.form.get('last_name', '').strip()
|
|
email = request.form.get('email', '').strip()
|
|
password = request.form.get('password', '')
|
|
confirm_password = request.form.get('confirm_password', '')
|
|
organisation = request.form.get('organisation', '').strip()
|
|
role = request.form.get('role', '').strip()
|
|
introduction = request.form.get('introduction', '').strip()
|
|
|
|
if not all([first_name, last_name, email, password]):
|
|
flash('First name, last name, email, and password are required.')
|
|
return render_template('register.html', user_type='attendee', event=event)
|
|
|
|
if password != confirm_password:
|
|
flash('Passwords do not match.')
|
|
return render_template('register.html', user_type='attendee', event=event)
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Check if email already registered for this event
|
|
cursor.execute(
|
|
"SELECT id FROM attendees WHERE email = %s AND event_id = %s",
|
|
(email, event['id'])
|
|
)
|
|
if cursor.fetchone():
|
|
flash('Email already registered for this event.')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('register.html', user_type='attendee', event=event)
|
|
|
|
password_hash = hash_password(password)
|
|
attendee_code = generate_attendee_code()
|
|
cursor.execute("""
|
|
INSERT INTO attendees (event_id, email, password_hash, first_name, last_name, organisation, role, introduction, attendee_code)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
""", (event['id'], email, password_hash, first_name, last_name, organisation, role, introduction, attendee_code))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Registration successful! Please log in.')
|
|
return redirect(url_for('login'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
|
|
return render_template('register.html', user_type='attendee', event=event)
|
|
|
|
|
|
@app.route('/logout')
|
|
def logout():
|
|
"""Log out current user."""
|
|
session.clear()
|
|
flash('You have been logged out.')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
# Routes - Organizer
|
|
@app.route('/organizer/dashboard')
|
|
@login_required
|
|
def organizer_dashboard():
|
|
"""Organizer dashboard."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM events WHERE organizer_id = %s ORDER BY start_time DESC", (session['user_id'],))
|
|
events = cursor.fetchall()
|
|
|
|
# Get attendee counts, breakout sessions, and staff for each event
|
|
for event in events:
|
|
cursor.execute("SELECT COUNT(*) as count FROM attendees WHERE event_id = %s", (event['id'],))
|
|
event['attendee_count'] = cursor.fetchone()['count']
|
|
|
|
cursor.execute("""
|
|
SELECT bs.id, bs.name, bs.max_attendees,
|
|
(SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count
|
|
FROM breakout_sessions bs
|
|
WHERE bs.event_id = %s
|
|
ORDER BY bs.start_time
|
|
""", (event['id'],))
|
|
event['breakout_sessions'] = cursor.fetchall()
|
|
|
|
cursor.execute("SELECT id, first_name, last_name, email FROM staff WHERE event_id = %s ORDER BY last_name, first_name", (event['id'],))
|
|
event['staff'] = cursor.fetchall()
|
|
|
|
cursor.execute("SELECT id, first_name, last_name, email, checked_in FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event['id'],))
|
|
event['attendees'] = cursor.fetchall()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/dashboard.html', events=events)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/organizer/attendee-types')
|
|
@login_required
|
|
def all_attendee_types():
|
|
"""Show all attendee types across all events."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Get all events for this organizer
|
|
cursor.execute("SELECT * FROM events WHERE organizer_id = %s ORDER BY start_time DESC", (session['user_id'],))
|
|
events = cursor.fetchall()
|
|
|
|
# Get attendee types for each event
|
|
for event in events:
|
|
cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (event['id'],))
|
|
event['attendee_types'] = cursor.fetchall()
|
|
|
|
# Get attendee count per type
|
|
for at in event['attendee_types']:
|
|
cursor.execute("SELECT COUNT(*) as count FROM attendees WHERE event_id = %s AND attendee_type_id = %s",
|
|
(event['id'], at['id']))
|
|
at['attendee_count'] = cursor.fetchone()['count']
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/all_attendee_types.html', events=events)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/organizer/event/create', methods=['GET', 'POST'])
|
|
@login_required
|
|
def create_event():
|
|
"""Create a new event."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
if request.method == 'POST':
|
|
name = request.form.get('name', '').strip()
|
|
description = request.form.get('description', '').strip()
|
|
start_time = request.form.get('start_time', '')
|
|
end_time = request.form.get('end_time', '')
|
|
location = request.form.get('location', '').strip()
|
|
max_attendees = request.form.get('max_attendees', '')
|
|
|
|
if not all([name, start_time, location]):
|
|
flash('Name, start time, and location are required.')
|
|
return render_template('organizer/create_event.html')
|
|
|
|
try:
|
|
event_start = datetime.strptime(start_time, '%Y-%m-%dT%H:%M')
|
|
except ValueError:
|
|
flash('Invalid start time format.')
|
|
return render_template('organizer/create_event.html')
|
|
|
|
event_end = None
|
|
if end_time:
|
|
try:
|
|
event_end = datetime.strptime(end_time, '%Y-%m-%dT%H:%M')
|
|
except ValueError:
|
|
flash('Invalid end time format.')
|
|
return render_template('organizer/create_event.html')
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
event_code = generate_event_code()
|
|
cursor.execute("""
|
|
INSERT INTO events (organizer_id, code, name, description, start_time, end_time, location, max_attendees)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
|
""", (session['user_id'], event_code, name, description, event_start, event_end, location, max_attendees if max_attendees else None))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Event created successfully!')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
|
|
return render_template('organizer/create_event.html')
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit_event(event_id):
|
|
"""Edit an existing event."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
if request.method == 'POST':
|
|
name = request.form.get('name', '').strip()
|
|
description = request.form.get('description', '').strip()
|
|
location = request.form.get('location', '').strip()
|
|
max_attendees = request.form.get('max_attendees', '')
|
|
|
|
if not all([name, location]):
|
|
flash('Name and location are required.')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_event.html', event=event)
|
|
|
|
cursor.execute("""
|
|
UPDATE events
|
|
SET name = %s, description = %s, location = %s, max_attendees = %s
|
|
WHERE id = %s
|
|
""", (name, description, location, max_attendees if max_attendees else None, event_id))
|
|
conn.commit()
|
|
|
|
# Capture changes for the confirmation pop-up
|
|
changes = []
|
|
old_event = event
|
|
if old_event['name'] != name:
|
|
changes.append(('Name', old_event['name'], name))
|
|
if (old_event['description'] or '') != description:
|
|
changes.append(('Description', old_event['description'] or '', description))
|
|
if old_event['location'] != location:
|
|
changes.append(('Location', old_event['location'], location))
|
|
if str(old_event['max_attendees'] or '') != max_attendees:
|
|
changes.append(('Max Attendees', old_event['max_attendees'] or 'Unlimited', max_attendees or 'Unlimited'))
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if changes:
|
|
# Get updated event
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM events WHERE id = %s", (event_id,))
|
|
updated_event = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_event_confirm.html', event=updated_event, changes=changes)
|
|
else:
|
|
flash('Event updated successfully!')
|
|
return redirect(url_for('event_detail', code=event['code']))
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_event.html', event=event)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/notify', methods=['POST'])
|
|
@login_required
|
|
def notify_event_attendees(event_id):
|
|
"""Send email notification to event attendees about changes."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
# Get attendees for this event
|
|
cursor.execute("SELECT * FROM attendees WHERE event_id = %s", (event_id,))
|
|
attendees = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
# Get changes text from form
|
|
changes_text = request.form.get('changes_text', 'Event details have been updated.')
|
|
|
|
# Send emails
|
|
sent_count = 0
|
|
failed_count = 0
|
|
for attendee in attendees:
|
|
full_name = f"{attendee['first_name']} {attendee['last_name']}"
|
|
if send_event_update_email(attendee['email'], full_name, event['name'], changes_text):
|
|
sent_count += 1
|
|
else:
|
|
failed_count += 1
|
|
|
|
if failed_count == 0:
|
|
flash(f'Notification emails sent to {sent_count} attendee{"s" if sent_count != 1 else ""}.')
|
|
else:
|
|
flash(f'Notifications sent: {sent_count}, failed: {failed_count}.')
|
|
|
|
return redirect(url_for('event_detail', code=event['code']))
|
|
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/breakout-sessions')
|
|
@login_required
|
|
def list_breakout_sessions(event_id):
|
|
"""List breakout sessions for an event."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("""
|
|
SELECT bs.*,
|
|
(SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count
|
|
FROM breakout_sessions bs
|
|
WHERE bs.event_id = %s
|
|
ORDER BY bs.start_time
|
|
""", (event_id,))
|
|
sessions = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('organizer/breakout_sessions.html', event=event, sessions=sessions)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/breakout-session/create', methods=['GET', 'POST'])
|
|
@login_required
|
|
def create_breakout_session(event_id):
|
|
"""Create a new breakout session."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
# Check if event is single-day
|
|
is_single_day = False
|
|
prefilled_date = ''
|
|
if event.get('start_time') and event.get('end_time'):
|
|
start = event['start_time']
|
|
end = event['end_time']
|
|
if hasattr(start, 'date') and hasattr(end, 'date'):
|
|
is_single_day = start.date() == end.date()
|
|
if is_single_day:
|
|
prefilled_date = start.strftime('%Y-%m-%d')
|
|
|
|
if request.method == 'POST':
|
|
name = request.form.get('name', '').strip()
|
|
description = request.form.get('description', '').strip()
|
|
start_time = request.form.get('start_time', '')
|
|
end_time = request.form.get('end_time', '')
|
|
location = request.form.get('location', '').strip()
|
|
max_attendees = request.form.get('max_attendees', '')
|
|
|
|
if not all([name, start_time, end_time, location]):
|
|
flash('Name, start time, end time, and location are required.')
|
|
return render_template('organizer/create_breakout_session.html', event=event)
|
|
|
|
try:
|
|
session_start = datetime.strptime(start_time, '%Y-%m-%dT%H:%M')
|
|
session_end = datetime.strptime(end_time, '%Y-%m-%dT%H:%M')
|
|
except ValueError:
|
|
flash('Invalid time format.')
|
|
return render_template('organizer/create_breakout_session.html', event=event)
|
|
|
|
if session_end <= session_start:
|
|
flash('End time must be after start time.')
|
|
return render_template('organizer/create_breakout_session.html', event=event)
|
|
|
|
if max_attendees and event['max_attendees'] and int(max_attendees) > event['max_attendees']:
|
|
flash(f'Break-out session attendees cannot exceed event capacity ({event["max_attendees"]}).')
|
|
return render_template('organizer/create_breakout_session.html', event=event)
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
session_code = generate_session_code()
|
|
cursor.execute("""
|
|
INSERT INTO breakout_sessions (code, event_id, name, description, start_time, end_time, location, max_attendees)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
|
""", (session_code, event_id, name, description, session_start, session_end, location, max_attendees if max_attendees else None))
|
|
breakout_session_id = cursor.lastrowid
|
|
|
|
# Add event organizer as breakout session organizer
|
|
cursor.execute("""
|
|
INSERT INTO breakout_session_organizers (breakout_session_id, organizer_id)
|
|
VALUES (%s, %s)
|
|
""", (breakout_session_id, session['user_id']))
|
|
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Breakout session created successfully!')
|
|
return redirect(url_for('list_breakout_sessions', event_id=event_id))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
|
|
return render_template('organizer/create_breakout_session.html', event=event, prefilled_date=prefilled_date)
|
|
|
|
|
|
@app.route('/organizer/breakout-session/<int:session_id>/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit_breakout_session(session_id):
|
|
"""Edit a breakout session."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Get breakout session and verify organizer has access
|
|
cursor.execute("""
|
|
SELECT bs.*, e.name as event_name, e.organizer_id as event_organizer_id, e.max_attendees as event_max_attendees
|
|
FROM breakout_sessions bs
|
|
JOIN events e ON bs.event_id = e.id
|
|
WHERE bs.id = %s
|
|
""", (session_id,))
|
|
breakout_session = cursor.fetchone()
|
|
|
|
if not breakout_session:
|
|
flash('Breakout session not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
if breakout_session['event_organizer_id'] != session['user_id']:
|
|
flash('Access denied.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('index'))
|
|
|
|
event = {'id': breakout_session['event_id'], 'name': breakout_session['event_name'], 'max_attendees': breakout_session['event_max_attendees']}
|
|
|
|
if request.method == 'POST':
|
|
name = request.form.get('name', '').strip()
|
|
description = request.form.get('description', '').strip()
|
|
start_time = request.form.get('start_time', '')
|
|
end_time = request.form.get('end_time', '')
|
|
location = request.form.get('location', '').strip()
|
|
max_attendees = request.form.get('max_attendees', '')
|
|
|
|
if not all([name, start_time, end_time, location]):
|
|
flash('Name, start time, end time, and location are required.')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_breakout_session.html', session=breakout_session, event=event)
|
|
|
|
try:
|
|
session_start = datetime.strptime(start_time, '%Y-%m-%dT%H:%M')
|
|
session_end = datetime.strptime(end_time, '%Y-%m-%dT%H:%M')
|
|
except ValueError:
|
|
flash('Invalid time format.')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_breakout_session.html', session=breakout_session, event=event)
|
|
|
|
if session_end <= session_start:
|
|
flash('End time must be after start time.')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_breakout_session.html', session=breakout_session, event=event)
|
|
|
|
if max_attendees and event['max_attendees'] and int(max_attendees) > event['max_attendees']:
|
|
flash(f'Break-out session attendees cannot exceed event capacity ({event["max_attendees"]}).')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_breakout_session.html', session=breakout_session, event=event)
|
|
|
|
cursor.execute("""
|
|
UPDATE breakout_sessions
|
|
SET name = %s, description = %s, start_time = %s, end_time = %s, location = %s, max_attendees = %s
|
|
WHERE id = %s
|
|
""", (name, description, session_start, session_end, location, max_attendees if max_attendees else None, session_id))
|
|
conn.commit()
|
|
|
|
# Capture changes for the confirmation pop-up
|
|
changes = []
|
|
old_session = breakout_session
|
|
if old_session['name'] != name:
|
|
changes.append(('Name', old_session['name'], name))
|
|
if old_session['description'] != description:
|
|
changes.append(('Description', old_session['description'] or '', description))
|
|
if old_session['start_time'].strftime('%Y-%m-%dT%H:%M') != start_time:
|
|
changes.append(('Start Time', old_session['start_time'].strftime('%B %d, %Y at %H:%M'), session_start.strftime('%B %d, %Y at %H:%M')))
|
|
if old_session['end_time'].strftime('%Y-%m-%dT%H:%M') != end_time:
|
|
changes.append(('End Time', old_session['end_time'].strftime('%B %d, %Y at %H:%M'), session_end.strftime('%B %d, %Y at %H:%M')))
|
|
if old_session['location'] != location:
|
|
changes.append(('Location', old_session['location'], location))
|
|
if str(old_session['max_attendees'] or '') != max_attendees:
|
|
changes.append(('Max Attendees', old_session['max_attendees'] or 'Unlimited', max_attendees or 'Unlimited'))
|
|
|
|
# Get updated session data
|
|
cursor.execute("""
|
|
SELECT bs.*, e.name as event_name
|
|
FROM breakout_sessions bs
|
|
JOIN events e ON bs.event_id = e.id
|
|
WHERE bs.id = %s
|
|
""", (session_id,))
|
|
updated_session = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if changes:
|
|
return render_template('organizer/edit_breakout_session_confirm.html',
|
|
session=updated_session, event=event, changes=changes)
|
|
else:
|
|
flash('Breakout session updated successfully!')
|
|
return redirect(url_for('view_breakout_session', code=breakout_session['code']))
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_breakout_session.html', session=breakout_session, event=event)
|
|
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/breakout-session/<int:session_id>/notify', methods=['POST'])
|
|
@login_required
|
|
def notify_breakout_session_attendees(session_id):
|
|
"""Send email notification to breakout session attendees about changes."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Get breakout session and verify organizer has access
|
|
cursor.execute("""
|
|
SELECT bs.*, e.name as event_name, e.organizer_id as event_organizer_id
|
|
FROM breakout_sessions bs
|
|
JOIN events e ON bs.event_id = e.id
|
|
WHERE bs.id = %s
|
|
""", (session_id,))
|
|
breakout_session = cursor.fetchone()
|
|
|
|
if not breakout_session:
|
|
flash('Breakout session not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
if breakout_session['event_organizer_id'] != session['user_id']:
|
|
flash('Access denied.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('index'))
|
|
|
|
# Get attendees who RSVPed to this session
|
|
cursor.execute("""
|
|
SELECT a.email, a.first_name, a.last_name
|
|
FROM breakout_session_rsvps bsr
|
|
JOIN attendees a ON bsr.attendee_id = a.id
|
|
WHERE bsr.breakout_session_id = %s AND bsr.status = 'registered'
|
|
""", (session_id,))
|
|
rsvps = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
# Get changes text from form
|
|
changes_text = request.form.get('changes_text', 'Session details have been updated.')
|
|
|
|
# Send emails
|
|
sent_count = 0
|
|
failed_count = 0
|
|
for rsvp in rsvps:
|
|
full_name = f"{rsvp['first_name']} {rsvp['last_name']}"
|
|
if send_breakout_session_update_email(rsvp['email'], full_name, breakout_session['name'],
|
|
breakout_session['event_name'], changes_text):
|
|
sent_count += 1
|
|
else:
|
|
failed_count += 1
|
|
|
|
if failed_count == 0:
|
|
flash(f'Notification emails sent to {sent_count} attendee{"s" if sent_count != 1 else ""}.')
|
|
else:
|
|
flash(f'Notifications sent: {sent_count}, failed: {failed_count}.')
|
|
|
|
return redirect(url_for('view_breakout_session', code=breakout_session['code']))
|
|
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/breakout-session/<code>')
|
|
@login_required
|
|
def view_breakout_session(code):
|
|
"""Breakout session detail page for organizers."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Get breakout session and verify organizer has access
|
|
cursor.execute("""
|
|
SELECT bs.*, e.name as event_name, e.organizer_id as event_organizer_id
|
|
FROM breakout_sessions bs
|
|
JOIN events e ON bs.event_id = e.id
|
|
WHERE bs.code = %s
|
|
""", (code,))
|
|
session_data = cursor.fetchone()
|
|
|
|
if not session_data:
|
|
flash('Breakout session not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
# Check if user is event organizer or breakout session organizer
|
|
cursor.execute("""
|
|
SELECT id FROM breakout_session_organizers
|
|
WHERE breakout_session_id = %s AND organizer_id = %s
|
|
""", (session_data['id'], session['user_id']))
|
|
is_organizer = cursor.fetchone()
|
|
|
|
if session_data['event_organizer_id'] != session['user_id'] and not is_organizer:
|
|
flash('Access denied.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
# Get RSVPs
|
|
cursor.execute("""
|
|
SELECT bsr.*, a.first_name, a.last_name, a.email, a.organisation, a.role
|
|
FROM breakout_session_rsvps bsr
|
|
JOIN attendees a ON bsr.attendee_id = a.id
|
|
WHERE bsr.breakout_session_id = %s AND bsr.status = 'registered'
|
|
ORDER BY a.last_name, a.first_name
|
|
""", (session_data['id'],))
|
|
rsvps = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('organizer/breakout_session_detail.html', session=session_data, rsvps=rsvps)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/attendee/breakout-sessions')
|
|
@login_required
|
|
def attendee_breakout_sessions():
|
|
"""List breakout sessions for attendee's event."""
|
|
if session.get('user_type') != 'attendee':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],))
|
|
event = cursor.fetchone()
|
|
|
|
cursor.execute("""
|
|
SELECT bs.*,
|
|
(SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count,
|
|
(SELECT status FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND attendee_id = %s) as my_rsvp_status
|
|
FROM breakout_sessions bs
|
|
WHERE bs.event_id = %s
|
|
ORDER BY bs.start_time
|
|
""", (session['user_id'], session['event_id']))
|
|
sessions = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('attendee/breakout_sessions.html', event=event, sessions=sessions)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('attendee_dashboard'))
|
|
|
|
|
|
@app.route('/attendee/breakout-session/<code>/rsvp', methods=['POST'])
|
|
@login_required
|
|
def rsvp_breakout_session(code):
|
|
"""RSVP for a breakout session."""
|
|
if session.get('user_type') != 'attendee':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Get breakout session
|
|
cursor.execute("SELECT * FROM breakout_sessions WHERE code = %s", (code,))
|
|
breakout = cursor.fetchone()
|
|
cursor.close()
|
|
|
|
if not breakout:
|
|
conn.close()
|
|
return jsonify({'error': 'Breakout session not found'}), 404
|
|
|
|
session_id = breakout['id']
|
|
|
|
# Check capacity
|
|
if breakout['max_attendees']:
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("""
|
|
SELECT COUNT(*) as count FROM breakout_session_rsvps
|
|
WHERE breakout_session_id = %s AND status = 'registered'
|
|
""", (session_id,))
|
|
current_count = cursor.fetchone()['count']
|
|
cursor.close()
|
|
|
|
if current_count >= breakout['max_attendees']:
|
|
conn.close()
|
|
return jsonify({'error': 'Breakout session is full'}), 400
|
|
|
|
# Check for overlapping sessions
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("""
|
|
SELECT bs.id, bs.name FROM breakout_sessions bs
|
|
JOIN breakout_session_rsvps bsr ON bs.id = bsr.breakout_session_id
|
|
WHERE bsr.attendee_id = %s
|
|
AND bsr.status = 'registered'
|
|
AND bs.id != %s
|
|
AND (
|
|
(bs.start_time <= %s AND bs.end_time > %s)
|
|
OR (bs.start_time < %s AND bs.end_time >= %s)
|
|
OR (bs.start_time >= %s AND bs.end_time <= %s)
|
|
)
|
|
""", (session['user_id'], session_id,
|
|
breakout['start_time'], breakout['start_time'],
|
|
breakout['end_time'], breakout['end_time'],
|
|
breakout['start_time'], breakout['end_time']))
|
|
overlapping = cursor.fetchall()
|
|
cursor.close()
|
|
|
|
if overlapping:
|
|
names = ', '.join([s['name'] for s in overlapping])
|
|
conn.close()
|
|
return jsonify({'error': f'Overlaps with: {names}'}), 400
|
|
|
|
# Check if already registered
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("""
|
|
SELECT id, status FROM breakout_session_rsvps
|
|
WHERE breakout_session_id = %s AND attendee_id = %s
|
|
""", (session_id, session['user_id']))
|
|
existing = cursor.fetchone()
|
|
cursor.close()
|
|
|
|
if existing:
|
|
if existing['status'] == 'registered':
|
|
conn.close()
|
|
return jsonify({'error': 'Already registered'}), 400
|
|
else:
|
|
# Reactivate cancelled RSVP
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE breakout_session_rsvps SET status = 'registered'
|
|
WHERE id = %s
|
|
""", (existing['id'],))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'success': True, 'message': 'RSVP restored'})
|
|
else:
|
|
# Create new RSVP
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT INTO breakout_session_rsvps (breakout_session_id, attendee_id, status)
|
|
VALUES (%s, %s, 'registered')
|
|
""", (session_id, session['user_id']))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'success': True})
|
|
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/attendee/breakout-session/<code>/cancel-rsvp', methods=['POST'])
|
|
@login_required
|
|
def cancel_breakout_session_rsvp(code):
|
|
"""Cancel RSVP for a breakout session."""
|
|
if session.get('user_type') != 'attendee':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT id FROM breakout_sessions WHERE code = %s", (code,))
|
|
breakout = cursor.fetchone()
|
|
cursor.close()
|
|
|
|
if not breakout:
|
|
conn.close()
|
|
return jsonify({'error': 'Breakout session not found'}), 404
|
|
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE breakout_session_rsvps SET status = 'cancelled'
|
|
WHERE breakout_session_id = %s AND attendee_id = %s AND status = 'registered'
|
|
""", (breakout['id'], session['user_id']))
|
|
affected = cursor.rowcount
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if affected > 0:
|
|
return jsonify({'success': True})
|
|
else:
|
|
return jsonify({'error': 'RSVP not found'}), 404
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/attendee/event/attendance', methods=['GET', 'POST'])
|
|
@login_required
|
|
def update_event_attendance():
|
|
"""Update attendance status for main event."""
|
|
if session.get('user_type') != 'attendee':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
if request.method == 'GET':
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT attendance_status FROM attendees WHERE id = %s", (session['user_id'],))
|
|
result = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'status': result['attendance_status'] if result else 'attending'})
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
status = request.json.get('status') if request.json else request.form.get('status')
|
|
if status not in ['attending', 'not_attending']:
|
|
return jsonify({'error': 'Invalid status'}), 400
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE attendees SET attendance_status = %s WHERE id = %s
|
|
""", (status, session['user_id']))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'success': True})
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/organizer/event/<code>')
|
|
@login_required
|
|
def event_detail(code):
|
|
"""Event detail page for organizer."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE code = %s AND organizer_id = %s", (code, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM attendees WHERE event_id = %s", (event['id'],))
|
|
attendees = cursor.fetchall()
|
|
|
|
cursor.execute("""
|
|
SELECT bs.*,
|
|
(SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as registered_count
|
|
FROM breakout_sessions bs
|
|
WHERE bs.event_id = %s
|
|
ORDER BY bs.start_time
|
|
""", (event['id'],))
|
|
breakout_sessions = cursor.fetchall()
|
|
|
|
cursor.execute("SELECT * FROM staff WHERE event_id = %s ORDER BY last_name, first_name", (event['id'],))
|
|
staff = cursor.fetchall()
|
|
|
|
# Get attendee types for this event
|
|
cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (event['id'],))
|
|
attendee_types = cursor.fetchall()
|
|
|
|
# Build a mapping of attendee type id to type name for attendees
|
|
attendee_type_map = {at['id']: at for at in attendee_types}
|
|
for attendee in attendees:
|
|
attendee['attendee_type_name'] = attendee_type_map.get(attendee.get('attendee_type_id'), {}).get('name') if attendee.get('attendee_type_id') else None
|
|
attendee['attendee_type_price'] = attendee_type_map.get(attendee.get('attendee_type_id'), {}).get('price') if attendee.get('attendee_type_id') else None
|
|
|
|
# Get other events by this organizer for batch staff add
|
|
cursor.execute("SELECT id, name, code FROM events WHERE organizer_id = %s AND id != %s ORDER BY name", (session['user_id'], event['id']))
|
|
other_events = cursor.fetchall()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('organizer/event_detail.html', event=event, attendees=attendees, breakout_sessions=breakout_sessions, staff=staff, other_events=other_events, attendee_types=attendee_types)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/attendee-types', methods=['GET', 'POST'])
|
|
@login_required
|
|
def manage_attendee_types(event_id):
|
|
"""Manage attendee types for an event."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (event_id,))
|
|
attendee_types = cursor.fetchall()
|
|
|
|
if request.method == 'POST':
|
|
name = request.form.get('name', '').strip()
|
|
price = request.form.get('price', '').strip()
|
|
|
|
if not name:
|
|
flash('Type name is required.')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/attendee_types.html', event=event, attendee_types=attendee_types)
|
|
|
|
try:
|
|
price_value = float(price) if price else 0.00
|
|
except ValueError:
|
|
price_value = 0.00
|
|
|
|
type_code = generate_type_code(event_id)
|
|
cursor.execute("""
|
|
INSERT INTO attendee_types (event_id, code, name, price)
|
|
VALUES (%s, %s, %s, %s)
|
|
""", (event_id, type_code, name, price_value))
|
|
conn.commit()
|
|
flash(f'Attendee type "{name}" created successfully!')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('manage_attendee_types', event_id=event_id))
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/attendee_types.html', event=event, attendee_types=attendee_types)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/attendee-type/<int:type_id>/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit_attendee_type(event_id, type_id):
|
|
"""Edit an attendee type."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM attendee_types WHERE id = %s AND event_id = %s", (type_id, event_id))
|
|
attendee_type = cursor.fetchone()
|
|
|
|
if not attendee_type:
|
|
flash('Attendee type not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('manage_attendee_types', event_id=event_id))
|
|
|
|
# Check how many attendees are assigned to this type
|
|
cursor.execute("SELECT COUNT(*) as count FROM attendees WHERE attendee_type_id = %s", (type_id,))
|
|
attendee_count = cursor.fetchone()['count']
|
|
|
|
if request.method == 'POST':
|
|
name = request.form.get('name', '').strip()
|
|
price = request.form.get('price', '').strip()
|
|
|
|
if not name:
|
|
flash('Type name is required.')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_attendee_type.html', event=event, attendee_type=attendee_type)
|
|
|
|
try:
|
|
price_value = float(price) if price else 0.00
|
|
except ValueError:
|
|
price_value = 0.00
|
|
|
|
cursor.execute("""
|
|
UPDATE attendee_types SET name = %s, price = %s WHERE id = %s
|
|
""", (name, price_value, type_id))
|
|
conn.commit()
|
|
flash(f'Attendee type "{name}" updated successfully!')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('manage_attendee_types', event_id=event_id))
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_attendee_type.html', event=event, attendee_type=attendee_type, attendee_count=attendee_count)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/attendee-type/<int:type_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def delete_attendee_type(event_id, type_id):
|
|
"""Delete an attendee type."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM attendee_types WHERE id = %s AND event_id = %s", (type_id, event_id))
|
|
attendee_type = cursor.fetchone()
|
|
|
|
if not attendee_type:
|
|
flash('Attendee type not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('manage_attendee_types', event_id=event_id))
|
|
|
|
# Set attendee_type_id to NULL for all attendees with this type
|
|
cursor.execute("UPDATE attendees SET attendee_type_id = NULL WHERE attendee_type_id = %s", (type_id,))
|
|
|
|
# Delete the attendee type
|
|
cursor.execute("DELETE FROM attendee_types WHERE id = %s", (type_id,))
|
|
conn.commit()
|
|
|
|
flash(f'Attendee type "{attendee_type["name"]}" deleted successfully!')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('manage_attendee_types', event_id=event_id))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/attendees/batch-assign-type', methods=['POST'])
|
|
@login_required
|
|
def batch_assign_attendee_type(event_id):
|
|
"""Batch assign attendee type to selected attendees."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
attendee_ids = request.form.getlist('attendee_ids')
|
|
attendee_type_id = request.form.get('attendee_type_id', '').strip()
|
|
|
|
if not attendee_ids:
|
|
flash('No attendees selected.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('event_detail', code=event['code']))
|
|
|
|
if attendee_type_id:
|
|
# Validate attendee type exists for this event
|
|
cursor.execute("SELECT id FROM attendee_types WHERE id = %s AND event_id = %s", (attendee_type_id, event_id))
|
|
if not cursor.fetchone():
|
|
flash('Invalid attendee type.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('event_detail', code=event['code']))
|
|
|
|
for attendee_id in attendee_ids:
|
|
cursor.execute("UPDATE attendees SET attendee_type_id = %s WHERE id = %s AND event_id = %s",
|
|
(attendee_type_id, attendee_id, event_id))
|
|
conn.commit()
|
|
flash(f'Successfully assigned type to {len(attendee_ids)} attendee(s).')
|
|
else:
|
|
# Remove type from selected attendees
|
|
for attendee_id in attendee_ids:
|
|
cursor.execute("UPDATE attendees SET attendee_type_id = NULL WHERE id = %s AND event_id = %s",
|
|
(attendee_id, event_id))
|
|
conn.commit()
|
|
flash(f'Successfully removed type from {len(attendee_ids)} attendee(s).')
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('event_detail', code=event['code']))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/staff', methods=['GET', 'POST'])
|
|
@login_required
|
|
def manage_event_staff(event_id):
|
|
"""Manage staff for an event."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM staff WHERE event_id = %s ORDER BY last_name, first_name", (event_id,))
|
|
staff_members = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if request.method == 'POST':
|
|
first_name = request.form.get('first_name', '').strip()
|
|
last_name = request.form.get('last_name', '').strip()
|
|
email = request.form.get('email', '').strip()
|
|
|
|
if not all([first_name, last_name, email]):
|
|
flash('First name, last name, and email are required.')
|
|
return render_template('organizer/event_staff.html', event=event, staff_members=staff_members)
|
|
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Check if staff with this email already exists for this event
|
|
cursor.execute("SELECT id FROM staff WHERE event_id = %s AND email = %s", (event_id, email))
|
|
if cursor.fetchone():
|
|
flash('A staff member with this email already exists for this event.')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/event_staff.html', event=event, staff_members=staff_members)
|
|
|
|
invite_token = generate_invite_token()
|
|
staff_code = generate_staff_code()
|
|
cursor.execute("""
|
|
INSERT INTO staff (event_id, email, first_name, last_name, invite_token, staff_code)
|
|
VALUES (%s, %s, %s, %s, %s, %s)
|
|
""", (event_id, email, first_name, last_name, invite_token, staff_code))
|
|
conn.commit()
|
|
|
|
# Get organizer email for sending
|
|
cursor.execute("SELECT email FROM organizers WHERE id = %s", (session['user_id'],))
|
|
organizer_email = cursor.fetchone()['email']
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
# Send invite email
|
|
full_name = f"{first_name} {last_name}"
|
|
if send_staff_invite_email(email, full_name, event['name'], invite_token, organizer_email):
|
|
flash(f'Staff member added! Invite email sent to {email}.')
|
|
else:
|
|
flash(f'Staff member added but failed to send invite email.')
|
|
|
|
return redirect(url_for('event_detail', code=event['code']))
|
|
|
|
return render_template('organizer/event_staff.html', event=event, staff_members=staff_members)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/staff/invite/<token>', methods=['GET', 'POST'])
|
|
def staff_invite(token):
|
|
"""Staff invite acceptance page."""
|
|
recaptcha_site_key = app.config.get('RECAPTCHA_SITE_KEY', '')
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM staff WHERE invite_token = %s AND invite_used = FALSE", (token,))
|
|
staff_member = cursor.fetchone()
|
|
|
|
if not staff_member:
|
|
flash('Invalid or expired invite link.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('index'))
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s", (staff_member['event_id'],))
|
|
event = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if request.method == 'POST':
|
|
password = request.form.get('password', '')
|
|
confirm_password = request.form.get('confirm_password', '')
|
|
recaptcha_response = request.form.get('g-recaptcha-response', '')
|
|
|
|
if recaptcha_site_key and not verify_recaptcha(recaptcha_response):
|
|
flash('Please complete the reCAPTCHA verification.')
|
|
return render_template('staff_invite.html', staff=staff_member, event=event, recaptcha_site_key=recaptcha_site_key)
|
|
|
|
if not password or len(password) < 6:
|
|
flash('Password must be at least 6 characters.')
|
|
return render_template('staff_invite.html', staff=staff_member, event=event, recaptcha_site_key=recaptcha_site_key)
|
|
|
|
if password != confirm_password:
|
|
flash('Passwords do not match.')
|
|
return render_template('staff_invite.html', staff=staff_member, event=event, recaptcha_site_key=recaptcha_site_key)
|
|
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
password_hash = hash_password(password)
|
|
cursor.execute("""
|
|
UPDATE staff SET password_hash = %s, invite_used = TRUE, invite_token = NULL
|
|
WHERE id = %s
|
|
""", (password_hash, staff_member['id']))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Registration complete! Please log in with your email and password.')
|
|
return redirect(url_for('login'))
|
|
|
|
return render_template('staff_invite.html', staff=staff_member, event=event, recaptcha_site_key=recaptcha_site_key)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/staff/<int:staff_id>/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit_staff(event_id, staff_id):
|
|
"""Edit a staff member."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM staff WHERE id = %s AND event_id = %s", (staff_id, event_id))
|
|
staff_member = cursor.fetchone()
|
|
|
|
if not staff_member:
|
|
flash('Staff member not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('manage_event_staff', event_id=event_id))
|
|
|
|
if request.method == 'POST':
|
|
first_name = request.form.get('first_name', '').strip()
|
|
last_name = request.form.get('last_name', '').strip()
|
|
email = request.form.get('email', '').strip()
|
|
|
|
if not all([first_name, last_name, email]):
|
|
flash('First name, last name, and email are required.')
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_staff.html', event=event, staff=staff_member)
|
|
|
|
cursor.execute("""
|
|
UPDATE staff SET first_name = %s, last_name = %s, email = %s
|
|
WHERE id = %s
|
|
""", (first_name, last_name, email, staff_id))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Staff member updated successfully!')
|
|
return redirect(url_for('manage_event_staff', event_id=event_id))
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_staff.html', event=event, staff=staff_member)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/staff/<int:staff_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def delete_staff(event_id, staff_id):
|
|
"""Delete a staff member."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM staff WHERE id = %s AND event_id = %s", (staff_id, event_id))
|
|
staff_member = cursor.fetchone()
|
|
|
|
if not staff_member:
|
|
flash('Staff member not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("DELETE FROM staff WHERE id = %s", (staff_id,))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Staff member removed successfully!')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/batch-add-staff', methods=['POST'])
|
|
@login_required
|
|
def batch_add_staff(event_id):
|
|
"""Batch add staff from another event."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
source_event_id = request.form.get('source_event_id')
|
|
if not source_event_id:
|
|
flash('Please select an event.')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Verify target event belongs to organizer and get its code
|
|
cursor.execute("SELECT code FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
target_event = cursor.fetchone()
|
|
if not target_event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
# Verify source event belongs to same organizer
|
|
cursor.execute("SELECT id FROM events WHERE id = %s AND organizer_id = %s", (source_event_id, session['user_id']))
|
|
source_event = cursor.fetchone()
|
|
if not source_event:
|
|
flash('Source event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('event_detail', code=target_event['code']))
|
|
|
|
# Get staff from source event
|
|
cursor.execute("SELECT first_name, last_name, email FROM staff WHERE event_id = %s", (source_event_id,))
|
|
source_staff = cursor.fetchall()
|
|
|
|
if not source_staff:
|
|
flash('No staff members found in the selected event.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('event_detail', code=target_event['code']))
|
|
|
|
# Get existing staff emails in target event to avoid duplicates
|
|
cursor.execute("SELECT email FROM staff WHERE event_id = %s", (event_id,))
|
|
existing_emails = set(row['email'].lower() for row in cursor.fetchall())
|
|
|
|
# Add staff that don't already exist
|
|
added_count = 0
|
|
for member in source_staff:
|
|
if member['email'].lower() not in existing_emails:
|
|
staff_code = generate_staff_code()
|
|
cursor.execute("""
|
|
INSERT INTO staff (event_id, first_name, last_name, email, invite_token, invite_used, created_at, staff_code)
|
|
VALUES (%s, %s, %s, %s, UUID(), FALSE, NOW(), %s)
|
|
""", (event_id, member['first_name'], member['last_name'], member['email'], staff_code))
|
|
existing_emails.add(member['email'].lower())
|
|
added_count += 1
|
|
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash(f'{added_count} staff member(s) added from other event.')
|
|
return redirect(url_for('event_detail', code=target_event['code']))
|
|
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/breakout-session/<int:session_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def delete_breakout_session(session_id):
|
|
"""Delete a breakout session."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("""
|
|
SELECT bs.*, e.organizer_id, e.code
|
|
FROM breakout_sessions bs
|
|
JOIN events e ON bs.event_id = e.id
|
|
WHERE bs.id = %s
|
|
""", (session_id,))
|
|
session_obj = cursor.fetchone()
|
|
|
|
if not session_obj or session_obj['organizer_id'] != session['user_id']:
|
|
flash('Session not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("DELETE FROM breakout_session_rsvps WHERE breakout_session_id = %s", (session_id,))
|
|
cursor.execute("DELETE FROM breakout_sessions WHERE id = %s", (session_id,))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Breakout session deleted successfully!')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/attendee/<int:attendee_id>/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit_attendee(attendee_id):
|
|
"""Edit an attendee."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("""
|
|
SELECT a.*, e.organizer_id, e.code
|
|
FROM attendees a
|
|
JOIN events e ON a.event_id = e.id
|
|
WHERE a.id = %s
|
|
""", (attendee_id,))
|
|
attendee = cursor.fetchone()
|
|
|
|
if not attendee or attendee['organizer_id'] != session['user_id']:
|
|
flash('Attendee not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
# Get attendee types for this event
|
|
cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (attendee['event_id'],))
|
|
attendee_types = cursor.fetchall()
|
|
|
|
if request.method == 'POST':
|
|
first_name = request.form.get('first_name', '').strip()
|
|
last_name = request.form.get('last_name', '').strip()
|
|
email = request.form.get('email', '').strip()
|
|
organisation = request.form.get('organisation', '').strip()
|
|
role = request.form.get('role', '').strip()
|
|
introduction = request.form.get('introduction', '').strip()
|
|
attendee_type_id = request.form.get('attendee_type_id', '').strip()
|
|
|
|
cursor.execute("""
|
|
UPDATE attendees
|
|
SET first_name = %s, last_name = %s, email = %s, organisation = %s, role = %s, introduction = %s, attendee_type_id = %s
|
|
WHERE id = %s
|
|
""", (first_name, last_name, email, organisation, role, introduction, attendee_type_id if attendee_type_id else None, attendee_id))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Attendee updated successfully!')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('organizer/edit_attendee.html', attendee=attendee, attendee_types=attendee_types)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/attendee/<int:attendee_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def delete_attendee(attendee_id):
|
|
"""Delete an attendee."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("""
|
|
SELECT a.*, e.organizer_id
|
|
FROM attendees a
|
|
JOIN events e ON a.event_id = e.id
|
|
WHERE a.id = %s
|
|
""", (attendee_id,))
|
|
attendee = cursor.fetchone()
|
|
|
|
if not attendee or attendee['organizer_id'] != session['user_id']:
|
|
flash('Attendee not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("DELETE FROM attendees WHERE id = %s", (attendee_id,))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Attendee deleted successfully!')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/badges')
|
|
@login_required
|
|
def event_badges(event_id):
|
|
"""Badge printing view for event."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,))
|
|
attendees = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
# Generate QR codes for each attendee
|
|
for attendee in attendees:
|
|
qr_data = f"NETEVENT:{event_id}:{attendee['id']}"
|
|
qr_img = qrcode.make(qr_data, box_size=4, border=1)
|
|
buffer = io.BytesIO()
|
|
qr_img.save(buffer, format='PNG')
|
|
buffer.seek(0)
|
|
attendee['qr_code'] = f"data:image/png;base64,{base64.b64encode(buffer.getvalue()).decode('utf-8')}"
|
|
|
|
return render_template('organizer/badges.html', event=event, attendees=attendees)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/badges/rectangular/download')
|
|
@login_required
|
|
def download_rectangular_badges(event_id):
|
|
"""Download rectangular badge PDFs for event."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,))
|
|
attendees = cursor.fetchall()
|
|
|
|
# Get attendee types for this event
|
|
cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s", (event_id,))
|
|
attendee_types = cursor.fetchall()
|
|
attendee_types_map = {at['id']: at for at in attendee_types}
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not attendees:
|
|
flash('No attendees found for this event.')
|
|
return redirect(url_for('event_badges', event_id=event_id))
|
|
|
|
# Generate rectangular badge PDF
|
|
pdf_buffer = generate_rectangular_badges_pdf(attendees, event, attendee_types_map)
|
|
|
|
filename = f"badges_rectangular_{event.get('code', event_id)}.pdf"
|
|
return send_file(
|
|
pdf_buffer,
|
|
mimetype='application/pdf',
|
|
as_attachment=True,
|
|
download_name=filename
|
|
)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/attendees/excel/download')
|
|
@login_required
|
|
def download_attendees_excel(event_id):
|
|
"""Download all attendees as Excel sheet."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
cursor.close()
|
|
conn.close()
|
|
flash('Event not found.')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,))
|
|
attendees = cursor.fetchall()
|
|
|
|
# Get attendee types
|
|
cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s", (event_id,))
|
|
attendee_types = cursor.fetchall()
|
|
attendee_types_map = {at['id']: at for at in attendee_types}
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not attendees:
|
|
flash('No attendees found.')
|
|
return redirect(url_for('event_badges', event_id=event_id))
|
|
|
|
# Create Excel workbook
|
|
wb = openpyxl.Workbook()
|
|
ws = wb.active
|
|
ws.title = "Attendees"
|
|
|
|
# Header styling
|
|
header_font = Font(bold=True, color="FFFFFF")
|
|
header_fill = PatternFill("solid", fgColor="333333")
|
|
header_alignment = Alignment(horizontal="center")
|
|
|
|
headers = ['ID', 'Voornaam', 'Achternaam', 'Email', 'Organisatie', 'Rol', 'Type', 'Ingecheckt', 'Aangemeld op']
|
|
for col, header in enumerate(headers, 1):
|
|
cell = ws.cell(row=1, column=col, value=header)
|
|
cell.font = header_font
|
|
cell.fill = header_fill
|
|
cell.alignment = header_alignment
|
|
|
|
# Data rows
|
|
for row, attendee in enumerate(attendees, 2):
|
|
ws.cell(row=row, column=1, value=attendee.get('id'))
|
|
ws.cell(row=row, column=2, value=attendee.get('first_name', ''))
|
|
ws.cell(row=row, column=3, value=attendee.get('last_name', ''))
|
|
ws.cell(row=row, column=4, value=attendee.get('email', ''))
|
|
ws.cell(row=row, column=5, value=attendee.get('organisation', ''))
|
|
ws.cell(row=row, column=6, value=attendee.get('role', ''))
|
|
ws.cell(row=row, column=7, value=attendee_types_map.get(attendee.get('attendee_type_id'), {}).get('name', ''))
|
|
ws.cell(row=row, column=8, value='Ja' if attendee.get('checked_in') else 'Nee')
|
|
ws.cell(row=row, column=9, value=str(attendee.get('created_at', ''))[:19] if attendee.get('created_at') else '')
|
|
|
|
# Auto-fit column widths
|
|
for col in ws.columns:
|
|
max_length = 0
|
|
column_letter = col[0].column_letter
|
|
for cell in col:
|
|
if cell.value:
|
|
max_length = max(max_length, len(str(cell.value)))
|
|
ws.column_dimensions[column_letter].width = min(max_length + 2, 30)
|
|
|
|
filename = f"attendees_{event.get('code', event_id)}.xlsx"
|
|
buffer = io.BytesIO()
|
|
wb.save(buffer)
|
|
buffer.seek(0)
|
|
|
|
return send_file(
|
|
buffer,
|
|
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
|
as_attachment=True,
|
|
download_name=filename
|
|
)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
@login_required
|
|
def event_stats(event_id):
|
|
"""Attendance statistics for event."""
|
|
if session.get('user_type') != 'organizer':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
cursor.execute("SELECT COUNT(*) as total FROM attendees WHERE event_id = %s", (event_id,))
|
|
total = cursor.fetchone()['total']
|
|
|
|
cursor.execute("SELECT COUNT(*) as checked_in FROM attendees WHERE event_id = %s AND checked_in = TRUE", (event_id,))
|
|
checked_in = cursor.fetchone()['checked_in']
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return jsonify({
|
|
'total': total,
|
|
'checked_in': checked_in,
|
|
'not_checked_in': total - checked_in
|
|
})
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/checkin/<int:attendee_id>', methods=['POST'])
|
|
@login_required
|
|
def checkin_attendee(event_id, attendee_id):
|
|
"""Check in an attendee."""
|
|
if session.get('user_type') != 'organizer':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Check if already checked in
|
|
cursor.execute(
|
|
"SELECT checked_in FROM attendees WHERE id = %s AND event_id = %s",
|
|
(attendee_id, event_id)
|
|
)
|
|
result = cursor.fetchone()
|
|
if not result:
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'error': 'Attendee not found'}), 404
|
|
|
|
already_checked_in = result[0]
|
|
|
|
cursor.execute(
|
|
"UPDATE attendees SET checked_in = TRUE WHERE id = %s AND event_id = %s",
|
|
(attendee_id, event_id)
|
|
)
|
|
conn.commit()
|
|
|
|
# Get attendee name for response
|
|
cursor.execute(
|
|
"SELECT first_name, last_name FROM attendees WHERE id = %s",
|
|
(attendee_id,)
|
|
)
|
|
attendee = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'already_checked_in': bool(already_checked_in),
|
|
'attendee_name': f"{attendee[0]} {attendee[1]}" if attendee else 'Unknown'
|
|
})
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/checkin/code', methods=['POST'])
|
|
@login_required
|
|
def checkin_by_code(event_id):
|
|
"""Check in an attendee by their unique attendee code."""
|
|
if session.get('user_type') not in ['organizer', 'staff']:
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
data = request.get_json()
|
|
attendee_code = data.get('attendee_code') if data else None
|
|
|
|
if not attendee_code:
|
|
return jsonify({'error': 'Attendee code required'}), 400
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Find attendee by code and event
|
|
cursor.execute(
|
|
"SELECT id, first_name, last_name, checked_in FROM attendees WHERE attendee_code = %s AND event_id = %s",
|
|
(attendee_code, event_id)
|
|
)
|
|
attendee = cursor.fetchone()
|
|
|
|
if not attendee:
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'error': 'Attendee not found'}), 404
|
|
|
|
already_checked_in = attendee['checked_in']
|
|
|
|
cursor.execute(
|
|
"UPDATE attendees SET checked_in = TRUE WHERE id = %s AND event_id = %s",
|
|
(attendee['id'], event_id)
|
|
)
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'already_checked_in': bool(already_checked_in),
|
|
'attendee_name': f"{attendee['first_name']} {attendee['last_name']}"
|
|
})
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/organizer/event/<int:event_id>/scan')
|
|
@login_required
|
|
def event_scan(event_id):
|
|
"""QR code scanner page for check-in."""
|
|
if session.get('user_type') not in ['organizer', 'staff']:
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Organizers can access any event they own, staff can only access their event
|
|
if session.get('user_type') == 'organizer':
|
|
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
|
else:
|
|
cursor.execute("SELECT * FROM events WHERE id = %s", (event_id,))
|
|
# Verify staff belongs to this event
|
|
if session.get('event_id') != event_id:
|
|
flash('Access denied.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('index'))
|
|
|
|
event = cursor.fetchone()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
cursor.close()
|
|
conn.close()
|
|
if session.get('user_type') == 'organizer':
|
|
return redirect(url_for('organizer_dashboard'))
|
|
return redirect(url_for('staff_event_dashboard', event_id=session.get('event_id')))
|
|
|
|
cursor.execute("SELECT id, first_name, last_name, checked_in FROM attendees WHERE event_id = %s", (event_id,))
|
|
attendees = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if session.get('user_type') == 'staff':
|
|
return render_template('staff/scan.html', event=event, attendees=attendees)
|
|
return render_template('organizer/scan.html', event=event, attendees=attendees)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('organizer_dashboard'))
|
|
|
|
|
|
# Routes - Attendee
|
|
@app.route('/attendee/dashboard')
|
|
@login_required
|
|
def attendee_dashboard():
|
|
"""Attendee dashboard."""
|
|
if session.get('user_type') != 'attendee':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],))
|
|
event = cursor.fetchone()
|
|
|
|
if event:
|
|
# Get attendee info
|
|
cursor.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],))
|
|
attendee = cursor.fetchone()
|
|
|
|
# Get connections
|
|
cursor.execute("""
|
|
SELECT c.*, a.first_name, a.last_name, a.organisation, a.role
|
|
FROM connections c
|
|
JOIN attendees a ON c.connected_attendee_id = a.id
|
|
WHERE c.attendee_id = %s AND c.status = 'accepted'
|
|
""", (session['user_id'],))
|
|
connections = cursor.fetchall()
|
|
|
|
# Get pending connections
|
|
cursor.execute("""
|
|
SELECT c.*, a.first_name, a.last_name, a.organisation, a.role
|
|
FROM connections c
|
|
JOIN attendees a ON c.attendee_id = a.id
|
|
WHERE c.connected_attendee_id = %s AND c.status = 'pending'
|
|
""", (session['user_id'],))
|
|
pending_connections = cursor.fetchall()
|
|
|
|
# Get appointments
|
|
cursor.execute("""
|
|
SELECT ap.*, a.first_name as requester_first_name, a.last_name as requester_last_name,
|
|
at.first_name as target_first_name, at.last_name as target_last_name
|
|
FROM appointments ap
|
|
JOIN attendees a ON ap.requester_id = a.id
|
|
JOIN attendees at ON ap.target_id = at.id
|
|
WHERE ap.requester_id = %s OR ap.target_id = %s
|
|
ORDER BY ap.appointment_time
|
|
""", (session['user_id'], session['user_id']))
|
|
appointments = cursor.fetchall()
|
|
else:
|
|
event = None
|
|
attendee = None
|
|
connections = []
|
|
pending_connections = []
|
|
appointments = []
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('attendee/dashboard.html', event=event, attendee=attendee,
|
|
connections=connections, pending_connections=pending_connections,
|
|
appointments=appointments)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/attendee/badge/download')
|
|
@login_required
|
|
def download_badge():
|
|
"""Generate and download attendee badge PDF."""
|
|
if session.get('user_type') != 'attendee':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Get attendee info
|
|
cursor.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],))
|
|
attendee = cursor.fetchone()
|
|
|
|
# Get event info
|
|
cursor.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],))
|
|
event = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not attendee:
|
|
flash('Attendee not found.')
|
|
return redirect(url_for('attendee_dashboard'))
|
|
|
|
# Generate PDF
|
|
pdf_buffer = generate_badge_pdf(attendee, event)
|
|
|
|
return send_file(
|
|
pdf_buffer,
|
|
mimetype='application/pdf',
|
|
as_attachment=True,
|
|
download_name=f'badge_{attendee["first_name"]}_{attendee["last_name"]}.pdf'
|
|
)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('attendee_dashboard'))
|
|
|
|
|
|
@app.route('/attendee/badge/email', methods=['POST'])
|
|
@login_required
|
|
def email_badge():
|
|
"""Send badge PDF to attendee via email."""
|
|
if session.get('user_type') != 'attendee':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Get attendee info
|
|
cursor.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],))
|
|
attendee = cursor.fetchone()
|
|
|
|
# Get event info
|
|
cursor.execute("SELECT * FROM events WHERE id = %s", (session['event_id'],))
|
|
event = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not attendee:
|
|
flash('Attendee not found.')
|
|
return redirect(url_for('attendee_dashboard'))
|
|
|
|
# Generate PDF
|
|
pdf_buffer = generate_badge_pdf(attendee, event)
|
|
|
|
# Send email
|
|
if send_badge_email(attendee['email'], f"{attendee['first_name']} {attendee['last_name']}",
|
|
event['name'] if event else 'Event', pdf_buffer):
|
|
flash('Badge sent to your email!')
|
|
else:
|
|
flash('Failed to send badge email. Please try again.')
|
|
|
|
return redirect(url_for('attendee_dashboard'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('attendee_dashboard'))
|
|
|
|
|
|
@app.route('/attendee/profile', methods=['GET', 'POST'])
|
|
@login_required
|
|
def attendee_profile():
|
|
"""Attendee profile page."""
|
|
if session.get('user_type') != 'attendee':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM attendees WHERE id = %s", (session['user_id'],))
|
|
attendee = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if request.method == 'POST':
|
|
first_name = request.form.get('first_name', '').strip()
|
|
last_name = request.form.get('last_name', '').strip()
|
|
organisation = request.form.get('organisation', '').strip()
|
|
role = request.form.get('role', '').strip()
|
|
introduction = request.form.get('introduction', '').strip()
|
|
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE attendees SET first_name = %s, last_name = %s, organisation = %s, role = %s, introduction = %s, phone = %s, linkedin = %s
|
|
WHERE id = %s
|
|
""", (first_name, last_name, organisation, role, introduction, request.form.get('phone', '').strip(), request.form.get('linkedin', '').strip(), session['user_id']))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
flash('Profile updated successfully!')
|
|
return redirect(url_for('attendee_profile'))
|
|
|
|
return render_template('attendee/profile.html', attendee=attendee)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('attendee_dashboard'))
|
|
|
|
|
|
@app.route('/attendee/photo', methods=['POST'])
|
|
@login_required
|
|
def upload_photo():
|
|
"""Upload profile photo."""
|
|
if session.get('user_type') != 'attendee':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
if 'photo' not in request.files:
|
|
return jsonify({'error': 'No file provided'}), 400
|
|
|
|
file = request.files['photo']
|
|
if file.filename == '':
|
|
return jsonify({'error': 'No file selected'}), 400
|
|
|
|
if file:
|
|
# Generate unique filename
|
|
filename = f"{uuid.uuid4()}.jpg"
|
|
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
|
|
|
|
# Delete old photo if exists
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT profile_picture FROM attendees WHERE id = %s", (session['user_id'],))
|
|
old_photo = cursor.fetchone()['profile_picture']
|
|
if old_photo:
|
|
old_path = os.path.join(app.config['UPLOAD_FOLDER'], old_photo)
|
|
if os.path.exists(old_path):
|
|
os.remove(old_path)
|
|
|
|
file.save(filepath)
|
|
|
|
cursor.execute("UPDATE attendees SET profile_picture = %s WHERE id = %s", (filename, session['user_id']))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return jsonify({'success': True, 'filename': filename})
|
|
|
|
return jsonify({'error': 'Invalid file type'}), 400
|
|
|
|
|
|
# Routes - Attendees list and connections
|
|
@app.route('/attendees')
|
|
@login_required
|
|
def list_attendees():
|
|
"""List other attendees at the same event."""
|
|
if session.get('user_type') != 'attendee':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("""
|
|
SELECT a.*,
|
|
(SELECT status FROM connections WHERE attendee_id = %s AND connected_attendee_id = a.id) as my_status
|
|
FROM attendees a
|
|
WHERE a.event_id = %s AND a.id != %s
|
|
""", (session['user_id'], session['event_id'], session['user_id']))
|
|
attendees = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('attendee/attendees.html', attendees=attendees)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('attendee_dashboard'))
|
|
|
|
|
|
@app.route('/connections', methods=['POST'])
|
|
@login_required
|
|
def create_connection():
|
|
"""Send a connection request."""
|
|
if session.get('user_type') != 'attendee':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
connected_id = request.form.get('connected_attendee_id')
|
|
if not connected_id:
|
|
return jsonify({'error': 'Attendee ID required'}), 400
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Check if connection already exists
|
|
cursor.execute("""
|
|
SELECT id FROM connections
|
|
WHERE attendee_id = %s AND connected_attendee_id = %s
|
|
""", (session['user_id'], connected_id))
|
|
if cursor.fetchone():
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'error': 'Connection already exists'}), 400
|
|
|
|
cursor.execute("""
|
|
INSERT INTO connections (attendee_id, connected_attendee_id, status)
|
|
VALUES (%s, %s, 'pending')
|
|
""", (session['user_id'], connected_id))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return jsonify({'success': True})
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/connections/<int:connection_id>', methods=['PUT'])
|
|
@login_required
|
|
def update_connection(connection_id):
|
|
"""Accept or reject a connection request."""
|
|
if session.get('user_type') != 'attendee':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
status = request.json.get('status') if request.json else request.form.get('status')
|
|
if status not in ['accepted', 'rejected']:
|
|
return jsonify({'error': 'Invalid status'}), 400
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Verify this connection is targeting the current user
|
|
cursor.execute("""
|
|
SELECT id FROM connections
|
|
WHERE id = %s AND connected_attendee_id = %s AND status = 'pending'
|
|
""", (connection_id, session['user_id']))
|
|
if not cursor.fetchone():
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'error': 'Connection not found'}), 404
|
|
|
|
cursor.execute("UPDATE connections SET status = %s WHERE id = %s", (status, connection_id))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return jsonify({'success': True})
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/attendee/scan')
|
|
@login_required
|
|
def attendee_scan():
|
|
"""QR scanner page for attendees to scan each other."""
|
|
if session.get('user_type') != 'attendee':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
return render_template('attendee/scan.html')
|
|
|
|
|
|
@app.route('/attendee/scan-request', methods=['POST'])
|
|
@login_required
|
|
def scan_request_connection():
|
|
"""Send a connection request when scanning someone's QR code."""
|
|
if session.get('user_type') != 'attendee':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
scanned_id = request.json.get('scanned_id') if request.json else request.form.get('scanned_id')
|
|
if not scanned_id:
|
|
return jsonify({'error': 'Scanned attendee ID required'}), 400
|
|
|
|
scanned_id = int(scanned_id)
|
|
|
|
# Can't connect to yourself
|
|
if scanned_id == session['user_id']:
|
|
return jsonify({'error': 'Cannot connect to yourself'}), 400
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Verify scanned attendee exists and is at same event
|
|
cursor.execute("""
|
|
SELECT id FROM attendees WHERE id = %s AND event_id = %s
|
|
""", (scanned_id, session['event_id']))
|
|
if not cursor.fetchone():
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'error': 'Attendee not found at this event'}), 404
|
|
|
|
# Check if connection already exists
|
|
cursor.execute("""
|
|
SELECT id, status FROM connections
|
|
WHERE attendee_id = %s AND connected_attendee_id = %s
|
|
""", (session['user_id'], scanned_id))
|
|
existing = cursor.fetchone()
|
|
|
|
if existing:
|
|
if existing['status'] == 'accepted':
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'error': 'Already connected'}), 400
|
|
elif existing['status'] == 'pending':
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'error': 'Request already pending'}), 400
|
|
else:
|
|
# Rejected - allow new request
|
|
cursor.execute("""
|
|
UPDATE connections SET status = 'pending', attendee_id = %s, connected_attendee_id = %s
|
|
WHERE id = %s
|
|
""", (session['user_id'], scanned_id, existing['id']))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'success': True, 'message': 'Request sent, awaiting approval'})
|
|
else:
|
|
# Create new connection request
|
|
cursor.execute("""
|
|
INSERT INTO connections (attendee_id, connected_attendee_id, status)
|
|
VALUES (%s, %s, 'pending')
|
|
""", (session['user_id'], scanned_id))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'success': True, 'message': 'Request sent, awaiting approval'})
|
|
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/attendee/connection-requests')
|
|
@login_required
|
|
def connection_requests():
|
|
"""Show pending connection requests for the current user."""
|
|
if session.get('user_type') != 'attendee':
|
|
flash('Access denied.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Get pending requests where current user is the target
|
|
cursor.execute("""
|
|
SELECT c.*, a.first_name, a.last_name, a.email, a.organisation, a.role,
|
|
a.introduction, a.profile_picture
|
|
FROM connections c
|
|
JOIN attendees a ON c.attendee_id = a.id
|
|
WHERE c.connected_attendee_id = %s AND c.status = 'pending'
|
|
ORDER BY c.created_at DESC
|
|
""", (session['user_id'],))
|
|
requests = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('attendee/connection_requests.html', requests=requests)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('attendee_dashboard'))
|
|
|
|
|
|
@app.route('/attendee/connection-request/<int:connection_id>', methods=['POST'])
|
|
@login_required
|
|
def respond_to_connection(connection_id):
|
|
"""Approve or reject a connection request."""
|
|
if session.get('user_type') != 'attendee':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
action = request.json.get('action') if request.json else request.form.get('action')
|
|
if action not in ['approve', 'reject']:
|
|
return jsonify({'error': 'Invalid action'}), 400
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Verify this request is for the current user
|
|
cursor.execute("""
|
|
SELECT c.*, a.first_name, a.last_name, a.profile_picture,
|
|
a.email, a.organisation, a.role, a.introduction
|
|
FROM connections c
|
|
JOIN attendees a ON c.attendee_id = a.id
|
|
WHERE c.id = %s AND c.connected_attendee_id = %s AND c.status = 'pending'
|
|
""", (connection_id, session['user_id']))
|
|
connection = cursor.fetchone()
|
|
|
|
if not connection:
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'error': 'Connection request not found'}), 404
|
|
|
|
if action == 'approve':
|
|
cursor.execute("UPDATE connections SET status = 'accepted' WHERE id = %s", (connection_id,))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'success': True, 'status': 'accepted'})
|
|
else:
|
|
cursor.execute("UPDATE connections SET status = 'rejected' WHERE id = %s", (connection_id,))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'success': True, 'status': 'rejected'})
|
|
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# Routes - Appointments
|
|
@app.route('/appointments', methods=['POST'])
|
|
@login_required
|
|
def create_appointment():
|
|
"""Request an appointment."""
|
|
if session.get('user_type') != 'attendee':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
target_id = request.form.get('target_id')
|
|
appointment_time = request.form.get('appointment_time')
|
|
location = request.form.get('location', '').strip()
|
|
notes = request.form.get('notes', '').strip()
|
|
|
|
if not all([target_id, appointment_time]):
|
|
return jsonify({'error': 'Target and time are required'}), 400
|
|
|
|
try:
|
|
apt_datetime = datetime.strptime(appointment_time, '%Y-%m-%dT%H:%M')
|
|
except ValueError:
|
|
return jsonify({'error': 'Invalid datetime format'}), 400
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
INSERT INTO appointments (event_id, requester_id, target_id, appointment_time, location, notes, status)
|
|
VALUES (%s, %s, %s, %s, %s, %s, 'pending')
|
|
""", (session['event_id'], session['user_id'], target_id, apt_datetime, location, notes))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return jsonify({'success': True})
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/appointments/<int:appointment_id>', methods=['PUT'])
|
|
@login_required
|
|
def update_appointment(appointment_id):
|
|
"""Accept or reject an appointment."""
|
|
if session.get('user_type') != 'attendee':
|
|
return jsonify({'error': 'Access denied'}), 403
|
|
|
|
status = request.json.get('status') if request.json else request.form.get('status')
|
|
if status not in ['accepted', 'rejected']:
|
|
return jsonify({'error': 'Invalid status'}), 400
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Verify user is the target of this appointment
|
|
cursor.execute("""
|
|
SELECT id FROM appointments
|
|
WHERE id = %s AND target_id = %s AND status = 'pending'
|
|
""", (appointment_id, session['user_id']))
|
|
if not cursor.fetchone():
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify({'error': 'Appointment not found'}), 404
|
|
|
|
cursor.execute("UPDATE appointments SET status = %s WHERE id = %s", (status, appointment_id))
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return jsonify({'success': True})
|
|
except Error as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# Routes - Public event listing
|
|
@app.route('/events')
|
|
def list_events():
|
|
"""List all upcoming public events."""
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("""
|
|
SELECT e.*, o.name as organizer_name,
|
|
(SELECT COUNT(*) FROM attendees WHERE event_id = e.id) as attendee_count
|
|
FROM events e
|
|
JOIN organizers o ON e.organizer_id = o.id
|
|
WHERE e.start_time >= NOW()
|
|
ORDER BY e.start_time ASC
|
|
""")
|
|
events = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('index.html', events=events, show_events=True)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return render_template('index.html')
|
|
|
|
|
|
@app.route('/event/register/<code>', methods=['GET', 'POST'])
|
|
@app.route('/event/register/<code>/<type_code>', methods=['GET', 'POST'])
|
|
def register_event(code, type_code=None):
|
|
"""Registration page for an event with breakout session registration."""
|
|
# Look up attendee type if type_code is provided
|
|
preselected_type = None
|
|
if type_code:
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM attendee_types WHERE code = %s", (type_code,))
|
|
preselected_type = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not preselected_type:
|
|
flash('Invalid attendee type.')
|
|
return redirect(url_for('index'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM events WHERE code = %s", (code,))
|
|
event = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
return redirect(url_for('index'))
|
|
|
|
# Verify type_code belongs to this event
|
|
if preselected_type and preselected_type['event_id'] != event['id']:
|
|
flash('Invalid attendee type for this event.')
|
|
return redirect(url_for('index'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
# Get breakout sessions for the event
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("""
|
|
SELECT bs.*,
|
|
(SELECT COUNT(*) FROM breakout_session_rsvps WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count
|
|
FROM breakout_sessions bs
|
|
WHERE bs.event_id = %s
|
|
ORDER BY bs.start_time
|
|
""", (event['id'],))
|
|
sessions = cursor.fetchall()
|
|
cursor.close()
|
|
conn.close()
|
|
except Error as e:
|
|
sessions = []
|
|
|
|
# Check if user is already registered (for showing breakout sessions)
|
|
registered = False
|
|
user_id = session.get('user_id')
|
|
user_type = session.get('user_type')
|
|
|
|
if user_id and user_type == 'attendee' and session.get('event_id') == event['id']:
|
|
registered = True
|
|
# Add my_rsvp_status to sessions
|
|
for session_item in sessions:
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("""
|
|
SELECT status FROM breakout_session_rsvps
|
|
WHERE breakout_session_id = %s AND attendee_id = %s
|
|
""", (session_item['id'], user_id))
|
|
result = cursor.fetchone()
|
|
session_item['my_rsvp_status'] = result['status'] if result else None
|
|
cursor.close()
|
|
conn.close()
|
|
except Error:
|
|
session_item['my_rsvp_status'] = None
|
|
else:
|
|
for session_item in sessions:
|
|
session_item['my_rsvp_status'] = None
|
|
|
|
# Handle registration form submission
|
|
if request.method == 'POST' and not registered:
|
|
first_name = request.form.get('first_name', '').strip()
|
|
last_name = request.form.get('last_name', '').strip()
|
|
email = request.form.get('email', '').strip()
|
|
password = request.form.get('password', '')
|
|
confirm_password = request.form.get('confirm_password', '')
|
|
organisation = request.form.get('organisation', '').strip()
|
|
role = request.form.get('role', '').strip()
|
|
phone = request.form.get('phone', '').strip()
|
|
linkedin = request.form.get('linkedin', '').strip()
|
|
introduction = request.form.get('introduction', '').strip()
|
|
selected_sessions = request.form.getlist('breakout_sessions')
|
|
|
|
if not all([first_name, last_name, email, password]):
|
|
flash('First name, last name, email, and password are required.')
|
|
return render_template('attendee/event_register.html', event=event, sessions=sessions, registered=False, preselected_type=preselected_type)
|
|
|
|
if password != confirm_password:
|
|
flash('Passwords do not match.')
|
|
return render_template('attendee/event_register.html', event=event, sessions=sessions, registered=False, preselected_type=preselected_type)
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Check if email already registered for this event
|
|
cursor.execute(
|
|
"SELECT id FROM attendees WHERE email = %s AND event_id = %s",
|
|
(email, event['id'])
|
|
)
|
|
if cursor.fetchone():
|
|
flash('Email already registered for this event. Please log in.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('login'))
|
|
|
|
# Check if this type requires payment
|
|
if preselected_type and preselected_type.get('price') and preselected_type.get('price') > 0:
|
|
# Store registration data in session for payment processing
|
|
session['pending_registration'] = {
|
|
'event_id': event['id'],
|
|
'email': email,
|
|
'password': password,
|
|
'first_name': first_name,
|
|
'last_name': last_name,
|
|
'organisation': organisation,
|
|
'role': role,
|
|
'phone': phone,
|
|
'linkedin': linkedin,
|
|
'introduction': introduction,
|
|
'selected_sessions': selected_sessions,
|
|
'attendee_type_id': preselected_type['id'],
|
|
'type_name': preselected_type['name'],
|
|
'price': preselected_type['price']
|
|
}
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('payment_page', event_code=event['code']))
|
|
|
|
# Complete registration directly for free types
|
|
confirmation_token = uuid.uuid4().hex
|
|
password_hash = hash_password(password)
|
|
attendee_code = generate_attendee_code()
|
|
cursor.execute("""
|
|
INSERT INTO attendees (event_id, email, password_hash, first_name, last_name, organisation, role, phone, linkedin, introduction, confirmation_token, attendee_code, attendee_type_id)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
""", (event['id'], email, password_hash, first_name, last_name, organisation, role, phone, linkedin, introduction, confirmation_token, attendee_code, preselected_type['id'] if preselected_type else None))
|
|
attendee_id = cursor.lastrowid
|
|
|
|
# Process breakout session selections
|
|
for session_id in selected_sessions:
|
|
cursor.execute("SELECT * FROM breakout_sessions WHERE id = %s AND event_id = %s",
|
|
(session_id, event['id']))
|
|
session_data = cursor.fetchone()
|
|
|
|
if session_data:
|
|
if session_data['max_attendees']:
|
|
cursor.execute("""
|
|
SELECT COUNT(*) as count FROM breakout_session_rsvps
|
|
WHERE breakout_session_id = %s AND status = 'registered'
|
|
""", (session_id,))
|
|
count = cursor.fetchone()['count']
|
|
if count >= session_data['max_attendees']:
|
|
continue
|
|
|
|
cursor.execute("""
|
|
INSERT INTO breakout_session_rsvps (breakout_session_id, attendee_id, status)
|
|
VALUES (%s, %s, 'registered')
|
|
""", (session_id, attendee_id))
|
|
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
event_date = localized_date(event['start_time']) if event['start_time'] else 'To be announced'
|
|
personal_page_url = url_for('attendee_personal_page', token=confirmation_token, _external=True)
|
|
send_attendee_confirmation_email(email, first_name, event['name'], event_date, event['location'], personal_page_url)
|
|
|
|
session['user_id'] = attendee_id
|
|
session['user_type'] = 'attendee'
|
|
session['event_id'] = event['id']
|
|
|
|
flash('Registration successful!')
|
|
return redirect(url_for('attendee_personal_page', token=confirmation_token))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
|
|
return render_template('attendee/event_register.html', event=event, sessions=sessions, registered=registered, preselected_type=preselected_type)
|
|
|
|
|
|
@app.route('/event/<code>/payment', methods=['GET', 'POST'])
|
|
def payment_page(code):
|
|
"""Payment page for paid attendee types."""
|
|
pending = session.get('pending_registration')
|
|
|
|
if not pending or pending.get('event_id') is None:
|
|
flash('No pending registration found.')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM events WHERE code = %s", (code,))
|
|
event = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
return redirect(url_for('index'))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
if request.method == 'POST':
|
|
# Process payment (simulated - in production, integrate with payment gateway like Stripe)
|
|
# For now, we just complete the registration
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Generate confirmation token
|
|
confirmation_token = uuid.uuid4().hex
|
|
password_hash = hash_password(pending['password'])
|
|
attendee_code = generate_attendee_code()
|
|
|
|
cursor.execute("""
|
|
INSERT INTO attendees (event_id, email, password_hash, first_name, last_name, organisation, role, phone, linkedin, introduction, confirmation_token, attendee_code, attendee_type_id)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
""", (pending['event_id'], pending['email'], password_hash, pending['first_name'], pending['last_name'],
|
|
pending['organisation'], pending['role'], pending['phone'], pending['linkedin'],
|
|
pending['introduction'], confirmation_token, attendee_code, pending['attendee_type_id']))
|
|
attendee_id = cursor.lastrowid
|
|
|
|
# Process breakout session selections
|
|
for session_id in pending.get('selected_sessions', []):
|
|
cursor.execute("SELECT * FROM breakout_sessions WHERE id = %s AND event_id = %s",
|
|
(session_id, pending['event_id']))
|
|
session_data = cursor.fetchone()
|
|
|
|
if session_data:
|
|
if session_data['max_attendees']:
|
|
cursor.execute("""
|
|
SELECT COUNT(*) as count FROM breakout_session_rsvps
|
|
WHERE breakout_session_id = %s AND status = 'registered'
|
|
""", (session_id,))
|
|
count = cursor.fetchone()['count']
|
|
if count >= session_data['max_attendees']:
|
|
continue
|
|
|
|
cursor.execute("""
|
|
INSERT INTO breakout_session_rsvps (breakout_session_id, attendee_id, status)
|
|
VALUES (%s, %s, 'registered')
|
|
""", (session_id, attendee_id))
|
|
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
# Clear pending registration
|
|
session.pop('pending_registration', None)
|
|
|
|
# Send confirmation email
|
|
event_date = localized_date(event['start_time']) if event['start_time'] else 'To be announced'
|
|
personal_page_url = url_for('attendee_personal_page', token=confirmation_token, _external=True)
|
|
send_attendee_confirmation_email(pending['email'], pending['first_name'], event['name'], event_date, event['location'], personal_page_url)
|
|
|
|
# Auto-login the attendee
|
|
session['user_id'] = attendee_id
|
|
session['user_type'] = 'attendee'
|
|
session['event_id'] = pending['event_id']
|
|
|
|
flash('Registration and payment successful!')
|
|
return redirect(url_for('attendee_personal_page', token=confirmation_token))
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
|
|
return render_template('attendee/payment.html', event=event, pending=pending)
|
|
|
|
|
|
@app.route('/attendee/request-link', methods=['GET', 'POST'])
|
|
def request_attendee_link():
|
|
"""Page to request a personal page link by email."""
|
|
if request.method == 'POST':
|
|
email = request.form.get('email', '').strip()
|
|
|
|
if not email:
|
|
flash('Email is required.')
|
|
return render_template('attendee/request_link.html')
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Check if attendee exists with this email
|
|
cursor.execute("SELECT * FROM attendees WHERE email = %s", (email,))
|
|
attendee = cursor.fetchone()
|
|
|
|
if attendee:
|
|
# Use confirmation_token if available, otherwise attendee_code
|
|
token = attendee['confirmation_token'] if attendee['confirmation_token'] else attendee['attendee_code']
|
|
personal_page_url = url_for('attendee_personal_page', token=token, _external=True)
|
|
|
|
full_name = f"{attendee['first_name']} {attendee['last_name']}"
|
|
|
|
# Get event name
|
|
cursor.execute("SELECT name FROM events WHERE id = %s", (attendee['event_id'],))
|
|
event = cursor.fetchone()
|
|
event_name = event['name'] if event else 'Event'
|
|
|
|
# Send email
|
|
send_attendee_confirmation_email(
|
|
attendee_email=email,
|
|
attendee_name=full_name,
|
|
event_name=event_name,
|
|
event_date='See your profile',
|
|
event_location='See your profile',
|
|
personal_page_url=personal_page_url
|
|
)
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
# Always show success message for security (don't reveal if email exists)
|
|
flash(t('email_sent'))
|
|
return render_template('attendee/request_link.html')
|
|
|
|
except Exception as e:
|
|
flash(f'Error: {e}')
|
|
return render_template('attendee/request_link.html')
|
|
|
|
return render_template('attendee/request_link.html')
|
|
|
|
|
|
@app.route('/attendee/personal/<token>')
|
|
def attendee_personal_page(token):
|
|
"""Personal page accessed via confirmation email link."""
|
|
client_ip = request.remote_addr
|
|
|
|
# Check if IP is blocked
|
|
if is_ip_blocked_for_personal_page(client_ip):
|
|
flash(f'Too many invalid attempts. Please try again later (blocked for 30 minutes).')
|
|
return redirect(url_for('index'))
|
|
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# Find attendee by confirmation token
|
|
cursor.execute("SELECT * FROM attendees WHERE confirmation_token = %s", (token,))
|
|
attendee = cursor.fetchone()
|
|
|
|
# Fallback: also accept attendee_code for old links where token was cleared
|
|
if not attendee:
|
|
cursor.execute("SELECT * FROM attendees WHERE attendee_code = %s", (token,))
|
|
attendee = cursor.fetchone()
|
|
|
|
if not attendee:
|
|
record_failed_personal_page_attempt(client_ip)
|
|
flash('Invalid link.')
|
|
cursor.close()
|
|
conn.close()
|
|
return redirect(url_for('index'))
|
|
|
|
# Clear failed attempts on successful access
|
|
clear_failed_personal_page_attempts(client_ip)
|
|
|
|
# Auto-login the attendee
|
|
# Note: confirmation_token is kept so the link remains valid indefinitely
|
|
session['user_id'] = attendee['id']
|
|
session['user_type'] = 'attendee'
|
|
session['event_id'] = attendee['event_id']
|
|
|
|
# Get all events for this attendee
|
|
cursor.execute("""
|
|
SELECT e.* FROM events e
|
|
WHERE e.id = %s
|
|
""", (attendee['event_id'],))
|
|
events = cursor.fetchall()
|
|
|
|
# For each event, get breakout sessions with RSVP status
|
|
for event in events:
|
|
cursor.execute("""
|
|
SELECT bs.*,
|
|
(SELECT COUNT(*) FROM breakout_session_rsvps
|
|
WHERE breakout_session_id = bs.id AND status = 'registered') as rsvp_count,
|
|
(SELECT status FROM breakout_session_rsvps
|
|
WHERE breakout_session_id = bs.id AND attendee_id = %s) as my_rsvp_status
|
|
FROM breakout_sessions bs
|
|
WHERE bs.event_id = %s
|
|
ORDER BY bs.start_time
|
|
""", (attendee['id'], event['id']))
|
|
event['breakout_sessions'] = cursor.fetchall()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return render_template('attendee/personal.html', attendee=attendee, events=events)
|
|
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/event/<int:event_id>')
|
|
def view_event(event_id):
|
|
"""View event details."""
|
|
try:
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("""
|
|
SELECT e.*, o.name as organizer_name,
|
|
(SELECT COUNT(*) FROM attendees WHERE event_id = e.id) as attendee_count
|
|
FROM events e
|
|
JOIN organizers o ON e.organizer_id = o.id
|
|
WHERE e.id = %s
|
|
""", (event_id,))
|
|
event = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not event:
|
|
flash('Event not found.')
|
|
return redirect(url_for('index'))
|
|
|
|
return render_template('attendee/event.html', event=event)
|
|
except Error as e:
|
|
flash(f'Database error: {e}')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Ensure staff_code column exists in organizers and staff tables
|
|
ensure_staff_code_column()
|
|
ensure_all_organizers_have_staff_code()
|
|
ensure_staff_staff_code_column()
|
|
ensure_all_staff_have_staff_code()
|
|
|
|
app.run(debug=True, host='0.0.0.0', port=5002)
|