Pythonで実験データ整理プログラム[ファイル探索 CSV読み込み txt出力]

Pythonで実験データ整理プログラム ファイル探索 CSV読み込み txt出力

こんにちは。BOSSです。

以前より作っていた実験データ整理プログラム。
その中でも圧縮・引張の材料試験に関して一つ完成しました!

この結果、今までExcelを使って手で行なっていたデータ整理が劇的に早くなりました。

このプログラムでは主に次のような操作を行なっています。

  • 任意のディレクトリー内のCSVファイルを全て探索

  • CSVファイルを読み込み

  • Numpyを用いた数値計算

  • Pandasに変更後カラム名を付け直してタブ区切りのtxtファイルに出力

それでは完成したプログラムがどういうものなのか
Outputしていきたいと思います。

これまでの経過

Pythonで実験データ整理プログラム[CSVの出力] - 機械学習はじめました。

Python 実験データ整理プログラム[ファイル探索] - 機械学習はじめました。

今までの記事で目標とするところ、そしてファイル探索のプログラムは書いています。
今一度あらためて確認していきます。

圧縮・引張試験

今回作ったプログラムは材料試験である圧縮・引張試験のデータ整理プログラムになります。

引張試験 圧縮試験 材料試験 物性取得 弾性率 引張強度 圧縮強度

直方体の試験片に引張、圧縮の荷重を負荷しその時の荷重と変位から応力とひずみを求めます。
材料試験ですので

  • 引張、圧縮強度

  • 応力-ひずみ線図(S-Sカーブ)

の取得が大きな目的となります。細かい材料力学の話は割愛しますのでこのあたりを参照ください。

2.応力の定義|材料力学

7.ひずみの定義|材料力学

8.応力ひずみ線図|材料力学

プログラムで実現すること

今回の試験データは次のように取得でるものとします。

圧縮試験 試験データ 荷重 変位 CH1からCH4まではそれぞれ荷重と変位を2チャンネルずつ取得して
CH5とCH6はClip guageとよばれる計測機で直接試験片の変位を計測しています。

これらのデータと、事前に測定しておいた試験片寸法から真応力と真ひずみを計算しtab区切りのtxtファイルに出力するところを目指します。

プログラム概要

大きく分けて今回のプログラムは以下の3つの要素に分かれます。

入力部

  • 実験データが入った読み込み元と出力先のパスを入力

  • あとでカットするためデータの余計な部分の行数を入力(上の画像なら14行目まで)

  • 試験片寸法の入力

ファイル探索

  • 読み込み元のディレクトリー内からCSVファイルのみを全て探索

  • 各CSVファイルのパスとファイル名を取得

ファイル読み込み、計算、ファイル出力部

  • 探索したCSVファイルをnumpyで読み込み

  • 入力データから目的の物理量を計算

  • 元データに計算した物理量を追加

  • Pandasを用いてカラム名を付け直しtxtファイルに出力

これらの要素のプログラムを一つずつ見ていきます。

入力部

入力するのは

  • 読み込みデータディレクトリーのパス

  • 出力先ディレクトリーのパス

  • データ以外の情報が記載された行数

  • 試験片の寸法3辺

するとこんな感じす。

def main():
    print('Enter the Input Directory Path\n')
    input_directory_path = input()  # 元CSVデータが入ってるディレクトリーパスを入力
    print('Enter the Output Directory Path\n')
    output_directory_path = input()  # 出力先ディレクトリーパスの入力
    print('Enter skip rows\n')
    rows = int(input())  # スキップするヘッダー行数を指定

    # 試験片ジオメトリー
    print('Enter specimen geometries')
    print('Enter b=\t')
    b = float(input())
    print('Enter h=\t')
    h = float(input())
    print('enter l=\t')
    l = float(input())
    specimen = {'b': b, 'h': h, 'l': l}

入力部はmain()に記述し、すべて

input()

を用いて変数に代入しています。数字は型を変換することを忘れずに。

ファイル探索

次に入力されたディレクトリー以下に存在するすべてのCSVファイルを探索しファイル名とパスを取得します。

そこで以下のようなclassを定義します。

########################################################
# ファイルの走査をしてCSVファイルを検出
# ファイル名とファイルパスを取得するclass
########################################################
class get_path:

    def __init__(self, input_directory_path):
        self.input_directory_path = input_directory_path  # inputディレクトリーのパス

    def get_file_path(self):

        file_dict = {}
        for root, directories, files in os.walk(self.input_directory_path):
            for file in files:
                base, ext = os.path.splitext(file)
                file_path = os.path.join(root, file)

                if ext in ['.csv', '.CSV']:
                    file_dict[file] = file_path

        return file_dict
def __init__(self, input_directory_path):
        self.input_directory_path = input_directory_path  # inputディレクトリーのパス

の部分でコンストラクタを定義しています。ここでは引数はデータが格納されているディレクトリーパスとなります。

そしてos.walkを用いることでファイル名とファイルパス取得します。

 if ext in ['.csv', '.CSV']:
     file_dict[file] = file_path

この部分で拡張子の判定を行なっています。.csvあるいは.CSVとなっている拡張子のファイル名とパスを辞書に格納し戻り値としています。


さて、次はmain()の方にインスタンス生成の記述を追記しておきましょう。

def main():
    print('Enter the Input Directory Path\n')
    input_directory_path = input()  # 元CSVデータが入ってるディレクトリーパスを入力
    print('Enter the Output Directory Path\n')
    output_directory_path = input()  # 出力先ディレクトリーパスの入力
    print('Enter skip rows\n')
    rows = int(input())  # スキップするヘッダー行数を指定

    # 試験片ジオメトリー
    print('Enter specimen geometries')
    print('Enter b=\t')
    b = float(input())
    print('Enter h=\t')
    h = float(input())
    print('enter l=\t')
    l = float(input())
    specimen = {'b': b, 'h': h, 'l': l}


    # get_pathクラスのget_file_path関数を実行
    # ディレクトリー内のCSVを検索、パスとファイル名をそれぞれ別のリストに格納して返ってくる
    directory = get_path(input_directory_path)
    file_dict = directory.get_file_path()

ファイル読み込み、計算、ファイル出力部

次はデータの読み込みと計算、ファイル出力をさせるclassを定義します。

データの読み込みと計算はnumpyで行い、カラム名をつけてtxtファイルに書き出すところはPandasで行います。

########################################################
# 取得したファイルパスからCSVの読み込み
# ここで計算を行い、Dataframeに変換後ラベルをつけて書き出す
########################################################

class new_csv:  
    def __init__(self, file_dict, output_directory_path,rows, specimen):
        self.file_dict = file_dict  # inputディレクトリー内のファイルパスを格納したリスト
        self.output_directory_path = output_directory_path  # outputディレクトリーのパス
        self.rows = rows  # スキップしたい任意のヘッダー行数
        self.specimen = specimen # 試験片寸法

    def output_txt(self):
        for file_name, file_path in self.file_dict.items():
            input_arr = np.loadtxt(file_path, delimiter=",", skiprows=self.rows, encoding='Shift_jis')

            ########################################################
            # 値の計算 clip_guage_average, TrueStress, TrueStrain
            ########################################################
            row, col = np.shape(input_arr)

            Clip_ave = np.reshape(( input_arr[:, 5] + input_arr[:, 6] ) / 2, (row, 1))

            n_stress = np.reshape( input_arr[:, 1] / (self.specimen['b'] * self.specimen['h']), (row, 1))

            n_strain = np.reshape( Clip_ave / self.specimen['l'], (row, 1))

            t_stress = np.reshape( n_stress * (1 + n_strain), (row, 1))

            t_strain = np.reshape( np.log1p(n_strain), (row, 1))

            output_arr = np.hstack((input_arr, Clip_ave, n_stress, n_strain, t_stress, t_strain))

            ########################################################
            # pandas DataFrameに変換
            ########################################################
            output_df = pd.DataFrame(output_arr)
            output_df.columns = ["Time (s)", "Load_int (N)", "Disp_int (mm)", "Load_ext (N)", "Disp_ext (mm)",
                                 "Clip_Disp (mm)", "Clip_Disp (mm)", "Clip_ave (mm)", "n_stress (MPa)", "n_strain",
                                  "t_stress (MPa)", "t_strain (ε)"]

            # ファイル名の変更 txt出力
            output_file_name = file_name[0:-4]
            output_path = os.path.join(self.output_directory_path, output_file_name + '.txt')
            output_df.to_csv(output_path, encoding="Shift_jis", index=False, sep="\t", float_format='%.7e')
class new_csv:  
    def __init__(self, file_dict, output_directory_path,rows, specimen):
        self.file_dict = file_dict  # inputディレクトリー内のファイルパスを格納したリスト
        self.output_directory_path = output_directory_path  # outputディレクトリーのパス
        self.rows = rows  # スキップしたい任意のヘッダー行数
        self.specimen = specimen # 試験片寸法

コンストラクタはこのように定義します。引数は

  • データのファイル名とファイルパス辞書

  • 出力先ディレクトリーのパス

  • 読み込みをskipする行数

  • 試験片の寸法

としています。

    def output_txt(self):
        for file_name, file_path in self.file_dict.items():
            input_arr = np.loadtxt(file_path, delimiter=",", skiprows=self.rows, encoding='Shift_jis')

            ########################################################
            # 値の計算 clip_guage_average, TrueStress, TrueStrain
            ########################################################
            row, col = np.shape(input_arr)

            Clip_ave = np.reshape(( input_arr[:, 5] + input_arr[:, 6] ) / 2, (row, 1))

            n_stress = np.reshape( input_arr[:, 1] / (self.specimen['b'] * self.specimen['h']), (row, 1))

            n_strain = np.reshape( Clip_ave / self.specimen['l'], (row, 1))

            t_stress = np.reshape( n_stress * (1 + n_strain), (row, 1))

            t_strain = np.reshape( np.log1p(n_strain), (row, 1))

            output_arr = np.hstack((input_arr, Clip_ave, n_stress, n_strain, t_stress, t_strain))

まずはデータを一つずつ読み込んでいきます。

np.loadtxt()

を用います。ここで読み込み時にスキップする行数を指定します。

次に読み込んだデータを使ってそれぞれ計算を行なって、読み込んだデータに連結するというプロセスになります。

細かい計算内容は割愛しますが、重要なところが一点。

それはarrayの連結には

np.hstack()

を用いますが、実はarrayのサイズが同じものでないと連結ができません。

ただ計算しただけだと1次元のデータとなり連結ができません。そこで2次元のデータとなるように計算結果の次元を変更しておきます。

それがこの記述。

row, col = np.shape(input_arr)

Clip_ave = np.reshape(( input_arr[:, 5] + input_arr[:, 6] ) / 2, (row, 1))

.
.
.
.

まずは元のデータのサイズを取得しておき行数と列数を変数に代入しておきます。

そして、各データを

np.reshape()

を用いてrow行、1列の2次元のarrayに変形します。

こうすることで

output_arr = np.hstack((input_arr, Clip_ave, n_stress, n_strain, t_stress, t_strain))

と元のデータに計算した結果が横方向に連結されていきます。

そして、できたarrayをpandasに変換し、カラム名をつけて出力します。

           ########################################################
            # pandas DataFrameに変換
            ########################################################
            output_df = pd.DataFrame(output_arr)
            output_df.columns = ["Time (s)", "Load_int (N)", "Disp_int (mm)", "Load_ext (N)", "Disp_ext (mm)",
                                 "Clip_Disp (mm)", "Clip_Disp (mm)", "Clip_ave (mm)", "n_stress (MPa)", "n_strain",
                                  "t_stress (MPa)", "t_strain (ε)"]

            # ファイル名の変更 txt出力
            output_file_name = file_name[0:-4]
            output_path = os.path.join(self.output_directory_path, output_file_name + '.txt')
            output_df.to_csv(output_path, encoding="Shift_jis", index=False, sep="\t", float_format='%.7e')
            output_file_name = file_name[0:-4]
            output_path = os.path.join(self.output_directory_path, output_file_name + '.txt')

この部分でcsvファイルからtxtファイルにするために、拡張子を全てtxtに変更したものをファイル名の変数に代入します。

そして

df.to_csv()

を用いることでファイルの書き出しを行います。

ここではsep(区切り文字)は\t(タブ)とし、またfloat_format(データ形式)は%.7e(指数形式で有効数字7桁)としています。

最後にインスタンス生成の記述をdef main()に追記しておきます。

def main():
    print('Enter the Input Directory Path\n')
    input_directory_path = input()  # 元CSVデータが入ってるディレクトリーパスを入力
    print('Enter the Output Directory Path\n')
    output_directory_path = input()  # 出力先ディレクトリーパスの入力
    print('Enter skip rows\n')
    rows = int(input())  # スキップするヘッダー行数を指定

    # 試験片ジオメトリー
    print('Enter specimen geometries')
    print('Enter b=\t')
    b = float(input())
    print('Enter h=\t')
    h = float(input())
    print('enter l=\t')
    l = float(input())
    specimen = {'b': b, 'h': h, 'l': l}


    # get_pathクラスのget_file_path関数を実行
    # ディレクトリー内のCSVを検索、パスとファイル名をそれぞれ別のリストに格納して返ってくる
    directory = get_path(input_directory_path)
    file_dict = directory.get_file_path()

    # new_csvクラスのcut_header関数を実行
    # パスリストを引数に各ファイルについてヘッダー部の削除、outputディレクトリーに名前を変更して保存
    file = new_csv(file_dict, output_directory_path,rows, specimen)
    file.output_txt()

プログラム全体

最後に出来上がったプログラムを全て載せておきます。

import os
import numpy as np
import pandas as pd

########################################################
# ファイルの走査をしてCSVファイルを検出
# ファイル名とファイルパスを取得するclass
########################################################
class get_path:

    def __init__(self, input_directory_path):
        self.input_directory_path = input_directory_path  # inputディレクトリーのパス

    def get_file_path(self):

        file_dict = {}
        for root, directories, files in os.walk(self.input_directory_path):
            for file in files:
                base, ext = os.path.splitext(file)
                file_path = os.path.join(root, file)

                if ext in ['.csv', '.CSV']:
                    file_dict[file] = file_path

        return file_dict


########################################################
# 取得したファイルパスからCSVの読み込み
# ここで計算を行い、Dataframeに変換後ラベルをつけて書き出す
########################################################

class new_csv:  #numpyでファイルを読み書きしてみる
    def __init__(self, file_dict, output_directory_path,rows, specimen):
        self.file_dict = file_dict  # inputディレクトリー内のファイルパスを格納したリスト
        self.output_directory_path = output_directory_path  # outputディレクトリーのパス
        self.rows = rows  # スキップしたい任意のヘッダー行数
        self.specimen = specimen # 試験片寸法

    def output_txt(self):
        for file_name, file_path in self.file_dict.items():
            input_arr = np.loadtxt(file_path, delimiter=",", skiprows=self.rows, encoding='Shift_jis')

            ########################################################
            # 値の計算 clip_guage_average, TrueStress, TrueStrain
            ########################################################
            row, col = np.shape(input_arr)

            Clip_ave = np.reshape(( input_arr[:, 5] + input_arr[:, 6] ) / 2, (row, 1))

            n_stress = np.reshape( input_arr[:, 1] / (self.specimen['b'] * self.specimen['h']), (row, 1))

            n_strain = np.reshape( Clip_ave / self.specimen['l'], (row, 1))

            t_stress = np.reshape( n_stress * (1 + n_strain), (row, 1))

            t_strain = np.reshape( np.log1p(n_strain), (row, 1))

            output_arr = np.hstack((input_arr, Clip_ave, n_stress, n_strain, t_stress, t_strain))

            ########################################################
            # pandas DataFrameに変換
            ########################################################
            output_df = pd.DataFrame(output_arr)
            output_df.columns = ["Time (s)", "Load_int (N)", "Disp_int (mm)", "Load_ext (N)", "Disp_ext (mm)",
                                 "Clip_Disp (mm)", "Clip_Disp (mm)", "Clip_ave (mm)", "n_stress (MPa)", "n_strain",
                                  "t_stress (MPa)", "t_strain (ε)"]

            # ファイル名の変更 txt出力
            output_file_name = file_name[0:-4]
            output_path = os.path.join(self.output_directory_path, output_file_name + '.txt')
            output_df.to_csv(output_path, encoding="Shift_jis", index=False, sep="\t", float_format='%.7e')


def main():
    print('Enter the Input Directory Path\n')
    input_directory_path = input()  # 元CSVデータが入ってるディレクトリーパスを入力
    print('Enter the Output Directory Path\n')
    output_directory_path = input()  # 出力先ディレクトリーパスの入力
    print('Enter skip rows\n')
    rows = int(input())  # スキップするヘッダー行数を指定

    # 試験片ジオメトリー
    print('Enter specimen geometries')
    print('Enter b=\t')
    b = float(input())
    print('Enter h=\t')
    h = float(input())
    print('enter l=\t')
    l = float(input())
    specimen = {'b': b, 'h': h, 'l': l}


    # get_pathクラスのget_file_path関数を実行
    # ディレクトリー内のCSVを検索、パスとファイル名をそれぞれ別のリストに格納して返ってくる
    directory = get_path(input_directory_path)
    file_dict = directory.get_file_path()

    # new_csvクラスのcut_header関数を実行
    # パスリストを引数に各ファイルについてヘッダー部の削除、outputディレクトリーに名前を変更して保存
    file = new_csv(file_dict, output_directory_path,rows, specimen)
    file.output_txt()


if __name__ == '__main__':
    main()