Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to use pyflink to read data file in S3 bucket? #22

Open
waynelxb opened this issue Aug 3, 2024 · 8 comments
Open

How to use pyflink to read data file in S3 bucket? #22

waynelxb opened this issue Aug 3, 2024 · 8 comments

Comments

@waynelxb
Copy link

waynelxb commented Aug 3, 2024

Hi Jaehyeon,
This is just a question about my practice, not a question about your solution. I am not sure whether is proper to ask here.
I tried to followed your solution of Lab 2. To make it simple, I just want to have the pyflink read the source file in the local docker container folder, and read the source file in S3 bucket.
Now the pyflink can read the source file in local docker container folder with the simplied pyflink code.
image

`

  from pyflink.table import EnvironmentSettings, TableEnvironment      
  env_settings = EnvironmentSettings.in_streaming_mode()
  table_env = TableEnvironment.create(env_settings)
  
  def create_source_table(table_name: str, file_path: str):
      stmt = f"""
      CREATE TABLE {table_name} (
          id                  VARCHAR,
          vendor_id           INT,
          pickup_datetime     VARCHAR,
          dropoff_datetime    VARCHAR,
          passenger_count     INT,
          pickup_longitude    VARCHAR,
          pickup_latitude     VARCHAR,
          dropoff_longitude   VARCHAR,
          dropoff_latitude    VARCHAR,
          store_and_fwd_flag  VARCHAR,
          gc_distance         DOUBLE,
          trip_duration       INT,
          google_distance     VARCHAR,
          google_duration     VARCHAR
      ) WITH (
          'connector'= 'filesystem',
          'format' = 'csv',
          'path' = '{file_path}'
      )
      """
      return stmt
  
  def main():
  
      source_table_name = "taxi_trip_source"
      source_file_path = "/etc/flink/data"
      
      table_env.execute_sql(create_source_table(source_table_name, source_file_path))
      table_env.sql_query(f'SELECT * FROM {source_table_name} LIMIT 10').execute().print()
  
  if __name__ == "__main__":
      main()

`

But I haven't figured out how to let it read the source file in S3 bucket.
I didn't successfully create the same environment with the terraform code your provide. I just simply create a S3 bucket and uploaded the source file, taxi-trips.csv. With aws_access_key_id, aws_secret_access_key, I can read the source file in S3 bucket as below.
image

In docker container, flink-s3-fs-hadoop-1.17.1.jar is in the folder "/opt/flink/plugins/s3-fs-hadoop"
image

I didn't create IAM user/role for my test, can I let pyflink read/select data in the source file in S3 bucket with aws_access_key_id and aws_secret_access_key? How can I do it?

Thank you so much!

@jaehyeon-kim
Copy link
Owner

Can you try after creating the following environment variables?

AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY

@waynelxb
Copy link
Author

waynelxb commented Aug 4, 2024

Yes! I have created Environment Variables, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. However, I don't know how to use them in the following pyflink code which is to read data from the source csv file in S3 bucket (specified in the source_file_path below) and also don't know how to use the flink-s3-fs-hadoop-1.17.1.jar is in the folder "/opt/flink/plugins/s3-fs-hadoop".

image

`

from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

def create_source_table(table_name: str, file_path: str):
    stmt = f"""
    CREATE TABLE {table_name} (
        id                  VARCHAR,
        vendor_id           INT,
        pickup_datetime     VARCHAR,
        dropoff_datetime    VARCHAR,
        passenger_count     INT,
        pickup_longitude    VARCHAR,
        pickup_latitude     VARCHAR,
        dropoff_longitude   VARCHAR,
        dropoff_latitude    VARCHAR,
        store_and_fwd_flag  VARCHAR,
        gc_distance         DOUBLE,
        trip_duration       INT,
        google_distance     VARCHAR,
        google_duration     VARCHAR
    ) WITH (
        'connector'= 'filesystem',
        'format' = 'csv',
        'path' = '{file_path}'
    )
    """
    return stmt
def main():
    source_table_name = "taxi_trip_source"
    source_file_path = "s3://lxb-s3-bucket/lxb-taxi-data/"
    
    table_env.execute_sql(create_source_table(source_table_name, source_file_path))
    table_env.sql_query(f'SELECT * FROM {source_table_name} LIMIT 10').execute().print()

if __name__ == "__main__":
    main()

`
The above code returns the following error message
image

@jaehyeon-kim
Copy link
Owner

The error indicates it's an s3 plugin issue. Can you check this doc? https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/

@waynelxb
Copy link
Author

waynelxb commented Aug 5, 2024

The error indicates it's an s3 plugin issue. Can you check this doc? https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/

I think I might figure it out.
The link mentioned in your early reply says copy /opt/flink/opt/flink-s3-fs-hadoop-1.17.1.jar to /opt/flink/plugins/s3-fs-hadoop, then restart flink. However, it doesn't work.
After some research, I find if the jar is copied to flink/lib, then restart flink, the above code in my second post will work.

root@26183775b6ad:/# :cp /opt/flink/opt/flink-s3-fs-hadoop-1.17.1.jar /opt/flink/lib
image

Now it works! (Environment Variables, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY have been created and set like the above snapshot in the second post. No need to update /opt/flink/conf/flink-conf.yaml. )
image

But I do not know why adding the jar in a subfolder in "flink/plugins", as described in the flink link page, does not work.

Thank you so much!!!

@jaehyeon-kim
Copy link
Owner

Can you check the using filesystem plugins section of this doc? https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/

@waynelxb
Copy link
Author

waynelxb commented Aug 6, 2024

Can I try it in such way? However, the container cannot start with this line in yml.
image

@jaehyeon-kim
Copy link
Owner

At least, that is what the doc indicates. I haven't tried myself.

@waynelxb
Copy link
Author

waynelxb commented Aug 7, 2024

Totally understand!
What I am confused with is why putting the jar in plugins//s3-fs-hadoop doesn't work for me.
Based on the documentation https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/filesystems/overview/
the jar has to be put in plugins//s3-fs-hadoop.
image

I also tried to have the same jar in lib and plugins//s3-fs-hadoop, it would not work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants