데이터과학 삼학년

GCP Dataflow를 이용한 텍스트 전처리 (feat. universal sentence encoder) 본문

GCP

GCP Dataflow를 이용한 텍스트 전처리 (feat. universal sentence encoder)

Dan-k 2020. 8. 19. 17:23
반응형

Dataflow를 이용한 텍스트 전처리 (feat. universal sentence encoder)

전처리 과정

  1. 형태소 분석 : raw text data를  형태소단위 token 및 stopword 제거

    1. 정제된 raw data 는 GCS 및 Bigquery table 로드

  2. Transfer learning : 정제된 raw data를 universal sentence encoder multilingual을 이용하여 벡터화 (512 dimensional)

  3. Extract : 벡터화 된 데이터를 GCS 및 Bigquery table 로드

예시)

 

전처리 세부 과정

  1. 형태소 분석

    1. stopword 제거

    2. 정제된 한글, 영어 데이터는 GCS, BQ table로 내보냄

    1. 한글 : mecab, 영어 : nltk 사용 

    2. 그 외 언어 형태소 분석 미적용

  2. Transfer learning

    1. universal sentence encoder multilingual (cnn) 모델을 이용하여 벡터화

    2. tensorflow transform 을 이용하여 병렬 처리

  3. Extract

    1. GCS

    2. BQ table로 적용

 

Dataflow Pipeline

  • apache beam을 기반으로 GCP dataflow 가 만들어지고 병렬처리로 돌아감 (ETL 과정, Extract, Transform, Load)

  • runner 설정

    • DirectRunner : 로컬에서 실행  //  DataflowRunner : Dataflow에서 실행

  • python 코드로 beam.Pipeline 문법 간략 소개

    • <이어서 실행시킬 객체> | ‘<stage명>’ >> [실행시킬 함수]

  • 간략한 예제 코드

    • pipeline.py

import os
from time import time

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

import tensorflow as tf
import tensorflow_text
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.beam.tft_beam_io import transform_fn_io


#====== options =========
pipeline_options = PipelineOptions(None)
DEST_DIR = "gs://data/"
job_name = 'dataflow-use'
options = {
	'runner':'DataflowRunner',
    'num_workers' : 10,
    'machine_type' : 'n1-highmem-16',
    'staging_location': DEST_DIR + 'staging',
    'temp_location': DEST_DIR + 'tmp',
    'job_name': job_name,
    'project': 'project_id',
    'region' : 'us-central1',
    'teardown_policy': 'TEARDOWN_ALWAYS',
    'no_save_main_session': True ,  
    'save_main_session': False,
    'service_account_email' : 'user@project-id.iam.gserviceaccount.com',
    'setup_file' : './setup.py'
    
}
opts = beam.pipeline.PipelineOptions(flags=[], **options)

#====== preprocess_for_embedding =========
encoder = None

def preprocess_fn(input_features):
    import tensorflow_transform as tft
    embedding = tft.apply_function(embed_text, input_features['data']) # apply_function은 더 이상 필요(과거 TensorFlow 그래프를 검사하는 tf.Transform 기능의 제한으로 인해 필요)
#     embedding = embed_text(input_features['data']) 
    output_features = {
    'user': input_features['user'],
    'logkey': input_features['logkey'],
    'data': input_features['data'],
    'embedding': embedding
    }
    return output_features

def embed_text(text):
    import tensorflow_hub as hub
    import tensorflow_text
    global encoder
    use_url = "https://tfhub.dev/google/universal-sentence-encoder-multilingual/1" ## hub.Module v1으로 해야 실행됨 : session을 쓸 수 있도록 graph형태로 output이 나와야함
    if encoder is None:
        encoder = hub.Module(use_url)
    outputs = encoder(text)
    
    return outputs
    
 #====== metadata 구성 =========
def get_metadata():
    from tensorflow_transform.tf_metadata import dataset_schema
    from tensorflow_transform.tf_metadata import dataset_metadata

    metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec({
        'pid': tf.io.FixedLenFeature([], tf.string),
        'logkey': tf.io.FixedLenFeature([], tf.string),
        'data': tf.io.FixedLenFeature([], tf.string),
    }))
    return metadata


sql="""
SELECT * FROM `project.dataset.data_preprocess` LIMIT 1000
"""

#====== load Bigquey =======
def to_bq_row(entry):
    # might not need to round...
    entry['embedding'] = [round(float(e), 3) for e in entry['embedding']]
    return entry

def get_bigquery_schema():
    """
    Returns a bigquery schema.
    """
    from apache_beam.io.gcp.internal.clients import bigquery

    table_schema = bigquery.TableSchema()
    columns = (('user', 'string', 'nullable'),
             ('logkey', 'string', 'nullable'),
             ('data', 'string', 'nullable'),
             ('embedding', 'float', 'repeated'))

    for column in columns:
        column_schema = bigquery.TableFieldSchema()
        column_schema.name = column[0]
        column_schema.type = column[1]
        column_schema.mode = column[2]
        table_schema.fields.append(column_schema)

    return table_schema
    

#====== pipeline =========
## tfx 버전
transform_temp_dir = DEST_DIR + '/transform_temp_dir'

with tft_beam.Context(transform_temp_dir):
    pipeline = beam.Pipeline('DataflowRunner', options=opts)   
    raw_data =(
        pipeline
        | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(project='nm-dataintelligence', query=sql, use_standard_sql=True))
    )
    dataset  = (raw_data, get_metadata())
  
    result1 = (
        raw_data
        |  'Write raw data to gcs' >> WriteToText(DEST_DIR + job_name + '/raw_output'+ '/output',file_name_suffix='.txt')
    )
#     embeddings_dataset, _ = (
#         dataset 
#         | 'Embedding data' >> tft_beam.AnalyzeAndTransformDataset(preprocess_fn)  ### input 이 tensor로 들어가야함(기존 use-multilingual은 tensor가 아닌 text를 input으로 받게 되어 있음
#     )
    
    transform_fn = (
        dataset
        | 'Analyse dataset' >> tft_beam.AnalyzeDataset(preprocess_fn)
    )

    transformed_data_with_meta = (
        (dataset, transform_fn)
        | 'Transform dataset' >> tft_beam.TransformDataset()
    )
    
    transformed_data, transformed_metadata = transformed_data_with_meta

    transform_fn | 'Export Transform Fn' >> transform_fn_io.WriteTransformFn(
        DEST_DIR + job_name + '/transform_export_dir')
    
    result2 = (
        transformed_data
        | 'Convert to Insertable data' >> beam.Map(to_bq_row)
        | 'Write to BigQuery table' >> beam.io.WriteToBigQuery(
            project='project_id',
            dataset='dataset',
            table='dataflow_preprocess',
            schema=get_bigquery_schema(),
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    )

### pipeline 실행 
result = pipeline.run()
result.wait_until_finish()
  • setup.py

    • universal sentence encoder multilingual 모델을 사용할때 반드시 sentencepiece 와 tf-sentencepiece 모두 설치해야함

import setuptools

REQUIRED_PACKAGES = [
    'apache-beam[gcp]==2.22.0', ##dataflow Python worker version 2.22.0. 무조건 이렇게 설치해야함
    'numpy==1.18.5',
    'tensorflow==1.13.0',
    'tensorflow-text==1.15.0',
    'tensorflow-hub==0.7.0',
    'tensorflow-transform==0.15.0',
    'tf-sentencepiece==0.1.90',
    'sentencepiece==0.1.91'
]

setuptools.setup(
    name='use_pipeline',
    version='0.0.1',
    author='bdh',
    author_email='user@doamin.com',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages()
)

 

  • 결과

이슈

  • tf 2.x 대 구성된 universal sentence encoder가 tfx에서 실행안됨 (graph 관련 error)

    • 동일한 문제를 갖고 있는 개발자 대다수

      • tensorflow transform git에 이슈 제기됨

https://github.com/tensorflow/transform/issues/160

  • tf 1.x 대 구성한 universal sentence encoder 는 잘 실행됨 (https://tfhub.dev/google/universal-sentence-encoder/2)

    • 그러나, tf1.x 대 구성된 use-multilingual 은 작동 안함

      • sentencepiece 관련 에러

      • Graph ops missing from the python registry ({'SentencepieceEncodeSparse'}) are also absent from the c++ registry.

      • sentencepiece 라이브 관련 에러로 sentence library 구성해줌 (맞는 버전이 따로 있음...)

        • sentencepiece 는 unsupervised 모듈로 soynlp와 비슷하게 corpus를 학습해 bqe 알고리즘에 의해 문장 분리

 

해결

  • tf 2 버전의 transfer learning의 버그로 tf1 버전의 transfer learning으로 dataflow 구성

  • use / use-multilingual 모델 두개 정상작동 확인 → bigquery table 적재 완료

  • 소요시간 Direct Runner

      • CPU times: user 2min 5s, sys: 1min 30s, total: 3min 36s

      • Wall time: 4min 54s

  • embedding 결과 https://console.cloud.google.com/bigquery?project=nm-dataintelligence&p=nm-dataintelligence&d=pan&t=dataflow_preprocess_use_multilingual&page=table

  • dataflow 환경설정 매우 중요

    • 'apache-beam[gcp]==2.22.0', ##dataflow Python worker version 2.22.0. 무조건 이렇게 설치해야함

      • 버전 명시안하면 2.23.0이 설치되어 error 발생 (python worker 미지원)

    • universal sentence encoder multilingual 모델을 사용할때 반드시 sentencepiece 와 tf-sentencepiece 모두 설치해야함

파이프라인 spec 관련 설정 참고

 

파이프라인 실행 매개변수 지정  |  Cloud Dataflow  |  Google Cloud

Apache Beam 프로그램이 파이프라인을 구성하면 사용자는 파이프라인을 실행해야 합니다. 파이프라인 실행은 Apache Beam 프로그램 실행과는 별개입니다. Apache Beam 프로그램에서 파이프라인이 생성되

cloud.google.com

 

728x90
반응형
LIST
Comments