-- aviation_source: streaming source that reads JSON-encoded flight records
-- from the Kafka topic 'Insta-topic'. One column is declared per JSON field
-- that should be extracted from each message.
--
-- IF NOT EXISTS makes this script safe to re-run: when the source is already
-- defined the statement is a no-op instead of failing.
--
-- NOTE(review): flight_date and the *_delay columns are declared VARCHAR.
--   If the producer emits a calendar date / a numeric delay, DATE and an
--   integer type would allow filtering and aggregation without casts.
--   Left unchanged here because retyping would alter the schema seen by
--   downstream queries -- confirm against the producer's payload.
-- NOTE(review): there is no codeshared_flight_icao column even though the
--   other airline/flight groups carry both iata and icao codes -- confirm
--   whether the omission is intentional.
CREATE SOURCE IF NOT EXISTS aviation_source (
    -- Flight-level fields
    flight_date VARCHAR,
    flight_status VARCHAR,

    -- Departure
    departure_airport VARCHAR,
    departure_timezone VARCHAR,
    departure_iata VARCHAR,
    departure_icao VARCHAR,
    departure_terminal VARCHAR,
    departure_gate VARCHAR,
    departure_delay VARCHAR,  -- unit not visible here; presumably minutes -- TODO confirm
    departure_scheduled TIMESTAMP WITH TIME ZONE,
    departure_estimated TIMESTAMP WITH TIME ZONE,
    departure_actual TIMESTAMP WITH TIME ZONE,
    departure_estimated_runway TIMESTAMP WITH TIME ZONE,
    departure_actual_runway TIMESTAMP WITH TIME ZONE,

    -- Arrival
    arrival_airport VARCHAR,
    arrival_timezone VARCHAR,
    arrival_iata VARCHAR,
    arrival_icao VARCHAR,
    arrival_terminal VARCHAR,
    arrival_gate VARCHAR,
    arrival_baggage VARCHAR,
    arrival_delay VARCHAR,  -- unit not visible here; presumably minutes -- TODO confirm
    arrival_scheduled TIMESTAMP WITH TIME ZONE,
    arrival_estimated TIMESTAMP WITH TIME ZONE,
    arrival_actual TIMESTAMP WITH TIME ZONE,
    arrival_estimated_runway TIMESTAMP WITH TIME ZONE,
    arrival_actual_runway TIMESTAMP WITH TIME ZONE,

    -- Operating airline
    airline_name VARCHAR,
    airline_iata VARCHAR,
    airline_icao VARCHAR,

    -- Flight identifiers
    flight_number VARCHAR,
    flight_iata VARCHAR,
    flight_icao VARCHAR,

    -- Codeshare details
    codeshared_airline_name VARCHAR,
    codeshared_airline_iata VARCHAR,
    codeshared_airline_icao VARCHAR,
    codeshared_flight_number VARCHAR,
    codeshared_flight_iata VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'Insta-topic',
    properties.bootstrap.server = 'x.x.x.x:9092',
    -- Consume from the earliest available offset rather than only new messages.
    scan.startup.mode = 'earliest',
    -- SCRAM authentication over SASL_PLAINTEXT: the client authenticates, but
    -- the connection itself is not TLS-encrypted.
    -- NOTE(review): consider SASL_SSL if the broker supports it, and supply the
    -- password through the engine's secret management instead of an inline
    -- literal -- confirm what the deployment offers.
    properties.sasl.mechanism = 'SCRAM-SHA-256',
    properties.security.protocol = 'SASL_PLAINTEXT',
    properties.sasl.username = 'ickafka',
    properties.sasl.password = 'xxxxxx'
) FORMAT PLAIN ENCODE JSON;